hadoop-hdfs-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1086461 - in /hadoop/hdfs/trunk: CHANGES.txt src/java/org/apache/hadoop/hdfs/DFSOutputStream.java src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
Date: Tue, 29 Mar 2011 01:36:23 GMT
Author: szetszwo
Date: Tue Mar 29 01:36:22 2011
New Revision: 1086461

URL: http://svn.apache.org/viewvc?rev=1086461&view=rev
Log:
Revert HDFS-1789 since fault-injection tests cannot be compiled.

Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1086461&r1=1086460&r2=1086461&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Mar 29 01:36:22 2011
@@ -21,6 +21,10 @@ Trunk (unreleased changes)
 
     HDFS-1675. Support transferring RBW between datanodes. (szetszwo)
 
+    HDFS-1785. In BlockReceiver and DataXceiver, clientName.length() is used
+    multiple times for determining whether the source is a client or a
+    datanode.  (szetszwo)
+
   IMPROVEMENTS
 
     HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel)
@@ -82,13 +86,6 @@ Trunk (unreleased changes)
     HDFS-1541. Not marking datanodes dead when namenode in safemode.
     (hairong)
 
-    HDFS-1785. In BlockReceiver and DataXceiver, clientName.length() is used
-    multiple times for determining whether the source is a client or a
-    datanode.  (szetszwo)
-
-    HDFS-1789. Refactor frequently used codes from DFSOutputStream,
-    BlockReceiver and DataXceiver.  (szetszwo)
-
     HDFS-1120. Make DataNode's block-to-device placement policy pluggable
     (Harsh J Chouraria via todd)
 

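For context, the HDFS-1785 entry moved back above refers to the datanode-side convention that an empty client name marks a transfer initiated by another datanode (replication or balancing), while a non-empty one marks a client write. A minimal illustrative sketch of that test, with a hypothetical clientName value:

    // clientName is read from the data-transfer header; empty when the
    // request comes from another datanode rather than a client.
    String clientName = "DFSClient_example";        // hypothetical value
    boolean isClient = clientName.length() > 0;     // client-initiated write
    boolean isDatanode = clientName.length() == 0;  // datanode-to-datanode transfer
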
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1086461&r1=1086460&r2=1086461&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java Tue Mar 29 01:36:22 2011
@@ -24,8 +24,8 @@ import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
-import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.BufferOverflowException;
@@ -47,22 +47,21 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.io.EnumSetWritable;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
@@ -72,6 +71,8 @@ import org.apache.hadoop.util.DataChecks
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.PureJavaCrc32;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 
 /****************************************************************
  * DFSOutputStream creates files from a stream of bytes.
@@ -888,7 +889,18 @@ class DFSOutputStream extends FSOutputSu
 
       boolean result = false;
       try {
-        s = createSocketForPipeline(nodes, dfsClient);
+        if(DFSClient.LOG.isDebugEnabled()) {
+          DFSClient.LOG.debug("Connecting to " + nodes[0].getName());
+        }
+        InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());
+        s = dfsClient.socketFactory.createSocket();
+        int timeoutValue = dfsClient.getDatanodeReadTimeout(nodes.length);
+        NetUtils.connect(s, target, timeoutValue);
+        s.setSoTimeout(timeoutValue);
+        s.setSendBufferSize(DFSClient.DEFAULT_DATA_SOCKET_SIZE);
+        if(DFSClient.LOG.isDebugEnabled()) {
+          DFSClient.LOG.debug("Send buf size " + s.getSendBufferSize());
+        }
         long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
 
         //
@@ -1024,29 +1036,6 @@ class DFSOutputStream extends FSOutputSu
     }
   }
 
-  /**
-   * Create a socket for a write pipeline
-   * @param datanodes the datanodes on the pipeline 
-   * @param client
-   * @return the socket connected to the first datanode
-   */
-  static Socket createSocketForPipeline(final DatanodeInfo[] datanodes,
-      final DFSClient client) throws IOException {
-    if(DFSClient.LOG.isDebugEnabled()) {
-      DFSClient.LOG.debug("Connecting to datanode " + datanodes[0].getName());
-    }
-    final InetSocketAddress isa = NetUtils.createSocketAddr(datanodes[0].getName());
-    final Socket sock = client.socketFactory.createSocket();
-    final int timeout = client.getDatanodeReadTimeout(datanodes.length);
-    NetUtils.connect(sock, isa, timeout);
-    sock.setSoTimeout(timeout);
-    sock.setSendBufferSize(DFSClient.DEFAULT_DATA_SOCKET_SIZE);
-    if(DFSClient.LOG.isDebugEnabled()) {
-      DFSClient.LOG.debug("Send buf size " + sock.getSendBufferSize());
-    }
-    return sock;
-  }
-
   private void isClosed() throws IOException {
     if (closed) {
       IOException e = lastException;

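The hunk above reverts DFSOutputStream to the inlined socket setup; the removed createSocketForPipeline helper and the restored inline code do the same work. A self-contained sketch of the pattern, using plain java.net in place of Hadoop's NetUtils (the constant values here are assumptions, not the actual DFSClient/HdfsConstants numbers):

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.Socket;

    public class PipelineSocketSketch {
      static final int DEFAULT_DATA_SOCKET_SIZE = 128 * 1024; // assumed buffer size
      static final int BASE_READ_TIMEOUT = 60_000;            // assumed base, ms
      static final int PER_NODE_EXTENSION = 5_000;            // assumed per-node, ms

      // Plays the role of dfsClient.getDatanodeReadTimeout(nodes.length):
      // deeper pipelines get proportionally more slack before timing out.
      static int datanodeReadTimeout(int numNodes) {
        return BASE_READ_TIMEOUT + PER_NODE_EXTENSION * numNodes;
      }

      static Socket connectToFirstDatanode(InetSocketAddress target, int numNodes)
          throws IOException {
        Socket s = new Socket();                    // dfsClient.socketFactory.createSocket()
        int timeout = datanodeReadTimeout(numNodes);
        s.connect(target, timeout);                 // NetUtils.connect(s, target, timeout)
        s.setSoTimeout(timeout);                    // reads time out as well
        s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
        return s;
      }
    }
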
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1086461&r1=1086460&r2=1086461&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Mar 29 01:36:22 2011
@@ -22,7 +22,6 @@ import static org.apache.hadoop.hdfs.pro
 import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
 
 import java.io.BufferedOutputStream;
-import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
@@ -55,7 +54,7 @@ import org.apache.hadoop.util.StringUtil
 * may copy it to another site. If a throttler is provided,
  * streaming throttling is also supported.
  **/
-class BlockReceiver implements Closeable, FSConstants {
+class BlockReceiver implements java.io.Closeable, FSConstants {
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
@@ -630,7 +629,7 @@ class BlockReceiver implements Closeable
       DataInputStream mirrIn,   // input from next datanode
       DataOutputStream replyOut,  // output to previous datanode
       String mirrAddr, DataTransferThrottler throttlerArg,
-      final DatanodeInfo[] downstreams) throws IOException {
+      int numTargets) throws IOException {
 
       boolean responderClosed = false;
       mirrorOut = mirrOut;
@@ -639,8 +638,10 @@ class BlockReceiver implements Closeable
 
     try {
       if (isClient) {
-        responder = new Daemon(datanode.threadGroup,
-            new PacketResponder(downstreams, mirrIn, replyOut));
+        responder = new Daemon(datanode.threadGroup, 
+                               new PacketResponder(this, block, mirrIn, 
+                                                   replyOut, numTargets,
+                                                   Thread.currentThread()));
         responder.start(); // start thread to process responses
       }
 
@@ -787,59 +788,46 @@ class BlockReceiver implements Closeable
   * Processes responses from downstream datanodes in the pipeline
    * and sends back replies to the originator.
    */
-  private class PacketResponder implements Runnable, Closeable, FSConstants {   
+  class PacketResponder implements Runnable, FSConstants {   
 
-    /** queue for packets waiting for ack */
-    private final LinkedList<Packet> ackQueue = new LinkedList<Packet>(); 
-    /** the thread that spawns this responder */
-    private final Thread receiverThread = Thread.currentThread();
-    /** is this responder running? */
+    //packet waiting for ack
+    private LinkedList<Packet> ackQueue = new LinkedList<Packet>(); 
     private volatile boolean running = true;
+    private Block block;
+    DataInputStream mirrorIn;   // input from downstream datanode
+    DataOutputStream replyOut;  // output to upstream datanode
+    private int numTargets;     // number of downstream datanodes including myself
+    private BlockReceiver receiver; // The owner of this responder.
+    private Thread receiverThread; // the thread that spawns this responder
 
-    /** downstream datanodes excluding myself */
-    private final DatanodeInfo[] downstreams;
-    /** input from the next downstream datanode */
-    private final DataInputStream downstreamIn;
-    /** output to upstream datanode/client */
-    private final DataOutputStream upstreamOut;
-
-    /** for log and error messages */
-    private final String myString; 
-
-    @Override
     public String toString() {
-      return getClass().getSimpleName() + ": block=" + block + ", downstreams="
-          + (downstreams == null? null: Arrays.asList(downstreams));
-    }
-
-    PacketResponder(final DatanodeInfo[] downstreams,
-        final DataInputStream downstreamIn,
-        final DataOutputStream upstreamOut) {
-      this.downstreams = downstreams;
-      this.downstreamIn = downstreamIn;
-      this.upstreamOut = upstreamOut;
-
-      this.myString = toString();
+      return "PacketResponder " + numTargets + " for Block " + this.block;
     }
 
-    private boolean isLastDatanode() {
-      return downstreams != null && downstreams.length == 0;
+    PacketResponder(BlockReceiver receiver, Block b, DataInputStream in, 
+                    DataOutputStream out, int numTargets,
+                    Thread receiverThread) {
+      this.receiverThread = receiverThread;
+      this.receiver = receiver;
+      this.block = b;
+      mirrorIn = in;
+      replyOut = out;
+      this.numTargets = numTargets;
     }
 
     /**
     * enqueue the seqno that is still to be acked by the downstream datanode.
      * @param seqno
      * @param lastPacketInBlock
-     * @param offsetInBlock
+     * @param lastByteInPacket
      */
-    synchronized void enqueue(final long seqno,
-        final boolean lastPacketInBlock, final long offsetInBlock) {
+    synchronized void enqueue(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
       if (running) {
-        final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock);
         if(LOG.isDebugEnabled()) {
-          LOG.debug(myString + ": enqueue " + p);
+          LOG.debug("PacketResponder " + numTargets + " adding seqno " + seqno +
+                    " to ack queue.");
         }
-        ackQueue.addLast(p);
+        ackQueue.addLast(new Packet(seqno, lastPacketInBlock, lastByteInPacket));
         notifyAll();
       }
     }
@@ -847,8 +835,7 @@ class BlockReceiver implements Closeable
     /**
      * wait for all pending packets to be acked. Then shutdown thread.
      */
-    @Override
-    public synchronized void close() {
+    synchronized void close() {
       while (running && ackQueue.size() != 0 && datanode.shouldRun) {
         try {
           wait();
@@ -857,7 +844,8 @@ class BlockReceiver implements Closeable
         }
       }
       if(LOG.isDebugEnabled()) {
-        LOG.debug(myString + ": closing");
+        LOG.debug("PacketResponder " + numTargets +
+                 " for block " + block + " Closing down.");
       }
       running = false;
       notifyAll();
@@ -879,19 +867,21 @@ class BlockReceiver implements Closeable
             PipelineAck ack = new PipelineAck();
             long seqno = PipelineAck.UNKOWN_SEQNO;
             try {
-              if (!isLastDatanode() && !mirrorError) {// not the last DN & no mirror error
+              if (numTargets != 0 && !mirrorError) {// not the last DN & no mirror error
                 // read an ack from downstream datanode
-                ack.readFields(downstreamIn);
+                ack.readFields(mirrorIn);
                 if (LOG.isDebugEnabled()) {
-                  LOG.debug(myString + " got " + ack);
+                  LOG.debug("PacketResponder " + numTargets + " got " + ack);
                 }
                 seqno = ack.getSeqno();
               }
-              if (seqno != PipelineAck.UNKOWN_SEQNO || isLastDatanode()) {
+              if (seqno != PipelineAck.UNKOWN_SEQNO || numTargets == 0) {
                 synchronized (this) {
                   while (running && datanode.shouldRun && ackQueue.size() == 0) {
                     if (LOG.isDebugEnabled()) {
-                      LOG.debug(myString + ": seqno=" + seqno +
+                      LOG.debug("PacketResponder " + numTargets + 
+                                " seqno = " + seqno +
+                                " for block " + block +
                                 " waiting for local datanode to finish write.");
                     }
                     wait();
@@ -901,10 +891,11 @@ class BlockReceiver implements Closeable
                   }
                   pkt = ackQueue.getFirst();
                   expected = pkt.seqno;
-                  if (downstreams != null && downstreams.length > 0
-                      && seqno != expected) {
-                    throw new IOException(myString + "seqno: expected="
-                        + expected + ", received=" + seqno);
+                  if (numTargets > 0 && seqno != expected) {
+                    throw new IOException("PacketResponder " + numTargets +
+                                          " for block " + block +
+                                          " expected seqno:" + expected +
+                                          " received:" + seqno);
                   }
                   lastPacketInBlock = pkt.lastPacketInBlock;
                 }
@@ -919,7 +910,8 @@ class BlockReceiver implements Closeable
                 // notify client of the error
                 // and wait for the client to shut down the pipeline
                 mirrorError = true;
-                LOG.info(myString, ioe);
+                LOG.info("PacketResponder " + block + " " + numTargets + 
+                      " Exception " + StringUtils.stringifyException(ioe));
               }
             }
 
@@ -931,7 +923,8 @@ class BlockReceiver implements Closeable
                * because this datanode has a problem. The upstream datanode
                * will detect that this datanode is bad, and rightly so.
                */
-              LOG.info(myString + ": Thread is interrupted.");
+              LOG.info("PacketResponder " + block +  " " + numTargets +
+                       " : Thread is interrupted.");
               running = false;
               continue;
             }
@@ -939,7 +932,7 @@ class BlockReceiver implements Closeable
             // If this is the last packet in block, then close block
             // file and finalize the block before responding success
             if (lastPacketInBlock) {
-              BlockReceiver.this.close();
+              receiver.close();
               final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
               block.setNumBytes(replicaInfo.getNumBytes());
               datanode.data.finalizeBlock(block);
@@ -947,12 +940,13 @@ class BlockReceiver implements Closeable
               if (ClientTraceLog.isInfoEnabled() && isClient) {
                 long offset = 0;
                 ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
-                      inAddr, myAddr, block.getNumBytes(),
-                      "HDFS_WRITE", clientname, offset,
+                      receiver.inAddr, receiver.myAddr, block.getNumBytes(),
+                      "HDFS_WRITE", receiver.clientname, offset,
                       datanode.dnRegistration.getStorageID(), block, endTime-startTime));
               } else {
-                LOG.info("Received block " + block + " of size "
-                    + block.getNumBytes() + " from " + inAddr);
+                LOG.info("Received block " + block + 
+                         " of size " + block.getNumBytes() + 
+                         " from " + receiver.inAddr);
               }
             }
 
@@ -963,7 +957,7 @@ class BlockReceiver implements Closeable
               replies[0] = SUCCESS;
               replies[1] = ERROR;
             } else {
-              short ackLen = isLastDatanode() ? 0 : ack.getNumOfReplies();
+              short ackLen = numTargets == 0 ? 0 : ack.getNumOfReplies();
               replies = new Status[1+ackLen];
               replies[0] = SUCCESS;
               for (int i=0; i<ackLen; i++) {
@@ -973,18 +967,20 @@ class BlockReceiver implements Closeable
             PipelineAck replyAck = new PipelineAck(expected, replies);
             
             // send my ack back to upstream datanode
-            replyAck.write(upstreamOut);
-            upstreamOut.flush();
+            replyAck.write(replyOut);
+            replyOut.flush();
             if (LOG.isDebugEnabled()) {
-              LOG.debug(myString + ", replyAck=" + replyAck);
+              LOG.debug("PacketResponder " + numTargets + 
+                        " for block " + block +
+                        " responded an ack: " + replyAck);
             }
             if (pkt != null) {
               // remove the packet from the ack queue
               removeAckHead();
               // update bytes acked
               if (replyAck.isSuccess() && 
-                  pkt.offsetInBlock > replicaInfo.getBytesAcked()) {
-                replicaInfo.setBytesAcked(pkt.offsetInBlock);
+                  pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
+                replicaInfo.setBytesAcked(pkt.lastByteInBlock);
               }
             }
         } catch (IOException e) {
@@ -995,7 +991,8 @@ class BlockReceiver implements Closeable
             } catch (IOException ioe) {
               LOG.warn("DataNode.checkDiskError failed in run() with: ", ioe);
             }
-            LOG.info(myString, e);
+            LOG.info("PacketResponder " + block + " " + numTargets + 
+                     " Exception " + StringUtils.stringifyException(e));
             running = false;
             if (!Thread.interrupted()) { // failure not caused by interruption
               receiverThread.interrupt();
@@ -1003,13 +1000,15 @@ class BlockReceiver implements Closeable
           }
         } catch (Throwable e) {
           if (running) {
-            LOG.info(myString, e);
+            LOG.info("PacketResponder " + block + " " + numTargets + 
+                     " Exception " + StringUtils.stringifyException(e));
             running = false;
             receiverThread.interrupt();
           }
         }
       }
-      LOG.info(myString + " terminating");
+      LOG.info("PacketResponder " + numTargets + 
+               " for block " + block + " terminating");
     }
     
     /**
@@ -1026,23 +1025,15 @@ class BlockReceiver implements Closeable
   /**
    * This information is cached by the Datanode in the ackQueue.
    */
-  private static class Packet {
-    final long seqno;
-    final boolean lastPacketInBlock;
-    final long offsetInBlock;
+  static private class Packet {
+    long seqno;
+    boolean lastPacketInBlock;
+    long lastByteInBlock;
 
-    Packet(long seqno, boolean lastPacketInBlock, long offsetInBlock) {
+    Packet(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
       this.seqno = seqno;
       this.lastPacketInBlock = lastPacketInBlock;
-      this.offsetInBlock = offsetInBlock;
-    }
-
-    @Override
-    public String toString() {
-      return getClass().getSimpleName() + "(seqno=" + seqno
-        + ", lastPacketInBlock=" + lastPacketInBlock
-        + ", offsetInBlock=" + offsetInBlock
-        + ")";
+      this.lastByteInBlock = lastByteInPacket;
     }
   }
 }

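The PacketResponder changes above all revolve around one producer/consumer handshake: the receiver thread enqueues each packet's sequence number after forwarding it downstream, and the responder thread waits for the local write, reads the downstream ack, and checks it against the head of the queue. A minimal self-contained sketch of that handshake (simplified names, not the actual Hadoop class):

    import java.util.LinkedList;

    class AckQueueSketch {
      private final LinkedList<Long> ackQueue = new LinkedList<>();
      private volatile boolean running = true;

      // Receiver thread: called after a packet has been forwarded downstream.
      synchronized void enqueue(long seqno) {
        if (running) {
          ackQueue.addLast(seqno);
          notifyAll();                 // wake the responder waiting below
        }
      }

      // Responder thread: called with the seqno read from the downstream ack.
      synchronized void ackHead(long ackedSeqno) throws InterruptedException {
        while (running && ackQueue.isEmpty()) {
          wait();                      // wait for the local write to finish
        }
        long expected = ackQueue.getFirst();
        if (ackedSeqno != expected) {  // out-of-order ack: fail the pipeline
          throw new IllegalStateException(
              "expected seqno " + expected + ", received " + ackedSeqno);
        }
        ackQueue.removeFirst();        // packet fully acknowledged
        notifyAll();                   // unblock close() waiting for the drain
      }

      // Shut down once every pending packet has been acked.
      synchronized void close() throws InterruptedException {
        while (running && !ackQueue.isEmpty()) {
          wait();
        }
        running = false;
        notifyAll();
      }
    }
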
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1086461&r1=1086460&r2=1086461&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue Mar 29 01:36:22 2011
@@ -153,9 +153,24 @@ class DataXceiver extends DataTransferPr
         datanode.socketWriteTimeout);
     DataOutputStream out = new DataOutputStream(
                  new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
-    checkAccess(out, block, blockToken,
-        DataTransferProtocol.Op.READ_BLOCK,
-        BlockTokenSecretManager.AccessMode.READ);
+
+    if (datanode.isBlockTokenEnabled) {
+      try {
+        datanode.blockTokenSecretManager.checkAccess(blockToken, null, block,
+            BlockTokenSecretManager.AccessMode.READ);
+      } catch (InvalidToken e) {
+        try {
+          ERROR_ACCESS_TOKEN.write(out);
+          out.flush();
+          LOG.warn("Block token verification failed, for client "
+              + remoteAddress + " for OP_READ_BLOCK for block " + block + " : "
+              + e.getLocalizedMessage());
+          throw e;
+        } finally {
+          IOUtils.closeStream(out);
+        }
+      }
+    }
   
     // send the block
     BlockSender blockSender = null;
@@ -246,14 +261,30 @@ class DataXceiver extends DataTransferPr
              " src: " + remoteAddress +
              " dest: " + localAddress);
 
-    // reply to upstream datanode or client 
-    final DataOutputStream replyOut = new DataOutputStream(
-        new BufferedOutputStream(
-            NetUtils.getOutputStream(s, datanode.socketWriteTimeout),
-            SMALL_BUFFER_SIZE));
-    checkAccess(isClient? replyOut: null, block, blockToken,
-        DataTransferProtocol.Op.WRITE_BLOCK,
-        BlockTokenSecretManager.AccessMode.WRITE);
+    DataOutputStream replyOut = null;   // stream to prev target
+    replyOut = new DataOutputStream(new BufferedOutputStream(
+                   NetUtils.getOutputStream(s, datanode.socketWriteTimeout),
+                   SMALL_BUFFER_SIZE));
+    if (datanode.isBlockTokenEnabled) {
+      try {
+        datanode.blockTokenSecretManager.checkAccess(blockToken, null, block,
+            BlockTokenSecretManager.AccessMode.WRITE);
+      } catch (InvalidToken e) {
+        try {
+          if (isClient) {
+            ERROR_ACCESS_TOKEN.write(replyOut);
+            Text.writeString(replyOut, datanode.dnRegistration.getName());
+            replyOut.flush();
+          }
+          LOG.warn("Block token verification failed, for client "
+              + remoteAddress + " for OP_WRITE_BLOCK for block " + block
+              + " : " + e.getLocalizedMessage());
+          throw e;
+        } finally {
+          IOUtils.closeStream(replyOut);
+        }
+      }
+    }
 
     DataOutputStream mirrorOut = null;  // stream to next target
     DataInputStream mirrorIn = null;    // reply from next target
@@ -276,7 +307,8 @@ class DataXceiver extends DataTransferPr
       }
 
       //
-      // Connect to downstream machine, if appropriate
+      // Open network conn to backup machine, if 
+      // appropriate
       //
       if (targets.length > 0) {
         InetSocketAddress mirrorTarget = null;
@@ -298,6 +330,7 @@ class DataXceiver extends DataTransferPr
                          SMALL_BUFFER_SIZE));
           mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
 
+          // Write header: Copied from DFSClient.java!
           DataTransferProtocol.Sender.opWriteBlock(mirrorOut, originalBlock,
               pipelineSize, stage, newGs, minBytesRcvd, maxBytesRcvd, clientname,
               srcDataNode, targets, blockToken);
@@ -358,7 +391,7 @@ class DataXceiver extends DataTransferPr
       if (blockReceiver != null) {
         String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
         blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
-            mirrorAddr, null, targets);
+            mirrorAddr, null, targets.length);
       }
 
       // update its generation stamp
@@ -406,11 +439,26 @@ class DataXceiver extends DataTransferPr
   @Override
   protected void opBlockChecksum(DataInputStream in, Block block,
       Token<BlockTokenIdentifier> blockToken) throws IOException {
-    final DataOutputStream out = new DataOutputStream(
-        NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
-    checkAccess(out, block, blockToken,
-        DataTransferProtocol.Op.BLOCK_CHECKSUM,
-        BlockTokenSecretManager.AccessMode.READ);
+    DataOutputStream out = new DataOutputStream(NetUtils.getOutputStream(s,
+        datanode.socketWriteTimeout));
+    if (datanode.isBlockTokenEnabled) {
+      try {
+        datanode.blockTokenSecretManager.checkAccess(blockToken, null, block,
+            BlockTokenSecretManager.AccessMode.READ);
+      } catch (InvalidToken e) {
+        try {
+          ERROR_ACCESS_TOKEN.write(out);
+          out.flush();
+          LOG.warn("Block token verification failed, for client "
+              + remoteAddress + " for OP_BLOCK_CHECKSUM for block " + block
+              + " : " + e.getLocalizedMessage());
+          throw e;
+        } finally {
+          IOUtils.closeStream(out);
+        }
+
+      }
+    }
 
     updateCurrentThreadName("Reading metadata for block " + block);
     final MetaDataInputStream metadataIn = 
@@ -601,7 +649,7 @@ class DataXceiver extends DataTransferPr
 
       // receive a block
       blockReceiver.receiveBlock(null, null, null, null, 
-          dataXceiverServer.balanceThrottler, null);
+          dataXceiverServer.balanceThrottler, -1);
                     
       // notify name node
       datanode.notifyNamenodeReceivedBlock(block, sourceID);
@@ -665,32 +713,4 @@ class DataXceiver extends DataTransferPr
       IOUtils.closeStream(reply);
     }
   }
-
-  private void checkAccess(final DataOutputStream out, 
-      final Block blk,
-      final Token<BlockTokenIdentifier> t,
-      final DataTransferProtocol.Op op,
-      final BlockTokenSecretManager.AccessMode mode) throws IOException {
-    if (datanode.isBlockTokenEnabled) {
-      try {
-        datanode.blockTokenSecretManager.checkAccess(t, null, blk, mode);
-      } catch(InvalidToken e) {
-        try {
-          if (out != null) {
-            ERROR_ACCESS_TOKEN.write(out);
-            if (mode == BlockTokenSecretManager.AccessMode.WRITE) {
-              Text.writeString(out, datanode.dnRegistration.getName());
-            }
-            out.flush();
-          }
-          LOG.warn("Block token verification failed: op=" + op
-              + ", remoteAddress=" + remoteAddress
-              + ", message=" + e.getLocalizedMessage());
-          throw e;
-        } finally {
-          IOUtils.closeStream(out);
-        }
-      }
-    }
-  }
 }
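For reference, the checkAccess helper deleted at the end of the diff is what HDFS-1789 had introduced to replace the three near-identical inline token checks that this revert restores (in opReadBlock, opWriteBlock, and opBlockChecksum). A sketch of the shape of that consolidation, with stand-in types; TokenChecker, the writeUTF status write, and the error text are illustrative, not Hadoop APIs:

    import java.io.Closeable;
    import java.io.DataOutputStream;
    import java.io.IOException;

    class TokenCheckSketch {
      enum AccessMode { READ, WRITE }

      interface TokenChecker {          // stand-in for blockTokenSecretManager
        void checkAccess(String token, AccessMode mode) throws IOException;
      }

      // One helper replaces the duplicated "check token; on failure write an
      // error status, log, rethrow, and close the stream" blocks.
      static void checkAccess(TokenChecker checker, DataOutputStream out,
          String token, String op, AccessMode mode) throws IOException {
        try {
          checker.checkAccess(token, mode);
        } catch (IOException e) {
          try {
            if (out != null) {
              out.writeUTF("ERROR_ACCESS_TOKEN");   // stand-in for the status write
              if (mode == AccessMode.WRITE) {
                out.writeUTF("datanode-name");      // stand-in for dnRegistration.getName()
              }
              out.flush();
            }
            System.err.println("Block token verification failed: op=" + op
                + ", message=" + e.getMessage());
            throw e;
          } finally {
            closeQuietly(out);
          }
        }
      }

      static void closeQuietly(Closeable c) {       // stand-in for IOUtils.closeStream
        if (c != null) {
          try { c.close(); } catch (IOException ignored) { }
        }
      }
    }
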


