hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject svn commit: r1467538 [1/3] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/net/ src/main/java/org/apache/hadoop/hdfs/protocol/datatrans...
Date Sat, 13 Apr 2013 02:14:01 GMT
Author: atm
Date: Sat Apr 13 02:13:59 2013
New Revision: 1467538

URL: http://svn.apache.org/r1467538
Log:
HDFS-347. DFS read performance suboptimal when client co-located on nodes with data. Contributed by Colin Patrick McCabe.

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
      - copied unchanged from r1467533, hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java
      - copied unchanged from r1467533, hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java
      - copied unchanged from r1467533, hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
      - copied unchanged from r1467533, hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/
      - copied from r1467533, hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java
      - copied unchanged from r1467533, hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java
      - copied unchanged from r1467533, hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java
      - copied unchanged from r1467533, hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java
      - copied unchanged from r1467533, hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java
      - copied unchanged from r1467533, hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java
      - copied unchanged from r1467533, hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/PeerServer.java
      - copied unchanged from r1467533, hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/PeerServer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java
      - copied unchanged from r1467533, hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/ShortCircuitLocalReads.apt.vm
      - copied unchanged from r1467533, hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/ShortCircuitLocalReads.apt.vm
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocalLegacy.java
      - copied unchanged from r1467533, hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocalLegacy.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java
      - copied unchanged from r1467533, hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java
      - copied unchanged from r1467533, hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitLegacyRead.java
      - copied unchanged from r1467533, hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitLegacyRead.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitRead.java
      - copied unchanged from r1467533, hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitRead.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadNoChecksum.java
      - copied unchanged from r1467533, hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadNoChecksum.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadUnCached.java
      - copied unchanged from r1467533, hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadUnCached.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelUnixDomainRead.java
      - copied unchanged from r1467533, hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelUnixDomainRead.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java
      - copied unchanged from r1467533, hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java
Removed:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelLocalRead.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSocketCache.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/pom.xml
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/   (props changed)
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/   (props changed)
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/   (props changed)
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/   (props changed)
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/   (props changed)
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

Propchange: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs:r1430995-1467533

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Sat Apr 13 02:13:59 2013
@@ -187,6 +187,9 @@ Trunk (Unreleased)
     HDFS-4633 TestDFSClientExcludedNodes fails sporadically if excluded nodes
     cache expires too quickly  (Chris Nauroth via Sanjay)
 
+    HDFS-347. DFS read performance suboptimal when client co-located on nodes
+    with data. (Colin Patrick McCabe via todd and atm)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -355,6 +358,62 @@ Trunk (Unreleased)
     HDFS-4674. TestBPOfferService fails on Windows due to failure parsing 
     datanode data directory as URI. (Chris Nauroth via suresh)
 
+  BREAKDOWN OF HDFS-347 SUBTASKS AND RELATED JIRAS
+
+    HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes.
+    (Colin Patrick McCabe via todd)
+    
+    HDFS-4354. Create DomainSocket and DomainPeer and associated unit tests.
+    (Colin Patrick McCabe via todd)
+    
+    HDFS-4356. BlockReaderLocal should use passed file descriptors rather than paths.
+    (Colin Patrick McCabe via todd)
+    
+    HDFS-4388. DomainSocket should throw AsynchronousCloseException when appropriate.
+    (Colin Patrick McCabe via todd)
+    
+    HDFS-4390. Bypass UNIX domain socket unit tests when they cannot be run.
+    (Colin Patrick McCabe via todd)
+    
+    HDFS-4400. DFSInputStream#getBlockReader: last retries should ignore the cache
+    (Colin Patrick McCabe via todd)
+    
+    HDFS-4401. Fix bug in DomainSocket path validation
+    (Colin Patrick McCabe via todd)
+    
+    HDFS-4402. Some small DomainSocket fixes: avoid findbugs warning, change
+    log level, etc. (Colin Patrick McCabe via todd)
+    
+    HDFS-4418. increase default FileInputStreamCache size (todd)
+    
+    HDFS-4416. Rename dfs.datanode.domain.socket.path to dfs.domain.socket.path
+    (Colin Patrick McCabe via todd)
+    
+    HDFS-4417. Fix case where local reads get disabled incorrectly
+    (Colin Patrick McCabe and todd via todd)
+    
+    HDFS-4433. Make TestPeerCache not flaky (Colin Patrick McCabe via todd)
+    
+    HDFS-4438. TestDomainSocket fails when system umask is set to 0002. (Colin
+    Patrick McCabe via atm)
+    
+    HDFS-4440. Avoid annoying log message when dfs.domain.socket.path is not
+    set. (Colin Patrick McCabe via atm)
+    
+    HDFS-4473. Don't create domain socket unless we need it. (Colin Patrick McCabe via atm)
+    
+    HDFS-4485. DN should chmod socket path a+w. (Colin Patrick McCabe via atm)
+    
+    HDFS-4453. Make a simple doc to describe the usage and design of the
+    shortcircuit read feature. (Colin Patrick McCabe via atm)
+    
+    HDFS-4496. DFSClient: don't create a domain socket unless we need it (Colin
+    Patrick McCabe via todd)
+    
+    HDFS-347: style cleanups (Colin Patrick McCabe via atm)
+    
+    HDFS-4538. Allow use of legacy blockreader (Colin Patrick McCabe via todd)
+
 Release 2.0.5-beta - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml Sat Apr 13 02:13:59 2013
@@ -290,6 +290,14 @@
        <Method name="persistPaxosData" />
        <Bug pattern="OS_OPEN_STREAM" />
      </Match>
