hadoop-hdfs-commits mailing list archives

From t...@apache.org
Subject svn commit: r1430507 [1/2] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/net/ src/main/java/org/apache/hadoop/hdfs/server/common/ src/main/java/org/apache/hadoop...
Date Tue, 08 Jan 2013 20:44:10 GMT
Author: todd
Date: Tue Jan  8 20:44:09 2013
New Revision: 1430507

URL: http://svn.apache.org/viewvc?rev=1430507&view=rev
Log:
HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes. Contributed by Colin Patrick McCabe.
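
In short, raw Socket handling (and the separate IOStreamPair used for
encrypted streams) is replaced by a single Peer abstraction that owns the
connection and its streams. A minimal caller-side sketch assembled from the
diffs below, using the helpers this patch introduces:

    Socket sock = socketFactory.createSocket();
    NetUtils.connect(sock, dnAddr, localAddr, socketTimeout);
    Peer peer = TcpPeerServer.peerFromSocket(sock);   // wrap the raw Socket
    if (encryptionKey != null) {
      peer = new EncryptedPeer(peer, encryptionKey);  // layer on encryption
    }
    BlockReader reader = BlockReaderFactory.newBlockReader(
        new BlockReaderFactory.Params(conf)
            .setPeer(peer).setBlock(block).setDatanodeID(datanodeID));
    // ... read ...
    reader.close(peerCache);  // recycles the Peer through the cache if reusable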

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/PeerServer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java
Removed:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSocketCache.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1430507&r1=1430506&r2=1430507&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Jan  8 20:44:09 2013
@@ -180,6 +180,9 @@ Trunk (Unreleased)
     HDFS-4352. Encapsulate arguments to BlockReaderFactory in a class
     (Colin Patrick McCabe via todd)
 
+    HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes
+    (Colin Patrick McCabe via todd)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1430507&r1=1430506&r2=1430507&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java Tue Jan  8 20:44:09 2013
@@ -18,10 +18,8 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
-import java.net.Socket;
 
 import org.apache.hadoop.fs.ByteBufferReadable;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 
 /**
  * A BlockReader is responsible for reading a single block
@@ -43,7 +41,18 @@ public interface BlockReader extends Byt
    */
   long skip(long n) throws IOException;
 
-  void close() throws IOException;
+  /**
+   * Close the block reader.
+   *
+   * @param peerCache      The PeerCache to put the Peer we're using back
+   *                       into, or null if we should simply close the Peer
+   *                       we're using (along with its Socket).
+   *                       Some block readers, like BlockReaderLocal, may
+   *                       not make use of this parameter.
+   *
+   * @throws IOException
+   */
+  void close(PeerCache peerCache) throws IOException;
 
   /**
    * Read exactly the given amount of data, throwing an exception
@@ -60,20 +69,4 @@ public interface BlockReader extends Byt
    * filled or the next call will return EOF.
    */
   int readAll(byte[] buf, int offset, int len) throws IOException;
-
-  /**
-   * Take the socket used to talk to the DN.
-   */
-  Socket takeSocket();
-
-  /**
-   * Whether the BlockReader has reached the end of its input stream
-   * and successfully sent a status code back to the datanode.
-   */
-  boolean hasSentStatusCode();
-
-  /**
-   * @return a reference to the streams this block reader is using.
-   */
-  IOStreamPair getStreams();
 }
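
The close(PeerCache) contract above replaces the old takeSocket() /
hasSentStatusCode() / getStreams() trio: callers no longer pull the socket
out of the reader, they simply hand close() a cache. A hedged caller-side
sketch:

    try {
      reader.readAll(buf, 0, len);
    } finally {
      // Non-null cache: the reader may recycle its Peer if it is reusable.
      // Null: the Peer (and its underlying Socket) is closed outright.
      reader.close(peerCache);
    }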

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java?rev=1430507&r1=1430506&r2=1430507&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java Tue Jan  8 20:44:09 2013
@@ -19,19 +19,18 @@ package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.Socket;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSClient.Conf;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 
+import com.google.common.base.Preconditions;
+
 
 /** 
  * Utility class to create BlockReader implementations.
@@ -47,18 +46,73 @@ public class BlockReaderFactory {
   @InterfaceAudience.Private
   public static class Params {
     private final Conf conf;
-    private Socket socket = null;
+    /**
+     * The peer that this BlockReader will be connected to.
+     * You must set this.
+     */
+    private Peer peer = null;
+    
+    /**
+     * The file name that this BlockReader pertains to.
+     * This is optional and only used for display and logging purposes.
+     */
     private String file = null;
+
+    /**
+     * The block that this BlockReader is reading.
+     * You must set this.
+     */
     private ExtendedBlock block = null;
+    
+    /**
+     * The BlockTokenIdentifier to use, or null to use none.
+     */
     private Token<BlockTokenIdentifier> blockToken = null;
+
+    /**
+     * The offset in the block to start reading at.
+     */
     private long startOffset = 0;
+    
+    /**
+     * The total number of bytes we might want to read, or -1 to assume no
+     * limit.
+     */
     private long len = -1;
+    
+    /**
+     * The buffer size to use.
+     *
+     * If this is not set, we will use the default from the Conf.
+     */
     private int bufferSize;
+    
+    /**
+     * Whether or not we should verify the checksum.
+     *
+     * This is used instead of conf.verifyChecksum, because there are some
+     * cases when we may want to explicitly turn off checksum verification,
+     * such as when the caller has explicitly asked for a file to be opened
+     * without checksum verification.
+     */
     private boolean verifyChecksum = true;
+
+    /**
+     * Whether or not we should try to use short circuit local reads.
+     */
     private boolean shortCircuitLocalReads = false;
+
+    /**
+     * The name of the client using this BlockReader, for logging and
+     * debugging purposes.
+     */
     private String clientName = "";
-    private DataEncryptionKey encryptionKey = null;
-    private IOStreamPair ioStreamPair = null;
+    
+    /**
+     * The DataNode on which this Block resides.
+     * You must set this.
+     */
+    private DatanodeID datanodeID = null;
 
     public Params(Conf conf) {
       this.conf = conf;
@@ -67,11 +121,11 @@ public class BlockReaderFactory {
     public Conf getConf() {
       return conf;
     }
-    public Socket getSocket() {
-      return socket;
+    public Peer getPeer() {
+      return peer;
     }
-    public Params setSocket(Socket socket) {
-      this.socket = socket;
+    public Params setPeer(Peer peer) {
+      this.peer = peer;
       return this;
     }
     public String getFile() {
@@ -137,19 +191,12 @@ public class BlockReaderFactory {
       this.clientName = clientName;
       return this;
     }
-    public Params setEncryptionKey(DataEncryptionKey encryptionKey) {
-      this.encryptionKey = encryptionKey;
+    public Params setDatanodeID(DatanodeID datanodeID) {
+      this.datanodeID = datanodeID;
       return this;
     }
-    public DataEncryptionKey getEncryptionKey() {
-      return encryptionKey;
-    }
-    public IOStreamPair getIoStreamPair() {
-      return ioStreamPair;
-    }
-    public Params setIoStreamPair(IOStreamPair ioStreamPair) {
-      this.ioStreamPair = ioStreamPair;
-      return this;
+    public DatanodeID getDatanodeID() {
+      return datanodeID;
     }
   }
 
@@ -164,24 +211,27 @@ public class BlockReaderFactory {
    */
   @SuppressWarnings("deprecation")
   public static BlockReader newBlockReader(Params params) throws IOException {
+    Preconditions.checkNotNull(params.getPeer());
+    Preconditions.checkNotNull(params.getBlock());
+    Preconditions.checkNotNull(params.getDatanodeID());
+    // First, let's set the read and write timeouts appropriately.
+    // This will keep us from blocking forever if something goes wrong during
+    // network communication.
+    Peer peer = params.getPeer();
+    peer.setReadTimeout(params.getConf().socketTimeout);
+    peer.setWriteTimeout(HdfsServerConstants.WRITE_TIMEOUT);
+
     if (params.getConf().useLegacyBlockReader) {
-      if (params.getEncryptionKey() != null) {
-        throw new RuntimeException("Encryption is not supported with the legacy block reader.");
-      }
+      // The legacy BlockReader doesn't require that the Peers it uses
+      // have associated ReadableByteChannels.  This makes it easier to use 
+      // with some older Socket classes like, say, SocksSocketImpl.
+      //
+      // TODO: create a wrapper class that makes channel-less sockets look like
+      // they have a channel, so that we can finally remove the legacy
+      // RemoteBlockReader.  See HDFS-2534.
       return RemoteBlockReader.newBlockReader(params);
     } else {
-      Socket sock = params.getSocket();
-      if (params.getIoStreamPair() == null) {
-        params.setIoStreamPair(new IOStreamPair(NetUtils.getInputStream(sock),
-            NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT)));
-        if (params.getEncryptionKey() != null) {
-          IOStreamPair encryptedStreams =
-              DataTransferEncryptor.getEncryptedStreams(
-                  params.getIoStreamPair().out, params.getIoStreamPair().in, 
-                  params.getEncryptionKey());
-          params.setIoStreamPair(encryptedStreams);
-        }
-      }
+      // The usual block reader.
       return RemoteBlockReader2.newBlockReader(params);
     }
   }
@@ -197,4 +247,4 @@ public class BlockReaderFactory {
       final String poolId, final long blockId) {
     return s.toString() + ":" + poolId + ":" + blockId;
   }
-}
+}
\ No newline at end of file
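
Params is a fluent builder; peer, block, and datanodeID are the three
required fields (enforced by the Preconditions checks in newBlockReader).
A hedged usage sketch with illustrative values:

    BlockReader reader = BlockReaderFactory.newBlockReader(
        new BlockReaderFactory.Params(conf)
            .setPeer(peer)                      // required
            .setBlock(block)                    // required
            .setDatanodeID(datanodeID)          // required
            .setBlockToken(blockToken)          // optional; null means none
            .setStartOffset(0).setLen(-1)       // -1 = no read-length limit
            .setVerifyChecksum(true)
            .setClientName("example-client"));  // hypothetical client name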

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1430507&r1=1430506&r2=1430507&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Tue Jan  8 20:44:09 2013
@@ -649,7 +649,7 @@ class BlockReaderLocal implements BlockR
   }
 
   @Override
-  public synchronized void close() throws IOException {
+  public synchronized void close(PeerCache peerCache) throws IOException {
     dataIn.close();
     if (checksumIn != null) {
       checksumIn.close();
@@ -675,19 +675,4 @@ class BlockReaderLocal implements BlockR
   public void readFully(byte[] buf, int off, int len) throws IOException {
     BlockReaderUtil.readFully(this, buf, off, len);
   }
-
-  @Override
-  public Socket takeSocket() {
-    return null;
-  }
-
-  @Override
-  public boolean hasSentStatusCode() {
-    return false;
-  }
-
-  @Override
-  public IOStreamPair getStreams() {
-    return null;
-  }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1430507&r1=1430506&r2=1430507&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Tue Jan  8 20:44:09 2013
@@ -191,7 +191,7 @@ public class DFSClient implements java.i
   final FileSystem.Statistics stats;
   final int hdfsTimeout;    // timeout value for a DFS operation.
   private final String authority;
-  final SocketCache socketCache;
+  final PeerCache peerCache;
   final Conf dfsClientConf;
   private Random r = new Random();
   private SocketAddress[] localInterfaceAddrs;
@@ -433,7 +433,7 @@ public class DFSClient implements java.i
       Joiner.on(',').join(localInterfaceAddrs) + "]");
     }
     
-    this.socketCache = SocketCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
+    this.peerCache = PeerCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
   }
 
   /**

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1430507&r1=1430506&r2=1430507&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Tue Jan  8 20:44:09 2013
@@ -32,12 +32,15 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.hdfs.SocketCache.SocketAndStreams;
+import org.apache.hadoop.hdfs.net.EncryptedPeer;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -46,6 +49,7 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.ipc.RPC;
@@ -60,7 +64,7 @@ import org.apache.hadoop.security.token.
  ****************************************************************/
 @InterfaceAudience.Private
 public class DFSInputStream extends FSInputStream implements ByteBufferReadable {
-  private final SocketCache socketCache;
+  private final PeerCache peerCache;
 
   private final DFSClient dfsClient;
   private boolean closed = false;
@@ -110,7 +114,7 @@ public class DFSInputStream extends FSIn
     this.verifyChecksum = verifyChecksum;
     this.buffersize = buffersize;
     this.src = src;
-    this.socketCache = dfsClient.socketCache;
+    this.peerCache = dfsClient.peerCache;
     prefetchSize = dfsClient.getConf().prefetchSize;
     timeWindow = dfsClient.getConf().timeWindow;
     nCachedConnRetry = dfsClient.getConf().nCachedConnRetry;
@@ -424,7 +428,7 @@ public class DFSInputStream extends FSIn
 
     // Will be getting a new BlockReader.
     if (blockReader != null) {
-      closeBlockReader(blockReader);
+      blockReader.close(peerCache);
       blockReader = null;
     }
 
@@ -506,7 +510,7 @@ public class DFSInputStream extends FSIn
     dfsClient.checkOpen();
 
     if (blockReader != null) {
-      closeBlockReader(blockReader);
+      blockReader.close(peerCache);
       blockReader = null;
     }
     super.close();
@@ -833,7 +837,7 @@ public class DFSInputStream extends FSIn
         }
       } finally {
         if (reader != null) {
-          closeBlockReader(reader);
+          reader.close(peerCache);
         }
       }
       // Put chosen node into dead list, continue
@@ -841,16 +845,30 @@ public class DFSInputStream extends FSIn
     }
   }
 
-  /**
-   * Close the given BlockReader and cache its socket.
-   */
-  private void closeBlockReader(BlockReader reader) throws IOException {
-    if (reader.hasSentStatusCode()) {
-      IOStreamPair ioStreams = reader.getStreams();
-      Socket oldSock = reader.takeSocket();
-      socketCache.put(oldSock, ioStreams);
+  private Peer newPeer(InetSocketAddress addr) throws IOException {
+    Peer peer = null;
+    boolean success = false;
+    Socket sock = null;
+    try {
+      sock = dfsClient.socketFactory.createSocket();
+      NetUtils.connect(sock, addr,
+        dfsClient.getRandomLocalInterfaceAddr(),
+        dfsClient.getConf().socketTimeout);
+      peer = TcpPeerServer.peerFromSocket(sock);
+      
+      // Add encryption if configured.
+      DataEncryptionKey key = dfsClient.getDataEncryptionKey();
+      if (key != null) {
+        peer = new EncryptedPeer(peer, key);
+      }
+      success = true;
+      return peer;
+    } finally {
+      if (!success) {
+        IOUtils.closeQuietly(peer);
+        IOUtils.closeQuietly(sock);
+      }
     }
-    reader.close();
   }
 
   /**
@@ -896,40 +914,16 @@ public class DFSInputStream extends FSIn
     // Allow retry since there is no way of knowing whether the cached socket
     // is good until we actually use it.
     for (int retries = 0; retries <= nCachedConnRetry && fromCache; ++retries) {
-      SocketAndStreams sockAndStreams = null;
+      Peer peer = null;
       // Don't use the cache on the last attempt - it's possible that there
       // are arbitrarily many unusable sockets in the cache, but we don't
       // want to fail the read.
       if (retries < nCachedConnRetry) {
-        sockAndStreams = socketCache.get(dnAddr);
+        peer = peerCache.get(chosenNode);
       }
-      Socket sock;
-      if (sockAndStreams == null) {
+      if (peer == null) {
+        peer = newPeer(dnAddr);
         fromCache = false;
-
-        sock = dfsClient.socketFactory.createSocket();
-        
-        // TCP_NODELAY is crucial here because of bad interactions between
-        // Nagle's Algorithm and Delayed ACKs. With connection keepalive
-        // between the client and DN, the conversation looks like:
-        //   1. Client -> DN: Read block X
-        //   2. DN -> Client: data for block X
-        //   3. Client -> DN: Status OK (successful read)
-        //   4. Client -> DN: Read block Y
-        // The fact that step #3 and #4 are both in the client->DN direction
-        // triggers Nagling. If the DN is using delayed ACKs, this results
-        // in a delay of 40ms or more.
-        //
-        // TCP_NODELAY disables nagling and thus avoids this performance
-        // disaster.
-        sock.setTcpNoDelay(true);
-
-        NetUtils.connect(sock, dnAddr,
-            dfsClient.getRandomLocalInterfaceAddr(),
-            dfsClient.getConf().socketTimeout);
-        sock.setSoTimeout(dfsClient.getConf().socketTimeout);
-      } else {
-        sock = sockAndStreams.sock;
       }
 
       try {
@@ -939,19 +933,13 @@ public class DFSInputStream extends FSIn
                 setFile(file).setBlock(block).setBlockToken(blockToken).
                 setStartOffset(startOffset).setLen(len).
                 setBufferSize(bufferSize).setVerifyChecksum(verifyChecksum).
-                setClientName(clientName).
-                setEncryptionKey(dfsClient.getDataEncryptionKey()).
-                setIoStreamPair(sockAndStreams == null ? null : sockAndStreams.ioStreams).
-                setSocket(sock));
+                setClientName(clientName).setDatanodeID(chosenNode).
+                setPeer(peer));
         return reader;
       } catch (IOException ex) {
         // Our socket is no good.
-        DFSClient.LOG.debug("Error making BlockReader. Closing stale " + sock, ex);
-        if (sockAndStreams != null) {
-          sockAndStreams.close();
-        } else {
-          sock.close();
-        }
+        DFSClient.LOG.debug("Error making BlockReader. Closing stale " + peer, ex);
+        IOUtils.closeQuietly(peer);
         err = ex;
       }
     }
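
The retry loop above tries cached peers first and only dials out once the
cache is exhausted; on failure it discards the (possibly stale) peer and
retries. Distilled from the hunk, with the BlockReaderFactory call elided:

    for (int retries = 0; retries <= nCachedConnRetry && fromCache; ++retries) {
      Peer peer = null;
      if (retries < nCachedConnRetry) {
        peer = peerCache.get(chosenNode);       // may be a stale connection
      }
      if (peer == null) {
        peer = newPeer(dnAddr);                 // fresh (possibly encrypted) TCP
        fromCache = false;                      // last resort: do not loop again
      }
      try {
        return BlockReaderFactory.newBlockReader(/* params as shown above */);
      } catch (IOException ex) {
        IOUtils.closeQuietly(peer);             // drop the bad peer and retry
        err = ex;
      }
    }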

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java?rev=1430507&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java Tue Jan  8 20:44:09 2013
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.LinkedListMultimap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.Time;
+
+/**
+ * A cache of Peers connected to DataNodes.
+ */
+class PeerCache {
+  private static final Log LOG = LogFactory.getLog(PeerCache.class);
+  
+  private static class Value {
+    private final Peer peer;
+    private final long time;
+
+    Value(Peer peer, long time) {
+      this.peer = peer;
+      this.time = time;
+    }
+
+    Peer getPeer() {
+      return peer;
+    }
+
+    long getTime() {
+      return time;
+    }
+  }
+
+  private Daemon daemon;
+  /** A multimap of cached Peers, keyed by the DatanodeID they connect to. */
+  private static LinkedListMultimap<DatanodeID, Value> multimap =
+    LinkedListMultimap.create();
+  private static int capacity;
+  private static long expiryPeriod;
+  private static PeerCache instance = new PeerCache();
+  private static boolean isInitedOnce = false;
+ 
+  public static synchronized PeerCache getInstance(int c, long e) {
+    // capacity is only initialized once
+    if (!isInitedOnce) {
+      capacity = c;
+      expiryPeriod = e;
+
+      if (capacity == 0) {
+        LOG.info("PeerCache disabled.");
+      }
+      else if (expiryPeriod == 0) {
+        throw new IllegalStateException("Cannot initialize expiryPeriod to " +
+           expiryPeriod + " when cache is enabled.");
+      }
+      isInitedOnce = true;
+    } else { //already initialized once
+      if (capacity != c || expiryPeriod != e) {
+        LOG.info("capacity and expiry period already set to " + capacity +
+          " and " + expiryPeriod + " respectively. Cannot set them to " + c +
+          " and " + e);
+      }
+    }
+
+    return instance;
+  }
+
+  private boolean isDaemonStarted() {
+    return daemon != null;
+  }
+
+  private synchronized void startExpiryDaemon() {
+    // start daemon only if not already started
+    if (isDaemonStarted()) {
+      return;
+    }
+    
+    daemon = new Daemon(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          PeerCache.this.run();
+        } catch(InterruptedException e) {
+          //noop
+        } finally {
+          PeerCache.this.clear();
+        }
+      }
+
+      @Override
+      public String toString() {
+        return String.valueOf(PeerCache.this);
+      }
+    });
+    daemon.start();
+  }
+
+  /**
+   * Get a cached peer connected to the given DataNode.
+   * @param dnId         The DataNode to get a Peer for.
+   * @return             An open Peer connected to the DN, or null if none
+   *                     was found. 
+   */
+  public synchronized Peer get(DatanodeID dnId) {
+
+    if (capacity <= 0) { // disabled
+      return null;
+    }
+
+    List<Value> sockStreamList = multimap.get(dnId);
+    if (sockStreamList == null) {
+      return null;
+    }
+
+    Iterator<Value> iter = sockStreamList.iterator();
+    while (iter.hasNext()) {
+      Value candidate = iter.next();
+      iter.remove();
+      if (!candidate.getPeer().isClosed()) {
+        return candidate.getPeer();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Give an unused peer back to the cache.
+   * @param dnId  the DataNode the peer is connected to.
+   * @param peer  a peer that is no longer in use by any reader.
+   */
+  public synchronized void put(DatanodeID dnId, Peer peer) {
+    Preconditions.checkNotNull(dnId);
+    Preconditions.checkNotNull(peer);
+    if (peer.isClosed()) return;
+    if (capacity <= 0) {
+      // Cache disabled.
+      IOUtils.cleanup(LOG, peer);
+      return;
+    }
+ 
+    startExpiryDaemon();
+
+    if (capacity == multimap.size()) {
+      evictOldest();
+    }
+    multimap.put(dnId, new Value(peer, Time.monotonicNow()));
+  }
+
+  public synchronized int size() {
+    return multimap.size();
+  }
+
+  /**
+   * Evict and close peers older than the expiry period from the cache.
+   */
+  private synchronized void evictExpired(long expiryPeriod) {
+    while (multimap.size() != 0) {
+      Iterator<Entry<DatanodeID, Value>> iter =
+        multimap.entries().iterator();
+      Entry<DatanodeID, Value> entry = iter.next();
+      // entries iterate in insertion order; stop at the first unexpired peer
+      if (entry == null || 
+        Time.monotonicNow() - entry.getValue().getTime() <
+        expiryPeriod) {
+        break;
+      }
+      IOUtils.cleanup(LOG, entry.getValue().getPeer());
+      iter.remove();
+    }
+  }
+
+  /**
+   * Evict the oldest entry in the cache.
+   */
+  private synchronized void evictOldest() {
+    // We can get the oldest element immediately, because of an interesting
+    // property of LinkedListMultimap: its iterator traverses entries in the
+    // order that they were added.
+    Iterator<Entry<DatanodeID, Value>> iter =
+      multimap.entries().iterator();
+    if (!iter.hasNext()) {
+      throw new IllegalStateException("Cannot evict from empty cache! " +
+        "capacity: " + capacity);
+    }
+    Entry<DatanodeID, Value> entry = iter.next();
+    IOUtils.cleanup(LOG, entry.getValue().getPeer());
+    iter.remove();
+  }
+
+  /**
+   * Periodically scan the cache and evict the entries older than
+   * expiryPeriod milliseconds.
+   */
+  private void run() throws InterruptedException {
+    for(long lastExpiryTime = Time.monotonicNow();
+        !Thread.interrupted();
+        Thread.sleep(expiryPeriod)) {
+      final long elapsed = Time.monotonicNow() - lastExpiryTime;
+      if (elapsed >= expiryPeriod) {
+        evictExpired(expiryPeriod);
+        lastExpiryTime = Time.monotonicNow();
+      }
+    }
+    clear();
+    throw new InterruptedException("Daemon Interrupted");
+  }
+
+  /**
+   * Empty the cache, and close all peers.
+   */
+  @VisibleForTesting
+  synchronized void clear() {
+    for (Value value : multimap.values()) {
+      IOUtils.cleanup(LOG, value.getPeer());
+    }
+    multimap.clear();
+  }
+
+}
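
PeerCache is a process-wide singleton keyed by DatanodeID: a capacity of 0
disables caching, and the expiry daemon starts lazily on the first put. A
hedged usage sketch (the 16/30000 values and the dial helper are
illustrative, not defaults):

    PeerCache cache = PeerCache.getInstance(16, 30000); // capacity, expiry in ms
    Peer peer = cache.get(datanodeID);    // null on a miss, or when disabled
    if (peer == null) {
      peer = dialDataNode(datanodeID);    // hypothetical connect helper
    }
    // ... use the peer ...
    cache.put(datanodeID, peer);  // recycle; evicts the oldest entry when full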

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1430507&r1=1430506&r2=1430507&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Tue Jan  8 20:44:09 2013
@@ -25,25 +25,20 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.Socket;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FSInputChecker;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
 
@@ -56,7 +51,8 @@ import org.apache.hadoop.util.DataChecks
 @Deprecated
 public class RemoteBlockReader extends FSInputChecker implements BlockReader {
 
-  Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
+  private final Peer peer;
+  private final DatanodeID datanodeID;
   private final DataInputStream in;
   private DataChecksum checksum;
 
@@ -126,9 +122,9 @@ public class RemoteBlockReader extends F
     // if eos was set in the previous read, send a status code to the DN
     if (eos && !eosBefore && nRead >= 0) {
       if (needChecksum()) {
-        sendReadResult(dnSock, Status.CHECKSUM_OK);
+        sendReadResult(peer, Status.CHECKSUM_OK);
       } else {
-        sendReadResult(dnSock, Status.SUCCESS);
+        sendReadResult(peer, Status.SUCCESS);
       }
     }
     return nRead;
@@ -322,7 +318,8 @@ public class RemoteBlockReader extends F
   
   private RemoteBlockReader(String file, String bpid, long blockId,
       DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
-      long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) {
+      long startOffset, long firstChunkOffset, long bytesToRead, 
+      Peer peer, DatanodeID datanodeID) {
     // Path is used only for printing block and file information in debug
     super(new Path("/blk_" + blockId + ":" + bpid + ":of:"+ file)/*too non path-like?*/,
           1, verifyChecksum,
@@ -330,7 +327,8 @@ public class RemoteBlockReader extends F
           checksum.getBytesPerChecksum(),
           checksum.getChecksumSize());
     
-    this.dnSock = dnSock;
+    this.peer = peer;
+    this.datanodeID = datanodeID;
     this.in = in;
     this.checksum = checksum;
     this.startOffset = Math.max( startOffset, 0 );
@@ -367,9 +365,8 @@ public class RemoteBlockReader extends F
   public static RemoteBlockReader newBlockReader(BlockReaderFactory.Params params)
         throws IOException {
     // in and out will be closed when sock is closed (by the caller)
-    Socket sock = params.getSocket();
     final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
-          NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT)));
+        params.getPeer().getOutputStream()));
     new Sender(out).readBlock(params.getBlock(), params.getBlockToken(), 
         params.getClientName(), params.getStartOffset(), params.getLen());
 
@@ -377,13 +374,13 @@ public class RemoteBlockReader extends F
     // Get bytes in block, set streams
     //
     DataInputStream in = new DataInputStream(
-        new BufferedInputStream(NetUtils.getInputStream(sock), 
+        new BufferedInputStream(params.getPeer().getInputStream(),
             params.getBufferSize()));
 
     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
         vintPrefixed(in));
-    RemoteBlockReader2.checkSuccess(status, sock, params.getBlock(),
-        params.getFile());
+    RemoteBlockReader2.checkSuccess(status, params.getPeer(),
+        params.getBlock(), params.getFile());
     ReadOpChecksumInfoProto checksumInfo =
       status.getReadOpChecksumInfo();
     DataChecksum checksum = DataTransferProtoUtil.fromProto(
@@ -402,18 +399,20 @@ public class RemoteBlockReader extends F
 
     return new RemoteBlockReader(params.getFile(), params.getBlock().getBlockPoolId(), 
         params.getBlock().getBlockId(), in, checksum, params.getVerifyChecksum(),
-        params.getStartOffset(), firstChunkOffset, params.getLen(), sock);
+        params.getStartOffset(), firstChunkOffset, params.getLen(),
+        params.getPeer(), params.getDatanodeID());
   }
 
   @Override
-  public synchronized void close() throws IOException {
+  public synchronized void close(PeerCache peerCache) throws IOException {
     startOffset = -1;
     checksum = null;
-    if (dnSock != null) {
-      dnSock.close();
+    if (peerCache != null && sentStatusCode) {
+      peerCache.put(datanodeID, peer);
+    } else {
+      peer.close();
     }
-
-    // in will be closed when its Socket is closed.
+    // in will be closed when its Peer is closed.
   }
   
   @Override
@@ -427,37 +426,21 @@ public class RemoteBlockReader extends F
     return readFully(this, buf, offset, len);
   }
 
-  @Override
-  public Socket takeSocket() {
-    assert hasSentStatusCode() :
-      "BlockReader shouldn't give back sockets mid-read";
-    Socket res = dnSock;
-    dnSock = null;
-    return res;
-  }
-
-  @Override
-  public boolean hasSentStatusCode() {
-    return sentStatusCode;
-  }
-
   /**
    * When the reader reaches end of the read, it sends a status response
    * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
    * closing our connection (which we will re-open), but won't affect
    * data correctness.
    */
-  void sendReadResult(Socket sock, Status statusCode) {
-    assert !sentStatusCode : "already sent status code to " + sock;
+  void sendReadResult(Peer peer, Status statusCode) {
+    assert !sentStatusCode : "already sent status code to " + peer;
     try {
-      RemoteBlockReader2.writeReadResult(
-          NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT),
-          statusCode);
+      RemoteBlockReader2.writeReadResult(peer.getOutputStream(), statusCode);
       sentStatusCode = true;
     } catch (IOException e) {
       // It's ok not to be able to send this. But something is probably wrong.
       LOG.info("Could not send read status (" + statusCode + ") to datanode " +
-               sock.getInetAddress() + ": " + e.getMessage());
+               datanodeID + ": " + e.getMessage());
     }
   }
   
@@ -477,12 +460,4 @@ public class RemoteBlockReader extends F
   public int read(ByteBuffer buf) throws IOException {
     throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
   }
-
-  @Override
-  public IOStreamPair getStreams() {
-    // This class doesn't support encryption, which is the only thing this
-    // method is used for. See HDFS-3637.
-    return null;
-  }
-
 }
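
Note that the hasSentStatusCode() check moved from DFSInputStream into
close() itself: only a reader that has sent its final status code back to
the DN holds a Peer in a known-clean state, so only then is the connection
recycled. The gating, as it now appears in both remote readers:

    if (peerCache != null && sentStatusCode) {
      peerCache.put(datanodeID, peer);  // stream fully drained: reusable
    } else {
      peer.close();                     // mid-read or uncached: tear it down
    }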

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1430507&r1=1430506&r2=1430507&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Tue Jan  8 20:44:09 2013
@@ -25,16 +25,15 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
-import java.net.Socket;
 import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -42,13 +41,11 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.net.SocketInputWrapper;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This is a wrapper around connection to datanode
  * and understands checksum, offset etc.
@@ -79,11 +76,8 @@ import org.apache.hadoop.util.DataChecks
 public class RemoteBlockReader2  implements BlockReader {
 
   static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class);
-  
-  Socket dnSock;
-  // for now just sending the status code (e.g. checksumOk) after the read.
-  private IOStreamPair ioStreams;
-  private final ReadableByteChannel in;
+  private final DatanodeID datanodeID;
+  private final Peer peer;
   private DataChecksum checksum;
   
   private PacketReceiver packetReceiver = new PacketReceiver(true);
@@ -115,6 +109,11 @@ public class RemoteBlockReader2  impleme
   /** Amount of unread data in the current received packet */
   int dataLeft = 0;
   
+  @VisibleForTesting
+  public Peer getPeer() {
+    return peer;
+  }
+  
   @Override
   public synchronized int read(byte[] buf, int off, int len) 
                                throws IOException {
@@ -155,7 +154,7 @@ public class RemoteBlockReader2  impleme
 
   private void readNextPacket() throws IOException {
     //Read packet headers.
-    packetReceiver.receiveNextPacket(in);
+    packetReceiver.receiveNextPacket(peer.getInputStreamChannel());
 
     PacketHeader curHeader = packetReceiver.getHeader();
     curDataSlice = packetReceiver.getDataSlice();
@@ -236,7 +235,7 @@ public class RemoteBlockReader2  impleme
       LOG.trace("Reading empty packet at end of read");
     }
     
-    packetReceiver.receiveNextPacket(in);
+    packetReceiver.receiveNextPacket(peer.getInputStreamChannel());
 
     PacketHeader trailer = packetReceiver.getHeader();
     if (!trailer.isLastPacketInBlock() ||
@@ -247,11 +246,10 @@ public class RemoteBlockReader2  impleme
   }
 
   protected RemoteBlockReader2(BlockReaderFactory.Params params, 
-      DataChecksum checksum, long firstChunkOffset, ReadableByteChannel in) {
+      DataChecksum checksum, long firstChunkOffset) {
     // Path is used only for printing block and file information in debug
-    this.dnSock = params.getSocket();
-    this.ioStreams = params.getIoStreamPair();
-    this.in = in;
+    this.datanodeID = params.getDatanodeID();
+    this.peer = params.getPeer();
     this.checksum = checksum;
     this.verifyChecksum = params.getVerifyChecksum();
     this.startOffset = Math.max( params.getStartOffset(), 0 );
@@ -268,38 +266,19 @@ public class RemoteBlockReader2  impleme
 
 
   @Override
-  public synchronized void close() throws IOException {
+  public synchronized void close(PeerCache peerCache) throws IOException {
     packetReceiver.close();
     
     startOffset = -1;
     checksum = null;
-    if (dnSock != null) {
-      dnSock.close();
+    if (peerCache != null && sentStatusCode) {
+      peerCache.put(datanodeID, peer);
+    } else {
+      peer.close();
     }
 
     // in will be closed when its Socket is closed.
   }
-  
-  /**
-   * Take the socket used to talk to the DN.
-   */
-  @Override
-  public Socket takeSocket() {
-    assert hasSentStatusCode() :
-      "BlockReader shouldn't give back sockets mid-read";
-    Socket res = dnSock;
-    dnSock = null;
-    return res;
-  }
-
-  /**
-   * Whether the BlockReader has reached the end of its input stream
-   * and successfully sent a status code back to the datanode.
-   */
-  @Override
-  public boolean hasSentStatusCode() {
-    return sentStatusCode;
-  }
 
   /**
    * When the reader reaches end of the read, it sends a status response
@@ -308,14 +287,14 @@ public class RemoteBlockReader2  impleme
    * data correctness.
    */
   void sendReadResult(Status statusCode) {
-    assert !sentStatusCode : "already sent status code to " + dnSock;
+    assert !sentStatusCode : "already sent status code to " + peer;
     try {
-      writeReadResult(ioStreams.out, statusCode);
+      writeReadResult(peer.getOutputStream(), statusCode);
       sentStatusCode = true;
     } catch (IOException e) {
       // It's ok not to be able to send this. But something is probably wrong.
       LOG.info("Could not send read status (" + statusCode + ") to datanode " +
-               dnSock.getInetAddress() + ": " + e.getMessage());
+               peer.getRemoteAddressString() + ": " + e.getMessage());
     }
   }
 
@@ -373,29 +352,20 @@ public class RemoteBlockReader2  impleme
    */
   public static BlockReader newBlockReader(BlockReaderFactory.Params params)
                                      throws IOException {
-    IOStreamPair ioStreams = params.getIoStreamPair();
-    ReadableByteChannel ch;
-    if (ioStreams.in instanceof SocketInputWrapper) {
-      ch = ((SocketInputWrapper)ioStreams.in).getReadableByteChannel();
-    } else {
-      ch = (ReadableByteChannel) ioStreams.in;
-    }
-    
     // in and out will be closed when sock is closed (by the caller)
     final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
-          ioStreams.out));
+          params.getPeer().getOutputStream()));
     new Sender(out).readBlock(params.getBlock(), params.getBlockToken(), 
         params.getClientName(), params.getStartOffset(), params.getLen());
 
     //
     // Get bytes in block
     //
-    DataInputStream in = new DataInputStream(ioStreams.in);
+    DataInputStream in = new DataInputStream(params.getPeer().getInputStream());
 
     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
         vintPrefixed(in));
-    checkSuccess(status, params.getSocket(), params.getBlock(),
-        params.getFile());
+    checkSuccess(status, params.getPeer(), params.getBlock(), params.getFile());
     ReadOpChecksumInfoProto checksumInfo =
       status.getReadOpChecksumInfo();
     DataChecksum checksum = DataTransferProtoUtil.fromProto(
@@ -412,33 +382,28 @@ public class RemoteBlockReader2  impleme
                     params.getStartOffset() + " for file " + params.getFile());
     }
 
-    return new RemoteBlockReader2(params, checksum, firstChunkOffset, ch);
+    return new RemoteBlockReader2(params, checksum, firstChunkOffset);
   }
 
   static void checkSuccess(
-      BlockOpResponseProto status, Socket sock,
+      BlockOpResponseProto status, Peer peer,
       ExtendedBlock block, String file)
       throws IOException {
     if (status.getStatus() != Status.SUCCESS) {
       if (status.getStatus() == Status.ERROR_ACCESS_TOKEN) {
         throw new InvalidBlockTokenException(
             "Got access token error for OP_READ_BLOCK, self="
-                + sock.getLocalSocketAddress() + ", remote="
-                + sock.getRemoteSocketAddress() + ", for file " + file
+                + peer.getLocalAddressString() + ", remote="
+                + peer.getRemoteAddressString() + ", for file " + file
                 + ", for pool " + block.getBlockPoolId() + " block " 
                 + block.getBlockId() + "_" + block.getGenerationStamp());
       } else {
         throw new IOException("Got error for OP_READ_BLOCK, self="
-            + sock.getLocalSocketAddress() + ", remote="
-            + sock.getRemoteSocketAddress() + ", for file " + file
+            + peer.getLocalAddressString() + ", remote="
+            + peer.getRemoteAddressString() + ", for file " + file
             + ", for pool " + block.getBlockPoolId() + " block " 
             + block.getBlockId() + "_" + block.getGenerationStamp());
       }
     }
   }
-
-  @Override
-  public IOStreamPair getStreams() {
-    return ioStreams;
-  }
 }

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java?rev=1430507&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java Tue Jan  8 20:44:09 2013
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.net;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.channels.ReadableByteChannel;
+
+/**
+ * Represents a peer that we communicate with by using a basic Socket
+ * that has no associated Channel.
+ */
+class BasicInetPeer implements Peer {
+  private final Socket socket;
+  private final OutputStream out;
+  private final InputStream in;
+  private final boolean isLocal;
+
+  public BasicInetPeer(Socket socket) throws IOException {
+    this.socket = socket;
+    this.out = socket.getOutputStream();
+    this.in = socket.getInputStream();
+    this.isLocal = socket.getInetAddress().equals(socket.getLocalAddress());
+  }
+
+  @Override
+  public ReadableByteChannel getInputStreamChannel() {
+    /*
+     * This Socket has no channel, so there's nothing to return here.
+     */
+    return null;
+  }
+
+  @Override
+  public void setReadTimeout(int timeoutMs) throws IOException {
+    socket.setSoTimeout(timeoutMs);
+  }
+
+  @Override
+  public int getReceiveBufferSize() throws IOException {
+    return socket.getReceiveBufferSize();
+  }
+
+  @Override
+  public boolean getTcpNoDelay() throws IOException {
+    return socket.getTcpNoDelay();
+  }
+
+  @Override
+  public void setWriteTimeout(int timeoutMs) {
+   /* 
+    * We can't implement write timeouts. :(
+    * 
+    * Java provides no facility to set a blocking write timeout on a Socket.
+    * You can simulate a blocking write with a timeout by using
+    * non-blocking I/O.  However, we can't use nio here, because this Socket
+    * doesn't have an associated Channel.
+    * 
+    * See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4031100 for
+    * more details.
+    */
+  }
+
+  @Override
+  public boolean isClosed() {
+    return socket.isClosed();
+  }
+
+  @Override
+  public void close() throws IOException {
+    socket.close();
+  }
+
+  @Override
+  public String getRemoteAddressString() {
+    return socket.getRemoteSocketAddress().toString();
+  }
+
+  @Override
+  public String getLocalAddressString() {
+    return socket.getLocalSocketAddress().toString();
+  }
+  
+  @Override
+  public InputStream getInputStream() throws IOException {
+    return in;
+  }
+
+  @Override
+  public OutputStream getOutputStream() throws IOException {
+    return out;
+  }
+
+  @Override
+  public boolean isLocal() {
+    return isLocal;
+  }
+
+  @Override
+  public String toString() {
+    return "BasicInetPeer(" + socket.toString() + ")";
+  }
+}
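
Because getInputStreamChannel() returns null here, a BasicInetPeer can only
back the legacy RemoteBlockReader; RemoteBlockReader2 requires a
channel-backed peer. A hedged sketch of the selection (the real choice is
made in TcpPeerServer.peerFromSocket, whose source is not shown in this
part of the message):

    // Assumption: the implementation is picked by whether the Socket has
    // an associated nio channel.
    Peer peer = (sock.getChannel() != null)
        ? new NioInetPeer(sock)      // channel-backed; supports byte channels
        : new BasicInetPeer(sock);   // plain socket; no channel available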

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java?rev=1430507&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java Tue Jan  8 20:44:09 2013
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.net;
+
+import java.io.IOException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.ReadableByteChannel;
+
+/**
+ * Represents a peer that we communicate with by using an encrypted
+ * communications medium.
+ */
+@InterfaceAudience.Private
+public class EncryptedPeer implements Peer {
+  private final Peer enclosedPeer;
+
+  /**
+   * An encrypted InputStream.
+   */
+  private final InputStream in;
+  
+  /**
+   * An encrypted OutputStream.
+   */
+  private final OutputStream out;
+  
+  /**
+   * An encrypted ReadableByteChannel.
+   */
+  private final ReadableByteChannel channel;
+
+  public EncryptedPeer(Peer enclosedPeer, DataEncryptionKey key)
+      throws IOException {
+    this.enclosedPeer = enclosedPeer;
+    IOStreamPair ios = DataTransferEncryptor.getEncryptedStreams(
+        enclosedPeer.getOutputStream(), enclosedPeer.getInputStream(), key);
+    this.in = ios.in;
+    this.out = ios.out;
+    this.channel = ios.in instanceof ReadableByteChannel ? 
+        (ReadableByteChannel)ios.in : null;
+  }
+
+  @Override
+  public ReadableByteChannel getInputStreamChannel() {
+    return channel;
+  }
+
+  @Override
+  public void setReadTimeout(int timeoutMs) throws IOException {
+    enclosedPeer.setReadTimeout(timeoutMs);
+  }
+
+  @Override
+  public int getReceiveBufferSize() throws IOException {
+    return enclosedPeer.getReceiveBufferSize();
+  }
+
+  @Override
+  public boolean getTcpNoDelay() throws IOException {
+    return enclosedPeer.getTcpNoDelay();
+  }
+
+  @Override
+  public void setWriteTimeout(int timeoutMs) throws IOException {
+    enclosedPeer.setWriteTimeout(timeoutMs);
+  }
+
+  @Override
+  public boolean isClosed() {
+    return enclosedPeer.isClosed();
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      in.close();
+    } finally {
+      try {
+        out.close();
+      } finally {
+        enclosedPeer.close();
+      }
+    }
+  }
+
+  @Override
+  public String getRemoteAddressString() {
+    return enclosedPeer.getRemoteAddressString();
+  }
+
+  @Override
+  public String getLocalAddressString() {
+    return enclosedPeer.getLocalAddressString();
+  }
+
+  @Override
+  public InputStream getInputStream() throws IOException {
+    return in;
+  }
+
+  @Override
+  public OutputStream getOutputStream() throws IOException {
+    return out;
+  }
+
+  @Override
+  public boolean isLocal() {
+    return enclosedPeer.isLocal();
+  }
+
+  @Override
+  public String toString() {
+    return "EncryptedPeer(" + enclosedPeer + ")";
+  }
+}
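
The nested try/finally in close() above generalizes to any fixed set of
resources: every close() is attempted even if an earlier one throws, with a
later exception superseding an earlier one. A sketch of the pattern (the
helper name is invented):

    import java.io.Closeable;
    import java.io.IOException;

    public class CloseAllSketch {
      static void closeAll(Closeable a, Closeable b, Closeable c)
          throws IOException {
        try {
          a.close();             // attempted first
        } finally {
          try {
            b.close();           // runs even if a.close() threw
          } finally {
            c.close();           // always runs last
          }
        }
      }
    }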

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java?rev=1430507&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java Tue Jan  8 20:44:09 2013
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.net;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.channels.ReadableByteChannel;
+
+import org.apache.hadoop.net.SocketInputStream;
+import org.apache.hadoop.net.SocketOutputStream;
+
+/**
+ * Represents a peer that we communicate with by using non-blocking I/O 
+ * on a Socket.
+ */
+class NioInetPeer implements Peer {
+  private final Socket socket;
+
+  /**
+   * An InputStream which simulates blocking I/O with timeouts using NIO.
+   */
+  private final SocketInputStream in;
+  
+  /**
+   * An OutputStream which simulates blocking I/O with timeouts using NIO.
+   */
+  private final SocketOutputStream out;
+
+  private final boolean isLocal;
+
+  NioInetPeer(Socket socket) throws IOException {
+    this.socket = socket;
+    this.in = new SocketInputStream(socket.getChannel(), 0);
+    this.out = new SocketOutputStream(socket.getChannel(), 0);
+    this.isLocal = socket.getInetAddress().equals(socket.getLocalAddress());
+  }
+
+  @Override
+  public ReadableByteChannel getInputStreamChannel() {
+    return socket.getChannel();
+  }
+
+  @Override
+  public void setReadTimeout(int timeoutMs) throws IOException {
+    in.setTimeout(timeoutMs);
+  }
+
+  @Override
+  public int getReceiveBufferSize() throws IOException {
+    return socket.getReceiveBufferSize();
+  }
+
+  @Override
+  public boolean getTcpNoDelay() throws IOException {
+    return socket.getTcpNoDelay();
+  }
+
+  @Override
+  public void setWriteTimeout(int timeoutMs) throws IOException {
+    out.setTimeout(timeoutMs);
+  }
+
+  @Override
+  public boolean isClosed() {
+    return socket.isClosed();
+  }
+
+  @Override
+  public void close() throws IOException {
+    // We always close the outermost streams (in this case, 'in' and 'out').
+    // Closing either one of these will also close the Socket.
+    try {
+      in.close();
+    } finally {
+      out.close();
+    }
+  }
+
+  @Override
+  public String getRemoteAddressString() {
+    return socket.getRemoteSocketAddress().toString();
+  }
+
+  @Override
+  public String getLocalAddressString() {
+    return socket.getLocalSocketAddress().toString();
+  }
+
+  @Override
+  public InputStream getInputStream() throws IOException {
+    return in;
+  }
+
+  @Override
+  public OutputStream getOutputStream() throws IOException {
+    return out;
+  }
+
+  @Override
+  public boolean isLocal() {
+    return isLocal;
+  }
+
+  @Override
+  public String toString() {
+    return "NioInetPeer(" + socket.toString() + ")";
+  }
+}
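
Because SocketInputStream simulates a blocking read with a timeout on top of
a non-blocking channel, a stalled remote side surfaces as
SocketTimeoutException rather than a read that blocks forever. A hedged
sketch of what that looks like to a caller of the Peer API (the method name
and sentinel value are invented):

    import java.io.IOException;
    import java.io.InputStream;
    import java.net.SocketTimeoutException;
    import org.apache.hadoop.hdfs.net.Peer;

    public class ReadTimeoutSketch {
      static int readWithTimeout(Peer peer, int timeoutMs) throws IOException {
        peer.setReadTimeout(timeoutMs);
        InputStream in = peer.getInputStream();
        try {
          return in.read();        // waits at most ~timeoutMs for a byte
        } catch (SocketTimeoutException e) {
          return -2;               // illustrative sentinel: timed out
        }
      }
    }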

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java?rev=1430507&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java Tue Jan  8 20:44:09 2013
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.net;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.ReadableByteChannel;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Represents a connection to a peer.
+ */
+@InterfaceAudience.Private
+public interface Peer extends Closeable {
+  /**
+   * @return                The input stream channel associated with this
+   *                        peer, or null if it has none.
+   */
+  public ReadableByteChannel getInputStreamChannel();
+
+  /**
+   * Set the read timeout on this peer.
+   *
+   * @param timeoutMs       The timeout in milliseconds.
+   */
+  public void setReadTimeout(int timeoutMs) throws IOException;
+
+  /**
+   * @return                The receive buffer size.
+   */
+  public int getReceiveBufferSize() throws IOException;
+
+  /**
+   * @return                True if TCP_NODELAY is turned on.
+   */
+  public boolean getTcpNoDelay() throws IOException;
+
+  /**
+   * Set the write timeout on this peer.
+   *
+   * Note: this is not honored for BasicInetPeer.
+   * See {@link BasicInetPeer#setWriteTimeout} for details.
+   * 
+   * @param timeoutMs       The timeout in milliseconds.
+   */
+  public void setWriteTimeout(int timeoutMs) throws IOException;
+
+  /**
+   * @return                true only if the peer is closed.
+   */
+  public boolean isClosed();
+  
+  /**
+   * Close the peer.
+   *
+   * It's safe to re-close a Peer that is already closed.
+   */
+  public void close() throws IOException;
+
+  /**
+   * @return               A string representing the remote end of our 
+   *                       connection to the peer.
+   */
+  public String getRemoteAddressString();
+
+  /**
+   * @return               A string representing the local end of our 
+   *                       connection to the peer.
+   */
+  public String getLocalAddressString();
+  
+  /**
+   * @return               An InputStream associated with the Peer.
+   *                       This InputStream will be valid until you close
+   *                       this peer with Peer#close.
+   */
+  public InputStream getInputStream() throws IOException;
+  
+  /**
+   * @return               An OutputStream associated with the Peer.
+   *                       This OutputStream will be valid until you close
+   *                       this peer with Peer#close.
+   */
+  public OutputStream getOutputStream() throws IOException;
+
+  /**
+   * @return               True if the peer resides on the same
+   *                       computer as we do.
+   */
+  public boolean isLocal();
+}
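
A sketch of a typical consumer of this interface: the caller configures
timeouts and works entirely through the streams, which stay valid until
Peer#close. The class and method names are illustrative only:

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.hdfs.net.Peer;

    public class PeerRoundTripSketch {
      static int roundTrip(Peer peer, int request) throws IOException {
        peer.setReadTimeout(60000);
        peer.setWriteTimeout(60000);   // best-effort; no-op on BasicInetPeer
        DataOutputStream out = new DataOutputStream(peer.getOutputStream());
        DataInputStream in = new DataInputStream(peer.getInputStream());
        out.writeInt(request);
        out.flush();
        return in.readInt();           // streams valid until peer.close()
      }
    }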

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/PeerServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/PeerServer.java?rev=1430507&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/PeerServer.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/PeerServer.java Tue Jan  8 20:44:09 2013
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.net;
+
+import java.io.Closeable;
+import org.apache.hadoop.classification.InterfaceAudience;
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+
+@InterfaceAudience.Private
+public interface PeerServer extends Closeable {
+  /**
+   * Set the receive buffer size of the PeerServer.
+   * 
+   * @param size     The receive buffer size.
+   */
+  public void setReceiveBufferSize(int size) throws IOException;
+
+  /**
+   * Listens for a connection to be made to this server and accepts 
+   * it. The method blocks until a connection is made.
+   *
+   * @exception IOException  if an I/O error occurs when waiting for a
+   *               connection.
+   * @exception SecurityException  if a security manager exists and its  
+   *             <code>checkAccept</code> method doesn't allow the operation.
+   * @exception SocketTimeoutException if a timeout was previously set and
+   *             the timeout has been reached.
+   */
+  public Peer accept() throws IOException, SocketTimeoutException;
+
+  /**
+   * @return                 A string representation of the address we're
+   *                         listening on.
+   */
+  public String getListeningString();
+
+  /**
+   * Free the resources associated with this peer server.
+   * This normally includes sockets, etc.
+   *
+   * @throws IOException     If there is an error closing the PeerServer
+   */
+  public void close() throws IOException;
+}
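
The contract above implies the usual accept-and-dispatch loop; in this patch
the real dispatch lives in DataXceiverServer. A minimal sketch (handler
omitted, names invented):

    import java.io.IOException;
    import org.apache.hadoop.hdfs.net.Peer;
    import org.apache.hadoop.hdfs.net.PeerServer;
    import org.apache.hadoop.io.IOUtils;

    public class AcceptLoopSketch {
      static void serveOnce(PeerServer server) throws IOException {
        Peer peer = server.accept();     // blocks until a client connects
        try {
          // ... hand 'peer' to protocol-handling code ...
        } finally {
          IOUtils.cleanup(null, peer);   // best-effort close, as elsewhere
        }
      }
    }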

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java?rev=1430507&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java Tue Jan  8 20:44:09 2013
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.net;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.Server;
+
+@InterfaceAudience.Private
+public class TcpPeerServer implements PeerServer {
+  static final Log LOG = LogFactory.getLog(TcpPeerServer.class);
+
+  private final ServerSocket serverSocket;
+
+  public static Peer peerFromSocket(Socket socket)
+      throws IOException {
+    Peer peer = null;
+    boolean success = false;
+    try {
+      // TCP_NODELAY is crucial here because of bad interactions between
+      // Nagle's Algorithm and Delayed ACKs. With connection keepalive
+      // between the client and DN, the conversation looks like:
+      //   1. Client -> DN: Read block X
+      //   2. DN -> Client: data for block X
+      //   3. Client -> DN: Status OK (successful read)
+      //   4. Client -> DN: Read block Y
+      // The fact that steps #3 and #4 are both in the client->DN direction
+      // triggers Nagling. If the DN is using delayed ACKs, this results
+      // in a delay of 40ms or more.
+      //
+      // TCP_NODELAY disables Nagling and thus avoids this performance
+      // disaster.
+      socket.setTcpNoDelay(true);
+      SocketChannel channel = socket.getChannel();
+      if (channel == null) {
+        peer = new BasicInetPeer(socket);
+      } else {
+        peer = new NioInetPeer(socket);
+      }
+      success = true;
+      return peer;
+    } finally {
+      if (!success) {
+        if (peer != null) peer.close();
+        socket.close();
+      }
+    }
+  }
+
+  public static Peer peerFromSocketAndKey(Socket s,
+        DataEncryptionKey key) throws IOException {
+    Peer peer = null;
+    boolean success = false;
+    try {
+      peer = peerFromSocket(s); 
+      if (key != null) {
+        peer = new EncryptedPeer(peer, key);
+      }
+      success = true;
+      return peer;
+    } finally {
+      if (!success) {
+        IOUtils.cleanup(null, peer);
+      }
+    }
+  }
+
+  /**
+   * Create a non-secure TcpPeerServer.
+   *
+   * @param socketWriteTimeout    The Socket write timeout in ms.
+   * @param bindAddr              The address to bind to.
+   * @throws IOException
+   */
+  public TcpPeerServer(int socketWriteTimeout,
+        InetSocketAddress bindAddr) throws IOException {
+    this.serverSocket = (socketWriteTimeout > 0) ?
+          ServerSocketChannel.open().socket() : new ServerSocket();
+    Server.bind(serverSocket, bindAddr, 0);
+  }
+
+  /**
+   * Create a secure TcpPeerServer.
+   *
+   * @param secureResources   Security resources.
+   */
+  public TcpPeerServer(SecureResources secureResources) {
+    this.serverSocket = secureResources.getStreamingSocket();
+  }
+  
+  /**
+   * @return     the socket address on which this TcpPeerServer is listening.
+   */
+  public InetSocketAddress getStreamingAddr() {
+    return new InetSocketAddress(
+        serverSocket.getInetAddress().getHostAddress(),
+        serverSocket.getLocalPort());
+  }
+
+  @Override
+  public void setReceiveBufferSize(int size) throws IOException {
+    this.serverSocket.setReceiveBufferSize(size);
+  }
+
+  @Override
+  public Peer accept() throws IOException, SocketTimeoutException {
+    return peerFromSocket(serverSocket.accept());
+  }
+
+  @Override
+  public String getListeningString() {
+    return serverSocket.getLocalSocketAddress().toString();
+  }
+  
+  @Override
+  public void close() throws IOException {
+    try {
+      serverSocket.close();
+    } catch (IOException e) {
+      // log and swallow: closing the listening socket is best-effort,
+      // and callers should not fail shutdown over it
+      LOG.error("error closing TcpPeerServer: ", e);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "TcpPeerServer(" + getListeningString() + ")";
+  }
+}
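
Standalone, the class above can be exercised as below; this mirrors what
DataNode.initDataXceiver does in the hunk that follows. The address, timeout,
and buffer size are arbitrary example values:

    import java.net.InetSocketAddress;
    import org.apache.hadoop.hdfs.net.Peer;
    import org.apache.hadoop.hdfs.net.TcpPeerServer;

    public class TcpPeerServerSketch {
      public static void main(String[] args) throws Exception {
        // a positive write timeout selects a channel-backed ServerSocket,
        // so accepted peers are NioInetPeers with working write timeouts
        TcpPeerServer server = new TcpPeerServer(60000,
            new InetSocketAddress("127.0.0.1", 0));
        server.setReceiveBufferSize(128 * 1024);
        System.out.println("listening on " + server.getListeningString());
        Peer peer = server.accept();   // blocks until a client connects
        System.out.println("accepted " + peer.getRemoteAddressString());
        peer.close();
        server.close();
      }
    }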

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1430507&r1=1430506&r2=1430507&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Tue Jan  8 20:44:09 2013
@@ -46,6 +46,8 @@ import org.apache.hadoop.hdfs.BlockReade
 import org.apache.hadoop.hdfs.BlockReaderFactory;
 import org.apache.hadoop.hdfs.DFSClient.Conf;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -206,12 +208,14 @@ public class JspHelper {
     // Use the block name for file name. 
     BlockReader blockReader = BlockReaderFactory.newBlockReader(
         new BlockReaderFactory.Params(new Conf(conf)).
-          setSocket(s).
+          setPeer(TcpPeerServer.peerFromSocketAndKey(s, encryptionKey)).
           setBlockToken(blockToken).setStartOffset(offsetIntoBlock).
           setLen(amtToRead).
-          setEncryptionKey(encryptionKey).
           setFile(BlockReaderFactory.getFileName(addr, poolId, blockId)).
-          setBlock(new ExtendedBlock(poolId, blockId, 0, genStamp)));
+          setBlock(new ExtendedBlock(poolId, blockId, 0, genStamp)).
+          setDatanodeID(new DatanodeID(addr.getAddress().toString(), 
+              addr.getHostName(), poolId, addr.getPort(), 0, 0)));
+    
     byte[] buf = new byte[(int)amtToRead];
     int readOffset = 0;
     int retries = 2;
@@ -229,8 +233,7 @@ public class JspHelper {
       amtToRead -= numRead;
       readOffset += numRead;
     }
-    blockReader = null;
-    s.close();
+    blockReader.close(null);
     out.print(HtmlQuoting.quoteHtmlChars(new String(buf)));
   }
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1430507&r1=1430506&r2=1430507&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Jan  8 20:44:09 2013
@@ -90,6 +90,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@@ -522,24 +523,19 @@ public class DataNode extends Configured
   
   private void initDataXceiver(Configuration conf) throws IOException {
     // find free port or use privileged port provided
-    ServerSocket ss;
-    if (secureResources == null) {
-      InetSocketAddress addr = DataNode.getStreamingAddr(conf);
-      ss = (dnConf.socketWriteTimeout > 0) ? 
-          ServerSocketChannel.open().socket() : new ServerSocket();
-          Server.bind(ss, addr, 0);
+    TcpPeerServer tcpPeerServer;
+    if (secureResources != null) {
+      tcpPeerServer = new TcpPeerServer(secureResources);
     } else {
-      ss = secureResources.getStreamingSocket();
+      tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout,
+          DataNode.getStreamingAddr(conf));
     }
-    ss.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); 
-
-    streamingAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
-                                     ss.getLocalPort());
-
+    tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
+    streamingAddr = tcpPeerServer.getStreamingAddr();
     LOG.info("Opened streaming server at " + streamingAddr);
     this.threadGroup = new ThreadGroup("dataXceiverServer");
     this.dataXceiverServer = new Daemon(threadGroup, 
-        new DataXceiverServer(ss, conf, this));
+        new DataXceiverServer(tcpPeerServer, conf, this));
     this.threadGroup.setDaemon(true); // auto destroy when empty
   }
   


