hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From whe...@apache.org
Subject [3/4] hadoop git commit: HDFS-8934. Move ShortCircuitShm to hdfs-client. Contributed by Mingliang Liu.
Date Sat, 22 Aug 2015 20:30:53 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e4afa3a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java
new file mode 100644
index 0000000..78325a3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java
@@ -0,0 +1,647 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.shortcircuit;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Random;
+
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.fs.InvalidRequestException;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import sun.misc.Unsafe;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.primitives.Ints;
+
+/**
+ * A shared memory segment used to implement short-circuit reads.
+ */
+public class ShortCircuitShm {
+  private static final Logger LOG = LoggerFactory.getLogger(ShortCircuitShm.class);
+
+  protected static final int BYTES_PER_SLOT = 64;
+
+  private static final Unsafe unsafe = safetyDance();
+
+  private static Unsafe safetyDance() {
+    try {
+      Field f = Unsafe.class.getDeclaredField("theUnsafe");
+      f.setAccessible(true);
+      return (Unsafe)f.get(null);
+    } catch (Throwable e) {
+      LOG.error("failed to load misc.Unsafe", e);
+    }
+    return null;
+  }
+
+  /**
+   * Calculate the usable size of a shared memory segment.
+   * We round down to a multiple of the slot size and do some validation.
+   *
+   * @param stream The stream we're using.
+   * @return       The usable size of the shared memory segment.
+   */
+  private static int getUsableLength(FileInputStream stream)
+      throws IOException {
+    int intSize = Ints.checkedCast(stream.getChannel().size());
+    int slots = intSize / BYTES_PER_SLOT;
+    if (slots == 0) {
+      throw new IOException("size of shared memory segment was " +
+          intSize + ", but that is not enough to hold even one slot.");
+    }
+    return slots * BYTES_PER_SLOT;
+  }
+
+  /**
+   * Identifies a DfsClientShm.
+   */
+  public static class ShmId implements Comparable<ShmId> {
+    private static final Random random = new Random();
+    private final long hi;
+    private final long lo;
+
+    /**
+     * Generate a random ShmId.
+     * 
+     * We generate ShmIds randomly to prevent a malicious client from
+     * successfully guessing one and using that to interfere with another
+     * client.
+     */
+    public static ShmId createRandom() {
+      return new ShmId(random.nextLong(), random.nextLong());
+    }
+
+    public ShmId(long hi, long lo) {
+      this.hi = hi;
+      this.lo = lo;
+    }
+    
+    public long getHi() {
+      return hi;
+    }
+    
+    public long getLo() {
+      return lo;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if ((o == null) || (o.getClass() != this.getClass())) {
+        return false;
+      }
+      ShmId other = (ShmId)o;
+      return new EqualsBuilder().
+          append(hi, other.hi).
+          append(lo, other.lo).
+          isEquals();
+    }
+
+    @Override
+    public int hashCode() {
+      return new HashCodeBuilder().
+          append(this.hi).
+          append(this.lo).
+          toHashCode();
+    }
+
+    @Override
+    public String toString() {
+      return String.format("%016x%016x", hi, lo);
+    }
+
+    @Override
+    public int compareTo(ShmId other) {
+      return ComparisonChain.start().
+          compare(hi, other.hi).
+          compare(lo, other.lo).
+          result();
+    }
+  };
+
+  /**
+   * Uniquely identifies a slot.
+   */
+  public static class SlotId {
+    private final ShmId shmId;
+    private final int slotIdx;
+    
+    public SlotId(ShmId shmId, int slotIdx) {
+      this.shmId = shmId;
+      this.slotIdx = slotIdx;
+    }
+
+    public ShmId getShmId() {
+      return shmId;
+    }
+
+    public int getSlotIdx() {
+      return slotIdx;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if ((o == null) || (o.getClass() != this.getClass())) {
+        return false;
+      }
+      SlotId other = (SlotId)o;
+      return new EqualsBuilder().
+          append(shmId, other.shmId).
+          append(slotIdx, other.slotIdx).
+          isEquals();
+    }
+
+    @Override
+    public int hashCode() {
+      return new HashCodeBuilder().
+          append(this.shmId).
+          append(this.slotIdx).
+          toHashCode();
+    }
+
+    @Override
+    public String toString() {
+      return String.format("SlotId(%s:%d)", shmId.toString(), slotIdx);
+    }
+  }
+
+  public class SlotIterator implements Iterator<Slot> {
+    int slotIdx = -1;
+
+    @Override
+    public boolean hasNext() {
+      synchronized (ShortCircuitShm.this) {
+        return allocatedSlots.nextSetBit(slotIdx + 1) != -1;
+      }
+    }
+
+    @Override
+    public Slot next() {
+      synchronized (ShortCircuitShm.this) {
+        int nextSlotIdx = allocatedSlots.nextSetBit(slotIdx + 1);
+        if (nextSlotIdx == -1) {
+          throw new NoSuchElementException();
+        }
+        slotIdx = nextSlotIdx;
+        return slots[nextSlotIdx];
+      }
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("SlotIterator " +
+          "doesn't support removal");
+    }
+  }
+  
+  /**
+   * A slot containing information about a replica.
+   *
+   * The format is:
+   * word 0
+   *   bit 0:30   Anchor count (masked with 0x7fffffff below).
+   *   bit 62:63  Slot flags (see below); bits 31:61 are not used here.
+   * word 1:7
+   *   Reserved for future use, such as statistics.
+   *   Padding is also useful for avoiding false sharing.
+   *
+   * Little-endian versus big-endian is not relevant here since both the client
+   * and the server reside on the same computer and use the same orientation.
+   */
+  public class Slot {
+    /**
+     * Flag indicating that the slot is valid.  
+     * 
+     * The DFSClient sets this flag when it allocates a new slot within one of
+     * its shared memory regions.
+     * 
+     * The DataNode clears this flag when the replica associated with this slot
+     * is no longer valid.  The client itself also clears this flag when it
+     * believes that the DataNode is no longer using this slot to communicate.
+     */
+    private static final long VALID_FLAG =          1L<<63;
+
+    /**
+     * Flag indicating that the slot can be anchored.
+     */
+    private static final long ANCHORABLE_FLAG =     1L<<62;
+
+    /**
+     * The slot address in memory.
+     */
+    private final long slotAddress;
+
+    /**
+     * BlockId of the block this slot is used for.
+     */
+    private final ExtendedBlockId blockId;
+
+    Slot(long slotAddress, ExtendedBlockId blockId) {
+      this.slotAddress = slotAddress;
+      this.blockId = blockId;
+    }
+
+    /**
+     * Get the short-circuit memory segment associated with this Slot.
+     *
+     * @return      The enclosing short-circuit memory segment.
+     */
+    public ShortCircuitShm getShm() {
+      return ShortCircuitShm.this;
+    }
+
+    /**
+     * Get the ExtendedBlockId associated with this slot.
+     *
+     * @return      The ExtendedBlockId of this slot.
+     */
+    public ExtendedBlockId getBlockId() {
+      return blockId;
+    }
+
+    /**
+     * Get the SlotId of this slot, containing both shmId and slotIdx.
+     *
+     * @return      The SlotId of this slot.
+     */
+    public SlotId getSlotId() {
+      return new SlotId(getShmId(), getSlotIdx());
+    }
+
+    /**
+     * Get the Slot index.
+     *
+     * @return      The index of this slot.
+     */
+    public int getSlotIdx() {
+      return Ints.checkedCast(
+          (slotAddress - baseAddress) / BYTES_PER_SLOT);
+    }
+
+    /**
+     * Clear the slot.
+     */
+    void clear() {
+      unsafe.putLongVolatile(null, this.slotAddress, 0);
+    }
+
+    private boolean isSet(long flag) {
+      long prev = unsafe.getLongVolatile(null, this.slotAddress);
+      return (prev & flag) != 0;
+    }
+
+    private void setFlag(long flag) {
+      long prev;
+      do {
+        prev = unsafe.getLongVolatile(null, this.slotAddress);
+        if ((prev & flag) != 0) {
+          return;
+        }
+      } while (!unsafe.compareAndSwapLong(null, this.slotAddress,
+                  prev, prev | flag));
+    }
+
+    private void clearFlag(long flag) {
+      long prev;
+      do {
+        prev = unsafe.getLongVolatile(null, this.slotAddress);
+        if ((prev & flag) == 0) {
+          return;
+        }
+      } while (!unsafe.compareAndSwapLong(null, this.slotAddress,
+                  prev, prev & (~flag)));
+    }
+    
+    public boolean isValid() {
+      return isSet(VALID_FLAG);
+    }
+
+    public void makeValid() {
+      setFlag(VALID_FLAG);
+    }
+
+    public void makeInvalid() {
+      clearFlag(VALID_FLAG);
+    }
+
+    public boolean isAnchorable() {
+      return isSet(ANCHORABLE_FLAG);
+    }
+
+    public void makeAnchorable() {
+      setFlag(ANCHORABLE_FLAG);
+    }
+
+    public void makeUnanchorable() {
+      clearFlag(ANCHORABLE_FLAG);
+    }
+
+    public boolean isAnchored() {
+      long prev = unsafe.getLongVolatile(null, this.slotAddress);
+      if ((prev & VALID_FLAG) == 0) {
+        // Slot is no longer valid.
+        return false;
+      }
+      return ((prev & 0x7fffffff) != 0);
+    }
+
+    /**
+     * Try to add an anchor for a given slot.
+     *
+     * When a slot is anchored, we know that the block it refers to is resident
+     * in memory.
+     *
+     * @return          True if the slot is anchored.
+     */
+    public boolean addAnchor() {
+      long prev;
+      do {
+        prev = unsafe.getLongVolatile(null, this.slotAddress);
+        if ((prev & VALID_FLAG) == 0) {
+          // Slot is no longer valid.
+          return false;
+        }
+        if ((prev & ANCHORABLE_FLAG) == 0) {
+          // Slot can't be anchored right now.
+          return false;
+        }
+        if ((prev & 0x7fffffff) == 0x7fffffff) {
+          // Too many other threads have anchored the slot (2 billion?)
+          return false;
+        }
+      } while (!unsafe.compareAndSwapLong(null, this.slotAddress,
+                  prev, prev + 1));
+      return true;
+    }
+
+    /**
+     * Remove an anchor for a given slot.
+     */
+    public void removeAnchor() {
+      long prev;
+      do {
+        prev = unsafe.getLongVolatile(null, this.slotAddress);
+        Preconditions.checkState((prev & 0x7fffffff) != 0,
+            "Tried to remove anchor for slot " + slotAddress +", which was " +
+            "not anchored.");
+      } while (!unsafe.compareAndSwapLong(null, this.slotAddress,
+                  prev, prev - 1));
+    }
+
+    @Override
+    public String toString() {
+      return "Slot(slotIdx=" + getSlotIdx() + ", shm=" + getShm() + ")";
+    }
+  }
+
+  /**
+   * ID for this SharedMemorySegment.
+   */
+  private final ShmId shmId;
+
+  /**
+   * The base address of the memory-mapped file.
+   */
+  private final long baseAddress;
+
+  /**
+   * The mmapped length of the shared memory segment
+   */
+  private final int mmappedLength;
+
+  /**
+   * The slots associated with this shared memory segment.
+   * slot[i] contains the slot at offset i * BYTES_PER_SLOT,
+   * or null if that slot is not allocated.
+   */
+  private final Slot slots[];
+
+  /**
+   * A bitset where each bit represents a slot which is in use.
+   */
+  private final BitSet allocatedSlots;
+
+  /**
+   * Create the ShortCircuitShm.
+   * 
+   * @param shmId       The ID to use.
+   * @param stream      The stream that we're going to use to create this 
+   *                    shared memory segment.
+   *                    
+   *                    Although this is a FileInputStream, we are going to
+   *                    assume that the underlying file descriptor is writable
+   *                    as well as readable. It would be more appropriate to use
+   *                    a RandomAccessFile here, but that class does not have
+   *                    any public accessor which returns a FileDescriptor,
+   *                    unlike FileInputStream.
+   */
+  public ShortCircuitShm(ShmId shmId, FileInputStream stream)
+        throws IOException {
+    if (!NativeIO.isAvailable()) {
+      throw new UnsupportedOperationException("NativeIO is not available.");
+    }
+    if (Shell.WINDOWS) {
+      throw new UnsupportedOperationException(
+          "DfsClientShm is not yet implemented for Windows.");
+    }
+    if (unsafe == null) {
+      throw new UnsupportedOperationException(
+          "can't use DfsClientShm because we failed to " +
+          "load misc.Unsafe.");
+    }
+    this.shmId = shmId;
+    this.mmappedLength = getUsableLength(stream);
+    this.baseAddress = POSIX.mmap(stream.getFD(), 
+        POSIX.MMAP_PROT_READ | POSIX.MMAP_PROT_WRITE, true, mmappedLength);
+    this.slots = new Slot[mmappedLength / BYTES_PER_SLOT];
+    this.allocatedSlots = new BitSet(slots.length);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("creating " + this.getClass().getSimpleName() +
+          "(shmId=" + shmId +
+          ", mmappedLength=" + mmappedLength +
+          ", baseAddress=" + String.format("%x", baseAddress) +
+          ", slots.length=" + slots.length + ")");
+    }
+  }
+
+  public final ShmId getShmId() {
+    return shmId;
+  }
+  
+  /**
+   * Determine if this shared memory object is empty.
+   *
+   * @return    True if the shared memory object is empty.
+   */
+  synchronized final public boolean isEmpty() {
+    return allocatedSlots.nextSetBit(0) == -1;
+  }
+
+  /**
+   * Determine if this shared memory object is full.
+   *
+   * @return    True if the shared memory object is full.
+   */
+  synchronized final public boolean isFull() {
+    return allocatedSlots.nextClearBit(0) >= slots.length;
+  }
+
+  /**
+   * Calculate the base address of a slot.
+   *
+   * @param slotIdx   Index of the slot.
+   * @return          The base address of the slot.
+   */
+  private final long calculateSlotAddress(int slotIdx) {
+    long offset = slotIdx;
+    offset *= BYTES_PER_SLOT;
+    return this.baseAddress + offset;
+  }
+
+  /**
+   * Allocate a new slot and register it.
+   *
+   * This function chooses an empty slot, initializes it, and then returns
+   * the relevant Slot object.
+   *
+   * @return    The new slot.
+   */
+  synchronized public final Slot allocAndRegisterSlot(
+      ExtendedBlockId blockId) {
+    int idx = allocatedSlots.nextClearBit(0);
+    if (idx >= slots.length) {
+      throw new RuntimeException(this + ": no more slots are available.");
+    }
+    allocatedSlots.set(idx, true);
+    Slot slot = new Slot(calculateSlotAddress(idx), blockId);
+    slot.clear();
+    slot.makeValid();
+    slots[idx] = slot;
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + ": allocAndRegisterSlot " + idx + ": allocatedSlots=" + allocatedSlots +
+                  StringUtils.getStackTrace(Thread.currentThread()));
+    }
+    return slot;
+  }
+
+  // Negative indices are rejected here; BitSet.get() would throw on them.
+  synchronized public final Slot getSlot(int slotIdx)
+      throws InvalidRequestException {
+    if (slotIdx < 0 || !allocatedSlots.get(slotIdx)) {
+      throw new InvalidRequestException(this + ": slot " + slotIdx + " does not exist.");
+    }
+    return slots[slotIdx];
+  }
+
+  /**
+   * Register a slot.
+   *
+   * This function looks at a slot which has already been initialized (by
+   * another process), and registers it with us.  Then, it returns the 
+   * relevant Slot object.
+   *
+   * @return    The slot.
+   *
+   * @throws InvalidRequestException
+   *            If the slot index we're trying to allocate has not been
+   *            initialized, or is already in use.
+   */
+  synchronized public final Slot registerSlot(int slotIdx,
+      ExtendedBlockId blockId) throws InvalidRequestException {
+    if (slotIdx < 0) {
+      throw new InvalidRequestException(this + ": invalid negative slot " +
+          "index " + slotIdx);
+    }
+    if (slotIdx >= slots.length) {
+      throw new InvalidRequestException(this + ": invalid slot " +
+          "index " + slotIdx);
+    }
+    if (allocatedSlots.get(slotIdx)) {
+      throw new InvalidRequestException(this + ": slot " + slotIdx +
+          " is already in use.");
+    }
+    Slot slot = new Slot(calculateSlotAddress(slotIdx), blockId);
+    if (!slot.isValid()) {
+      throw new InvalidRequestException(this + ": slot " + slotIdx +
+          " is not marked as valid.");
+    }
+    slots[slotIdx] = slot;
+    allocatedSlots.set(slotIdx, true);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + ": registerSlot " + slotIdx + ": allocatedSlots=" + allocatedSlots +
+                  StringUtils.getStackTrace(Thread.currentThread()));
+    }
+    return slot;
+  }
+
+  /**
+   * Unregisters a slot.
+   * 
+   * The slot contents are untouched; only our record of the slot is cleared.
+   *
+   * @param slotIdx  Index of the slot to unregister.
+   */
+  synchronized public final void unregisterSlot(int slotIdx) {
+    Preconditions.checkState(allocatedSlots.get(slotIdx),
+        "tried to unregister slot " + slotIdx + ", which was not registered.");
+    allocatedSlots.set(slotIdx, false);
+    slots[slotIdx] = null;
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + ": unregisterSlot " + slotIdx);
+    }
+  }
+  
+  /**
+   * Iterate over all allocated slots.
+   * 
+   * Note: hasNext() and next() lock separately, so slots may change in between.
+   *
+   * @return        The slot iterator.
+   */
+  public SlotIterator slotIterator() {
+    return new SlotIterator();
+  }
+
+  public void free() {
+    try {
+      POSIX.munmap(baseAddress, mmappedLength);
+    } catch (IOException e) {
+      LOG.warn(this + ": failed to munmap", e);
+    }
+    LOG.trace(this + ": freed");
+  }
+  
+  @Override
+  public String toString() {
+    return this.getClass().getSimpleName() + "(" + shmId + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e4afa3a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ExactSizeInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ExactSizeInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ExactSizeInputStream.java
new file mode 100644
index 0000000..17365fb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ExactSizeInputStream.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.io.EOFException;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * An InputStream implementation which reads from some other InputStream
+ * but expects an exact number of bytes. Any attempts to read past the
+ * specified number of bytes will return as if the end of the stream
+ * was reached. If the end of the underlying stream is reached prior to
+ * the specified number of bytes, an EOFException is thrown.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ExactSizeInputStream extends FilterInputStream {
+  private int remaining;
+
+  /**
+   * Construct an input stream that will read no more than
+   * 'numBytes' bytes.
+   * 
+   * If an EOF occurs on the underlying stream before numBytes
+   * bytes have been read, an EOFException will be thrown.
+   * 
+   * @param in the inputstream to wrap
+   * @param numBytes the number of bytes to read
+   */
+  public ExactSizeInputStream(InputStream in, int numBytes) {
+    super(in);
+    Preconditions.checkArgument(numBytes >= 0,
+        "Negative expected bytes: ", numBytes);
+    this.remaining = numBytes;
+  }
+
+  @Override
+  public int available() throws IOException {
+    return Math.min(super.available(), remaining);
+  }
+
+  @Override
+  public int read() throws IOException {
+    // EOF if we reached our limit
+    if (remaining <= 0) {
+      return -1;
+    }
+    final int result = super.read();
+    if (result >= 0) {
+      --remaining;
+    } else if (remaining > 0) {
+      // Underlying stream reached EOF but we haven't read the expected
+      // number of bytes.
+      throw new EOFException(
+          "Premature EOF. Expected " + remaining + " more bytes");
+    }
+    return result;
+  }
+
+  @Override
+  public int read(final byte[] b, final int off, int len)
+                  throws IOException {
+    if (remaining <= 0) {
+      return -1;
+    }
+    len = Math.min(len, remaining);
+    final int result = super.read(b, off, len);
+    if (result >= 0) {
+      remaining -= result;
+    } else if (remaining > 0) {
+      // Underlying stream reached EOF but we haven't read the expected
+      // number of bytes.
+      throw new EOFException(
+          "Premature EOF. Expected " + remaining + " more bytes");
+    }
+    return result;
+  }
+
+  @Override
+  public long skip(final long n) throws IOException {
+    final long result = super.skip(Math.min(n, remaining));
+    if (result > 0) {
+      remaining -= result;
+    } else if (n > 0 && remaining > 0) {
+      // A positive skip made no progress although bytes are still expected:
+      // treat it as the underlying stream having hit EOF early.
+      throw new EOFException(
+          "Premature EOF. Expected " + remaining + " more bytes");
+    }
+    return result;
+  }
+  
+  @Override
+  public boolean markSupported() {
+    return false;
+  }
+
+  @Override
+  public void mark(int readlimit) {
+    throw new UnsupportedOperationException();
+  }
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e4afa3a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index dc5bff1..3246919 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -482,6 +482,8 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-8823. Move replication factor into individual blocks. (wheat9)
 
+    HDFS-8934. Move ShortCircuitShm to hdfs-client. (Mingliang Liu via wheat9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e4afa3a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
index 8517173..fec6b85 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
@@ -45,7 +45,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
@@ -592,7 +592,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
         failureInjector.getSupportsReceiptVerification());
     DataInputStream in = new DataInputStream(peer.getInputStream());
     BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
-        PBHelper.vintPrefixed(in));
+        PBHelperClient.vintPrefixed(in));
     DomainSocket sock = peer.getDomainSocket();
     failureInjector.injectRequestFileDescriptorsFailure();
     switch (resp.getStatus()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e4afa3a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index a7b518e..12646b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -149,7 +149,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
@@ -1928,7 +1928,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
           new Sender(out).blockChecksum(block, lb.getBlockToken());
 
           final BlockOpResponseProto reply =
-            BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
+            BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in));
 
           String logInfo = "for block " + block + " from datanode " + datanodes[j];
           DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
