Subject: svn commit: r687868 - in /hadoop/core/trunk: ./ src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/protocol/ src/hdfs/org/apache/hadoop/hdfs/server/datanode/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/hdfs/
Date: Thu, 21 Aug 2008 21:27:32 -0000
To: core-commits@hadoop.apache.org
From: cdouglas@apache.org

Author: cdouglas
Date: Thu Aug 21 14:27:31 2008
New Revision: 687868

URL: http://svn.apache.org/viewvc?rev=687868&view=rev
Log:
HADOOP-3062. Add metrics to DataNode and TaskTracker to record network
traffic for HDFS reads/writes and MR shuffling.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/FSConstants.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=687868&r1=687867&r2=687868&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Aug 21 14:27:31 2008
@@ -206,6 +206,9 @@
 
     HADOOP-3934. Upgrade log4j to 1.2.15. (omalley)
 
+    HADOOP-3062. Add metrics to DataNode and TaskTracker to record network
+    traffic for HDFS reads/writes and MR shuffling. (cdouglas)
+
   OPTIMIZATIONS
 
     HADOOP-3556. Removed lock contention in MD5Hash by changing the
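Both new trace logs are ordinary commons-logging loggers whose names end in ".clienttrace" (see DataNode.java and TaskTracker.java below), so a deployment can route them separately from the main daemon logs. A minimal sketch using the log4j 1.2 API (the version cited in the CHANGES entry above); the logger name comes from this patch, but the appender, layout, and file path are illustrative assumptions only:

    import org.apache.log4j.FileAppender;
    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;
    import org.apache.log4j.PatternLayout;

    public class ClientTraceSetup {
      public static void main(String[] args) throws Exception {
        // Logger name matches DataNode.ClientTraceLog below.
        Logger trace = Logger.getLogger(
            "org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace");
        trace.setLevel(Level.INFO);     // trace records are emitted at INFO
        trace.setAdditivity(false);     // keep them out of the main DataNode log
        trace.addAppender(new FileAppender(
            new PatternLayout("%d{ISO8601} %m%n"),   // timestamp + raw record
            "/tmp/datanode-clienttrace.log"));       // hypothetical path
      }
    }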
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=687868&r1=687867&r2=687868&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Thu Aug 21 14:27:31 2008
@@ -1115,6 +1115,17 @@
                                          long startOffset, long len,
                                          int bufferSize, boolean verifyChecksum)
       throws IOException {
+      return newBlockReader(sock, file, blockId, genStamp, startOffset,
+                            len, bufferSize, verifyChecksum, "");
+    }
+
+    public static BlockReader newBlockReader( Socket sock, String file,
+                                              long blockId,
+                                              long genStamp,
+                                              long startOffset, long len,
+                                              int bufferSize, boolean verifyChecksum,
+                                              String clientName)
+      throws IOException {
       // in and out will be closed when sock is closed (by the caller)
       DataOutputStream out = new DataOutputStream(
         new BufferedOutputStream(NetUtils.getOutputStream(sock,WRITE_TIMEOUT)));
@@ -1126,6 +1137,7 @@
       out.writeLong( genStamp );
       out.writeLong( startOffset );
       out.writeLong( len );
+      Text.writeString(out, clientName);
       out.flush();
 
       //
@@ -1391,7 +1403,7 @@
         blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(),
             blk.getGenerationStamp(),
             offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
-            buffersize, verifyChecksum);
+            buffersize, verifyChecksum, clientName);
         return chosenNode;
       } catch (IOException ex) {
         // Put chosen node into dead list, continue
@@ -1573,7 +1585,7 @@
                                                 block.getBlock().getBlockId(),
                                                 block.getBlock().getGenerationStamp(),
                                                 start, len, buffersize,
-                                                verifyChecksum);
+                                                verifyChecksum, clientName);
           int nread = reader.readAll(buf, offset, len);
           if (nread != len) {
             throw new IOException("truncated return from reader.read(): " +
@@ -2297,7 +2309,7 @@
         this.hasError = false;
         errorIndex = 0;
 
-        success = createBlockOutputStream(nodes, src, true);
+        success = createBlockOutputStream(nodes, clientName, true);
       }
 
       response = new ResponseProcessor(nodes);
@@ -2482,7 +2494,7 @@
       //
       // Connect to first DataNode in the list.
      //
-      success = createBlockOutputStream(nodes, client, false);
+      success = createBlockOutputStream(nodes, clientName, false);
 
       if (!success) {
         LOG.info("Abandoning block " + block);
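The old newBlockReader signature now delegates with an empty client name, so every version-12 read request carries the new field and the datanode can parse it unconditionally. A hypothetical helper sketching just the fixed-width header plus the new string field, as the client now writes it; the version/op preamble that precedes these fields on the wire is omitted, and the demo values are invented:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.io.Text;

    public class ReadRequestSketch {
      // Hypothetical helper mirroring what newBlockReader writes above.
      static void writeReadBlockRequest(DataOutputStream out, long blockId,
          long genStamp, long startOffset, long len, String clientName)
          throws IOException {
        out.writeLong(blockId);            // 8 byte block ID
        out.writeLong(genStamp);           // 8 byte generation stamp
        out.writeLong(startOffset);        // 8 byte start offset
        out.writeLong(len);                // 8 byte length
        Text.writeString(out, clientName); // new in v12: vInt length + UTF-8 bytes
        out.flush();
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        writeReadBlockRequest(new DataOutputStream(buf), 42L, 1L, 0L, 1024L,
            "DFSClient_12345"); // all values invented
        System.out.println("header bytes: " + buf.size());
      }
    }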
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/FSConstants.java?rev=687868&r1=687867&r2=687868&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/FSConstants.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/FSConstants.java Thu Aug 21 14:27:31 2008
@@ -101,11 +101,11 @@
    * when protocol changes. It is not very obvious. 
    */
   /*
-   * Version 11:
-   * OP_WRITE_BLOCK sends a boolean. If its value is true, an additonal 
-   * DatanodeInfo of client requesting transfer is also sent. 
+   * Version 12:
+   * OP_READ_BLOCK includes clientName and OP_WRITE_BLOCK includes
+   * clientName instead of path from DFSClient to Datanode
    */
-  public static final int DATA_TRANSFER_VERSION = 11;
+  public static final int DATA_TRANSFER_VERSION = 12;
 
   // Return codes for file create
   public static final int OPERATION_FAILED = 0;
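DATA_TRANSFER_VERSION is a lock-step constant: the added clientName field changes the byte layout of both operations, so client and datanode must agree exactly. A sketch of the kind of guard this constant supports; the exception message is illustrative, not the exact DataXceiver wording:

    import java.io.DataInputStream;
    import java.io.IOException;

    class VersionCheckSketch {
      static final int DATA_TRANSFER_VERSION = 12; // mirrors FSConstants above

      // A mismatched peer is rejected before any block data is exchanged.
      static void checkVersion(DataInputStream in) throws IOException {
        short version = in.readShort();
        if (version != DATA_TRANSFER_VERSION) {
          throw new IOException("Data transfer version mismatch: expected "
              + DATA_TRANSFER_VERSION + ", got " + version);
        }
      }
    }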
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=687868&r1=687867&r2=687868&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Thu Aug 21 14:27:31 2008
@@ -23,6 +23,7 @@
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.LinkedList;
 import java.util.zip.CRC32;
@@ -39,6 +40,7 @@
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
+import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
 
 /** A class that receives a block and writes to its own disk, meanwhile
  * may copies it to another site. If a throttler is provided,
@@ -46,6 +48,7 @@
  **/
 class BlockReceiver implements java.io.Closeable, FSConstants {
   public static final Log LOG = DataNode.LOG;
+  static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
   private Block block; // the block to receive
   protected boolean finalized;
@@ -60,6 +63,7 @@
   private int maxPacketReadLen;
   protected long offsetInBlock;
   protected final String inAddr;
+  protected final String myAddr;
   private String mirrorAddr;
   private DataOutputStream mirrorOut;
   private Daemon responder = null;
@@ -72,12 +76,13 @@
   private DataNode datanode = null;
 
   BlockReceiver(Block block, DataInputStream in, String inAddr,
-                boolean isRecovery, String clientName,
+                String myAddr, boolean isRecovery, String clientName,
                 DatanodeInfo srcDataNode, DataNode datanode) throws IOException {
     try{
       this.block = block;
       this.in = in;
       this.inAddr = inAddr;
+      this.myAddr = myAddr;
       this.isRecovery = isRecovery;
       this.clientName = clientName;
       this.offsetInBlock = 0;
@@ -498,8 +503,7 @@
     if (clientName.length() > 0) {
       responder = new Daemon(datanode.threadGroup, 
                              new PacketResponder(this, block, mirrIn, 
-                                                 replyOut, numTargets,
-                                                 clientName));
+                                                 replyOut, numTargets));
       responder.start(); // start thread to processes reponses
     }
 
@@ -673,7 +677,6 @@
     DataInputStream mirrorIn;   // input from downstream datanode
     DataOutputStream replyOut;  // output to upstream datanode
     private int numTargets;     // number of downstream datanodes including myself
-    private String clientName;  // The name of the client (if any)
     private BlockReceiver receiver; // The owner of this responder.
 
     public String toString() {
@@ -681,13 +684,12 @@
     }
 
     PacketResponder(BlockReceiver receiver, Block b, DataInputStream in, 
-                    DataOutputStream out, int numTargets, String clientName) {
+                    DataOutputStream out, int numTargets) {
       this.receiver = receiver;
       this.block = b;
       mirrorIn = in;
       replyOut = out;
      this.numTargets = numTargets;
-      this.clientName = clientName;
    }

    /**
@@ -776,9 +778,17 @@
               datanode.myMetrics.blocksWritten.inc();
               datanode.notifyNamenodeReceivedBlock(block, 
                   DataNode.EMPTY_DEL_HINT);
-              LOG.info("Received block " + block + 
-                       " of size " + block.getNumBytes() + 
-                       " from " + receiver.inAddr);
+              if (ClientTraceLog.isInfoEnabled() &&
+                  receiver.clientName.length() > 0) {
+                ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
+                      receiver.inAddr, receiver.myAddr, block.getNumBytes(),
+                      "HDFS_WRITE", receiver.clientName,
+                      datanode.dnRegistration.getStorageID(), block));
+              } else {
+                LOG.info("Received block " + block + 
+                         " of size " + block.getNumBytes() + 
+                         " from " + receiver.inAddr);
+              }
             }
             lastPacket = true;
           }
@@ -891,9 +901,17 @@
                 datanode.myMetrics.blocksWritten.inc();
                 datanode.notifyNamenodeReceivedBlock(block, 
                     DataNode.EMPTY_DEL_HINT);
-                LOG.info("Received block " + block + 
-                         " of size " + block.getNumBytes() + 
-                         " from " + receiver.inAddr);
+                if (ClientTraceLog.isInfoEnabled() &&
+                    receiver.clientName.length() > 0) {
+                  ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
+                        receiver.inAddr, receiver.myAddr, block.getNumBytes(),
+                        "HDFS_WRITE", receiver.clientName,
+                        datanode.dnRegistration.getStorageID(), block));
+                } else {
+                  LOG.info("Received block " + block + 
+                           " of size " + block.getNumBytes() + 
+                           " from " + receiver.inAddr);
+                }
               }
 
               // send my status back to upstream datanode
@@ -932,7 +950,7 @@
           // If we forwarded an error response from a downstream datanode
           // and we are acting on behalf of a client, then we quit. The 
           // client will drive the recovery mechanism.
-          if (op == OP_STATUS_ERROR && clientName.length() > 0) {
+          if (op == OP_STATUS_ERROR && receiver.clientName.length() > 0) {
             running = false;
           }
         } catch (IOException e) {
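In both PacketResponder paths above, a trace record replaces the old LOG.info line only when receiver.clientName is non-empty; block transfers between datanodes, which pass an empty client name, keep the old message. A self-contained sketch of what a filled-in HDFS_WRITE record looks like; the format string is the concatenated value of DataNode.DN_CLIENTTRACE_FORMAT (defined later in this commit) and every value below is invented:

    public class WriteTraceSketch {
      // Concatenated copy of DataNode.DN_CLIENTTRACE_FORMAT from this commit.
      static final String DN_CLIENTTRACE_FORMAT =
          "src: %s, dest: %s, bytes: %s, op: %s, cliID: %s, srvID: %s, blockid: %s";

      public static void main(String[] args) {
        // All values are invented for illustration.
        System.out.println(String.format(DN_CLIENTTRACE_FORMAT,
            "/10.0.0.5:41234", "/10.0.0.9:50010", 67108864L, "HDFS_WRITE",
            "DFSClient_12345", "DS-1234567890", "blk_-987654321"));
        // -> src: /10.0.0.5:41234, dest: /10.0.0.9:50010, bytes: 67108864,
        //    op: HDFS_WRITE, cliID: DFSClient_12345, srvID: DS-1234567890,
        //    blockid: blk_-987654321
      }
    }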
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=687868&r1=687867&r2=687868&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Thu Aug 21 14:27:31 2008
@@ -43,6 +43,7 @@
  */
 class BlockSender implements java.io.Closeable, FSConstants {
   public static final Log LOG = DataNode.LOG;
+  static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
   private Block block; // the block to read from
   private InputStream blockIn; // data stream
@@ -62,7 +63,8 @@
   private boolean blockReadFully; //set when the whole block is read
   private boolean verifyChecksum; //if true, check is verified while reading
   private BlockTransferThrottler throttler;
-  
+  private final String clientTraceFmt; // format of client trace log message
+  
   /**
    * Minimum buffer used while sending data to clients. Used only if
    * transferTo() is enabled. 64KB is not that large. It could be larger, but
@@ -74,7 +76,14 @@
   BlockSender(Block block, long startOffset, long length,
               boolean corruptChecksumOk, boolean chunkOffsetOK,
               boolean verifyChecksum, DataNode datanode) throws IOException {
+    this(block, startOffset, length, corruptChecksumOk, chunkOffsetOK,
+         verifyChecksum, datanode, null);
+  }
+
+  BlockSender(Block block, long startOffset, long length,
+              boolean corruptChecksumOk, boolean chunkOffsetOK,
+              boolean verifyChecksum, DataNode datanode, String clientTraceFmt)
+      throws IOException {
     try {
       this.block = block;
       this.chunkOffsetOK = chunkOffsetOK;
@@ -82,6 +91,7 @@
       this.verifyChecksum = verifyChecksum;
       this.blockLength = datanode.data.getLength(block);
       this.transferToAllowed = datanode.transferToAllowed;
+      this.clientTraceFmt = clientTraceFmt;
 
       if ( !corruptChecksumOk || datanode.data.metaFileExists(block) ) {
         checksumIn = new DataInputStream(
@@ -382,6 +392,9 @@
       out.writeInt(0); // mark the end of block
       out.flush();
     } finally {
+      if (clientTraceFmt != null) {
+        ClientTraceLog.info(String.format(clientTraceFmt, totalRead));
+      }
       close();
     }
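The single-argument String.format(clientTraceFmt, totalRead) above works because of a two-stage substitution: DataXceiver (below) pre-fills every field except the byte count, passing the literal string "%d" as the bytes argument so that a %d placeholder survives into clientTraceFmt. A standalone sketch of the trick with invented values:

    public class TwoStageFormatSketch {
      public static void main(String[] args) {
        // First pass (DataXceiver): "%d" is passed as an *argument*, so it
        // lands in the result verbatim as a placeholder for the byte count.
        String fmt = String.format("src: %s, bytes: %s, op: %s",
            "/10.0.0.5:41234", "%d", "HDFS_READ");
        // fmt == "src: /10.0.0.5:41234, bytes: %d, op: HDFS_READ"

        // Second pass (BlockSender, in the finally block): only the byte
        // count remains to be filled, even on partially completed reads.
        long totalRead = 1048576L; // invented
        System.out.println(String.format(fmt, totalRead));
        // -> src: /10.0.0.5:41234, bytes: 1048576, op: HDFS_READ
      }
    }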
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=687868&r1=687867&r2=687868&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Thu Aug 21 14:27:31 2008
@@ -33,6 +33,7 @@
 import java.security.SecureRandom;
 import java.util.AbstractList;
 import java.util.ArrayList;
+import java.util.Formatter;
 import java.util.LinkedList;
 import java.util.Random;
 import java.util.concurrent.Semaphore;
@@ -113,7 +114,18 @@
  **********************************************************/
 public class DataNode extends Configured 
     implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable {
-  public static final Log LOG = LogFactory.getLog(DataNode.class.getName());
+  public static final Log LOG = LogFactory.getLog(DataNode.class);
+  
+  public static final String DN_CLIENTTRACE_FORMAT =
+        "src: %s" +      // src IP
+        ", dest: %s" +   // dst IP
+        ", bytes: %s" +  // byte count
+        ", op: %s" +     // operation
+        ", cliID: %s" +  // DFSClient id
+        ", srvID: %s" +  // DatanodeRegistration
+        ", blockid: %s"; // block id
+  static final Log ClientTraceLog =
+    LogFactory.getLog(DataNode.class.getName() + ".clienttrace");
 
   /**
    * Use {@link NetUtils#createSocketAddr(String)} instead.
@@ -914,6 +926,8 @@
   +-------------------------------------------------------------------------+
   | 8 byte Block ID | 8 byte genstamp | 8 byte start offset | 8 byte length |
   +-------------------------------------------------------------------------+
+  | vInt length | <clientName> |
+  +-----------------------------------+
   
   Client sends optional response only at the end of receiving data.
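The new "vInt length" row in the OP_READ_BLOCK diagram is the on-wire shape of Text.writeString: a vInt-encoded byte count followed by the string's UTF-8 bytes (the second cell of the row is the client name itself). A round-trip sketch over in-memory streams; the client id value is invented:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.io.Text;

    public class VIntStringSketch {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        Text.writeString(out, "DFSClient_12345"); // vInt length, then UTF-8 bytes

        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(Text.readString(in)); // -> DFSClient_12345
      }
    }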
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=687868&r1=687867&r2=687868&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Thu Aug 21 14:27:31 2008
@@ -36,16 +36,18 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.StringUtils;
+import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
 
 /**
  * Thread for processing incoming/outgoing data stream.
  */
 class DataXceiver implements Runnable, FSConstants {
   public static final Log LOG = DataNode.LOG;
+  static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
   Socket s;
-  String remoteAddress; // address of remote side
-  String localAddress;  // local address of this daemon
+  final String remoteAddress; // address of remote side
+  final String localAddress;  // local address of this daemon
   DataNode datanode;
   DataXceiverServer dataXceiverServer;
@@ -55,9 +57,8 @@
     this.s = s;
     this.datanode = datanode;
     this.dataXceiverServer = dataXceiverServer;
-    InetSocketAddress isock = (InetSocketAddress)s.getRemoteSocketAddress();
-    remoteAddress = isock.toString();
-    localAddress = s.getInetAddress() + ":" + s.getLocalPort();
+    remoteAddress = s.getRemoteSocketAddress().toString();
+    localAddress = s.getLocalSocketAddress().toString();
     LOG.debug("Number of active connections is: " + datanode.getXceiverCount());
   }
 
@@ -141,7 +142,7 @@
 
     long startOffset = in.readLong();
     long length = in.readLong();
-    
+    String clientName = Text.readString(in);
     // send the block
     OutputStream baseStream = NetUtils.getOutputStream(s, 
         datanode.socketWriteTimeout);
@@ -149,10 +150,17 @@
         new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
     
     BlockSender blockSender = null;
+    final String clientTraceFmt =
+      clientName.length() > 0 && ClientTraceLog.isInfoEnabled()
+        ? String.format(DN_CLIENTTRACE_FORMAT, localAddress, remoteAddress,
+            "%d", "HDFS_READ", clientName,
+            datanode.dnRegistration.getStorageID(), block)
+        : datanode.dnRegistration + " Served block " + block + " to " +
+            s.getInetAddress();
     try {
       try {
-        blockSender = new BlockSender(block, startOffset, length, 
-            true, true, false, datanode);
+        blockSender = new BlockSender(block, startOffset, length, 
+            true, true, false, datanode, clientTraceFmt);
       } catch(IOException e) {
         out.writeShort(OP_STATUS_ERROR);
         throw e;
       }
@@ -174,8 +182,6 @@
       
       datanode.myMetrics.bytesRead.inc((int) read);
       datanode.myMetrics.blocksRead.inc();
-      LOG.info(datanode.dnRegistration + " Served block " + block + " to " +
-               s.getInetAddress());
     } catch ( SocketException ignored ) {
       // Its ok for remote side to close the connection anytime.
       datanode.myMetrics.blocksRead.inc();
@@ -241,8 +247,9 @@
     try {
       // open a block receiver and check if the block does not exist
       blockReceiver = new BlockReceiver(block, in, 
-          s.getInetAddress().toString(), isRecovery, client, srcDataNode,
-          datanode);
+          s.getRemoteSocketAddress().toString(),
+          s.getLocalSocketAddress().toString(),
+          isRecovery, client, srcDataNode, datanode);
 
       // get a connection back to the previous target
       replyOut = new DataOutputStream(
@@ -502,8 +509,8 @@
     try {
       // open a block receiver and check if the block does not exist
       blockReceiver = new BlockReceiver(
-          block, in, s.getRemoteSocketAddress().toString(), false, "", null,
-          datanode);
+          block, in, s.getRemoteSocketAddress().toString(),
+          s.getLocalSocketAddress().toString(), false, "", null, datanode);
 
       // receive a block
       blockReceiver.receiveBlock(null, null, null, null, 
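Beyond threading the client name through, the constructor change fixes how the two endpoints are derived: on a connected socket, Socket.getInetAddress() names the remote peer, so the old localAddress paired the remote IP with the local port. Switching to getLocalSocketAddress()/getRemoteSocketAddress() yields one coherent value per endpoint, which matters now that both addresses land in trace records. A hypothetical standalone demo contrasting the two styles:

    import java.net.ServerSocket;
    import java.net.Socket;

    public class SocketAddressSketch {
      public static void main(String[] args) throws Exception {
        ServerSocket server = new ServerSocket(0);
        Socket client = new Socket("127.0.0.1", server.getLocalPort());
        Socket s = server.accept();

        // Old style: getInetAddress() names the REMOTE peer, so pairing it
        // with the local port mixed the two endpoints.
        System.out.println(s.getInetAddress() + ":" + s.getLocalPort());

        // New style: each endpoint comes from one coherent call.
        System.out.println("remote: " + s.getRemoteSocketAddress());
        System.out.println("local:  " + s.getLocalSocketAddress());

        client.close(); s.close(); server.close();
      }
    }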
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=687868&r1=687867&r2=687868&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Thu Aug 21 14:27:31 2008
@@ -99,6 +99,15 @@
   public static final Log LOG = LogFactory.getLog(TaskTracker.class);
 
+  public static final String MR_CLIENTTRACE_FORMAT =
+        "src: %s" +     // src IP
+        ", dest: %s" +  // dst IP
+        ", bytes: %s" + // byte count
+        ", op: %s" +    // operation
+        ", cliID: %s";  // task id
+  public static final Log ClientTraceLog =
+    LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
+
   private boolean running = true;
 
   private LocalDirAllocator localDirAllocator;
@@ -2389,6 +2398,7 @@
       FSDataInputStream indexIn = null;
       FSDataInputStream mapOutputIn = null;
 
+      long totalRead = 0;
       ShuffleServerMetrics shuffleMetrics = (ShuffleServerMetrics)
         context.getAttribute("shuffleServerMetrics");
       try {
@@ -2468,7 +2478,6 @@
         //seek to the correct offset for the reduce
         mapOutputIn.seek(startOffset);
         
-        long totalRead = 0;
         int len = mapOutputIn.read(buffer, 0,
                                    partLength < MAX_BYTES_TO_READ ?
                                    (int)partLength : MAX_BYTES_TO_READ);
@@ -2514,6 +2523,12 @@
           mapOutputIn.close();
         }
         shuffleMetrics.serverHandlerFree();
+        if (ClientTraceLog.isInfoEnabled()) {
+          ClientTraceLog.info(String.format(MR_CLIENTTRACE_FORMAT,
+                request.getLocalAddr() + ":" + request.getLocalPort(),
+                request.getRemoteAddr() + ":" + request.getRemotePort(),
+                totalRead, "MAPRED_SHUFFLE", mapId));
+        }
       }
       outStream.close();
       shuffleMetrics.successOutput();

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=687868&r1=687867&r2=687868&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Thu Aug 21 14:27:31 2008
@@ -267,6 +267,7 @@
     sendOut.writeLong(0L);
     sendOut.writeLong(fileLen);
     recvOut.writeShort((short)FSConstants.OP_STATUS_ERROR);
+    Text.writeString(sendOut, "cl");
     sendRecvData("Wrong block ID " + newBlockId + " for read", false); 
 
     // negative block start offset
@@ -277,6 +278,7 @@
     sendOut.writeLong(firstBlock.getGenerationStamp());
     sendOut.writeLong(-1L);
     sendOut.writeLong(fileLen);
+    Text.writeString(sendOut, "cl");
     sendRecvData("Negative start-offset for read for block " + 
                  firstBlock.getBlockId(), false);
 
@@ -288,6 +290,7 @@
     sendOut.writeLong(firstBlock.getGenerationStamp());
     sendOut.writeLong(fileLen);
     sendOut.writeLong(fileLen);
+    Text.writeString(sendOut, "cl");
     sendRecvData("Wrong start-offset for reading block " +
                  firstBlock.getBlockId(), false);
 
@@ -301,6 +304,7 @@
     sendOut.writeLong(firstBlock.getGenerationStamp());
     sendOut.writeLong(0);
     sendOut.writeLong(-1-random.nextInt(oneMil));
+    Text.writeString(sendOut, "cl");
    sendRecvData("Negative length for reading block " +
                 firstBlock.getBlockId(), false);
 
@@ -314,6 +318,7 @@
     sendOut.writeLong(firstBlock.getGenerationStamp());
     sendOut.writeLong(0);
     sendOut.writeLong(fileLen + 1);
+    Text.writeString(sendOut, "cl");
     sendRecvData("Wrong length for reading block " +
                  firstBlock.getBlockId(), false);
 
@@ -325,6 +330,7 @@
     sendOut.writeLong(firstBlock.getGenerationStamp());
     sendOut.writeLong(0);
     sendOut.writeLong(fileLen);
+    Text.writeString(sendOut, "cl");
     readFile(fileSys, file, fileLen);
   }
 }
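Closing out the shuffle side (TaskTracker.java above): the map-output servlet logs one MAPRED_SHUFFLE record per served segment, built from the HTTP request's two endpoints and the hoisted totalRead counter. A sketch of a filled-in record; the format string is the concatenated value of MR_CLIENTTRACE_FORMAT and all values below are invented:

    public class ShuffleTraceSketch {
      // Concatenated copy of TaskTracker.MR_CLIENTTRACE_FORMAT from this commit.
      static final String MR_CLIENTTRACE_FORMAT =
          "src: %s, dest: %s, bytes: %s, op: %s, cliID: %s";

      public static void main(String[] args) {
        // All values are invented for illustration; src is the TaskTracker's
        // local endpoint and dest is the fetching reducer, as in the patch.
        System.out.println(String.format(MR_CLIENTTRACE_FORMAT,
            "10.0.0.7:50060", "10.0.0.3:38821", 262144L, "MAPRED_SHUFFLE",
            "attempt_200808211427_0001_m_000003_0"));
      }
    }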