hadoop-common-commits mailing list archives

From wan...@apache.org
Subject [07/50] [abbrv] hadoop git commit: HDFS-8057 Move BlockReader implementation to the client implementation package. Contributed by Takanobu Asanuma
Date Thu, 28 Apr 2016 18:01:51 GMT
HDFS-8057 Move BlockReader implementation to the client implementation package.  Contributed by Takanobu Asanuma


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f308561f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f308561f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f308561f

Branch: refs/heads/YARN-3368
Commit: f308561f1d885491b88db73ac63003202056d661
Parents: 10f0f78
Author: Tsz-Wo Nicholas Sze <szetszwo@hortonworks.com>
Authored: Mon Apr 25 12:01:48 2016 -0700
Committer: Tsz-Wo Nicholas Sze <szetszwo@hortonworks.com>
Committed: Mon Apr 25 12:01:48 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hdfs/BlockReaderFactory.java  | 870 ------------------
 .../apache/hadoop/hdfs/BlockReaderLocal.java    | 724 ---------------
 .../hadoop/hdfs/BlockReaderLocalLegacy.java     | 744 ----------------
 .../org/apache/hadoop/hdfs/BlockReaderUtil.java |  57 --
 .../org/apache/hadoop/hdfs/DFSInputStream.java  |   1 +
 .../apache/hadoop/hdfs/ExternalBlockReader.java | 132 ---
 .../apache/hadoop/hdfs/RemoteBlockReader.java   | 510 -----------
 .../apache/hadoop/hdfs/RemoteBlockReader2.java  | 472 ----------
 .../hdfs/client/impl/BlockReaderFactory.java    | 878 +++++++++++++++++++
 .../hdfs/client/impl/BlockReaderLocal.java      | 725 +++++++++++++++
 .../client/impl/BlockReaderLocalLegacy.java     | 745 ++++++++++++++++
 .../hdfs/client/impl/BlockReaderRemote.java     | 512 +++++++++++
 .../hdfs/client/impl/BlockReaderRemote2.java    | 474 ++++++++++
 .../hdfs/client/impl/BlockReaderUtil.java       |  58 ++
 .../hdfs/client/impl/ExternalBlockReader.java   | 134 +++
 .../dev-support/findbugsExcludeFile.xml         |   2 +-
 .../erasurecode/StripedBlockReader.java         |   4 +-
 .../hdfs/server/namenode/NamenodeFsck.java      |   2 +-
 .../hadoop/fs/TestEnhancedByteBufferAccess.java |   3 +-
 .../apache/hadoop/hdfs/BlockReaderTestUtil.java | 258 ------
 .../apache/hadoop/hdfs/StripedFileTestUtil.java |   1 +
 .../apache/hadoop/hdfs/TestBlockReaderBase.java |  94 --
 .../hadoop/hdfs/TestBlockReaderFactory.java     | 534 -----------
 .../hadoop/hdfs/TestBlockReaderLocal.java       | 780 ----------------
 .../hadoop/hdfs/TestBlockReaderLocalLegacy.java | 220 -----
 .../hdfs/TestClientBlockVerification.java       | 125 ---
 .../org/apache/hadoop/hdfs/TestConnCache.java   |   1 +
 .../hadoop/hdfs/TestDisableConnCache.java       |   1 +
 .../hadoop/hdfs/TestParallelReadUtil.java       |   1 +
 .../hadoop/hdfs/TestRemoteBlockReader.java      |  29 -
 .../hadoop/hdfs/TestRemoteBlockReader2.java     |  25 -
 .../hdfs/client/impl/BlockReaderTestUtil.java   | 267 ++++++
 .../hdfs/client/impl/TestBlockReaderBase.java   |  97 ++
 .../client/impl/TestBlockReaderFactory.java     | 539 ++++++++++++
 .../hdfs/client/impl/TestBlockReaderLocal.java  | 786 +++++++++++++++++
 .../client/impl/TestBlockReaderLocalLegacy.java | 227 +++++
 .../hdfs/client/impl/TestBlockReaderRemote.java |  30 +
 .../client/impl/TestBlockReaderRemote2.java     |  27 +
 .../impl/TestClientBlockVerification.java       | 126 +++
 .../blockmanagement/TestBlockTokenWithDFS.java  |   2 +-
 .../datanode/TestDataNodeVolumeFailure.java     |   2 +-
 .../server/datanode/TestFsDatasetCache.java     |   3 +-
 .../datanode/TestFsDatasetCacheRevocation.java  |   2 +-
 .../server/namenode/TestCacheDirectives.java    |   2 +-
 .../shortcircuit/TestShortCircuitCache.java     |   4 +-
 .../shortcircuit/TestShortCircuitLocalRead.java |   9 +-
 46 files changed, 5646 insertions(+), 5593 deletions(-)
----------------------------------------------------------------------
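
For downstream code that referenced these classes directly (they are marked
@InterfaceAudience.Private, so external use should be rare), the practical
effect of this patch is a package move from org.apache.hadoop.hdfs to
org.apache.hadoop.hdfs.client.impl, with the two remote readers renamed along
the way (RemoteBlockReader becomes BlockReaderRemote, RemoteBlockReader2
becomes BlockReaderRemote2). A hypothetical before/after import sketch, not
part of the patch itself:

    // Before HDFS-8057:
    import org.apache.hadoop.hdfs.BlockReaderFactory;
    import org.apache.hadoop.hdfs.RemoteBlockReader2;

    // After HDFS-8057 (same classes, new package; RemoteBlockReader2 is
    // renamed to BlockReaderRemote2):
    import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
    import org.apache.hadoop.hdfs.client.impl.BlockReaderRemote2;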


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
deleted file mode 100644
index 7af4609..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
+++ /dev/null
@@ -1,870 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
-
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.net.InetSocketAddress;
-import java.util.List;
-
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
-import org.apache.commons.lang.mutable.MutableBoolean;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
-import org.apache.hadoop.hdfs.net.DomainPeer;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
-import org.apache.hadoop.hdfs.util.IOUtilsClient;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.net.unix.DomainSocket;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.PerformanceAdvisory;
-import org.apache.hadoop.util.Time;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.htrace.core.Tracer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Utility class to create BlockReader implementations.
- */
-@InterfaceAudience.Private
-public class BlockReaderFactory implements ShortCircuitReplicaCreator {
-  static final Logger LOG = LoggerFactory.getLogger(BlockReaderFactory.class);
-
-  public static class FailureInjector {
-    public void injectRequestFileDescriptorsFailure() throws IOException {
-      // do nothing
-    }
-    public boolean getSupportsReceiptVerification() {
-      return true;
-    }
-  }
-
-  @VisibleForTesting
-  static ShortCircuitReplicaCreator
-      createShortCircuitReplicaInfoCallback = null;
-
-  private final DfsClientConf conf;
-
-  /**
-   * Injects failures into specific operations during unit tests.
-   */
-  private static FailureInjector failureInjector = new FailureInjector();
-
-  /**
-   * The file name, for logging and debugging purposes.
-   */
-  private String fileName;
-
-  /**
-   * The block ID and block pool ID to use.
-   */
-  private ExtendedBlock block;
-
-  /**
-   * The block token to use for security purposes.
-   */
-  private Token<BlockTokenIdentifier> token;
-
-  /**
-   * The offset within the block to start reading at.
-   */
-  private long startOffset;
-
-  /**
-   * If false, we won't try to verify the block checksum.
-   */
-  private boolean verifyChecksum;
-
-  /**
-   * The name of this client.
-   */
-  private String clientName;
-
-  /**
-   * The DataNode we're talking to.
-   */
-  private DatanodeInfo datanode;
-
-  /**
-   * StorageType of replica on DataNode.
-   */
-  private StorageType storageType;
-
-  /**
-   * If false, we won't try short-circuit local reads.
-   */
-  private boolean allowShortCircuitLocalReads;
-
-  /**
-   * The ClientContext to use for things like the PeerCache.
-   */
-  private ClientContext clientContext;
-
-  /**
-   * Number of bytes to read. Must be set to a non-negative value.
-   */
-  private long length = -1;
-
-  /**
-   * Caching strategy to use when reading the block.
-   */
-  private CachingStrategy cachingStrategy;
-
-  /**
-   * Socket address to use to connect to peer.
-   */
-  private InetSocketAddress inetSocketAddress;
-
-  /**
-   * Remote peer factory to use to create a peer, if needed.
-   */
-  private RemotePeerFactory remotePeerFactory;
-
-  /**
-   * UserGroupInformation to use for legacy block reader local objects,
-   * if needed.
-   */
-  private UserGroupInformation userGroupInformation;
-
-  /**
-   * Configuration to use for legacy block reader local objects, if needed.
-   */
-  private Configuration configuration;
-
-  /**
-   * The HTrace tracer to use.
-   */
-  private Tracer tracer;
-
-  /**
-   * Information about the domain socket path we should use to connect to the
-   * local peer-- or null if we haven't examined the local domain socket.
-   */
-  private DomainSocketFactory.PathInfo pathInfo;
-
-  /**
-   * The remaining number of times that we'll try to pull a socket out of the
-   * cache.
-   */
-  private int remainingCacheTries;
-
-  public BlockReaderFactory(DfsClientConf conf) {
-    this.conf = conf;
-    this.remainingCacheTries = conf.getNumCachedConnRetry();
-  }
-
-  public BlockReaderFactory setFileName(String fileName) {
-    this.fileName = fileName;
-    return this;
-  }
-
-  public BlockReaderFactory setBlock(ExtendedBlock block) {
-    this.block = block;
-    return this;
-  }
-
-  public BlockReaderFactory setBlockToken(Token<BlockTokenIdentifier> token) {
-    this.token = token;
-    return this;
-  }
-
-  public BlockReaderFactory setStartOffset(long startOffset) {
-    this.startOffset = startOffset;
-    return this;
-  }
-
-  public BlockReaderFactory setVerifyChecksum(boolean verifyChecksum) {
-    this.verifyChecksum = verifyChecksum;
-    return this;
-  }
-
-  public BlockReaderFactory setClientName(String clientName) {
-    this.clientName = clientName;
-    return this;
-  }
-
-  public BlockReaderFactory setDatanodeInfo(DatanodeInfo datanode) {
-    this.datanode = datanode;
-    return this;
-  }
-
-  public BlockReaderFactory setStorageType(StorageType storageType) {
-    this.storageType = storageType;
-    return this;
-  }
-
-  public BlockReaderFactory setAllowShortCircuitLocalReads(
-      boolean allowShortCircuitLocalReads) {
-    this.allowShortCircuitLocalReads = allowShortCircuitLocalReads;
-    return this;
-  }
-
-  public BlockReaderFactory setClientCacheContext(
-      ClientContext clientContext) {
-    this.clientContext = clientContext;
-    return this;
-  }
-
-  public BlockReaderFactory setLength(long length) {
-    this.length = length;
-    return this;
-  }
-
-  public BlockReaderFactory setCachingStrategy(
-      CachingStrategy cachingStrategy) {
-    this.cachingStrategy = cachingStrategy;
-    return this;
-  }
-
-  public BlockReaderFactory setInetSocketAddress (
-      InetSocketAddress inetSocketAddress) {
-    this.inetSocketAddress = inetSocketAddress;
-    return this;
-  }
-
-  public BlockReaderFactory setUserGroupInformation(
-      UserGroupInformation userGroupInformation) {
-    this.userGroupInformation = userGroupInformation;
-    return this;
-  }
-
-  public BlockReaderFactory setRemotePeerFactory(
-      RemotePeerFactory remotePeerFactory) {
-    this.remotePeerFactory = remotePeerFactory;
-    return this;
-  }
-
-  public BlockReaderFactory setConfiguration(
-      Configuration configuration) {
-    this.configuration = configuration;
-    return this;
-  }
-
-  public BlockReaderFactory setTracer(Tracer tracer) {
-    this.tracer = tracer;
-    return this;
-  }
-
-  @VisibleForTesting
-  public static void setFailureInjectorForTesting(FailureInjector injector) {
-    failureInjector = injector;
-  }
-
-  /**
-   * Build a BlockReader with the given options.
-   *
-   * This function will do the best it can to create a block reader that meets
-   * all of our requirements.  We prefer short-circuit block readers
-   * (BlockReaderLocal and BlockReaderLocalLegacy) over remote ones, since the
-   * former avoid the overhead of socket communication.  If short-circuit is
-   * unavailable, our next fallback is data transfer over UNIX domain sockets,
-   * if dfs.client.domain.socket.data.traffic has been enabled.  If that doesn't
-   * work, we will try to create a remote block reader that operates over TCP
-   * sockets.
-   *
-   * There are a few caches that are important here.
-   *
-   * The ShortCircuitCache stores file descriptor objects which have been passed
-   * from the DataNode.
-   *
-   * The DomainSocketFactory stores information about UNIX domain socket paths
- * that we have not been able to use in the past, so that we don't waste time
-   * retrying them over and over.  (Like all the caches, it does have a timeout,
-   * though.)
-   *
-   * The PeerCache stores peers that we have used in the past.  If we can reuse
-   * one of these peers, we avoid the overhead of re-opening a socket.  However,
-   * if the socket has been timed out on the remote end, our attempt to reuse
-   * the socket may end with an IOException.  For that reason, we limit our
-   * attempts at socket reuse to dfs.client.cached.conn.retry times.  After
-   * that, we create new sockets.  This avoids the problem where a thread tries
-   * to talk to a peer that it hasn't talked to in a while, and has to clean out
-   * every entry in a socket cache full of stale entries.
-   *
-   * @return The new BlockReader.  We will not return null.
-   *
-   * @throws InvalidToken
-   *             If the block token was invalid.
-   *         InvalidEncryptionKeyException
-   *             If the encryption key was invalid.
-   *         Other IOException
-   *             If there was another problem.
-   */
-  public BlockReader build() throws IOException {
-    Preconditions.checkNotNull(configuration);
-    Preconditions
-        .checkState(length >= 0, "Length must be set to a non-negative value");
-    BlockReader reader = tryToCreateExternalBlockReader();
-    if (reader != null) {
-      return reader;
-    }
-    final ShortCircuitConf scConf = conf.getShortCircuitConf();
-    if (scConf.isShortCircuitLocalReads() && allowShortCircuitLocalReads) {
-      if (clientContext.getUseLegacyBlockReaderLocal()) {
-        reader = getLegacyBlockReaderLocal();
-        if (reader != null) {
-          LOG.trace("{}: returning new legacy block reader local.", this);
-          return reader;
-        }
-      } else {
-        reader = getBlockReaderLocal();
-        if (reader != null) {
-          LOG.trace("{}: returning new block reader local.", this);
-          return reader;
-        }
-      }
-    }
-    if (scConf.isDomainSocketDataTraffic()) {
-      reader = getRemoteBlockReaderFromDomain();
-      if (reader != null) {
-        LOG.trace("{}: returning new remote block reader using UNIX domain "
-            + "socket on {}", this, pathInfo.getPath());
-        return reader;
-      }
-    }
-    Preconditions.checkState(!DFSInputStream.tcpReadsDisabledForTesting,
-        "TCP reads were disabled for testing, but we failed to " +
-        "do a non-TCP read.");
-    return getRemoteBlockReaderFromTcp();
-  }
-
-  private BlockReader tryToCreateExternalBlockReader() {
-    List<Class<? extends ReplicaAccessorBuilder>> clses =
-        conf.getReplicaAccessorBuilderClasses();
-    for (Class<? extends ReplicaAccessorBuilder> cls : clses) {
-      try {
-        ByteArrayDataOutput bado = ByteStreams.newDataOutput();
-        token.write(bado);
-        byte tokenBytes[] = bado.toByteArray();
-
-        Constructor<? extends ReplicaAccessorBuilder> ctor =
-            cls.getConstructor();
-        ReplicaAccessorBuilder builder = ctor.newInstance();
-        long visibleLength = startOffset + length;
-        ReplicaAccessor accessor = builder.
-            setAllowShortCircuitReads(allowShortCircuitLocalReads).
-            setBlock(block.getBlockId(), block.getBlockPoolId()).
-            setGenerationStamp(block.getGenerationStamp()).
-            setBlockAccessToken(tokenBytes).
-            setClientName(clientName).
-            setConfiguration(configuration).
-            setFileName(fileName).
-            setVerifyChecksum(verifyChecksum).
-            setVisibleLength(visibleLength).
-            build();
-        if (accessor == null) {
-          LOG.trace("{}: No ReplicaAccessor created by {}",
-              this, cls.getName());
-        } else {
-          return new ExternalBlockReader(accessor, visibleLength, startOffset);
-        }
-      } catch (Throwable t) {
-        LOG.warn("Failed to construct new object of type " +
-            cls.getName(), t);
-      }
-    }
-    return null;
-  }
-
-
-  /**
-   * Get {@link BlockReaderLocalLegacy} for short circuited local reads.
-   * This block reader implements the path-based style of local reads
-   * first introduced in HDFS-2246.
-   */
-  private BlockReader getLegacyBlockReaderLocal() throws IOException {
-    LOG.trace("{}: trying to construct BlockReaderLocalLegacy", this);
-    if (!DFSUtilClient.isLocalAddress(inetSocketAddress)) {
-      LOG.trace("{}: can't construct BlockReaderLocalLegacy because the address"
-          + "{} is not local", this, inetSocketAddress);
-      return null;
-    }
-    if (clientContext.getDisableLegacyBlockReaderLocal()) {
-      PerformanceAdvisory.LOG.debug("{}: can't construct " +
-          "BlockReaderLocalLegacy because " +
-          "disableLegacyBlockReaderLocal is set.", this);
-      return null;
-    }
-    IOException ioe;
-    try {
-      return BlockReaderLocalLegacy.newBlockReader(conf,
-          userGroupInformation, configuration, fileName, block, token,
-          datanode, startOffset, length, storageType, tracer);
-    } catch (RemoteException remoteException) {
-      ioe = remoteException.unwrapRemoteException(
-                InvalidToken.class, AccessControlException.class);
-    } catch (IOException e) {
-      ioe = e;
-    }
-    if ((!(ioe instanceof AccessControlException)) &&
-        isSecurityException(ioe)) {
-      // Handle security exceptions.
-      // We do not handle AccessControlException here, since
-      // BlockReaderLocalLegacy#newBlockReader uses that exception to indicate
-      // that the user is not in dfs.block.local-path-access.user, a condition
-      // which requires us to disable legacy SCR.
-      throw ioe;
-    }
-    LOG.warn(this + ": error creating legacy BlockReaderLocal.  " +
-        "Disabling legacy local reads.", ioe);
-    clientContext.setDisableLegacyBlockReaderLocal();
-    return null;
-  }
-
-  private BlockReader getBlockReaderLocal() throws InvalidToken {
-    LOG.trace("{}: trying to construct a BlockReaderLocal for short-circuit "
-        + " reads.", this);
-    if (pathInfo == null) {
-      pathInfo = clientContext.getDomainSocketFactory()
-          .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
-    }
-    if (!pathInfo.getPathState().getUsableForShortCircuit()) {
-      PerformanceAdvisory.LOG.debug("{}: {} is not usable for short circuit; " +
-              "giving up on BlockReaderLocal.", this, pathInfo);
-      return null;
-    }
-    ShortCircuitCache cache = clientContext.getShortCircuitCache();
-    ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
-        block.getBlockPoolId());
-    ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
-    InvalidToken exc = info.getInvalidTokenException();
-    if (exc != null) {
-      LOG.trace("{}: got InvalidToken exception while trying to construct "
-          + "BlockReaderLocal via {}", this, pathInfo.getPath());
-      throw exc;
-    }
-    if (info.getReplica() == null) {
-      PerformanceAdvisory.LOG.debug("{}: failed to get " +
-          "ShortCircuitReplica. Cannot construct " +
-          "BlockReaderLocal via {}", this, pathInfo.getPath());
-      return null;
-    }
-    return new BlockReaderLocal.Builder(conf.getShortCircuitConf()).
-        setFilename(fileName).
-        setBlock(block).
-        setStartOffset(startOffset).
-        setShortCircuitReplica(info.getReplica()).
-        setVerifyChecksum(verifyChecksum).
-        setCachingStrategy(cachingStrategy).
-        setStorageType(storageType).
-        setTracer(tracer).
-        build();
-  }
-
-  /**
-   * Fetch a pair of short-circuit block descriptors from a local DataNode.
-   *
-   * @return    Null if we could not communicate with the datanode,
-   *            a new ShortCircuitReplicaInfo object otherwise.
-   *            ShortCircuitReplicaInfo objects may contain either an
-   *            InvalidToken exception, or a ShortCircuitReplica object ready to
-   *            use.
-   */
-  @Override
-  public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
-    if (createShortCircuitReplicaInfoCallback != null) {
-      ShortCircuitReplicaInfo info =
-          createShortCircuitReplicaInfoCallback.createShortCircuitReplicaInfo();
-      if (info != null) return info;
-    }
-    LOG.trace("{}: trying to create ShortCircuitReplicaInfo.", this);
-    BlockReaderPeer curPeer;
-    while (true) {
-      curPeer = nextDomainPeer();
-      if (curPeer == null) break;
-      if (curPeer.fromCache) remainingCacheTries--;
-      DomainPeer peer = (DomainPeer)curPeer.peer;
-      Slot slot = null;
-      ShortCircuitCache cache = clientContext.getShortCircuitCache();
-      try {
-        MutableBoolean usedPeer = new MutableBoolean(false);
-        slot = cache.allocShmSlot(datanode, peer, usedPeer,
-            new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()),
-            clientName);
-        if (usedPeer.booleanValue()) {
-          LOG.trace("{}: allocShmSlot used up our previous socket {}.  "
-              + "Allocating a new one...", this, peer.getDomainSocket());
-          curPeer = nextDomainPeer();
-          if (curPeer == null) break;
-          peer = (DomainPeer)curPeer.peer;
-        }
-        ShortCircuitReplicaInfo info = requestFileDescriptors(peer, slot);
-        clientContext.getPeerCache().put(datanode, peer);
-        return info;
-      } catch (IOException e) {
-        if (slot != null) {
-          cache.freeSlot(slot);
-        }
-        if (curPeer.fromCache) {
-          // Handle an I/O error we got when using a cached socket.
-          // These are considered less serious, because the socket may be stale.
-          LOG.debug("{}: closing stale domain peer {}", this, peer, e);
-          IOUtilsClient.cleanup(LOG, peer);
-        } else {
-          // Handle an I/O error we got when using a newly created socket.
-          // We temporarily disable the domain socket path for a few minutes in
-          // this case, to prevent wasting more time on it.
-          LOG.warn(this + ": I/O error requesting file descriptors.  " +
-              "Disabling domain socket " + peer.getDomainSocket(), e);
-          IOUtilsClient.cleanup(LOG, peer);
-          clientContext.getDomainSocketFactory()
-              .disableDomainSocketPath(pathInfo.getPath());
-          return null;
-        }
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Request file descriptors from a DomainPeer.
-   *
-   * @param peer   The peer to use for communication.
-   * @param slot   If non-null, the shared memory slot to associate with the
-   *               new ShortCircuitReplica.
-   *
-   * @return  A ShortCircuitReplica object if we could communicate with the
-   *          datanode; null, otherwise.
-   * @throws  IOException If we encountered an I/O exception while communicating
-   *          with the datanode.
-   */
-  private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
-          Slot slot) throws IOException {
-    ShortCircuitCache cache = clientContext.getShortCircuitCache();
-    final DataOutputStream out =
-        new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
-    SlotId slotId = slot == null ? null : slot.getSlotId();
-    new Sender(out).requestShortCircuitFds(block, token, slotId, 1,
-        failureInjector.getSupportsReceiptVerification());
-    DataInputStream in = new DataInputStream(peer.getInputStream());
-    BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
-        PBHelperClient.vintPrefixed(in));
-    DomainSocket sock = peer.getDomainSocket();
-    failureInjector.injectRequestFileDescriptorsFailure();
-    switch (resp.getStatus()) {
-    case SUCCESS:
-      byte buf[] = new byte[1];
-      FileInputStream[] fis = new FileInputStream[2];
-      sock.recvFileInputStreams(fis, buf, 0, buf.length);
-      ShortCircuitReplica replica = null;
-      try {
-        ExtendedBlockId key =
-            new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
-        if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) {
-          LOG.trace("Sending receipt verification byte for slot {}", slot);
-          sock.getOutputStream().write(0);
-        }
-        replica = new ShortCircuitReplica(key, fis[0], fis[1], cache,
-            Time.monotonicNow(), slot);
-        return new ShortCircuitReplicaInfo(replica);
-      } catch (IOException e) {
-        // This indicates an error reading from disk, or a format error.  Since
-        // it's not a socket communication problem, we return null rather than
-        // throwing an exception.
-        LOG.warn(this + ": error creating ShortCircuitReplica.", e);
-        return null;
-      } finally {
-        if (replica == null) {
-          IOUtilsClient.cleanup(DFSClient.LOG, fis[0], fis[1]);
-        }
-      }
-    case ERROR_UNSUPPORTED:
-      if (!resp.hasShortCircuitAccessVersion()) {
-        LOG.warn("short-circuit read access is disabled for " +
-            "DataNode " + datanode + ".  reason: " + resp.getMessage());
-        clientContext.getDomainSocketFactory()
-            .disableShortCircuitForPath(pathInfo.getPath());
-      } else {
-        LOG.warn("short-circuit read access for the file " +
-            fileName + " is disabled for DataNode " + datanode +
-            ".  reason: " + resp.getMessage());
-      }
-      return null;
-    case ERROR_ACCESS_TOKEN:
-      String msg = "access control error while " +
-          "attempting to set up short-circuit access to " +
-          fileName + ": " + resp.getMessage();
-      LOG.debug("{}:{}", this, msg);
-      return new ShortCircuitReplicaInfo(new InvalidToken(msg));
-    default:
-      LOG.warn(this + ": unknown response code " + resp.getStatus() +
-          " while attempting to set up short-circuit access. " +
-          resp.getMessage());
-      clientContext.getDomainSocketFactory()
-          .disableShortCircuitForPath(pathInfo.getPath());
-      return null;
-    }
-  }
-
-  /**
-   * Get a RemoteBlockReader that communicates over a UNIX domain socket.
-   *
-   * @return The new BlockReader, or null if we failed to create the block
-   * reader.
-   *
-   * @throws InvalidToken    If the block token was invalid.
- * Potentially other security-related exceptions.
-   */
-  private BlockReader getRemoteBlockReaderFromDomain() throws IOException {
-    if (pathInfo == null) {
-      pathInfo = clientContext.getDomainSocketFactory()
-          .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
-    }
-    if (!pathInfo.getPathState().getUsableForDataTransfer()) {
-      PerformanceAdvisory.LOG.debug("{}: not trying to create a " +
-          "remote block reader because the UNIX domain socket at {}" +
-           " is not usable.", this, pathInfo);
-      return null;
-    }
-    LOG.trace("{}: trying to create a remote block reader from the UNIX domain "
-        + "socket at {}", this, pathInfo.getPath());
-
-    while (true) {
-      BlockReaderPeer curPeer = nextDomainPeer();
-      if (curPeer == null) break;
-      if (curPeer.fromCache) remainingCacheTries--;
-      DomainPeer peer = (DomainPeer)curPeer.peer;
-      BlockReader blockReader = null;
-      try {
-        blockReader = getRemoteBlockReader(peer);
-        return blockReader;
-      } catch (IOException ioe) {
-        IOUtilsClient.cleanup(LOG, peer);
-        if (isSecurityException(ioe)) {
-          LOG.trace("{}: got security exception while constructing a remote "
-                  + " block reader from the unix domain socket at {}",
-              this, pathInfo.getPath(), ioe);
-          throw ioe;
-        }
-        if (curPeer.fromCache) {
-          // Handle an I/O error we got when using a cached peer.  These are
-          // considered less serious because the underlying socket may be stale.
-          LOG.debug("Closed potentially stale domain peer {}", peer, ioe);
-        } else {
-          // Handle an I/O error we got when using a newly created domain peer.
-          // We temporarily disable the domain socket path for a few minutes in
-          // this case, to prevent wasting more time on it.
-          LOG.warn("I/O error constructing remote block reader.  Disabling " +
-              "domain socket " + peer.getDomainSocket(), ioe);
-          clientContext.getDomainSocketFactory()
-              .disableDomainSocketPath(pathInfo.getPath());
-          return null;
-        }
-      } finally {
-        if (blockReader == null) {
-          IOUtilsClient.cleanup(LOG, peer);
-        }
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Get a RemoteBlockReader that communicates over a TCP socket.
-   *
-   * @return The new BlockReader.  We will not return null, but instead throw
-   *         an exception if this fails.
-   *
-   * @throws InvalidToken
-   *             If the block token was invalid.
-   *         InvalidEncryptionKeyException
-   *             If the encryption key was invalid.
-   *         Other IOException
-   *             If there was another problem.
-   */
-  private BlockReader getRemoteBlockReaderFromTcp() throws IOException {
-    LOG.trace("{}: trying to create a remote block reader from a TCP socket",
-        this);
-    BlockReader blockReader = null;
-    while (true) {
-      BlockReaderPeer curPeer = null;
-      Peer peer = null;
-      try {
-        curPeer = nextTcpPeer();
-        if (curPeer.fromCache) remainingCacheTries--;
-        peer = curPeer.peer;
-        blockReader = getRemoteBlockReader(peer);
-        return blockReader;
-      } catch (IOException ioe) {
-        if (isSecurityException(ioe)) {
-          LOG.trace("{}: got security exception while constructing a remote "
-              + "block reader from {}", this, peer, ioe);
-          throw ioe;
-        }
-        if ((curPeer != null) && curPeer.fromCache) {
-          // Handle an I/O error we got when using a cached peer.  These are
-          // considered less serious, because the underlying socket may be
-          // stale.
-          LOG.debug("Closed potentially stale remote peer {}", peer, ioe);
-        } else {
-          // Handle an I/O error we got when using a newly created peer.
-          LOG.warn("I/O error constructing remote block reader.", ioe);
-          throw ioe;
-        }
-      } finally {
-        if (blockReader == null) {
-          IOUtilsClient.cleanup(LOG, peer);
-        }
-      }
-    }
-  }
-
-  public static class BlockReaderPeer {
-    final Peer peer;
-    final boolean fromCache;
-
-    BlockReaderPeer(Peer peer, boolean fromCache) {
-      this.peer = peer;
-      this.fromCache = fromCache;
-    }
-  }
-
-  /**
-   * Get the next DomainPeer-- either from the cache or by creating it.
-   *
-   * @return the next DomainPeer, or null if we could not construct one.
-   */
-  private BlockReaderPeer nextDomainPeer() {
-    if (remainingCacheTries > 0) {
-      Peer peer = clientContext.getPeerCache().get(datanode, true);
-      if (peer != null) {
-        LOG.trace("nextDomainPeer: reusing existing peer {}", peer);
-        return new BlockReaderPeer(peer, true);
-      }
-    }
-    DomainSocket sock = clientContext.getDomainSocketFactory().
-        createSocket(pathInfo, conf.getSocketTimeout());
-    if (sock == null) return null;
-    return new BlockReaderPeer(new DomainPeer(sock), false);
-  }
-
-  /**
-   * Get the next TCP-based peer-- either from the cache or by creating it.
-   *
-   * @return the next Peer, or null if we could not construct one.
-   *
-   * @throws IOException  If there was an error while constructing the peer
-   *                      (such as an InvalidEncryptionKeyException)
-   */
-  private BlockReaderPeer nextTcpPeer() throws IOException {
-    if (remainingCacheTries > 0) {
-      Peer peer = clientContext.getPeerCache().get(datanode, false);
-      if (peer != null) {
-        LOG.trace("nextTcpPeer: reusing existing peer {}", peer);
-        return new BlockReaderPeer(peer, true);
-      }
-    }
-    try {
-      Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token,
-          datanode);
-      LOG.trace("nextTcpPeer: created newConnectedPeer {}", peer);
-      return new BlockReaderPeer(peer, false);
-    } catch (IOException e) {
-      LOG.trace("nextTcpPeer: failed to create newConnectedPeer connected to"
-          + "{}", datanode);
-      throw e;
-    }
-  }
-
-  /**
-   * Determine if an exception is security-related.
-   *
-   * We need to handle these exceptions differently than other IOExceptions.
-   * They don't indicate a communication problem.  Instead, they mean that there
-   * is some action the client needs to take, such as refetching block tokens,
-   * renewing encryption keys, etc.
-   *
-   * @param ioe    The exception
-   * @return       True only if the exception is security-related.
-   */
-  private static boolean isSecurityException(IOException ioe) {
-    return (ioe instanceof InvalidToken) ||
-            (ioe instanceof InvalidEncryptionKeyException) ||
-            (ioe instanceof InvalidBlockTokenException) ||
-            (ioe instanceof AccessControlException);
-  }
-
-  @SuppressWarnings("deprecation")
-  private BlockReader getRemoteBlockReader(Peer peer) throws IOException {
-    int networkDistance = clientContext.getNetworkDistance(datanode);
-    if (conf.getShortCircuitConf().isUseLegacyBlockReader()) {
-      return RemoteBlockReader.newBlockReader(fileName,
-          block, token, startOffset, length, conf.getIoBufferSize(),
-          verifyChecksum, clientName, peer, datanode,
-          clientContext.getPeerCache(), cachingStrategy, tracer,
-          networkDistance);
-    } else {
-      return RemoteBlockReader2.newBlockReader(
-          fileName, block, token, startOffset, length,
-          verifyChecksum, clientName, peer, datanode,
-          clientContext.getPeerCache(), cachingStrategy, tracer,
-          networkDistance);
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "BlockReaderFactory(fileName=" + fileName + ", block=" + block + ")";
-  }
-
-  /**
-   * File name to print when accessing a block directly (from servlets)
-   * @param s Address of the block location
-   * @param poolId Block pool ID of the block
-   * @param blockId Block ID of the block
-   * @return string that has a file name for debug purposes
-   */
-  public static String getFileName(final InetSocketAddress s,
-      final String poolId, final long blockId) {
-    return s.toString() + ":" + poolId + ":" + blockId;
-  }
-}
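
As the javadoc for build() above describes, BlockReaderFactory is driven
through a fluent builder and then tries readers in order: an external
ReplicaAccessor if one is configured, a short-circuit local reader, a reader
over a UNIX domain socket, and finally a reader over TCP. A minimal usage
sketch using only the setters shown above; the variable names (dfsClientConf,
clientContext, chosenNode, and so on) are illustrative placeholders, not
identifiers from the patch:

    BlockReader reader = new BlockReaderFactory(dfsClientConf)
        .setFileName(src)
        .setBlock(block)
        .setBlockToken(blockToken)
        .setStartOffset(offset)
        .setLength(length)            // build() requires a non-negative length
        .setVerifyChecksum(true)
        .setClientName(clientName)
        .setDatanodeInfo(chosenNode)
        .setStorageType(storageType)
        .setAllowShortCircuitLocalReads(true)
        .setClientCacheContext(clientContext)
        .setInetSocketAddress(targetAddr)
        .setRemotePeerFactory(peerFactory)
        .setCachingStrategy(cachingStrategy)
        .setConfiguration(conf)
        .setTracer(tracer)
        .build();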

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
deleted file mode 100644
index 68630c7..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
+++ /dev/null
@@ -1,724 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.EnumSet;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.ReadOption;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.hadoop.util.DirectBufferPool;
-import org.apache.htrace.core.TraceScope;
-import org.apache.htrace.core.Tracer;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * BlockReaderLocal enables local short circuited reads. If the DFS client is on
- * the same machine as the datanode, then the client can read files directly
- * from the local file system rather than going through the datanode for better
- * performance. <br>
- * {@link BlockReaderLocal} works as follows:
- * <ul>
- * <li>The client performing short circuit reads must be configured at the
- * datanode.</li>
- * <li>The client gets the file descriptors for the metadata file and the data
- * file for the block using
- * {@link org.apache.hadoop.hdfs.server.datanode.DataXceiver#requestShortCircuitFds}.
- * </li>
- * <li>The client reads the file descriptors.</li>
- * </ul>
- */
-@InterfaceAudience.Private
-class BlockReaderLocal implements BlockReader {
-  static final Logger LOG = LoggerFactory.getLogger(BlockReaderLocal.class);
-
-  private static final DirectBufferPool bufferPool = new DirectBufferPool();
-
-  public static class Builder {
-    private final int bufferSize;
-    private boolean verifyChecksum;
-    private int maxReadahead;
-    private String filename;
-    private ShortCircuitReplica replica;
-    private long dataPos;
-    private ExtendedBlock block;
-    private StorageType storageType;
-    private Tracer tracer;
-
-    public Builder(ShortCircuitConf conf) {
-      this.maxReadahead = Integer.MAX_VALUE;
-      this.verifyChecksum = !conf.isSkipShortCircuitChecksums();
-      this.bufferSize = conf.getShortCircuitBufferSize();
-    }
-
-    public Builder setVerifyChecksum(boolean verifyChecksum) {
-      this.verifyChecksum = verifyChecksum;
-      return this;
-    }
-
-    public Builder setCachingStrategy(CachingStrategy cachingStrategy) {
-      long readahead = cachingStrategy.getReadahead() != null ?
-          cachingStrategy.getReadahead() :
-              HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
-      this.maxReadahead = (int)Math.min(Integer.MAX_VALUE, readahead);
-      return this;
-    }
-
-    public Builder setFilename(String filename) {
-      this.filename = filename;
-      return this;
-    }
-
-    public Builder setShortCircuitReplica(ShortCircuitReplica replica) {
-      this.replica = replica;
-      return this;
-    }
-
-    public Builder setStartOffset(long startOffset) {
-      this.dataPos = Math.max(0, startOffset);
-      return this;
-    }
-
-    public Builder setBlock(ExtendedBlock block) {
-      this.block = block;
-      return this;
-    }
-
-    public Builder setStorageType(StorageType storageType) {
-      this.storageType = storageType;
-      return this;
-    }
-
-    public Builder setTracer(Tracer tracer) {
-      this.tracer = tracer;
-      return this;
-    }
-
-    public BlockReaderLocal build() {
-      Preconditions.checkNotNull(replica);
-      return new BlockReaderLocal(this);
-    }
-  }
-
-  private boolean closed = false;
-
-  /**
-   * Pair of streams for this block.
-   */
-  private final ShortCircuitReplica replica;
-
-  /**
-   * The data FileChannel.
-   */
-  private final FileChannel dataIn;
-
-  /**
-   * The next place we'll read from in the block data FileChannel.
-   *
-   * If data is buffered in dataBuf, this offset will be larger than the
-   * offset of the next byte which a read() operation will give us.
-   */
-  private long dataPos;
-
-  /**
-   * The Checksum FileChannel.
-   */
-  private final FileChannel checksumIn;
-
-  /**
-   * Checksum type and size.
-   */
-  private final DataChecksum checksum;
-
-  /**
-   * If false, we will always skip the checksum.
-   */
-  private final boolean verifyChecksum;
-
-  /**
-   * Name of the block, for logging purposes.
-   */
-  private final String filename;
-
-  /**
-   * Block ID and Block Pool ID.
-   */
-  private final ExtendedBlock block;
-
-  /**
-   * Cache of Checksum#bytesPerChecksum.
-   */
-  private final int bytesPerChecksum;
-
-  /**
-   * Cache of Checksum#checksumSize.
-   */
-  private final int checksumSize;
-
-  /**
-   * Maximum number of chunks to allocate.
-   *
-   * This is used to allocate dataBuf and checksumBuf, in the event that
-   * we need them.
-   */
-  private final int maxAllocatedChunks;
-
-  /**
-   * True if zero readahead was requested.
-   */
-  private final boolean zeroReadaheadRequested;
-
-  /**
- * Maximum amount of readahead we'll do.  This will always be at least the
- * size of a single chunk, even if {@link #zeroReadaheadRequested} is true.
- * The reason is that we need to do a certain amount of buffering in order
-   * to do checksumming.
-   *
-   * This determines how many bytes we'll use out of dataBuf and checksumBuf.
-   * Why do we allocate buffers, and then (potentially) only use part of them?
-   * The rationale is that allocating a lot of buffers of different sizes would
-   * make it very difficult for the DirectBufferPool to re-use buffers.
-   */
-  private final int maxReadaheadLength;
-
-  /**
-   * Buffers data starting at the current dataPos and extending on
-   * for dataBuf.limit().
-   *
-   * This may be null if we don't need it.
-   */
-  private ByteBuffer dataBuf;
-
-  /**
-   * Buffers checksums starting at the current checksumPos and extending on
-   * for checksumBuf.limit().
-   *
-   * This may be null if we don't need it.
-   */
-  private ByteBuffer checksumBuf;
-
-  /**
-   * StorageType of replica on DataNode.
-   */
-  private StorageType storageType;
-
-  /**
-   * The Tracer to use.
-   */
-  private final Tracer tracer;
-
-  private BlockReaderLocal(Builder builder) {
-    this.replica = builder.replica;
-    this.dataIn = replica.getDataStream().getChannel();
-    this.dataPos = builder.dataPos;
-    this.checksumIn = replica.getMetaStream().getChannel();
-    BlockMetadataHeader header = builder.replica.getMetaHeader();
-    this.checksum = header.getChecksum();
-    this.verifyChecksum = builder.verifyChecksum &&
-        (this.checksum.getChecksumType().id != DataChecksum.CHECKSUM_NULL);
-    this.filename = builder.filename;
-    this.block = builder.block;
-    this.bytesPerChecksum = checksum.getBytesPerChecksum();
-    this.checksumSize = checksum.getChecksumSize();
-
-    this.maxAllocatedChunks = (bytesPerChecksum == 0) ? 0 :
-        ((builder.bufferSize + bytesPerChecksum - 1) / bytesPerChecksum);
-    // Calculate the effective maximum readahead.
-    // We can't do more readahead than there is space in the buffer.
-    int maxReadaheadChunks = (bytesPerChecksum == 0) ? 0 :
-        ((Math.min(builder.bufferSize, builder.maxReadahead) +
-            bytesPerChecksum - 1) / bytesPerChecksum);
-    if (maxReadaheadChunks == 0) {
-      this.zeroReadaheadRequested = true;
-      maxReadaheadChunks = 1;
-    } else {
-      this.zeroReadaheadRequested = false;
-    }
-    this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum;
-    this.storageType = builder.storageType;
-    this.tracer = builder.tracer;
-  }
-
-  private synchronized void createDataBufIfNeeded() {
-    if (dataBuf == null) {
-      dataBuf = bufferPool.getBuffer(maxAllocatedChunks * bytesPerChecksum);
-      dataBuf.position(0);
-      dataBuf.limit(0);
-    }
-  }
-
-  private synchronized void freeDataBufIfExists() {
-    if (dataBuf != null) {
-      // When disposing of a dataBuf, we have to move our stored file index
-      // backwards.
-      dataPos -= dataBuf.remaining();
-      dataBuf.clear();
-      bufferPool.returnBuffer(dataBuf);
-      dataBuf = null;
-    }
-  }
-
-  private synchronized void createChecksumBufIfNeeded() {
-    if (checksumBuf == null) {
-      checksumBuf = bufferPool.getBuffer(maxAllocatedChunks * checksumSize);
-      checksumBuf.position(0);
-      checksumBuf.limit(0);
-    }
-  }
-
-  private synchronized void freeChecksumBufIfExists() {
-    if (checksumBuf != null) {
-      checksumBuf.clear();
-      bufferPool.returnBuffer(checksumBuf);
-      checksumBuf = null;
-    }
-  }
-
-  private synchronized int drainDataBuf(ByteBuffer buf) {
-    if (dataBuf == null) return -1;
-    int oldLimit = dataBuf.limit();
-    int nRead = Math.min(dataBuf.remaining(), buf.remaining());
-    if (nRead == 0) {
-      return (dataBuf.remaining() == 0) ? -1 : 0;
-    }
-    try {
-      dataBuf.limit(dataBuf.position() + nRead);
-      buf.put(dataBuf);
-    } finally {
-      dataBuf.limit(oldLimit);
-    }
-    return nRead;
-  }
-
-  /**
-   * Read from the block file into a buffer.
-   *
-   * This function overwrites checksumBuf.  It will increment dataPos.
-   *
-   * @param buf   The buffer to read into.  May be dataBuf.
-   *              The position and limit of this buffer should be set to
-   *              multiples of the checksum size.
-   * @param canSkipChecksum  True if we can skip checksumming.
-   *
-   * @return      Total bytes read.  0 on EOF.
-   */
-  private synchronized int fillBuffer(ByteBuffer buf, boolean canSkipChecksum)
-      throws IOException {
-    try (TraceScope ignored = tracer.newScope(
-        "BlockReaderLocal#fillBuffer(" + block.getBlockId() + ")")) {
-      int total = 0;
-      long startDataPos = dataPos;
-      int startBufPos = buf.position();
-      while (buf.hasRemaining()) {
-        int nRead = dataIn.read(buf, dataPos);
-        if (nRead < 0) {
-          break;
-        }
-        dataPos += nRead;
-        total += nRead;
-      }
-      if (canSkipChecksum) {
-        freeChecksumBufIfExists();
-        return total;
-      }
-      if (total > 0) {
-        try {
-          buf.limit(buf.position());
-          buf.position(startBufPos);
-          createChecksumBufIfNeeded();
-          int checksumsNeeded = (total + bytesPerChecksum - 1) /
-              bytesPerChecksum;
-          checksumBuf.clear();
-          checksumBuf.limit(checksumsNeeded * checksumSize);
-          long checksumPos = BlockMetadataHeader.getHeaderSize()
-              + ((startDataPos / bytesPerChecksum) * checksumSize);
-          while (checksumBuf.hasRemaining()) {
-            int nRead = checksumIn.read(checksumBuf, checksumPos);
-            if (nRead < 0) {
-              throw new IOException("Got unexpected checksum file EOF at " +
-                  checksumPos + ", block file position " + startDataPos +
-                  " for block " + block + " of file " + filename);
-            }
-            checksumPos += nRead;
-          }
-          checksumBuf.flip();
-
-          checksum.verifyChunkedSums(buf, checksumBuf, filename, startDataPos);
-        } finally {
-          buf.position(buf.limit());
-        }
-      }
-      return total;
-    }
-  }
-
-  private boolean createNoChecksumContext() {
-    return !verifyChecksum ||
-        // Checksums are not stored for replicas on transient storage.  We do
-        // not anchor, because we do not intend for client activity to block
-        // eviction from transient storage on the DataNode side.
-        (storageType != null && storageType.isTransient()) ||
-        replica.addNoChecksumAnchor();
-  }
-
-  private void releaseNoChecksumContext() {
-    if (verifyChecksum) {
-      if (storageType == null || !storageType.isTransient()) {
-        replica.removeNoChecksumAnchor();
-      }
-    }
-  }
-
-  @Override
-  public synchronized int read(ByteBuffer buf) throws IOException {
-    boolean canSkipChecksum = createNoChecksumContext();
-    try {
-      String traceFormatStr = "read(buf.remaining={}, block={}, filename={}, "
-          + "canSkipChecksum={})";
-      LOG.trace(traceFormatStr + ": starting",
-          buf.remaining(), block, filename, canSkipChecksum);
-      int nRead;
-      try {
-        if (canSkipChecksum && zeroReadaheadRequested) {
-          nRead = readWithoutBounceBuffer(buf);
-        } else {
-          nRead = readWithBounceBuffer(buf, canSkipChecksum);
-        }
-      } catch (IOException e) {
-        LOG.trace(traceFormatStr + ": I/O error",
-            buf.remaining(), block, filename, canSkipChecksum, e);
-        throw e;
-      }
-      LOG.trace(traceFormatStr + ": returning {}",
-          buf.remaining(), block, filename, canSkipChecksum, nRead);
-      return nRead;
-    } finally {
-      if (canSkipChecksum) releaseNoChecksumContext();
-    }
-  }
-
-  private synchronized int readWithoutBounceBuffer(ByteBuffer buf)
-      throws IOException {
-    freeDataBufIfExists();
-    freeChecksumBufIfExists();
-    int total = 0;
-    while (buf.hasRemaining()) {
-      int nRead = dataIn.read(buf, dataPos);
-      if (nRead <= 0) break;
-      dataPos += nRead;
-      total += nRead;
-    }
-    return (total == 0 && (dataPos == dataIn.size())) ? -1 : total;
-  }
-
-  /**
-   * Fill the data buffer.  If necessary, validate the data against the
-   * checksums.
-   *
-   * We always want the offsets of the data contained in dataBuf to be
-   * aligned to the chunk boundary.  If we are validating checksums, we
-   * accomplish this by seeking backwards in the file until we're on a
-   * chunk boundary.  (This is necessary because we can't checksum a
-   * partial chunk.)  If we are not validating checksums, we simply only
-   * fill the latter part of dataBuf.
-   *
-   * @param canSkipChecksum  true if we can skip checksumming.
-   * @return                 true if we hit EOF.
-   * @throws IOException
-   */
-  private synchronized boolean fillDataBuf(boolean canSkipChecksum)
-      throws IOException {
-    createDataBufIfNeeded();
-    final int slop = (int)(dataPos % bytesPerChecksum);
-    final long oldDataPos = dataPos;
-    dataBuf.limit(maxReadaheadLength);
-    if (canSkipChecksum) {
-      dataBuf.position(slop);
-      fillBuffer(dataBuf, true);
-    } else {
-      dataPos -= slop;
-      dataBuf.position(0);
-      fillBuffer(dataBuf, false);
-    }
-    dataBuf.limit(dataBuf.position());
-    dataBuf.position(Math.min(dataBuf.position(), slop));
-    LOG.trace("loaded {} bytes into bounce buffer from offset {} of {}",
-        dataBuf.remaining(), oldDataPos, block);
-    return dataBuf.limit() != maxReadaheadLength;
-  }
-
-  /**
-   * Read using the bounce buffer.
-   *
-   * A 'direct' read actually has three phases. The first drains any
-   * remaining bytes from the slow read buffer. After this the read is
-   * guaranteed to be on a checksum chunk boundary. If there are still bytes
-   * to read, the fast direct path is used for as many remaining bytes as
-   * possible, up to a multiple of the checksum chunk size. Finally, any
-   * 'odd' bytes remaining at the end of the read cause another slow read to
-   * be issued, which involves an extra copy.
-   *
-   * Every 'slow' read tries to fill the slow read buffer in one go for
-   * efficiency's sake. As described above, all non-checksum-chunk-aligned
-   * reads will be served from the slower read path.
-   *
-   * @param buf              The buffer to read into.
-   * @param canSkipChecksum  True if we can skip checksums.
-   */
-  private synchronized int readWithBounceBuffer(ByteBuffer buf,
-        boolean canSkipChecksum) throws IOException {
-    int total = 0;
-    int bb = drainDataBuf(buf); // drain bounce buffer if possible
-    if (bb >= 0) {
-      total += bb;
-      if (buf.remaining() == 0) return total;
-    }
-    boolean eof = true, done = false;
-    do {
-      if (buf.isDirect() && (buf.remaining() >= maxReadaheadLength)
-            && ((dataPos % bytesPerChecksum) == 0)) {
-        // Fast lane: try to read directly into user-supplied buffer, bypassing
-        // bounce buffer.
-        int oldLimit = buf.limit();
-        int nRead;
-        try {
-          buf.limit(buf.position() + maxReadaheadLength);
-          nRead = fillBuffer(buf, canSkipChecksum);
-        } finally {
-          buf.limit(oldLimit);
-        }
-        if (nRead < maxReadaheadLength) {
-          done = true;
-        }
-        if (nRead > 0) {
-          eof = false;
-        }
-        total += nRead;
-      } else {
-        // Slow lane: refill bounce buffer.
-        if (fillDataBuf(canSkipChecksum)) {
-          done = true;
-        }
-        bb = drainDataBuf(buf); // drain bounce buffer if possible
-        if (bb >= 0) {
-          eof = false;
-          total += bb;
-        }
-      }
-    } while ((!done) && (buf.remaining() > 0));
-    return (eof && total == 0) ? -1 : total;
-  }
-
-  @Override
-  public synchronized int read(byte[] arr, int off, int len)
-        throws IOException {
-    boolean canSkipChecksum = createNoChecksumContext();
-    int nRead;
-    try {
-      final String traceFormatStr = "read(arr.length={}, off={}, len={}, "
-          + "filename={}, block={}, canSkipChecksum={})";
-      LOG.trace(traceFormatStr + ": starting",
-          arr.length, off, len, filename, block, canSkipChecksum);
-      try {
-        if (canSkipChecksum && zeroReadaheadRequested) {
-          nRead = readWithoutBounceBuffer(arr, off, len);
-        } else {
-          nRead = readWithBounceBuffer(arr, off, len, canSkipChecksum);
-        }
-      } catch (IOException e) {
-        LOG.trace(traceFormatStr + ": I/O error",
-            arr.length, off, len, filename, block, canSkipChecksum, e);
-        throw e;
-      }
-      LOG.trace(traceFormatStr + ": returning {}",
-          arr.length, off, len, filename, block, canSkipChecksum, nRead);
-    } finally {
-      if (canSkipChecksum) releaseNoChecksumContext();
-    }
-    return nRead;
-  }
-
-  private synchronized int readWithoutBounceBuffer(byte arr[], int off,
-        int len) throws IOException {
-    freeDataBufIfExists();
-    freeChecksumBufIfExists();
-    int nRead = dataIn.read(ByteBuffer.wrap(arr, off, len), dataPos);
-    if (nRead > 0) {
-      dataPos += nRead;
-    } else if ((nRead == 0) && (dataPos == dataIn.size())) {
-      return -1;
-    }
-    return nRead;
-  }
-
-  private synchronized int readWithBounceBuffer(byte arr[], int off, int len,
-        boolean canSkipChecksum) throws IOException {
-    createDataBufIfNeeded();
-    if (!dataBuf.hasRemaining()) {
-      dataBuf.position(0);
-      dataBuf.limit(maxReadaheadLength);
-      fillDataBuf(canSkipChecksum);
-    }
-    if (dataBuf.remaining() == 0) return -1;
-    int toRead = Math.min(dataBuf.remaining(), len);
-    dataBuf.get(arr, off, toRead);
-    return toRead;
-  }
-
-  @Override
-  public synchronized long skip(long n) throws IOException {
-    int discardedFromBuf = 0;
-    long remaining = n;
-    if ((dataBuf != null) && dataBuf.hasRemaining()) {
-      discardedFromBuf = (int)Math.min(dataBuf.remaining(), n);
-      dataBuf.position(dataBuf.position() + discardedFromBuf);
-      remaining -= discardedFromBuf;
-    }
-    LOG.trace("skip(n={}, block={}, filename={}): discarded {} bytes from "
-            + "dataBuf and advanced dataPos by {}",
-        n, block, filename, discardedFromBuf, remaining);
-    dataPos += remaining;
-    return n;
-  }
-
-  @Override
-  public int available() {
-    // We never do network I/O in BlockReaderLocal.
-    return Integer.MAX_VALUE;
-  }
-
-  @Override
-  public synchronized void close() throws IOException {
-    if (closed) return;
-    closed = true;
-    LOG.trace("close(filename={}, block={})", filename, block);
-    replica.unref();
-    freeDataBufIfExists();
-    freeChecksumBufIfExists();
-  }
-
-  @Override
-  public synchronized void readFully(byte[] arr, int off, int len)
-      throws IOException {
-    BlockReaderUtil.readFully(this, arr, off, len);
-  }
-
-  @Override
-  public synchronized int readAll(byte[] buf, int off, int len)
-      throws IOException {
-    return BlockReaderUtil.readAll(this, buf, off, len);
-  }
-
-  @Override
-  public boolean isShortCircuit() {
-    return true;
-  }
-
-  /**
-   * Get or create a memory map for this replica.
-   *
-   * There are two kinds of ClientMmap objects we could fetch here: one that
-   * will always read pre-checksummed data, and one that may read data that
-   * hasn't been checksummed.
-   *
-   * If we fetch the former, "safe" kind of ClientMmap, we have to increment
-   * the anchor count on the shared memory slot.  This will tell the DataNode
-   * not to munlock the block until this ClientMmap is closed.
-   * If we fetch the latter, we don't bother with anchoring.
-   *
-   * @param opts     The options to use, such as SKIP_CHECKSUMS.
-   *
-   * @return         null on failure; the ClientMmap otherwise.
-   */
-  @Override
-  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
-    boolean anchor = verifyChecksum &&
-        !opts.contains(ReadOption.SKIP_CHECKSUMS);
-    if (anchor) {
-      if (!createNoChecksumContext()) {
-        LOG.trace("can't get an mmap for {} of {} since SKIP_CHECKSUMS was not "
-            + "given, we aren't skipping checksums, and the block is not "
-            + "mlocked.", block, filename);
-        return null;
-      }
-    }
-    ClientMmap clientMmap = null;
-    try {
-      clientMmap = replica.getOrCreateClientMmap(anchor);
-    } finally {
-      if ((clientMmap == null) && anchor) {
-        releaseNoChecksumContext();
-      }
-    }
-    return clientMmap;
-  }
-
-  @VisibleForTesting
-  boolean getVerifyChecksum() {
-    return this.verifyChecksum;
-  }
-
-  @VisibleForTesting
-  int getMaxReadaheadLength() {
-    return this.maxReadaheadLength;
-  }
-
-  /**
-   * Make the replica anchorable.  Normally this can only be done by the
-   * DataNode.  This method is only for testing.
-   */
-  @VisibleForTesting
-  void forceAnchorable() {
-    replica.getSlot().makeAnchorable();
-  }
-
-  /**
-   * Make the replica unanchorable.  Normally this can only be done by the
-   * DataNode.  This method is only for testing.
-   */
-  @VisibleForTesting
-  void forceUnanchorable() {
-    replica.getSlot().makeUnanchorable();
-  }
-
-  @Override
-  public DataChecksum getDataChecksum() {
-    return checksum;
-  }
-
-  @Override
-  public int getNetworkDistance() {
-    return 0;
-  }
-}
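
One detail worth noting in fillBuffer() above: checksums live in a separate
meta file, so the reader maps a data offset to a checksum offset as
headerSize + (startDataPos / bytesPerChecksum) * checksumSize. A worked
example with illustrative numbers, assuming the common HDFS defaults of
512-byte checksum chunks and 4-byte CRC32 checksums:

    // Hypothetical values, assuming bytesPerChecksum = 512, checksumSize = 4:
    long startDataPos = 1536;   // exactly 3 full chunks into the block
    long checksumPos = BlockMetadataHeader.getHeaderSize()
        + ((startDataPos / 512) * 4);
    // = headerSize + 3 * 4 = headerSize + 12 bytes into the meta file,
    // i.e. the checksum for the 4th chunk (chunk index 3).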