@@ -1960,7 +1960,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
           // read crc-type
           final DataChecksum.Type ct;
           if (checksumData.hasCrcType()) {
-            ct = PBHelper.convert(checksumData
+            ct = PBHelperClient.convert(checksumData
                 .getCrcType());
           } else {
             LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
@@ -2088,11 +2088,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
       new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName,
           0, 1, true, CachingStrategy.newDefaultStrategy());
       final BlockOpResponseProto reply =
-          BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
+          BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in));
       String logInfo = "trying to read " + lb.getBlock() + " from datanode " + dn;
       DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
 
-      return PBHelper.convert(reply.getReadOpChecksumInfo().getChecksum().getType());
+      return PBHelperClient.convert(reply.getReadOpChecksumInfo().getChecksum().getType());
     } finally {
       IOUtils.cleanup(null, pair.in, pair.out);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e4afa3a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 8dd85b7..a975312 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -67,7 +67,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
@@ -1245,7 +1245,7 @@ class DataStreamer extends Daemon {
 
       //ack
       BlockOpResponseProto response =
-          BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
+          BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in));
       if (SUCCESS != response.getStatus()) {
         throw new IOException("Failed to add a datanode");
       }
@@ -1524,7 +1524,7 @@ class DataStreamer extends Daemon {
 
         // receive ack for connect
         BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
-            PBHelper.vintPrefixed(blockReplyStream));
+            PBHelperClient.vintPrefixed(blockReplyStream));
         pipelineStatus = resp.getStatus();
         firstBadLink = resp.getFirstBadLink();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e4afa3a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java
