Return-Path: Delivered-To: apmail-lucene-hadoop-commits-archive@locus.apache.org Received: (qmail 55971 invoked from network); 17 Jan 2008 18:12:07 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 17 Jan 2008 18:12:07 -0000 Received: (qmail 59215 invoked by uid 500); 17 Jan 2008 18:11:57 -0000 Delivered-To: apmail-lucene-hadoop-commits-archive@lucene.apache.org Received: (qmail 59179 invoked by uid 500); 17 Jan 2008 18:11:57 -0000 Mailing-List: contact hadoop-commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hadoop-dev@lucene.apache.org Delivered-To: mailing list hadoop-commits@lucene.apache.org Received: (qmail 59168 invoked by uid 99); 17 Jan 2008 18:11:57 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Jan 2008 10:11:57 -0800 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Jan 2008 18:11:39 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 5CF1F1A983E; Thu, 17 Jan 2008 10:11:45 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r612903 [2/2] - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/util/ src/test/org/apache/hadoop/dfs/ Date: Thu, 17 Jan 2008 18:11:42 -0000 To: hadoop-commits@lucene.apache.org From: dhruba@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080117181145.5CF1F1A983E@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java?rev=612903&r1=612902&r2=612903&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java Thu Jan 17 10:11:35 2008 @@ -465,6 +465,19 @@ protected static File getMetaFile( File f ) { return new File( f.getAbsolutePath() + METADATA_EXTENSION ); } + + static class ActiveFile { + File file; + List threads = new ArrayList(2); + + ActiveFile(File f, List list) { + file = f; + if (list != null) { + threads.addAll(list); + } + threads.add(Thread.currentThread()); + } + } protected File getMetaFile(Block b) throws IOException { File blockFile = getBlockFile( b ); @@ -487,7 +500,7 @@ } FSVolumeSet volumes; - private HashMap ongoingCreates = new HashMap(); + private HashMap ongoingCreates = new HashMap(); private int maxBlocksPerDir = 0; private HashMap volumeMap = null; private HashMap blockMap = null; @@ -570,20 +583,34 @@ } BlockWriteStreams createBlockWriteStreams( File f ) throws IOException { - return new BlockWriteStreams(new FileOutputStream(f), - new FileOutputStream( getMetaFile( f ) )); + return new BlockWriteStreams(new FileOutputStream(new RandomAccessFile( f , "rw" ).getFD()), + new FileOutputStream( new RandomAccessFile( getMetaFile( f ) , "rw" ).getFD() )); } /** * Start writing to a block file + * If isRecovery is true and the block pre-exists, then we kill all + volumeMap.put(b, v); + volumeMap.put(b, v); + * other threads that might be writing to 
this block, and then reopen the file. */ - public BlockWriteStreams writeToBlock(Block b) throws IOException { + public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException { // // Make sure the block isn't a valid one - we're still creating it! // if (isValidBlock(b)) { - throw new IOException("Block " + b + " is valid, and cannot be written to."); + if (!isRecovery) { + throw new IOException("Block " + b + " is valid, and cannot be written to."); + } + // If the block was succesfully finalized because all packets + // were successfully processed at the Datanode but the ack for + // some of the packets were not received by the client. The client + // re-opens the connection and retries sending those packets. The + // client will now fail because this datanode has no way of + // unfinalizing this block. + // + throw new IOException("Reopen Block " + b + " is valid, and cannot be written to."); } long blockSize = b.getNumBytes(); @@ -591,33 +618,56 @@ // Serialize access to /tmp, and check if file already there. // File f = null; + List threads = null; synchronized (this) { // // Is it already in the create process? // - if (ongoingCreates.containsKey(b)) { - // check how old is the temp file - wait 1 hour - File tmp = ongoingCreates.get(b); - if ((System.currentTimeMillis() - tmp.lastModified()) < - blockWriteTimeout) { - throw new IOException("Block " + b + - " has already been started (though not completed), and thus cannot be created."); + ActiveFile activeFile = ongoingCreates.get(b); + if (activeFile != null) { + f = activeFile.file; + threads = activeFile.threads; + + if (!isRecovery) { + // check how old is the temp file - wait 1 hour + if ((System.currentTimeMillis() - f.lastModified()) < + blockWriteTimeout) { + throw new IOException("Block " + b + + " has already been started (though not completed), and thus cannot be created."); + } else { + // stale temp file - remove + if (!f.delete()) { + throw new IOException("Can't write the block - unable to remove stale temp file " + f); + } + f = null; + } } else { - // stale temp file - remove - if (!tmp.delete()) { - throw new IOException("Can't write the block - unable to remove stale temp file " + tmp); + for (Thread thread:threads) { + thread.interrupt(); } - ongoingCreates.remove(b); } + ongoingCreates.remove(b); } FSVolume v = null; - synchronized (volumes) { - v = volumes.getNextVolume(blockSize); - // create temporary file to hold block in the designated volume - f = createTmpFile(v, b); + if (!isRecovery) { + synchronized (volumes) { + v = volumes.getNextVolume(blockSize); + // create temporary file to hold block in the designated volume + f = createTmpFile(v, b); + } + volumeMap.put(b, v); } - ongoingCreates.put(b, f); - volumeMap.put(b, v); + ongoingCreates.put(b, new ActiveFile(f, threads)); + } + + try { + if (threads != null) { + for (Thread thread:threads) { + thread.join(); + } + } + } catch (InterruptedException e) { + throw new IOException("Recovery waiting for thread interrupted."); } // @@ -628,6 +678,29 @@ return createBlockWriteStreams( f ); } + /** + * Retrieves the offset in the block to which the + * the next write will write data to. + */ + public long getChannelPosition(Block b, BlockWriteStreams streams) + throws IOException { + FileOutputStream file = (FileOutputStream) streams.dataOut; + return file.getChannel().position(); + } + + /** + * Sets the offset in the block to which the + * the next write will write data to. 
+ */ + public void setChannelPosition(Block b, BlockWriteStreams streams, + long dataOffset, long ckOffset) + throws IOException { + FileOutputStream file = (FileOutputStream) streams.dataOut; + file.getChannel().position(dataOffset); + file = (FileOutputStream) streams.checksumOut; + file.getChannel().position(ckOffset); + } + File createTmpFile( FSVolume vol, Block blk ) throws IOException { if ( vol == null ) { synchronized ( this ) { @@ -654,7 +727,7 @@ * Complete the block write! */ public synchronized void finalizeBlock(Block b) throws IOException { - File f = ongoingCreates.get(b); + File f = ongoingCreates.get(b).file; if (f == null || !f.exists()) { throw new IOException("No temporary file " + f + " for block " + b); } @@ -669,6 +742,15 @@ } /** + * Remove the temporary block file (if any) + */ + public synchronized void unfinalizeBlock(Block b) throws IOException { + ongoingCreates.remove(b); + volumeMap.remove(b); + DataNode.LOG.warn("Block " + b + " unfinalized and removed. " ); + } + + /** * Return a table of block data */ public Block[] getBlockReport() { @@ -773,6 +855,10 @@ public String toString() { return "FSDataset{dirpath='"+volumes+"'}"; + } + + public long getBlockSize(Block b) { + return blockMap.get(b).length(); } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDatasetInterface.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDatasetInterface.java?rev=612903&r1=612902&r2=612903&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDatasetInterface.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDatasetInterface.java Thu Jan 17 10:11:35 2008 @@ -147,11 +147,12 @@ /** * Creates the block and returns output streams to write data and CRC * @param b + * @param isRecovery True if this is part of erro recovery, otherwise false * @return a BlockWriteStreams object to allow writing the block data * and CRC * @throws IOException */ - public BlockWriteStreams writeToBlock(Block b) throws IOException; + public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException; /** * Finalizes the block previously opened for writing using writeToBlock. @@ -163,6 +164,14 @@ public void finalizeBlock(Block b) throws IOException; /** + * Unfinalizes the block previously opened for writing using writeToBlock. + * The temporary file associated with this block is deleted. + * @param b + * @throws IOException + */ + public void unfinalizeBlock(Block b) throws IOException; + + /** * Returns the block report - the full list of blocks stored * @return - the block report - the full list of blocks stored */ @@ -192,5 +201,28 @@ * Stringifies the name of the storage */ public String toString(); + + /** + * Returns the current offset in the data stream. + * @param b + * @param stream The stream to the data file and checksum file + * @return the position of the file pointer in the data stream + * @throws IOException + */ + public long getChannelPosition(Block b, BlockWriteStreams stream) throws IOException; + + /** + * Sets the file pointer of the data stream and checksum stream to + * the specified values. 
+ * @param b + * @param stream The stream for the data file and checksum file + * @param dataOffset The position to which the file pointre for the data stream + * should be set + * @param ckOffset The position to which the file pointre for the checksum stream + * should be set + * @throws IOException + */ + public void setChannelPosition(Block b, BlockWriteStreams stream, long dataOffset, + long ckOffset) throws IOException; } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=612903&r1=612902&r2=612903&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Thu Jan 17 10:11:35 2008 @@ -2430,12 +2430,13 @@ // if (!isInSafeMode()) { NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: " - +"blockMap updated: "+node.getName()+" is added to "+block.getBlockName()); + +"blockMap updated: "+node.getName()+" is added to "+block.getBlockName()+" size "+block.getNumBytes()); } } else { NameNode.stateChangeLog.warn("BLOCK* NameSystem.addStoredBlock: " + "Redundant addStoredBlock request received for " - + block.getBlockName() + " on " + node.getName()); + + block.getBlockName() + " on " + node.getName() + + " size " + block.getNumBytes()); } // Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java?rev=612903&r1=612902&r2=612903&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java Thu Jan 17 10:11:35 2008 @@ -23,6 +23,8 @@ * buffers output through a {@link BufferedOutputStream} and creates a checksum * file. */ public class FSDataOutputStream extends DataOutputStream { + private OutputStream wrappedStream; + private static class PositionCache extends FilterOutputStream { long position; @@ -53,6 +55,7 @@ public FSDataOutputStream(OutputStream out) throws IOException { super(new PositionCache(out)); + wrappedStream = out; } public long getPos() throws IOException { @@ -62,5 +65,10 @@ public void close() throws IOException { flush(); out.close(); + } + + // Returns the underlying output stream. This is used by unit tests. + public OutputStream getWrappedStream() { + return wrappedStream; } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Daemon.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Daemon.java?rev=612903&r1=612902&r2=612903&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Daemon.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Daemon.java Thu Jan 17 10:11:35 2008 @@ -38,6 +38,13 @@ this.setName(((Object)runnable).toString()); } + /** Construct a daemon thread to be part of a specified thread group. 
*/ + public Daemon(ThreadGroup group, Runnable runnable) { + super(group, runnable); + this.runnable = runnable; + this.setName(((Object)runnable).toString()); + } + public Runnable getRunnable() { return runnable; } Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java?rev=612903&r1=612902&r2=612903&view=diff ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java Thu Jan 17 10:11:35 2008 @@ -22,6 +22,7 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; +import java.util.Random; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.dfs.FSConstants.DatanodeReportType; @@ -37,9 +38,24 @@ */ public class MiniDFSCluster { + private class DataNodeProperties { + DataNode datanode; + Configuration conf; + String[] dnArgs; + + DataNodeProperties(DataNode node, Configuration conf, String[] args) { + this.datanode = node; + this.conf = conf; + this.dnArgs = args; + } + } + private Configuration conf; private NameNode nameNode; - private ArrayList dataNodes = new ArrayList(); + private int numDataNodes; + private int curDatanodesNum = 0; + private ArrayList dataNodes = + new ArrayList(); private File base_dir; private File data_dir; @@ -214,7 +230,7 @@ * * @throws IllegalStateException if NameNode has been shutdown */ - public void startDataNodes(Configuration conf, int numDataNodes, + public synchronized void startDataNodes(Configuration conf, int numDataNodes, boolean manageDfsDirs, StartupOption operation, String[] racks, long[] simulatedCapacities) throws IOException { @@ -255,7 +271,6 @@ null : new String[] {"-"+operation.toString()}; String [] dnArgs = (operation == StartupOption.UPGRADE) ? null : args; - int curDatanodesNum = dataNodes.size(); for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) { Configuration dnConf = new Configuration(conf); if (manageDfsDirs) { @@ -279,8 +294,13 @@ } System.out.println("Starting DataNode " + i + " with dfs.data.dir: " + dnConf.get("dfs.data.dir")); - dataNodes.add(DataNode.createDataNode(dnArgs, dnConf)); + Configuration newconf = new Configuration(dnConf); // save config + dataNodes.add(new DataNodeProperties( + DataNode.createDataNode(dnArgs, dnConf), + newconf, dnArgs)); } + curDatanodesNum += numDataNodes; + this.numDataNodes += numDataNodes; } @@ -334,7 +354,12 @@ * Gets a list of the started DataNodes. May be empty. 
*/ public ArrayList getDataNodes() { - return dataNodes; + ArrayList list = new ArrayList(); + for (int i = 0; i < dataNodes.size(); i++) { + DataNode node = dataNodes.get(i).datanode; + list.add(node); + } + return list; } /** @@ -365,9 +390,67 @@ public void shutdownDataNodes() { for (int i = dataNodes.size()-1; i >= 0; i--) { System.out.println("Shutting down DataNode " + i); - DataNode dn = dataNodes.remove(i); + DataNode dn = dataNodes.remove(i).datanode; dn.shutdown(); + numDataNodes--; + } + } + + /* + * Shutdown a particular datanode + */ + boolean stopDataNode(int i) { + if (i < 0 || i >= dataNodes.size()) { + return false; + } + DataNode dn = dataNodes.remove(i).datanode; + System.out.println("MiniDFSCluster Stopping DataNode " + + dn.dnRegistration.getName() + + " from a total of " + (dataNodes.size() + 1) + + " datanodes."); + dn.shutdown(); + numDataNodes--; + return true; + } + + /* + * Restart a particular datanode + */ + synchronized boolean restartDataNode(int i) throws IOException { + if (i < 0 || i >= dataNodes.size()) { + return false; + } + DataNodeProperties dnprop = dataNodes.remove(i); + DataNode dn = dnprop.datanode; + Configuration conf = dnprop.conf; + String[] args = dnprop.dnArgs; + System.out.println("MiniDFSCluster Restart DataNode " + + dn.dnRegistration.getName() + + " from a total of " + (dataNodes.size() + 1) + + " datanodes."); + dn.shutdown(); + + // recreate new datanode with the same configuration as the one + // that was stopped. + Configuration newconf = new Configuration(conf); // save cloned config + dataNodes.add(new DataNodeProperties( + DataNode.createDataNode(args, conf), + newconf, args)); + return true; + } + + /* + * Shutdown a datanode by name. + */ + synchronized boolean stopDataNode(String name) { + int i; + for (i = 0; i < dataNodes.size(); i++) { + DataNode dn = dataNodes.get(i).datanode; + if (dn.dnRegistration.getName().equals(name)) { + break; + } } + return stopDataNode(i); } /** @@ -423,7 +506,7 @@ // make sure all datanodes are alive while( client.datanodeReport(DatanodeReportType.LIVE).length - != dataNodes.size()) { + != numDataNodes) { try { Thread.sleep(500); } catch (Exception e) { @@ -450,7 +533,7 @@ if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) { throw new IndexOutOfBoundsException(); } - return dataNodes.get(dataNodeIndex).getFSDataset().getBlockReport(); + return dataNodes.get(dataNodeIndex).datanode.getFSDataset().getBlockReport(); } @@ -482,13 +565,13 @@ if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) { throw new IndexOutOfBoundsException(); } - FSDatasetInterface dataSet = dataNodes.get(dataNodeIndex).getFSDataset(); + FSDatasetInterface dataSet = dataNodes.get(dataNodeIndex).datanode.getFSDataset(); if (!(dataSet instanceof SimulatedFSDataset)) { throw new IOException("injectBlocks is valid only for SimilatedFSDataset"); } SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet; sdataset.injectBlocks(blocksToInject); - dataNodes.get(dataNodeIndex).scheduleBlockReport(0); + dataNodes.get(dataNodeIndex).datanode.scheduleBlockReport(0); } /** Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/SimulatedFSDataset.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/SimulatedFSDataset.java?rev=612903&r1=612902&r2=612903&view=diff ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/SimulatedFSDataset.java (original) +++ 
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/SimulatedFSDataset.java Thu Jan 17 10:11:35 2008 @@ -97,6 +97,14 @@ return theBlock.len; } } + + synchronized void setlength(long length) { + if (!finalized) { + oStream.setLength(length); + } else { + theBlock.len = length; + } + } synchronized SimulatedInputStream getIStream() throws IOException { if (!finalized) { @@ -145,6 +153,10 @@ SimulatedInputStream getMetaIStream() { return new SimulatedInputStream(nullCrcFileData); } + + synchronized boolean isFinalized() { + return finalized; + } } static private class SimulatedStorage { @@ -234,6 +246,10 @@ } + public synchronized void unfinalizeBlock(Block b) throws IOException { + blockMap.remove(b); + } + public synchronized Block[] getBlockReport() { Block[] blockTable = new Block[blockMap.size()]; int i = 0; @@ -287,14 +303,20 @@ } public synchronized boolean isValidBlock(Block b) { - return (blockMap.containsKey(b)); + // return (blockMap.containsKey(b)); + BInfo binfo = blockMap.get(b); + if (binfo == null) { + return false; + } + return binfo.isFinalized(); } public String toString() { return "Simulated FSDataset"; } - public synchronized BlockWriteStreams writeToBlock(Block b) + public synchronized BlockWriteStreams writeToBlock(Block b, + boolean isRecovery) throws IOException { if (isValidBlock(b)) { throw new IOException("Block " + b + @@ -374,8 +396,27 @@ public void checkDataDir() throws DiskErrorException { // nothing to check for simulated data set } - - + + public synchronized long getChannelPosition(Block b, + BlockWriteStreams stream) + throws IOException { + BInfo binfo = blockMap.get(b); + if (binfo == null) { + throw new IOException("No such Block " + b ); + } + return binfo.getlength(); + } + + public synchronized void setChannelPosition(Block b, BlockWriteStreams stream, + long dataOffset, long ckOffset) + throws IOException { + BInfo binfo = blockMap.get(b); + if (binfo == null) { + throw new IOException("No such Block " + b ); + } + binfo.setlength(dataOffset); + } + /** * Simulated input and output streams * @@ -470,10 +511,16 @@ /** * - * @return the lenght of the data created so far. + * @return the length of the data created so far. 
*/ long getLength() { return length; + } + + /** + */ + void setLength(long length) { + this.length = length; } @Override Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java?rev=612903&r1=612902&r2=612903&view=diff ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java Thu Jan 17 10:11:35 2008 @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.Text; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.dfs.DFSClient.DFSDataInputStream; @@ -46,11 +47,13 @@ DatanodeID datanode; InetSocketAddress dnAddr; - byte[] sendBuf = new byte[128]; - byte[] recvBuf = new byte[128]; - ByteBuffer byteBuf = ByteBuffer.wrap(sendBuf); - ByteBuffer recvByteBuf = ByteBuffer.wrap(recvBuf); - + ByteArrayOutputStream sendBuf = new ByteArrayOutputStream(128); + DataOutputStream sendOut = new DataOutputStream(sendBuf); + // byte[] recvBuf = new byte[128]; + // ByteBuffer recvByteBuf = ByteBuffer.wrap(recvBuf); + ByteArrayOutputStream recvBuf = new ByteArrayOutputStream(128); + DataOutputStream recvOut = new DataOutputStream(recvBuf); + private void sendRecvData(String testDescription, boolean eofExpected) throws IOException { /* Opens a socket to datanode @@ -73,10 +76,10 @@ OutputStream out = sock.getOutputStream(); // Should we excuse - out.write(sendBuf, 0, byteBuf.position()); - byte[] retBuf = new byte[recvByteBuf.position()]; + byte[] retBuf = new byte[recvBuf.size()]; DataInputStream in = new DataInputStream(sock.getInputStream()); + out.write(sendBuf.toByteArray()); try { in.readFully(retBuf); } catch (EOFException eof) { @@ -86,6 +89,10 @@ } throw eof; } + for (int i=0; i dinfo = dis.getAllBlocks(); + int num = 0; + DatanodeInfo[] status = null; + + for (LocatedBlock blk : dinfo) { // for each block + int hasdown = 0; + DatanodeInfo[] nodes = blk.getLocations(); + for (int j = 0; j < nodes.length; j++) { // for each replica + System.out.println("Block " + blk.getBlock() + " replica " + + nodes[j].getName()); + } + if (blockNumber == num) { + status = nodes; + } + num++; + } + return status; + } + + // + // verify that the data written are sane + // + private void checkFile(FileSystem fileSys, Path name, int repl, + int numblocks, int filesize, long seed) + throws IOException { + boolean done = false; + int attempt = 0; + + long len = fileSys.getFileStatus(name).getLen(); + assertTrue(name + " should be of size " + filesize + + " but found to be of size " + len, + len == filesize); + + // wait till all full blocks are confirmed by the datanodes. 
+ while (!done) { + attempt++; + try { + Thread.sleep(1000); + } catch (InterruptedException e) {} + done = true; + String[][] locations = fileSys.getFileCacheHints(name, 0, filesize); + if (locations.length < numblocks) { + if (attempt > 100) { + System.out.println("File " + name + " has only " + + locations.length + " blocks, " + + " but is expected to have " + numblocks + + " blocks."); + } + done = false; + continue; + } + for (int idx = 0; idx < locations.length; idx++) { + if (locations[idx].length < repl) { + if (attempt > 100) { + System.out.println("File " + name + " has " + + locations.length + " blocks: " + + " The " + idx + " block has only " + + locations[idx].length + " replicas " + + " but is expected to have " + repl + + " replicas."); + } + done = false; + break; + } + } + } + FSDataInputStream stm = fileSys.open(name); + byte[] expected = new byte[filesize]; + Random rand = new Random(seed); + rand.nextBytes(expected); + // do a sanity check. Read the file + byte[] actual = new byte[filesize]; + stm.readFully(0, actual); + checkData(actual, 0, expected, "Read 1"); + } + + private void checkData(byte[] actual, int from, byte[] expected, String message) { + for (int idx = 0; idx < actual.length; idx++) { + this.assertEquals(message+" byte "+(from+idx)+" differs. expected "+ + expected[from+idx]+" actual "+actual[idx], + actual[idx], expected[from+idx]); + actual[idx] = 0; + } + } + + /** + * A class that kills one datanode and recreates a new one. It waits to + * ensure that that all workers have finished at least one file since the + * last kill of a datanode. This guarantees that all three replicas of + * a block do not get killed (otherwise the file will be corrupt and the + * test will fail). + */ + class Modify extends Thread { + Random rand; + volatile boolean running; + MiniDFSCluster cluster; + Configuration conf; + + Modify(Configuration conf, MiniDFSCluster cluster) { + rand = new Random(); + running = true; + this.cluster = cluster; + this.conf = conf; + } + + public void run() { + + while (running) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + continue; + } + + // check if all threads have a new stamp. + // If so, then all workers have finished at least one file + // since the last stamp. + boolean loop = false; + for (int i = 0; i < numThreads; i++) { + if (workload[i].getStamp() == 0) { + loop = true; + break; + } + } + if (loop) { + continue; + } + + // Now it is guaranteed that there will be at least one valid + // replica of a file. + + for (int i = 0; i < replication - 1; i++) { + // pick a random datanode to shutdown + int victim = rand.nextInt(numDatanodes); + try { + System.out.println("Stopping datanode " + victim); + cluster.restartDataNode(victim); + // cluster.startDataNodes(conf, 1, true, null, null); + } catch (IOException e) { + System.out.println("TestDatanodeDeath Modify exception " + e); + assertTrue("TestDatanodeDeath Modify exception " + e, false); + running = false; + } + } + + // set a new stamp for all workers + for (int i = 0; i < numThreads; i++) { + workload[i].resetStamp(); + } + } + } + + // Make the thread exit. + void close() { + running = false; + this.interrupt(); + } + } + + /** + * Test that writing to files is good even when datanodes in the pipeline + * dies. 
+ */ + private void complexTest() throws IOException { + Configuration conf = new Configuration(); + conf.setInt("heartbeat.recheck.interval", 2000); + conf.setInt("dfs.heartbeat.interval", 1); + conf.setInt("dfs.replication.pending.timeout.sec", 2); + conf.setInt("dfs.socket.timeout", 5000); + MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, true, null); + cluster.waitActive(); + FileSystem fs = cluster.getFileSystem(); + Modify modThread = null; + + try { + + // Create threads and make them run workload concurrently. + workload = new Workload[numThreads]; + for (int i = 0; i < numThreads; i++) { + workload[i] = new Workload(fs, i, numberOfFiles, replication, 0); + workload[i].start(); + } + + // Create a thread that kills existing datanodes and creates new ones. + modThread = new Modify(conf, cluster); + modThread.start(); + + // wait for all transactions to get over + for (int i = 0; i < numThreads; i++) { + try { + System.out.println("Waiting for thread " + i + " to complete..."); + workload[i].join(); + + // if most of the threads are done, then stop restarting datanodes. + if (i >= numThreads/2) { + modThread.close(); + } + + } catch (InterruptedException e) { + i--; // retry + } + } + } finally { + if (modThread != null) { + modThread.close(); + try { + modThread.join(); + } catch (InterruptedException e) {} + } + fs.close(); + cluster.shutdown(); + } + } + + /** + * Write to one file, then kill one datanode in the pipeline and then + * close the file. + */ + private void simpleTest(int datanodeToKill) throws IOException { + Configuration conf = new Configuration(); + conf.setInt("heartbeat.recheck.interval", 2000); + conf.setInt("dfs.heartbeat.interval", 1); + conf.setInt("dfs.replication.pending.timeout.sec", 2); + conf.setInt("dfs.socket.timeout", 5000); + int myMaxNodes = 5; + System.out.println("SimpleTest starting with DataNode to Kill " + + datanodeToKill); + MiniDFSCluster cluster = new MiniDFSCluster(conf, myMaxNodes, true, null); + cluster.waitActive(); + FileSystem fs = cluster.getFileSystem(); + DistributedFileSystem dfs = (DistributedFileSystem) fs; + short repl = 3; + + Path filename = new Path("simpletest.dat"); + Random rand = new Random(); + long myseed = rand.nextInt(); + rand = new Random(myseed); + try { + + // create a file and write one block of data + System.out.println("SimpleTest creating file " + filename); + FSDataOutputStream stm = createFile(fs, filename, repl); + DFSClient.DFSOutputStream dfstream = (DFSClient.DFSOutputStream) + (stm.getWrappedStream()); + + // these are test settings + int bytesPerChecksum = conf.getInt( "io.bytes.per.checksum", 512); + dfstream.setChunksPerPacket(5); + dfstream.setArtificialSlowdown(3000); + + byte[] buffer = new byte[fileSize]; + rand.nextBytes(buffer); + int mid = fileSize/4; + stm.write(buffer, 0, mid); + + DatanodeInfo[] targets = dfstream.getPipeline(); + int count = 5; + while (count-- > 0 && targets == null) { + try { + System.out.println("SimpleTest: Waiting for pipeline to be created."); + Thread.sleep(1000); + } catch (InterruptedException e) { + } + targets = dfstream.getPipeline(); + } + + if (targets == null) { + int victim = rand.nextInt(myMaxNodes); + System.out.println("SimpleTest stopping datanode random " + victim); + cluster.stopDataNode(victim); + } else { + int victim = datanodeToKill; + System.out.println("SimpleTest stopping datanode " + + targets[victim].getName()); + cluster.stopDataNode(targets[victim].getName()); + } + System.out.println("SimpleTest stopping datanode 
complete"); + + // write some more data to file, close and verify + stm.write(buffer, mid, fileSize - mid); + stm.close(); + + checkFile(fs, filename, repl, numBlocks, fileSize, myseed); + } catch (Throwable e) { + System.out.println("Simple Workload exception " + e); + e.printStackTrace(); + assertTrue(e.toString(), false); + } finally { + fs.close(); + cluster.shutdown(); + } + } + + public void testDatanodeDeath() throws IOException { + for (int i = 0; i < 3; i++) { + simpleTest(i); // kills the ith datanode in the pipeline + } + complexTest(); + } +} Propchange: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeDeath.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeDeath.java ------------------------------------------------------------------------------ svn:keywords = Id Revision HeadURL Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileLimit.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileLimit.java?rev=612903&r1=612902&r2=612903&view=diff ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileLimit.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileLimit.java Thu Jan 17 10:11:35 2008 @@ -39,8 +39,6 @@ public class TestFileLimit extends TestCase { static final long seed = 0xDEADBEEFL; static final int blockSize = 8192; - static final int numBlocks = 2; - static final int fileSize = numBlocks * blockSize; boolean simulatedStorage = false; // The test file is 2 times the blocksize plus one. This means that when the @@ -59,6 +57,10 @@ FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf().getInt("io.file.buffer.size", 4096), (short)1, (long)blockSize); + byte[] buffer = new byte[1024]; + Random rand = new Random(seed); + rand.nextBytes(buffer); + stm.write(buffer); stm.close(); } Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSetrepIncreasing.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSetrepIncreasing.java?rev=612903&r1=612902&r2=612903&view=diff ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSetrepIncreasing.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSetrepIncreasing.java Thu Jan 17 10:11:35 2008 @@ -31,6 +31,7 @@ } conf.set("dfs.replication", "" + fromREP); conf.setLong("dfs.blockreport.intervalMsec", 1000L); + conf.set("dfs.replication.pending.timeout.sec", Integer.toString(2)); MiniDFSCluster cluster = new MiniDFSCluster(conf, 10, true, null); FileSystem fs = cluster.getFileSystem(); assertTrue("Not a HDFS: "+fs.getUri(), fs instanceof DistributedFileSystem); Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSimulatedFSDataset.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSimulatedFSDataset.java?rev=612903&r1=612902&r2=612903&view=diff ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSimulatedFSDataset.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSimulatedFSDataset.java Thu Jan 17 10:11:35 2008 @@ -60,7 +60,7 @@ int bytesAdded = 0; for (int i = 1; i <= NUMBLOCKS; ++i) { Block b 
= new Block(i, 0); // we pass expected len as zero, - fsdataset should use the sizeof actual data written - OutputStream dataOut = fsdataset.writeToBlock(b).dataOut; + OutputStream dataOut = fsdataset.writeToBlock(b, false).dataOut; assertEquals(0, fsdataset.getLength(b)); for (int j=1; j <= blockIdToLen(i); ++j) { dataOut.write(j);
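
The new FSDatasetInterface hooks introduced above (writeToBlock with an isRecovery flag, getChannelPosition/setChannelPosition, and unfinalizeBlock) are easier to follow with a small usage sketch. The following is illustrative only and is not part of this commit: the class and method names (BlockRecoverySketch, resumeWrite) and the offset bookkeeping are hypothetical, checksum regeneration is omitted, and BlockWriteStreams is referenced as the nested class of FSDatasetInterface used elsewhere in this patch. Because FSDatasetInterface, Block and BlockWriteStreams are package-private, such code would have to live in org.apache.hadoop.dfs.

    package org.apache.hadoop.dfs;

    import java.io.IOException;
    import java.io.OutputStream;

    class BlockRecoverySketch {

      /**
       * Resume writing a partially written block after a client re-opens it,
       * starting at known offsets in the data and checksum files.
       */
      static void resumeWrite(FSDatasetInterface dataset, Block b,
                              byte[] data, long dataOffset, long ckOffset)
          throws IOException {
        // isRecovery == true: instead of rejecting the block because it is
        // already being written, the dataset interrupts and joins any other
        // writer threads and reopens the existing temporary file.
        FSDatasetInterface.BlockWriteStreams streams = dataset.writeToBlock(b, true);
        boolean ok = false;
        try {
          // Rewind both streams to where the client's retransmission starts.
          dataset.setChannelPosition(b, streams, dataOffset, ckOffset);

          // Write the retransmitted data (checksum handling omitted for brevity).
          OutputStream out = streams.dataOut;
          out.write(data);
          out.flush();

          // The data channel position should now be dataOffset + data.length.
          long pos = dataset.getChannelPosition(b, streams);
          if (pos != dataOffset + data.length) {
            throw new IOException("Unexpected position " + pos + " for " + b);
          }
          dataset.finalizeBlock(b);   // promote the temporary file to a block
          ok = true;
        } finally {
          streams.dataOut.close();
          streams.checksumOut.close();
          if (!ok) {
            // Recovery failed: drop the temporary file so the block can be
            // re-created from scratch; this is what unfinalizeBlock is for.
            dataset.unfinalizeBlock(b);
          }
        }
      }
    }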