+
+     <!-- getShortCircuitFdsForRead is supposed to return open streams. -->
+     <Match>
+       <Class name="org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl" />
+       <Method name="getShortCircuitFdsForRead" />
+       <Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
+     </Match>
+
      <!-- Don't complain about LocalDatanodeInfo's anonymous class -->
      <Match>
        <Class name="org.apache.hadoop.hdfs.BlockReaderLocal$LocalDatanodeInfo$1" />

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/pom.xml?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/pom.xml (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/pom.xml Sat Apr 13 02:13:59 2013
@@ -515,6 +515,7 @@ http://maven.apache.org/xsd/maven-4.0.0.
           <excludes>
             <exclude>CHANGES.txt</exclude>
             <exclude>CHANGES.HDFS-1623.txt</exclude>
+            <exclude>CHANGES.HDFS-347.txt</exclude>
             <exclude>.idea/**</exclude>
             <exclude>src/main/conf/*</exclude>
             <exclude>src/main/docs/**</exclude>

Propchange: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1430995-1467533

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java Sat Apr 13 02:13:59 2013
@@ -18,10 +18,8 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
-import java.net.Socket;
 
 import org.apache.hadoop.fs.ByteBufferReadable;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 
 /**
  * A BlockReader is responsible for reading a single block
@@ -43,7 +41,29 @@ public interface BlockReader extends Byt
    */
   long skip(long n) throws IOException;
 
-  void close() throws IOException;
+  /**
+   * Returns an estimate of the number of bytes that can be read
+   * (or skipped over) from this input stream without performing
+   * network I/O.
+   */
+  int available() throws IOException;
+
+  /**
+   * Close the block reader.
+   *
+   * @param peerCache      The PeerCache to put the Peer we're using back
+   *                       into, or null if we should simply close the Peer
+   *                       we're using (along with its Socket).
+   *                       Ignored by Readers that don't maintain Peers.
+   * @param fisCache       The FileInputStreamCache to put our FileInputStreams
+   *                       back into, or null if we should simply close them.
+   *                       Ignored by Readers that don't maintain
+   *                       FileInputStreams.
+   *
+   * @throws IOException
+   */
+  void close(PeerCache peerCache, FileInputStreamCache fisCache)
+      throws IOException;
 
   /**
    * Read exactly the given amount of data, throwing an exception
@@ -60,20 +80,4 @@ public interface BlockReader extends Byt
    * filled or the next call will return EOF.
    */
   int readAll(byte[] buf, int offset, int len) throws IOException;
-
-  /**
-   * Take the socket used to talk to the DN.
-   */
-  Socket takeSocket();
-
-  /**
-   * Whether the BlockReader has reached the end of its input stream
-   * and successfully sent a status code back to the datanode.
-   */
-  boolean hasSentStatusCode();
-
-  /**
-   * @return a reference to the streams this block reader is using.
-   */
-  IOStreamPair getStreams();
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java Sat Apr 13 02:13:59 2013
@@ -17,20 +17,31 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.Socket;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSClient.Conf;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 
 
@@ -40,74 +51,181 @@ import org.apache.hadoop.security.token.
 @InterfaceAudience.Private
 public class BlockReaderFactory {
   /**
-   * @see #newBlockReader(Conf, Socket, String, ExtendedBlock, Token, long, long, int, boolean, String)
-   */
-  public static BlockReader newBlockReader(
-      Configuration conf,
-      Socket sock, String file,
-      ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, 
-      long startOffset, long len, DataEncryptionKey encryptionKey)
-          throws IOException {
-    int bufferSize = conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
-        DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT);
-    return newBlockReader(new Conf(conf),
-        sock, file, block, blockToken, startOffset,
-        len, bufferSize, true, "", encryptionKey, null);
-  }
-
-  /**
    * Create a new BlockReader specifically to satisfy a read.
    * This method also sends the OP_READ_BLOCK request.
    * 
    * @param conf the DFSClient configuration
-   * @param sock  An established Socket to the DN. The BlockReader will not close it normally
    * @param file  File location
    * @param block  The block object
    * @param blockToken  The block token for security
    * @param startOffset  The read offset, relative to block head
-   * @param len  The number of bytes to read
+   * @param len  The number of bytes to read, or -1 to read as many as
+   *             possible.
    * @param bufferSize  The IO buffer size (not the client buffer size)
+   *                    Ignored except on the legacy BlockReader.
    * @param verifyChecksum  Whether to verify checksum
-   * @param clientName  Client name
-   * @return New BlockReader instance, or null on error.
+   * @param clientName  Client name.  Used for log messages.
+   * @param peer  The peer
+   * @param datanodeID  The datanode that the Peer is connected to
+   * @param domainSocketFactory  The DomainSocketFactory to notify if the Peer
+   *                             is a DomainPeer which turns out to be faulty.
+   *                             If null, no factory will be notified in this
+   *                             case.
+   * @param allowShortCircuitLocalReads  True if short-circuit local reads
+   *                                     should be allowed.
+   * @return New BlockReader instance
    */
   @SuppressWarnings("deprecation")
   public static BlockReader newBlockReader(
-                                     Conf conf,
-                                     Socket sock, String file,
+                                     Configuration conf,
+                                     String file,
                                      ExtendedBlock block, 
                                      Token<BlockTokenIdentifier> blockToken,
                                      long startOffset, long len,
-                                     int bufferSize, boolean verifyChecksum,
+                                     boolean verifyChecksum,
                                      String clientName,
-                                     DataEncryptionKey encryptionKey,
-                                     IOStreamPair ioStreams)
-                                     throws IOException {
-    
-    if (conf.useLegacyBlockReader) {
-      if (encryptionKey != null) {
-        throw new RuntimeException("Encryption is not supported with the legacy block reader.");
+                                     Peer peer,
+                                     DatanodeID datanodeID,
+                                     DomainSocketFactory domSockFactory,
+                                     boolean allowShortCircuitLocalReads)
+  throws IOException {
+    peer.setReadTimeout(conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
+        HdfsServerConstants.READ_TIMEOUT));
+    peer.setWriteTimeout(HdfsServerConstants.WRITE_TIMEOUT);
+
+    if (peer.getDomainSocket() != null) {
+      if (allowShortCircuitLocalReads &&
+         (!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
+            DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT))) {
+        // If this is a domain socket, and short-circuit local reads are 
+        // enabled, try to set up a BlockReaderLocal.
+        BlockReader reader = newShortCircuitBlockReader(conf, file,
+            block, blockToken, startOffset, len, peer, datanodeID,
+            domSockFactory, verifyChecksum);
+        if (reader != null) {
+          // One we've constructed the short-circuit block reader, we don't
+          // need the socket any more.  So let's return it to the cache.
+          PeerCache peerCache = PeerCache.getInstance(
+              conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 
+                DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT),
+              conf.getLong(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, 
+                DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT));
+          peerCache.put(datanodeID, peer);
+          return reader;
+        }
+      }
+      // If this is a domain socket and we couldn't (or didn't want to) set
+      // up a BlockReaderLocal, check that we are allowed to pass data traffic
+      // over the socket before proceeding.
+      if (!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
+            DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
+        throw new IOException("Because we can't do short-circuit access, " +
+          "and data traffic over domain sockets is disabled, " +
+          "we cannot use this socket to talk to " + datanodeID);
       }
-      return RemoteBlockReader.newBlockReader(
-          sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName);
+    }
+
+    if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER,
+        DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT)) {
+      return RemoteBlockReader.newBlockReader(file,
+          block, blockToken, startOffset, len,
+          conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
+              DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
+          verifyChecksum, clientName, peer, datanodeID);
     } else {
-      if (ioStreams == null) {
-        ioStreams = new IOStreamPair(NetUtils.getInputStream(sock),
-            NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT));
-        if (encryptionKey != null) {
-          IOStreamPair encryptedStreams =
-              DataTransferEncryptor.getEncryptedStreams(
-                  ioStreams.out, ioStreams.in, encryptionKey);
-          ioStreams = encryptedStreams;
+      return RemoteBlockReader2.newBlockReader(
+          file, block, blockToken, startOffset, len,
+          verifyChecksum, clientName, peer, datanodeID);
+    }
+  }
+
+  /**
+   * Create a new short-circuit BlockReader.
+   * 
+   * Here, we ask the DataNode to pass us file descriptors over our
+   * DomainSocket.  If the DataNode declines to do so, we'll return null here;
+   * otherwise, we'll return the BlockReaderLocal.  If the DataNode declines,
+   * this function will inform the DomainSocketFactory that short-circuit local
+   * reads are disabled for this DataNode, so that we don't ask again.
+   * 
+   * @param conf               the configuration.
+   * @param file               the file name. Used in log messages.
+   * @param block              The block object.
+   * @param blockToken         The block token for security.
+   * @param startOffset        The read offset, relative to block head.
+   * @param len                The number of bytes to read, or -1 to read 
+   *                           as many as possible.
+   * @param peer               The peer to use.
+   * @param datanodeID         The datanode that the Peer is connected to.
+   * @param domSockFactory     The DomainSocketFactory to notify if the Peer
+   *                           is a DomainPeer which turns out to be faulty.
+   *                           If null, no factory will be notified in this
+   *                           case.
+   * @param verifyChecksum     True if we should verify the checksums.
+   *                           Note: even if this is true, when
+   *                           DFS_CLIENT_READ_CHECKSUM_SKIP_CHECKSUM_KEY is
+   *                           set, we will skip checksums.
+   *
+   * @return                   The BlockReaderLocal, or null if the
+   *                           DataNode declined to provide short-circuit
+   *                           access.
+   * @throws IOException       If there was a communication error.
+   */
+  private static BlockReaderLocal newShortCircuitBlockReader(
+      Configuration conf, String file, ExtendedBlock block,
+      Token<BlockTokenIdentifier> blockToken, long startOffset,
+      long len, Peer peer, DatanodeID datanodeID,
+      DomainSocketFactory domSockFactory, boolean verifyChecksum)
+          throws IOException {
+    final DataOutputStream out =
+        new DataOutputStream(new BufferedOutputStream(
+          peer.getOutputStream()));
+    new Sender(out).requestShortCircuitFds(block, blockToken, 1);
+    DataInputStream in =
+        new DataInputStream(peer.getInputStream());
+    BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
+        PBHelper.vintPrefixed(in));
+    DomainSocket sock = peer.getDomainSocket();
+    switch (resp.getStatus()) {
+    case SUCCESS:
+      BlockReaderLocal reader = null;
+      byte buf[] = new byte[1];
+      FileInputStream fis[] = new FileInputStream[2];
+      sock.recvFileInputStreams(fis, buf, 0, buf.length);
+      try {
+        reader = new BlockReaderLocal(conf, file, block,
+            startOffset, len, fis[0], fis[1], datanodeID, verifyChecksum);
+      } finally {
+        if (reader == null) {
+          IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]);
         }
       }
