hadoop-common-commits mailing list archives

From wan...@apache.org
Subject [21/42] hadoop git commit: HDFS-8934. Move ShortCircuitShm to hdfs-client. Contributed by Mingliang Liu.
Date Tue, 25 Aug 2015 17:12:32 GMT
HDFS-8934. Move ShortCircuitShm to hdfs-client. Contributed by Mingliang Liu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/490bb5eb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/490bb5eb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/490bb5eb

Branch: refs/heads/YARN-1197
Commit: 490bb5ebd6c6d6f9c08fcad167f976687fc3aa42
Parents: 61bf9ca
Author: Haohui Mai <wheat9@apache.org>
Authored: Sat Aug 22 13:30:19 2015 -0700
Committer: Haohui Mai <wheat9@apache.org>
Committed: Sat Aug 22 13:31:03 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/ExtendedBlockId.java |  82 +++
 .../org/apache/hadoop/hdfs/net/DomainPeer.java  | 132 ++++
 .../java/org/apache/hadoop/hdfs/net/Peer.java   | 123 ++++
 .../datatransfer/BlockConstructionStage.java    |  62 ++
 .../datatransfer/DataTransferProtoUtil.java     | 146 +++++
 .../datatransfer/DataTransferProtocol.java      | 202 ++++++
 .../hadoop/hdfs/protocol/datatransfer/Op.java   |  66 ++
 .../hdfs/protocol/datatransfer/Sender.java      | 261 ++++++++
 .../hadoop/hdfs/protocolPB/PBHelperClient.java  | 254 ++++++++
 .../token/block/InvalidBlockTokenException.java |  41 ++
 .../hdfs/server/datanode/CachingStrategy.java   |  76 +++
 .../hadoop/hdfs/shortcircuit/DfsClientShm.java  | 119 ++++
 .../hdfs/shortcircuit/DfsClientShmManager.java  | 522 +++++++++++++++
 .../hdfs/shortcircuit/ShortCircuitShm.java      | 647 +++++++++++++++++++
 .../hadoop/hdfs/util/ExactSizeInputStream.java  | 125 ++++
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../apache/hadoop/hdfs/BlockReaderFactory.java  |   4 +-
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |  10 +-
 .../org/apache/hadoop/hdfs/DataStreamer.java    |   6 +-
 .../org/apache/hadoop/hdfs/ExtendedBlockId.java |  82 ---
 .../apache/hadoop/hdfs/RemoteBlockReader.java   |   4 +-
 .../apache/hadoop/hdfs/RemoteBlockReader2.java  |   4 +-
 .../org/apache/hadoop/hdfs/net/DomainPeer.java  | 132 ----
 .../java/org/apache/hadoop/hdfs/net/Peer.java   | 123 ----
 .../datatransfer/BlockConstructionStage.java    |  62 --
 .../datatransfer/DataTransferProtoUtil.java     | 148 -----
 .../datatransfer/DataTransferProtocol.java      | 201 ------
 .../hadoop/hdfs/protocol/datatransfer/Op.java   |  66 --
 .../hdfs/protocol/datatransfer/PipelineAck.java |   2 +-
 .../hdfs/protocol/datatransfer/Receiver.java    |   7 +-
 .../hdfs/protocol/datatransfer/Sender.java      | 261 --------
 .../datatransfer/sasl/DataTransferSaslUtil.java |   2 +-
 ...tDatanodeProtocolServerSideTranslatorPB.java |   2 +-
 .../ClientDatanodeProtocolTranslatorPB.java     |   6 +-
 ...tNamenodeProtocolServerSideTranslatorPB.java |   6 +-
 .../ClientNamenodeProtocolTranslatorPB.java     |  28 +-
 .../DatanodeProtocolClientSideTranslatorPB.java |   4 +-
 .../InterDatanodeProtocolTranslatorPB.java      |   2 +-
 .../NamenodeProtocolTranslatorPB.java           |   2 +-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 228 +------
 .../token/block/InvalidBlockTokenException.java |  41 --
 .../hadoop/hdfs/server/balancer/Dispatcher.java |   2 +-
 .../hdfs/server/datanode/CachingStrategy.java   |  76 ---
 .../hadoop/hdfs/server/datanode/DataNode.java   |   4 +-
 .../hdfs/server/datanode/DataXceiver.java       |  14 +-
 .../server/namenode/FSImageFormatPBINode.java   |   5 +-
 .../hadoop/hdfs/shortcircuit/DfsClientShm.java  | 119 ----
 .../hdfs/shortcircuit/DfsClientShmManager.java  | 514 ---------------
 .../hdfs/shortcircuit/ShortCircuitCache.java    |   4 +-
 .../hdfs/shortcircuit/ShortCircuitShm.java      | 646 ------------------
 .../hadoop/hdfs/util/ExactSizeInputStream.java  | 125 ----
 .../hadoop/hdfs/protocolPB/TestPBHelper.java    |  20 +-
 52 files changed, 2949 insertions(+), 2873 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java
new file mode 100644
index 0000000..7b9e8e3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+
+/**
+ * An immutable key which identifies a block.
+ */
+@InterfaceAudience.Private
+final public class ExtendedBlockId {
+  /**
+   * The block ID for this block.
+   */
+  private final long blockId;
+
+  /**
+   * The block pool ID for this block.
+   */
+  private final String bpId;
+
+  public static ExtendedBlockId fromExtendedBlock(ExtendedBlock block) {
+    return new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
+  }
+  
+  public ExtendedBlockId(long blockId, String bpId) {
+    this.blockId = blockId;
+    this.bpId = bpId;
+  }
+
+  public long getBlockId() {
+    return this.blockId;
+  }
+
+  public String getBlockPoolId() {
+    return this.bpId;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if ((o == null) || (o.getClass() != this.getClass())) {
+      return false;
+    }
+    ExtendedBlockId other = (ExtendedBlockId)o;
+    return new EqualsBuilder().
+        append(blockId, other.blockId).
+        append(bpId, other.bpId).
+        isEquals();
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().
+        append(this.blockId).
+        append(this.bpId).
+        toHashCode();
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder().append(blockId).
+        append("_").append(bpId).toString();
+  }
+}
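
For context, a minimal sketch (not part of this commit) of how a value-based key such as ExtendedBlockId is typically used; the block id and block pool id below are invented for illustration.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hdfs.ExtendedBlockId;

public class ExtendedBlockIdKeySketch {
  public static void main(String[] args) {
    Map<ExtendedBlockId, String> replicaState = new HashMap<>();
    ExtendedBlockId key = new ExtendedBlockId(1073741825L, "BP-1-127.0.0.1-1");
    replicaState.put(key, "cached");
    // equals() and hashCode() are value-based, so an equal key finds the entry.
    System.out.println(replicaState.get(
        new ExtendedBlockId(1073741825L, "BP-1-127.0.0.1-1")));   // prints "cached"
  }
}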

http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java
new file mode 100644
index 0000000..4792b0e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.net;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.ReadableByteChannel;
+
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Represents a peer that we communicate with by using blocking I/O 
+ * on a UNIX domain socket.
+ */
+@InterfaceAudience.Private
+public class DomainPeer implements Peer {
+  private final DomainSocket socket;
+  private final OutputStream out;
+  private final InputStream in;
+  private final ReadableByteChannel channel;
+
+  public DomainPeer(DomainSocket socket) {
+    this.socket = socket;
+    this.out = socket.getOutputStream();
+    this.in = socket.getInputStream();
+    this.channel = socket.getChannel();
+  }
+
+  @Override
+  public ReadableByteChannel getInputStreamChannel() {
+    return channel;
+  }
+
+  @Override
+  public void setReadTimeout(int timeoutMs) throws IOException {
+    socket.setAttribute(DomainSocket.RECEIVE_TIMEOUT, timeoutMs);
+  }
+
+  @Override
+  public int getReceiveBufferSize() throws IOException {
+    return socket.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE);
+  }
+
+  @Override
+  public boolean getTcpNoDelay() throws IOException {
+    /* No TCP, no TCP_NODELAY. */
+    return false;
+  }
+
+  @Override
+  public void setWriteTimeout(int timeoutMs) throws IOException {
+    socket.setAttribute(DomainSocket.SEND_TIMEOUT, timeoutMs);
+  }
+
+  @Override
+  public boolean isClosed() {
+    return !socket.isOpen();
+  }
+
+  @Override
+  public void close() throws IOException {
+    socket.close();
+  }
+
+  @Override
+  public String getRemoteAddressString() {
+    return "unix:" + socket.getPath();
+  }
+
+  @Override
+  public String getLocalAddressString() {
+    return "<local>";
+  }
+
+  @Override
+  public InputStream getInputStream() throws IOException {
+    return in;
+  }
+
+  @Override
+  public OutputStream getOutputStream() throws IOException {
+    return out;
+  }
+
+  @Override
+  public boolean isLocal() {
+    /* UNIX domain sockets can only be used for local communication. */
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return "DomainPeer(" + getRemoteAddressString() + ")";
+  }
+
+  @Override
+  public DomainSocket getDomainSocket() {
+    return socket;
+  }
+
+  @Override
+  public boolean hasSecureChannel() {
+    //
+    // Communication over domain sockets is assumed to be secure, since it
+    // doesn't pass over any network.  We also carefully control the privileges
+    // that can be used on the domain socket inode and its parent directories.
+    // See #{java.org.apache.hadoop.net.unix.DomainSocket#validateSocketPathSecurity0}
+    // for details.
+    //
+    // So unless you are running as root or the hdfs superuser, you cannot
+    // launch a man-in-the-middle attack on UNIX domain socket traffic.
+    //
+    return true;
+  }
+}
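
A hedged sketch (not from this commit) of wrapping a DomainSocket in a DomainPeer and applying a read timeout; the socket path is hypothetical and DomainSocket.connect requires the native libhadoop library to be loaded.

import java.io.IOException;

import org.apache.hadoop.hdfs.net.DomainPeer;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.net.unix.DomainSocket;

public class DomainPeerSketch {
  public static void main(String[] args) throws IOException {
    DomainSocket sock = DomainSocket.connect("/var/lib/hadoop-hdfs/dn_socket");
    try (Peer peer = new DomainPeer(sock)) {
      peer.setReadTimeout(60_000);   // maps to DomainSocket.RECEIVE_TIMEOUT
      System.out.println(peer.getRemoteAddressString());   // "unix:/var/lib/..."
    }
  }
}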

http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java
new file mode 100644
index 0000000..42cf287
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.net;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.ReadableByteChannel;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.net.unix.DomainSocket;
+
+/**
+ * Represents a connection to a peer.
+ */
+@InterfaceAudience.Private
+public interface Peer extends Closeable {
+  /**
+   * @return                The input stream channel associated with this
+   *                        peer, or null if it has none.
+   */
+  public ReadableByteChannel getInputStreamChannel();
+
+  /**
+   * Set the read timeout on this peer.
+   *
+   * @param timeoutMs       The timeout in milliseconds.
+   */
+  public void setReadTimeout(int timeoutMs) throws IOException;
+
+  /**
+   * @return                The receive buffer size.
+   */
+  public int getReceiveBufferSize() throws IOException;
+
+  /**
+   * @return                True if TCP_NODELAY is turned on.
+   */
+  public boolean getTcpNoDelay() throws IOException;
+
+  /**
+   * Set the write timeout on this peer.
+   *
+   * Note: this is not honored for BasicInetPeer.
+   * See {@link BasicInetPeer#setWriteTimeout} for details.
+   * 
+   * @param timeoutMs       The timeout in milliseconds.
+   */
+  public void setWriteTimeout(int timeoutMs) throws IOException;
+
+  /**
+   * @return                true only if the peer is closed.
+   */
+  public boolean isClosed();
+  
+  /**
+   * Close the peer.
+   *
+   * It's safe to re-close a Peer that is already closed.
+   */
+  public void close() throws IOException;
+
+  /**
+   * @return               A string representing the remote end of our 
+   *                       connection to the peer.
+   */
+  public String getRemoteAddressString();
+
+  /**
+   * @return               A string representing the local end of our 
+   *                       connection to the peer.
+   */
+  public String getLocalAddressString();
+  
+  /**
+   * @return               An InputStream associated with the Peer.
+   *                       This InputStream will be valid until you close
+   *                       this peer with Peer#close.
+   */
+  public InputStream getInputStream() throws IOException;
+  
+  /**
+   * @return               An OutputStream associated with the Peer.
+   *                       This OutputStream will be valid until you close
+   *                       this peer with Peer#close.
+   */
+  public OutputStream getOutputStream() throws IOException;
+
+  /**
+   * @return               True if the peer resides on the same
+   *                       computer as we do.
+   */
+  public boolean isLocal();
+
+  /**
+   * @return               The DomainSocket associated with the current
+   *                       peer, or null if there is none.
+   */
+  public DomainSocket getDomainSocket();
+  
+  /**
+   * Return true if the channel is secure.
+   *
+   * @return               True if our channel to this peer is not
+   *                       susceptible to man-in-the-middle attacks.
+   */
+  public boolean hasSecureChannel();
+}
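
The interface lets callers stay agnostic about whether a peer is TCP- or domain-socket-backed. A small hedged sketch of such a caller; the helper class and method names are invented.

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.hdfs.net.Peer;

public final class PeerUtilsSketch {
  private PeerUtilsSketch() {}

  /** Read a single status byte from the peer after setting a read timeout. */
  static int readStatusByte(Peer peer, int timeoutMs) throws IOException {
    peer.setReadTimeout(timeoutMs);
    InputStream in = peer.getInputStream();
    int b = in.read();
    if (b == -1) {
      throw new IOException("Unexpected EOF from " + peer.getRemoteAddressString()
          + (peer.hasSecureChannel() ? " (secure channel)" : ""));
    }
    return b;
  }
}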

http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java
new file mode 100644
index 0000000..5f86e52
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/** Block Construction Stage */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public enum BlockConstructionStage {
+  /** The enum constants are always listed as a regular stage followed by
+   * its recovery stage.
+   * Changing this order will break getRecoveryStage().
+   */
+  // pipeline set up for block append
+  PIPELINE_SETUP_APPEND,
+  // pipeline set up for failed PIPELINE_SETUP_APPEND recovery
+  PIPELINE_SETUP_APPEND_RECOVERY,
+  // data streaming
+  DATA_STREAMING,
+  // pipeline setup for failed data streaming recovery
+  PIPELINE_SETUP_STREAMING_RECOVERY,
+  // close the block and pipeline
+  PIPELINE_CLOSE,
+  // Recover a failed PIPELINE_CLOSE
+  PIPELINE_CLOSE_RECOVERY,
+  // pipeline set up for block creation
+  PIPELINE_SETUP_CREATE,
+  // transfer RBW for adding datanodes
+  TRANSFER_RBW,
+  // transfer Finalized for adding datanodes
+  TRANSFER_FINALIZED;
+  
+  final static private byte RECOVERY_BIT = (byte)1;
+  
+  /**
+   * get the recovery stage of this stage
+   */
+  public BlockConstructionStage getRecoveryStage() {
+    if (this == PIPELINE_SETUP_CREATE) {
+      throw new IllegalArgumentException( "Unexpected blockStage " + this);
+    } else {
+      return values()[ordinal()|RECOVERY_BIT];
+    }
+  }
+}    
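
A small sketch (not in this commit) illustrating the ordinal-based mapping above: each regular stage is immediately followed by its recovery stage, so ORing the ordinal with RECOVERY_BIT selects the recovery variant.

import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;

public class RecoveryStageSketch {
  public static void main(String[] args) {
    // PIPELINE_SETUP_APPEND (ordinal 0) -> PIPELINE_SETUP_APPEND_RECOVERY (ordinal 1)
    System.out.println(BlockConstructionStage.PIPELINE_SETUP_APPEND.getRecoveryStage());
    // DATA_STREAMING (ordinal 2) -> PIPELINE_SETUP_STREAMING_RECOVERY (ordinal 3)
    System.out.println(BlockConstructionStage.DATA_STREAMING.getRecoveryStage());
  }
}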

http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
new file mode 100644
index 0000000..28097ab
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.htrace.Span;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceInfo;
+import org.apache.htrace.TraceScope;
+
+/**
+ * Static utilities for dealing with the protocol buffers used by the
+ * Data Transfer Protocol.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class DataTransferProtoUtil {
+  static BlockConstructionStage fromProto(
+      OpWriteBlockProto.BlockConstructionStage stage) {
+    return BlockConstructionStage.valueOf(stage.name());
+  }
+
+  static OpWriteBlockProto.BlockConstructionStage toProto(
+      BlockConstructionStage stage) {
+    return OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name());
+  }
+
+  public static ChecksumProto toProto(DataChecksum checksum) {
+    ChecksumTypeProto type = PBHelperClient.convert(checksum.getChecksumType());
+    // ChecksumType#valueOf never returns null
+    return ChecksumProto.newBuilder()
+      .setBytesPerChecksum(checksum.getBytesPerChecksum())
+      .setType(type)
+      .build();
+  }
+
+  public static DataChecksum fromProto(ChecksumProto proto) {
+    if (proto == null) return null;
+
+    int bytesPerChecksum = proto.getBytesPerChecksum();
+    DataChecksum.Type type = PBHelperClient.convert(proto.getType());
+    return DataChecksum.newDataChecksum(type, bytesPerChecksum);
+  }
+
+  static ClientOperationHeaderProto buildClientHeader(ExtendedBlock blk,
+      String client, Token<BlockTokenIdentifier> blockToken) {
+    ClientOperationHeaderProto header =
+      ClientOperationHeaderProto.newBuilder()
+        .setBaseHeader(buildBaseHeader(blk, blockToken))
+        .setClientName(client)
+        .build();
+    return header;
+  }
+
+  static BaseHeaderProto buildBaseHeader(ExtendedBlock blk,
+      Token<BlockTokenIdentifier> blockToken) {
+    BaseHeaderProto.Builder builder =  BaseHeaderProto.newBuilder()
+      .setBlock(PBHelperClient.convert(blk))
+      .setToken(PBHelperClient.convert(blockToken));
+    if (Trace.isTracing()) {
+      Span s = Trace.currentSpan();
+      builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()
+          .setTraceId(s.getTraceId())
+          .setParentId(s.getSpanId()));
+    }
+    return builder.build();
+  }
+
+  public static TraceInfo fromProto(DataTransferTraceInfoProto proto) {
+    if (proto == null) return null;
+    if (!proto.hasTraceId()) return null;
+    return new TraceInfo(proto.getTraceId(), proto.getParentId());
+  }
+
+  public static TraceScope continueTraceSpan(ClientOperationHeaderProto header,
+      String description) {
+    return continueTraceSpan(header.getBaseHeader(), description);
+  }
+
+  public static TraceScope continueTraceSpan(BaseHeaderProto header,
+      String description) {
+    return continueTraceSpan(header.getTraceInfo(), description);
+  }
+
+  public static TraceScope continueTraceSpan(DataTransferTraceInfoProto proto,
+      String description) {
+    TraceScope scope = null;
+    TraceInfo info = fromProto(proto);
+    if (info != null) {
+      scope = Trace.startSpan(description, info);
+    }
+    return scope;
+  }
+
+  public static void checkBlockOpStatus(
+          BlockOpResponseProto response,
+          String logInfo) throws IOException {
+    if (response.getStatus() != Status.SUCCESS) {
+      if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) {
+        throw new InvalidBlockTokenException(
+          "Got access token error"
+          + ", status message " + response.getMessage()
+          + ", " + logInfo
+        );
+      } else {
+        throw new IOException(
+          "Got error"
+          + ", status message " + response.getMessage()
+          + ", " + logInfo
+        );
+      }
+    }
+  }
+}
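
A hedged sketch of the ChecksumProto round trip provided by the utility class above; the checksum type and bytes-per-checksum value are arbitrary.

import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
import org.apache.hadoop.util.DataChecksum;

public class ChecksumProtoRoundTrip {
  public static void main(String[] args) {
    DataChecksum original =
        DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
    ChecksumProto proto = DataTransferProtoUtil.toProto(original);
    DataChecksum copy = DataTransferProtoUtil.fromProto(proto);
    // Both carry the same type and bytes-per-checksum after the round trip.
    System.out.println(copy.getChecksumType() + " / " + copy.getBytesPerChecksum());
  }
}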

http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
new file mode 100644
index 0000000..1f7e378
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Transfer data to/from datanode using a streaming protocol.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface DataTransferProtocol {
+  public static final Logger LOG = LoggerFactory.getLogger(DataTransferProtocol.class);
+  
+  /** Version for data transfers between clients and datanodes.
+   * This should change when the serialization of DatanodeInfo changes,
+   * not just when the protocol changes; that is easy to overlook.
+   */
+  /*
+   * Version 28:
+   *    Declare methods in DataTransferProtocol interface.
+   */
+  public static final int DATA_TRANSFER_VERSION = 28;
+
+  /** 
+   * Read a block.
+   * 
+   * @param blk the block being read.
+   * @param blockToken security token for accessing the block.
+   * @param clientName client's name.
+   * @param blockOffset offset of the block.
+   * @param length maximum number of bytes for this read.
+   * @param sendChecksum if false, the DN should skip reading and sending
+   *        checksums
+   * @param cachingStrategy  The caching strategy to use.
+   */
+  public void readBlock(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken,
+      final String clientName,
+      final long blockOffset,
+      final long length,
+      final boolean sendChecksum,
+      final CachingStrategy cachingStrategy) throws IOException;
+
+  /**
+   * Write a block to a datanode pipeline.
+   * The receiver datanode of this call is the next datanode in the pipeline.
+   * The other downstream datanodes are specified by the targets parameter.
+   * Note that the receiver {@link DatanodeInfo} is not required in the
+   * parameter list since the receiver datanode knows its info.  However, the
+   * {@link StorageType} for storing the replica in the receiver datanode is a 
+   * parameter since the receiver datanode may support multiple storage types.
+   *
+   * @param blk the block being written.
+   * @param storageType for storing the replica in the receiver datanode.
+   * @param blockToken security token for accessing the block.
+   * @param clientName client's name.
+   * @param targets other downstream datanodes in the pipeline.
+   * @param targetStorageTypes target {@link StorageType}s corresponding
+   *                           to the target datanodes.
+   * @param source source datanode.
+   * @param stage pipeline stage.
+   * @param pipelineSize the size of the pipeline.
+   * @param minBytesRcvd minimum number of bytes received.
+   * @param maxBytesRcvd maximum number of bytes received.
+   * @param latestGenerationStamp the latest generation stamp of the block.
+   * @param pinning whether to pin the block, so Balancer won't move it.
+   * @param targetPinnings whether to pin the block on target datanode
+   */
+  public void writeBlock(final ExtendedBlock blk,
+      final StorageType storageType, 
+      final Token<BlockTokenIdentifier> blockToken,
+      final String clientName,
+      final DatanodeInfo[] targets,
+      final StorageType[] targetStorageTypes, 
+      final DatanodeInfo source,
+      final BlockConstructionStage stage,
+      final int pipelineSize,
+      final long minBytesRcvd,
+      final long maxBytesRcvd,
+      final long latestGenerationStamp,
+      final DataChecksum requestedChecksum,
+      final CachingStrategy cachingStrategy,
+      final boolean allowLazyPersist,
+      final boolean pinning,
+      final boolean[] targetPinnings) throws IOException;
+  /**
+   * Transfer a block to another datanode.
+   * The block stage must be
+   * either {@link BlockConstructionStage#TRANSFER_RBW}
+   * or {@link BlockConstructionStage#TRANSFER_FINALIZED}.
+   * 
+   * @param blk the block being transferred.
+   * @param blockToken security token for accessing the block.
+   * @param clientName client's name.
+   * @param targets target datanodes.
+   */
+  public void transferBlock(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken,
+      final String clientName,
+      final DatanodeInfo[] targets,
+      final StorageType[] targetStorageTypes) throws IOException;
+
+  /**
+   * Request short circuit access file descriptors from a DataNode.
+   *
+   * @param blk             The block to get file descriptors for.
+   * @param blockToken      Security token for accessing the block.
+   * @param slotId          The shared memory slot id to use, or null 
+   *                          to use no slot id.
+   * @param maxVersion      Maximum version of the block data the client 
+   *                          can understand.
+   * @param supportsReceiptVerification  True if the client supports
+   *                          receipt verification.
+   */
+  public void requestShortCircuitFds(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken,
+      SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
+        throws IOException;
+
+  /**
+   * Release a pair of short-circuit FDs requested earlier.
+   *
+   * @param slotId          SlotID used by the earlier file descriptors.
+   */
+  public void releaseShortCircuitFds(final SlotId slotId) throws IOException;
+
+  /**
+   * Request a short circuit shared memory area from a DataNode.
+   * 
+   * @param clientName       The name of the client.
+   */
+  public void requestShortCircuitShm(String clientName) throws IOException;
+  
+  /**
+   * Receive a block from a source datanode
+   * and then notify the namenode
+   * to remove the copy from the original datanode.
+   * Note that the source datanode and the original datanode can be different.
+   * It is used for balancing purposes.
+   * 
+   * @param blk the block being replaced.
+   * @param storageType the {@link StorageType} for storing the block.
+   * @param blockToken security token for accessing the block.
+   * @param delHint the hint for deleting the block in the original datanode.
+   * @param source the source datanode for receiving the block.
+   */
+  public void replaceBlock(final ExtendedBlock blk,
+      final StorageType storageType, 
+      final Token<BlockTokenIdentifier> blockToken,
+      final String delHint,
+      final DatanodeInfo source) throws IOException;
+
+  /**
+   * Copy a block. 
+   * It is used for balancing purposes.
+   * 
+   * @param blk the block being copied.
+   * @param blockToken security token for accessing the block.
+   */
+  public void copyBlock(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken) throws IOException;
+
+  /**
+   * Get block checksum (MD5 of CRC32).
+   * 
+   * @param blk a block.
+   * @param blockToken security token for accessing the block.
+   * @throws IOException
+   */
+  public void blockChecksum(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
new file mode 100644
index 0000000..3077498
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/** Operation */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public enum Op {
+  WRITE_BLOCK((byte)80),
+  READ_BLOCK((byte)81),
+  READ_METADATA((byte)82),
+  REPLACE_BLOCK((byte)83),
+  COPY_BLOCK((byte)84),
+  BLOCK_CHECKSUM((byte)85),
+  TRANSFER_BLOCK((byte)86),
+  REQUEST_SHORT_CIRCUIT_FDS((byte)87),
+  RELEASE_SHORT_CIRCUIT_FDS((byte)88),
+  REQUEST_SHORT_CIRCUIT_SHM((byte)89),
+  CUSTOM((byte)127);
+
+  /** The code for this operation. */
+  public final byte code;
+  
+  private Op(byte code) {
+    this.code = code;
+  }
+  
+  private static final int FIRST_CODE = values()[0].code;
+  /** Return the object represented by the code. */
+  private static Op valueOf(byte code) {
+    final int i = (code & 0xff) - FIRST_CODE;
+    return i < 0 || i >= values().length? null: values()[i];
+  }
+
+  /** Read from in */
+  public static Op read(DataInput in) throws IOException {
+    return valueOf(in.readByte());
+  }
+
+  /** Write to out */
+  public void write(DataOutput out) throws IOException {
+    out.write(code);
+  }
+}
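
A minimal sketch (not from this commit) of the one-byte opcode framing: Op#write emits the code byte and Op#read maps it back through values().

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.datatransfer.Op;

public class OpRoundTripSketch {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    Op.READ_BLOCK.write(new DataOutputStream(bytes));   // writes 0x51
    Op parsed = Op.read(new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(parsed);                          // READ_BLOCK
  }
}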

http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
new file mode 100644
index 0000000..2d11dc2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.toProto;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmRequestProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+
+import org.apache.htrace.Trace;
+import org.apache.htrace.Span;
+
+import com.google.protobuf.Message;
+
+/** Sender */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class Sender implements DataTransferProtocol {
+  private final DataOutputStream out;
+
+  /** Create a sender for DataTransferProtocol with an output stream. */
+  public Sender(final DataOutputStream out) {
+    this.out = out;    
+  }
+
+  /** Initialize an operation. */
+  private static void op(final DataOutput out, final Op op
+      ) throws IOException {
+    out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
+    op.write(out);
+  }
+
+  private static void send(final DataOutputStream out, final Op opcode,
+      final Message proto) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Sending DataTransferOp " + proto.getClass().getSimpleName()
+          + ": " + proto);
+    }
+    op(out, opcode);
+    proto.writeDelimitedTo(out);
+    out.flush();
+  }
+
+  static private CachingStrategyProto getCachingStrategy(CachingStrategy cachingStrategy) {
+    CachingStrategyProto.Builder builder = CachingStrategyProto.newBuilder();
+    if (cachingStrategy.getReadahead() != null) {
+      builder.setReadahead(cachingStrategy.getReadahead().longValue());
+    }
+    if (cachingStrategy.getDropBehind() != null) {
+      builder.setDropBehind(cachingStrategy.getDropBehind().booleanValue());
+    }
+    return builder.build();
+  }
+
+  @Override
+  public void readBlock(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken,
+      final String clientName,
+      final long blockOffset,
+      final long length,
+      final boolean sendChecksum,
+      final CachingStrategy cachingStrategy) throws IOException {
+
+    OpReadBlockProto proto = OpReadBlockProto.newBuilder()
+      .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
+      .setOffset(blockOffset)
+      .setLen(length)
+      .setSendChecksums(sendChecksum)
+      .setCachingStrategy(getCachingStrategy(cachingStrategy))
+      .build();
+
+    send(out, Op.READ_BLOCK, proto);
+  }
+  
+
+  @Override
+  public void writeBlock(final ExtendedBlock blk,
+      final StorageType storageType, 
+      final Token<BlockTokenIdentifier> blockToken,
+      final String clientName,
+      final DatanodeInfo[] targets,
+      final StorageType[] targetStorageTypes, 
+      final DatanodeInfo source,
+      final BlockConstructionStage stage,
+      final int pipelineSize,
+      final long minBytesRcvd,
+      final long maxBytesRcvd,
+      final long latestGenerationStamp,
+      DataChecksum requestedChecksum,
+      final CachingStrategy cachingStrategy,
+      final boolean allowLazyPersist,
+      final boolean pinning,
+      final boolean[] targetPinnings) throws IOException {
+    ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
+        blk, clientName, blockToken);
+    
+    ChecksumProto checksumProto =
+      DataTransferProtoUtil.toProto(requestedChecksum);
+
+    OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
+      .setHeader(header)
+      .setStorageType(PBHelperClient.convertStorageType(storageType))
+      .addAllTargets(PBHelperClient.convert(targets, 1))
+      .addAllTargetStorageTypes(PBHelperClient.convertStorageTypes(targetStorageTypes, 1))
+      .setStage(toProto(stage))
+      .setPipelineSize(pipelineSize)
+      .setMinBytesRcvd(minBytesRcvd)
+      .setMaxBytesRcvd(maxBytesRcvd)
+      .setLatestGenerationStamp(latestGenerationStamp)
+      .setRequestedChecksum(checksumProto)
+      .setCachingStrategy(getCachingStrategy(cachingStrategy))
+      .setAllowLazyPersist(allowLazyPersist)
+      .setPinning(pinning)
+      .addAllTargetPinnings(PBHelperClient.convert(targetPinnings, 1));
+    
+    if (source != null) {
+      proto.setSource(PBHelperClient.convertDatanodeInfo(source));
+    }
+
+    send(out, Op.WRITE_BLOCK, proto.build());
+  }
+
+  @Override
+  public void transferBlock(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken,
+      final String clientName,
+      final DatanodeInfo[] targets,
+      final StorageType[] targetStorageTypes) throws IOException {
+    
+    OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
+      .setHeader(DataTransferProtoUtil.buildClientHeader(
+          blk, clientName, blockToken))
+      .addAllTargets(PBHelperClient.convert(targets))
+      .addAllTargetStorageTypes(PBHelperClient.convertStorageTypes(targetStorageTypes))
+      .build();
+
+    send(out, Op.TRANSFER_BLOCK, proto);
+  }
+
+  @Override
+  public void requestShortCircuitFds(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken,
+      SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
+        throws IOException {
+    OpRequestShortCircuitAccessProto.Builder builder =
+        OpRequestShortCircuitAccessProto.newBuilder()
+          .setHeader(DataTransferProtoUtil.buildBaseHeader(
+            blk, blockToken)).setMaxVersion(maxVersion);
+    if (slotId != null) {
+      builder.setSlotId(PBHelperClient.convert(slotId));
+    }
+    builder.setSupportsReceiptVerification(supportsReceiptVerification);
+    OpRequestShortCircuitAccessProto proto = builder.build();
+    send(out, Op.REQUEST_SHORT_CIRCUIT_FDS, proto);
+  }
+  
+  @Override
+  public void releaseShortCircuitFds(SlotId slotId) throws IOException {
+    ReleaseShortCircuitAccessRequestProto.Builder builder =
+        ReleaseShortCircuitAccessRequestProto.newBuilder().
+        setSlotId(PBHelperClient.convert(slotId));
+    if (Trace.isTracing()) {
+      Span s = Trace.currentSpan();
+      builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()
+          .setTraceId(s.getTraceId()).setParentId(s.getSpanId()));
+    }
+    ReleaseShortCircuitAccessRequestProto proto = builder.build();
+    send(out, Op.RELEASE_SHORT_CIRCUIT_FDS, proto);
+  }
+
+  @Override
+  public void requestShortCircuitShm(String clientName) throws IOException {
+    ShortCircuitShmRequestProto.Builder builder =
+        ShortCircuitShmRequestProto.newBuilder().
+        setClientName(clientName);
+    if (Trace.isTracing()) {
+      Span s = Trace.currentSpan();
+      builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()
+          .setTraceId(s.getTraceId()).setParentId(s.getSpanId()));
+    }
+    ShortCircuitShmRequestProto proto = builder.build();
+    send(out, Op.REQUEST_SHORT_CIRCUIT_SHM, proto);
+  }
+  
+  @Override
+  public void replaceBlock(final ExtendedBlock blk,
+      final StorageType storageType, 
+      final Token<BlockTokenIdentifier> blockToken,
+      final String delHint,
+      final DatanodeInfo source) throws IOException {
+    OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder()
+      .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
+      .setStorageType(PBHelperClient.convertStorageType(storageType))
+      .setDelHint(delHint)
+      .setSource(PBHelperClient.convertDatanodeInfo(source))
+      .build();
+    
+    send(out, Op.REPLACE_BLOCK, proto);
+  }
+
+  @Override
+  public void copyBlock(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken) throws IOException {
+    OpCopyBlockProto proto = OpCopyBlockProto.newBuilder()
+      .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
+      .build();
+    
+    send(out, Op.COPY_BLOCK, proto);
+  }
+
+  @Override
+  public void blockChecksum(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken) throws IOException {
+    OpBlockChecksumProto proto = OpBlockChecksumProto.newBuilder()
+      .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
+      .build();
+    
+    send(out, Op.BLOCK_CHECKSUM, proto);
+  }
+}
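
A hedged sketch (not part of this commit) of driving the Sender over a plain TCP connection to a datanode's data-transfer port. The host, port, block pool id, and block id are invented, and a real client would also run the SASL handshake and read back the BlockOpResponseProto before using the stream.

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;

import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.security.token.Token;

public class SenderSketch {
  public static void main(String[] args) throws IOException {
    try (Socket s = new Socket("dn1.example.com", 50010)) {
      DataTransferProtocol sender = new Sender(new DataOutputStream(
          new BufferedOutputStream(s.getOutputStream())));
      ExtendedBlock blk = new ExtendedBlock("BP-1-127.0.0.1-1", 1073741825L);
      // Request the first 1 KB of the block, with checksums and default caching.
      sender.readBlock(blk, new Token<BlockTokenIdentifier>(), "sketch-client",
          0L, 1024L, true, CachingStrategy.newDefaultStrategy());
    }
  }
}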

http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
new file mode 100644
index 0000000..edf658a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedInputStream;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmIdProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
+import org.apache.hadoop.hdfs.util.ExactSizeInputStream;
+import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utilities for converting protobuf classes to and from implementation classes
+ * and other helper utilities to help in dealing with protobuf.
+ *
+ * Note that when converting from an internal type to protobuf type, the
+ * converter never returns null for the protobuf type. The check for the
+ * internal type being null must be done before calling the convert() method.
+ */
+public class PBHelperClient {
+  private PBHelperClient() {
+    /** Hidden constructor */
+  }
+
+  public static ByteString getByteString(byte[] bytes) {
+    return ByteString.copyFrom(bytes);
+  }
+
+  public static ShmId convert(ShortCircuitShmIdProto shmId) {
+    return new ShmId(shmId.getHi(), shmId.getLo());
+  }
+
+  public static DataChecksum.Type convert(HdfsProtos.ChecksumTypeProto type) {
+    return DataChecksum.Type.valueOf(type.getNumber());
+  }
+
+  public static HdfsProtos.ChecksumTypeProto convert(DataChecksum.Type type) {
+    return HdfsProtos.ChecksumTypeProto.valueOf(type.id);
+  }
+
+  public static ExtendedBlockProto convert(final ExtendedBlock b) {
+    if (b == null) return null;
+    return ExtendedBlockProto.newBuilder().
+      setPoolId(b.getBlockPoolId()).
+      setBlockId(b.getBlockId()).
+      setNumBytes(b.getNumBytes()).
+      setGenerationStamp(b.getGenerationStamp()).
+      build();
+  }
+
+  public static TokenProto convert(Token<?> tok) {
+    return TokenProto.newBuilder().
+      setIdentifier(ByteString.copyFrom(tok.getIdentifier())).
+      setPassword(ByteString.copyFrom(tok.getPassword())).
+      setKind(tok.getKind().toString()).
+      setService(tok.getService().toString()).build();
+  }
+
+  public static ShortCircuitShmIdProto convert(ShmId shmId) {
+    return ShortCircuitShmIdProto.newBuilder().
+      setHi(shmId.getHi()).
+      setLo(shmId.getLo()).
+      build();
+
+  }
+
+  public static ShortCircuitShmSlotProto convert(SlotId slotId) {
+    return ShortCircuitShmSlotProto.newBuilder().
+      setShmId(convert(slotId.getShmId())).
+      setSlotIdx(slotId.getSlotIdx()).
+      build();
+  }
+
+  public static DatanodeIDProto convert(DatanodeID dn) {
+    // For wire compatibility with older versions we transmit the StorageID
+    // which is the same as the DatanodeUuid. Since StorageID is a required
+    // field we pass the empty string if the DatanodeUuid is not yet known.
+    return DatanodeIDProto.newBuilder()
+      .setIpAddr(dn.getIpAddr())
+      .setHostName(dn.getHostName())
+      .setXferPort(dn.getXferPort())
+      .setDatanodeUuid(dn.getDatanodeUuid() != null ? dn.getDatanodeUuid() : "")
+      .setInfoPort(dn.getInfoPort())
+      .setInfoSecurePort(dn.getInfoSecurePort())
+      .setIpcPort(dn.getIpcPort()).build();
+  }
+
+  public static DatanodeInfoProto.AdminState convert(
+    final DatanodeInfo.AdminStates inAs) {
+    switch (inAs) {
+      case NORMAL: return  DatanodeInfoProto.AdminState.NORMAL;
+      case DECOMMISSION_INPROGRESS:
+        return DatanodeInfoProto.AdminState.DECOMMISSION_INPROGRESS;
+      case DECOMMISSIONED: return DatanodeInfoProto.AdminState.DECOMMISSIONED;
+      default: return DatanodeInfoProto.AdminState.NORMAL;
+    }
+  }
+
+  public static DatanodeInfoProto convert(DatanodeInfo info) {
+    DatanodeInfoProto.Builder builder = DatanodeInfoProto.newBuilder();
+    if (info.getNetworkLocation() != null) {
+      builder.setLocation(info.getNetworkLocation());
+    }
+    builder
+      .setId(convert((DatanodeID) info))
+      .setCapacity(info.getCapacity())
+      .setDfsUsed(info.getDfsUsed())
+      .setRemaining(info.getRemaining())
+      .setBlockPoolUsed(info.getBlockPoolUsed())
+      .setCacheCapacity(info.getCacheCapacity())
+      .setCacheUsed(info.getCacheUsed())
+      .setLastUpdate(info.getLastUpdate())
+      .setLastUpdateMonotonic(info.getLastUpdateMonotonic())
+      .setXceiverCount(info.getXceiverCount())
+      .setAdminState(convert(info.getAdminState()))
+      .build();
+    return builder.build();
+  }
+
+  public static List<? extends HdfsProtos.DatanodeInfoProto> convert(
+    DatanodeInfo[] dnInfos) {
+    return convert(dnInfos, 0);
+  }
+
+  /**
+   * Copy from {@code dnInfos} to a target list of the same size, starting at
+   * {@code startIdx}.
+   */
+  public static List<? extends HdfsProtos.DatanodeInfoProto> convert(
+    DatanodeInfo[] dnInfos, int startIdx) {
+    if (dnInfos == null)
+      return null;
+    ArrayList<HdfsProtos.DatanodeInfoProto> protos = Lists
+      .newArrayListWithCapacity(dnInfos.length);
+    for (int i = startIdx; i < dnInfos.length; i++) {
+      protos.add(convert(dnInfos[i]));
+    }
+    return protos;
+  }
+
+  public static List<Boolean> convert(boolean[] targetPinnings, int idx) {
+    List<Boolean> pinnings = new ArrayList<>();
+    if (targetPinnings == null) {
+      pinnings.add(Boolean.FALSE);
+    } else {
+      for (; idx < targetPinnings.length; ++idx) {
+        pinnings.add(targetPinnings[idx]);
+      }
+    }
+    return pinnings;
+  }
+
+  public static DatanodeInfoProto convertDatanodeInfo(DatanodeInfo di) {
+    if (di == null) return null;
+    return convert(di);
+  }
+
+  public static StorageTypeProto convertStorageType(StorageType type) {
+    switch(type) {
+      case DISK:
+        return StorageTypeProto.DISK;
+      case SSD:
+        return StorageTypeProto.SSD;
+      case ARCHIVE:
+        return StorageTypeProto.ARCHIVE;
+      case RAM_DISK:
+        return StorageTypeProto.RAM_DISK;
+      default:
+        throw new IllegalStateException(
+          "BUG: StorageType not found, type=" + type);
+    }
+  }
+
+  public static StorageType convertStorageType(StorageTypeProto type) {
+    switch(type) {
+      case DISK:
+        return StorageType.DISK;
+      case SSD:
+        return StorageType.SSD;
+      case ARCHIVE:
+        return StorageType.ARCHIVE;
+      case RAM_DISK:
+        return StorageType.RAM_DISK;
+      default:
+        throw new IllegalStateException(
+          "BUG: StorageTypeProto not found, type=" + type);
+    }
+  }
+
+  public static List<StorageTypeProto> convertStorageTypes(
+    StorageType[] types) {
+    return convertStorageTypes(types, 0);
+  }
+
+  public static List<StorageTypeProto> convertStorageTypes(
+    StorageType[] types, int startIdx) {
+    if (types == null) {
+      return null;
+    }
+    final List<StorageTypeProto> protos = new ArrayList<>(
+      types.length);
+    for (int i = startIdx; i < types.length; ++i) {
+      protos.add(PBHelperClient.convertStorageType(types[i]));
+    }
+    return protos;
+  }
+
+  public static InputStream vintPrefixed(final InputStream input)
+    throws IOException {
+    final int firstByte = input.read();
+    if (firstByte == -1) {
+      throw new EOFException("Premature EOF: no length prefix available");
+    }
+
+    int size = CodedInputStream.readRawVarint32(firstByte, input);
+    assert size >= 0;
+    return new ExactSizeInputStream(input, size);
+  }
+}
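
For illustration only (not part of the patch): a minimal sketch of how a caller
reads one varint-length-prefixed protobuf message with PBHelperClient#vintPrefixed,
mirroring the call site in DfsClientShmManager#requestNewShm further down in this
commit. The class name VintPrefixedReadSketch is a placeholder.

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmResponseProto;
    import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;

    class VintPrefixedReadSketch {
      static ShortCircuitShmResponseProto readShmResponse(InputStream in)
          throws IOException {
        // vintPrefixed consumes the varint length prefix and returns a stream
        // bounded to exactly that many bytes, so parseFrom cannot read past
        // the end of this message into the next one on the wire.
        return ShortCircuitShmResponseProto.parseFrom(
            PBHelperClient.vintPrefixed(in));
      }
    }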

http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java
new file mode 100644
index 0000000..2fa86fa
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security.token.block;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Access token verification failed.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class InvalidBlockTokenException extends IOException {
+  private static final long serialVersionUID = 168L;
+
+  public InvalidBlockTokenException() {
+    super();
+  }
+
+  public InvalidBlockTokenException(String msg) {
+    super(msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java
new file mode 100644
index 0000000..215df13
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+/**
+ * The caching strategy we should use for an HDFS read or write operation.
+ */
+public class CachingStrategy {
+  private final Boolean dropBehind; // null = use server defaults
+  private final Long readahead; // null = use server defaults
+  
+  public static CachingStrategy newDefaultStrategy() {
+    return new CachingStrategy(null, null);
+  }
+
+  public static CachingStrategy newDropBehind() {
+    return new CachingStrategy(true, null);
+  }
+
+  public static class Builder {
+    private Boolean dropBehind;
+    private Long readahead;
+
+    public Builder(CachingStrategy prev) {
+      this.dropBehind = prev.dropBehind;
+      this.readahead = prev.readahead;
+    }
+
+    public Builder setDropBehind(Boolean dropBehind) {
+      this.dropBehind = dropBehind;
+      return this;
+    }
+
+    public Builder setReadahead(Long readahead) {
+      this.readahead = readahead;
+      return this;
+    }
+
+    public CachingStrategy build() {
+      return new CachingStrategy(dropBehind, readahead);
+    }
+  }
+
+  public CachingStrategy(Boolean dropBehind, Long readahead) {
+    this.dropBehind = dropBehind;
+    this.readahead = readahead;
+  }
+
+  public Boolean getDropBehind() {
+    return dropBehind;
+  }
+  
+  public Long getReadahead() {
+    return readahead;
+  }
+
+  public String toString() {
+    return "CachingStrategy(dropBehind=" + dropBehind +
+        ", readahead=" + readahead + ")";
+  }
+}
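
For illustration only (not part of the patch): a short sketch of how a client
might derive a per-read strategy from the defaults using the Builder above.
The dropBehind/readahead values here are arbitrary examples.

    import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;

    class CachingStrategySketch {
      public static void main(String[] args) {
        CachingStrategy defaults = CachingStrategy.newDefaultStrategy();
        CachingStrategy forThisRead = new CachingStrategy.Builder(defaults)
            .setDropBehind(true)             // drop pages from the OS cache after use
            .setReadahead(4L * 1024 * 1024)  // request 4 MB of readahead
            .build();
        // Prints: CachingStrategy(dropBehind=true, readahead=4194304)
        System.out.println(forThisRead);
      }
    }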

http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java
new file mode 100644
index 0000000..81cc68d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.shortcircuit;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hdfs.net.DomainPeer;
+import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.EndpointShmManager;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.DomainSocketWatcher;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * DfsClientShm is a subclass of ShortCircuitShm which is used by the
+ * DfsClient.
+ * When the UNIX domain socket associated with this shared memory segment
+ * closes unexpectedly, we mark the slots inside this segment as disconnected.
+ * ShortCircuitReplica objects that contain disconnected slots are stale,
+ * and will not be used to service new reads or mmap operations.
+ * However, in-progress read or mmap operations will continue to proceed.
+ * Once the last slot is deallocated, the segment can be safely munmapped.
+ *
+ * Slots may also become stale because the associated replica has been deleted
+ * on the DataNode.  In this case, the DataNode will clear the 'valid' bit.
+ * The client will then see these slots as stale (see
+ * #{ShortCircuitReplica#isStale}).
+ */
+public class DfsClientShm extends ShortCircuitShm
+    implements DomainSocketWatcher.Handler {
+  /**
+   * The EndpointShmManager associated with this shared memory segment.
+   */
+  private final EndpointShmManager manager;
+
+  /**
+   * The UNIX domain socket associated with this DfsClientShm.
+   * We rely on the DomainSocketWatcher to close the socket associated with
+   * this DomainPeer when necessary.
+   */
+  private final DomainPeer peer;
+
+  /**
+   * True if this shared memory segment has lost its connection to the
+   * DataNode.
+   *
+   * {@link DfsClientShm#handle} sets this to true.
+   */
+  private boolean disconnected = false;
+
+  DfsClientShm(ShmId shmId, FileInputStream stream, EndpointShmManager manager,
+      DomainPeer peer) throws IOException {
+    super(shmId, stream);
+    this.manager = manager;
+    this.peer = peer;
+  }
+
+  public EndpointShmManager getEndpointShmManager() {
+    return manager;
+  }
+
+  public DomainPeer getPeer() {
+    return peer;
+  }
+
+  /**
+   * Determine if the shared memory segment is disconnected from the DataNode.
+   *
+   * This must be called with the DfsClientShmManager lock held.
+   *
+   * @return   True if the shared memory segment is stale.
+   */
+  public synchronized boolean isDisconnected() {
+    return disconnected;
+  }
+
+  /**
+   * Handle the closure of the UNIX domain socket associated with this shared
+   * memory segment by marking this segment as stale.
+   *
+   * If there are no slots associated with this shared memory segment, it will
+   * be freed immediately in this function.
+   */
+  @Override
+  public boolean handle(DomainSocket sock) {
+    manager.unregisterShm(getShmId());
+    synchronized (this) {
+      Preconditions.checkState(!disconnected);
+      disconnected = true;
+      boolean hadSlots = false;
+      for (Iterator<Slot> iter = slotIterator(); iter.hasNext(); ) {
+        Slot slot = iter.next();
+        slot.makeInvalid();
+        hadSlots = true;
+      }
+      if (!hadSlots) {
+        free();
+      }
+    }
+    return true;
+  }
+}
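
For illustration only (not part of the patch): because DfsClientShm implements
DomainSocketWatcher.Handler, each segment is registered with the client's
DomainSocketWatcher so that handle() above runs when the segment's socket
closes. Below is a minimal sketch of that registration, mirroring the call made
in DfsClientShmManager#allocSlot further down in this commit; ShmWatcherSketch
is a placeholder name.

    import org.apache.hadoop.hdfs.net.DomainPeer;
    import org.apache.hadoop.hdfs.shortcircuit.DfsClientShm;
    import org.apache.hadoop.net.unix.DomainSocketWatcher;

    class ShmWatcherSketch {
      static void watchSegment(DomainSocketWatcher watcher, DomainPeer peer,
          DfsClientShm shm) {
        // When the DataNode side of this socket closes, the watcher invokes
        // shm.handle(sock), which marks the segment disconnected and frees it
        // once its last slot has been deallocated.
        watcher.add(peer.getDomainSocket(), shm);
      }
    }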

http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
new file mode 100644
index 0000000..f70398a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
@@ -0,0 +1,522 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.shortcircuit;
+
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.net.DomainPeer;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmResponseProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.DomainSocketWatcher;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages short-circuit memory segments for an HDFS client.
+ * 
+ * Clients are responsible for requesting and releasing shared memory segments
+ * used for communicating with the DataNode. The client will try to allocate
+ * new slots in the set of existing segments, falling back to requesting a new
+ * segment from the DataNode via
+ * {@link DataTransferProtocol#requestShortCircuitShm}.
+ * 
+ * The counterpart to this class on the DataNode is
+ * {@link ShortCircuitRegistry}; see that class for more information on the
+ * communication protocol.
+ */
+@InterfaceAudience.Private
+public class DfsClientShmManager implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      DfsClientShmManager.class);
+
+  /**
+   * Manages short-circuit memory segments that pertain to a given DataNode.
+   */
+  class EndpointShmManager {
+    /**
+     * The datanode we're managing.
+     */
+    private final DatanodeInfo datanode;
+
+    /**
+     * Shared memory segments which have no empty slots.
+     *
+     * Protected by the manager lock.
+     */
+    private final TreeMap<ShmId, DfsClientShm> full =
+        new TreeMap<ShmId, DfsClientShm>();
+
+    /**
+     * Shared memory segments which have at least one empty slot.
+     *
+     * Protected by the manager lock.
+     */
+    private final TreeMap<ShmId, DfsClientShm> notFull =
+        new TreeMap<ShmId, DfsClientShm>();
+
+    /**
+     * True if this datanode doesn't support short-circuit shared memory
+     * segments.
+     *
+     * Protected by the manager lock.
+     */
+    private boolean disabled = false;
+
+    /**
+     * True if we're in the process of loading a shared memory segment from
+     * this DataNode.
+     *
+     * Protected by the manager lock.
+     */
+    private boolean loading = false;
+
+    EndpointShmManager(DatanodeInfo datanode) {
+      this.datanode = datanode;
+    }
+
+    /**
+     * Pull a slot out of a preexisting shared memory segment.
+     *
+     * Must be called with the manager lock held.
+     *
+     * @param blockId     The blockId to put inside the Slot object.
+     *
+     * @return            null if none of our shared memory segments contain a
+     *                      free slot; the slot object otherwise.
+     */
+    private Slot allocSlotFromExistingShm(ExtendedBlockId blockId) {
+      if (notFull.isEmpty()) {
+        return null;
+      }
+      Entry<ShmId, DfsClientShm> entry = notFull.firstEntry();
+      DfsClientShm shm = entry.getValue();
+      ShmId shmId = shm.getShmId();
+      Slot slot = shm.allocAndRegisterSlot(blockId);
+      if (shm.isFull()) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + ": pulled the last slot " + slot.getSlotIdx() +
+              " out of " + shm);
+        }
+        DfsClientShm removedShm = notFull.remove(shmId);
+        Preconditions.checkState(removedShm == shm);
+        full.put(shmId, shm);
+      } else {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + ": pulled slot " + slot.getSlotIdx() +
+              " out of " + shm);
+        }
+      }
+      return slot;
+    }
+
+    /**
+     * Ask the DataNode for a new shared memory segment.  This function must be
+     * called with the manager lock held.  We will release the lock while
+     * communicating with the DataNode.
+     *
+     * @param clientName    The current client name.
+     * @param peer          The peer to use to talk to the DataNode.
+     *
+     * @return              Null if the DataNode does not support shared memory
+     *                        segments, or experienced an error creating the
+     *                        shm.  The shared memory segment itself on success.
+     * @throws IOException  If there was an error communicating over the socket.
+     *                        We will not throw an IOException unless the socket
+     *                        itself (or the network) is the problem.
+     */
+    private DfsClientShm requestNewShm(String clientName, DomainPeer peer)
+        throws IOException {
+      final DataOutputStream out = 
+          new DataOutputStream(
+              new BufferedOutputStream(peer.getOutputStream()));
+      new Sender(out).requestShortCircuitShm(clientName);
+      ShortCircuitShmResponseProto resp = 
+          ShortCircuitShmResponseProto.parseFrom(
+            PBHelperClient.vintPrefixed(peer.getInputStream()));
+      String error = resp.hasError() ? resp.getError() : "(unknown)";
+      switch (resp.getStatus()) {
+      case SUCCESS:
+        DomainSocket sock = peer.getDomainSocket();
+        byte[] buf = new byte[1];
+        FileInputStream[] fis = new FileInputStream[1];
+        if (sock.recvFileInputStreams(fis, buf, 0, buf.length) < 0) {
+          throw new EOFException("got EOF while trying to transfer the " +
+              "file descriptor for the shared memory segment.");
+        }
+        if (fis[0] == null) {
+          throw new IOException("the datanode " + datanode + " failed to " +
+              "pass a file descriptor for the shared memory segment.");
+        }
+        try {
+          DfsClientShm shm = 
+              new DfsClientShm(PBHelperClient.convert(resp.getId()),
+                  fis[0], this, peer);
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": createNewShm: created " + shm);
+          }
+          return shm;
+        } finally {
+          try {
+            fis[0].close();
+          } catch (Throwable e) {
+            LOG.debug("Exception in closing " + fis[0], e);
+          }
+        }
+      case ERROR_UNSUPPORTED:
+        // The DataNode just does not support short-circuit shared memory
+        // access, and we should stop asking.
+        LOG.info(this + ": datanode does not support short-circuit " +
+            "shared memory access: " + error);
+        disabled = true;
+        return null;
+      default:
+        // The datanode experienced some kind of unexpected error when trying to
+        // create the short-circuit shared memory segment.
+        LOG.warn(this + ": error requesting short-circuit shared memory " +
+            "access: " + error);
+        return null;
+      }
+    }
+
+    /**
+     * Allocate a new shared memory slot connected to this datanode.
+     *
+     * Must be called with the EndpointShmManager lock held.
+     *
+     * @param peer          The peer to use to talk to the DataNode.
+     * @param usedPeer      (out param) Will be set to true if we used the peer.
+     *                        When a peer is used, it becomes the responsibility
+     *                        of this manager and must not be closed by the caller.
+     *
+     * @param clientName    The client name.
+     * @param blockId       The block ID to use.
+     * @return              null if the DataNode does not support shared memory
+     *                        segments, or experienced an error creating the
+     *                        shm.  The shared memory segment itself on success.
+     * @throws IOException  If there was an error communicating over the socket.
+     */
+    Slot allocSlot(DomainPeer peer, MutableBoolean usedPeer,
+        String clientName, ExtendedBlockId blockId) throws IOException {
+      while (true) {
+        if (closed) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": the DfsClientShmManager has been closed.");
+          }
+          return null;
+        }
+        if (disabled) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": shared memory segment access is disabled.");
+          }
+          return null;
+        }
+        // Try to use an existing slot.
+        Slot slot = allocSlotFromExistingShm(blockId);
+        if (slot != null) {
+          return slot;
+        }
+        // There are no free slots.  If someone is loading more slots, wait
+        // for that to finish.
+        if (loading) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": waiting for loading to finish...");
+          }
+          finishedLoading.awaitUninterruptibly();
+        } else {
+          // Otherwise, load the slot ourselves.
+          loading = true;
+          lock.unlock();
+          DfsClientShm shm;
+          try {
+            shm = requestNewShm(clientName, peer);
+            if (shm == null) continue;
+            // See #{DfsClientShmManager#domainSocketWatcher} for details
+            // about why we do this before retaking the manager lock.
+            domainSocketWatcher.add(peer.getDomainSocket(), shm);
+            // The DomainPeer is now our responsibility, and should not be
+            // closed by the caller.
+            usedPeer.setValue(true);
+          } finally {
+            lock.lock();
+            loading = false;
+            finishedLoading.signalAll();
+          }
+          if (shm.isDisconnected()) {
+            // If the peer closed immediately after the shared memory segment
+            // was created, the DomainSocketWatcher callback might already have
+            // fired and marked the shm as disconnected.  In this case, we
+            // obviously don't want to add the SharedMemorySegment to our list
+            // of valid not-full segments.
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(this + ": the UNIX domain socket associated with " +
+                  "this short-circuit memory closed before we could make " +
+                  "use of the shm.");
+            }
+          } else {
+            notFull.put(shm.getShmId(), shm);
+          }
+        }
+      }
+    }
+    
+    /**
+     * Stop tracking a slot.
+     *
+     * Must be called with the EndpointShmManager lock held.
+     *
+     * @param slot          The slot to release.
+     */
+    void freeSlot(Slot slot) {
+      DfsClientShm shm = (DfsClientShm)slot.getShm();
+      shm.unregisterSlot(slot.getSlotIdx());
+      if (shm.isDisconnected()) {
+        // Stale shared memory segments should not be tracked here.
+        Preconditions.checkState(!full.containsKey(shm.getShmId()));
+        Preconditions.checkState(!notFull.containsKey(shm.getShmId()));
+        if (shm.isEmpty()) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": freeing empty stale " + shm);
+          }
+          shm.free();
+        }
+      } else {
+        ShmId shmId = shm.getShmId();
+        full.remove(shmId); // The shm can't be full if we just freed a slot.
+        if (shm.isEmpty()) {
+          notFull.remove(shmId);
+  
+          // If the shared memory segment is now empty, we call shutdown(2) on
+          // the UNIX domain socket associated with it.  The DomainSocketWatcher,
+          // which is watching this socket, will call DfsClientShm#handle,
+          // cleaning up this shared memory segment.
+          //
+          // See #{DfsClientShmManager#domainSocketWatcher} for details about why
+          // we don't want to call DomainSocketWatcher#remove directly here.
+          //
+          // Note that we could experience 'fragmentation' here, where the
+          // DFSClient allocates a bunch of slots in different shared memory
+          // segments, and then frees most of them, but never fully empties out
+          // any segment.  We make some attempt to avoid this fragmentation by
+          // always allocating new slots out of the shared memory segment with the
+          // lowest ID, but it could still occur.  In most workloads,
+          // fragmentation should not be a major concern, since it doesn't impact
+          // peak file descriptor usage or the speed of allocation.
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": shutting down UNIX domain socket for " +
+                "empty " + shm);
+          }
+          shutdown(shm);
+        } else {
+          notFull.put(shmId, shm);
+        }
+      }
+    }
+    
+    /**
+     * Unregister a shared memory segment.
+     *
+     * Once a segment is unregistered, we will not allocate any more slots
+     * inside that segment.
+     *
+     * The DomainSocketWatcher calls this while holding the DomainSocketWatcher
+     * lock.
+     *
+     * @param shmId         The ID of the shared memory segment to unregister.
+     */
+    void unregisterShm(ShmId shmId) {
+      lock.lock();
+      try {
+        full.remove(shmId);
+        notFull.remove(shmId);
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @Override
+    public String toString() {
+      return String.format("EndpointShmManager(%s, parent=%s)",
+          datanode, DfsClientShmManager.this);
+    }
+
+    PerDatanodeVisitorInfo getVisitorInfo() {
+      return new PerDatanodeVisitorInfo(full, notFull, disabled);
+    }
+
+    final void shutdown(DfsClientShm shm) {
+      try {
+        shm.getPeer().getDomainSocket().shutdown();
+      } catch (IOException e) {
+        LOG.warn(this + ": error shutting down shm: got IOException calling " +
+            "shutdown(SHUT_RDWR)", e);
+      }
+    }
+  }
+
+  private boolean closed = false;
+
+  private final ReentrantLock lock = new ReentrantLock();
+
+  /**
+   * A condition variable which is signalled when we finish loading a segment
+   * from the Datanode.
+   */
+  private final Condition finishedLoading = lock.newCondition();
+
+  /**
+   * Information about each Datanode.
+   */
+  private final HashMap<DatanodeInfo, EndpointShmManager> datanodes =
+      new HashMap<DatanodeInfo, EndpointShmManager>(1);
+  
+  /**
+   * The DomainSocketWatcher which keeps track of the UNIX domain socket
+   * associated with each shared memory segment.
+   *
+   * Note: because the DomainSocketWatcher makes callbacks into this
+   * DfsClientShmManager object, you MUST NOT attempt to take the
+   * DomainSocketWatcher lock while holding the DfsClientShmManager lock,
+   * or else deadlock might result. This means that most DomainSocketWatcher
+   * methods are off-limits unless you release the manager lock first.
+   */
+  private final DomainSocketWatcher domainSocketWatcher;
+  
+  DfsClientShmManager(int interruptCheckPeriodMs) throws IOException {
+    this.domainSocketWatcher = new DomainSocketWatcher(interruptCheckPeriodMs,
+        "client");
+  }
+  
+  public Slot allocSlot(DatanodeInfo datanode, DomainPeer peer,
+      MutableBoolean usedPeer, ExtendedBlockId blockId,
+      String clientName) throws IOException {
+    lock.lock();
+    try {
+      if (closed) {
+        LOG.trace(this + ": the DfsClientShmManager is closed.");
+        return null;
+      }
+      EndpointShmManager shmManager = datanodes.get(datanode);
+      if (shmManager == null) {
+        shmManager = new EndpointShmManager(datanode);
+        datanodes.put(datanode, shmManager);
+      }
+      return shmManager.allocSlot(peer, usedPeer, clientName, blockId);
+    } finally {
+      lock.unlock();
+    }
+  }
+  
+  public void freeSlot(Slot slot) {
+    lock.lock();
+    try {
+      DfsClientShm shm = (DfsClientShm)slot.getShm();
+      shm.getEndpointShmManager().freeSlot(slot);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @VisibleForTesting
+  public static class PerDatanodeVisitorInfo {
+    public final TreeMap<ShmId, DfsClientShm> full;
+    public final TreeMap<ShmId, DfsClientShm> notFull;
+    public final boolean disabled;
+
+    PerDatanodeVisitorInfo(TreeMap<ShmId, DfsClientShm> full,
+        TreeMap<ShmId, DfsClientShm> notFull, boolean disabled) {
+      this.full = full;
+      this.notFull = notFull;
+      this.disabled = disabled;
+    }
+  }
+
+  @VisibleForTesting
+  public interface Visitor {
+    void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
+        throws IOException;
+  }
+
+  @VisibleForTesting
+  public void visit(Visitor visitor) throws IOException {
+    lock.lock();
+    try {
+      HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info = 
+          new HashMap<DatanodeInfo, PerDatanodeVisitorInfo>();
+      for (Entry<DatanodeInfo, EndpointShmManager> entry :
+            datanodes.entrySet()) {
+        info.put(entry.getKey(), entry.getValue().getVisitorInfo());
+      }
+      visitor.visit(info);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Close the DfsClientShmManager.
+   */
+  @Override
+  public void close() throws IOException {
+    lock.lock();
+    try {
+      if (closed) return;
+      closed = true;
+    } finally {
+      lock.unlock();
+    }
+    // When closed, the domainSocketWatcher will issue callbacks that mark
+    // all the outstanding DfsClientShm segments as stale.
+    try {
+      domainSocketWatcher.close();
+    } catch (Throwable e) {
+      LOG.debug("Exception in closing " + domainSocketWatcher, e);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return String.format("DfsClientShmManager(%08x)",
+        System.identityHashCode(this));
+  }
+
+  @VisibleForTesting
+  public DomainSocketWatcher getDomainSocketWatcher() {
+    return domainSocketWatcher;
+  }
+}
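
For illustration only (not part of the patch): a sketch of the allocate/release
cycle a caller would go through with DfsClientShmManager. In practice this is
driven by the client's short-circuit cache; here the DatanodeInfo, DomainPeer
and ExtendedBlockId are assumed to exist already, and "example-client" is a
placeholder client name.

    import java.io.IOException;

    import org.apache.commons.lang.mutable.MutableBoolean;
    import org.apache.hadoop.hdfs.ExtendedBlockId;
    import org.apache.hadoop.hdfs.net.DomainPeer;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager;
    import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;

    class ShmAllocSketch {
      static void readWithSlot(DfsClientShmManager manager, DatanodeInfo dn,
          DomainPeer peer, ExtendedBlockId blockId) throws IOException {
        MutableBoolean usedPeer = new MutableBoolean(false);
        Slot slot = manager.allocSlot(dn, peer, usedPeer, blockId,
            "example-client");
        try {
          if (slot != null) {
            // ... perform the short-circuit read using this slot ...
          }
        } finally {
          if (slot != null) {
            manager.freeSlot(slot);
          }
          if (!usedPeer.booleanValue()) {
            // The manager did not take ownership of the peer, so the caller
            // still owns it and must close it.
            peer.close();
          }
        }
      }
    }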