deleted file mode 100644
index 7b9e8e3..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import org.apache.commons.lang.builder.EqualsBuilder;
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-
-/**
- * An immutable key which identifies a block.
- */
-@InterfaceAudience.Private
-final public class ExtendedBlockId {
-  /**
-   * The block ID for this block.
-   */
-  private final long blockId;
-
-  /**
-   * The block pool ID for this block.
-   */
-  private final String bpId;
-
-  public static ExtendedBlockId fromExtendedBlock(ExtendedBlock block) {
-    return new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
-  }
-  
-  public ExtendedBlockId(long blockId, String bpId) {
-    this.blockId = blockId;
-    this.bpId = bpId;
-  }
-
-  public long getBlockId() {
-    return this.blockId;
-  }
-
-  public String getBlockPoolId() {
-    return this.bpId;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if ((o == null) || (o.getClass() != this.getClass())) {
-      return false;
-    }
-    ExtendedBlockId other = (ExtendedBlockId)o;
-    return new EqualsBuilder().
-        append(blockId, other.blockId).
-        append(bpId, other.bpId).
-        isEquals();
-  }
-
-  @Override
-  public int hashCode() {
-    return new HashCodeBuilder().
-        append(this.blockId).
-        append(this.bpId).
-        toHashCode();
-  }
-
-  @Override
-  public String toString() {
-    return new StringBuilder().append(blockId).
-        append("_").append(bpId).toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e4afa3a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
index d70f419..05a9f2c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
@@ -414,7 +414,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
         new BufferedInputStream(peer.getInputStream(), bufferSize));
     
     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
-        PBHelper.vintPrefixed(in));
+        PBHelperClient.vintPrefixed(in));
     RemoteBlockReader2.checkSuccess(status, peer, block, file);
     ReadOpChecksumInfoProto checksumInfo =
       status.getReadOpChecksumInfo();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e4afa3a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