-      
-      return RemoteBlockReader2.newBlockReader(
-          sock, file, block, blockToken, startOffset, len, bufferSize,
-          verifyChecksum, clientName, encryptionKey, ioStreams);
+      return reader;
+    case ERROR_UNSUPPORTED:
+      if (!resp.hasShortCircuitAccessVersion()) {
+        DFSClient.LOG.warn("short-circuit read access is disabled for " +
+            "DataNode " + datanodeID + ".  reason: " + resp.getMessage());
+        domSockFactory.disableShortCircuitForPath(sock.getPath());
+      } else {
+        DFSClient.LOG.warn("short-circuit read access for the file " +
+            file + " is disabled for DataNode " + datanodeID +
+            ".  reason: " + resp.getMessage());
+      }
+      return null;
+    case ERROR_ACCESS_TOKEN:
+      String msg = "access control error while " +
+          "attempting to set up short-circuit access to " +
+          file + resp.getMessage();
+      DFSClient.LOG.debug(msg);
+      throw new InvalidBlockTokenException(msg);
+    default:
+      DFSClient.LOG.warn("error while attempting to set up short-circuit " +
+          "access to " + file + ": " + resp.getMessage());
+      domSockFactory.disableShortCircuitForPath(sock.getPath());
+      return null;
     }
   }
-  
+
   /**
    * File name to print when accessing a block directly (from servlets)
    * @param s Address of the block location
@@ -119,4 +237,24 @@ public class BlockReaderFactory {
       final String poolId, final long blockId) {
     return s.toString() + ":" + poolId + ":" + blockId;
   }
+
+  /**
+   * Get {@link BlockReaderLocalLegacy} for short circuited local reads.
+   * This block reader implements the path-based style of local reads
+   * first introduced in HDFS-2246.
+   */
+  static BlockReader getLegacyBlockReaderLocal(UserGroupInformation ugi,
+      Configuration conf, String src, ExtendedBlock blk,
+      Token<BlockTokenIdentifier> accessToken, DatanodeInfo chosenNode,
+      int socketTimeout, long offsetIntoBlock,
+      boolean connectToDnViaHostname) throws InvalidToken, IOException {
+    try {
+      return BlockReaderLocalLegacy.newBlockReader(ugi, conf, src,
+          blk, accessToken, chosenNode, socketTimeout, offsetIntoBlock,
+          blk.getNumBytes() - offsetIntoBlock, connectToDnViaHostname);
+    } catch (RemoteException re) {
+      throw re.unwrapRemoteException(InvalidToken.class,
+          AccessControlException.class);
+    }
+  }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Sat Apr 13 02:13:59 2013
@@ -18,33 +18,20 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.DataInputStream;
-import java.io.File;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.BufferedInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.net.Socket;
 import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
-import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.util.DirectBufferPool;
-import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
 /**
@@ -56,84 +43,19 @@ import org.apache.hadoop.util.DataChecks
  * <ul>
  * <li>The client performing short circuit reads must be configured at the
  * datanode.</li>
- * <li>The client gets the path to the file where block is stored using
- * {@link org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol#getBlockLocalPathInfo(ExtendedBlock, Token)}
- * RPC call</li>
- * <li>Client uses kerberos authentication to connect to the datanode over RPC,
- * if security is enabled.</li>
+ * <li>The client gets the file descriptors for the metadata file and the data 
+ * file for the block using
+ * {@link org.apache.hadoop.hdfs.server.datanode.DataXceiver#requestShortCircuitFds}.
+ * </li>
+ * <li>The client reads the file descriptors.</li>
  * </ul>
  */
 class BlockReaderLocal implements BlockReader {
-  private static final Log LOG = LogFactory.getLog(DFSClient.class);
-
-  //Stores the cache and proxy for a local datanode.
-  private static class LocalDatanodeInfo {
-    private ClientDatanodeProtocol proxy = null;
-    private final Map<ExtendedBlock, BlockLocalPathInfo> cache;
-
-    LocalDatanodeInfo() {
-      final int cacheSize = 10000;
-      final float hashTableLoadFactor = 0.75f;
-      int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor) + 1;
-      cache = Collections
-          .synchronizedMap(new LinkedHashMap<ExtendedBlock, BlockLocalPathInfo>(
-              hashTableCapacity, hashTableLoadFactor, true) {
-            private static final long serialVersionUID = 1;
-
-            @Override
-            protected boolean removeEldestEntry(
-                Map.Entry<ExtendedBlock, BlockLocalPathInfo> eldest) {
-              return size() > cacheSize;
-            }
-          });
-    }
-
-    private synchronized ClientDatanodeProtocol getDatanodeProxy(
-        UserGroupInformation ugi, final DatanodeInfo node,
-        final Configuration conf, final int socketTimeout,
-        final boolean connectToDnViaHostname) throws IOException {
-      if (proxy == null) {
-        try {
-          proxy = ugi.doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
-            @Override
-            public ClientDatanodeProtocol run() throws Exception {
-              return DFSUtil.createClientDatanodeProtocolProxy(node, conf,
-                  socketTimeout, connectToDnViaHostname);
-            }
-          });
-        } catch (InterruptedException e) {
-          LOG.warn("encountered exception ", e);
-        }
-      }
-      return proxy;
-    }
-    
-    private synchronized void resetDatanodeProxy() {
-      if (null != proxy) {
-        RPC.stopProxy(proxy);
-        proxy = null;
-      }
-    }
-
-    private BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
-      return cache.get(b);
-    }
-
-    private void setBlockLocalPathInfo(ExtendedBlock b, BlockLocalPathInfo info) {
-      cache.put(b, info);
-    }
-
-    private void removeBlockLocalPathInfo(ExtendedBlock b) {
-      cache.remove(b);
-    }
-  }
-  
-  // Multiple datanodes could be running on the local machine. Store proxies in
-  // a map keyed by the ipc port of the datanode.
-  private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
+  static final Log LOG = LogFactory.getLog(BlockReaderLocal.class);
 
   private final FileInputStream dataIn; // reader for the data file
   private final FileInputStream checksumIn;   // reader for the checksum file
