hadoop-hdfs-commits mailing list archives

From: a..@apache.org
Subject: svn commit: r1467538 [2/3] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/net/ src/main/java/org/apache/hadoop/hdfs/protocol/datatrans...
Date: Sat, 13 Apr 2013 02:14:01 GMT
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Sat Apr 13 02:13:59 2013
@@ -29,6 +29,8 @@ import java.nio.ByteBuffer;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FSInputChecker;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
@@ -54,8 +56,8 @@ import org.apache.hadoop.util.DataChecks
 @InterfaceAudience.Private
 @Deprecated
 public class RemoteBlockReader extends FSInputChecker implements BlockReader {
-
-  Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
+  private final Peer peer;
+  private final DatanodeID datanodeID;
   private final DataInputStream in;
   private DataChecksum checksum;
 
@@ -125,9 +127,9 @@ public class RemoteBlockReader extends F
     // if eos was set in the previous read, send a status code to the DN
     if (eos && !eosBefore && nRead >= 0) {
       if (needChecksum()) {
-        sendReadResult(dnSock, Status.CHECKSUM_OK);
+        sendReadResult(peer, Status.CHECKSUM_OK);
       } else {
-        sendReadResult(dnSock, Status.SUCCESS);
+        sendReadResult(peer, Status.SUCCESS);
       }
     }
     return nRead;
@@ -321,7 +323,8 @@ public class RemoteBlockReader extends F
   
   private RemoteBlockReader(String file, String bpid, long blockId,
       DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
-      long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) {
+      long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
+      DatanodeID datanodeID) {
     // Path is used only for printing block and file information in debug
     super(new Path("/blk_" + blockId + ":" + bpid + ":of:"+ file)/*too non path-like?*/,
           1, verifyChecksum,
@@ -329,7 +332,8 @@ public class RemoteBlockReader extends F
           checksum.getBytesPerChecksum(),
           checksum.getChecksumSize());
     
-    this.dnSock = dnSock;
+    this.peer = peer;
+    this.datanodeID = datanodeID;
     this.in = in;
     this.checksum = checksum;
     this.startOffset = Math.max( startOffset, 0 );
@@ -348,13 +352,6 @@ public class RemoteBlockReader extends F
     checksumSize = this.checksum.getChecksumSize();
   }
 
-  public static RemoteBlockReader newBlockReader(Socket sock, String file,
-      ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, 
-      long startOffset, long len, int bufferSize) throws IOException {
-    return newBlockReader(sock, file, block, blockToken, startOffset,
-        len, bufferSize, true, "");
-  }
-
   /**
    * Create a new BlockReader specifically to satisfy a read.
    * This method also sends the OP_READ_BLOCK request.
@@ -370,16 +367,17 @@ public class RemoteBlockReader extends F
    * @param clientName  Client name
    * @return New BlockReader instance, or null on error.
    */
-  public static RemoteBlockReader newBlockReader( Socket sock, String file,
+  public static RemoteBlockReader newBlockReader(String file,
                                      ExtendedBlock block, 
                                      Token<BlockTokenIdentifier> blockToken,
                                      long startOffset, long len,
                                      int bufferSize, boolean verifyChecksum,
-                                     String clientName)
+                                     String clientName, Peer peer,
+                                     DatanodeID datanodeID)
                                      throws IOException {
     // in and out will be closed when sock is closed (by the caller)
-    final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
-          NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT)));
+    final DataOutputStream out =
+        new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
     new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
         verifyChecksum);
     
@@ -388,12 +386,11 @@ public class RemoteBlockReader extends F
     //
 
     DataInputStream in = new DataInputStream(
-        new BufferedInputStream(NetUtils.getInputStream(sock), 
-                                bufferSize));
+        new BufferedInputStream(peer.getInputStream(), bufferSize));
     
     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
         PBHelper.vintPrefixed(in));
-    RemoteBlockReader2.checkSuccess(status, sock, block, file);
+    RemoteBlockReader2.checkSuccess(status, peer, block, file);
     ReadOpChecksumInfoProto checksumInfo =
       status.getReadOpChecksumInfo();
     DataChecksum checksum = DataTransferProtoUtil.fromProto(
@@ -411,15 +408,19 @@ public class RemoteBlockReader extends F
     }
 
     return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(),
-        in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock);
+        in, checksum, verifyChecksum, startOffset, firstChunkOffset, len,
+        peer, datanodeID);
   }
 
   @Override
-  public synchronized void close() throws IOException {
+  public synchronized void close(PeerCache peerCache,
+      FileInputStreamCache fisCache) throws IOException {
     startOffset = -1;
     checksum = null;
-    if (dnSock != null) {
-      dnSock.close();
+    if (peerCache != null && sentStatusCode) {
+      peerCache.put(datanodeID, peer);
+    } else {
+      peer.close();
     }
 
     // in will be closed when its Socket is closed.
@@ -436,37 +437,21 @@ public class RemoteBlockReader extends F
     return readFully(this, buf, offset, len);
   }
 
-  @Override
-  public Socket takeSocket() {
-    assert hasSentStatusCode() :
-      "BlockReader shouldn't give back sockets mid-read";
-    Socket res = dnSock;
-    dnSock = null;
-    return res;
-  }
-
-  @Override
-  public boolean hasSentStatusCode() {
-    return sentStatusCode;
-  }
-
   /**
    * When the reader reaches end of the read, it sends a status response
    * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
    * closing our connection (which we will re-open), but won't affect
    * data correctness.
    */
-  void sendReadResult(Socket sock, Status statusCode) {
-    assert !sentStatusCode : "already sent status code to " + sock;
+  void sendReadResult(Peer peer, Status statusCode) {
+    assert !sentStatusCode : "already sent status code to " + peer;
     try {
-      RemoteBlockReader2.writeReadResult(
-          NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT),
-          statusCode);
+      RemoteBlockReader2.writeReadResult(peer.getOutputStream(), statusCode);
       sentStatusCode = true;
     } catch (IOException e) {
       // It's ok not to be able to send this. But something is probably wrong.
       LOG.info("Could not send read status (" + statusCode + ") to datanode " +
-               sock.getInetAddress() + ": " + e.getMessage());
+               peer.getRemoteAddressString() + ": " + e.getMessage());
     }
   }
   
@@ -486,12 +471,11 @@ public class RemoteBlockReader extends F
   public int read(ByteBuffer buf) throws IOException {
     throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
   }
-
+  
   @Override
-  public IOStreamPair getStreams() {
-    // This class doesn't support encryption, which is the only thing this
-    // method is used for. See HDFS-3637.
-    return null;
+  public int available() throws IOException {
+    // An optimistic estimate of how much data is available
+    // to us without doing network I/O.
+    return DFSClient.TCP_WINDOW_SIZE;
   }
-
 }
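
A note on the new close() contract introduced above: instead of the old takeSocket()/hasSentStatusCode() dance, the caller now hands in the caches and the reader decides whether to recycle its connection. A caller-side sketch, assuming the usual client context; the helper name readAndRecycle is illustrative, not part of this patch:

    // Caller-side sketch (not part of this commit): read a block fully,
    // then either recycle or close the connection via the new close().
    static void readAndRecycle(String file, ExtendedBlock block,
        Token<BlockTokenIdentifier> token, long offset, byte[] buf,
        String clientName, Peer peer, DatanodeID dnId, PeerCache cache)
        throws IOException {
      RemoteBlockReader reader = RemoteBlockReader.newBlockReader(
          file, block, token, offset, buf.length, 4096 /* bufferSize */,
          true /* verifyChecksum */, clientName, peer, dnId);
      try {
        IOUtils.readFully(reader, buf, 0, buf.length);
      } finally {
        // Returns the Peer to the cache only if the status code was sent;
        // the FileInputStreamCache argument is for local reads, so null here.
        reader.close(cache, null);
      }
    }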

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Sat Apr 13 02:13:59 2013
@@ -23,16 +23,16 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
-import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -44,10 +44,11 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.net.SocketInputWrapper;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This is a wrapper around connection to datanode
  * and understands checksum, offset etc.
@@ -79,9 +80,8 @@ public class RemoteBlockReader2  impleme
 
   static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class);
   
-  Socket dnSock;
-  // for now just sending the status code (e.g. checksumOk) after the read.
-  private IOStreamPair ioStreams;
+  final private Peer peer;
+  final private DatanodeID datanodeID;
   private final ReadableByteChannel in;
   private DataChecksum checksum;
   
@@ -114,6 +114,11 @@ public class RemoteBlockReader2  impleme
   /** Amount of unread data in the current received packet */
   int dataLeft = 0;
   
+  @VisibleForTesting
+  public Peer getPeer() {
+    return peer;
+  }
+  
   @Override
   public synchronized int read(byte[] buf, int off, int len) 
                                throws IOException {
@@ -246,13 +251,13 @@ public class RemoteBlockReader2  impleme
   }
 
   protected RemoteBlockReader2(String file, String bpid, long blockId,
-      ReadableByteChannel in, DataChecksum checksum, boolean verifyChecksum,
-      long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock,
-      IOStreamPair ioStreams) {
+      DataChecksum checksum, boolean verifyChecksum,
+      long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
+      DatanodeID datanodeID) {
     // Path is used only for printing block and file information in debug
-    this.dnSock = dnSock;
-    this.ioStreams = ioStreams;
-    this.in = in;
+    this.peer = peer;
+    this.datanodeID = datanodeID;
+    this.in = peer.getInputStreamChannel();
     this.checksum = checksum;
     this.verifyChecksum = verifyChecksum;
     this.startOffset = Math.max( startOffset, 0 );
@@ -269,54 +274,35 @@ public class RemoteBlockReader2  impleme
 
 
   @Override
-  public synchronized void close() throws IOException {
+  public synchronized void close(PeerCache peerCache,
+      FileInputStreamCache fisCache) throws IOException {
     packetReceiver.close();
-    
     startOffset = -1;
     checksum = null;
-    if (dnSock != null) {
-      dnSock.close();
+    if (peerCache != null && sentStatusCode) {
+      peerCache.put(datanodeID, peer);
+    } else {
+      peer.close();
     }
 
     // in will be closed when its Socket is closed.
   }
   
   /**
-   * Take the socket used to talk to the DN.
-   */
-  @Override
-  public Socket takeSocket() {
-    assert hasSentStatusCode() :
-      "BlockReader shouldn't give back sockets mid-read";
-    Socket res = dnSock;
-    dnSock = null;
-    return res;
-  }
-
-  /**
-   * Whether the BlockReader has reached the end of its input stream
-   * and successfully sent a status code back to the datanode.
-   */
-  @Override
-  public boolean hasSentStatusCode() {
-    return sentStatusCode;
-  }
-
-  /**
    * When the reader reaches end of the read, it sends a status response
    * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
    * closing our connection (which we will re-open), but won't affect
    * data correctness.
    */
   void sendReadResult(Status statusCode) {
-    assert !sentStatusCode : "already sent status code to " + dnSock;
+    assert !sentStatusCode : "already sent status code to " + peer;
     try {
-      writeReadResult(ioStreams.out, statusCode);
+      writeReadResult(peer.getOutputStream(), statusCode);
       sentStatusCode = true;
     } catch (IOException e) {
       // It's ok not to be able to send this. But something is probably wrong.
       LOG.info("Could not send read status (" + statusCode + ") to datanode " +
-               dnSock.getInetAddress() + ": " + e.getMessage());
+               peer.getRemoteAddressString() + ": " + e.getMessage());
     }
   }
 
@@ -367,42 +353,34 @@ public class RemoteBlockReader2  impleme
    * @param blockToken  The block token for security
    * @param startOffset  The read offset, relative to block head
    * @param len  The number of bytes to read
-   * @param bufferSize  The IO buffer size (not the client buffer size)
    * @param verifyChecksum  Whether to verify checksum
    * @param clientName  Client name
+   * @param peer  The Peer to use
+   * @param datanodeID  The DatanodeID this peer is connected to
    * @return New BlockReader instance, or null on error.
    */
-  public static BlockReader newBlockReader(Socket sock, String file,
+  public static BlockReader newBlockReader(String file,
                                      ExtendedBlock block,
                                      Token<BlockTokenIdentifier> blockToken,
                                      long startOffset, long len,
-                                     int bufferSize, boolean verifyChecksum,
+                                     boolean verifyChecksum,
                                      String clientName,
-                                     DataEncryptionKey encryptionKey,
-                                     IOStreamPair ioStreams)
+                                     Peer peer, DatanodeID datanodeID)
                                      throws IOException {
-    
-    ReadableByteChannel ch;
-    if (ioStreams.in instanceof SocketInputWrapper) {
-      ch = ((SocketInputWrapper)ioStreams.in).getReadableByteChannel();
-    } else {
-      ch = (ReadableByteChannel) ioStreams.in;
-    }
-    
     // in and out will be closed when sock is closed (by the caller)
     final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
-          ioStreams.out));
+          peer.getOutputStream()));
     new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
         verifyChecksum);
 
     //
     // Get bytes in block
     //
-    DataInputStream in = new DataInputStream(ioStreams.in);
+    DataInputStream in = new DataInputStream(peer.getInputStream());
 
     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
         PBHelper.vintPrefixed(in));
-    checkSuccess(status, sock, block, file);
+    checkSuccess(status, peer, block, file);
     ReadOpChecksumInfoProto checksumInfo =
       status.getReadOpChecksumInfo();
     DataChecksum checksum = DataTransferProtoUtil.fromProto(
@@ -420,34 +398,36 @@ public class RemoteBlockReader2  impleme
     }
 
     return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(),
-        ch, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock,
-        ioStreams);
+        checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer,
+        datanodeID);
   }
 
   static void checkSuccess(
-      BlockOpResponseProto status, Socket sock,
+      BlockOpResponseProto status, Peer peer,
       ExtendedBlock block, String file)
       throws IOException {
     if (status.getStatus() != Status.SUCCESS) {
       if (status.getStatus() == Status.ERROR_ACCESS_TOKEN) {
         throw new InvalidBlockTokenException(
             "Got access token error for OP_READ_BLOCK, self="
-                + sock.getLocalSocketAddress() + ", remote="
-                + sock.getRemoteSocketAddress() + ", for file " + file
+                + peer.getLocalAddressString() + ", remote="
+                + peer.getRemoteAddressString() + ", for file " + file
                 + ", for pool " + block.getBlockPoolId() + " block " 
                 + block.getBlockId() + "_" + block.getGenerationStamp());
       } else {
         throw new IOException("Got error for OP_READ_BLOCK, self="
-            + sock.getLocalSocketAddress() + ", remote="
-            + sock.getRemoteSocketAddress() + ", for file " + file
+            + peer.getLocalAddressString() + ", remote="
+            + peer.getRemoteAddressString() + ", for file " + file
             + ", for pool " + block.getBlockPoolId() + " block " 
             + block.getBlockId() + "_" + block.getGenerationStamp());
       }
     }
   }
-
+  
   @Override
-  public IOStreamPair getStreams() {
-    return ioStreams;
+  public int available() throws IOException {
+    // An optimistic estimate of how much data is available
+    // to us without doing network I/O.
+    return DFSClient.TCP_WINDOW_SIZE;
   }
 }
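
Both readers (and the DataXceiver below) now program against Peer instead of java.net.Socket. Pieced together from the calls these diffs make, the abstraction looks roughly like the following; this is a paraphrase of the methods exercised here, not the actual source of the interface:

    // Rough shape of org.apache.hadoop.hdfs.net.Peer as exercised by these
    // diffs (a paraphrase, not the full interface):
    public interface Peer extends java.io.Closeable {
      java.io.InputStream getInputStream() throws java.io.IOException;
      java.io.OutputStream getOutputStream() throws java.io.IOException;
      java.nio.channels.ReadableByteChannel getInputStreamChannel();
      void setReadTimeout(int timeoutMs) throws java.io.IOException;
      void setWriteTimeout(int timeoutMs) throws java.io.IOException;
      String getRemoteAddressString();
      String getLocalAddressString();
      boolean isClosed();
      boolean isLocal();                  // remote address == local address?
      int getReceiveBufferSize() throws java.io.IOException;
      boolean getTcpNoDelay() throws java.io.IOException;
      org.apache.hadoop.net.unix.DomainSocket getDomainSocket(); // null for TCP
    }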

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java Sat Apr 13 02:13:59 2013
@@ -108,6 +108,18 @@ public interface DataTransferProtocol {
       final DatanodeInfo[] targets) throws IOException;
 
   /**
+   * Request short circuit access file descriptors from a DataNode.
+   *
+   * @param blk             The block to get file descriptors for.
+   * @param blockToken      Security token for accessing the block.
+   * @param maxVersion      Maximum version of the block data the client 
+   *                        can understand.
+   */
+  public void requestShortCircuitFds(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken,
+      int maxVersion) throws IOException;
+
+  /**
    * Receive a block from a source datanode
    * and then notifies the namenode
    * to remove the copy from the original datanode.
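
Like the other DataTransferProtocol operations, the new call is request/response over the data transfer stream. A hypothetical client-side round trip, assuming a connected Peer and eliding error handling:

    // Hypothetical client-side round trip for the new op (illustrative):
    static BlockOpResponseProto requestFds(Peer peer, ExtendedBlock block,
        Token<BlockTokenIdentifier> token) throws IOException {
      DataOutputStream out = new DataOutputStream(
          new BufferedOutputStream(peer.getOutputStream()));
      new Sender(out).requestShortCircuitFds(block, token,
          1 /* highest block format version this client can read */);
      out.flush();
      // The DataNode replies with a BlockOpResponseProto; on SUCCESS it then
      // passes the block and metadata file descriptors over the socket.
      DataInputStream in = new DataInputStream(peer.getInputStream());
      return BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
    }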

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java Sat Apr 13 02:13:59 2013
@@ -34,7 +34,8 @@ public enum Op {
   REPLACE_BLOCK((byte)83),
   COPY_BLOCK((byte)84),
   BLOCK_CHECKSUM((byte)85),
-  TRANSFER_BLOCK((byte)86);
+  TRANSFER_BLOCK((byte)86),
+  REQUEST_SHORT_CIRCUIT_FDS((byte)87);
 
   /** The code for this operation. */
   public final byte code;
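
Each operation travels as this single byte after the data-transfer version number, so (byte)87 now selects the FD-passing request. A minimal lookup sketch; the real enum may map wire bytes back to ops differently:

    // Minimal lookup from wire byte back to Op (illustrative):
    static Op fromCode(byte code) throws IOException {
      for (Op op : Op.values()) {
        if (op.code == code) {
          return op;            // e.g. (byte)87 -> REQUEST_SHORT_CIRCUIT_FDS
        }
      }
      throw new IOException("Unknown op code " + code);
    }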

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Sat Apr 13 02:13:59 2013
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 
@@ -76,6 +77,9 @@ public abstract class Receiver implement
     case TRANSFER_BLOCK:
       opTransferBlock(in);
       break;
+    case REQUEST_SHORT_CIRCUIT_FDS:
+      opRequestShortCircuitFds(in);
+      break;
     default:
       throw new IOException("Unknown op " + op + " in data stream");
     }
@@ -117,6 +121,15 @@ public abstract class Receiver implement
         PBHelper.convert(proto.getTargetsList()));
   }
 
+  /** Receive {@link Op#REQUEST_SHORT_CIRCUIT_FDS} */
+  private void opRequestShortCircuitFds(DataInputStream in) throws IOException {
+    final OpRequestShortCircuitAccessProto proto =
+      OpRequestShortCircuitAccessProto.parseFrom(vintPrefixed(in));
+    requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()),
+        PBHelper.convert(proto.getHeader().getToken()),
+        proto.getMaxVersion());
+  }
+
   /** Receive OP_REPLACE_BLOCK */
   private void opReplaceBlock(DataInputStream in) throws IOException {
     OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));
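
For orientation, this dispatch sits behind readOp()/processOp(), which consume one request frame at a time. The frame layout, as inferred from the ops handled in this file (peer here stands for the connected client):

    // Shape of one request frame as the Receiver consumes it (inferred):
    //   [u16 data transfer version][u8 op code][vint-prefixed protobuf body]
    DataInputStream in = new DataInputStream(peer.getInputStream());
    short version = in.readShort();   // checked against DATA_TRANSFER_VERSION
    byte opCode = in.readByte();      // (byte)87 selects the new dispatch arm
    // processOp() then parses the protobuf body, as in
    // opRequestShortCircuitFds() above.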

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java Sat Apr 13 02:13:59 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -141,6 +142,17 @@ public class Sender implements DataTrans
   }
 
   @Override
+  public void requestShortCircuitFds(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken,
+      int maxVersion) throws IOException {
+    OpRequestShortCircuitAccessProto proto =
+        OpRequestShortCircuitAccessProto.newBuilder()
+          .setHeader(DataTransferProtoUtil.buildBaseHeader(
+            blk, blockToken)).setMaxVersion(maxVersion).build();
+    send(out, Op.REQUEST_SHORT_CIRCUIT_FDS, proto);
+  }
+  
+  @Override
   public void replaceBlock(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> blockToken,
       final String delHint,

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Sat Apr 13 02:13:59 2013
@@ -45,6 +45,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.BlockReaderFactory;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -208,9 +210,12 @@ public class JspHelper {
       // Use the block name for file name. 
     String file = BlockReaderFactory.getFileName(addr, poolId, blockId);
     BlockReader blockReader = BlockReaderFactory.newBlockReader(
-        conf, s, file,
+        conf, file,
         new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
-        offsetIntoBlock, amtToRead, encryptionKey);
+        offsetIntoBlock, amtToRead, true,
+        "JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey),
+        new DatanodeID(addr.getAddress().toString(),
+            addr.getHostName(), poolId, addr.getPort(), 0, 0), null, false);
         
     byte[] buf = new byte[(int)amtToRead];
     int readOffset = 0;
@@ -229,8 +234,7 @@ public class JspHelper {
       amtToRead -= numRead;
       readOffset += numRead;
     }
-    blockReader = null;
-    s.close();
+    blockReader.close(null, null);
     out.print(HtmlQuoting.quoteHtmlChars(new String(buf, Charsets.UTF_8)));
   }
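
One subtlety in this change: rather than dropping the reader and closing the socket by hand, the JSP path now delegates cleanup to close(), and passing null caches forces the Peer to be torn down instead of recycled:

    // What close(null, null) amounts to for these readers, in outline:
    //   peerCache == null                    -> peer.close() unconditionally
    //   peerCache != null && sentStatusCode  -> peerCache.put(datanodeID, peer)
    blockReader.close(null, null);   // JSP path: never cache the connection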
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Sat Apr 13 02:13:59 2013
@@ -53,19 +53,18 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.net.InetSocketAddress;
-import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.net.UnknownHostException;
 import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.security.PrivilegedExceptionAction;
 import java.util.AbstractList;
@@ -93,6 +92,8 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.net.DomainPeerServer;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@@ -151,11 +152,11 @@ import org.apache.hadoop.io.ReadaheadPoo
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -235,6 +236,7 @@ public class DataNode extends Configured
     LogFactory.getLog(DataNode.class.getName() + ".clienttrace");
   
   private static final String USAGE = "Usage: java DataNode [-rollback | -regular]";
+  static final int CURRENT_BLOCK_FORMAT_VERSION = 1;
 
   /**
    * Use {@link NetUtils#createSocketAddr(String)} instead.
@@ -252,6 +254,7 @@ public class DataNode extends Configured
   public final static String EMPTY_DEL_HINT = "";
   AtomicInteger xmitsInProgress = new AtomicInteger();
   Daemon dataXceiverServer = null;
+  Daemon localDataXceiverServer = null;
   ThreadGroup threadGroup = null;
   private DNConf dnConf;
   private volatile boolean heartbeatsDisabledForTests = false;
@@ -263,6 +266,7 @@ public class DataNode extends Configured
   private String hostName;
   private DatanodeID id;
   
+  final private String fileDescriptorPassingDisabledReason;
   boolean isBlockTokenEnabled;
   BlockPoolTokenSecretManager blockPoolTokenSecretManager;
   private boolean hasAnyBlockPoolRegistered = false;
@@ -311,6 +315,24 @@ public class DataNode extends Configured
     this.getHdfsBlockLocationsEnabled = conf.getBoolean(
         DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, 
         DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
+
+    // Determine whether we should try to pass file descriptors to clients.
+    if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
+              DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT)) {
+      String reason = DomainSocket.getLoadingFailureReason();
+      if (reason != null) {
+        LOG.warn("File descriptor passing is disabled because " + reason);
+        this.fileDescriptorPassingDisabledReason = reason;
+      } else {
+        LOG.info("File descriptor passing is enabled.");
+        this.fileDescriptorPassingDisabledReason = null;
+      }
+    } else {
+      this.fileDescriptorPassingDisabledReason =
+          "File descriptor passing was not configured.";
+      LOG.debug(this.fileDescriptorPassingDisabledReason);
+    }
+
     try {
       hostName = getHostName(conf);
       LOG.info("Configured hostname is " + hostName);
@@ -525,25 +547,63 @@ public class DataNode extends Configured
   
   private void initDataXceiver(Configuration conf) throws IOException {
     // find free port or use privileged port provided
-    ServerSocket ss;
-    if (secureResources == null) {
-      InetSocketAddress addr = DataNode.getStreamingAddr(conf);
-      ss = (dnConf.socketWriteTimeout > 0) ? 
-          ServerSocketChannel.open().socket() : new ServerSocket();
-          Server.bind(ss, addr, 0);
+    TcpPeerServer tcpPeerServer;
+    if (secureResources != null) {
+      tcpPeerServer = new TcpPeerServer(secureResources);
     } else {
-      ss = secureResources.getStreamingSocket();
+      tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout,
+          DataNode.getStreamingAddr(conf));
     }
-    ss.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); 
-
-    streamingAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
-                                     ss.getLocalPort());
-
+    tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
+    streamingAddr = tcpPeerServer.getStreamingAddr();
     LOG.info("Opened streaming server at " + streamingAddr);
     this.threadGroup = new ThreadGroup("dataXceiverServer");
     this.dataXceiverServer = new Daemon(threadGroup, 
-        new DataXceiverServer(ss, conf, this));
+        new DataXceiverServer(tcpPeerServer, conf, this));
     this.threadGroup.setDaemon(true); // auto destroy when empty
+
+    if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
+              DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT) ||
+        conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
+              DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
+      DomainPeerServer domainPeerServer =
+                getDomainPeerServer(conf, streamingAddr.getPort());
+      if (domainPeerServer != null) {
+        this.localDataXceiverServer = new Daemon(threadGroup,
+            new DataXceiverServer(domainPeerServer, conf, this));
+        LOG.info("Listening on UNIX domain socket: " +
+            domainPeerServer.getBindPath());
+      }
+    }
+  }
+
+  static DomainPeerServer getDomainPeerServer(Configuration conf,
+      int port) throws IOException {
+    String domainSocketPath =
+        conf.getTrimmed(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+            DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT);
+    if (domainSocketPath.isEmpty()) {
+      if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
+            DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT) &&
+         (!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
+          DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT))) {
+        LOG.warn("Although short-circuit local reads are configured, " +
+            "they are disabled because you didn't configure " +
+            DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY);
+      }
+      return null;
+    }
+    if (DomainSocket.getLoadingFailureReason() != null) {
+      throw new RuntimeException("Although a UNIX domain socket " +
+          "path is configured as " + domainSocketPath + ", we cannot " +
+          "start a localDataXceiverServer because " +
+          DomainSocket.getLoadingFailureReason());
+    }
+    DomainPeerServer domainPeerServer =
+      new DomainPeerServer(domainSocketPath, port);
+    domainPeerServer.setReceiveBufferSize(
+        HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
+    return domainPeerServer;
   }
   
   // calls specific to BP
@@ -1044,6 +1104,42 @@ public class DataNode extends Configured
     return info;
   }
 
+  @InterfaceAudience.LimitedPrivate("HDFS")
+  static public class ShortCircuitFdsUnsupportedException extends IOException {
+    private static final long serialVersionUID = 1L;
+    public ShortCircuitFdsUnsupportedException(String msg) {
+      super(msg);
+    }
+  }
+
+  @InterfaceAudience.LimitedPrivate("HDFS")
+  static public class ShortCircuitFdsVersionException extends IOException {
+    private static final long serialVersionUID = 1L;
+    public ShortCircuitFdsVersionException(String msg) {
+      super(msg);
+    }
+  }
+
+  FileInputStream[] requestShortCircuitFdsForRead(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> token, int maxVersion) 
+          throws ShortCircuitFdsUnsupportedException,
+            ShortCircuitFdsVersionException, IOException {
+    if (fileDescriptorPassingDisabledReason != null) {
+      throw new ShortCircuitFdsUnsupportedException(
+          fileDescriptorPassingDisabledReason);
+    }
+    checkBlockToken(blk, token, BlockTokenSecretManager.AccessMode.READ);
+    int blkVersion = CURRENT_BLOCK_FORMAT_VERSION;
+    if (maxVersion < blkVersion) {
+      throw new ShortCircuitFdsVersionException("Your client is too old " +
+        "to read this block!  Its format version is " + 
+        blkVersion + ", but the highest format version you can read is " +
+        maxVersion);
+    }
+    metrics.incrBlocksGetLocalPathInfo();
+    return data.getShortCircuitFdsForRead(blk);
+  }
+
   @Override
   public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
       List<Token<BlockTokenIdentifier>> tokens) throws IOException, 
@@ -1118,32 +1214,45 @@ public class DataNode extends Configured
     if (dataXceiverServer != null) {
       ((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
       this.dataXceiverServer.interrupt();
-
-      // wait for all data receiver threads to exit
-      if (this.threadGroup != null) {
-        int sleepMs = 2;
-        while (true) {
-          this.threadGroup.interrupt();
-          LOG.info("Waiting for threadgroup to exit, active threads is " +
-                   this.threadGroup.activeCount());
-          if (this.threadGroup.activeCount() == 0) {
-            break;
-          }
-          try {
-            Thread.sleep(sleepMs);
-          } catch (InterruptedException e) {}
-          sleepMs = sleepMs * 3 / 2; // exponential backoff
-          if (sleepMs > 1000) {
-            sleepMs = 1000;
-          }
+    }
+    if (localDataXceiverServer != null) {
+      ((DataXceiverServer) this.localDataXceiverServer.getRunnable()).kill();
+      this.localDataXceiverServer.interrupt();
+    }
+    // wait for all data receiver threads to exit
+    if (this.threadGroup != null) {
+      int sleepMs = 2;
+      while (true) {
+        this.threadGroup.interrupt();
+        LOG.info("Waiting for threadgroup to exit, active threads is " +
+                 this.threadGroup.activeCount());
+        if (this.threadGroup.activeCount() == 0) {
+          break;
+        }
+        try {
+          Thread.sleep(sleepMs);
+        } catch (InterruptedException e) {}
+        sleepMs = sleepMs * 3 / 2; // exponential backoff
+        if (sleepMs > 1000) {
+          sleepMs = 1000;
         }
       }
-      // wait for dataXceiveServer to terminate
+      this.threadGroup = null;
+    }
+    if (this.dataXceiverServer != null) {
+      // wait for dataXceiverServer to terminate
       try {
         this.dataXceiverServer.join();
       } catch (InterruptedException ie) {
       }
     }
+    if (this.localDataXceiverServer != null) {
+      // wait for localDataXceiverServer to terminate
+      try {
+        this.localDataXceiverServer.join();
+      } catch (InterruptedException ie) {
+      }
+    }
     
     if(blockPoolManager != null) {
       try {
@@ -1538,6 +1647,9 @@ public class DataNode extends Configured
 
     // start dataXceiveServer
     dataXceiverServer.start();
+    if (localDataXceiverServer != null) {
+      localDataXceiverServer.start();
+    }
     ipcServer.start();
     startPlugins(conf);
   }
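
For context, a minimal sketch of the configuration that makes initDataXceiver() start the new domain-socket server; the literal key strings are assumptions matching the DFSConfigKeys constants used above, and the socket path is hypothetical:

    // Illustrative setup for the new local xceiver server.
    Configuration conf = new HdfsConfiguration();
    conf.setBoolean("dfs.client.read.shortcircuit", true);         // assumed key
    conf.set("dfs.domain.socket.path", "/var/run/hdfs/dn_socket"); // assumed key
    // With these set, initDataXceiver() builds a DomainPeerServer bound to
    // that path and runs localDataXceiverServer next to the TCP server.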

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Sat Apr 13 02:13:59 2013
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_UNSUPPORTED;
 import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_ACCESS_TOKEN;
 import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
 import static org.apache.hadoop.util.Time.now;
@@ -28,6 +29,8 @@ import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InterruptedIOException;
@@ -39,6 +42,7 @@ import java.nio.channels.ClosedChannelEx
 import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
+import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -59,12 +63,13 @@ import org.apache.hadoop.hdfs.protocolPB
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException;
+import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.SocketInputWrapper;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
@@ -79,8 +84,7 @@ class DataXceiver extends Receiver imple
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
-  private final Socket s;
-  private final boolean isLocal; //is a local connection?
+  private final Peer peer;
   private final String remoteAddress; // address of remote side
   private final String localAddress;  // local address of this daemon
   private final DataNode datanode;
@@ -88,7 +92,7 @@ class DataXceiver extends Receiver imple
   private final DataXceiverServer dataXceiverServer;
   private final boolean connectToDnViaHostname;
   private long opStartTime; //the start time of receiving an Op
-  private final SocketInputWrapper socketIn;
+  private final InputStream socketIn;
   private OutputStream socketOut;
 
   /**
@@ -97,25 +101,23 @@ class DataXceiver extends Receiver imple
    */
   private String previousOpClientName;
   
-  public static DataXceiver create(Socket s, DataNode dn,
+  public static DataXceiver create(Peer peer, DataNode dn,
       DataXceiverServer dataXceiverServer) throws IOException {
-    return new DataXceiver(s, dn, dataXceiverServer);
+    return new DataXceiver(peer, dn, dataXceiverServer);
   }
   
-  private DataXceiver(Socket s, 
-      DataNode datanode, 
+  private DataXceiver(Peer peer, DataNode datanode,
       DataXceiverServer dataXceiverServer) throws IOException {
 
-    this.s = s;
+    this.peer = peer;
     this.dnConf = datanode.getDnConf();
-    this.socketIn = NetUtils.getInputStream(s);
-    this.socketOut = NetUtils.getOutputStream(s, dnConf.socketWriteTimeout);
-    this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
+    this.socketIn = peer.getInputStream();
+    this.socketOut = peer.getOutputStream();
     this.datanode = datanode;
     this.dataXceiverServer = dataXceiverServer;
     this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;
-    remoteAddress = s.getRemoteSocketAddress().toString();
-    localAddress = s.getLocalSocketAddress().toString();
+    remoteAddress = peer.getRemoteAddressString();
+    localAddress = peer.getLocalAddressString();
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Number of active connections is: "
@@ -155,11 +157,10 @@ class DataXceiver extends Receiver imple
   public void run() {
     int opsProcessed = 0;
     Op op = null;
-    
-    dataXceiverServer.childSockets.add(s);
-    
+
+    dataXceiverServer.addPeer(peer);
     try {
-      
+      peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
       InputStream input = socketIn;
       if (dnConf.encryptDataTransfer) {
         IOStreamPair encryptedStreams = null;
@@ -169,8 +170,9 @@ class DataXceiver extends Receiver imple
               dnConf.encryptionAlgorithm);
         } catch (InvalidMagicNumberException imne) {
           LOG.info("Failed to read expected encryption handshake from client " +
-              "at " + s.getInetAddress() + ". Perhaps the client is running an " +
-              "older version of Hadoop which does not support encryption");
+              "at " + peer.getRemoteAddressString() + ". Perhaps the client " +
+              "is running an older version of Hadoop which does not support " +
+              "encryption");
           return;
         }
         input = encryptedStreams.in;
@@ -189,9 +191,9 @@ class DataXceiver extends Receiver imple
         try {
           if (opsProcessed != 0) {
             assert dnConf.socketKeepaliveTimeout > 0;
-            socketIn.setTimeout(dnConf.socketKeepaliveTimeout);
+            peer.setReadTimeout(dnConf.socketKeepaliveTimeout);
           } else {
-            socketIn.setTimeout(dnConf.socketTimeout);
+            peer.setReadTimeout(dnConf.socketTimeout);
           }
           op = readOp();
         } catch (InterruptedIOException ignored) {
@@ -202,7 +204,7 @@ class DataXceiver extends Receiver imple
           if (opsProcessed > 0 &&
               (err instanceof EOFException || err instanceof ClosedChannelException)) {
             if (LOG.isDebugEnabled()) {
-              LOG.debug("Cached " + s.toString() + " closing after " + opsProcessed + " ops");
+              LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops");
             }
           } else {
             throw err;
@@ -212,13 +214,13 @@ class DataXceiver extends Receiver imple
 
         // restore normal timeout
         if (opsProcessed != 0) {
-          s.setSoTimeout(dnConf.socketTimeout);
+          peer.setReadTimeout(dnConf.socketTimeout);
         }
 
         opStartTime = now();
         processOp(op);
         ++opsProcessed;
-      } while (!s.isClosed() && dnConf.socketKeepaliveTimeout > 0);
+      } while (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0);
     } catch (Throwable t) {
       LOG.error(datanode.getDisplayName() + ":DataXceiver error processing " +
                 ((op == null) ? "unknown" : op.name()) + " operation " +
@@ -230,9 +232,70 @@ class DataXceiver extends Receiver imple
             + datanode.getXceiverCount());
       }
       updateCurrentThreadName("Cleaning up");
+      dataXceiverServer.closePeer(peer);
       IOUtils.closeStream(in);
-      IOUtils.closeSocket(s);
-      dataXceiverServer.childSockets.remove(s);
+    }
+  }
+
+  @Override
+  public void requestShortCircuitFds(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> token,
+      int maxVersion) throws IOException {
+    updateCurrentThreadName("Passing file descriptors for block " + blk);
+    BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder();
+    FileInputStream fis[] = null;
+    try {
+      if (peer.getDomainSocket() == null) {
+        throw new IOException("You cannot pass file descriptors over " +
+            "anything but a UNIX domain socket.");
+      }
+      fis = datanode.requestShortCircuitFdsForRead(blk, token, maxVersion);
+      bld.setStatus(SUCCESS);
+      bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
+    } catch (ShortCircuitFdsVersionException e) {
+      bld.setStatus(ERROR_UNSUPPORTED);
+      bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
+      bld.setMessage(e.getMessage());
+    } catch (ShortCircuitFdsUnsupportedException e) {
+      bld.setStatus(ERROR_UNSUPPORTED);
+      bld.setMessage(e.getMessage());
+    } catch (InvalidToken e) {
+      bld.setStatus(ERROR_ACCESS_TOKEN);
+      bld.setMessage(e.getMessage());
+    } catch (IOException e) {
+      bld.setStatus(ERROR);
+      bld.setMessage(e.getMessage());
+    }
+    try {
+      bld.build().writeDelimitedTo(socketOut);
+      if (fis != null) {
+        FileDescriptor fds[] = new FileDescriptor[fis.length];
+        for (int i = 0; i < fds.length; i++) {
+          fds[i] = fis[i].getFD();
+        }
+        byte buf[] = new byte[] { (byte)0 };
+        peer.getDomainSocket().
+          sendFileDescriptors(fds, buf, 0, buf.length);
+      }
+    } finally {
+      if (ClientTraceLog.isInfoEnabled()) {
+        DatanodeRegistration dnR = datanode.getDNRegistrationForBP(blk
+            .getBlockPoolId());
+        BlockSender.ClientTraceLog.info(String.format(
+            "src: %s, dest: %s, op: %s, blockid: %s, srvID: %s, " +
+              "success: %b",
+            "127.0.0.1",                   // src IP
+            "127.0.0.1",                   // dst IP
+            "REQUEST_SHORT_CIRCUIT_FDS",   // operation
+            blk.getBlockId(),              // block id
+            dnR.getStorageID(),            // DataNode storage ID
+            (fis != null)));               // success
+      }
+      if (fis != null) {
+        IOUtils.cleanup(LOG, fis);
+      }
     }
   }
 
@@ -287,8 +350,9 @@ class DataXceiver extends Receiver imple
           ClientReadStatusProto stat = ClientReadStatusProto.parseFrom(
               PBHelper.vintPrefixed(in));
           if (!stat.hasStatus()) {
-            LOG.warn("Client " + s.getInetAddress() + " did not send a valid status " +
-                     "code after reading. Will close connection.");
+            LOG.warn("Client " + peer.getRemoteAddressString() +
+                " did not send a valid status code after reading. " +
+                "Will close connection.");
             IOUtils.closeStream(out);
           }
         } catch (IOException ioe) {
@@ -321,7 +385,7 @@ class DataXceiver extends Receiver imple
 
     //update metrics
     datanode.metrics.addReadBlockOp(elapsed());
-    datanode.metrics.incrReadsFromClient(isLocal);
+    datanode.metrics.incrReadsFromClient(peer.isLocal());
   }
 
   @Override
@@ -359,8 +423,8 @@ class DataXceiver extends Receiver imple
       LOG.debug("isDatanode=" + isDatanode
           + ", isClient=" + isClient
           + ", isTransfer=" + isTransfer);
-      LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
-                " tcp no delay " + s.getTcpNoDelay());
+      LOG.debug("writeBlock receive buf size " + peer.getReceiveBufferSize() +
+                " tcp no delay " + peer.getTcpNoDelay());
     }
 
     // We later mutate block's generation stamp and length, but we need to
@@ -391,8 +455,8 @@ class DataXceiver extends Receiver imple
           stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
         // open a block receiver
         blockReceiver = new BlockReceiver(block, in, 
-            s.getRemoteSocketAddress().toString(),
-            s.getLocalSocketAddress().toString(),
+            peer.getRemoteAddressString(),
+            peer.getLocalAddressString(),
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
             clientname, srcDataNode, datanode, requestedChecksum);
       } else {
@@ -547,7 +611,7 @@ class DataXceiver extends Receiver imple
 
     //update metrics
     datanode.metrics.addWriteBlockOp(elapsed());
-    datanode.metrics.incrWritesFromClient(isLocal);
+    datanode.metrics.incrWritesFromClient(peer.isLocal());
   }
 
   @Override
@@ -555,7 +619,7 @@ class DataXceiver extends Receiver imple
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
       final DatanodeInfo[] targets) throws IOException {
-    checkAccess(null, true, blk, blockToken,
+    checkAccess(socketOut, true, blk, blockToken,
         Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
     previousOpClientName = clientName;
     updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);
@@ -642,8 +706,9 @@ class DataXceiver extends Receiver imple
     }
 
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
-      String msg = "Not able to copy block " + block.getBlockId() + " to " 
-      + s.getRemoteSocketAddress() + " because threads quota is exceeded."; 
+      String msg = "Not able to copy block " + block.getBlockId() + " " +
+          "to " + peer.getRemoteAddressString() + " because threads " +
+          "quota is exceeded.";
       LOG.info(msg);
       sendResponse(ERROR, msg);
       return;
@@ -672,7 +737,7 @@ class DataXceiver extends Receiver imple
       datanode.metrics.incrBytesRead((int) read);
       datanode.metrics.incrBlocksRead();
       
-      LOG.info("Copied " + block + " to " + s.getRemoteSocketAddress());
+      LOG.info("Copied " + block + " to " + peer.getRemoteAddressString());
     } catch (IOException ioe) {
       isOpSuccess = false;
       LOG.info("opCopyBlock " + block + " received exception " + ioe);
@@ -717,8 +782,9 @@ class DataXceiver extends Receiver imple
     }
 
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
-      String msg = "Not able to receive block " + block.getBlockId() + " from " 
-          + s.getRemoteSocketAddress() + " because threads quota is exceeded."; 
+      String msg = "Not able to receive block " + block.getBlockId() +
+          " from " + peer.getRemoteAddressString() + " because threads " +
+          "quota is exceeded.";
       LOG.warn(msg);
       sendResponse(ERROR, msg);
       return;
@@ -795,7 +861,7 @@ class DataXceiver extends Receiver imple
       // notify name node
       datanode.notifyNamenodeReceivedBlock(block, delHint);
 
-      LOG.info("Moved " + block + " from " + s.getRemoteSocketAddress());
+      LOG.info("Moved " + block + " from " + peer.getRemoteAddressString());
       
     } catch (IOException ioe) {
       opStatus = ERROR;
@@ -818,7 +884,7 @@ class DataXceiver extends Receiver imple
       try {
         sendResponse(opStatus, errMsg);
       } catch (IOException ioe) {
-        LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
+        LOG.warn("Error writing reply back to " + peer.getRemoteAddressString());
       }
       IOUtils.closeStream(proxyOut);
       IOUtils.closeStream(blockReceiver);
@@ -872,7 +938,7 @@ class DataXceiver extends Receiver imple
   }
   
 
-  private void checkAccess(DataOutputStream out, final boolean reply, 
+  private void checkAccess(OutputStream out, final boolean reply, 
       final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> t,
       final Op op,
@@ -887,11 +953,6 @@ class DataXceiver extends Receiver imple
       } catch(InvalidToken e) {
         try {
           if (reply) {
-            if (out == null) {
-              out = new DataOutputStream(
-                  NetUtils.getOutputStream(s, dnConf.socketWriteTimeout));
-            }
-            
             BlockOpResponseProto.Builder resp = BlockOpResponseProto.newBuilder()
               .setStatus(ERROR_ACCESS_TOKEN);
             if (mode == BlockTokenSecretManager.AccessMode.WRITE) {

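The hunks above replace direct java.net.Socket usage in DataXceiver with the new Peer abstraction, so the same transfer code can run over a TCP socket or a UNIX domain socket. Below is a minimal sketch of the shape such an abstraction takes; only getRemoteAddressString() and the stream accessors are implied by the hunks above, and the PeerSketch name is illustrative:

    import java.io.Closeable;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    // Illustrative reduction of a duplex-connection abstraction. The real
    // org.apache.hadoop.hdfs.net.Peer interface in this patch has more
    // methods (timeouts, local address, etc.); this sketch keeps only what
    // the DataXceiver hunks above rely on.
    interface PeerSketch extends Closeable {
      InputStream getInputStream() throws IOException;   // read side
      OutputStream getOutputStream() throws IOException; // write side
      String getRemoteAddressString();                   // remote address for log messages
    }
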
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Sat Apr 13 02:13:59 2013
@@ -18,18 +18,16 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.nio.channels.AsynchronousCloseException;
-import java.util.Collections;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.net.PeerServer;
 import org.apache.hadoop.hdfs.server.balancer.Balancer;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
@@ -45,11 +43,9 @@ import org.apache.hadoop.util.Daemon;
 class DataXceiverServer implements Runnable {
   public static final Log LOG = DataNode.LOG;
   
-  ServerSocket ss;
-  DataNode datanode;
-  // Record all sockets opened for data transfer
-  Set<Socket> childSockets = Collections.synchronizedSet(
-                                       new HashSet<Socket>());
+  private final PeerServer peerServer;
+  private final DataNode datanode;
+  private final Set<Peer> peers = new HashSet<Peer>();
   
   /**
    * Maximal number of concurrent xceivers per node.
@@ -109,10 +105,10 @@ class DataXceiverServer implements Runna
   long estimateBlockSize;
   
   
-  DataXceiverServer(ServerSocket ss, Configuration conf, 
+  DataXceiverServer(PeerServer peerServer, Configuration conf,
       DataNode datanode) {
     
-    this.ss = ss;
+    this.peerServer = peerServer;
     this.datanode = datanode;
     
     this.maxXceiverCount = 
@@ -130,12 +126,10 @@ class DataXceiverServer implements Runna
 
   @Override
   public void run() {
+    Peer peer = null;
     while (datanode.shouldRun) {
-      Socket s = null;
       try {
-        s = ss.accept();
-        s.setTcpNoDelay(true);
-        // Timeouts are set within DataXceiver.run()
+        peer = peerServer.accept();
 
         // Make sure the xceiver count is not exceeded
         int curXceiverCount = datanode.getXceiverCount();
@@ -146,7 +140,7 @@ class DataXceiverServer implements Runna
         }
 
         new Daemon(datanode.threadGroup,
-            DataXceiver.create(s, datanode, this))
+            DataXceiver.create(peer, datanode, this))
             .start();
       } catch (SocketTimeoutException ignored) {
         // wake up to see if should continue to run
@@ -157,10 +151,10 @@ class DataXceiverServer implements Runna
           LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace);
         }
       } catch (IOException ie) {
-        IOUtils.closeSocket(s);
+        IOUtils.cleanup(null, peer);
         LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ie);
       } catch (OutOfMemoryError ie) {
-        IOUtils.closeSocket(s);
+        IOUtils.cleanup(null, peer);
        // DataNode can run out of memory if there are too many transfers.
        // Log the event and sleep for 30 seconds; other transfers may complete
        // by then.
@@ -176,33 +170,35 @@ class DataXceiverServer implements Runna
         datanode.shouldRun = false;
       }
     }
+    synchronized (this) {
+      for (Peer p : peers) {
+        IOUtils.cleanup(LOG, p);
+      }
+    }
     try {
-      ss.close();
+      peerServer.close();
     } catch (IOException ie) {
       LOG.warn(datanode.getDisplayName()
           + " :DataXceiverServer: close exception", ie);
     }
   }
-  
+
   void kill() {
     assert datanode.shouldRun == false :
       "shoudRun should be set to false before killing";
     try {
-      this.ss.close();
+      this.peerServer.close();
     } catch (IOException ie) {
       LOG.warn(datanode.getDisplayName() + ":DataXceiverServer.kill(): ", ie);
     }
+  }
+  
+  synchronized void addPeer(Peer peer) {
+    peers.add(peer);
+  }
 
-    // close all the sockets that were accepted earlier
-    synchronized (childSockets) {
-      for (Iterator<Socket> it = childSockets.iterator();
-           it.hasNext();) {
-        Socket thissock = it.next();
-        try {
-          thissock.close();
-        } catch (IOException e) {
-        }
-      }
-    }
+  synchronized void closePeer(Peer peer) {
+    peers.remove(peer);
+    IOUtils.cleanup(null, peer);
   }
 }

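DataXceiverServer now tracks live connections in a synchronized Set<Peer> and closes them all when the server loop exits; the registration call (addPeer) is expected to come from DataXceiver itself, which is not shown in this part of the diff. A hedged sketch of the register/deregister pattern these methods imply, with hypothetical names:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Set;

    // PeerTracker and its method names are hypothetical; the pattern mirrors
    // the addPeer()/closePeer() pair added above.
    class PeerTracker {
      private final Set<Closeable> peers = new HashSet<Closeable>();

      synchronized void add(Closeable peer) {    // on accept
        peers.add(peer);
      }

      synchronized void close(Closeable peer) {  // when one xceiver finishes
        peers.remove(peer);
        closeQuietly(peer);
      }

      synchronized void closeAll() {             // on server shutdown
        for (Closeable p : peers) {
          closeQuietly(p);
        }
        peers.clear();
      }

      private static void closeQuietly(Closeable c) {
        try {
          c.close();
        } catch (IOException ignored) {
          // best effort, mirroring IOUtils.cleanup(null, peer)
        }
      }
    }
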
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java Sat Apr 13 02:13:59 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.da
 
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
@@ -386,4 +387,6 @@ public interface FsDatasetSpi<V extends 
   public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
       throws IOException;
 
+  FileInputStream[] getShortCircuitFdsForRead(ExtendedBlock block)
+      throws IOException;
 }

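The new FsDatasetSpi method returns a two-element array; judging from the FsDatasetImpl change below, index 0 is the block data stream and index 1 the checksum metadata stream, and the caller owns both descriptors. A hedged caller-side sketch (the serveFds name and the surrounding class are assumptions):

    import java.io.FileInputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
    import org.apache.hadoop.io.IOUtils;

    class ShortCircuitFdsSketch {
      // Hedged sketch: open both streams, hand the raw descriptors to the
      // local client, then close our copies. fis[0] = data, fis[1] = meta.
      static void serveFds(FsDatasetSpi<?> dataset, ExtendedBlock block)
          throws IOException {
        FileInputStream[] fis = dataset.getShortCircuitFdsForRead(block);
        try {
          // fis[0].getFD() and fis[1].getFD() would be passed over the
          // domain socket here; the fd passing itself is outside this sketch.
        } finally {
          IOUtils.cleanup(null, fis);
        }
      }
    }
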
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Sat Apr 13 02:13:59 2013
@@ -76,6 +76,7 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.util.DataChecksum;
@@ -1701,6 +1702,26 @@ class FsDatasetImpl implements FsDataset
   }
   
   @Override // FsDatasetSpi
+  public FileInputStream[] getShortCircuitFdsForRead(ExtendedBlock block) 
+      throws IOException {
+    File datafile = getBlockFile(block);
+    File metafile = FsDatasetUtil.getMetaFile(datafile,
+        block.getGenerationStamp());
+    FileInputStream fis[] = new FileInputStream[2];
+    boolean success = false;
+    try {
+      fis[0] = new FileInputStream(datafile);
+      fis[1] = new FileInputStream(metafile);
+      success = true;
+      return fis;
+    } finally {
+      if (!success) {
+        IOUtils.cleanup(null, fis);
+      }
+    }
+  }
+
+  @Override // FsDatasetSpi
   public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
       throws IOException {
     // List of VolumeIds, one per volume on the datanode

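getShortCircuitFdsForRead() uses the common acquire-all-or-release idiom: the success flag is set only after both opens complete, and the finally block releases whatever was opened if the second open threw, so no file descriptor leaks. The same idiom, restated generically with illustrative names:

    import java.io.Closeable;
    import java.io.FileInputStream;
    import java.io.IOException;

    class AcquireAllOrRelease {
      // If opening metaPath throws, the finally block closes the already-open
      // dataPath stream; on success both are returned to the caller intact.
      static FileInputStream[] openBoth(String dataPath, String metaPath)
          throws IOException {
        FileInputStream[] fis = new FileInputStream[2];
        boolean success = false;
        try {
          fis[0] = new FileInputStream(dataPath);
          fis[1] = new FileInputStream(metaPath);
          success = true;
          return fis;
        } finally {
          if (!success) {
            for (Closeable c : fis) {
              if (c != null) {
                try { c.close(); } catch (IOException ignored) { }
              }
            }
          }
        }
      }
    }
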
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Sat Apr 13 02:13:59 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.BlockReade
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -559,9 +560,10 @@ public class NamenodeFsck {
         String file = BlockReaderFactory.getFileName(targetAddr, block.getBlockPoolId(),
             block.getBlockId());
         blockReader = BlockReaderFactory.newBlockReader(
-            conf, s, file, block, lblock
-            .getBlockToken(), 0, -1,
-            namenode.getRpcServer().getDataEncryptionKey());
+            conf, file, block, lblock.getBlockToken(), 0, -1, true, "fsck",
+            TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer().
+                getDataEncryptionKey()),
+            chosenNode, null, false);
         
       }  catch (IOException ex) {
         // Put chosen node into dead list, continue

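BlockReaderFactory.newBlockReader() now takes a Peer instead of a raw Socket, so callers that already hold a connected socket wrap it first: peerFromSocketAndKey() for encrypted transfer (as fsck does above) and peerFromSocket() for plain transfer (as the test utility further below does). A hedged sketch of the plain-socket case; the address and timeout are illustrative:

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.Socket;

    import org.apache.hadoop.hdfs.net.Peer;
    import org.apache.hadoop.hdfs.net.TcpPeerServer;

    class PeerFromSocketSketch {
      // Adapt an already-connected Socket to the Peer API; pass a
      // DataEncryptionKey via peerFromSocketAndKey() when encrypting.
      static Peer connect(InetSocketAddress addr, int timeoutMs)
          throws IOException {
        Socket sock = new Socket();
        sock.connect(addr, timeoutMs);
        sock.setSoTimeout(timeoutMs);
        return TcpPeerServer.peerFromSocket(sock);
      }
    }
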
Propchange: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1430995-1467533

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto Sat Apr 13 02:13:59 2013
@@ -74,6 +74,8 @@ message DeleteBlockPoolResponseProto {
  * Gets the file information where the block and its metadata are stored
  * block - block for which path information is being requested
  * token - block token
+ *
+ * This message is deprecated in favor of file descriptor passing.
  */
 message GetBlockLocalPathInfoRequestProto {
   required ExtendedBlockProto block = 1;
@@ -84,6 +86,8 @@ message GetBlockLocalPathInfoRequestProt
  * block - block for which file path information is being returned
  * localPath - file path where the block data is stored
  * localMetaPath - file path where the block meta data is stored
+ *
+ * This message is deprecated in favor of file descriptor passing.
  */
 message GetBlockLocalPathInfoResponseProto {
   required ExtendedBlockProto block = 1;

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto Sat Apr 13 02:13:59 2013
@@ -115,6 +115,16 @@ message OpBlockChecksumProto { 
   required BaseHeaderProto header = 1;
 }
 
+message OpRequestShortCircuitAccessProto { 
+  required BaseHeaderProto header = 1;
+
+  /** In order to get short-circuit access to block data, clients must set this
+   * to the highest version of the block data that they can understand.
+   * Currently 1 is the only version, but more versions may exist in the future
+   * if the on-disk format changes.
+   */
+  required uint32 maxVersion = 2;
+}
 
 message PacketHeaderProto {
   // All fields must be fixed-length!
@@ -133,6 +143,7 @@ enum Status {
   ERROR_EXISTS = 4;
   ERROR_ACCESS_TOKEN = 5;
   CHECKSUM_OK = 6;
+  ERROR_UNSUPPORTED = 7;
 }
 
 message PipelineAckProto {
@@ -165,6 +176,16 @@ message BlockOpResponseProto {
 
   /** explanatory text which may be useful to log on the client side */
   optional string message = 5;
+
+  /** If the server chooses to agree to the request of a client for
+   * short-circuit access, it will send a response message with the relevant
+   * file descriptors attached.
+   *
+   * In the body of the message, this version number will be set to the
+   * specific version number of the block data that the client is about to
+   * read.
+   */
+  optional uint32 shortCircuitAccessVersion = 6;
 }
 
 /**

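Taken together, the new fields form a small version handshake: the client advertises the highest block-data format it understands in maxVersion, and the server either refuses with ERROR_UNSUPPORTED or echoes the concrete version in shortCircuitAccessVersion before attaching the file descriptors. A hedged server-side sketch; the generated accessor names follow standard protobuf conventions but are assumptions here, and CURRENT_BLOCK_FORMAT_VERSION is hypothetical:

    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;

    class ShortCircuitVersionCheck {
      static final int CURRENT_BLOCK_FORMAT_VERSION = 1; // only version so far

      static BlockOpResponseProto respond(OpRequestShortCircuitAccessProto req) {
        if (req.getMaxVersion() < CURRENT_BLOCK_FORMAT_VERSION) {
          // Client is too old to read this on-disk format.
          return BlockOpResponseProto.newBuilder()
              .setStatus(Status.ERROR_UNSUPPORTED)
              .setMessage("client supports block format versions up to "
                  + req.getMaxVersion())
              .build();
        }
        return BlockOpResponseProto.newBuilder()
            .setStatus(Status.SUCCESS)
            .setShortCircuitAccessVersion(CURRENT_BLOCK_FORMAT_VERSION)
            .build();
      }
    }
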
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Sat Apr 13 02:13:59 2013
@@ -1232,6 +1232,17 @@
 </property>
 
 <property>
+  <name>dfs.domain.socket.path</name>
+  <value></value>
+  <description>
+    Optional.  This is a path to a UNIX domain socket that will be used for
+    communication between the DataNode and local HDFS clients.
+    If the string "_PORT" is present in this path, it will be replaced by the
+    TCP port of the DataNode.
+  </description>
+</property>
+
+<property>
   <name>dfs.datanode.fsdataset.volume.choosing.balanced-space-threshold</name>
   <value>10737418240</value> <!-- 10 GB -->
   <description>

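For illustration only (the shipped default above is empty), a deployment might point the DataNode and its local clients at a per-DataNode socket like this, with _PORT expanded at runtime:

    <!-- Illustrative value, not a shipped default. -->
    <property>
      <name>dfs.domain.socket.path</name>
      <value>/var/run/hadoop-hdfs/dn._PORT</value>
    </property>
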
Propchange: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode:r1430995-1467533

Propchange: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs:r1430995-1467533

Propchange: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary:r1430995-1467533

Propchange: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs:r1430995-1467533

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java Sat Apr 13 02:13:59 2013
@@ -28,9 +28,9 @@ import java.net.Socket;
 import java.util.List;
 import java.util.Random;
 
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -150,12 +150,12 @@ public class BlockReaderTestUtil {
     sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
 
     return BlockReaderFactory.newBlockReader(
-      new DFSClient.Conf(conf),
-      sock, targetAddr.toString()+ ":" + block.getBlockId(), block,
+      conf,
+      targetAddr.toString() + ":" + block.getBlockId(), block,
       testBlock.getBlockToken(), 
       offset, lenToRead,
-      conf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
-      true, "", null, null);
+      true, "BlockReaderTestUtil", TcpPeerServer.peerFromSocket(sock),
+      nodes[0], null, false);
   }
 
   /**
@@ -166,5 +166,4 @@ public class BlockReaderTestUtil {
     int ipcport = nodes[0].getIpcPort();
     return cluster.getDataNode(ipcport);
   }
-
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java Sat Apr 13 02:13:59 2013
@@ -2188,8 +2188,8 @@ public class MiniDFSCluster {
   /**
   * Get the file corresponding to a block
    * @param storageDir storage directory
-   * @param blk block to be corrupted
-   * @return file corresponding to the block
+   * @param blk the block
+   * @return data file corresponding to the block
    */
   public static File getBlockFile(File storageDir, ExtendedBlock blk) {
     return new File(getFinalizedDir(storageDir, blk.getBlockPoolId()), 
@@ -2197,6 +2197,19 @@ public class MiniDFSCluster {
   }
 
   /**
+   * Get the latest metadata file corresponding to a block
+   * @param storageDir storage directory
+   * @param blk the block
+   * @return metadata file corresponding to the block
+   */
+  public static File getBlockMetadataFile(File storageDir, ExtendedBlock blk) {
+    return new File(getFinalizedDir(storageDir, blk.getBlockPoolId()), 
+        blk.getBlockName() + "_" + blk.getGenerationStamp() +
+        Block.METADATA_EXTENSION);
+
+  }
+
+  /**
    * Shut down a cluster if it is not null
    * @param cluster cluster reference or null
    */
@@ -2223,7 +2236,7 @@ public class MiniDFSCluster {
   }
   
   /**
-   * Get files related to a block for a given datanode
+   * Get the block data file for a block from a given datanode
    * @param dnIndex Index of the datanode to get block files for
    * @param block block for which corresponding files are needed
    */
@@ -2238,6 +2251,24 @@ public class MiniDFSCluster {
     }
     return null;
   }
+
+  /**
+   * Get the block metadata file for a block from a given datanode
+   * 
+   * @param dnIndex Index of the datanode to get the metadata file for
+   * @param block block for which the metadata file is needed
+   */
+  public static File getBlockMetadataFile(int dnIndex, ExtendedBlock block) {
+    // Check for the block metadata file in the two storage directories of the datanode
+    for (int i = 0; i <= 1; i++) {
+      File storageDir = MiniDFSCluster.getStorageDir(dnIndex, i);
+      File blockMetaFile = getBlockMetadataFile(storageDir, block);
+      if (blockMetaFile.exists()) {
+        return blockMetaFile;
+      }
+    }
+    return null;
+  }
   
   /**
    * Throw an exception if the MiniDFSCluster is not started with a single

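The new metadata-file helpers make it possible for tests to locate and tamper with a block's checksum file directly. A hedged test-side sketch: the block is assumed to come from a running MiniDFSCluster, and the single-byte flip is just one way to corrupt the file:

    import java.io.File;
    import java.io.RandomAccessFile;

    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

    class CorruptMetaSketch {
      // Flip the last byte of the block's metadata (checksum) file on
      // datanode 0, so a checksum-verifying read of the block should fail.
      static void corruptMeta(ExtendedBlock block) throws Exception {
        File meta = MiniDFSCluster.getBlockMetadataFile(0, block);
        RandomAccessFile raf = new RandomAccessFile(meta, "rw");
        try {
          raf.seek(raf.length() - 1);
          int b = raf.read();         // read advances the file pointer...
          raf.seek(raf.length() - 1); // ...so seek back before writing
          raf.write(b ^ 0xff);
        } finally {
          raf.close();
        }
      }
    }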