index c368d65..4c23d36 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
@@ -417,7 +417,7 @@ public class RemoteBlockReader2  implements BlockReader {
     DataInputStream in = new DataInputStream(peer.getInputStream());
 
     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
-        PBHelper.vintPrefixed(in));
+        PBHelperClient.vintPrefixed(in));
     checkSuccess(status, peer, block, file);
     ReadOpChecksumInfoProto checksumInfo =
       status.getReadOpChecksumInfo();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e4afa3a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java
deleted file mode 100644
index 4792b0e..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.net;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.channels.ReadableByteChannel;
-
-import org.apache.hadoop.net.unix.DomainSocket;
-import org.apache.hadoop.classification.InterfaceAudience;
-
-/**
- * Represents a peer that we communicate with by using blocking I/O 
- * on a UNIX domain socket.
- */
-@InterfaceAudience.Private
-public class DomainPeer implements Peer {
-  private final DomainSocket socket;
-  private final OutputStream out;
-  private final InputStream in;
-  private final ReadableByteChannel channel;
-
-  public DomainPeer(DomainSocket socket) {
-    this.socket = socket;
-    this.out = socket.getOutputStream();
-    this.in = socket.getInputStream();
-    this.channel = socket.getChannel();
-  }
-
-  @Override
-  public ReadableByteChannel getInputStreamChannel() {
-    return channel;
-  }
-
-  @Override
-  public void setReadTimeout(int timeoutMs) throws IOException {
-    socket.setAttribute(DomainSocket.RECEIVE_TIMEOUT, timeoutMs);
-  }
-
-  @Override
-  public int getReceiveBufferSize() throws IOException {
-    return socket.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE);
-  }
-
-  @Override
-  public boolean getTcpNoDelay() throws IOException {
-    /* No TCP, no TCP_NODELAY. */
-    return false;
-  }
-
-  @Override
-  public void setWriteTimeout(int timeoutMs) throws IOException {
-    socket.setAttribute(DomainSocket.SEND_TIMEOUT, timeoutMs);
-  }
-
-  @Override
-  public boolean isClosed() {
-    return !socket.isOpen();
-  }
-
-  @Override
-  public void close() throws IOException {
-    socket.close();
-  }
-
-  @Override
-  public String getRemoteAddressString() {
-    return "unix:" + socket.getPath();
-  }
-
-  @Override
-  public String getLocalAddressString() {
-    return "<local>";
-  }
-
-  @Override
-  public InputStream getInputStream() throws IOException {
-    return in;
-  }
-
-  @Override
-  public OutputStream getOutputStream() throws IOException {
-    return out;
-  }
-
-  @Override
-  public boolean isLocal() {
-    /* UNIX domain sockets can only be used for local communication. */
-    return true;
-  }
-
-  @Override
-  public String toString() {
-    return "DomainPeer(" + getRemoteAddressString() + ")";
-  }
-
-  @Override
-  public DomainSocket getDomainSocket() {
-    return socket;
-  }
-
-  @Override
-  public boolean hasSecureChannel() {
-    //
-    // Communication over domain sockets is assumed to be secure, since it
-    // doesn't pass over any network.  We also carefully control the privileges
-    // that can be used on the domain socket inode and its parent directories.
-    // See #{java.org.apache.hadoop.net.unix.DomainSocket#validateSocketPathSecurity0}
-    // for details.
-    //
-    // So unless you are running as root or the hdfs superuser, you cannot
-    // launch a man-in-the-middle attach on UNIX domain socket traffic.
-    //
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e4afa3a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java
deleted file mode 100644
index 42cf287..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.net;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.channels.ReadableByteChannel;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.net.unix.DomainSocket;
-
-/**
- * Represents a connection to a peer.
- */
-@InterfaceAudience.Private
-public interface Peer extends Closeable {
-  /**
-   * @return                The input stream channel associated with this
-   *                        peer, or null if it has none.
-   */
-  public ReadableByteChannel getInputStreamChannel();
-
-  /**
-   * Set the read timeout on this peer.
-   *
-   * @param timeoutMs       The timeout in milliseconds.
-   */
-  public void setReadTimeout(int timeoutMs) throws IOException;
-
-  /**
-   * @return                The receive buffer size.
-   */
-  public int getReceiveBufferSize() throws IOException;
-
-  /**
-   * @return                True if TCP_NODELAY is turned on.
-   */
-  public boolean getTcpNoDelay() throws IOException;
-
-  /**
-   * Set the write timeout on this peer.
-   *
-   * Note: this is not honored for BasicInetPeer.
-   * See {@link BasicSocketPeer#setWriteTimeout} for details.
-   * 
-   * @param timeoutMs       The timeout in milliseconds.
-   */
-  public void setWriteTimeout(int timeoutMs) throws IOException;
-
-  /**
-   * @return                true only if the peer is closed.
-   */
-  public boolean isClosed();
-  
-  /**
-   * Close the peer.
-   *
-   * It's safe to re-close a Peer that is already closed.
-   */
-  public void close() throws IOException;
-
-  /**
-   * @return               A string representing the remote end of our 
-   *                       connection to the peer.
-   */
-  public String getRemoteAddressString();
-
-  /**
-   * @return               A string representing the local end of our 
-   *                       connection to the peer.
-   */
-  public String getLocalAddressString();
-  
-  /**
-   * @return               An InputStream associated with the Peer.
-   *                       This InputStream will be valid until you close
-   *                       this peer with Peer#close.
-   */
-  public InputStream getInputStream() throws IOException;
-  
-  /**
-   * @return               An OutputStream associated with the Peer.
-   *                       This OutputStream will be valid until you close
-   *                       this peer with Peer#close.
-   */
-  public OutputStream getOutputStream() throws IOException;
-
-  /**
-   * @return               True if the peer resides on the same
-   *                       computer as we.
-   */
-  public boolean isLocal();
-
-  /**
-   * @return               The DomainSocket associated with the current
-   *                       peer, or null if there is none.
-   */
-  public DomainSocket getDomainSocket();
-  
-  /**
-   * Return true if the channel is secure.
-   *
-   * @return               True if our channel to this peer is not
-   *                       susceptible to man-in-the-middle attacks.
-   */
-  public boolean hasSecureChannel();
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e4afa3a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java
deleted file mode 100644
index 5f86e52..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/** Block Construction Stage */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public enum BlockConstructionStage {
-  /** The enumerates are always listed as regular stage followed by the
-   * recovery stage. 
-   * Changing this order will make getRecoveryStage not working.
-   */
-  // pipeline set up for block append
-  PIPELINE_SETUP_APPEND,
-  // pipeline set up for failed PIPELINE_SETUP_APPEND recovery
-  PIPELINE_SETUP_APPEND_RECOVERY,
-  // data streaming
-  DATA_STREAMING,
-  // pipeline setup for failed data streaming recovery
-  PIPELINE_SETUP_STREAMING_RECOVERY,
-  // close the block and pipeline
-  PIPELINE_CLOSE,
-  // Recover a failed PIPELINE_CLOSE
-  PIPELINE_CLOSE_RECOVERY,
-  // pipeline set up for block creation
-  PIPELINE_SETUP_CREATE,
-  // transfer RBW for adding datanodes
-  TRANSFER_RBW,
-  // transfer Finalized for adding datanodes
-  TRANSFER_FINALIZED;
-  
-  final static private byte RECOVERY_BIT = (byte)1;
-  
-  /**
-   * get the recovery stage of this stage
-   */
-  public BlockConstructionStage getRecoveryStage() {
-    if (this == PIPELINE_SETUP_CREATE) {
-      throw new IllegalArgumentException( "Unexpected blockStage " + this);
-    } else {
-      return values()[ordinal()|RECOVERY_BIT];
-    }
-  }
-}    

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e4afa3a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
deleted file mode 100644
index 284281a..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.htrace.Span;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceInfo;
-import org.apache.htrace.TraceScope;
-
-/**
- * Static utilities for dealing with the protocol buffers used by the
- * Data Transfer Protocol.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public abstract class DataTransferProtoUtil {
-  static BlockConstructionStage fromProto(
-      OpWriteBlockProto.BlockConstructionStage stage) {
-    return BlockConstructionStage.valueOf(stage.name());
-  }
-
-  static OpWriteBlockProto.BlockConstructionStage toProto(
-      BlockConstructionStage stage) {
-    return OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name());
-  }
-
-  public static ChecksumProto toProto(DataChecksum checksum) {
-    ChecksumTypeProto type = PBHelper.convert(checksum.getChecksumType());
-    // ChecksumType#valueOf never returns null
-    return ChecksumProto.newBuilder()
-      .setBytesPerChecksum(checksum.getBytesPerChecksum())
-      .setType(type)
-      .build();
-  }
-
-  public static DataChecksum fromProto(ChecksumProto proto) {
-    if (proto == null) return null;
-
-    int bytesPerChecksum = proto.getBytesPerChecksum();
-    DataChecksum.Type type = PBHelper.convert(proto.getType());
-    return DataChecksum.newDataChecksum(type, bytesPerChecksum);
-  }
-
-  static ClientOperationHeaderProto buildClientHeader(ExtendedBlock blk,
-      String client, Token<BlockTokenIdentifier> blockToken) {
-    ClientOperationHeaderProto header =
-      ClientOperationHeaderProto.newBuilder()
-        .setBaseHeader(buildBaseHeader(blk, blockToken))
-        .setClientName(client)
-        .build();
-    return header;
-  }
-
-  static BaseHeaderProto buildBaseHeader(ExtendedBlock blk,
-      Token<BlockTokenIdentifier> blockToken) {
-    BaseHeaderProto.Builder builder =  BaseHeaderProto.newBuilder()
-      .setBlock(PBHelper.convert(blk))
-      .setToken(PBHelper.convert(blockToken));
-    if (Trace.isTracing()) {
-      Span s = Trace.currentSpan();
-      builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()
-          .setTraceId(s.getTraceId())
-          .setParentId(s.getSpanId()));
-    }
-    return builder.build();
-  }
-
-  public static TraceInfo fromProto(DataTransferTraceInfoProto proto) {
-    if (proto == null) return null;
-    if (!proto.hasTraceId()) return null;
-    return new TraceInfo(proto.getTraceId(), proto.getParentId());
-  }
-
-  public static TraceScope continueTraceSpan(ClientOperationHeaderProto header,
-      String description) {
-    return continueTraceSpan(header.getBaseHeader(), description);
-  }
-
-  public static TraceScope continueTraceSpan(BaseHeaderProto header,
-      String description) {
-    return continueTraceSpan(header.getTraceInfo(), description);
-  }
-
-  public static TraceScope continueTraceSpan(DataTransferTraceInfoProto proto,
-      String description) {
-    TraceScope scope = null;
-    TraceInfo info = fromProto(proto);
-    if (info != null) {
-      scope = Trace.startSpan(description, info);
-    }
-    return scope;
-  }
-
-  public static void checkBlockOpStatus(
-          BlockOpResponseProto response,
-          String logInfo) throws IOException {
-    if (response.getStatus() != Status.SUCCESS) {
-      if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) {
-        throw new InvalidBlockTokenException(
-          "Got access token error"
-          + ", status message " + response.getMessage()
-          + ", " + logInfo
-        );
-      } else {
-        throw new IOException(
-          "Got error"
-          + ", status message " + response.getMessage()
-          + ", " + logInfo
-        );
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e4afa3a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
deleted file mode 100644
index 48e931d..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.DataChecksum;
-
-/**
- * Transfer data to/from datanode using a streaming protocol.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public interface DataTransferProtocol {
-  public static final Log LOG = LogFactory.getLog(DataTransferProtocol.class);
-  
-  /** Version for data transfers between clients and datanodes
-   * This should change when serialization of DatanodeInfo, not just
-   * when protocol changes. It is not very obvious. 
-   */
-  /*
-   * Version 28:
-   *    Declare methods in DataTransferProtocol interface.
-   */
-  public static final int DATA_TRANSFER_VERSION = 28;
-
-  /** 
-   * Read a block.
-   * 
-   * @param blk the block being read.
-   * @param blockToken security token for accessing the block.
-   * @param clientName client's name.
-   * @param blockOffset offset of the block.
-   * @param length maximum number of bytes for this read.
-   * @param sendChecksum if false, the DN should skip reading and sending
-   *        checksums
-   * @param cachingStrategy  The caching strategy to use.
-   */
-  public void readBlock(final ExtendedBlock blk,
-      final Token<BlockTokenIdentifier> blockToken,
-      final String clientName,
-      final long blockOffset,
-      final long length,
-      final boolean sendChecksum,
-      final CachingStrategy cachingStrategy) throws IOException;
-
-  /**
-   * Write a block to a datanode pipeline.
-   * The receiver datanode of this call is the next datanode in the pipeline.
-   * The other downstream datanodes are specified by the targets parameter.
-   * Note that the receiver {@link DatanodeInfo} is not required in the
-   * parameter list since the receiver datanode knows its info.  However, the
-   * {@link StorageType} for storing the replica in the receiver datanode is a 
-   * parameter since the receiver datanode may support multiple storage types.
-   *
-   * @param blk the block being written.
-   * @param storageType for storing the replica in the receiver datanode.
-   * @param blockToken security token for accessing the block.
-   * @param clientName client's name.
-   * @param targets other downstream datanodes in the pipeline.
-   * @param targetStorageTypes target {@link StorageType}s corresponding
-   *                           to the target datanodes.
-   * @param source source datanode.
-   * @param stage pipeline stage.
-   * @param pipelineSize the size of the pipeline.
-   * @param minBytesRcvd minimum number of bytes received.
-   * @param maxBytesRcvd maximum number of bytes received.
-   * @param latestGenerationStamp the latest generation stamp of the block.
-   * @param pinning whether to pin the block, so Balancer won't move it.
-   * @param targetPinnings whether to pin the block on target datanode
-   */
-  public void writeBlock(final ExtendedBlock blk,
-      final StorageType storageType, 
-      final Token<BlockTokenIdentifier> blockToken,
-      final String clientName,
-      final DatanodeInfo[] targets,
-      final StorageType[] targetStorageTypes, 
-      final DatanodeInfo source,
-      final BlockConstructionStage stage,
-      final int pipelineSize,
-      final long minBytesRcvd,
-      final long maxBytesRcvd,
-      final long latestGenerationStamp,
-      final DataChecksum requestedChecksum,
-      final CachingStrategy cachingStrategy,
-      final boolean allowLazyPersist,
-      final boolean pinning,
-      final boolean[] targetPinnings) throws IOException;
-  /**
-   * Transfer a block to another datanode.
-   * The block stage must be
-   * either {@link BlockConstructionStage#TRANSFER_RBW}
-   * or {@link BlockConstructionStage#TRANSFER_FINALIZED}.
-   * 
-   * @param blk the block being transferred.
-   * @param blockToken security token for accessing the block.
-   * @param clientName client's name.
-   * @param targets target datanodes.
-   */
-  public void transferBlock(final ExtendedBlock blk,
-      final Token<BlockTokenIdentifier> blockToken,
-      final String clientName,
-      final DatanodeInfo[] targets,
-      final StorageType[] targetStorageTypes) throws IOException;
-
-  /**
-   * Request short circuit access file descriptors from a DataNode.
-   *
-   * @param blk             The block to get file descriptors for.
-   * @param blockToken      Security token for accessing the block.
-   * @param slotId          The shared memory slot id to use, or null 
-   *                          to use no slot id.
-   * @param maxVersion      Maximum version of the block data the client 
-   *                          can understand.
-   * @param supportsReceiptVerification  True if the client supports
-   *                          receipt verification.
-   */
-  public void requestShortCircuitFds(final ExtendedBlock blk,
-      final Token<BlockTokenIdentifier> blockToken,
-      SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
-        throws IOException;
-
-  /**
-   * Release a pair of short-circuit FDs requested earlier.
-   *
-   * @param slotId          SlotID used by the earlier file descriptors.
-   */
-  public void releaseShortCircuitFds(final SlotId slotId) throws IOException;
-
-  /**
-   * Request a short circuit shared memory area from a DataNode.
-   * 
-   * @param clientName       The name of the client.
-   */
-  public void requestShortCircuitShm(String clientName) throws IOException;
-  
-  /**
-   * Receive a block from a source datanode
-   * and then notifies the namenode
-   * to remove the copy from the original datanode.
-   * Note that the source datanode and the original datanode can be different.
-   * It is used for balancing purpose.
-   * 
-   * @param blk the block being replaced.
-   * @param storageType the {@link StorageType} for storing the block.
-   * @param blockToken security token for accessing the block.
-   * @param delHint the hint for deleting the block in the original datanode.
-   * @param source the source datanode for receiving the block.
-   */
-  public void replaceBlock(final ExtendedBlock blk,
-      final StorageType storageType, 
-      final Token<BlockTokenIdentifier> blockToken,
-      final String delHint,
-      final DatanodeInfo source) throws IOException;
-
-  /**
-   * Copy a block. 
-   * It is used for balancing purpose.
-   * 
-   * @param blk the block being copied.
-   * @param blockToken security token for accessing the block.
-   */
-  public void copyBlock(final ExtendedBlock blk,
-      final Token<BlockTokenIdentifier> blockToken) throws IOException;
-
-  /**
-   * Get block checksum (MD5 of CRC32).
-   * 
-   * @param blk a block.
-   * @param blockToken security token for accessing the block.
-   * @throws IOException
-   */
-  public void blockChecksum(final ExtendedBlock blk,
-      final Token<BlockTokenIdentifier> blockToken) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e4afa3a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
deleted file mode 100644
index 3077498..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/** Operation */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public enum Op {
-  WRITE_BLOCK((byte)80),
-  READ_BLOCK((byte)81),
-  READ_METADATA((byte)82),
-  REPLACE_BLOCK((byte)83),
-  COPY_BLOCK((byte)84),
-  BLOCK_CHECKSUM((byte)85),
-  TRANSFER_BLOCK((byte)86),
-  REQUEST_SHORT_CIRCUIT_FDS((byte)87),
-  RELEASE_SHORT_CIRCUIT_FDS((byte)88),
-  REQUEST_SHORT_CIRCUIT_SHM((byte)89),
-  CUSTOM((byte)127);
-
-  /** The code for this operation. */
-  public final byte code;
-  
-  private Op(byte code) {
-    this.code = code;
-  }
-  
-  private static final int FIRST_CODE = values()[0].code;
-  /** Return the object represented by the code. */
-  private static Op valueOf(byte code) {
-    final int i = (code & 0xff) - FIRST_CODE;
-    return i < 0 || i >= values().length? null: values()[i];
-  }
-
-  /** Read from in */
-  public static Op read(DataInput in) throws IOException {
-    return valueOf(in.readByte());
-  }
-
-  /** Write to out */
-  public void write(DataOutput out) throws IOException {
-    out.write(code);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e4afa3a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
index a811f39..44f38c6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hdfs.protocol.datatransfer;
 
-import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
+import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
 
 import java.io.IOException;
 import java.io.InputStream;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e4afa3a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index d435543..694f521 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hdfs.protocol.datatransfer;
 
 import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.fromProto;
 import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.continueTraceSpan;
-import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
+import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
 
 import java.io.DataInputStream;
 import java.io.IOException;
@@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProt
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmRequestProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
 import org.apache.htrace.TraceScope;
@@ -136,7 +137,7 @@ public abstract class Receiver implements DataTransferProtocol {
         proto.getClass().getSimpleName());
     try {
       writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
-          PBHelper.convertStorageType(proto.getStorageType()),
+          PBHelperClient.convertStorageType(proto.getStorageType()),
           PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
           proto.getHeader().getClientName(),
           targets,
@@ -228,7 +229,7 @@ public abstract class Receiver implements DataTransferProtocol {
         proto.getClass().getSimpleName());
     try {
       replaceBlock(PBHelper.convert(proto.getHeader().getBlock()),
-          PBHelper.convertStorageType(proto.getStorageType()),
+          PBHelperClient.convertStorageType(proto.getStorageType()),
           PBHelper.convert(proto.getHeader().getToken()),
           proto.getDelHint(),
           PBHelper.convert(proto.getSource()));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e4afa3a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
deleted file mode 100644
index df69125..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer;
-
-import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.toProto;
-
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmRequestProto;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.DataChecksum;
-
-import org.apache.htrace.Trace;
-import org.apache.htrace.Span;
-
-import com.google.protobuf.Message;
-
-/** Sender */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class Sender implements DataTransferProtocol {
-  private final DataOutputStream out;
-
-  /** Create a sender for DataTransferProtocol with a output stream. */
-  public Sender(final DataOutputStream out) {
-    this.out = out;    
-  }
-
-  /** Initialize a operation. */
-  private static void op(final DataOutput out, final Op op
-      ) throws IOException {
-    out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
-    op.write(out);
-  }
-
-  private static void send(final DataOutputStream out, final Op opcode,
-      final Message proto) throws IOException {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Sending DataTransferOp " + proto.getClass().getSimpleName()
-          + ": " + proto);
-    }
-    op(out, opcode);
-    proto.writeDelimitedTo(out);
-    out.flush();
-  }
-
-  static private CachingStrategyProto getCachingStrategy(CachingStrategy cachingStrategy) {
-    CachingStrategyProto.Builder builder = CachingStrategyProto.newBuilder();
-    if (cachingStrategy.getReadahead() != null) {
-      builder.setReadahead(cachingStrategy.getReadahead().longValue());
-    }
-    if (cachingStrategy.getDropBehind() != null) {
-      builder.setDropBehind(cachingStrategy.getDropBehind().booleanValue());
-    }
-    return builder.build();
-  }
-
-  @Override
-  public void readBlock(final ExtendedBlock blk,
-      final Token<BlockTokenIdentifier> blockToken,
-      final String clientName,
-      final long blockOffset,
-      final long length,
-      final boolean sendChecksum,
-      final CachingStrategy cachingStrategy) throws IOException {
-
-    OpReadBlockProto proto = OpReadBlockProto.newBuilder()
-      .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
-      .setOffset(blockOffset)
-      .setLen(length)
-      .setSendChecksums(sendChecksum)
-      .setCachingStrategy(getCachingStrategy(cachingStrategy))
-      .build();
-
-    send(out, Op.READ_BLOCK, proto);
-  }
-  
-
-  @Override
-  public void writeBlock(final ExtendedBlock blk,
-      final StorageType storageType, 
-      final Token<BlockTokenIdentifier> blockToken,
-      final String clientName,
-      final DatanodeInfo[] targets,
-      final StorageType[] targetStorageTypes, 
-      final DatanodeInfo source,
-      final BlockConstructionStage stage,
-      final int pipelineSize,
-      final long minBytesRcvd,
-      final long maxBytesRcvd,
-      final long latestGenerationStamp,
-      DataChecksum requestedChecksum,
-      final CachingStrategy cachingStrategy,
-      final boolean allowLazyPersist,
-      final boolean pinning,
-      final boolean[] targetPinnings) throws IOException {
-    ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
-        blk, clientName, blockToken);
-    
-    ChecksumProto checksumProto =
-      DataTransferProtoUtil.toProto(requestedChecksum);
-
-    OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
-      .setHeader(header)
-      .setStorageType(PBHelper.convertStorageType(storageType))
-      .addAllTargets(PBHelper.convert(targets, 1))
-      .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes, 1))
-      .setStage(toProto(stage))
-      .setPipelineSize(pipelineSize)
-      .setMinBytesRcvd(minBytesRcvd)
-      .setMaxBytesRcvd(maxBytesRcvd)
-      .setLatestGenerationStamp(latestGenerationStamp)
-      .setRequestedChecksum(checksumProto)
-      .setCachingStrategy(getCachingStrategy(cachingStrategy))
-      .setAllowLazyPersist(allowLazyPersist)
-      .setPinning(pinning)
-      .addAllTargetPinnings(PBHelper.convert(targetPinnings, 1));
-    
-    if (source != null) {
-      proto.setSource(PBHelper.convertDatanodeInfo(source));
-    }
-
-    send(out, Op.WRITE_BLOCK, proto.build());
-  }
-
-  @Override
-  public void transferBlock(final ExtendedBlock blk,
-      final Token<BlockTokenIdentifier> blockToken,
-      final String clientName,
-      final DatanodeInfo[] targets,
-      final StorageType[] targetStorageTypes) throws IOException {
-    
-    OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
-      .setHeader(DataTransferProtoUtil.buildClientHeader(
-          blk, clientName, blockToken))
-      .addAllTargets(PBHelper.convert(targets))
-      .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes))
-      .build();
-
-    send(out, Op.TRANSFER_BLOCK, proto);
-  }
-
-  @Override
-  public void requestShortCircuitFds(final ExtendedBlock blk,
-      final Token<BlockTokenIdentifier> blockToken,
-      SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
-        throws IOException {
-    OpRequestShortCircuitAccessProto.Builder builder =
-        OpRequestShortCircuitAccessProto.newBuilder()
-          .setHeader(DataTransferProtoUtil.buildBaseHeader(
-            blk, blockToken)).setMaxVersion(maxVersion);
-    if (slotId != null) {
-      builder.setSlotId(PBHelper.convert(slotId));
-    }
-    builder.setSupportsReceiptVerification(supportsReceiptVerification);
-    OpRequestShortCircuitAccessProto proto = builder.build();
-    send(out, Op.REQUEST_SHORT_CIRCUIT_FDS, proto);
-  }
-  
-  @Override
-  public void releaseShortCircuitFds(SlotId slotId) throws IOException {
-    ReleaseShortCircuitAccessRequestProto.Builder builder =
-        ReleaseShortCircuitAccessRequestProto.newBuilder().
-        setSlotId(PBHelper.convert(slotId));
-    if (Trace.isTracing()) {
-      Span s = Trace.currentSpan();
-      builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()
-          .setTraceId(s.getTraceId()).setParentId(s.getSpanId()));
-    }
-    ReleaseShortCircuitAccessRequestProto proto = builder.build();
-    send(out, Op.RELEASE_SHORT_CIRCUIT_FDS, proto);
-  }
-
-  @Override
-  public void requestShortCircuitShm(String clientName) throws IOException {
-    ShortCircuitShmRequestProto.Builder builder =
-        ShortCircuitShmRequestProto.newBuilder().
-        setClientName(clientName);
-    if (Trace.isTracing()) {
-      Span s = Trace.currentSpan();
-      builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()
-          .setTraceId(s.getTraceId()).setParentId(s.getSpanId()));
-    }
-    ShortCircuitShmRequestProto proto = builder.build();
-    send(out, Op.REQUEST_SHORT_CIRCUIT_SHM, proto);
-  }
-  
-  @Override
-  public void replaceBlock(final ExtendedBlock blk,
-      final StorageType storageType, 
-      final Token<BlockTokenIdentifier> blockToken,
-      final String delHint,
-      final DatanodeInfo source) throws IOException {
-    OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder()
-      .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
-      .setStorageType(PBHelper.convertStorageType(storageType))
-      .setDelHint(delHint)
-      .setSource(PBHelper.convertDatanodeInfo(source))
-      .build();
-    
-    send(out, Op.REPLACE_BLOCK, proto);
-  }
-
-  @Override
-  public void copyBlock(final ExtendedBlock blk,
-      final Token<BlockTokenIdentifier> blockToken) throws IOException {
-    OpCopyBlockProto proto = OpCopyBlockProto.newBuilder()
-      .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
-      .build();
-    
-    send(out, Op.COPY_BLOCK, proto);
-  }
-
-  @Override
-  public void blockChecksum(final ExtendedBlock blk,
-      final Token<BlockTokenIdentifier> blockToken) throws IOException {
-    OpBlockChecksumProto proto = OpBlockChecksumProto.newBuilder()
-      .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
-      .build();
-    
-    send(out, Op.BLOCK_CHECKSUM, proto);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e4afa3a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
index 398d44c..852819f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
@@ -24,7 +24,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
-import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
+import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
 
 import java.io.IOException;
 import java.io.InputStream;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e4afa3a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
index 2bc6a18..a628287 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
@@ -137,7 +137,7 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
       throw new ServiceException(e);
     }
     return GetBlockLocalPathInfoResponseProto.newBuilder()
