hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r687868 - in /hadoop/core/trunk: ./ src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/protocol/ src/hdfs/org/apache/hadoop/hdfs/server/datanode/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/hdfs/
Date Thu, 21 Aug 2008 21:27:32 GMT
Author: cdouglas
Date: Thu Aug 21 14:27:31 2008
New Revision: 687868

URL: http://svn.apache.org/viewvc?rev=687868&view=rev
Log:
HADOOP-3062. Add metrics to DataNode and TaskTracker to record network
traffic for HDFS reads/writes and MR shuffling.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/FSConstants.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=687868&r1=687867&r2=687868&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Aug 21 14:27:31 2008
@@ -206,6 +206,9 @@
 
     HADOOP-3934. Upgrade log4j to 1.2.15. (omalley)
 
+    HADOOP-3062. Add metrics to DataNode and TaskTracker to record network
+    traffic for HDFS reads/writes and MR shuffling. (cdouglas)
+
   OPTIMIZATIONS
 
     HADOOP-3556. Removed lock contention in MD5Hash by changing the 

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=687868&r1=687867&r2=687868&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Thu Aug 21 14:27:31 2008
@@ -1115,6 +1115,17 @@
                                        long startOffset, long len,
                                        int bufferSize, boolean verifyChecksum)
                                        throws IOException {
+      return newBlockReader(sock, file, blockId, genStamp, startOffset,
+                            len, bufferSize, verifyChecksum, "");
+    }
+
+    public static BlockReader newBlockReader( Socket sock, String file,
+                                       long blockId, 
+                                       long genStamp,
+                                       long startOffset, long len,
+                                       int bufferSize, boolean verifyChecksum,
+                                       String clientName)
+                                       throws IOException {
       // in and out will be closed when sock is closed (by the caller)
       DataOutputStream out = new DataOutputStream(
         new BufferedOutputStream(NetUtils.getOutputStream(sock,WRITE_TIMEOUT)));
@@ -1126,6 +1137,7 @@
       out.writeLong( genStamp );
       out.writeLong( startOffset );
       out.writeLong( len );
+      Text.writeString(out, clientName);
       out.flush();
       
       //
@@ -1391,7 +1403,7 @@
           blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(), 
               blk.getGenerationStamp(),
               offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
-              buffersize, verifyChecksum);
+              buffersize, verifyChecksum, clientName);
           return chosenNode;
         } catch (IOException ex) {
           // Put chosen node into dead list, continue
@@ -1573,7 +1585,7 @@
                                               block.getBlock().getBlockId(),
                                               block.getBlock().getGenerationStamp(),
                                               start, len, buffersize, 
-                                              verifyChecksum);
+                                              verifyChecksum, clientName);
           int nread = reader.readAll(buf, offset, len);
           if (nread != len) {
             throw new IOException("truncated return from reader.read(): " +
@@ -2297,7 +2309,7 @@
 
         this.hasError = false;
         errorIndex = 0;
-        success = createBlockOutputStream(nodes, src, true);
+        success = createBlockOutputStream(nodes, clientName, true);
       }
 
       response = new ResponseProcessor(nodes);
@@ -2482,7 +2494,7 @@
         //
         // Connect to first DataNode in the list.
         //
-        success = createBlockOutputStream(nodes, client, false);
+        success = createBlockOutputStream(nodes, clientName, false);
 
         if (!success) {
           LOG.info("Abandoning block " + block);

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/FSConstants.java?rev=687868&r1=687867&r2=687868&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/FSConstants.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/FSConstants.java Thu Aug 21
14:27:31 2008
@@ -101,11 +101,11 @@
    * when protocol changes. It is not very obvious. 
    */
   /*
-   * Version 11:
-   *    OP_WRITE_BLOCK sends a boolean. If its value is true, an additonal 
-   *    DatanodeInfo of client requesting transfer is also sent. 
+   * Version 12:
+   *    OP_READ_BLOCK includes clientName and OP_WRITE_BLOCK includes
+   *    clientName instead of path from DFSClient to Datanode
    */
-  public static final int DATA_TRANSFER_VERSION = 11;
+  public static final int DATA_TRANSFER_VERSION = 12;
 
   // Return codes for file create
   public static final int OPERATION_FAILED = 0;

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=687868&r1=687867&r2=687868&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Thu
Aug 21 14:27:31 2008
@@ -23,6 +23,7 @@
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.LinkedList;
 import java.util.zip.CRC32;
@@ -39,6 +40,7 @@
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
+import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
 
 /** A class that receives a block and writes to its own disk, meanwhile
  * may copies it to another site. If a throttler is provided,
@@ -46,6 +48,7 @@
  **/
 class BlockReceiver implements java.io.Closeable, FSConstants {
   public static final Log LOG = DataNode.LOG;
+  static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
   private Block block; // the block to receive
   protected boolean finalized;
@@ -60,6 +63,7 @@
   private int maxPacketReadLen;
   protected long offsetInBlock;
   protected final String inAddr;
+  protected final String myAddr;
   private String mirrorAddr;
   private DataOutputStream mirrorOut;
   private Daemon responder = null;
@@ -72,12 +76,13 @@
   private DataNode datanode = null;
 
   BlockReceiver(Block block, DataInputStream in, String inAddr,
-                boolean isRecovery, String clientName, 
+                String myAddr, boolean isRecovery, String clientName, 
                 DatanodeInfo srcDataNode, DataNode datanode) throws IOException {
     try{
       this.block = block;
       this.in = in;
       this.inAddr = inAddr;
+      this.myAddr = myAddr;
       this.isRecovery = isRecovery;
       this.clientName = clientName;
       this.offsetInBlock = 0;
@@ -498,8 +503,7 @@
       if (clientName.length() > 0) {
         responder = new Daemon(datanode.threadGroup, 
                                new PacketResponder(this, block, mirrIn, 
-                                                   replyOut, numTargets,
-                                                   clientName));
+                                                   replyOut, numTargets));
         responder.start(); // start thread to processes reponses
       }
 
@@ -673,7 +677,6 @@
     DataInputStream mirrorIn;   // input from downstream datanode
     DataOutputStream replyOut;  // output to upstream datanode
     private int numTargets;     // number of downstream datanodes including myself
-    private String clientName;  // The name of the client (if any)
     private BlockReceiver receiver; // The owner of this responder.
 
     public String toString() {
@@ -681,13 +684,12 @@
     }
 
     PacketResponder(BlockReceiver receiver, Block b, DataInputStream in, 
-                    DataOutputStream out, int numTargets, String clientName) {
+                    DataOutputStream out, int numTargets) {
       this.receiver = receiver;
       this.block = b;
       mirrorIn = in;
       replyOut = out;
       this.numTargets = numTargets;
-      this.clientName = clientName;
     }
 
     /**
@@ -776,9 +778,17 @@
                 datanode.myMetrics.blocksWritten.inc();
                 datanode.notifyNamenodeReceivedBlock(block, 
                     DataNode.EMPTY_DEL_HINT);
-                LOG.info("Received block " + block + 
-                         " of size " + block.getNumBytes() + 
-                         " from " + receiver.inAddr);
+                if (ClientTraceLog.isInfoEnabled() &&
+                    receiver.clientName.length() > 0) {
+                  ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
+                        receiver.inAddr, receiver.myAddr, block.getNumBytes(),
+                        "HDFS_WRITE", receiver.clientName,
+                        datanode.dnRegistration.getStorageID(), block));
+                } else {
+                  LOG.info("Received block " + block + 
+                           " of size " + block.getNumBytes() + 
+                           " from " + receiver.inAddr);
+                }
               }
               lastPacket = true;
             }
@@ -891,9 +901,17 @@
               datanode.myMetrics.blocksWritten.inc();
               datanode.notifyNamenodeReceivedBlock(block, 
                   DataNode.EMPTY_DEL_HINT);
-              LOG.info("Received block " + block + 
-                       " of size " + block.getNumBytes() + 
-                       " from " + receiver.inAddr);
+              if (ClientTraceLog.isInfoEnabled() &&
+                  receiver.clientName.length() > 0) {
+                ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
+                      receiver.inAddr, receiver.myAddr, block.getNumBytes(),
+                      "HDFS_WRITE", receiver.clientName,
+                      datanode.dnRegistration.getStorageID(), block));
+              } else {
+                LOG.info("Received block " + block + 
+                         " of size " + block.getNumBytes() + 
+                         " from " + receiver.inAddr);
+              }
             }
 
             // send my status back to upstream datanode
@@ -932,7 +950,7 @@
             // If we forwarded an error response from a downstream datanode
             // and we are acting on behalf of a client, then we quit. The 
             // client will drive the recovery mechanism.
-            if (op == OP_STATUS_ERROR && clientName.length() > 0) {
+            if (op == OP_STATUS_ERROR && receiver.clientName.length() > 0) {
               running = false;
             }
         } catch (IOException e) {

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=687868&r1=687867&r2=687868&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Thu
Aug 21 14:27:31 2008
@@ -43,6 +43,7 @@
  */
 class BlockSender implements java.io.Closeable, FSConstants {
   public static final Log LOG = DataNode.LOG;
+  static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
   private Block block; // the block to read from
   private InputStream blockIn; // data stream
@@ -62,7 +63,8 @@
   private boolean blockReadFully; //set when the whole block is read
   private boolean verifyChecksum; //if true, check is verified while reading
   private BlockTransferThrottler throttler;
-  
+  private final String clientTraceFmt; // format of client trace log message
+
   /**
    * Minimum buffer used while sending data to clients. Used only if
    * transferTo() is enabled. 64KB is not that large. It could be larger, but
@@ -74,7 +76,14 @@
   BlockSender(Block block, long startOffset, long length,
               boolean corruptChecksumOk, boolean chunkOffsetOK,
               boolean verifyChecksum, DataNode datanode) throws IOException {
+    this(block, startOffset, length, corruptChecksumOk, chunkOffsetOK,
+         verifyChecksum, datanode, null);
+  }
 
+  BlockSender(Block block, long startOffset, long length,
+              boolean corruptChecksumOk, boolean chunkOffsetOK,
+              boolean verifyChecksum, DataNode datanode, String clientTraceFmt)
+      throws IOException {
     try {
       this.block = block;
       this.chunkOffsetOK = chunkOffsetOK;
@@ -82,6 +91,7 @@
       this.verifyChecksum = verifyChecksum;
       this.blockLength = datanode.data.getLength(block);
       this.transferToAllowed = datanode.transferToAllowed;
+      this.clientTraceFmt = clientTraceFmt;
 
       if ( !corruptChecksumOk || datanode.data.metaFileExists(block) ) {
         checksumIn = new DataInputStream(
@@ -382,6 +392,9 @@
       out.writeInt(0); // mark the end of block        
       out.flush();
     } finally {
+      if (clientTraceFmt != null) {
+        ClientTraceLog.info(String.format(clientTraceFmt, totalRead));
+      }
       close();
     }
 

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=687868&r1=687867&r2=687868&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Thu Aug
21 14:27:31 2008
@@ -33,6 +33,7 @@
 import java.security.SecureRandom;
 import java.util.AbstractList;
 import java.util.ArrayList;
+import java.util.Formatter;
 import java.util.LinkedList;
 import java.util.Random;
 import java.util.concurrent.Semaphore;
@@ -113,7 +114,18 @@
  **********************************************************/
 public class DataNode extends Configured 
     implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable {
-  public static final Log LOG = LogFactory.getLog(DataNode.class.getName());
+  public static final Log LOG = LogFactory.getLog(DataNode.class);
+
+  public static final String DN_CLIENTTRACE_FORMAT =
+        "src: %s" +      // src IP
+        ", dest: %s" +   // dst IP
+        ", bytes: %s" +  // byte count
+        ", op: %s" +     // operation
+        ", cliID: %s" +  // DFSClient id
+        ", srvID: %s" +  // DatanodeRegistration
+        ", blockid: %s"; // block id
+  static final Log ClientTraceLog =
+    LogFactory.getLog(DataNode.class.getName() + ".clienttrace");
 
   /**
    * Use {@link NetUtils#createSocketAddr(String)} instead.
@@ -914,6 +926,8 @@
      +-------------------------------------------------------------------------+
      | 8 byte Block ID | 8 byte genstamp | 8 byte start offset | 8 byte length |
      +-------------------------------------------------------------------------+
+     |   vInt length   |  <DFSClient id> |
+     +-----------------------------------+
      
      Client sends optional response only at the end of receiving data.
        

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=687868&r1=687867&r2=687868&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Thu
Aug 21 14:27:31 2008
@@ -36,16 +36,18 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.StringUtils;
+import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
 
 /**
  * Thread for processing incoming/outgoing data stream.
  */
 class DataXceiver implements Runnable, FSConstants {
   public static final Log LOG = DataNode.LOG;
+  static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
   Socket s;
-  String remoteAddress; // address of remote side
-  String localAddress;  // local address of this daemon
+  final String remoteAddress; // address of remote side
+  final String localAddress;  // local address of this daemon
   DataNode datanode;
   DataXceiverServer dataXceiverServer;
   
@@ -55,9 +57,8 @@
     this.s = s;
     this.datanode = datanode;
     this.dataXceiverServer = dataXceiverServer;
-    InetSocketAddress isock = (InetSocketAddress)s.getRemoteSocketAddress();
-    remoteAddress = isock.toString();
-    localAddress = s.getInetAddress() + ":" + s.getLocalPort();
+    remoteAddress = s.getRemoteSocketAddress().toString();
+    localAddress = s.getLocalSocketAddress().toString();
     LOG.debug("Number of active connections is: " + datanode.getXceiverCount());
   }
 
@@ -141,7 +142,7 @@
 
     long startOffset = in.readLong();
     long length = in.readLong();
-
+    String clientName = Text.readString(in);
     // send the block
     OutputStream baseStream = NetUtils.getOutputStream(s, 
         datanode.socketWriteTimeout);
@@ -149,10 +150,17 @@
                  new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
     
     BlockSender blockSender = null;
+    final String clientTraceFmt =
+      clientName.length() > 0 && ClientTraceLog.isInfoEnabled()
+        ? String.format(DN_CLIENTTRACE_FORMAT, localAddress, remoteAddress,
+            "%d", "HDFS_READ", clientName,
+            datanode.dnRegistration.getStorageID(), block)
+        : datanode.dnRegistration + " Served block " + block + " to " +
+            s.getInetAddress();
     try {
       try {
-        blockSender = new BlockSender(block, startOffset, length, 
-                                      true, true, false, datanode);
+        blockSender = new BlockSender(block, startOffset, length,
+            true, true, false, datanode, clientTraceFmt);
       } catch(IOException e) {
         out.writeShort(OP_STATUS_ERROR);
         throw e;
@@ -174,8 +182,6 @@
       
       datanode.myMetrics.bytesRead.inc((int) read);
       datanode.myMetrics.blocksRead.inc();
-      LOG.info(datanode.dnRegistration + " Served block " + block + " to " + 
-          s.getInetAddress());
     } catch ( SocketException ignored ) {
       // Its ok for remote side to close the connection anytime.
       datanode.myMetrics.blocksRead.inc();
@@ -241,8 +247,9 @@
     try {
       // open a block receiver and check if the block does not exist
       blockReceiver = new BlockReceiver(block, in, 
-          s.getInetAddress().toString(), isRecovery, client, srcDataNode,
-          datanode);
+          s.getRemoteSocketAddress().toString(),
+          s.getLocalSocketAddress().toString(),
+          isRecovery, client, srcDataNode, datanode);
 
       // get a connection back to the previous target
       replyOut = new DataOutputStream(
@@ -502,8 +509,8 @@
     try {
       // open a block receiver and check if the block does not exist
        blockReceiver = new BlockReceiver(
-          block, in, s.getRemoteSocketAddress().toString(), false, "", null,
-          datanode);
+          block, in, s.getRemoteSocketAddress().toString(),
+          s.getLocalSocketAddress().toString(), false, "", null, datanode);
 
       // receive a block
       blockReceiver.receiveBlock(null, null, null, null, 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=687868&r1=687867&r2=687868&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Thu Aug 21 14:27:31
2008
@@ -99,6 +99,15 @@
   public static final Log LOG =
     LogFactory.getLog(TaskTracker.class);
 
+  public static final String MR_CLIENTTRACE_FORMAT =
+        "src: %s" +     // src IP
+        ", dest: %s" +  // dst IP
+        ", bytes: %s" + // byte count
+        ", op: %s" +    // operation
+        ", cliID: %s";  // task id
+  public static final Log ClientTraceLog =
+    LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
+
   private boolean running = true;
 
   private LocalDirAllocator localDirAllocator;
@@ -2389,6 +2398,7 @@
       FSDataInputStream indexIn = null;
       FSDataInputStream mapOutputIn = null;
       
+      long totalRead = 0;
       ShuffleServerMetrics shuffleMetrics = (ShuffleServerMetrics)
                                       context.getAttribute("shuffleServerMetrics");
       try {
@@ -2468,7 +2478,6 @@
         //seek to the correct offset for the reduce
         mapOutputIn.seek(startOffset);
           
-        long totalRead = 0;
         int len = mapOutputIn.read(buffer, 0,
                                    partLength < MAX_BYTES_TO_READ 
                                    ? (int)partLength : MAX_BYTES_TO_READ);
@@ -2514,6 +2523,12 @@
           mapOutputIn.close();
         }
         shuffleMetrics.serverHandlerFree();
+        if (ClientTraceLog.isInfoEnabled()) {
+          ClientTraceLog.info(String.format(MR_CLIENTTRACE_FORMAT,
+                request.getLocalAddr() + ":" + request.getLocalPort(),
+                request.getRemoteAddr() + ":" + request.getRemotePort(),
+                totalRead, "MAPRED_SHUFFLE", mapId));
+        }
       }
       outStream.close();
       shuffleMetrics.successOutput();

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=687868&r1=687867&r2=687868&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Thu Aug
21 14:27:31 2008
@@ -267,6 +267,7 @@
     sendOut.writeLong(0L);
     sendOut.writeLong(fileLen);
     recvOut.writeShort((short)FSConstants.OP_STATUS_ERROR);
+    Text.writeString(sendOut, "cl");
     sendRecvData("Wrong block ID " + newBlockId + " for read", false); 
 
     // negative block start offset
@@ -277,6 +278,7 @@
     sendOut.writeLong(firstBlock.getGenerationStamp());
     sendOut.writeLong(-1L);
     sendOut.writeLong(fileLen);
+    Text.writeString(sendOut, "cl");
     sendRecvData("Negative start-offset for read for block " + 
                  firstBlock.getBlockId(), false);
 
@@ -288,6 +290,7 @@
     sendOut.writeLong(firstBlock.getGenerationStamp());
     sendOut.writeLong(fileLen);
     sendOut.writeLong(fileLen);
+    Text.writeString(sendOut, "cl");
     sendRecvData("Wrong start-offset for reading block " +
                  firstBlock.getBlockId(), false);
     
@@ -301,6 +304,7 @@
     sendOut.writeLong(firstBlock.getGenerationStamp());
     sendOut.writeLong(0);
     sendOut.writeLong(-1-random.nextInt(oneMil));
+    Text.writeString(sendOut, "cl");
     sendRecvData("Negative length for reading block " +
                  firstBlock.getBlockId(), false);
     
@@ -314,6 +318,7 @@
     sendOut.writeLong(firstBlock.getGenerationStamp());
     sendOut.writeLong(0);
     sendOut.writeLong(fileLen + 1);
+    Text.writeString(sendOut, "cl");
     sendRecvData("Wrong length for reading block " +
                  firstBlock.getBlockId(), false);
     
@@ -325,6 +330,7 @@
     sendOut.writeLong(firstBlock.getGenerationStamp());
     sendOut.writeLong(0);
     sendOut.writeLong(fileLen);
+    Text.writeString(sendOut, "cl");
     readFile(fileSys, file, fileLen);
   }
 }



Mime
View raw message