hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From z..@apache.org
Subject [27/50] [abbrv] hadoop git commit: HDFS-8925. Move BlockReaderLocal to hdfs-client. Contributed by Mingliang Liu.
Date Wed, 02 Sep 2015 05:58:39 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/PeerCache.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
new file mode 100644
index 0000000..55aa741
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.LinkedListMultimap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.util.IOUtilsClient;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A cache of input stream sockets to Data Node.
+ */
+@InterfaceStability.Unstable
+@InterfaceAudience.Private
+@VisibleForTesting
+public class PeerCache {
+  private static final Logger LOG = LoggerFactory.getLogger(PeerCache.class);
+  
+  private static class Key {
+    final DatanodeID dnID;
+    final boolean isDomain;
+    
+    Key(DatanodeID dnID, boolean isDomain) {
+      this.dnID = dnID;
+      this.isDomain = isDomain;
+    }
+    
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof Key)) {
+        return false;
+      }
+      Key other = (Key)o;
+      return dnID.equals(other.dnID) && isDomain == other.isDomain;
+    }
+
+    @Override
+    public int hashCode() {
+      return dnID.hashCode() ^ (isDomain ? 1 : 0);
+    }
+  }
+  
+  private static class Value {
+    private final Peer peer;
+    private final long time;
+
+    Value(Peer peer, long time) {
+      this.peer = peer;
+      this.time = time;
+    }
+
+    Peer getPeer() {
+      return peer;
+    }
+
+    long getTime() {
+      return time;
+    }
+  }
+
+  private Daemon daemon;
+  /** Cached peers keyed by datanode and domain-socket flag. */
+  private final LinkedListMultimap<Key, Value> multimap =
+    LinkedListMultimap.create();
+  private final int capacity;
+  private final long expiryPeriod;
+  
+  public PeerCache(int c, long e) {
+    this.capacity = c;
+    this.expiryPeriod = e;
+
+    if (capacity == 0 ) {
+      LOG.info("SocketCache disabled.");
+    } else if (expiryPeriod == 0) {
+      throw new IllegalStateException("Cannot initialize expiryPeriod to " +
+         expiryPeriod + " when cache is enabled.");
+    }
+  }
+ 
+  private boolean isDaemonStarted() {
+    return (daemon == null)? false: true;
+  }
+
+  private synchronized void startExpiryDaemon() {
+    // start daemon only if not already started
+    if (isDaemonStarted() == true) {
+      return;
+    }
+    
+    daemon = new Daemon(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          PeerCache.this.run();
+        } catch(InterruptedException e) {
+          //noop
+        } finally {
+          PeerCache.this.clear();
+        }
+      }
+
+      @Override
+      public String toString() {
+        return String.valueOf(PeerCache.this);
+      }
+    });
+    daemon.start();
+  }
+
+  /**
+   * Get a cached peer connected to the given DataNode.
+   * @param dnId         The DataNode to get a Peer for.
+   * @param isDomain     Whether to retrieve a DomainPeer or not.
+   *
+   * @return             An open Peer connected to the DN, or null if none
+   *                     was found. 
+   */
+  public Peer get(DatanodeID dnId, boolean isDomain) {
+
+    if (capacity <= 0) { // disabled
+      return null;
+    }
+    return getInternal(dnId, isDomain);
+  }
+
+  private synchronized Peer getInternal(DatanodeID dnId, boolean isDomain) {
+    List<Value> sockStreamList = multimap.get(new Key(dnId, isDomain));
+    if (sockStreamList == null) {
+      return null;
+    }
+
+    Iterator<Value> iter = sockStreamList.iterator();
+    while (iter.hasNext()) {
+      Value candidate = iter.next();
+      iter.remove();
+      long ageMs = Time.monotonicNow() - candidate.getTime();
+      Peer peer = candidate.getPeer();
+      if (ageMs >= expiryPeriod) {
+        try {
+          peer.close();
+        } catch (IOException e) {
+          LOG.warn("got IOException closing stale peer " + peer +
+                ", which is " + ageMs + " ms old");
+        }
+      } else if (!peer.isClosed()) {
+        return peer;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Give an unused socket to the cache.
+   */
+  public void put(DatanodeID dnId, Peer peer) {
+    Preconditions.checkNotNull(dnId);
+    Preconditions.checkNotNull(peer);
+    if (peer.isClosed()) return;
+    if (capacity <= 0) {
+      // Cache disabled.
+      IOUtilsClient.cleanup(LOG, peer);
+      return;
+    }
+    putInternal(dnId, peer);
+  }
+
+  private synchronized void putInternal(DatanodeID dnId, Peer peer) {
+    startExpiryDaemon();
+
+    if (capacity == multimap.size()) {
+      evictOldest();
+    }
+    multimap.put(new Key(dnId, peer.getDomainSocket() != null),
+        new Value(peer, Time.monotonicNow()));
+  }
+
+  public synchronized int size() {
+    return multimap.size();
+  }
+
+  /**
+   * Evict and close sockets older than expiry period from the cache.
+   */
+  private synchronized void evictExpired(long expiryPeriod) {
+    while (multimap.size() != 0) {
+      Iterator<Entry<Key, Value>> iter =
+        multimap.entries().iterator();
+      Entry<Key, Value> entry = iter.next();
+      // if oldest socket expired, remove it
+      if (entry == null || 
+        Time.monotonicNow() - entry.getValue().getTime() <
+        expiryPeriod) {
+        break;
+      }
+      IOUtilsClient.cleanup(LOG, entry.getValue().getPeer());
+      iter.remove();
+    }
+  }
+
+  /**
+   * Evict the oldest entry in the cache.
+   */
+  private synchronized void evictOldest() {
+    // We can get the oldest element immediately, because of an interesting
+    // property of LinkedListMultimap: its iterator traverses entries in the
+    // order that they were added.
+    Iterator<Entry<Key, Value>> iter =
+      multimap.entries().iterator();
+    if (!iter.hasNext()) {
+      throw new IllegalStateException("Cannot evict from empty cache! " +
+        "capacity: " + capacity);
+    }
+    Entry<Key, Value> entry = iter.next();
+    IOUtilsClient.cleanup(LOG, entry.getValue().getPeer());
+    iter.remove();
+  }
+
+  /**
+   * Periodically check in the cache and expire the entries
+   * older than expiryPeriod milliseconds.
+   */
+  private void run() throws InterruptedException {
+    for(long lastExpiryTime = Time.monotonicNow();
+        !Thread.interrupted();
+        Thread.sleep(expiryPeriod)) {
+      final long elapsed = Time.monotonicNow() - lastExpiryTime;
+      if (elapsed >= expiryPeriod) {
+        evictExpired(expiryPeriod);
+        lastExpiryTime = Time.monotonicNow();
+      }
+    }
+    clear();
+    throw new InterruptedException("Daemon Interrupted");
+  }
+
+  /**
+   * Empty the cache, and close all sockets.
+   */
+  @VisibleForTesting
+  synchronized void clear() {
+    for (Value value : multimap.values()) {
+      IOUtilsClient.cleanup(LOG, value.getPeer());
+    }
+    multimap.clear();
+  }
+  
+  @VisibleForTesting
+  void close() {
+    clear();
+    if (daemon != null) {
+      daemon.interrupt();
+      try {
+        daemon.join();
+      } catch (InterruptedException e) {
+        throw new RuntimeException("failed to join thread");
+      }
+    }
+    daemon = null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/BlockReportOptions.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/BlockReportOptions.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/BlockReportOptions.java
new file mode 100644
index 0000000..07f4836
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/BlockReportOptions.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Options that can be specified when manually triggering a block report.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class BlockReportOptions {
+  private final boolean incremental;
+
+  private BlockReportOptions(boolean incremental) {
+    this.incremental = incremental;
+  }
+
+  public boolean isIncremental() {
+    return incremental;
+  }
+
+  public static class Factory {
+    private boolean incremental = false;
+
+    public Factory() {
+    }
+
+    public Factory setIncremental(boolean incremental) {
+      this.incremental = incremental;
+      return this;
+    }
+
+    public BlockReportOptions build() {
+      return new BlockReportOptions(incremental);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "BlockReportOptions{incremental=" + incremental + "}";
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 3b492ae..7b1e438 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -113,6 +113,11 @@ public interface HdfsClientConfigKeys {
       "dfs.datanode.hdfs-blocks-metadata.enabled";
   boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false;
 
+  String  DFS_DATANODE_KERBEROS_PRINCIPAL_KEY = "dfs.datanode.kerberos.principal";
+  String  DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";
+  long    DFS_DATANODE_READAHEAD_BYTES_DEFAULT = 4 * 1024 * 1024; // 4MB
+  String  DFS_ENCRYPTION_KEY_PROVIDER_URI = "dfs.encryption.key.provider.uri";
+
   String REPLICA_ACCESSOR_BUILDER_CLASSES_KEY =
       PREFIX + "replica.accessor.builder.classes";
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java
new file mode 100644
index 0000000..69fa52d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A block and the full path information to the block data file and
+ * the metadata file stored on the local file system.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockLocalPathInfo {
+  private final ExtendedBlock block;
+  private String localBlockPath = "";  // local file storing the data
+  private String localMetaPath = "";   // local file storing the checksum
+
+  /**
+   * Constructs BlockLocalPathInfo.
+   * @param b The block corresponding to this local path info.
+   * @param file Block data file.
+   * @param metafile Metadata file for the block.
+   */
+  public BlockLocalPathInfo(ExtendedBlock b, String file, String metafile) {
+    block = b;
+    localBlockPath = file;
+    localMetaPath = metafile;
+  }
+
+  /**
+   * Get the Block data file.
+   * @return Block data file.
+   */
+  public String getBlockPath() {return localBlockPath;}
+  
+  /**
+   * @return the Block
+   */
+  public ExtendedBlock getBlock() { return block;}
+  
+  /**
+   * Get the Block metadata file.
+   * @return Block metadata file.
+   */
+  public String getMetaPath() {return localMetaPath;}
+
+  /**
+   * Get number of bytes in the block.
+   * @return Number of bytes in the block.
+   */
+  public long getNumBytes() {
+    return block.getNumBytes();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
new file mode 100644
index 0000000..3374868
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.ReconfigurationTaskStatus;
+import org.apache.hadoop.hdfs.client.BlockReportOptions;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenInfo;
+
+/** A client-datanode protocol for block recovery.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+@KerberosInfo(
+    serverPrincipal = HdfsClientConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY)
+@TokenInfo(BlockTokenSelector.class)
+public interface ClientDatanodeProtocol {
+  /**
+   * Until version 9, this class ClientDatanodeProtocol served as both
+   * the client interface to the DN AND the RPC protocol used to 
+   * communicate with the NN.
+   * 
+   * This class is used by both the DFSClient and the 
+   * DN server side to insulate from the protocol serialization.
+   * 
+   * If you are adding/changing DN's interface then you need to 
+   * change both this class and ALSO related protocol buffer
+   * wire protocol definition in ClientDatanodeProtocol.proto.
+   * 
+   * For more details on protocol buffer wire protocol, please see 
+   * .../org/apache/hadoop/hdfs/protocolPB/overview.html
+   * 
+   * The log of historical changes can be retrieved from svn.
+   * 9: Added deleteBlockPool method
+   * 
+   * 9 is the last version id when this class was used for protocols
+   *  serialization. Do not update this version any further.
+   */
+  public static final long versionID = 9L;
+
+  /** Return the visible length of a replica. */
+  long getReplicaVisibleLength(ExtendedBlock b) throws IOException;
+  
+  /**
+   * Refresh the list of federated namenodes from updated configuration
+   * Adds new namenodes and stops the deleted namenodes.
+   * 
+   * @throws IOException on error
+   **/
+  void refreshNamenodes() throws IOException;
+
+  /**
+   * Delete the block pool directory. If force is false it is deleted only if
+   * it is empty, otherwise it is deleted along with its contents.
+   * 
+   * @param bpid Blockpool id to be deleted.
+   * @param force If false blockpool directory is deleted only if it is empty 
+   *          i.e. if it doesn't contain any block files, otherwise it is 
+   *          deleted along with its contents.
+   * @throws IOException
+   */
+  void deleteBlockPool(String bpid, boolean force) throws IOException;
+  
+  /**
+   * Retrieves the path names of the block file and metadata file stored on the
+   * local file system.
+   * 
+   * In order for this method to work, one of the following should be satisfied:
+   * <ul>
+   * <li>
+   * The client user must be configured at the datanode to be able to use this
+   * method.</li>
+   * <li>
+   * When security is enabled, kerberos authentication must be used to connect
+   * to the datanode.</li>
+   * </ul>
+   * 
+   * @param block
+   *          the specified block on the local datanode
+   * @param token
+   *          the block access token.
+   * @return the BlockLocalPathInfo of a block
+   * @throws IOException
+   *           on error
+   */
+  BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
+      Token<BlockTokenIdentifier> token) throws IOException;
+  
+  /**
+   * Shuts down a datanode.
+   *
+   * @param forUpgrade If true, data node does extra prep work before shutting
+   *          down. The work includes advising clients to wait and saving
+   *          certain states for quick restart. This should only be used when
+   *          the stored data will remain the same during upgrade/restart.
+   * @throws IOException 
+   */
+  void shutdownDatanode(boolean forUpgrade) throws IOException;  
+
+  /**
+   * Obtains datanode info
+   *
+   * @return software/config version and uptime of the datanode
+   */
+  DatanodeLocalInfo getDatanodeInfo() throws IOException;
+
+  /**
+   * Asynchronously reload configuration on disk and apply changes.
+   */
+  void startReconfiguration() throws IOException;
+
+  /**
+   * Get the status of the previously issued reconfig task.
+   * @see org.apache.hadoop.conf.ReconfigurationTaskStatus
+   */
+  ReconfigurationTaskStatus getReconfigurationStatus() throws IOException;
+
+  /**
+   * Get a list of allowed properties for reconfiguration.
+   */
+  List<String> listReconfigurableProperties() throws IOException;
+
+  /**
+   * Trigger a new block report.
+   */
+  void triggerBlockReport(BlockReportOptions options)
+    throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java
new file mode 100644
index 0000000..170467e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Encryption key verification failed.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class InvalidEncryptionKeyException extends IOException {
+  private static final long serialVersionUID = 0l;
+
+  public InvalidEncryptionKeyException() {
+    super();
+  }
+
+  public InvalidEncryptionKeyException(String msg) {
+    super(msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolPB.java
new file mode 100644
index 0000000..7e3f66b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolPB.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.token.TokenInfo;
+
+@KerberosInfo(
+    serverPrincipal = HdfsClientConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY)
+@TokenInfo(BlockTokenSelector.class)
+@ProtocolInfo(protocolName = 
+    "org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol",
+    protocolVersion = 1)
+@InterfaceAudience.Private
+public interface ClientDatanodeProtocolPB extends
+    ClientDatanodeProtocolService.BlockingInterface {
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
new file mode 100644
index 0000000..311fcea
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+
+import javax.net.SocketFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationTaskStatus;
+import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hdfs.client.BlockReportOptions;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ListReconfigurablePropertiesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ListReconfigurablePropertiesResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusConfigChangeProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportRequestProto;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.ProtocolMetaInterface;
+import org.apache.hadoop.ipc.ProtocolTranslator;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcClientUtil;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is the client side translator to translate the requests made on
+ * {@link ClientDatanodeProtocol} interfaces to the RPC server implementing
+ * {@link ClientDatanodeProtocolPB}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class ClientDatanodeProtocolTranslatorPB implements
+    ProtocolMetaInterface, ClientDatanodeProtocol,
+    ProtocolTranslator, Closeable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ClientDatanodeProtocolTranslatorPB.class);
+
+  /** RpcController is unused by these calls, so null is always passed. */
+  private static final RpcController NULL_CONTROLLER = null;
+
+  // Requests without arguments are built once and shared by every call.
+  private static final RefreshNamenodesRequestProto VOID_REFRESH_NAMENODES =
+      RefreshNamenodesRequestProto.newBuilder().build();
+  private static final GetDatanodeInfoRequestProto VOID_GET_DATANODE_INFO =
+      GetDatanodeInfoRequestProto.newBuilder().build();
+  private static final GetReconfigurationStatusRequestProto
+      VOID_GET_RECONFIG_STATUS =
+      GetReconfigurationStatusRequestProto.newBuilder().build();
+  private static final StartReconfigurationRequestProto VOID_START_RECONFIG =
+      StartReconfigurationRequestProto.newBuilder().build();
+  private static final ListReconfigurablePropertiesRequestProto
+      VOID_LIST_RECONFIGURABLE_PROPERTIES =
+      ListReconfigurablePropertiesRequestProto.newBuilder().build();
+
+  /** Proxy that every protocol method of this translator delegates to. */
+  private final ClientDatanodeProtocolPB rpcProxy;
+
+  /**
+   * Creates a translator whose proxy is built from a located block; see
+   * {@link #createClientDatanodeProtocolProxy(DatanodeID, Configuration, int,
+   * boolean, LocatedBlock)}.
+   *
+   * @param datanodeid Datanode to connect to.
+   * @param conf Configuration.
+   * @param socketTimeout Socket timeout to use.
+   * @param connectToDnViaHostname connect to the Datanode using its hostname
+   * @param locatedBlock block passed on to the proxy factory
+   * @throws IOException if the proxy cannot be created
+   */
+  public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
+      Configuration conf, int socketTimeout, boolean connectToDnViaHostname,
+      LocatedBlock locatedBlock) throws IOException {
+    this.rpcProxy = createClientDatanodeProtocolProxy(
+        datanodeid, conf, socketTimeout, connectToDnViaHostname, locatedBlock);
+  }
+  
+  /**
+   * Creates a translator for an explicit address and user, passing a socket
+   * timeout of 0 to the proxy factory.
+   *
+   * @param addr address of the datanode IPC server
+   * @param ticket user to create the proxy for
+   * @param conf Configuration.
+   * @param factory socket factory handed to the RPC layer
+   * @throws IOException if the proxy cannot be created
+   */
+  public ClientDatanodeProtocolTranslatorPB(InetSocketAddress addr,
+      UserGroupInformation ticket, Configuration conf, SocketFactory factory)
+      throws IOException {
+    final int socketTimeout = 0;
+    this.rpcProxy = createClientDatanodeProtocolProxy(
+        addr, ticket, conf, factory, socketTimeout);
+  }
+  
+  /**
+   * Creates a translator that connects to the datanode as the current user.
+   *
+   * @param datanodeid Datanode to connect to.
+   * @param conf Configuration.
+   * @param socketTimeout Socket timeout to use.
+   * @param connectToDnViaHostname connect to the Datanode using its hostname
+   * @throws IOException if the current user cannot be determined or the
+   *           proxy cannot be created
+   */
+  public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
+      Configuration conf, int socketTimeout, boolean connectToDnViaHostname)
+      throws IOException {
+    final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname);
+    InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr);
+    // slf4j placeholders defer message formatting until debug is enabled, so
+    // no explicit isDebugEnabled() guard is needed.
+    LOG.debug("Connecting to datanode {} addr={}", dnAddr, addr);
+    rpcProxy = createClientDatanodeProtocolProxy(addr,
+        UserGroupInformation.getCurrentUser(), conf,
+        NetUtils.getDefaultSocketFactory(conf), socketTimeout);
+  }
+
+  /**
+   * Creates a proxy to the given datanode for a remote user named after the
+   * located block's local block, with the block's token attached. The proxy
+   * uses a copy of {@code conf} whose IPC connection max idle time is 0.
+   *
+   * @param datanodeid Datanode to connect to.
+   * @param conf Configuration; it is copied, not modified, by this overload
+   * @param socketTimeout Socket timeout to use.
+   * @param connectToDnViaHostname connect to the Datanode using its hostname
+   * @param locatedBlock block supplying the user name and the block token
+   * @return the RPC proxy
+   * @throws IOException if the proxy cannot be created
+   */
+  static ClientDatanodeProtocolPB createClientDatanodeProtocolProxy(
+      DatanodeID datanodeid, Configuration conf, int socketTimeout,
+      boolean connectToDnViaHostname, LocatedBlock locatedBlock)
+      throws IOException {
+    final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname);
+    InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr);
+    // slf4j placeholders defer message formatting until debug is enabled, so
+    // no explicit isDebugEnabled() guard is needed.
+    LOG.debug("Connecting to datanode {} addr={}", dnAddr, addr);
+
+    // Since we're creating a new UserGroupInformation here, we know that no
+    // future RPC proxies will be able to re-use the same connection. And
+    // usages of this proxy tend to be one-off calls.
+    //
+    // This is a temporary fix: callers should really achieve this by using
+    // RPC.stopProxy() on the resulting object, but this is currently not
+    // working in trunk. See the discussion on HDFS-1965.
+    Configuration confWithNoIpcIdle = new Configuration(conf);
+    confWithNoIpcIdle.setInt(CommonConfigurationKeysPublic
+        .IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0);
+
+    UserGroupInformation ticket = UserGroupInformation
+        .createRemoteUser(locatedBlock.getBlock().getLocalBlock().toString());
+    ticket.addToken(locatedBlock.getBlockToken());
+    return createClientDatanodeProtocolProxy(addr, ticket, confWithNoIpcIdle,
+        NetUtils.getDefaultSocketFactory(conf), socketTimeout);
+  }
+  
+  /**
+   * Registers {@link ProtobufRpcEngine} for {@link ClientDatanodeProtocolPB}
+   * on {@code conf} and returns an RPC proxy for that protocol.
+   *
+   * @param addr address of the datanode IPC server
+   * @param ticket user to create the proxy for
+   * @param conf Configuration; the protocol engine setting is written to it
+   * @param factory socket factory handed to the RPC layer
+   * @param socketTimeout timeout value handed to the RPC layer
+   * @return the RPC proxy
+   * @throws IOException if the proxy cannot be created
+   */
+  static ClientDatanodeProtocolPB createClientDatanodeProtocolProxy(
+      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+      SocketFactory factory, int socketTimeout) throws IOException {
+    final Class<ClientDatanodeProtocolPB> protocol =
+        ClientDatanodeProtocolPB.class;
+    RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine.class);
+    final long version = RPC.getProtocolVersion(protocol);
+    return RPC.getProxy(protocol, version, addr, ticket, conf, factory,
+        socketTimeout);
+  }
+
+  /** Stops the RPC proxy held by this translator. */
+  @Override
+  public void close() {
+    RPC.stopProxy(this.rpcProxy);
+  }
+
+  /**
+   * Asks the datanode for the visible length of the replica of a block.
+   *
+   * @param b block sent in the request
+   * @return the length reported in the response
+   * @throws IOException if the RPC fails; obtained from the ServiceException
+   *           via {@link ProtobufHelper#getRemoteException}
+   */
+  @Override
+  public long getReplicaVisibleLength(ExtendedBlock b) throws IOException {
+    final GetReplicaVisibleLengthRequestProto request =
+        GetReplicaVisibleLengthRequestProto.newBuilder()
+            .setBlock(PBHelperClient.convert(b))
+            .build();
+    try {
+      return rpcProxy.getReplicaVisibleLength(NULL_CONTROLLER, request)
+          .getLength();
+    } catch (ServiceException se) {
+      throw ProtobufHelper.getRemoteException(se);
+    }
+  }
+
+  /**
+   * Sends the shared, argument-less refreshNamenodes request to the datanode.
+   *
+   * @throws IOException if the RPC fails; obtained from the ServiceException
+   *           via {@link ProtobufHelper#getRemoteException}
+   */
+  @Override
+  public void refreshNamenodes() throws IOException {
+    try {
+      rpcProxy.refreshNamenodes(NULL_CONTROLLER, VOID_REFRESH_NAMENODES);
+    } catch (ServiceException se) {
+      throw ProtobufHelper.getRemoteException(se);
+    }
+  }
+
+  /**
+   * Sends a deleteBlockPool request to the datanode.
+   *
+   * @param bpid block pool ID set on the request
+   * @param force value of the request's force flag
+   * @throws IOException if the RPC fails; obtained from the ServiceException
+   *           via {@link ProtobufHelper#getRemoteException}
+   */
+  @Override
+  public void deleteBlockPool(String bpid, boolean force) throws IOException {
+    final DeleteBlockPoolRequestProto request =
+        DeleteBlockPoolRequestProto.newBuilder()
+            .setBlockPool(bpid)
+            .setForce(force)
+            .build();
+    try {
+      rpcProxy.deleteBlockPool(NULL_CONTROLLER, request);
+    } catch (ServiceException se) {
+      throw ProtobufHelper.getRemoteException(se);
+    }
+  }
+
+  /**
+   * Requests the local path information of a block from the datanode.
+   *
+   * @param block block sent in the request
+   * @param token block token sent in the request
+   * @return the block, local path and local meta path from the response
+   * @throws IOException if the RPC fails; obtained from the ServiceException
+   *           via {@link ProtobufHelper#getRemoteException}
+   */
+  @Override
+  public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
+      Token<BlockTokenIdentifier> token) throws IOException {
+    final GetBlockLocalPathInfoRequestProto request =
+        GetBlockLocalPathInfoRequestProto.newBuilder()
+            .setBlock(PBHelperClient.convert(block))
+            .setToken(PBHelperClient.convert(token))
+            .build();
+    try {
+      final GetBlockLocalPathInfoResponseProto response =
+          rpcProxy.getBlockLocalPathInfo(NULL_CONTROLLER, request);
+      return new BlockLocalPathInfo(
+          PBHelperClient.convert(response.getBlock()),
+          response.getLocalPath(),
+          response.getLocalMetaPath());
+    } catch (ServiceException se) {
+      throw ProtobufHelper.getRemoteException(se);
+    }
+  }
+
+  /**
+   * Delegates to {@link RpcClientUtil#isMethodSupported} for the protobuf RPC
+   * kind and the version of {@link ClientDatanodeProtocolPB}.
+   *
+   * @param methodName name of the method to look up
+   * @return the result reported by RpcClientUtil
+   * @throws IOException if the check fails
+   */
+  @Override
+  public boolean isMethodSupported(String methodName) throws IOException {
+    final Class<ClientDatanodeProtocolPB> protocol =
+        ClientDatanodeProtocolPB.class;
+    final long version = RPC.getProtocolVersion(protocol);
+    return RpcClientUtil.isMethodSupported(rpcProxy, protocol,
+        RPC.RpcKind.RPC_PROTOCOL_BUFFER, version, methodName);
+  }
+
+  /** @return the RPC proxy this translator delegates to */
+  @Override
+  public Object getUnderlyingProxyObject() {
+    return this.rpcProxy;
+  }
+
+  /**
+   * Sends a shutdownDatanode request to the datanode.
+   *
+   * @param forUpgrade value of the request's forUpgrade flag
+   * @throws IOException if the RPC fails; obtained from the ServiceException
+   *           via {@link ProtobufHelper#getRemoteException}
+   */
+  @Override
+  public void shutdownDatanode(boolean forUpgrade) throws IOException {
+    final ShutdownDatanodeRequestProto shutdownRequest =
+        ShutdownDatanodeRequestProto.newBuilder()
+            .setForUpgrade(forUpgrade)
+            .build();
+    try {
+      rpcProxy.shutdownDatanode(NULL_CONTROLLER, shutdownRequest);
+    } catch (ServiceException se) {
+      throw ProtobufHelper.getRemoteException(se);
+    }
+  }
+
+  /**
+   * Fetches the datanode's local info using the shared, argument-less
+   * request.
+   *
+   * @return the local info from the response, converted by PBHelperClient
+   * @throws IOException if the RPC fails; obtained from the ServiceException
+   *           via {@link ProtobufHelper#getRemoteException}
+   */
+  @Override
+  public DatanodeLocalInfo getDatanodeInfo() throws IOException {
+    GetDatanodeInfoResponseProto info;
+    try {
+      info = rpcProxy.getDatanodeInfo(NULL_CONTROLLER, VOID_GET_DATANODE_INFO);
+    } catch (ServiceException se) {
+      throw ProtobufHelper.getRemoteException(se);
+    }
+    return PBHelperClient.convert(info.getLocalInfo());
+  }
+
+  /**
+   * Sends the shared, argument-less startReconfiguration request to the
+   * datanode.
+   *
+   * @throws IOException if the RPC fails; obtained from the ServiceException
+   *           via {@link ProtobufHelper#getRemoteException}
+   */
+  @Override
+  public void startReconfiguration() throws IOException {
+    try {
+      rpcProxy.startReconfiguration(NULL_CONTROLLER, VOID_START_RECONFIG);
+    } catch (ServiceException se) {
+      throw ProtobufHelper.getRemoteException(se);
+    }
+  }
+
+  /**
+   * Fetches the reconfiguration status from the datanode and converts it to
+   * a {@link ReconfigurationTaskStatus}.
+   *
+   * @return a status carrying the response's start time, its end time (0 if
+   *         the response has none) and a map from each reported change to
+   *         its optional error message; the map is null when the response
+   *         lists no changes
+   * @throws IOException if the RPC fails; obtained from the ServiceException
+   *           via {@link ProtobufHelper#getRemoteException}
+   */
+  @Override
+  public ReconfigurationTaskStatus getReconfigurationStatus()
+      throws IOException {
+    GetReconfigurationStatusResponseProto response;
+    try {
+      response = rpcProxy.getReconfigurationStatus(NULL_CONTROLLER,
+          VOID_GET_RECONFIG_STATUS);
+    } catch (ServiceException se) {
+      throw ProtobufHelper.getRemoteException(se);
+    }
+
+    final long startTime = response.getStartTime();
+    final long endTime = response.hasEndTime() ? response.getEndTime() : 0;
+
+    Map<PropertyChange, Optional<String>> statusMap = null;
+    if (response.getChangesCount() > 0) {
+      statusMap = Maps.newHashMap();
+      for (GetReconfigurationStatusConfigChangeProto change :
+          response.getChangesList()) {
+        final PropertyChange key = new PropertyChange(
+            change.getName(), change.getNewValue(), change.getOldValue());
+        final String errorMessage =
+            change.hasErrorMessage() ? change.getErrorMessage() : null;
+        statusMap.put(key, Optional.fromNullable(errorMessage));
+      }
+    }
+    return new ReconfigurationTaskStatus(startTime, endTime, statusMap);
+  }
+
+  /**
+   * Fetches the reconfigurable property names using the shared,
+   * argument-less request.
+   *
+   * @return the name list from the response
+   * @throws IOException if the RPC fails; obtained from the ServiceException
+   *           via {@link ProtobufHelper#getRemoteException}
+   */
+  @Override
+  public List<String> listReconfigurableProperties() throws IOException {
+    ListReconfigurablePropertiesResponseProto properties;
+    try {
+      properties = rpcProxy.listReconfigurableProperties(NULL_CONTROLLER,
+          VOID_LIST_RECONFIGURABLE_PROPERTIES);
+    } catch (ServiceException se) {
+      throw ProtobufHelper.getRemoteException(se);
+    }
+    return properties.getNameList();
+  }
+
+  /**
+   * Sends a triggerBlockReport request to the datanode.
+   *
+   * @param options supplies the request's incremental flag
+   * @throws IOException if the RPC fails; obtained from the ServiceException
+   *           via {@link ProtobufHelper#getRemoteException}
+   */
+  @Override
+  public void triggerBlockReport(BlockReportOptions options)
+      throws IOException {
+    final TriggerBlockReportRequestProto request =
+        TriggerBlockReportRequestProto.newBuilder()
+            .setIncremental(options.isIncremental())
+            .build();
+    try {
+      rpcProxy.triggerBlockReport(NULL_CONTROLLER, request);
+    } catch (ServiceException se) {
+      throw ProtobufHelper.getRemoteException(se);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index edf658a..d921507 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -23,12 +23,14 @@ import com.google.protobuf.CodedInputStream;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmIdProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeLocalInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
@@ -185,6 +187,17 @@ public class PBHelperClient {
     return pinnings;
   }
 
+  /**
+   * Converts an {@link ExtendedBlockProto} to an {@link ExtendedBlock}.
+   *
+   * @param eb the protobuf message, may be null
+   * @return a block built from the message's pool ID, block ID, byte count
+   *         and generation stamp, or null if {@code eb} is null
+   */
+  public static ExtendedBlock convert(ExtendedBlockProto eb) {
+    if (eb == null) {
+      return null;
+    }
+    return new ExtendedBlock(
+        eb.getPoolId(),
+        eb.getBlockId(),
+        eb.getNumBytes(),
+        eb.getGenerationStamp());
+  }
+
+  /**
+   * Converts a {@link DatanodeLocalInfoProto} to a {@link DatanodeLocalInfo}.
+   *
+   * @param proto the protobuf message; it is dereferenced without a null
+   *          check
+   * @return an info object built from the message's software version, config
+   *         version and uptime
+   */
+  public static DatanodeLocalInfo convert(DatanodeLocalInfoProto proto) {
+    return new DatanodeLocalInfo(
+        proto.getSoftwareVersion(),
+        proto.getConfigVersion(),
+        proto.getUptime());
+  }
+
   static public DatanodeInfoProto convertDatanodeInfo(DatanodeInfo di) {
     if (di == null) return null;
     return convert(di);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java
new file mode 100644
index 0000000..f67ca00
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.security.token.block;
+
+import java.util.Collection;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+
+/**
+ * A block token selector for HDFS.
+ */
+@InterfaceAudience.Private
+public class BlockTokenSelector implements TokenSelector<BlockTokenIdentifier> {
+
+  /**
+   * Returns the first token whose kind equals
+   * {@link BlockTokenIdentifier#KIND_NAME}.
+   *
+   * @param service service name; when null, no token is selected. Its value
+   *          is not otherwise used for matching.
+   * @param tokens candidate tokens, examined in iteration order
+   * @return the first block token found, or null if {@code service} is null
+   *         or no token has the block token kind
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public Token<BlockTokenIdentifier> selectToken(Text service,
+      Collection<Token<? extends TokenIdentifier>> tokens) {
+    if (service != null) {
+      for (Token<? extends TokenIdentifier> candidate : tokens) {
+        if (BlockTokenIdentifier.KIND_NAME.equals(candidate.getKind())) {
+          // The kind check above is what makes this cast acceptable.
+          return (Token<BlockTokenIdentifier>) candidate;
+        }
+      }
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 9f77e85..a561909 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -858,6 +858,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8938. Extract BlockToMarkCorrupt and ReplicationWork as standalone
     classes from BlockManager. (Mingliang Liu via wheat9)
 
+    HDFS-8925. Move BlockReaderLocal to hdfs-client.
+    (Mingliang Liu via wheat9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
deleted file mode 100644
index aa3e8ba..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.IOException;
-import java.util.EnumSet;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.ByteBufferReadable;
-import org.apache.hadoop.fs.ReadOption;
-import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
-
-/**
- * A BlockReader is responsible for reading a single block
- * from a single datanode.
- */
-@InterfaceAudience.Private
-public interface BlockReader extends ByteBufferReadable {
-  
-
-  /* same interface as inputStream java.io.InputStream#read()
-   * used by DFSInputStream#read()
-   * This violates one rule when there is a checksum error:
-   * "Read should not modify user buffer before successful read"
-   * because it first reads the data to user buffer and then checks
-   * the checksum.
-   * Note: this must return -1 on EOF, even in the case of a 0-byte read.
-   * See HDFS-5762 for details.
-   */
-  int read(byte[] buf, int off, int len) throws IOException;
-
-  /**
-   * Skip the given number of bytes
-   */
-  long skip(long n) throws IOException;
-
-  /**
-   * Returns an estimate of the number of bytes that can be read
-   * (or skipped over) from this input stream without performing
-   * network I/O.
-   * This may return more than what is actually present in the block.
-   */
-  int available() throws IOException;
-
-  /**
-   * Close the block reader.
-   *
-   * @throws IOException
-   */
-  void close() throws IOException;
-
-  /**
-   * Read exactly the given amount of data, throwing an exception
-   * if EOF is reached before that amount
-   */
-  void readFully(byte[] buf, int readOffset, int amtToRead) throws IOException;
-
-  /**
-   * Similar to {@link #readFully(byte[], int, int)} except that it will
-   * not throw an exception on EOF. However, it differs from the simple
-   * {@link #read(byte[], int, int)} call in that it is guaranteed to
-   * read the data if it is available. In other words, if this call
-   * does not throw an exception, then either the buffer has been
-   * filled or the next call will return EOF.
-   */
-  int readAll(byte[] buf, int offset, int len) throws IOException;
-
-  /**
-   * @return              true only if this is a local read.
-   */
-  boolean isLocal();
-  
-  /**
-   * @return              true only if this is a short-circuit read.
-   *                      All short-circuit reads are also local.
-   */
-  boolean isShortCircuit();
-
-  /**
-   * Get a ClientMmap object for this BlockReader.
-   *
-   * @param opts          The read options to use.
-   * @return              The ClientMmap object, or null if mmap is not
-   *                      supported.
-   */
-  ClientMmap getClientMmap(EnumSet<ReadOption> opts);
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
deleted file mode 100644
index d913f3a..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
+++ /dev/null
@@ -1,741 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.EnumSet;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.ReadOption;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.hadoop.util.DirectBufferPool;
-import org.apache.htrace.Sampler;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceScope;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-/**
- * BlockReaderLocal enables local short circuited reads. If the DFS client is on
- * the same machine as the datanode, then the client can read files directly
- * from the local file system rather than going through the datanode for better
- * performance. <br>
- * {@link BlockReaderLocal} works as follows:
- * <ul>
- * <li>The client performing short circuit reads must be configured at the
- * datanode.</li>
- * <li>The client gets the file descriptors for the metadata file and the data 
- * file for the block using
- * {@link org.apache.hadoop.hdfs.server.datanode.DataXceiver#requestShortCircuitFds}.
- * </li>
- * <li>The client reads the file descriptors.</li>
- * </ul>
- */
-@InterfaceAudience.Private
-class BlockReaderLocal implements BlockReader {
-  static final Log LOG = LogFactory.getLog(BlockReaderLocal.class);
-
-  private static final DirectBufferPool bufferPool = new DirectBufferPool();
-
-  public static class Builder {
-    private final int bufferSize;
-    private boolean verifyChecksum;
-    private int maxReadahead;
-    private String filename;
-    private ShortCircuitReplica replica;
-    private long dataPos;
-    private ExtendedBlock block;
-    private StorageType storageType;
-
-    public Builder(ShortCircuitConf conf) {
-      this.maxReadahead = Integer.MAX_VALUE;
-      this.verifyChecksum = !conf.isSkipShortCircuitChecksums();
-      this.bufferSize = conf.getShortCircuitBufferSize();
-    }
-
-    public Builder setVerifyChecksum(boolean verifyChecksum) {
-      this.verifyChecksum = verifyChecksum;
-      return this;
-    }
-
-    public Builder setCachingStrategy(CachingStrategy cachingStrategy) {
-      long readahead = cachingStrategy.getReadahead() != null ?
-          cachingStrategy.getReadahead() :
-              DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
-      this.maxReadahead = (int)Math.min(Integer.MAX_VALUE, readahead);
-      return this;
-    }
-
-    public Builder setFilename(String filename) {
-      this.filename = filename;
-      return this;
-    }
-
-    public Builder setShortCircuitReplica(ShortCircuitReplica replica) {
-      this.replica = replica;
-      return this;
-    }
-
-    public Builder setStartOffset(long startOffset) {
-      this.dataPos = Math.max(0, startOffset);
-      return this;
-    }
-
-    public Builder setBlock(ExtendedBlock block) {
-      this.block = block;
-      return this;
-    }
-
-    public Builder setStorageType(StorageType storageType) {
-      this.storageType = storageType;
-      return this;
-    }
-
-    public BlockReaderLocal build() {
-      Preconditions.checkNotNull(replica);
-      return new BlockReaderLocal(this);
-    }
-  }
-
-  private boolean closed = false;
-
-  /**
-   * Pair of streams for this block.
-   */
-  private final ShortCircuitReplica replica;
-
-  /**
-   * The data FileChannel.
-   */
-  private final FileChannel dataIn;
-
-  /**
-   * The next place we'll read from in the block data FileChannel.
-   *
-   * If data is buffered in dataBuf, this offset will be larger than the
-   * offset of the next byte which a read() operation will give us.
-   */
-  private long dataPos;
-
-  /**
-   * The Checksum FileChannel.
-   */
-  private final FileChannel checksumIn;
-  
-  /**
-   * Checksum type and size.
-   */
-  private final DataChecksum checksum;
-
-  /**
-   * If false, we will always skip the checksum.
-   */
-  private final boolean verifyChecksum;
-
-  /**
-   * Name of the block, for logging purposes.
-   */
-  private final String filename;
-  
-  /**
-   * Block ID and Block Pool ID.
-   */
-  private final ExtendedBlock block;
-  
-  /**
-   * Cache of Checksum#bytesPerChecksum.
-   */
-  private final int bytesPerChecksum;
-
-  /**
-   * Cache of Checksum#checksumSize.
-   */
-  private final int checksumSize;
-
-  /**
-   * Maximum number of chunks to allocate.
-   *
-   * This is used to allocate dataBuf and checksumBuf, in the event that
-   * we need them.
-   */
-  private final int maxAllocatedChunks;
-
-  /**
-   * True if zero readahead was requested.
-   */
-  private final boolean zeroReadaheadRequested;
-
-  /**
-   * Maximum amount of readahead we'll do.  This will always be at least the,
-   * size of a single chunk, even if {@link #zeroReadaheadRequested} is true.
-   * The reason is because we need to do a certain amount of buffering in order
-   * to do checksumming.
-   * 
-   * This determines how many bytes we'll use out of dataBuf and checksumBuf.
-   * Why do we allocate buffers, and then (potentially) only use part of them?
-   * The rationale is that allocating a lot of buffers of different sizes would
-   * make it very difficult for the DirectBufferPool to re-use buffers. 
-   */
-  private final int maxReadaheadLength;
-
-  /**
-   * Buffers data starting at the current dataPos and extending on
-   * for dataBuf.limit().
-   *
-   * This may be null if we don't need it.
-   */
-  private ByteBuffer dataBuf;
-
-  /**
-   * Buffers checksums starting at the current checksumPos and extending on
-   * for checksumBuf.limit().
-   *
-   * This may be null if we don't need it.
-   */
-  private ByteBuffer checksumBuf;
-
-  /**
-   * StorageType of replica on DataNode.
-   */
-  private StorageType storageType;
-
-  private BlockReaderLocal(Builder builder) {
-    this.replica = builder.replica;
-    this.dataIn = replica.getDataStream().getChannel();
-    this.dataPos = builder.dataPos;
-    this.checksumIn = replica.getMetaStream().getChannel();
-    BlockMetadataHeader header = builder.replica.getMetaHeader();
-    this.checksum = header.getChecksum();
-    this.verifyChecksum = builder.verifyChecksum &&
-        (this.checksum.getChecksumType().id != DataChecksum.CHECKSUM_NULL);
-    this.filename = builder.filename;
-    this.block = builder.block;
-    this.bytesPerChecksum = checksum.getBytesPerChecksum();
-    this.checksumSize = checksum.getChecksumSize();
-
-    this.maxAllocatedChunks = (bytesPerChecksum == 0) ? 0 :
-        ((builder.bufferSize + bytesPerChecksum - 1) / bytesPerChecksum);
-    // Calculate the effective maximum readahead.
-    // We can't do more readahead than there is space in the buffer.
-    int maxReadaheadChunks = (bytesPerChecksum == 0) ? 0 :
-        ((Math.min(builder.bufferSize, builder.maxReadahead) +
-            bytesPerChecksum - 1) / bytesPerChecksum);
-    if (maxReadaheadChunks == 0) {
-      this.zeroReadaheadRequested = true;
-      maxReadaheadChunks = 1;
-    } else {
-      this.zeroReadaheadRequested = false;
-    }
-    this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum;
-    this.storageType = builder.storageType;
-  }
-
-  /**
-   * Obtains dataBuf from the buffer pool if it does not exist yet, and
-   * marks it as holding no data (position and limit both 0).
-   */
-  private synchronized void createDataBufIfNeeded() {
-    if (dataBuf != null) {
-      return;
-    }
-    dataBuf = bufferPool.getBuffer(maxAllocatedChunks * bytesPerChecksum);
-    dataBuf.position(0).limit(0);
-  }
-
-  private synchronized void freeDataBufIfExists() {
-    if (dataBuf != null) {
-      // When disposing of a dataBuf, we have to move our stored file index
-      // backwards.
-      dataPos -= dataBuf.remaining();
-      dataBuf.clear();
-      bufferPool.returnBuffer(dataBuf);
-      dataBuf = null;
-    }
-  }
-
-  /**
-   * Obtains checksumBuf from the buffer pool if it does not exist yet, and
-   * marks it as holding no data (position and limit both 0).
-   */
-  private synchronized void createChecksumBufIfNeeded() {
-    if (checksumBuf != null) {
-      return;
-    }
-    checksumBuf = bufferPool.getBuffer(maxAllocatedChunks * checksumSize);
-    checksumBuf.position(0).limit(0);
-  }
-
-  /**
-   * Returns checksumBuf to the buffer pool, if there is one, and forgets it.
-   */
-  private synchronized void freeChecksumBufIfExists() {
-    if (checksumBuf == null) {
-      return;
-    }
-    checksumBuf.clear();
-    bufferPool.returnBuffer(checksumBuf);
-    checksumBuf = null;
-  }
-
-  private synchronized int drainDataBuf(ByteBuffer buf) {
-    if (dataBuf == null) return -1;
-    int oldLimit = dataBuf.limit();
-    int nRead = Math.min(dataBuf.remaining(), buf.remaining());
-    if (nRead == 0) {
-      return (dataBuf.remaining() == 0) ? -1 : 0;
-    }
-    try {
-      dataBuf.limit(dataBuf.position() + nRead);
-      buf.put(dataBuf);
-    } finally {
-      dataBuf.limit(oldLimit);
-    }
-    return nRead;
-  }
-
-  /**
-   * Read from the block file into a buffer.
-   *
-   * This function overwrites checksumBuf.  It will increment dataPos.
-   *
-   * Data is read from dataIn at dataPos until buf is full or the channel
-   * reports end of file.  Unless canSkipChecksum is set, the checksums
-   * covering the bytes just read are then loaded from checksumIn and the
-   * data is verified against them.  When verification runs, buf's limit is
-   * left at the end of the data that was read and its position is set to
-   * that limit.
-   *
-   * @param buf   The buffer to read into.  May be dataBuf.
-   *              The position and limit of this buffer should be set to
-   *              multiples of the checksum size.
-   * @param canSkipChecksum  True if we can skip checksumming.
-   *
-   * @return      Total bytes read.  0 on EOF.
-   * @throws IOException  if reading the block file or the checksum file
-   *                      fails, if the checksum file ends before all needed
-   *                      checksums were read, or as propagated from
-   *                      checksum verification.
-   */
-  private synchronized int fillBuffer(ByteBuffer buf, boolean canSkipChecksum)
-      throws IOException {
-    TraceScope scope = Trace.startSpan("BlockReaderLocal#fillBuffer(" +
-        block.getBlockId() + ")", Sampler.NEVER);
-    try {
-      int total = 0;
-      long startDataPos = dataPos;
-      int startBufPos = buf.position();
-      while (buf.hasRemaining()) {
-        int nRead = dataIn.read(buf, dataPos);
-        if (nRead < 0) { // negative result: nothing more to read from the block file
-          break;
-        }
-        dataPos += nRead;
-        total += nRead;
-      }
-      if (canSkipChecksum) {
-        freeChecksumBufIfExists(); // no verification on this path, so the buffer is not needed
-        return total;
-      }
-      if (total > 0) {
-        try {
-          buf.limit(buf.position()); // window buf onto exactly the bytes just read
-          buf.position(startBufPos);
-          createChecksumBufIfNeeded();
-          int checksumsNeeded = (total + bytesPerChecksum - 1) / bytesPerChecksum; // rounded up
-          checksumBuf.clear();
-          checksumBuf.limit(checksumsNeeded * checksumSize);
-          long checksumPos = BlockMetadataHeader.getHeaderSize()
-              + ((startDataPos / bytesPerChecksum) * checksumSize); // header, then one checksum per chunk
-          while (checksumBuf.hasRemaining()) {
-            int nRead = checksumIn.read(checksumBuf, checksumPos);
-            if (nRead < 0) {
-              throw new IOException("Got unexpected checksum file EOF at " +
-                  checksumPos + ", block file position " + startDataPos + " for " +
-                  "block " + block + " of file " + filename);
-            }
-            checksumPos += nRead;
-          }
-          checksumBuf.flip();
-
-          checksum.verifyChunkedSums(buf, checksumBuf, filename, startDataPos);
-        } finally {
-          buf.position(buf.limit()); // runs on success and on failure alike
-        }
-      }
-      return total;
-    } finally {
-      scope.close();
-    }
-  }
-
-  /**
-   * Decides whether the caller may skip checksum verification, adding a
-   * no-checksum anchor on the replica when that is how permission is
-   * obtained.
-   *
-   * @return true if checksums may be skipped.
-   */
-  private boolean createNoChecksumContext() {
-    if (!verifyChecksum) {
-      return true;
-    }
-    if (storageType != null && storageType.isTransient()) {
-      // Checksums are not stored for replicas on transient storage.  We do not
-      // anchor, because we do not intend for client activity to block eviction
-      // from transient storage on the DataNode side.
-      return true;
-    }
-    return replica.addNoChecksumAnchor();
-  }
-
-  private void releaseNoChecksumContext() {
-    if (verifyChecksum) {
-      if (storageType == null || !storageType.isTransient()) {
-        replica.removeNoChecksumAnchor();
-      }
-    }
-  }
-
-  @Override
-  public synchronized int read(ByteBuffer buf) throws IOException {
-    boolean canSkipChecksum = createNoChecksumContext();
-    try {
-      String traceString = null;
-      if (LOG.isTraceEnabled()) {
-        traceString = new StringBuilder().
-            append("read(").
-            append("buf.remaining=").append(buf.remaining()).
-            append(", block=").append(block).
-            append(", filename=").append(filename).
-            append(", canSkipChecksum=").append(canSkipChecksum).
-            append(")").toString();
-        LOG.info(traceString + ": starting");
-      }
-      int nRead;
-      try {
-        if (canSkipChecksum && zeroReadaheadRequested) {
-          nRead = readWithoutBounceBuffer(buf);
-        } else {
-          nRead = readWithBounceBuffer(buf, canSkipChecksum);
-        }
-      } catch (IOException e) {
-        if (LOG.isTraceEnabled()) {
-          LOG.info(traceString + ": I/O error", e);
-        }
-        throw e;
-      }
-      if (LOG.isTraceEnabled()) {
-        LOG.info(traceString + ": returning " + nRead);
-      }
-      return nRead;
-    } finally {
-      if (canSkipChecksum) releaseNoChecksumContext();
-    }
-  }
-
-  private synchronized int readWithoutBounceBuffer(ByteBuffer buf)
-      throws IOException {
-    freeDataBufIfExists();
-    freeChecksumBufIfExists();
-    int total = 0;
-    while (buf.hasRemaining()) {
-      int nRead = dataIn.read(buf, dataPos);
-      if (nRead <= 0) break;
-      dataPos += nRead;
-      total += nRead;
-    }
-    return (total == 0 && (dataPos == dataIn.size())) ? -1 : total;
-  }
-
-  /**
-   * Refills dataBuf from the block file, verifying checksums unless they
-   * can be skipped.
-   *
-   * The contents of dataBuf are kept aligned to chunk boundaries.  When
-   * checksums must be verified, dataPos is first moved back to the start
-   * of the current chunk, since a partial chunk cannot be checksummed, and
-   * the buffer is filled from its beginning.  When checksums can be
-   * skipped, dataPos is left alone and only the part of dataBuf from the
-   * in-chunk offset onwards is filled.
-   *
-   * @param canSkipChecksum  true if we can skip checksumming.
-   * @return                 true if we hit EOF.
-   * @throws IOException
-   */
-  private synchronized boolean fillDataBuf(boolean canSkipChecksum)
-      throws IOException {
-    createDataBufIfNeeded();
-    final int offsetInChunk = (int)(dataPos % bytesPerChecksum);
-    final long requestedPos = dataPos;
-    dataBuf.limit(maxReadaheadLength);
-    if (canSkipChecksum) {
-      dataBuf.position(offsetInChunk);
-    } else {
-      dataPos -= offsetInChunk;
-      dataBuf.position(0);
-    }
-    fillBuffer(dataBuf, canSkipChecksum);
-    // Expose only what was loaded, starting at the originally requested byte.
-    final int loadedEnd = dataBuf.position();
-    dataBuf.limit(loadedEnd);
-    dataBuf.position(Math.min(loadedEnd, offsetInChunk));
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("loaded " + dataBuf.remaining() + " bytes into bounce " +
-          "buffer from offset " + requestedPos + " of " + block);
-    }
-    return dataBuf.limit() != maxReadaheadLength;
-  }
-
-  /**
-   * Read using the bounce buffer.
-   *
-   * A 'direct' read actually has three phases. The first drains any
-   * remaining bytes from the slow read buffer. After this the read is
-   * guaranteed to be on a checksum chunk boundary. If there are still bytes
-   * to read, the fast direct path is used for as many remaining bytes as
-   * possible, up to a multiple of the checksum chunk size. Finally, any
-   * 'odd' bytes remaining at the end of the read cause another slow read to
-   * be issued, which involves an extra copy.
-   *
-   * Every 'slow' read tries to fill the slow read buffer in one go for
-   * efficiency's sake. As described above, all non-checksum-chunk-aligned
-   * reads will be served from the slower read path.
-   *
-   * @param buf              The buffer to read into. 
-   * @param canSkipChecksum  True if we can skip checksums.
-   * @return                 The number of bytes placed into buf, or -1 if
-   *                         no bytes were placed and neither lane produced
-   *                         any data.
-   * @throws IOException     If filling either buffer fails.
-   */
-  private synchronized int readWithBounceBuffer(ByteBuffer buf,
-        boolean canSkipChecksum) throws IOException {
-    int total = 0;
-    int bb = drainDataBuf(buf); // drain bounce buffer if possible
-    if (bb >= 0) { // -1 means the bounce buffer was absent or empty
-      total += bb;
-      if (buf.remaining() == 0) return total;
-    }
-    boolean eof = true, done = false;
-    do {
-      if (buf.isDirect() && (buf.remaining() >= maxReadaheadLength)
-            && ((dataPos % bytesPerChecksum) == 0)) {
-        // Fast lane: try to read directly into user-supplied buffer, bypassing
-        // bounce buffer.
-        int oldLimit = buf.limit();
-        int nRead;
-        try {
-          buf.limit(buf.position() + maxReadaheadLength); // at most one readahead window per pass
-          nRead = fillBuffer(buf, canSkipChecksum);
-        } finally {
-          buf.limit(oldLimit);
-        }
-        if (nRead < maxReadaheadLength) { // short read: stop after this pass
-          done = true;
-        }
-        if (nRead > 0) {
-          eof = false;
-        }
-        total += nRead;
-      } else {
-        // Slow lane: refill bounce buffer.
-        if (fillDataBuf(canSkipChecksum)) { // true when the refill came up short
-          done = true;
-        }
-        bb = drainDataBuf(buf); // drain bounce buffer if possible
-        if (bb >= 0) {
-          eof = false;
-          total += bb;
-        }
-      }
-    } while ((!done) && (buf.remaining() > 0));
-    return (eof && total == 0) ? -1 : total;
-  }
-
-  /**
-   * Reads up to len bytes from the block into arr, starting at off.
-   *
-   * If checksums can be skipped and zero readahead was requested, the data
-   * is read directly from the block file; otherwise the bounce-buffer path
-   * is used.  A no-checksum context obtained for this call is released
-   * before returning.
-   *
-   * @param arr  destination array.
-   * @param off  offset in arr at which to start storing bytes.
-   * @param len  maximum number of bytes to read.
-   * @return     the result of the selected read path: the number of bytes
-   *             read, or -1 at end of file.
-   * @throws IOException  if the selected read path fails.
-   */
-  @Override
-  public synchronized int read(byte[] arr, int off, int len)
-        throws IOException {
-    final boolean canSkipChecksum = createNoChecksumContext();
-    try {
-      String traceString = null;
-      if (LOG.isTraceEnabled()) {
-        traceString = "read(arr.length=" + arr.length +
-            ", off=" + off +
-            ", len=" + len +
-            ", filename=" + filename +
-            ", block=" + block +
-            ", canSkipChecksum=" + canSkipChecksum +
-            ")";
-        LOG.trace(traceString + ": starting");
-      }
-      final int nRead;
-      try {
-        nRead = (canSkipChecksum && zeroReadaheadRequested)
-            ? readWithoutBounceBuffer(arr, off, len)
-            : readWithBounceBuffer(arr, off, len, canSkipChecksum);
-      } catch (IOException e) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(traceString + ": I/O error", e);
-        }
-        throw e;
-      }
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(traceString + ": returning " + nRead);
-      }
-      return nRead;
-    } finally {
-      if (canSkipChecksum) {
-        releaseNoChecksumContext();
-      }
-    }
-  }
-
-  private synchronized int readWithoutBounceBuffer(byte arr[], int off,
-        int len) throws IOException {
-    freeDataBufIfExists();
-    freeChecksumBufIfExists();
-    int nRead = dataIn.read(ByteBuffer.wrap(arr, off, len), dataPos);
-    if (nRead > 0) {
-      dataPos += nRead;
-    } else if ((nRead == 0) && (dataPos == dataIn.size())) {
-      return -1;
-    }
-    return nRead;
-  }
-
-  private synchronized int readWithBounceBuffer(byte arr[], int off, int len,
-        boolean canSkipChecksum) throws IOException {
-    createDataBufIfNeeded();
-    if (!dataBuf.hasRemaining()) {
-      dataBuf.position(0);
-      dataBuf.limit(maxReadaheadLength);
-      fillDataBuf(canSkipChecksum);
-    }
-    if (dataBuf.remaining() == 0) return -1;
-    int toRead = Math.min(dataBuf.remaining(), len);
-    dataBuf.get(arr, off, toRead);
-    return toRead;
-  }
-
-  /**
-   * Skips n bytes: first by discarding bytes already held in dataBuf, then
-   * by advancing dataPos for whatever is left.
-   *
-   * @param n  number of bytes to skip.
-   * @return   n.
-   */
-  @Override
-  public synchronized long skip(long n) throws IOException {
-    int fromBuf = 0;
-    if ((dataBuf != null) && dataBuf.hasRemaining()) {
-      fromBuf = (int)Math.min(dataBuf.remaining(), n);
-      dataBuf.position(dataBuf.position() + fromBuf);
-    }
-    final long fromFile = n - fromBuf;
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("skip(n=" + n + ", block=" + block + ", filename=" +
-          filename + "): discarded " + fromBuf +
-          " bytes from dataBuf and advanced dataPos by " + fromFile);
-    }
-    dataPos += fromFile;
-    return n;
-  }
-
-  /**
-   * Reports how many bytes can be read without network I/O.
-   *
-   * @return always {@link Integer#MAX_VALUE}.
-   */
-  @Override
-  public int available() throws IOException {
-    // BlockReaderLocal never does network I/O, so there is no limit to report.
-    final int unlimited = Integer.MAX_VALUE;
-    return unlimited;
-  }
-
-  /**
-   * Closes the reader: drops the reference on the replica and returns both
-   * buffers to the pool.  Calls after the first have no effect.
-   */
-  @Override
-  public synchronized void close() throws IOException {
-    if (!closed) {
-      closed = true;
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("close(filename=" + filename + ", block=" + block + ")");
-      }
-      replica.unref();
-      freeDataBufIfExists();
-      freeChecksumBufIfExists();
-    }
-  }
-
-  /**
-   * Delegates to {@code BlockReaderUtil.readFully} with this reader.
-   *
-   * @param arr  destination array.
-   * @param off  offset in arr at which to start storing bytes.
-   * @param len  number of bytes requested.
-   */
-  @Override
-  public synchronized void readFully(byte[] arr, int off, int len)
-      throws IOException {
-    BlockReaderUtil.readFully(this, arr, off, len);
-  }
-
-  /**
-   * Delegates to {@code BlockReaderUtil.readAll} with this reader.
-   *
-   * @param buf  destination array.
-   * @param off  offset in buf at which to start storing bytes.
-   * @param len  number of bytes requested.
-   * @return     the value returned by {@code BlockReaderUtil.readAll}.
-   */
-  @Override
-  public synchronized int readAll(byte[] buf, int off, int len)
-      throws IOException {
-    final int result = BlockReaderUtil.readAll(this, buf, off, len);
-    return result;
-  }
-
-  /**
-   * @return always true for this reader.
-   */
-  @Override
-  public boolean isLocal() {
-    final boolean local = true;
-    return local;
-  }
-
-  /**
-   * @return always true for this reader.
-   */
-  @Override
-  public boolean isShortCircuit() {
-    final boolean shortCircuit = true;
-    return shortCircuit;
-  }
-
-  /**
-   * Get or create a memory map for this replica.
-   *
-   * Two kinds of ClientMmap can be handed out: a "safe" one that will only
-   * ever expose pre-checksummed data, and one that may expose data which
-   * has not been checksummed.
-   *
-   * For the safe kind, the anchor count on the shared memory slot is
-   * incremented, which tells the DataNode not to munlock the block until
-   * the ClientMmap is closed.  The other kind is fetched without anchoring.
-   *
-   * @param opts     The options to use, such as SKIP_CHECKSUMS.
-   *
-   * @return         null on failure; the ClientMmap otherwise.
-   */
-  @Override
-  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
-    final boolean anchor = verifyChecksum &&
-        !opts.contains(ReadOption.SKIP_CHECKSUMS);
-    if (anchor && !createNoChecksumContext()) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("can't get an mmap for " + block + " of " + filename +
-            " since SKIP_CHECKSUMS was not given, " +
-            "we aren't skipping checksums, and the block is not mlocked.");
-      }
-      return null;
-    }
-    ClientMmap clientMmap = null;
-    try {
-      clientMmap = replica.getOrCreateClientMmap(anchor);
-    } finally {
-      // Give the no-checksum context back if no mmap was obtained.
-      if (anchor && (clientMmap == null)) {
-        releaseNoChecksumContext();
-      }
-    }
-    return clientMmap;
-  }
-  
-  /**
-   * @return the effective verifyChecksum setting of this reader.
-   */
-  @VisibleForTesting
-  boolean getVerifyChecksum() {
-    return verifyChecksum;
-  }
-
-  /**
-   * @return the effective maximum readahead length, in bytes.
-   */
-  @VisibleForTesting
-  int getMaxReadaheadLength() {
-    return maxReadaheadLength;
-  }
-  
-  /**
-   * Marks the replica's slot as anchorable.  In normal operation only the
-   * DataNode does this; the method exists purely for tests.
-   */
-  @VisibleForTesting
-  void forceAnchorable() {
-    this.replica.getSlot().makeAnchorable();
-  }
-
-  /**
-   * Marks the replica's slot as unanchorable.  In normal operation only the
-   * DataNode does this; the method exists purely for tests.
-   */
-  @VisibleForTesting
-  void forceUnanchorable() {
-    this.replica.getSlot().makeUnanchorable();
-  }
-}


Mime
View raw message