+  private final boolean verifyChecksum;
 
   /**
    * Offset from the most recent chunk boundary at which the next read should
@@ -153,7 +75,6 @@ class BlockReaderLocal implements BlockR
   private ByteBuffer slowReadBuff = null;
   private ByteBuffer checksumBuff = null;
   private DataChecksum checksum;
-  private final boolean verifyChecksum;
 
   private static DirectBufferPool bufferPool = new DirectBufferPool();
 
@@ -163,187 +84,92 @@ class BlockReaderLocal implements BlockR
   /** offset in block where reader wants to actually read */
   private long startOffset;
   private final String filename;
-  
-  /**
-   * The only way this object can be instantiated.
-   */
-  static BlockReaderLocal newBlockReader(UserGroupInformation ugi,
-      Configuration conf, String file, ExtendedBlock blk,
-      Token<BlockTokenIdentifier> token, DatanodeInfo node, int socketTimeout,
-      long startOffset, long length, boolean connectToDnViaHostname)
-      throws IOException {
-
-    LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
-        .getIpcPort());
-    // check the cache first
-    BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
-    if (pathinfo == null) {
-      pathinfo = getBlockPathInfo(ugi, blk, node, conf, socketTimeout, token,
-          connectToDnViaHostname);
-    }
-
-    // check to see if the file exists. It may so happen that the
-    // HDFS file has been deleted and this block-lookup is occurring
-    // on behalf of a new HDFS file. This time, the block file could
-    // be residing in a different portion of the fs.data.dir directory.
-    // In this case, we remove this entry from the cache. The next
-    // call to this method will re-populate the cache.
-    FileInputStream dataIn = null;
-    FileInputStream checksumIn = null;
-    BlockReaderLocal localBlockReader = null;
-    boolean skipChecksumCheck = skipChecksumCheck(conf);
-    try {
-      // get a local file system
-      File blkfile = new File(pathinfo.getBlockPath());
-      dataIn = new FileInputStream(blkfile);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("New BlockReaderLocal for file " + blkfile + " of size "
-            + blkfile.length() + " startOffset " + startOffset + " length "
-            + length + " short circuit checksum " + !skipChecksumCheck);
-      }
 
-      if (!skipChecksumCheck) {
-        // get the metadata file
-        File metafile = new File(pathinfo.getMetaPath());
-        checksumIn = new FileInputStream(metafile);
-
-        // read and handle the common header here. For now just a version
-        BlockMetadataHeader header = BlockMetadataHeader
-            .readHeader(new DataInputStream(checksumIn));
-        short version = header.getVersion();
-        if (version != BlockMetadataHeader.VERSION) {
-          LOG.warn("Wrong version (" + version + ") for metadata file for "
-              + blk + " ignoring ...");
-        }
-        DataChecksum checksum = header.getChecksum();
-        long firstChunkOffset = startOffset
-            - (startOffset % checksum.getBytesPerChecksum());
-        localBlockReader = new BlockReaderLocal(conf, file, blk, token,
-            startOffset, length, pathinfo, checksum, true, dataIn,
-            firstChunkOffset, checksumIn);
-      } else {
-        localBlockReader = new BlockReaderLocal(conf, file, blk, token,
-            startOffset, length, pathinfo, dataIn);
-      }
-    } catch (IOException e) {
-      // remove from cache
-      localDatanodeInfo.removeBlockLocalPathInfo(blk);
-      DFSClient.LOG.warn("BlockReaderLocal: Removing " + blk
-          + " from cache because local file " + pathinfo.getBlockPath()
-          + " could not be opened.");
-      throw e;
-    } finally {
-      if (localBlockReader == null) {
-        if (dataIn != null) {
-          dataIn.close();
-        }
-        if (checksumIn != null) {
-          checksumIn.close();
-        }
-      }
-    }
-    return localBlockReader;
-  }
+  private final DatanodeID datanodeID;
+  private final ExtendedBlock block;
   