-        .setBlock(PBHelper.convert(resp.getBlock()))
+        .setBlock(PBHelperClient.convert(resp.getBlock()))
         .setLocalPath(resp.getBlockPath()).setLocalMetaPath(resp.getMetaPath())
         .build();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e4afa3a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
index 9d6375b..53ca147 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
@@ -185,7 +185,7 @@ public class ClientDatanodeProtocolTranslatorPB implements
   @Override
   public long getReplicaVisibleLength(ExtendedBlock b) throws IOException {
     GetReplicaVisibleLengthRequestProto req = GetReplicaVisibleLengthRequestProto
-        .newBuilder().setBlock(PBHelper.convert(b)).build();
+        .newBuilder().setBlock(PBHelperClient.convert(b)).build();
     try {
       return rpcProxy.getReplicaVisibleLength(NULL_CONTROLLER, req).getLength();
     } catch (ServiceException e) {
@@ -218,8 +218,8 @@ public class ClientDatanodeProtocolTranslatorPB implements
       Token<BlockTokenIdentifier> token) throws IOException {
     GetBlockLocalPathInfoRequestProto req =
         GetBlockLocalPathInfoRequestProto.newBuilder()
-        .setBlock(PBHelper.convert(block))
-        .setToken(PBHelper.convert(token)).build();
+        .setBlock(PBHelperClient.convert(block))
+        .setToken(PBHelperClient.convert(token)).build();
     GetBlockLocalPathInfoResponseProto resp;
     try {
       resp = rpcProxy.getBlockLocalPathInfo(NULL_CONTROLLER, req);


Mime
View raw message