hadoop-common-commits mailing list archives

From wan...@apache.org
Subject [05/50] [abbrv] hadoop git commit: HDFS-8057 Move BlockReader implementation to the client implementation package. Contributed by Takanobu Asanuma
Date Thu, 28 Apr 2016 18:01:49 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java
new file mode 100644
index 0000000..f4b62d9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java
@@ -0,0 +1,878 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl;
+
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.ClientContext;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSInputStream;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.RemotePeerFactory;
+import org.apache.hadoop.hdfs.ReplicaAccessor;
+import org.apache.hadoop.hdfs.ReplicaAccessorBuilder;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
+import org.apache.hadoop.hdfs.net.DomainPeer;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
+import org.apache.hadoop.hdfs.util.IOUtilsClient;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.PerformanceAdvisory;
+import org.apache.hadoop.util.Time;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.htrace.core.Tracer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility class to create BlockReader implementations.
+ */
+@InterfaceAudience.Private
+public class BlockReaderFactory implements ShortCircuitReplicaCreator {
+  static final Logger LOG = LoggerFactory.getLogger(BlockReaderFactory.class);
+
+  public static class FailureInjector {
+    public void injectRequestFileDescriptorsFailure() throws IOException {
+      // do nothing
+    }
+    public boolean getSupportsReceiptVerification() {
+      return true;
+    }
+  }
+
+  @VisibleForTesting
+  static ShortCircuitReplicaCreator
+      createShortCircuitReplicaInfoCallback = null;
+
+  private final DfsClientConf conf;
+
+  /**
+   * Injects failures into specific operations during unit tests.
+   */
+  private static FailureInjector failureInjector = new FailureInjector();
+
+  /**
+   * The file name, for logging and debugging purposes.
+   */
+  private String fileName;
+
+  /**
+   * The block ID and block pool ID to use.
+   */
+  private ExtendedBlock block;
+
+  /**
+   * The block token to use for security purposes.
+   */
+  private Token<BlockTokenIdentifier> token;
+
+  /**
+   * The offset within the block to start reading at.
+   */
+  private long startOffset;
+
+  /**
+   * If false, we won't try to verify the block checksum.
+   */
+  private boolean verifyChecksum;
+
+  /**
+   * The name of this client.
+   */
+  private String clientName;
+
+  /**
+   * The DataNode we're talking to.
+   */
+  private DatanodeInfo datanode;
+
+  /**
+   * StorageType of replica on DataNode.
+   */
+  private StorageType storageType;
+
+  /**
+   * If false, we won't try short-circuit local reads.
+   */
+  private boolean allowShortCircuitLocalReads;
+
+  /**
+   * The ClientContext to use for things like the PeerCache.
+   */
+  private ClientContext clientContext;
+
+  /**
+   * Number of bytes to read. Must be set to a non-negative value.
+   */
+  private long length = -1;
+
+  /**
+   * Caching strategy to use when reading the block.
+   */
+  private CachingStrategy cachingStrategy;
+
+  /**
+   * Socket address to use to connect to peer.
+   */
+  private InetSocketAddress inetSocketAddress;
+
+  /**
+   * Remote peer factory to use to create a peer, if needed.
+   */
+  private RemotePeerFactory remotePeerFactory;
+
+  /**
+   * UserGroupInformation to use for legacy block reader local objects,
+   * if needed.
+   */
+  private UserGroupInformation userGroupInformation;
+
+  /**
+   * Configuration to use for legacy block reader local objects, if needed.
+   */
+  private Configuration configuration;
+
+  /**
+   * The HTrace tracer to use.
+   */
+  private Tracer tracer;
+
+  /**
+   * Information about the domain socket path we should use to connect to the
+   * local peer, or null if we haven't examined the local domain socket.
+   */
+  private DomainSocketFactory.PathInfo pathInfo;
+
+  /**
+   * The remaining number of times that we'll try to pull a socket out of the
+   * cache.
+   */
+  private int remainingCacheTries;
+
+  public BlockReaderFactory(DfsClientConf conf) {
+    this.conf = conf;
+    this.remainingCacheTries = conf.getNumCachedConnRetry();
+  }
+
+  public BlockReaderFactory setFileName(String fileName) {
+    this.fileName = fileName;
+    return this;
+  }
+
+  public BlockReaderFactory setBlock(ExtendedBlock block) {
+    this.block = block;
+    return this;
+  }
+
+  public BlockReaderFactory setBlockToken(Token<BlockTokenIdentifier> token) {
+    this.token = token;
+    return this;
+  }
+
+  public BlockReaderFactory setStartOffset(long startOffset) {
+    this.startOffset = startOffset;
+    return this;
+  }
+
+  public BlockReaderFactory setVerifyChecksum(boolean verifyChecksum) {
+    this.verifyChecksum = verifyChecksum;
+    return this;
+  }
+
+  public BlockReaderFactory setClientName(String clientName) {
+    this.clientName = clientName;
+    return this;
+  }
+
+  public BlockReaderFactory setDatanodeInfo(DatanodeInfo datanode) {
+    this.datanode = datanode;
+    return this;
+  }
+
+  public BlockReaderFactory setStorageType(StorageType storageType) {
+    this.storageType = storageType;
+    return this;
+  }
+
+  public BlockReaderFactory setAllowShortCircuitLocalReads(
+      boolean allowShortCircuitLocalReads) {
+    this.allowShortCircuitLocalReads = allowShortCircuitLocalReads;
+    return this;
+  }
+
+  public BlockReaderFactory setClientCacheContext(
+      ClientContext clientContext) {
+    this.clientContext = clientContext;
+    return this;
+  }
+
+  public BlockReaderFactory setLength(long length) {
+    this.length = length;
+    return this;
+  }
+
+  public BlockReaderFactory setCachingStrategy(
+      CachingStrategy cachingStrategy) {
+    this.cachingStrategy = cachingStrategy;
+    return this;
+  }
+
+  public BlockReaderFactory setInetSocketAddress(
+      InetSocketAddress inetSocketAddress) {
+    this.inetSocketAddress = inetSocketAddress;
+    return this;
+  }
+
+  public BlockReaderFactory setUserGroupInformation(
+      UserGroupInformation userGroupInformation) {
+    this.userGroupInformation = userGroupInformation;
+    return this;
+  }
+
+  public BlockReaderFactory setRemotePeerFactory(
+      RemotePeerFactory remotePeerFactory) {
+    this.remotePeerFactory = remotePeerFactory;
+    return this;
+  }
+
+  public BlockReaderFactory setConfiguration(
+      Configuration configuration) {
+    this.configuration = configuration;
+    return this;
+  }
+
+  public BlockReaderFactory setTracer(Tracer tracer) {
+    this.tracer = tracer;
+    return this;
+  }
+
+  @VisibleForTesting
+  public static void setFailureInjectorForTesting(FailureInjector injector) {
+    failureInjector = injector;
+  }
+
+  /**
+   * Build a BlockReader with the given options.
+   *
+   * This function will do the best it can to create a block reader that meets
+   * all of our requirements.  We prefer short-circuit block readers
+   * (BlockReaderLocal and BlockReaderLocalLegacy) over remote ones, since the
+   * former avoid the overhead of socket communication.  If short-circuit is
+   * unavailable, our next fallback is data transfer over UNIX domain sockets,
+   * if dfs.client.domain.socket.data.traffic has been enabled.  If that doesn't
+   * work, we will try to create a remote block reader that operates over TCP
+   * sockets.
+   *
+   * There are a few caches that are important here.
+   *
+   * The ShortCircuitCache stores file descriptor objects which have been passed
+   * from the DataNode.
+   *
+   * The DomainSocketFactory stores information about UNIX domain socket paths
+   * that we have not been able to use in the past, so that we don't waste time
+   * retrying them over and over.  (Like all the caches, it does have a timeout,
+   * though.)
+   *
+   * The PeerCache stores peers that we have used in the past.  If we can reuse
+   * one of these peers, we avoid the overhead of re-opening a socket.  However,
+   * if the socket has been timed out on the remote end, our attempt to reuse
+   * the socket may end with an IOException.  For that reason, we limit our
+   * attempts at socket reuse to dfs.client.cached.conn.retry times.  After
+   * that, we create new sockets.  This avoids the problem where a thread tries
+   * to talk to a peer that it hasn't talked to in a while, and has to clean out
+   * every entry in a socket cache full of stale entries.
+   *
+   * @return The new BlockReader.  We will not return null.
+   *
+   * @throws InvalidToken
+   *             If the block token was invalid.
+   *         InvalidEncryptionKeyException
+   *             If the encryption key was invalid.
+   *         Other IOException
+   *             If there was another problem.
+   */
+  public BlockReader build() throws IOException {
+    Preconditions.checkNotNull(configuration);
+    Preconditions
+        .checkState(length >= 0, "Length must be set to a non-negative value");
+    BlockReader reader = tryToCreateExternalBlockReader();
+    if (reader != null) {
+      return reader;
+    }
+    final ShortCircuitConf scConf = conf.getShortCircuitConf();
+    if (scConf.isShortCircuitLocalReads() && allowShortCircuitLocalReads) {
+      if (clientContext.getUseLegacyBlockReaderLocal()) {
+        reader = getLegacyBlockReaderLocal();
+        if (reader != null) {
+          LOG.trace("{}: returning new legacy block reader local.", this);
+          return reader;
+        }
+      } else {
+        reader = getBlockReaderLocal();
+        if (reader != null) {
+          LOG.trace("{}: returning new block reader local.", this);
+          return reader;
+        }
+      }
+    }
+    if (scConf.isDomainSocketDataTraffic()) {
+      reader = getRemoteBlockReaderFromDomain();
+      if (reader != null) {
+        LOG.trace("{}: returning new remote block reader using UNIX domain "
+            + "socket on {}", this, pathInfo.getPath());
+        return reader;
+      }
+    }
+    Preconditions.checkState(!DFSInputStream.tcpReadsDisabledForTesting,
+        "TCP reads were disabled for testing, but we failed to " +
+        "do a non-TCP read.");
+    return getRemoteBlockReaderFromTcp();
+  }
+
+  private BlockReader tryToCreateExternalBlockReader() {
+    List<Class<? extends ReplicaAccessorBuilder>> clses =
+        conf.getReplicaAccessorBuilderClasses();
+    for (Class<? extends ReplicaAccessorBuilder> cls : clses) {
+      try {
+        ByteArrayDataOutput bado = ByteStreams.newDataOutput();
+        token.write(bado);
+        byte tokenBytes[] = bado.toByteArray();
+
+        Constructor<? extends ReplicaAccessorBuilder> ctor =
+            cls.getConstructor();
+        ReplicaAccessorBuilder builder = ctor.newInstance();
+        long visibleLength = startOffset + length;
+        ReplicaAccessor accessor = builder.
+            setAllowShortCircuitReads(allowShortCircuitLocalReads).
+            setBlock(block.getBlockId(), block.getBlockPoolId()).
+            setGenerationStamp(block.getGenerationStamp()).
+            setBlockAccessToken(tokenBytes).
+            setClientName(clientName).
+            setConfiguration(configuration).
+            setFileName(fileName).
+            setVerifyChecksum(verifyChecksum).
+            setVisibleLength(visibleLength).
+            build();
+        if (accessor == null) {
+          LOG.trace("{}: No ReplicaAccessor created by {}",
+              this, cls.getName());
+        } else {
+          return new ExternalBlockReader(accessor, visibleLength, startOffset);
+        }
+      } catch (Throwable t) {
+        LOG.warn("Failed to construct new object of type " +
+            cls.getName(), t);
+      }
+    }
+    return null;
+  }
+
+
+  /**
+   * Get {@link BlockReaderLocalLegacy} for short circuited local reads.
+   * This block reader implements the path-based style of local reads
+   * first introduced in HDFS-2246.
+   */
+  private BlockReader getLegacyBlockReaderLocal() throws IOException {
+    LOG.trace("{}: trying to construct BlockReaderLocalLegacy", this);
+    if (!DFSUtilClient.isLocalAddress(inetSocketAddress)) {
+      LOG.trace("{}: can't construct BlockReaderLocalLegacy because the address"
+          + "{} is not local", this, inetSocketAddress);
+      return null;
+    }
+    if (clientContext.getDisableLegacyBlockReaderLocal()) {
+      PerformanceAdvisory.LOG.debug("{}: can't construct " +
+          "BlockReaderLocalLegacy because " +
+          "disableLegacyBlockReaderLocal is set.", this);
+      return null;
+    }
+    IOException ioe;
+    try {
+      return BlockReaderLocalLegacy.newBlockReader(conf,
+          userGroupInformation, configuration, fileName, block, token,
+          datanode, startOffset, length, storageType, tracer);
+    } catch (RemoteException remoteException) {
+      ioe = remoteException.unwrapRemoteException(
+                InvalidToken.class, AccessControlException.class);
+    } catch (IOException e) {
+      ioe = e;
+    }
+    if ((!(ioe instanceof AccessControlException)) &&
+        isSecurityException(ioe)) {
+      // Handle security exceptions.
+      // We do not handle AccessControlException here, since
+      // BlockReaderLocalLegacy#newBlockReader uses that exception to indicate
+      // that the user is not in dfs.block.local-path-access.user, a condition
+      // which requires us to disable legacy SCR.
+      throw ioe;
+    }
+    LOG.warn(this + ": error creating legacy BlockReaderLocal.  " +
+        "Disabling legacy local reads.", ioe);
+    clientContext.setDisableLegacyBlockReaderLocal();
+    return null;
+  }
+
+  private BlockReader getBlockReaderLocal() throws InvalidToken {
+    LOG.trace("{}: trying to construct a BlockReaderLocal for short-circuit "
+        + " reads.", this);
+    if (pathInfo == null) {
+      pathInfo = clientContext.getDomainSocketFactory()
+          .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
+    }
+    if (!pathInfo.getPathState().getUsableForShortCircuit()) {
+      PerformanceAdvisory.LOG.debug("{}: {} is not usable for short circuit; " +
+              "giving up on BlockReaderLocal.", this, pathInfo);
+      return null;
+    }
+    ShortCircuitCache cache = clientContext.getShortCircuitCache();
+    ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
+        block.getBlockPoolId());
+    ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
+    InvalidToken exc = info.getInvalidTokenException();
+    if (exc != null) {
+      LOG.trace("{}: got InvalidToken exception while trying to construct "
+          + "BlockReaderLocal via {}", this, pathInfo.getPath());
+      throw exc;
+    }
+    if (info.getReplica() == null) {
+      PerformanceAdvisory.LOG.debug("{}: failed to get " +
+          "ShortCircuitReplica. Cannot construct " +
+          "BlockReaderLocal via {}", this, pathInfo.getPath());
+      return null;
+    }
+    return new BlockReaderLocal.Builder(conf.getShortCircuitConf()).
+        setFilename(fileName).
+        setBlock(block).
+        setStartOffset(startOffset).
+        setShortCircuitReplica(info.getReplica()).
+        setVerifyChecksum(verifyChecksum).
+        setCachingStrategy(cachingStrategy).
+        setStorageType(storageType).
+        setTracer(tracer).
+        build();
+  }
+
+  /**
+   * Fetch a pair of short-circuit block descriptors from a local DataNode.
+   *
+   * @return    Null if we could not communicate with the datanode,
+   *            a new ShortCircuitReplicaInfo object otherwise.
+   *            ShortCircuitReplicaInfo objects may contain either an
+   *            InvalidToken exception, or a ShortCircuitReplica object ready to
+   *            use.
+   */
+  @Override
+  public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
+    if (createShortCircuitReplicaInfoCallback != null) {
+      ShortCircuitReplicaInfo info =
+          createShortCircuitReplicaInfoCallback.createShortCircuitReplicaInfo();
+      if (info != null) return info;
+    }
+    LOG.trace("{}: trying to create ShortCircuitReplicaInfo.", this);
+    BlockReaderPeer curPeer;
+    while (true) {
+      curPeer = nextDomainPeer();
+      if (curPeer == null) break;
+      if (curPeer.fromCache) remainingCacheTries--;
+      DomainPeer peer = (DomainPeer)curPeer.peer;
+      Slot slot = null;
+      ShortCircuitCache cache = clientContext.getShortCircuitCache();
+      try {
+        MutableBoolean usedPeer = new MutableBoolean(false);
+        slot = cache.allocShmSlot(datanode, peer, usedPeer,
+            new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()),
+            clientName);
+        if (usedPeer.booleanValue()) {
+          LOG.trace("{}: allocShmSlot used up our previous socket {}.  "
+              + "Allocating a new one...", this, peer.getDomainSocket());
+          curPeer = nextDomainPeer();
+          if (curPeer == null) break;
+          peer = (DomainPeer)curPeer.peer;
+        }
+        ShortCircuitReplicaInfo info = requestFileDescriptors(peer, slot);
+        clientContext.getPeerCache().put(datanode, peer);
+        return info;
+      } catch (IOException e) {
+        if (slot != null) {
+          cache.freeSlot(slot);
+        }
+        if (curPeer.fromCache) {
+          // Handle an I/O error we got when using a cached socket.
+          // These are considered less serious, because the socket may be stale.
+          LOG.debug("{}: closing stale domain peer {}", this, peer, e);
+          IOUtilsClient.cleanup(LOG, peer);
+        } else {
+          // Handle an I/O error we got when using a newly created socket.
+          // We temporarily disable the domain socket path for a few minutes in
+          // this case, to prevent wasting more time on it.
+          LOG.warn(this + ": I/O error requesting file descriptors.  " +
+              "Disabling domain socket " + peer.getDomainSocket(), e);
+          IOUtilsClient.cleanup(LOG, peer);
+          clientContext.getDomainSocketFactory()
+              .disableDomainSocketPath(pathInfo.getPath());
+          return null;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Request file descriptors from a DomainPeer.
+   *
+   * @param peer   The peer to use for communication.
+   * @param slot   If non-null, the shared memory slot to associate with the
+   *               new ShortCircuitReplica.
+   *
+   * @return  A ShortCircuitReplica object if we could communicate with the
+   *          datanode; null, otherwise.
+   * @throws  IOException If we encountered an I/O exception while communicating
+   *          with the datanode.
+   */
+  private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
+          Slot slot) throws IOException {
+    ShortCircuitCache cache = clientContext.getShortCircuitCache();
+    final DataOutputStream out =
+        new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
+    SlotId slotId = slot == null ? null : slot.getSlotId();
+    new Sender(out).requestShortCircuitFds(block, token, slotId, 1,
+        failureInjector.getSupportsReceiptVerification());
+    DataInputStream in = new DataInputStream(peer.getInputStream());
+    BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
+        PBHelperClient.vintPrefixed(in));
+    DomainSocket sock = peer.getDomainSocket();
+    failureInjector.injectRequestFileDescriptorsFailure();
+    switch (resp.getStatus()) {
+    case SUCCESS:
+      byte buf[] = new byte[1];
+      FileInputStream[] fis = new FileInputStream[2];
+      sock.recvFileInputStreams(fis, buf, 0, buf.length);
+      ShortCircuitReplica replica = null;
+      try {
+        ExtendedBlockId key =
+            new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
+        if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) {
+          LOG.trace("Sending receipt verification byte for slot {}", slot);
+          sock.getOutputStream().write(0);
+        }
+        replica = new ShortCircuitReplica(key, fis[0], fis[1], cache,
+            Time.monotonicNow(), slot);
+        return new ShortCircuitReplicaInfo(replica);
+      } catch (IOException e) {
+        // This indicates an error reading from disk, or a format error.  Since
+        // it's not a socket communication problem, we return null rather than
+        // throwing an exception.
+        LOG.warn(this + ": error creating ShortCircuitReplica.", e);
+        return null;
+      } finally {
+        if (replica == null) {
+          IOUtilsClient.cleanup(DFSClient.LOG, fis[0], fis[1]);
+        }
+      }
+    case ERROR_UNSUPPORTED:
+      if (!resp.hasShortCircuitAccessVersion()) {
+        LOG.warn("short-circuit read access is disabled for " +
+            "DataNode " + datanode + ".  reason: " + resp.getMessage());
+        clientContext.getDomainSocketFactory()
+            .disableShortCircuitForPath(pathInfo.getPath());
+      } else {
+        LOG.warn("short-circuit read access for the file " +
+            fileName + " is disabled for DataNode " + datanode +
+            ".  reason: " + resp.getMessage());
+      }
+      return null;
+    case ERROR_ACCESS_TOKEN:
+      String msg = "access control error while " +
+          "attempting to set up short-circuit access to " +
+          fileName + ": " + resp.getMessage();
+      LOG.debug("{}:{}", this, msg);
+      return new ShortCircuitReplicaInfo(new InvalidToken(msg));
+    default:
+      LOG.warn(this + ": unknown response code " + resp.getStatus() +
+          " while attempting to set up short-circuit access. " +
+          resp.getMessage());
+      clientContext.getDomainSocketFactory()
+          .disableShortCircuitForPath(pathInfo.getPath());
+      return null;
+    }
+  }
+
+  /**
+   * Get a BlockReaderRemote that communicates over a UNIX domain socket.
+   *
+   * @return The new BlockReader, or null if we failed to create the block
+   * reader.
+   *
+   * @throws InvalidToken    If the block token was invalid.
+   * Potentially other security-related exceptions.
+   */
+  private BlockReader getRemoteBlockReaderFromDomain() throws IOException {
+    if (pathInfo == null) {
+      pathInfo = clientContext.getDomainSocketFactory()
+          .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
+    }
+    if (!pathInfo.getPathState().getUsableForDataTransfer()) {
+      PerformanceAdvisory.LOG.debug("{}: not trying to create a " +
+          "remote block reader because the UNIX domain socket at {}" +
+           " is not usable.", this, pathInfo);
+      return null;
+    }
+    LOG.trace("{}: trying to create a remote block reader from the UNIX domain "
+        + "socket at {}", this, pathInfo.getPath());
+
+    while (true) {
+      BlockReaderPeer curPeer = nextDomainPeer();
+      if (curPeer == null) break;
+      if (curPeer.fromCache) remainingCacheTries--;
+      DomainPeer peer = (DomainPeer)curPeer.peer;
+      BlockReader blockReader = null;
+      try {
+        blockReader = getRemoteBlockReader(peer);
+        return blockReader;
+      } catch (IOException ioe) {
+        IOUtilsClient.cleanup(LOG, peer);
+        if (isSecurityException(ioe)) {
+          LOG.trace("{}: got security exception while constructing a remote "
+                  + " block reader from the unix domain socket at {}",
+              this, pathInfo.getPath(), ioe);
+          throw ioe;
+        }
+        if (curPeer.fromCache) {
+          // Handle an I/O error we got when using a cached peer.  These are
+          // considered less serious because the underlying socket may be stale.
+          LOG.debug("Closed potentially stale domain peer {}", peer, ioe);
+        } else {
+          // Handle an I/O error we got when using a newly created domain peer.
+          // We temporarily disable the domain socket path for a few minutes in
+          // this case, to prevent wasting more time on it.
+          LOG.warn("I/O error constructing remote block reader.  Disabling " +
+              "domain socket " + peer.getDomainSocket(), ioe);
+          clientContext.getDomainSocketFactory()
+              .disableDomainSocketPath(pathInfo.getPath());
+          return null;
+        }
+      } finally {
+        if (blockReader == null) {
+          IOUtilsClient.cleanup(LOG, peer);
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get a BlockReaderRemote that communicates over a TCP socket.
+   *
+   * @return The new BlockReader.  We will not return null, but instead throw
+   *         an exception if this fails.
+   *
+   * @throws InvalidToken
+   *             If the block token was invalid.
+   *         InvalidEncryptionKeyException
+   *             If the encryption key was invalid.
+   *         Other IOException
+   *             If there was another problem.
+   */
+  private BlockReader getRemoteBlockReaderFromTcp() throws IOException {
+    LOG.trace("{}: trying to create a remote block reader from a TCP socket",
+        this);
+    BlockReader blockReader = null;
+    while (true) {
+      BlockReaderPeer curPeer = null;
+      Peer peer = null;
+      try {
+        curPeer = nextTcpPeer();
+        if (curPeer.fromCache) remainingCacheTries--;
+        peer = curPeer.peer;
+        blockReader = getRemoteBlockReader(peer);
+        return blockReader;
+      } catch (IOException ioe) {
+        if (isSecurityException(ioe)) {
+          LOG.trace("{}: got security exception while constructing a remote "
+              + "block reader from {}", this, peer, ioe);
+          throw ioe;
+        }
+        if ((curPeer != null) && curPeer.fromCache) {
+          // Handle an I/O error we got when using a cached peer.  These are
+          // considered less serious, because the underlying socket may be
+          // stale.
+          LOG.debug("Closed potentially stale remote peer {}", peer, ioe);
+        } else {
+          // Handle an I/O error we got when using a newly created peer.
+          LOG.warn("I/O error constructing remote block reader.", ioe);
+          throw ioe;
+        }
+      } finally {
+        if (blockReader == null) {
+          IOUtilsClient.cleanup(LOG, peer);
+        }
+      }
+    }
+  }
+
+  public static class BlockReaderPeer {
+    final Peer peer;
+    final boolean fromCache;
+
+    BlockReaderPeer(Peer peer, boolean fromCache) {
+      this.peer = peer;
+      this.fromCache = fromCache;
+    }
+  }
+
+  /**
+   * Get the next DomainPeer, either from the cache or by creating it.
+   *
+   * @return the next DomainPeer, or null if we could not construct one.
+   */
+  private BlockReaderPeer nextDomainPeer() {
+    if (remainingCacheTries > 0) {
+      Peer peer = clientContext.getPeerCache().get(datanode, true);
+      if (peer != null) {
+        LOG.trace("nextDomainPeer: reusing existing peer {}", peer);
+        return new BlockReaderPeer(peer, true);
+      }
+    }
+    DomainSocket sock = clientContext.getDomainSocketFactory().
+        createSocket(pathInfo, conf.getSocketTimeout());
+    if (sock == null) return null;
+    return new BlockReaderPeer(new DomainPeer(sock), false);
+  }
+
+  /**
+   * Get the next TCP-based peer, either from the cache or by creating it.
+   *
+   * @return the next Peer, or null if we could not construct one.
+   *
+   * @throws IOException  If there was an error while constructing the peer
+   *                      (such as an InvalidEncryptionKeyException)
+   */
+  private BlockReaderPeer nextTcpPeer() throws IOException {
+    if (remainingCacheTries > 0) {
+      Peer peer = clientContext.getPeerCache().get(datanode, false);
+      if (peer != null) {
+        LOG.trace("nextTcpPeer: reusing existing peer {}", peer);
+        return new BlockReaderPeer(peer, true);
+      }
+    }
+    try {
+      Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token,
+          datanode);
+      LOG.trace("nextTcpPeer: created newConnectedPeer {}", peer);
+      return new BlockReaderPeer(peer, false);
+    } catch (IOException e) {
+      LOG.trace("nextTcpPeer: failed to create newConnectedPeer connected to"
+          + "{}", datanode);
+      throw e;
+    }
+  }
+
+  /**
+   * Determine if an exception is security-related.
+   *
+   * We need to handle these exceptions differently than other IOExceptions.
+   * They don't indicate a communication problem.  Instead, they mean that there
+   * is some action the client needs to take, such as refetching block tokens,
+   * renewing encryption keys, etc.
+   *
+   * @param ioe    The exception
+   * @return       True only if the exception is security-related.
+   */
+  private static boolean isSecurityException(IOException ioe) {
+    return (ioe instanceof InvalidToken) ||
+            (ioe instanceof InvalidEncryptionKeyException) ||
+            (ioe instanceof InvalidBlockTokenException) ||
+            (ioe instanceof AccessControlException);
+  }
+
+  @SuppressWarnings("deprecation")
+  private BlockReader getRemoteBlockReader(Peer peer) throws IOException {
+    int networkDistance = clientContext.getNetworkDistance(datanode);
+    if (conf.getShortCircuitConf().isUseLegacyBlockReader()) {
+      return BlockReaderRemote.newBlockReader(fileName,
+          block, token, startOffset, length, conf.getIoBufferSize(),
+          verifyChecksum, clientName, peer, datanode,
+          clientContext.getPeerCache(), cachingStrategy, tracer,
+          networkDistance);
+    } else {
+      return BlockReaderRemote2.newBlockReader(
+          fileName, block, token, startOffset, length,
+          verifyChecksum, clientName, peer, datanode,
+          clientContext.getPeerCache(), cachingStrategy, tracer,
+          networkDistance);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "BlockReaderFactory(fileName=" + fileName + ", block=" + block + ")";
+  }
+
+  /**
+   * File name to print when accessing a block directly (from servlets)
+   * @param s Address of the block location
+   * @param poolId Block pool ID of the block
+   * @param blockId Block ID of the block
+   * @return string that has a file name for debug purposes
+   */
+  public static String getFileName(final InetSocketAddress s,
+      final String poolId, final long blockId) {
+    return s.toString() + ":" + poolId + ":" + blockId;
+  }
+}
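
A minimal sketch of how a client might drive the builder API introduced in
this file. Every setter name below appears in the patch above; the
surrounding variables (dfsClient, src, locatedBlock, dnAddr, ugi, and so on)
are hypothetical stand-ins for the wiring that lives in the caller:

    // Hypothetical caller-side sketch; not part of this commit.
    BlockReader reader = new BlockReaderFactory(dfsClient.getConf())
        .setFileName(src)
        .setBlock(locatedBlock.getBlock())
        .setBlockToken(locatedBlock.getBlockToken())
        .setStartOffset(offsetIntoBlock)
        .setLength(bytesToRead)       // build() requires a non-negative length
        .setVerifyChecksum(true)
        .setClientName(dfsClient.getClientName())
        .setDatanodeInfo(chosenDatanode)
        .setStorageType(storageType)
        .setAllowShortCircuitLocalReads(true)
        .setClientCacheContext(dfsClient.getClientContext())
        .setCachingStrategy(cachingStrategy)
        .setInetSocketAddress(dnAddr)
        .setRemotePeerFactory(remotePeerFactory)
        .setUserGroupInformation(ugi)
        .setConfiguration(hadoopConf)
        .setTracer(tracer)
        .build();  // tries short-circuit, then domain sockets, then TCP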

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocal.java
new file mode 100644
index 0000000..1b38996
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocal.java
@@ -0,0 +1,725 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.EnumSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.DirectBufferPool;
+import org.apache.htrace.core.TraceScope;
+import org.apache.htrace.core.Tracer;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BlockReaderLocal enables local short-circuited reads. If the DFS client is on
+ * the same machine as the datanode, then the client can read files directly
+ * from the local file system rather than going through the datanode for better
+ * performance. <br>
+ * {@link BlockReaderLocal} works as follows:
+ * <ul>
+ * <li>The client performing short circuit reads must be configured at the
+ * datanode.</li>
+ * <li>The client gets the file descriptors for the metadata file and the data
+ * file for the block using
+ * {@link org.apache.hadoop.hdfs.server.datanode.DataXceiver#requestShortCircuitFds}.
+ * </li>
+ * <li>The client reads the file descriptors.</li>
+ * </ul>
+ */
+@InterfaceAudience.Private
+class BlockReaderLocal implements BlockReader {
+  static final Logger LOG = LoggerFactory.getLogger(BlockReaderLocal.class);
+
+  private static final DirectBufferPool bufferPool = new DirectBufferPool();
+
+  public static class Builder {
+    private final int bufferSize;
+    private boolean verifyChecksum;
+    private int maxReadahead;
+    private String filename;
+    private ShortCircuitReplica replica;
+    private long dataPos;
+    private ExtendedBlock block;
+    private StorageType storageType;
+    private Tracer tracer;
+
+    public Builder(ShortCircuitConf conf) {
+      this.maxReadahead = Integer.MAX_VALUE;
+      this.verifyChecksum = !conf.isSkipShortCircuitChecksums();
+      this.bufferSize = conf.getShortCircuitBufferSize();
+    }
+
+    public Builder setVerifyChecksum(boolean verifyChecksum) {
+      this.verifyChecksum = verifyChecksum;
+      return this;
+    }
+
+    public Builder setCachingStrategy(CachingStrategy cachingStrategy) {
+      long readahead = cachingStrategy.getReadahead() != null ?
+          cachingStrategy.getReadahead() :
+              HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
+      this.maxReadahead = (int)Math.min(Integer.MAX_VALUE, readahead);
+      return this;
+    }
+
+    public Builder setFilename(String filename) {
+      this.filename = filename;
+      return this;
+    }
+
+    public Builder setShortCircuitReplica(ShortCircuitReplica replica) {
+      this.replica = replica;
+      return this;
+    }
+
+    public Builder setStartOffset(long startOffset) {
+      this.dataPos = Math.max(0, startOffset);
+      return this;
+    }
+
+    public Builder setBlock(ExtendedBlock block) {
+      this.block = block;
+      return this;
+    }
+
+    public Builder setStorageType(StorageType storageType) {
+      this.storageType = storageType;
+      return this;
+    }
+
+    public Builder setTracer(Tracer tracer) {
+      this.tracer = tracer;
+      return this;
+    }
+
+    public BlockReaderLocal build() {
+      Preconditions.checkNotNull(replica);
+      return new BlockReaderLocal(this);
+    }
+  }
+
+  private boolean closed = false;
+
+  /**
+   * Pair of streams for this block.
+   */
+  private final ShortCircuitReplica replica;
+
+  /**
+   * The data FileChannel.
+   */
+  private final FileChannel dataIn;
+
+  /**
+   * The next place we'll read from in the block data FileChannel.
+   *
+   * If data is buffered in dataBuf, this offset will be larger than the
+   * offset of the next byte which a read() operation will give us.
+   */
+  private long dataPos;
+
+  /**
+   * The Checksum FileChannel.
+   */
+  private final FileChannel checksumIn;
+
+  /**
+   * Checksum type and size.
+   */
+  private final DataChecksum checksum;
+
+  /**
+   * If false, we will always skip the checksum.
+   */
+  private final boolean verifyChecksum;
+
+  /**
+   * Name of the block, for logging purposes.
+   */
+  private final String filename;
+
+  /**
+   * Block ID and Block Pool ID.
+   */
+  private final ExtendedBlock block;
+
+  /**
+   * Cache of Checksum#bytesPerChecksum.
+   */
+  private final int bytesPerChecksum;
+
+  /**
+   * Cache of Checksum#checksumSize.
+   */
+  private final int checksumSize;
+
+  /**
+   * Maximum number of chunks to allocate.
+   *
+   * This is used to allocate dataBuf and checksumBuf, in the event that
+   * we need them.
+   */
+  private final int maxAllocatedChunks;
+
+  /**
+   * True if zero readahead was requested.
+   */
+  private final boolean zeroReadaheadRequested;
+
+  /**
+   * Maximum amount of readahead we'll do.  This will always be at least the
+   * size of a single chunk, even if {@link #zeroReadaheadRequested} is true,
+   * because we need to do a certain amount of buffering in order to do
+   * checksumming.
+   *
+   * This determines how many bytes we'll use out of dataBuf and checksumBuf.
+   * Why do we allocate buffers, and then (potentially) only use part of them?
+   * The rationale is that allocating a lot of buffers of different sizes would
+   * make it very difficult for the DirectBufferPool to re-use buffers.
+   */
+  private final int maxReadaheadLength;
+
+  /**
+   * Buffers data starting at the current dataPos and extending for
+   * dataBuf.limit() bytes.
+   *
+   * This may be null if we don't need it.
+   */
+  private ByteBuffer dataBuf;
+
+  /**
+   * Buffers checksums starting at the current checksumPos and extending for
+   * checksumBuf.limit() bytes.
+   *
+   * This may be null if we don't need it.
+   */
+  private ByteBuffer checksumBuf;
+
+  /**
+   * StorageType of replica on DataNode.
+   */
+  private StorageType storageType;
+
+  /**
+   * The Tracer to use.
+   */
+  private final Tracer tracer;
+
+  private BlockReaderLocal(Builder builder) {
+    this.replica = builder.replica;
+    this.dataIn = replica.getDataStream().getChannel();
+    this.dataPos = builder.dataPos;
+    this.checksumIn = replica.getMetaStream().getChannel();
+    BlockMetadataHeader header = builder.replica.getMetaHeader();
+    this.checksum = header.getChecksum();
+    this.verifyChecksum = builder.verifyChecksum &&
+        (this.checksum.getChecksumType().id != DataChecksum.CHECKSUM_NULL);
+    this.filename = builder.filename;
+    this.block = builder.block;
+    this.bytesPerChecksum = checksum.getBytesPerChecksum();
+    this.checksumSize = checksum.getChecksumSize();
+
+    this.maxAllocatedChunks = (bytesPerChecksum == 0) ? 0 :
+        ((builder.bufferSize + bytesPerChecksum - 1) / bytesPerChecksum);
+    // Calculate the effective maximum readahead.
+    // We can't do more readahead than there is space in the buffer.
+    int maxReadaheadChunks = (bytesPerChecksum == 0) ? 0 :
+        ((Math.min(builder.bufferSize, builder.maxReadahead) +
+            bytesPerChecksum - 1) / bytesPerChecksum);
+    if (maxReadaheadChunks == 0) {
+      this.zeroReadaheadRequested = true;
+      maxReadaheadChunks = 1;
+    } else {
+      this.zeroReadaheadRequested = false;
+    }
+    this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum;
+    this.storageType = builder.storageType;
+    this.tracer = builder.tracer;
+  }
+
+  private synchronized void createDataBufIfNeeded() {
+    if (dataBuf == null) {
+      dataBuf = bufferPool.getBuffer(maxAllocatedChunks * bytesPerChecksum);
+      dataBuf.position(0);
+      dataBuf.limit(0);
+    }
+  }
+
+  private synchronized void freeDataBufIfExists() {
+    if (dataBuf != null) {
+      // When disposing of a dataBuf, we have to move our stored file index
+      // backwards.
+      dataPos -= dataBuf.remaining();
+      dataBuf.clear();
+      bufferPool.returnBuffer(dataBuf);
+      dataBuf = null;
+    }
+  }
+
+  private synchronized void createChecksumBufIfNeeded() {
+    if (checksumBuf == null) {
+      checksumBuf = bufferPool.getBuffer(maxAllocatedChunks * checksumSize);
+      checksumBuf.position(0);
+      checksumBuf.limit(0);
+    }
+  }
+
+  private synchronized void freeChecksumBufIfExists() {
+    if (checksumBuf != null) {
+      checksumBuf.clear();
+      bufferPool.returnBuffer(checksumBuf);
+      checksumBuf = null;
+    }
+  }
+
+  private synchronized int drainDataBuf(ByteBuffer buf) {
+    if (dataBuf == null) return -1;
+    int oldLimit = dataBuf.limit();
+    int nRead = Math.min(dataBuf.remaining(), buf.remaining());
+    if (nRead == 0) {
+      return (dataBuf.remaining() == 0) ? -1 : 0;
+    }
+    try {
+      dataBuf.limit(dataBuf.position() + nRead);
+      buf.put(dataBuf);
+    } finally {
+      dataBuf.limit(oldLimit);
+    }
+    return nRead;
+  }
+
+  /**
+   * Read from the block file into a buffer.
+   *
+   * This function overwrites checksumBuf.  It will increment dataPos.
+   *
+   * @param buf   The buffer to read into.  May be dataBuf.
+   *              The position and limit of this buffer should be set to
+   *              multiples of the checksum size.
+   * @param canSkipChecksum  True if we can skip checksumming.
+   *
+   * @return      Total bytes read.  0 on EOF.
+   */
+  private synchronized int fillBuffer(ByteBuffer buf, boolean canSkipChecksum)
+      throws IOException {
+    try (TraceScope ignored = tracer.newScope(
+        "BlockReaderLocal#fillBuffer(" + block.getBlockId() + ")")) {
+      int total = 0;
+      long startDataPos = dataPos;
+      int startBufPos = buf.position();
+      while (buf.hasRemaining()) {
+        int nRead = dataIn.read(buf, dataPos);
+        if (nRead < 0) {
+          break;
+        }
+        dataPos += nRead;
+        total += nRead;
+      }
+      if (canSkipChecksum) {
+        freeChecksumBufIfExists();
+        return total;
+      }
+      if (total > 0) {
+        try {
+          buf.limit(buf.position());
+          buf.position(startBufPos);
+          createChecksumBufIfNeeded();
+          int checksumsNeeded = (total + bytesPerChecksum - 1) /
+              bytesPerChecksum;
+          checksumBuf.clear();
+          checksumBuf.limit(checksumsNeeded * checksumSize);
+          long checksumPos = BlockMetadataHeader.getHeaderSize()
+              + ((startDataPos / bytesPerChecksum) * checksumSize);
+          while (checksumBuf.hasRemaining()) {
+            int nRead = checksumIn.read(checksumBuf, checksumPos);
+            if (nRead < 0) {
+              throw new IOException("Got unexpected checksum file EOF at " +
+                  checksumPos + ", block file position " + startDataPos +
+                  " for block " + block + " of file " + filename);
+            }
+            checksumPos += nRead;
+          }
+          checksumBuf.flip();
+
+          checksum.verifyChunkedSums(buf, checksumBuf, filename, startDataPos);
+        } finally {
+          buf.position(buf.limit());
+        }
+      }
+      return total;
+    }
+  }
+
+  private boolean createNoChecksumContext() {
+    return !verifyChecksum ||
+        // Checksums are not stored for replicas on transient storage.  We do
+        // not anchor, because we do not intend for client activity to block
+        // eviction from transient storage on the DataNode side.
+        (storageType != null && storageType.isTransient()) ||
+        replica.addNoChecksumAnchor();
+  }
+
+  private void releaseNoChecksumContext() {
+    if (verifyChecksum) {
+      if (storageType == null || !storageType.isTransient()) {
+        replica.removeNoChecksumAnchor();
+      }
+    }
+  }
+
+  @Override
+  public synchronized int read(ByteBuffer buf) throws IOException {
+    boolean canSkipChecksum = createNoChecksumContext();
+    try {
+      String traceFormatStr = "read(buf.remaining={}, block={}, filename={}, "
+          + "canSkipChecksum={})";
+      LOG.trace(traceFormatStr + ": starting",
+          buf.remaining(), block, filename, canSkipChecksum);
+      int nRead;
+      try {
+        if (canSkipChecksum && zeroReadaheadRequested) {
+          nRead = readWithoutBounceBuffer(buf);
+        } else {
+          nRead = readWithBounceBuffer(buf, canSkipChecksum);
+        }
+      } catch (IOException e) {
+        LOG.trace(traceFormatStr + ": I/O error",
+            buf.remaining(), block, filename, canSkipChecksum, e);
+        throw e;
+      }
+      LOG.trace(traceFormatStr + ": returning {}",
+          buf.remaining(), block, filename, canSkipChecksum, nRead);
+      return nRead;
+    } finally {
+      if (canSkipChecksum) releaseNoChecksumContext();
+    }
+  }
+
+  private synchronized int readWithoutBounceBuffer(ByteBuffer buf)
+      throws IOException {
+    freeDataBufIfExists();
+    freeChecksumBufIfExists();
+    int total = 0;
+    while (buf.hasRemaining()) {
+      int nRead = dataIn.read(buf, dataPos);
+      if (nRead <= 0) break;
+      dataPos += nRead;
+      total += nRead;
+    }
+    return (total == 0 && (dataPos == dataIn.size())) ? -1 : total;
+  }
+
+  /**
+   * Fill the data buffer.  If necessary, validate the data against the
+   * checksums.
+   *
+   * We always want the offsets of the data contained in dataBuf to be
+   * aligned to the chunk boundary.  If we are validating checksums, we
+   * accomplish this by seeking backwards in the file until we're on a
+   * chunk boundary.  (This is necessary because we can't checksum a
+   * partial chunk.)  If we are not validating checksums, we simply only
+   * fill the latter part of dataBuf.
+   *
+   * @param canSkipChecksum  true if we can skip checksumming.
+   * @return                 true if we hit EOF.
+   * @throws IOException
+   */
+  private synchronized boolean fillDataBuf(boolean canSkipChecksum)
+      throws IOException {
+    createDataBufIfNeeded();
+    final int slop = (int)(dataPos % bytesPerChecksum);
+    final long oldDataPos = dataPos;
+    dataBuf.limit(maxReadaheadLength);
+    if (canSkipChecksum) {
+      dataBuf.position(slop);
+      fillBuffer(dataBuf, true);
+    } else {
+      dataPos -= slop;
+      dataBuf.position(0);
+      fillBuffer(dataBuf, false);
+    }
+    dataBuf.limit(dataBuf.position());
+    dataBuf.position(Math.min(dataBuf.position(), slop));
+    LOG.trace("loaded {} bytes into bounce buffer from offset {} of {}",
+        dataBuf.remaining(), oldDataPos, block);
+    return dataBuf.limit() != maxReadaheadLength;
+  }
+
+  /**
+   * Read using the bounce buffer.
+   *
+   * A 'direct' read actually has three phases. The first drains any
+   * remaining bytes from the slow read buffer. After this the read is
+   * guaranteed to be on a checksum chunk boundary. If there are still bytes
+   * to read, the fast direct path is used for as many remaining bytes as
+   * possible, up to a multiple of the checksum chunk size. Finally, any
+   * 'odd' bytes remaining at the end of the read cause another slow read to
+   * be issued, which involves an extra copy.
+   *
+   * Every 'slow' read tries to fill the slow read buffer in one go for
+   * efficiency's sake. As described above, all non-checksum-chunk-aligned
+   * reads will be served from the slower read path.
+   *
+   * @param buf              The buffer to read into.
+   * @param canSkipChecksum  True if we can skip checksums.
+   */
+  private synchronized int readWithBounceBuffer(ByteBuffer buf,
+        boolean canSkipChecksum) throws IOException {
+    int total = 0;
+    int bb = drainDataBuf(buf); // drain bounce buffer if possible
+    if (bb >= 0) {
+      total += bb;
+      if (buf.remaining() == 0) return total;
+    }
+    boolean eof = true, done = false;
+    do {
+      if (buf.isDirect() && (buf.remaining() >= maxReadaheadLength)
+            && ((dataPos % bytesPerChecksum) == 0)) {
+        // Fast lane: try to read directly into user-supplied buffer, bypassing
+        // bounce buffer.
+        int oldLimit = buf.limit();
+        int nRead;
+        try {
+          buf.limit(buf.position() + maxReadaheadLength);
+          nRead = fillBuffer(buf, canSkipChecksum);
+        } finally {
+          buf.limit(oldLimit);
+        }
+        if (nRead < maxReadaheadLength) {
+          done = true;
+        }
+        if (nRead > 0) {
+          eof = false;
+        }
+        total += nRead;
+      } else {
+        // Slow lane: refill bounce buffer.
+        if (fillDataBuf(canSkipChecksum)) {
+          done = true;
+        }
+        bb = drainDataBuf(buf); // drain bounce buffer if possible
+        if (bb >= 0) {
+          eof = false;
+          total += bb;
+        }
+      }
+    } while ((!done) && (buf.remaining() > 0));
+    return (eof && total == 0) ? -1 : total;
+  }
+
+  @Override
+  public synchronized int read(byte[] arr, int off, int len)
+        throws IOException {
+    boolean canSkipChecksum = createNoChecksumContext();
+    int nRead;
+    try {
+      final String traceFormatStr = "read(arr.length={}, off={}, len={}, "
+          + "filename={}, block={}, canSkipChecksum={})";
+      LOG.trace(traceFormatStr + ": starting",
+          arr.length, off, len, filename, block, canSkipChecksum);
+      try {
+        if (canSkipChecksum && zeroReadaheadRequested) {
+          nRead = readWithoutBounceBuffer(arr, off, len);
+        } else {
+          nRead = readWithBounceBuffer(arr, off, len, canSkipChecksum);
+        }
+      } catch (IOException e) {
+        LOG.trace(traceFormatStr + ": I/O error",
+            arr.length, off, len, filename, block, canSkipChecksum, e);
+        throw e;
+      }
+      LOG.trace(traceFormatStr + ": returning {}",
+          arr.length, off, len, filename, block, canSkipChecksum, nRead);
+    } finally {
+      if (canSkipChecksum) releaseNoChecksumContext();
+    }
+    return nRead;
+  }
+
+  private synchronized int readWithoutBounceBuffer(byte arr[], int off,
+        int len) throws IOException {
+    freeDataBufIfExists();
+    freeChecksumBufIfExists();
+    int nRead = dataIn.read(ByteBuffer.wrap(arr, off, len), dataPos);
+    if (nRead > 0) {
+      dataPos += nRead;
+    } else if ((nRead == 0) && (dataPos == dataIn.size())) {
+      return -1;
+    }
+    return nRead;
+  }
+
+  private synchronized int readWithBounceBuffer(byte arr[], int off, int len,
+        boolean canSkipChecksum) throws IOException {
+    createDataBufIfNeeded();
+    if (!dataBuf.hasRemaining()) {
+      dataBuf.position(0);
+      dataBuf.limit(maxReadaheadLength);
+      fillDataBuf(canSkipChecksum);
+    }
+    if (dataBuf.remaining() == 0) return -1;
+    int toRead = Math.min(dataBuf.remaining(), len);
+    dataBuf.get(arr, off, toRead);
+    return toRead;
+  }
+
+  @Override
+  public synchronized long skip(long n) throws IOException {
+    int discardedFromBuf = 0;
+    long remaining = n;
+    if ((dataBuf != null) && dataBuf.hasRemaining()) {
+      discardedFromBuf = (int)Math.min(dataBuf.remaining(), n);
+      dataBuf.position(dataBuf.position() + discardedFromBuf);
+      remaining -= discardedFromBuf;
+    }
+    LOG.trace("skip(n={}, block={}, filename={}): discarded {} bytes from "
+            + "dataBuf and advanced dataPos by {}",
+        n, block, filename, discardedFromBuf, remaining);
+    dataPos += remaining;
+    return n;
+  }
+
+  @Override
+  public int available() {
+    // We never do network I/O in BlockReaderLocal.
+    return Integer.MAX_VALUE;
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (closed) return;
+    closed = true;
+    LOG.trace("close(filename={}, block={})", filename, block);
+    replica.unref();
+    freeDataBufIfExists();
+    freeChecksumBufIfExists();
+  }
+
+  @Override
+  public synchronized void readFully(byte[] arr, int off, int len)
+      throws IOException {
+    BlockReaderUtil.readFully(this, arr, off, len);
+  }
+
+  @Override
+  public synchronized int readAll(byte[] buf, int off, int len)
+      throws IOException {
+    return BlockReaderUtil.readAll(this, buf, off, len);
+  }
+
+  @Override
+  public boolean isShortCircuit() {
+    return true;
+  }
+
+  /**
+   * Get or create a memory map for this replica.
+   *
+   * There are two kinds of ClientMmap objects we could fetch here: one that
+   * will always read pre-checksummed data, and one that may read data that
+   * hasn't been checksummed.
+   *
+   * If we fetch the former, "safe" kind of ClientMmap, we have to increment
+   * the anchor count on the shared memory slot.  This will tell the DataNode
+   * not to munlock the block until this ClientMmap is closed.
+   * If we fetch the latter, we don't bother with anchoring.
+   *
+   * @param opts     The options to use, such as SKIP_CHECKSUMS.
+   *
+   * @return         null on failure; the ClientMmap otherwise.
+   */
+  @Override
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
+    boolean anchor = verifyChecksum &&
+        !opts.contains(ReadOption.SKIP_CHECKSUMS);
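+    // Anchoring requires a no-checksum context; with checksums in force this
+    // generally succeeds only when the replica is anchorable (e.g. mlocked by
+    // the DataNode).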
+    if (anchor) {
+      if (!createNoChecksumContext()) {
+        LOG.trace("can't get an mmap for {} of {} since SKIP_CHECKSUMS was not "
+            + "given, we aren't skipping checksums, and the block is not "
+            + "mlocked.", block, filename);
+        return null;
+      }
+    }
+    ClientMmap clientMmap = null;
+    try {
+      clientMmap = replica.getOrCreateClientMmap(anchor);
+    } finally {
+      if ((clientMmap == null) && anchor) {
+        releaseNoChecksumContext();
+      }
+    }
+    return clientMmap;
+  }
+
+  @VisibleForTesting
+  boolean getVerifyChecksum() {
+    return this.verifyChecksum;
+  }
+
+  @VisibleForTesting
+  int getMaxReadaheadLength() {
+    return this.maxReadaheadLength;
+  }
+
+  /**
+   * Make the replica anchorable.  Normally this can only be done by the
+   * DataNode.  This method is only for testing.
+   */
+  @VisibleForTesting
+  void forceAnchorable() {
+    replica.getSlot().makeAnchorable();
+  }
+
+  /**
+   * Make the replica unanchorable.  Normally this can only be done by the
+   * DataNode.  This method is only for testing.
+   */
+  @VisibleForTesting
+  void forceUnanchorable() {
+    replica.getSlot().makeUnanchorable();
+  }
+
+  @Override
+  public DataChecksum getDataChecksum() {
+    return checksum;
+  }
+
+  @Override
+  public int getNetworkDistance() {
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocalLegacy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocalLegacy.java
new file mode 100644
index 0000000..7d20a83
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocalLegacy.java
@@ -0,0 +1,745 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+import org.apache.hadoop.hdfs.util.IOUtilsClient;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.DirectBufferPool;
+import org.apache.htrace.core.TraceScope;
+import org.apache.htrace.core.Tracer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BlockReaderLocalLegacy enables local short-circuit reads. If the DFS client
+ * is on the same machine as the datanode, then the client can read files
+ * directly from the local file system rather than going through the datanode
+ * for better performance. <br>
+ *
+ * This is the legacy implementation based on HDFS-2246, which requires
+ * permissions on the datanode to be set so that clients can directly access the
+ * blocks. The new implementation based on HDFS-347 should be preferred on UNIX
+ * systems where the required native code has been implemented.<br>
+ *
+ * {@link BlockReaderLocalLegacy} works as follows:
+ * <ul>
+ * <li>The user running the client must be configured on the datanode as
+ * permitted to perform short-circuit reads.</li>
+ * <li>The client gets the path to the file where the block is stored using the
+ * {@link org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol#getBlockLocalPathInfo(ExtendedBlock, Token)}
+ * RPC call.</li>
+ * <li>If security is enabled, the client uses Kerberos authentication to
+ * connect to the datanode over RPC.</li>
+ * </ul>
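+ * <p>
+ * In a typical deployment (property names as commonly documented; see
+ * hdfs-default.xml for the authoritative list), the client sets
+ * {@code dfs.client.read.shortcircuit} and
+ * {@code dfs.client.use.legacy.blockreader.local} to true, and the datanode
+ * lists the client's user in {@code dfs.block.local-path-access.user}.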
+ */
+@InterfaceAudience.Private
+class BlockReaderLocalLegacy implements BlockReader {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      BlockReaderLocalLegacy.class);
+
+  // Stores the cache and proxy for a local datanode.
+  private static class LocalDatanodeInfo {
+    private ClientDatanodeProtocol proxy = null;
+    private final Map<ExtendedBlock, BlockLocalPathInfo> cache;
+
+    LocalDatanodeInfo() {
+      final int cacheSize = 10000;
+      final float hashTableLoadFactor = 0.75f;
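+      // Size the table so cacheSize entries stay below the load-factor
+      // threshold (no rehashing). Access-order iteration plus
+      // removeEldestEntry makes this an LRU cache capped at cacheSize.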
+      int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor)
+          + 1;
+      cache = Collections
+          .synchronizedMap(new LinkedHashMap<ExtendedBlock, BlockLocalPathInfo>(
+              hashTableCapacity, hashTableLoadFactor, true) {
+            private static final long serialVersionUID = 1;
+
+            @Override
+            protected boolean removeEldestEntry(
+                Map.Entry<ExtendedBlock, BlockLocalPathInfo> eldest) {
+              return size() > cacheSize;
+            }
+          });
+    }
+
+    private synchronized ClientDatanodeProtocol getDatanodeProxy(
+        UserGroupInformation ugi, final DatanodeInfo node,
+        final Configuration conf, final int socketTimeout,
+        final boolean connectToDnViaHostname) throws IOException {
+      if (proxy == null) {
+        try {
+          proxy = ugi.doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
+            @Override
+            public ClientDatanodeProtocol run() throws Exception {
+              return DFSUtilClient.createClientDatanodeProtocolProxy(node, conf,
+                  socketTimeout, connectToDnViaHostname);
+            }
+          });
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt(); // restore the interrupt status
+          LOG.warn("Interrupted while creating datanode proxy", e);
+        }
+      }
+      return proxy;
+    }
+
+    private synchronized void resetDatanodeProxy() {
+      if (null != proxy) {
+        RPC.stopProxy(proxy);
+        proxy = null;
+      }
+    }
+
+    private BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
+      return cache.get(b);
+    }
+
+    private void setBlockLocalPathInfo(ExtendedBlock b,
+        BlockLocalPathInfo info) {
+      cache.put(b, info);
+    }
+
+    private void removeBlockLocalPathInfo(ExtendedBlock b) {
+      cache.remove(b);
+    }
+  }
+
+  // Multiple datanodes could be running on the local machine. Store proxies in
+  // a map keyed by the ipc port of the datanode.
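+  // Access is confined to the class-synchronized getLocalDatanodeInfo(int).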
+  private static final Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap =
+      new HashMap<>();
+
+  private final FileInputStream dataIn; // reader for the data file
+  private final FileInputStream checksumIn;   // reader for the checksum file
+
+  /**
+   * Offset from the most recent chunk boundary at which the next read should
+   * take place. It is set to non-zero only at construction time, and is
+   * decremented (usually to 0) by subsequent reads. This avoids having to do a
+   * checksum read at construction to position the read cursor correctly.
+   */
+  private int offsetFromChunkBoundary;
+
+  private byte[] skipBuf = null;
+
+  /**
+   * Used for checksummed reads that need to be staged before copying to their
+   * output buffer because they are either a) smaller than the checksum chunk
+   * size, or b) issued by the slower read(byte[]...) path.
+   */
+  private ByteBuffer slowReadBuff = null;
+  private ByteBuffer checksumBuff = null;
+  private DataChecksum checksum;
+  private final boolean verifyChecksum;
+
+  private static final DirectBufferPool bufferPool = new DirectBufferPool();
+
+  private final int bytesPerChecksum;
+  private final int checksumSize;
+
+  /** Offset in the block at which the reader actually wants to read. */
+  private long startOffset;
+  private final String filename;
+  private long blockId;
+  private final Tracer tracer;
+
+  /**
+   * The only way this object can be instantiated.
+   */
+  static BlockReaderLocalLegacy newBlockReader(DfsClientConf conf,
+      UserGroupInformation userGroupInformation,
+      Configuration configuration, String file, ExtendedBlock blk,
+      Token<BlockTokenIdentifier> token, DatanodeInfo node,
+      long startOffset, long length, StorageType storageType,
+      Tracer tracer) throws IOException {
+    final ShortCircuitConf scConf = conf.getShortCircuitConf();
+    LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
+        .getIpcPort());
+    // check the cache first
+    BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
+    if (pathinfo == null) {
+      if (userGroupInformation == null) {
+        userGroupInformation = UserGroupInformation.getCurrentUser();
+      }
+      pathinfo = getBlockPathInfo(userGroupInformation, blk, node,
+          configuration, conf.getSocketTimeout(), token,
+          conf.isConnectToDnViaHostname(), storageType);
+    }
+
+    // Check that the file exists. It may happen that the HDFS file has been
+    // deleted and this block-lookup is occurring on behalf of a new HDFS
+    // file. This time, the block file could be residing in a different
+    // portion of the dfs.datanode.data.dir directories. In this case, we
+    // remove this entry from the cache; the next call to this method will
+    // re-populate it.
+    FileInputStream dataIn = null;
+    FileInputStream checksumIn = null;
+    BlockReaderLocalLegacy localBlockReader = null;
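+    // Replicas on transient (memory) storage are written without checksums,
+    // so checksum verification is skipped for them.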
+    final boolean skipChecksumCheck = scConf.isSkipShortCircuitChecksums()
+        || storageType.isTransient();
+    try {
+      // get a local file system
+      File blkfile = new File(pathinfo.getBlockPath());
+      dataIn = new FileInputStream(blkfile);
+
+      LOG.debug("New BlockReaderLocalLegacy for file {} of size {} startOffset "
+              + "{} length {} short circuit checksum {}",
+          blkfile, blkfile.length(), startOffset, length, !skipChecksumCheck);
+
+      if (!skipChecksumCheck) {
+        // get the metadata file
+        File metafile = new File(pathinfo.getMetaPath());
+        checksumIn = new FileInputStream(metafile);
+
+        final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
+            new DataInputStream(checksumIn), blk);
+        long firstChunkOffset = startOffset
+            - (startOffset % checksum.getBytesPerChecksum());
+        localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk,
+            startOffset, checksum, true, dataIn, firstChunkOffset, checksumIn,
+            tracer);
+      } else {
+        localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk,
+            startOffset, dataIn, tracer);
+      }
+    } catch (IOException e) {
+      // remove from cache
+      localDatanodeInfo.removeBlockLocalPathInfo(blk);
+      LOG.warn("BlockReaderLocalLegacy: Removing " + blk
+          + " from cache because local file " + pathinfo.getBlockPath()
+          + " could not be opened.");
+      throw e;
+    } finally {
+      if (localBlockReader == null) {
+        if (dataIn != null) {
+          dataIn.close();
+        }
+        if (checksumIn != null) {
+          checksumIn.close();
+        }
+      }
+    }
+    return localBlockReader;
+  }
+
+  private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) {
+    LocalDatanodeInfo ldInfo = localDatanodeInfoMap.get(port);
+    if (ldInfo == null) {
+      ldInfo = new LocalDatanodeInfo();
+      localDatanodeInfoMap.put(port, ldInfo);
+    }
+    return ldInfo;
+  }
+
+  private static BlockLocalPathInfo getBlockPathInfo(UserGroupInformation ugi,
+      ExtendedBlock blk, DatanodeInfo node, Configuration conf, int timeout,
+      Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname,
+      StorageType storageType) throws IOException {
+    LocalDatanodeInfo localDatanodeInfo =
+        getLocalDatanodeInfo(node.getIpcPort());
+    BlockLocalPathInfo pathinfo;
+    ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(ugi, node,
+        conf, timeout, connectToDnViaHostname);
+    try {
+      // make RPC to local datanode to find local pathnames of blocks
+      pathinfo = proxy.getBlockLocalPathInfo(blk, token);
+      // We can't cache the path information for a replica on transient storage.
+      // If the replica gets evicted, then it moves to a different path.  Then,
+      // our next attempt to read from the cached path would fail to find the
+      // file.  Additionally, the failure would cause us to disable legacy
+      // short-circuit read for all subsequent use in the ClientContext.  Unlike
+      // the newer short-circuit read implementation, we have no communication
+      // channel for the DataNode to notify the client that the path has been
+      // invalidated.  Therefore, our only option is to skip caching.
+      if (pathinfo != null && !storageType.isTransient()) {
+        LOG.debug("Cached location of block {} as {}", blk, pathinfo);
+        localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo);
+      }
+    } catch (IOException e) {
+      localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error
+      throw e;
+    }
+    return pathinfo;
+  }
+
+  private static int getSlowReadBufferNumChunks(int bufferSizeBytes,
+      int bytesPerChecksum) {
+    if (bufferSizeBytes < bytesPerChecksum) {
+      throw new IllegalArgumentException("Configured BlockReaderLocalLegacy " +
+          "buffer size (" + bufferSizeBytes + ") is not large enough to hold " +
+          "a single chunk (" + bytesPerChecksum +  "). Please configure " +
+          HdfsClientConfigKeys.Read.ShortCircuit.BUFFER_SIZE_KEY +
+          " appropriately");
+    }
+
+    // Round down to nearest chunk size
+    return bufferSizeBytes / bytesPerChecksum;
+  }
+
+  private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile,
+      ExtendedBlock block, long startOffset, FileInputStream dataIn,
+      Tracer tracer) throws IOException {
+    this(conf, hdfsfile, block, startOffset,
+        DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 4), false,
+        dataIn, startOffset, null, tracer);
+  }
+
+  private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile,
+      ExtendedBlock block, long startOffset, DataChecksum checksum,
+      boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
+      FileInputStream checksumIn, Tracer tracer) throws IOException {
+    this.filename = hdfsfile;
+    this.checksum = checksum;
+    this.verifyChecksum = verifyChecksum;
+    this.startOffset = Math.max(startOffset, 0);
+    this.blockId = block.getBlockId();
+
+    bytesPerChecksum = this.checksum.getBytesPerChecksum();
+    checksumSize = this.checksum.getChecksumSize();
+
+    this.dataIn = dataIn;
+    this.checksumIn = checksumIn;
+    this.offsetFromChunkBoundary = (int) (startOffset - firstChunkOffset);
+
+    final int chunksPerChecksumRead = getSlowReadBufferNumChunks(
+        conf.getShortCircuitBufferSize(), bytesPerChecksum);
+    slowReadBuff = bufferPool.getBuffer(
+        bytesPerChecksum * chunksPerChecksumRead);
+    checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
+    // Initially the buffers have nothing to read.
+    slowReadBuff.flip();
+    checksumBuff.flip();
+    boolean success = false;
+    try {
+      // Skip both input streams to beginning of the chunk containing
+      // startOffset
+      IOUtils.skipFully(dataIn, firstChunkOffset);
+      if (checksumIn != null) {
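+        // Each bytesPerChecksum data bytes map to one checksumSize-byte entry
+        // in the meta file; scale the chunk-aligned data offset accordingly.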
+        long checkSumOffset = (firstChunkOffset / bytesPerChecksum) *
+            checksumSize;
+        IOUtils.skipFully(checksumIn, checkSumOffset);
+      }
+      success = true;
+    } finally {
+      if (!success) {
+        bufferPool.returnBuffer(slowReadBuff);
+        bufferPool.returnBuffer(checksumBuff);
+      }
+    }
+    this.tracer = tracer;
+  }
+
+  /**
+   * Reads bytes into a buffer until EOF or the buffer's limit is reached
+   */
+  private int fillBuffer(FileInputStream stream, ByteBuffer buf)
+      throws IOException {
+    try (TraceScope ignored = tracer.
+        newScope("BlockReaderLocalLegacy#fillBuffer(" + blockId + ")")) {
+      int bytesRead = stream.getChannel().read(buf);
+      if (bytesRead < 0) {
+        //EOF
+        return bytesRead;
+      }
+      while (buf.remaining() > 0) {
+        int n = stream.getChannel().read(buf);
+        if (n < 0) {
+          //EOF
+          return bytesRead;
+        }
+        bytesRead += n;
+      }
+      return bytesRead;
+    }
+  }
+
+  /**
+   * Utility method used by read(ByteBuffer) to partially copy a ByteBuffer into
+   * another.
+   */
+  private void writeSlice(ByteBuffer from, ByteBuffer to, int length) {
+    int oldLimit = from.limit();
+    from.limit(from.position() + length);
+    try {
+      to.put(from);
+    } finally {
+      from.limit(oldLimit);
+    }
+  }
+
+  @Override
+  public synchronized int read(ByteBuffer buf) throws IOException {
+    int nRead = 0;
+    if (verifyChecksum) {
+      // A 'direct' read actually has three phases. The first drains any
+      // remaining bytes from the slow read buffer. After this the read is
+      // guaranteed to be on a checksum chunk boundary. If there are still bytes
+      // to read, the fast direct path is used for as many remaining bytes as
+      // possible, up to a multiple of the checksum chunk size. Finally, any
+      // 'odd' bytes remaining at the end of the read cause another slow read to
+      // be issued, which involves an extra copy.
+
+      // Every 'slow' read tries to fill the slow read buffer in one go for
+      // efficiency's sake. As described above, all non-checksum-chunk-aligned
+      // reads will be served from the slower read path.
+
+      if (slowReadBuff.hasRemaining()) {
+        // There are remaining bytes from a small read available. This usually
+        // means this read is unaligned, which falls back to the slow path.
+        int fromSlowReadBuff = Math.min(buf.remaining(),
+            slowReadBuff.remaining());
+        writeSlice(slowReadBuff, buf, fromSlowReadBuff);
+        nRead += fromSlowReadBuff;
+      }
+
+      if (buf.remaining() >= bytesPerChecksum && offsetFromChunkBoundary == 0) {
+        // Since we have drained the 'small read' buffer, we are guaranteed to
+        // be chunk-aligned
+        int len = buf.remaining() - (buf.remaining() % bytesPerChecksum);
+
+        // There's only enough checksum buffer space available to checksum one
+        // entire slow read buffer. This saves keeping the number of checksum
+        // chunks around.
+        len = Math.min(len, slowReadBuff.capacity());
+        int oldlimit = buf.limit();
+        buf.limit(buf.position() + len);
+        int readResult = 0;
+        try {
+          readResult = doByteBufferRead(buf);
+        } finally {
+          buf.limit(oldlimit);
+        }
+        if (readResult == -1) {
+          return nRead;
+        } else {
+          nRead += readResult;
+          buf.position(buf.position() + readResult);
+        }
+      }
+
+      // offsetFromChunkBoundary > 0 => unaligned read, use slow path to read
+      // until chunk boundary
+      if ((buf.remaining() > 0 && buf.remaining() < bytesPerChecksum) ||
+          offsetFromChunkBoundary > 0) {
+        int toRead = Math.min(buf.remaining(),
+            bytesPerChecksum - offsetFromChunkBoundary);
+        int readResult = fillSlowReadBuffer(toRead);
+        if (readResult == -1) {
+          return nRead;
+        } else {
+          int fromSlowReadBuff = Math.min(readResult, buf.remaining());
+          writeSlice(slowReadBuff, buf, fromSlowReadBuff);
+          nRead += fromSlowReadBuff;
+        }
+      }
+    } else {
+      // Non-checksummed reads are much easier; we can just fill the buffer
+      // directly.
+      nRead = doByteBufferRead(buf);
+      if (nRead > 0) {
+        buf.position(buf.position() + nRead);
+      }
+    }
+    return nRead;
+  }
+
+  /**
+   * Tries to read as many bytes as possible into supplied buffer, checksumming
+   * each chunk if needed.
+   *
+   * <b>Preconditions:</b>
+   * <ul>
+   * <li>
+   * If checksumming is enabled, buf.remaining must be a multiple of
+   * bytesPerChecksum. Note that this is not a requirement for clients of
+   * read(ByteBuffer) - in the case of non-checksum-sized read requests,
+   * read(ByteBuffer) will substitute a suitably sized buffer to pass to this
+   * method.
+   * </li>
+   * </ul>
+   * <b>Postconditions:</b>
+   * <ul>
+   * <li>buf.limit and buf.mark are unchanged.</li>
+   * <li>buf.position += min(offsetFromChunkBoundary, totalBytesRead) - so the
+   * requested bytes can be read straight from the buffer</li>
+   * </ul>
+   *
+   * @param buf
+   *          byte buffer to write bytes to. If checksums are not required, buf
+   *          can have any number of bytes remaining, otherwise there must be a
+   *          multiple of the checksum chunk size remaining.
+   * @return <tt>max(min(totalBytesRead, len) - offsetFromChunkBoundary, 0)</tt>
+   *         that is, the number of useful bytes (up to the amount
+   *         requested) readable from the buffer by the client.
+   */
+  private synchronized int doByteBufferRead(ByteBuffer buf) throws IOException {
+    if (verifyChecksum) {
+      assert buf.remaining() % bytesPerChecksum == 0;
+    }
+    int dataRead;
+
+    int oldpos = buf.position();
+    // Read as much as we can into the buffer.
+    dataRead = fillBuffer(dataIn, buf);
+
+    if (dataRead == -1) {
+      return -1;
+    }
+
+    if (verifyChecksum) {
+      ByteBuffer toChecksum = buf.duplicate();
+      toChecksum.position(oldpos);
+      toChecksum.limit(oldpos + dataRead);
+
+      checksumBuff.clear();
+      // Equivalent to
+      // (int)Math.ceil(toChecksum.remaining() * 1.0 / bytesPerChecksum );
+      int numChunks =
+          (toChecksum.remaining() + bytesPerChecksum - 1) / bytesPerChecksum;
+      checksumBuff.limit(checksumSize * numChunks);
+
+      fillBuffer(checksumIn, checksumBuff);
+      checksumBuff.flip();
+
+      checksum.verifyChunkedSums(toChecksum, checksumBuff, filename,
+          this.startOffset);
+    }
+
+    if (dataRead >= 0) {
+      buf.position(oldpos + Math.min(offsetFromChunkBoundary, dataRead));
+    }
+
+    if (dataRead < offsetFromChunkBoundary) {
+      // yikes, didn't even get enough bytes to honour offset. This can happen
+      // even if we are verifying checksums if we are at EOF.
+      offsetFromChunkBoundary -= dataRead;
+      dataRead = 0;
+    } else {
+      dataRead -= offsetFromChunkBoundary;
+      offsetFromChunkBoundary = 0;
+    }
+
+    return dataRead;
+  }
+
+  /**
+   * Ensures that up to len bytes are available and checksummed in the slow read
+   * buffer. The number of bytes available to read is returned. If the buffer is
+   * not already empty, the number of remaining bytes is returned and no actual
+   * read happens.
+   *
+   * @param len
+   *          the maximum number of bytes to make available. After len bytes
+   *          are read, the underlying bytestream <b>must</b> be at a checksum
+   *          boundary, or EOF. That is, (len + currentPosition) %
+   *          bytesPerChecksum == 0.
+   * @return the number of bytes available to read, or -1 if EOF.
+   */
+  private synchronized int fillSlowReadBuffer(int len) throws IOException {
+    int nRead;
+    if (slowReadBuff.hasRemaining()) {
+      // Already got data, good to go.
+      nRead = Math.min(len, slowReadBuff.remaining());
+    } else {
+      // Round a complete read of len bytes (plus any implicit offset) to the
+      // next chunk boundary, since we try to read in multiples of a chunk.
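+      // (When the sum is already chunk-aligned this rounds up by one extra
+      // full chunk, which is harmless since the limit is capped at the buffer
+      // capacity below.)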
+      int nextChunk = len + offsetFromChunkBoundary +
+          (bytesPerChecksum -
+              ((len + offsetFromChunkBoundary) % bytesPerChecksum));
+      int limit = Math.min(nextChunk, slowReadBuff.capacity());
+      assert limit % bytesPerChecksum == 0;
+
+      slowReadBuff.clear();
+      slowReadBuff.limit(limit);
+
+      nRead = doByteBufferRead(slowReadBuff);
+
+      if (nRead > 0) {
+        // So that next time we call slowReadBuff.hasRemaining(), we don't get a
+        // false positive.
+        slowReadBuff.limit(nRead + slowReadBuff.position());
+      }
+    }
+    return nRead;
+  }
+
+  @Override
+  public synchronized int read(byte[] buf, int off, int len)
+      throws IOException {
+    LOG.trace("read off {} len {}", off, len);
+    if (!verifyChecksum) {
+      return dataIn.read(buf, off, len);
+    }
+
+    int nRead = fillSlowReadBuffer(slowReadBuff.capacity());
+
+    if (nRead > 0) {
+      // Possible that buffer is filled with a larger read than we need, since
+      // we tried to read as much as possible at once
+      nRead = Math.min(len, nRead);
+      slowReadBuff.get(buf, off, nRead);
+    }
+
+    return nRead;
+  }
+
+  @Override
+  public synchronized long skip(long n) throws IOException {
+    LOG.debug("skip {}", n);
+    if (n <= 0) {
+      return 0;
+    }
+    if (!verifyChecksum) {
+      return dataIn.skip(n);
+    }
+
+    // caller made sure newPosition is not beyond EOF.
+    int remaining = slowReadBuff.remaining();
+    int position = slowReadBuff.position();
+    int newPosition = position + (int)n;
+
+    // if the new offset is already read into dataBuff, just reposition
+    if (n <= remaining) {
+      assert offsetFromChunkBoundary == 0;
+      slowReadBuff.position(newPosition);
+      return n;
+    }
+
+    // for small gap, read through to keep the data/checksum in sync
+    if (n - remaining <= bytesPerChecksum) {
+      slowReadBuff.position(position + remaining);
+      if (skipBuf == null) {
+        skipBuf = new byte[bytesPerChecksum];
+      }
+      int ret = read(skipBuf, 0, (int)(n - remaining));
+      return (remaining + ret);
+    }
+
+    // optimize for big gap: discard the current buffer, skip to
+    // the beginning of the appropriate checksum chunk and then
+    // read to the middle of that chunk to be in sync with checksums.
+
+    // We can't use this.offsetFromChunkBoundary because we need to know how
+    // many bytes of the offset were really read. Calling read(..) with a
+    // positive this.offsetFromChunkBoundary causes that many bytes to get
+    // silently skipped.
+    int myOffsetFromChunkBoundary = newPosition % bytesPerChecksum;
+    long toskip = n - remaining - myOffsetFromChunkBoundary;
+
+    slowReadBuff.position(slowReadBuff.limit());
+    checksumBuff.position(checksumBuff.limit());
+
+    IOUtils.skipFully(dataIn, toskip);
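+    // Skip the matching amount in the checksum stream: one checksumSize-byte
+    // entry per bytesPerChecksum data bytes.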
+    long checkSumOffset = (toskip / bytesPerChecksum) * checksumSize;
+    IOUtils.skipFully(checksumIn, checkSumOffset);
+
+    // read into the middle of the chunk
+    if (skipBuf == null) {
+      skipBuf = new byte[bytesPerChecksum];
+    }
+    assert skipBuf.length == bytesPerChecksum;
+    assert myOffsetFromChunkBoundary < bytesPerChecksum;
+
+    int ret = read(skipBuf, 0, myOffsetFromChunkBoundary);
+
+    if (ret == -1) {  // EOS
+      return (toskip + remaining);
+    } else {
+      return (toskip + remaining + ret);
+    }
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    IOUtilsClient.cleanup(LOG, dataIn, checksumIn);
+    if (slowReadBuff != null) {
+      bufferPool.returnBuffer(slowReadBuff);
+      slowReadBuff = null;
+    }
+    if (checksumBuff != null) {
+      bufferPool.returnBuffer(checksumBuff);
+      checksumBuff = null;
+    }
+    startOffset = -1;
+    checksum = null;
+  }
+
+  @Override
+  public int readAll(byte[] buf, int offset, int len) throws IOException {
+    return BlockReaderUtil.readAll(this, buf, offset, len);
+  }
+
+  @Override
+  public void readFully(byte[] buf, int off, int len) throws IOException {
+    BlockReaderUtil.readFully(this, buf, off, len);
+  }
+
+  @Override
+  public int available() {
+    // We never do network I/O in BlockReaderLocalLegacy.
+    return Integer.MAX_VALUE;
+  }
+
+  @Override
+  public boolean isShortCircuit() {
+    return true;
+  }
+
+  @Override
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
+    return null;
+  }
+
+  @Override
+  public DataChecksum getDataChecksum() {
+    return checksum;
+  }
+
+  @Override
+  public int getNetworkDistance() {
+    return 0;
+  }
+}