-  private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) {
-    LocalDatanodeInfo ldInfo = localDatanodeInfoMap.get(port);
-    if (ldInfo == null) {
-      ldInfo = new LocalDatanodeInfo();
-      localDatanodeInfoMap.put(port, ldInfo);
-    }
-    return ldInfo;
-  }
-  
-  private static BlockLocalPathInfo getBlockPathInfo(UserGroupInformation ugi,
-      ExtendedBlock blk, DatanodeInfo node, Configuration conf, int timeout,
-      Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname)
-      throws IOException {
-    LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
-    BlockLocalPathInfo pathinfo = null;
-    ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(ugi, node,
-        conf, timeout, connectToDnViaHostname);
-    try {
-      // make RPC to local datanode to find local pathnames of blocks
-      pathinfo = proxy.getBlockLocalPathInfo(blk, token);
-      if (pathinfo != null) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Cached location of block " + blk + " as " + pathinfo);
-        }
-        localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo);
-      }
-    } catch (IOException e) {
-      localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error
-      throw e;
-    }
-    return pathinfo;
-  }
-  
-  private static boolean skipChecksumCheck(Configuration conf) {
-    return conf.getBoolean(
-        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
-        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
-  }
-  
-  private static int getSlowReadBufferNumChunks(Configuration conf, int bytesPerChecksum) {
-    int bufferSizeBytes = conf.getInt(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
-        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
-
-    if (bufferSizeBytes < bytesPerChecksum) {
-      throw new IllegalArgumentException("Configured BlockReaderLocal buffer size (" + bufferSizeBytes + ") " +
-          "is not large enough to hold a single chunk (" + bytesPerChecksum +  "). Please configure " +
+  private static int getSlowReadBufferNumChunks(Configuration conf,
+      int bytesPerChecksum) {
+
+    int bufSize =
+        conf.getInt(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
+            DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
+
+    if (bufSize < bytesPerChecksum) {
+      throw new IllegalArgumentException("Configured BlockReaderLocal buffer size (" +
+          bufSize + ") is not large enough to hold a single chunk (" +
+          bytesPerChecksum +  "). Please configure " +
           DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY + " appropriately");
     }
 
     // Round down to nearest chunk size
-    return bufferSizeBytes / bytesPerChecksum;
-  }
-
-  private BlockReaderLocal(Configuration conf, String hdfsfile,
-      ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
-      long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn)
-      throws IOException {
-    this(conf, hdfsfile, block, token, startOffset, length, pathinfo,
-        DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 4), false,
-        dataIn, startOffset, null);
+    return bufSize / bytesPerChecksum;
   }
 
-  private BlockReaderLocal(Configuration conf, String hdfsfile,
-      ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
-      long length, BlockLocalPathInfo pathinfo, DataChecksum checksum,
-      boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
-      FileInputStream checksumIn) throws IOException {
-    this.filename = hdfsfile;
-    this.checksum = checksum;
-    this.verifyChecksum = verifyChecksum;
-    this.startOffset = Math.max(startOffset, 0);
-
-    bytesPerChecksum = this.checksum.getBytesPerChecksum();
-    checksumSize = this.checksum.getChecksumSize();
-
+  public BlockReaderLocal(Configuration conf, String filename,
+      ExtendedBlock block, long startOffset, long length,
+      FileInputStream dataIn, FileInputStream checksumIn,
+      DatanodeID datanodeID, boolean verifyChecksum) throws IOException {
     this.dataIn = dataIn;
     this.checksumIn = checksumIn;
-    this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
-
-    int chunksPerChecksumRead = getSlowReadBufferNumChunks(conf, bytesPerChecksum);
-    slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
-    checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
-    // Initially the buffers have nothing to read.
-    slowReadBuff.flip();
-    checksumBuff.flip();
+    this.startOffset = Math.max(startOffset, 0);
+    this.filename = filename;
+    this.datanodeID = datanodeID;
+    this.block = block;
+
+    // read and handle the common header here. For now just a version
+    checksumIn.getChannel().position(0);
+    BlockMetadataHeader header = BlockMetadataHeader
+        .readHeader(new DataInputStream(
+            new BufferedInputStream(checksumIn,
+                BlockMetadataHeader.getHeaderSize())));
+    short version = header.getVersion();
+    if (version != BlockMetadataHeader.VERSION) {
+      throw new IOException("Wrong version (" + version + ") of the " +
+          "metadata file for " + filename + ".");
+    }
+    if (!verifyChecksum) {
+      this.verifyChecksum = false; 
+    } else {
+      this.verifyChecksum = !conf.getBoolean(DFSConfigKeys.
+          DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, 
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
+    }
+    long firstChunkOffset;
+    if (this.verifyChecksum) {
+      this.checksum = header.getChecksum();
+      this.bytesPerChecksum = this.checksum.getBytesPerChecksum();
+      this.checksumSize = this.checksum.getChecksumSize();
+      firstChunkOffset = startOffset
+          - (startOffset % checksum.getBytesPerChecksum());
+      this.offsetFromChunkBoundary = (int) (startOffset - firstChunkOffset);
+
+      int chunksPerChecksumRead = getSlowReadBufferNumChunks(conf, bytesPerChecksum);
+      slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
+      checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
+      // Initially the buffers have nothing to read.
+      slowReadBuff.flip();
+      checksumBuff.flip();
+      long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;
+      IOUtils.skipFully(checksumIn, checkSumOffset);
+    } else {
+      firstChunkOffset = startOffset;
+      this.checksum = null;
+      this.bytesPerChecksum = 0;
+      this.checksumSize = 0;
+      this.offsetFromChunkBoundary = 0;
+    }
+    
     boolean success = false;
     try {
-      // Skip both input streams to beginning of the chunk containing startOffset
-      IOUtils.skipFully(dataIn, firstChunkOffset);
-      if (checksumIn != null) {
-        long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;
-        IOUtils.skipFully(checksumIn, checkSumOffset);
-      }
+      // Reposition both input streams to the beginning of the chunk
+      // containing startOffset
+      this.dataIn.getChannel().position(firstChunkOffset);
       success = true;
     } finally {
       if (!success) {
-        bufferPool.returnBuffer(slowReadBuff);
-        bufferPool.returnBuffer(checksumBuff);
+        if (slowReadBuff != null) bufferPool.returnBuffer(slowReadBuff);
+        if (checksumBuff != null) bufferPool.returnBuffer(checksumBuff);
       }
     }
   }
@@ -663,10 +489,17 @@ class BlockReaderLocal implements BlockR
   }
 
   @Override
-  public synchronized void close() throws IOException {
-    dataIn.close();
-    if (checksumIn != null) {
-      checksumIn.close();
+  public synchronized void close(PeerCache peerCache,
+      FileInputStreamCache fisCache) throws IOException {
+    if (fisCache != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("putting FileInputStream for " + filename +
+            " back into FileInputStreamCache");
+      }
+      fisCache.put(datanodeID, block, new FileInputStream[] {dataIn, checksumIn});
+    } else {
+      LOG.debug("closing FileInputStream for " + filename);
+      IOUtils.cleanup(LOG, dataIn, checksumIn);
     }
     if (slowReadBuff != null) {
       bufferPool.returnBuffer(slowReadBuff);
@@ -691,17 +524,8 @@ class BlockReaderLocal implements BlockR
   }
 
   @Override
-  public Socket takeSocket() {
-    return null;
-  }
-
-  @Override
-  public boolean hasSentStatusCode() {
-    return false;
-  }
-
-  @Override
-  public IOStreamPair getStreams() {
-    return null;
+  public int available() throws IOException {
+    // We never do network I/O in BlockReaderLocal.
+    return Integer.MAX_VALUE;
   }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Sat Apr 13 02:13:59 2013
@@ -129,7 +129,6 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -193,12 +192,13 @@ public class DFSClient implements java.i
   final FileSystem.Statistics stats;
   final int hdfsTimeout;    // timeout value for a DFS operation.
   private final String authority;
-  final SocketCache socketCache;
+  final PeerCache peerCache;
   final Conf dfsClientConf;
   private Random r = new Random();
   private SocketAddress[] localInterfaceAddrs;
   private DataEncryptionKey encryptionKey;
-
+  private boolean shouldUseLegacyBlockReaderLocal;
+  
   /**
    * DFSClient configuration 
    */
@@ -225,11 +225,16 @@ public class DFSClient implements java.i
     final short defaultReplication;
     final String taskId;
     final FsPermission uMask;
-    final boolean useLegacyBlockReader;
+    final boolean useLegacyBlockReaderLocal;
     final boolean connectToDnViaHostname;
     final boolean getHdfsBlocksMetadataEnabled;
     final int getFileBlockStorageLocationsNumThreads;
     final int getFileBlockStorageLocationsTimeout;
+    final String domainSocketPath;
+    final boolean skipShortCircuitChecksums;
+    final int shortCircuitBufferSize;
+    final boolean shortCircuitLocalReads;
+    final boolean domainSocketDataTraffic;
 
     Conf(Configuration conf) {
       maxFailoverAttempts = conf.getInt(
@@ -280,9 +285,9 @@ public class DFSClient implements java.i
           .getInt(DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
               DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
       uMask = FsPermission.getUMask(conf);
-      useLegacyBlockReader = conf.getBoolean(
-          DFS_CLIENT_USE_LEGACY_BLOCKREADER,
-          DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
+      useLegacyBlockReaderLocal = conf.getBoolean(
+          DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
+          DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT);
       connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
           DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
       getHdfsBlocksMetadataEnabled = conf.getBoolean(
@@ -294,6 +299,20 @@ public class DFSClient implements java.i
       getFileBlockStorageLocationsTimeout = conf.getInt(
           DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT,
           DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_DEFAULT);
+      domainSocketPath = conf.getTrimmed(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+          DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT);
+      skipShortCircuitChecksums = conf.getBoolean(
+          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
+      shortCircuitBufferSize = conf.getInt(
+          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
+          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
+      shortCircuitLocalReads = conf.getBoolean(
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
+      domainSocketDataTraffic = conf.getBoolean(
+        DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
+        DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
     }
 
     private DataChecksum.Type getChecksumType(Configuration conf) {
@@ -351,7 +370,7 @@ public class DFSClient implements java.i
   private final Map<String, DFSOutputStream> filesBeingWritten
       = new HashMap<String, DFSOutputStream>();
 
-  private boolean shortCircuitLocalReads;
+  private final DomainSocketFactory domainSocketFactory;
   
   /**
    * Same as this(NameNode.getAddress(conf), conf);
@@ -395,6 +414,11 @@ public class DFSClient implements java.i
     throws IOException {
     // Copy only the required DFSClient configuration
     this.dfsClientConf = new Conf(conf);
+    this.shouldUseLegacyBlockReaderLocal = 
+        this.dfsClientConf.useLegacyBlockReaderLocal;
+    if (this.dfsClientConf.useLegacyBlockReaderLocal) {
+      LOG.debug("Using legacy short-circuit local reads.");
+    }
     this.conf = conf;
     this.stats = stats;
     this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
@@ -424,12 +448,8 @@ public class DFSClient implements java.i
     }
 
     // read directly from the block file if configured.
-    this.shortCircuitLocalReads = conf.getBoolean(
-        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
-        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Short circuit read is " + shortCircuitLocalReads);
-    }
+    this.domainSocketFactory = new DomainSocketFactory(dfsClientConf);
+
     String localInterfaces[] =
       conf.getTrimmedStrings(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES);
     localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);
@@ -439,7 +459,7 @@ public class DFSClient implements java.i
       Joiner.on(',').join(localInterfaceAddrs) + "]");
     }
     
-    this.socketCache = SocketCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
+    this.peerCache = PeerCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
   }
 
   /**
@@ -794,29 +814,11 @@ public class DFSClient implements java.i
                                      AccessControlException.class);
     }
   }
-
-  /**
-   * Get {@link BlockReader} for short circuited local reads.
-   */
-  static BlockReader getLocalBlockReader(UserGroupInformation ugi,
-      Configuration conf, String src, ExtendedBlock blk,
-      Token<BlockTokenIdentifier> accessToken, DatanodeInfo chosenNode,
-      int socketTimeout, long offsetIntoBlock, boolean connectToDnViaHostname)
-      throws InvalidToken, IOException {
-    try {
-      return BlockReaderLocal.newBlockReader(ugi, conf, src, blk, accessToken,
-          chosenNode, socketTimeout, offsetIntoBlock, blk.getNumBytes()
-              - offsetIntoBlock, connectToDnViaHostname);
-    } catch (RemoteException re) {
-      throw re.unwrapRemoteException(InvalidToken.class,
-          AccessControlException.class);
-    }
-  }
   
   private static Map<String, Boolean> localAddrMap = Collections
       .synchronizedMap(new HashMap<String, Boolean>());
   
-  private static boolean isLocalAddress(InetSocketAddress targetAddr) {
+  static boolean isLocalAddress(InetSocketAddress targetAddr) {
     InetAddress addr = targetAddr.getAddress();
     Boolean cached = localAddrMap.get(addr.getHostAddress());
     if (cached != null) {
@@ -2218,10 +2220,6 @@ public class DFSClient implements java.i
       super(in);
     }
   }
-  
-  boolean shouldTryShortCircuitRead(InetSocketAddress targetAddr) {
-    return shortCircuitLocalReads && isLocalAddress(targetAddr);
-  }
 
   void reportChecksumFailure(String file, ExtendedBlock blk, DatanodeInfo dn) {
     DatanodeInfo [] dnArr = { dn };
@@ -2245,13 +2243,15 @@ public class DFSClient implements java.i
         + ", ugi=" + ugi + "]"; 
   }
 
-  void disableShortCircuit() {
-    LOG.info("Short circuit is disabled");
-    shortCircuitLocalReads = false;
+  public DomainSocketFactory getDomainSocketFactory() {
+    return domainSocketFactory;
   }
-  
-  @VisibleForTesting
-  boolean getShortCircuitLocalReads() {
-    return shortCircuitLocalReads;
+
+  public void disableLegacyBlockReaderLocal() {
+    shouldUseLegacyBlockReaderLocal = false;
+  }
+
+  public boolean useLegacyBlockReaderLocal() {
+    return shouldUseLegacyBlockReaderLocal;
   }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Sat Apr 13 02:13:59 2013
@@ -265,6 +265,8 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT = 3;
   public static final String  DFS_CLIENT_USE_LEGACY_BLOCKREADER = "dfs.client.use.legacy.blockreader";
   public static final boolean DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT = false;
+  public static final String  DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL = "dfs.client.use.legacy.blockreader.local";
+  public static final boolean DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT = false;
   public static final String  DFS_BALANCER_MOVEDWINWIDTH_KEY = "dfs.balancer.movedWinWidth";
   public static final long    DFS_BALANCER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;
   public static final String  DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";
@@ -347,7 +349,13 @@ public class DFSConfigKeys extends Commo
   public static final String DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY = "dfs.client.read.shortcircuit.skip.checksum";
   public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT = false;
   public static final String DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY = "dfs.client.read.shortcircuit.buffer.size";
+  public static final String DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY = "dfs.client.read.shortcircuit.streams.cache.size";
+  public static final int DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT = 100;
+  public static final String DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY = "dfs.client.read.shortcircuit.streams.cache.expiry.ms";
+  public static final long DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT = 5000;
   public static final int DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT = 1024 * 1024;
+  public static final String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC = "dfs.client.domain.socket.data.traffic";
+  public static final boolean DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT = false;
 
   // property for fsimage compression
   public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
@@ -404,6 +412,8 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_NAMENODE_MAX_OP_SIZE_DEFAULT = 50 * 1024 * 1024;
   
   public static final String DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY = "dfs.block.local-path-access.user";
+  public static final String DFS_DOMAIN_SOCKET_PATH_KEY = "dfs.domain.socket.path";
+  public static final String DFS_DOMAIN_SOCKET_PATH_DEFAULT = "";
 
   // HA related configuration
   public static final String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes";

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Sat Apr 13 02:13:59 2013
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -32,18 +33,20 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.hdfs.SocketCache.SocketAndStreams;
+import org.apache.hadoop.hdfs.net.DomainPeer;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
@@ -51,20 +54,23 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.token.Token;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /****************************************************************
  * DFSInputStream provides bytes from a named file.  It handles 
  * negotiation of the namenode and various datanodes as necessary.
  ****************************************************************/
 @InterfaceAudience.Private
 public class DFSInputStream extends FSInputStream implements ByteBufferReadable {
-  private final SocketCache socketCache;
-
+  @VisibleForTesting
+  static boolean tcpReadsDisabledForTesting = false;
+  private final PeerCache peerCache;
   private final DFSClient dfsClient;
   private boolean closed = false;
-
   private final String src;
   private final long prefetchSize;
   private BlockReader blockReader = null;
@@ -76,6 +82,8 @@ public class DFSInputStream extends FSIn
   private long pos = 0;
   private long blockEnd = -1;
 
+  private final FileInputStreamCache fileInputStreamCache;
+
   /**
    * This variable tracks the number of failures since the start of the
    * most recent user-facing operation. That is to say, it should be reset
@@ -110,7 +118,14 @@ public class DFSInputStream extends FSIn
     this.verifyChecksum = verifyChecksum;
     this.buffersize = buffersize;
     this.src = src;
-    this.socketCache = dfsClient.socketCache;
+    this.peerCache = dfsClient.peerCache;
+    this.fileInputStreamCache = new FileInputStreamCache(
+      dfsClient.conf.getInt(DFSConfigKeys.
+        DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY,
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT),
+      dfsClient.conf.getLong(DFSConfigKeys.
+        DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
+        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT));
     prefetchSize = dfsClient.getConf().prefetchSize;
     timeWindow = dfsClient.getConf().timeWindow;
     nCachedConnRetry = dfsClient.getConf().nCachedConnRetry;
@@ -243,7 +258,9 @@ public class DFSInputStream extends FSIn
         locatedBlocks.getFileLength() + lastBlockBeingWrittenLength;
   }
 
-  private synchronized boolean blockUnderConstruction() {
+  // Short circuit local reads are forbidden for files that are
+  // under construction.  See HDFS-2757.
+  synchronized boolean shortCircuitForbidden() {
     return locatedBlocks.isUnderConstruction();
   }
 
@@ -424,7 +441,7 @@ public class DFSInputStream extends FSIn
 
     // Will be getting a new BlockReader.
     if (blockReader != null) {
-      closeBlockReader(blockReader);
+      blockReader.close(peerCache, fileInputStreamCache);
       blockReader = null;
     }
 
@@ -462,7 +479,7 @@ public class DFSInputStream extends FSIn
         return chosenNode;
       } catch (AccessControlException ex) {
         DFSClient.LOG.warn("Short circuit access failed " + ex);
-        dfsClient.disableShortCircuit();
+        dfsClient.disableLegacyBlockReaderLocal();
         continue;
       } catch (IOException ex) {
         if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
@@ -510,10 +527,11 @@ public class DFSInputStream extends FSIn
     dfsClient.checkOpen();
 
     if (blockReader != null) {
-      closeBlockReader(blockReader);
+      blockReader.close(peerCache, fileInputStreamCache);
       blockReader = null;
     }
     super.close();
+    fileInputStreamCache.close();
     closed = true;
   }
 
@@ -811,7 +829,7 @@ public class DFSInputStream extends FSIn
         addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
       } catch (AccessControlException ex) {
         DFSClient.LOG.warn("Short circuit access failed " + ex);
-        dfsClient.disableShortCircuit();
+        dfsClient.disableLegacyBlockReaderLocal();
         continue;
       } catch (IOException e) {
         if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
@@ -837,7 +855,7 @@ public class DFSInputStream extends FSIn
         }
       } finally {
         if (reader != null) {
-          closeBlockReader(reader);
+          reader.close(peerCache, fileInputStreamCache);
         }
       }
       // Put chosen node into dead list, continue
@@ -845,22 +863,34 @@ public class DFSInputStream extends FSIn
     }
   }
 
-  /**
-   * Close the given BlockReader and cache its socket.
-   */
-  private void closeBlockReader(BlockReader reader) throws IOException {
-    if (reader.hasSentStatusCode()) {
-      IOStreamPair ioStreams = reader.getStreams();
-      Socket oldSock = reader.takeSocket();
-      socketCache.put(oldSock, ioStreams);
+  private Peer newTcpPeer(InetSocketAddress addr) throws IOException {
+    Peer peer = null;
+    boolean success = false;
+    Socket sock = null;
+    try {
+      sock = dfsClient.socketFactory.createSocket();
+      NetUtils.connect(sock, addr,
+        dfsClient.getRandomLocalInterfaceAddr(),
+        dfsClient.getConf().socketTimeout);
+      peer = TcpPeerServer.peerFromSocketAndKey(sock, 
+          dfsClient.getDataEncryptionKey());
+      success = true;
+      return peer;
+    } finally {
+      if (!success) {
+        IOUtils.closeQuietly(peer);
+        IOUtils.closeQuietly(sock);
+      }
     }
-    reader.close();
   }
 
   /**
    * Retrieve a BlockReader suitable for reading.
    * This method will reuse the cached connection to the DN if appropriate.
    * Otherwise, it will create a new connection.
+   * Throwing an IOException from this method is basically equivalent to 
+   * declaring the DataNode bad, so we try to connect a lot of different ways
+   * before doing that.
    *
    * @param dnAddr  Address of the datanode
    * @param chosenNode Chosen datanode information
@@ -885,82 +915,113 @@ public class DFSInputStream extends FSIn
                                        boolean verifyChecksum,
                                        String clientName)
       throws IOException {
-    
-    // Can't local read a block under construction, see HDFS-2757
-    if (dfsClient.shouldTryShortCircuitRead(dnAddr) &&
-        !blockUnderConstruction()) {
-      return DFSClient.getLocalBlockReader(dfsClient.ugi, dfsClient.conf,
-          src, block, blockToken, chosenNode, dfsClient.hdfsTimeout,
-          startOffset, dfsClient.connectToDnViaHostname());
+    // Firstly, we check to see if we have cached any file descriptors for
+    // local blocks.  If so, we can just re-use those file descriptors.
+    FileInputStream fis[] = fileInputStreamCache.get(chosenNode, block);
+    if (fis != null) {
+      if (DFSClient.LOG.isDebugEnabled()) {
+        DFSClient.LOG.debug("got FileInputStreams for " + block + " from " +
+            "the FileInputStreamCache.");
+      }
+      return new BlockReaderLocal(dfsClient.conf, file,
+        block, startOffset, len, fis[0], fis[1], chosenNode, verifyChecksum);
     }
     
-    IOException err = null;
-    boolean fromCache = true;
+    // If the legacy local block reader is enabled and we are reading a local
+    // block, try to create a BlockReaderLocalLegacy.  The legacy local block
+    // reader implements local reads in the style first introduced by HDFS-2246.
+    if ((dfsClient.useLegacyBlockReaderLocal()) &&
+        DFSClient.isLocalAddress(dnAddr) &&
+        (!shortCircuitForbidden())) {
+      try {
+        return BlockReaderFactory.getLegacyBlockReaderLocal(dfsClient.ugi,
+            dfsClient.conf, clientName, block, blockToken, chosenNode,
+            dfsClient.hdfsTimeout, startOffset,dfsClient.connectToDnViaHostname());
+      } catch (IOException e) {
+        DFSClient.LOG.warn("error creating legacy BlockReaderLocal.  " +
+            "Disabling legacy local reads.", e);
+        dfsClient.disableLegacyBlockReaderLocal();
+      }
+    }
 
-    // Allow retry since there is no way of knowing whether the cached socket
-    // is good until we actually use it.
-    for (int retries = 0; retries <= nCachedConnRetry && fromCache; ++retries) {
-      SocketAndStreams sockAndStreams = null;
-      // Don't use the cache on the last attempt - it's possible that there
-      // are arbitrarily many unusable sockets in the cache, but we don't
-      // want to fail the read.
-      if (retries < nCachedConnRetry) {
-        sockAndStreams = socketCache.get(dnAddr);
-      }
-      Socket sock;
-      if (sockAndStreams == null) {
-        fromCache = false;
+    // Look for cached domain peers.
+    int cacheTries = 0;
+    DomainSocketFactory dsFactory = dfsClient.getDomainSocketFactory();
+    BlockReader reader = null;
+    for (; cacheTries < nCachedConnRetry; ++cacheTries) {
+      Peer peer = peerCache.get(chosenNode, true);
+      if (peer == null) break;
+      try {
+        boolean allowShortCircuitLocalReads = dfsClient.getConf().
+            shortCircuitLocalReads && (!shortCircuitForbidden());
+        reader = BlockReaderFactory.newBlockReader(
+            dfsClient.conf, file, block, blockToken, startOffset,
+            len, verifyChecksum, clientName, peer, chosenNode, 
+            dsFactory, allowShortCircuitLocalReads);
+        return reader;
+      } catch (IOException ex) {
+        DFSClient.LOG.debug("Error making BlockReader with DomainSocket. " +
+            "Closing stale " + peer, ex);
+      } finally {
+        if (reader == null) {
+          IOUtils.closeQuietly(peer);
+        }
+      }
+    }
 
-        sock = dfsClient.socketFactory.createSocket();
-        
-        // TCP_NODELAY is crucial here because of bad interactions between
-        // Nagle's Algorithm and Delayed ACKs. With connection keepalive
-        // between the client and DN, the conversation looks like:
-        //   1. Client -> DN: Read block X
-        //   2. DN -> Client: data for block X
-        //   3. Client -> DN: Status OK (successful read)
-        //   4. Client -> DN: Read block Y
-        // The fact that step #3 and #4 are both in the client->DN direction
-        // triggers Nagling. If the DN is using delayed ACKs, this results
-        // in a delay of 40ms or more.
-        //
-        // TCP_NODELAY disables nagling and thus avoids this performance
-        // disaster.
-        sock.setTcpNoDelay(true);
-
-        NetUtils.connect(sock, dnAddr,
-            dfsClient.getRandomLocalInterfaceAddr(),
-            dfsClient.getConf().socketTimeout);
-        sock.setSoTimeout(dfsClient.getConf().socketTimeout);
-      } else {
-        sock = sockAndStreams.sock;
+    // Try to create a DomainPeer.
+    DomainSocket domSock = dsFactory.create(dnAddr, this);
+    if (domSock != null) {
+      Peer peer = new DomainPeer(domSock);
+      try {
+        boolean allowShortCircuitLocalReads = dfsClient.getConf().
+            shortCircuitLocalReads && (!shortCircuitForbidden());
+        reader = BlockReaderFactory.newBlockReader(
+            dfsClient.conf, file, block, blockToken, startOffset,
+            len, verifyChecksum, clientName, peer, chosenNode, 
+            dsFactory, allowShortCircuitLocalReads);
+        return reader;
+      } catch (IOException e) {
+        DFSClient.LOG.warn("failed to connect to " + domSock, e);
+      } finally {
+        if (reader == null) {
+         // If the Peer that we got the error from was a DomainPeer,
+         // mark the socket path as bad, so that newDataSocket will not try 
+         // to re-open this socket for a while.
+         dsFactory.disableDomainSocketPath(domSock.getPath());
+         IOUtils.closeQuietly(peer);
+        }
       }
+    }
 
+    // Look for cached peers.
+    for (; cacheTries < nCachedConnRetry; ++cacheTries) {
+      Peer peer = peerCache.get(chosenNode, false);
+      if (peer == null) break;
       try {
-        // The OP_READ_BLOCK request is sent as we make the BlockReader
-        BlockReader reader =
-            BlockReaderFactory.newBlockReader(dfsClient.getConf(),
-                                       sock, file, block,
-                                       blockToken,
-                                       startOffset, len,
-                                       bufferSize, verifyChecksum,
-                                       clientName,
-                                       dfsClient.getDataEncryptionKey(),
-                                       sockAndStreams == null ? null : sockAndStreams.ioStreams);
+        reader = BlockReaderFactory.newBlockReader(
+            dfsClient.conf, file, block, blockToken, startOffset,
+            len, verifyChecksum, clientName, peer, chosenNode, 
+            dsFactory, false);
         return reader;
       } catch (IOException ex) {
-        // Our socket is no good.
-        DFSClient.LOG.debug("Error making BlockReader. Closing stale " + sock, ex);
-        if (sockAndStreams != null) {
-          sockAndStreams.close();
-        } else {
-          sock.close();
+        DFSClient.LOG.debug("Error making BlockReader. Closing stale " +
+          peer, ex);
+      } finally {
+        if (reader == null) {
+          IOUtils.closeQuietly(peer);
         }
-        err = ex;
       }
     }
-
-    throw err;
+    if (tcpReadsDisabledForTesting) {
+      throw new IOException("TCP reads are disabled.");
+    }
+    // Try to create a new remote peer.
+    Peer peer = newTcpPeer(dnAddr);
+    return BlockReaderFactory.newBlockReader(
+        dfsClient.conf, file, block, blockToken, startOffset,
+        len, verifyChecksum, clientName, peer, chosenNode, 
+        dsFactory, false);
   }
 
 
@@ -1094,7 +1155,7 @@ public class DFSInputStream extends FSIn
       // the TCP buffer, then just eat up the intervening data.
       //
       int diff = (int)(targetPos - pos);
-      if (diff <= DFSClient.TCP_WINDOW_SIZE) {
+      if (diff <= blockReader.available()) {
         try {
           pos += blockReader.skip(diff);
           if (pos == targetPos) {



Mime
View raw message