hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject [3/4] hadoop git commit: Revert "HDFS-8934. Move ShortCircuitShm to hdfs-client. Contributed by Mingliang Liu."
Date Mon, 24 Aug 2015 19:59:35 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java
deleted file mode 100644
index 78325a3..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java
+++ /dev/null
@@ -1,647 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.shortcircuit;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.BitSet;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.Random;
-
-import org.apache.commons.lang.builder.EqualsBuilder;
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.fs.InvalidRequestException;
-import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.io.nativeio.NativeIO;
-import org.apache.hadoop.io.nativeio.NativeIO.POSIX;
-import org.apache.hadoop.util.Shell;
-import org.apache.hadoop.util.StringUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import sun.misc.Unsafe;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ComparisonChain;
-import com.google.common.primitives.Ints;
-
-/**
- * A shared memory segment used to implement short-circuit reads.
- */
-public class ShortCircuitShm {
-  private static final Logger LOG = LoggerFactory.getLogger(ShortCircuitShm.class);
-
-  protected static final int BYTES_PER_SLOT = 64;
-
-  private static final Unsafe unsafe = safetyDance();
-
-  private static Unsafe safetyDance() {
-    try {
-      Field f = Unsafe.class.getDeclaredField("theUnsafe");
-      f.setAccessible(true);
-      return (Unsafe)f.get(null);
-    } catch (Throwable e) {
-      LOG.error("failed to load misc.Unsafe", e);
-    }
-    return null;
-  }
-
-  /**
-   * Calculate the usable size of a shared memory segment.
-   * We round down to a multiple of the slot size and do some validation.
-   *
-   * @param stream The stream we're using.
-   * @return       The usable size of the shared memory segment.
-   */
-  private static int getUsableLength(FileInputStream stream)
-      throws IOException {
-    int intSize = Ints.checkedCast(stream.getChannel().size());
-    int slots = intSize / BYTES_PER_SLOT;
-    if (slots == 0) {
-      throw new IOException("size of shared memory segment was " +
-          intSize + ", but that is not enough to hold even one slot.");
-    }
-    return slots * BYTES_PER_SLOT;
-  }
-
-  /**
-   * Identifies a DfsClientShm.
-   */
-  public static class ShmId implements Comparable<ShmId> {
-    private static final Random random = new Random();
-    private final long hi;
-    private final long lo;
-
-    /**
-     * Generate a random ShmId.
-     * 
-     * We generate ShmIds randomly to prevent a malicious client from
-     * successfully guessing one and using that to interfere with another
-     * client.
-     */
-    public static ShmId createRandom() {
-      return new ShmId(random.nextLong(), random.nextLong());
-    }
-
-    public ShmId(long hi, long lo) {
-      this.hi = hi;
-      this.lo = lo;
-    }
-    
-    public long getHi() {
-      return hi;
-    }
-    
-    public long getLo() {
-      return lo;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if ((o == null) || (o.getClass() != this.getClass())) {
-        return false;
-      }
-      ShmId other = (ShmId)o;
-      return new EqualsBuilder().
-          append(hi, other.hi).
-          append(lo, other.lo).
-          isEquals();
-    }
-
-    @Override
-    public int hashCode() {
-      return new HashCodeBuilder().
-          append(this.hi).
-          append(this.lo).
-          toHashCode();
-    }
-
-    @Override
-    public String toString() {
-      return String.format("%016x%016x", hi, lo);
-    }
-
-    @Override
-    public int compareTo(ShmId other) {
-      return ComparisonChain.start().
-          compare(hi, other.hi).
-          compare(lo, other.lo).
-          result();
-    }
-  };
-
-  /**
-   * Uniquely identifies a slot.
-   */
-  public static class SlotId {
-    private final ShmId shmId;
-    private final int slotIdx;
-    
-    public SlotId(ShmId shmId, int slotIdx) {
-      this.shmId = shmId;
-      this.slotIdx = slotIdx;
-    }
-
-    public ShmId getShmId() {
-      return shmId;
-    }
-
-    public int getSlotIdx() {
-      return slotIdx;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if ((o == null) || (o.getClass() != this.getClass())) {
-        return false;
-      }
-      SlotId other = (SlotId)o;
-      return new EqualsBuilder().
-          append(shmId, other.shmId).
-          append(slotIdx, other.slotIdx).
-          isEquals();
-    }
-
-    @Override
-    public int hashCode() {
-      return new HashCodeBuilder().
-          append(this.shmId).
-          append(this.slotIdx).
-          toHashCode();
-    }
-
-    @Override
-    public String toString() {
-      return String.format("SlotId(%s:%d)", shmId.toString(), slotIdx);
-    }
-  }
-
-  public class SlotIterator implements Iterator<Slot> {
-    int slotIdx = -1;
-
-    @Override
-    public boolean hasNext() {
-      synchronized (ShortCircuitShm.this) {
-        return allocatedSlots.nextSetBit(slotIdx + 1) != -1;
-      }
-    }
-
-    @Override
-    public Slot next() {
-      synchronized (ShortCircuitShm.this) {
-        int nextSlotIdx = allocatedSlots.nextSetBit(slotIdx + 1);
-        if (nextSlotIdx == -1) {
-          throw new NoSuchElementException();
-        }
-        slotIdx = nextSlotIdx;
-        return slots[nextSlotIdx];
-      }
-    }
-
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException("SlotIterator " +
-          "doesn't support removal");
-    }
-  }
-  
-  /**
-   * A slot containing information about a replica.
-   *
-   * The format is:
-   * word 0
-   *   bit 0:32   Slot flags (see below).
-   *   bit 33:63  Anchor count.
-   * word 1:7
-   *   Reserved for future use, such as statistics.
-   *   Padding is also useful for avoiding false sharing.
-   *
-   * Little-endian versus big-endian is not relevant here since both the client
-   * and the server reside on the same computer and use the same orientation.
-   */
-  public class Slot {
-    /**
-     * Flag indicating that the slot is valid.  
-     * 
-     * The DFSClient sets this flag when it allocates a new slot within one of
-     * its shared memory regions.
-     * 
-     * The DataNode clears this flag when the replica associated with this slot
-     * is no longer valid.  The client itself also clears this flag when it
-     * believes that the DataNode is no longer using this slot to communicate.
-     */
-    private static final long VALID_FLAG =          1L<<63;
-
-    /**
-     * Flag indicating that the slot can be anchored.
-     */
-    private static final long ANCHORABLE_FLAG =     1L<<62;
-
-    /**
-     * The slot address in memory.
-     */
-    private final long slotAddress;
-
-    /**
-     * BlockId of the block this slot is used for.
-     */
-    private final ExtendedBlockId blockId;
-
-    Slot(long slotAddress, ExtendedBlockId blockId) {
-      this.slotAddress = slotAddress;
-      this.blockId = blockId;
-    }
-
-    /**
-     * Get the short-circuit memory segment associated with this Slot.
-     *
-     * @return      The enclosing short-circuit memory segment.
-     */
-    public ShortCircuitShm getShm() {
-      return ShortCircuitShm.this;
-    }
-
-    /**
-     * Get the ExtendedBlockId associated with this slot.
-     *
-     * @return      The ExtendedBlockId of this slot.
-     */
-    public ExtendedBlockId getBlockId() {
-      return blockId;
-    }
-
-    /**
-     * Get the SlotId of this slot, containing both shmId and slotIdx.
-     *
-     * @return      The SlotId of this slot.
-     */
-    public SlotId getSlotId() {
-      return new SlotId(getShmId(), getSlotIdx());
-    }
-
-    /**
-     * Get the Slot index.
-     *
-     * @return      The index of this slot.
-     */
-    public int getSlotIdx() {
-      return Ints.checkedCast(
-          (slotAddress - baseAddress) / BYTES_PER_SLOT);
-    }
-
-    /**
-     * Clear the slot.
-     */
-    void clear() {
-      unsafe.putLongVolatile(null, this.slotAddress, 0);
-    }
-
-    private boolean isSet(long flag) {
-      long prev = unsafe.getLongVolatile(null, this.slotAddress);
-      return (prev & flag) != 0;
-    }
-
-    private void setFlag(long flag) {
-      long prev;
-      do {
-        prev = unsafe.getLongVolatile(null, this.slotAddress);
-        if ((prev & flag) != 0) {
-          return;
-        }
-      } while (!unsafe.compareAndSwapLong(null, this.slotAddress,
-                  prev, prev | flag));
-    }
-
-    private void clearFlag(long flag) {
-      long prev;
-      do {
-        prev = unsafe.getLongVolatile(null, this.slotAddress);
-        if ((prev & flag) == 0) {
-          return;
-        }
-      } while (!unsafe.compareAndSwapLong(null, this.slotAddress,
-                  prev, prev & (~flag)));
-    }
-    
-    public boolean isValid() {
-      return isSet(VALID_FLAG);
-    }
-
-    public void makeValid() {
-      setFlag(VALID_FLAG);
-    }
-
-    public void makeInvalid() {
-      clearFlag(VALID_FLAG);
-    }
-
-    public boolean isAnchorable() {
-      return isSet(ANCHORABLE_FLAG);
-    }
-
-    public void makeAnchorable() {
-      setFlag(ANCHORABLE_FLAG);
-    }
-
-    public void makeUnanchorable() {
-      clearFlag(ANCHORABLE_FLAG);
-    }
-
-    public boolean isAnchored() {
-      long prev = unsafe.getLongVolatile(null, this.slotAddress);
-      if ((prev & VALID_FLAG) == 0) {
-        // Slot is no longer valid.
-        return false;
-      }
-      return ((prev & 0x7fffffff) != 0);
-    }
-
-    /**
-     * Try to add an anchor for a given slot.
-     *
-     * When a slot is anchored, we know that the block it refers to is resident
-     * in memory.
-     *
-     * @return          True if the slot is anchored.
-     */
-    public boolean addAnchor() {
-      long prev;
-      do {
-        prev = unsafe.getLongVolatile(null, this.slotAddress);
-        if ((prev & VALID_FLAG) == 0) {
-          // Slot is no longer valid.
-          return false;
-        }
-        if ((prev & ANCHORABLE_FLAG) == 0) {
-          // Slot can't be anchored right now.
-          return false;
-        }
-        if ((prev & 0x7fffffff) == 0x7fffffff) {
-          // Too many other threads have anchored the slot (2 billion?)
-          return false;
-        }
-      } while (!unsafe.compareAndSwapLong(null, this.slotAddress,
-                  prev, prev + 1));
-      return true;
-    }
-
-    /**
-     * Remove an anchor for a given slot.
-     */
-    public void removeAnchor() {
-      long prev;
-      do {
-        prev = unsafe.getLongVolatile(null, this.slotAddress);
-        Preconditions.checkState((prev & 0x7fffffff) != 0,
-            "Tried to remove anchor for slot " + slotAddress +", which was " +
-            "not anchored.");
-      } while (!unsafe.compareAndSwapLong(null, this.slotAddress,
-                  prev, prev - 1));
-    }
-
-    @Override
-    public String toString() {
-      return "Slot(slotIdx=" + getSlotIdx() + ", shm=" + getShm() + ")";
-    }
-  }
-
-  /**
-   * ID for this SharedMemorySegment.
-   */
-  private final ShmId shmId;
-
-  /**
-   * The base address of the memory-mapped file.
-   */
-  private final long baseAddress;
-
-  /**
-   * The mmapped length of the shared memory segment
-   */
-  private final int mmappedLength;
-
-  /**
-   * The slots associated with this shared memory segment.
-   * slot[i] contains the slot at offset i * BYTES_PER_SLOT,
-   * or null if that slot is not allocated.
-   */
-  private final Slot slots[];
-
-  /**
-   * A bitset where each bit represents a slot which is in use.
-   */
-  private final BitSet allocatedSlots;
-
-  /**
-   * Create the ShortCircuitShm.
-   * 
-   * @param shmId       The ID to use.
-   * @param stream      The stream that we're going to use to create this 
-   *                    shared memory segment.
-   *                    
-   *                    Although this is a FileInputStream, we are going to
-   *                    assume that the underlying file descriptor is writable
-   *                    as well as readable. It would be more appropriate to use
-   *                    a RandomAccessFile here, but that class does not have
-   *                    any public accessor which returns a FileDescriptor,
-   *                    unlike FileInputStream.
-   */
-  public ShortCircuitShm(ShmId shmId, FileInputStream stream)
-        throws IOException {
-    if (!NativeIO.isAvailable()) {
-      throw new UnsupportedOperationException("NativeIO is not available.");
-    }
-    if (Shell.WINDOWS) {
-      throw new UnsupportedOperationException(
-          "DfsClientShm is not yet implemented for Windows.");
-    }
-    if (unsafe == null) {
-      throw new UnsupportedOperationException(
-          "can't use DfsClientShm because we failed to " +
-          "load misc.Unsafe.");
-    }
-    this.shmId = shmId;
-    this.mmappedLength = getUsableLength(stream);
-    this.baseAddress = POSIX.mmap(stream.getFD(), 
-        POSIX.MMAP_PROT_READ | POSIX.MMAP_PROT_WRITE, true, mmappedLength);
-    this.slots = new Slot[mmappedLength / BYTES_PER_SLOT];
-    this.allocatedSlots = new BitSet(slots.length);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("creating " + this.getClass().getSimpleName() +
-          "(shmId=" + shmId +
-          ", mmappedLength=" + mmappedLength +
-          ", baseAddress=" + String.format("%x", baseAddress) +
-          ", slots.length=" + slots.length + ")");
-    }
-  }
-
-  public final ShmId getShmId() {
-    return shmId;
-  }
-  
-  /**
-   * Determine if this shared memory object is empty.
-   *
-   * @return    True if the shared memory object is empty.
-   */
-  synchronized final public boolean isEmpty() {
-    return allocatedSlots.nextSetBit(0) == -1;
-  }
-
-  /**
-   * Determine if this shared memory object is full.
-   *
-   * @return    True if the shared memory object is full.
-   */
-  synchronized final public boolean isFull() {
-    return allocatedSlots.nextClearBit(0) >= slots.length;
-  }
-
-  /**
-   * Calculate the base address of a slot.
-   *
-   * @param slotIdx   Index of the slot.
-   * @return          The base address of the slot.
-   */
-  private final long calculateSlotAddress(int slotIdx) {
-    long offset = slotIdx;
-    offset *= BYTES_PER_SLOT;
-    return this.baseAddress + offset;
-  }
-
-  /**
-   * Allocate a new slot and register it.
-   *
-   * This function chooses an empty slot, initializes it, and then returns
-   * the relevant Slot object.
-   *
-   * @return    The new slot.
-   */
-  synchronized public final Slot allocAndRegisterSlot(
-      ExtendedBlockId blockId) {
-    int idx = allocatedSlots.nextClearBit(0);
-    if (idx >= slots.length) {
-      throw new RuntimeException(this + ": no more slots are available.");
-    }
-    allocatedSlots.set(idx, true);
-    Slot slot = new Slot(calculateSlotAddress(idx), blockId);
-    slot.clear();
-    slot.makeValid();
-    slots[idx] = slot;
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(this + ": allocAndRegisterSlot " + idx + ": allocatedSlots=" + allocatedSlots +
-                  StringUtils.getStackTrace(Thread.currentThread()));
-    }
-    return slot;
-  }
-
-  synchronized public final Slot getSlot(int slotIdx)
-      throws InvalidRequestException {
-    if (!allocatedSlots.get(slotIdx)) {
-      throw new InvalidRequestException(this + ": slot " + slotIdx +
-          " does not exist.");
-    }
-    return slots[slotIdx];
-  }
-
-  /**
-   * Register a slot.
-   *
-   * This function looks at a slot which has already been initialized (by
-   * another process), and registers it with us.  Then, it returns the 
-   * relevant Slot object.
-   *
-   * @return    The slot.
-   *
-   * @throws InvalidRequestException
-   *            If the slot index we're trying to allocate has not been
-   *            initialized, or is already in use.
-   */
-  synchronized public final Slot registerSlot(int slotIdx,
-      ExtendedBlockId blockId) throws InvalidRequestException {
-    if (slotIdx < 0) {
-      throw new InvalidRequestException(this + ": invalid negative slot " +
-          "index " + slotIdx);
-    }
-    if (slotIdx >= slots.length) {
-      throw new InvalidRequestException(this + ": invalid slot " +
-          "index " + slotIdx);
-    }
-    if (allocatedSlots.get(slotIdx)) {
-      throw new InvalidRequestException(this + ": slot " + slotIdx +
-          " is already in use.");
-    }
-    Slot slot = new Slot(calculateSlotAddress(slotIdx), blockId);
-    if (!slot.isValid()) {
-      throw new InvalidRequestException(this + ": slot " + slotIdx +
-          " is not marked as valid.");
-    }
-    slots[slotIdx] = slot;
-    allocatedSlots.set(slotIdx, true);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(this + ": registerSlot " + slotIdx + ": allocatedSlots=" + allocatedSlots +
-                  StringUtils.getStackTrace(Thread.currentThread()));
-    }
-    return slot;
-  }
-
-  /**
-   * Unregisters a slot.
-   * 
-   * This doesn't alter the contents of the slot.  It just means
-   *
-   * @param slotIdx  Index of the slot to unregister.
-   */
-  synchronized public final void unregisterSlot(int slotIdx) {
-    Preconditions.checkState(allocatedSlots.get(slotIdx),
-        "tried to unregister slot " + slotIdx + ", which was not registered.");
-    allocatedSlots.set(slotIdx, false);
-    slots[slotIdx] = null;
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(this + ": unregisterSlot " + slotIdx);
-    }
-  }
-  
-  /**
-   * Iterate over all allocated slots.
-   * 
-   * Note that this method isn't safe if 
-   *
-   * @return        The slot iterator.
-   */
-  public SlotIterator slotIterator() {
-    return new SlotIterator();
-  }
-
-  public void free() {
-    try {
-      POSIX.munmap(baseAddress, mmappedLength);
-    } catch (IOException e) {
-      LOG.warn(this + ": failed to munmap", e);
-    }
-    LOG.trace(this + ": freed");
-  }
-  
-  @Override
-  public String toString() {
-    return this.getClass().getSimpleName() + "(" + shmId + ")";
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ExactSizeInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ExactSizeInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ExactSizeInputStream.java
deleted file mode 100644
index 17365fb..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ExactSizeInputStream.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.util;
-
-import java.io.EOFException;
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.google.common.base.Preconditions;
-
-/**
- * An InputStream implementations which reads from some other InputStream
- * but expects an exact number of bytes. Any attempts to read past the
- * specified number of bytes will return as if the end of the stream
- * was reached. If the end of the underlying stream is reached prior to
- * the specified number of bytes, an EOFException is thrown.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class ExactSizeInputStream extends FilterInputStream {
-  private int remaining;
-
-  /**
-   * Construct an input stream that will read no more than
-   * 'numBytes' bytes.
-   * 
-   * If an EOF occurs on the underlying stream before numBytes
-   * bytes have been read, an EOFException will be thrown.
-   * 
-   * @param in the inputstream to wrap
-   * @param numBytes the number of bytes to read
-   */
-  public ExactSizeInputStream(InputStream in, int numBytes) {
-    super(in);
-    Preconditions.checkArgument(numBytes >= 0,
-        "Negative expected bytes: ", numBytes);
-    this.remaining = numBytes;
-  }
-
-  @Override
-  public int available() throws IOException {
-    return Math.min(super.available(), remaining);
-  }
-
-  @Override
-  public int read() throws IOException {
-    // EOF if we reached our limit
-    if (remaining <= 0) {
-      return -1;
-    }
-    final int result = super.read();
-    if (result >= 0) {
-      --remaining;
-    } else if (remaining > 0) {
-      // Underlying stream reached EOF but we haven't read the expected
-      // number of bytes.
-      throw new EOFException(
-          "Premature EOF. Expected " + remaining + "more bytes");
-    }
-    return result;
-  }
-
-  @Override
-  public int read(final byte[] b, final int off, int len)
-                  throws IOException {
-    if (remaining <= 0) {
-      return -1;
-    }
-    len = Math.min(len, remaining);
-    final int result = super.read(b, off, len);
-    if (result >= 0) {
-      remaining -= result;
-    } else if (remaining > 0) {
-      // Underlying stream reached EOF but we haven't read the expected
-      // number of bytes.
-      throw new EOFException(
-          "Premature EOF. Expected " + remaining + "more bytes");
-    }
-    return result;
-  }
-
-  @Override
-  public long skip(final long n) throws IOException {
-    final long result = super.skip(Math.min(n, remaining));
-    if (result > 0) {
-      remaining -= result;
-    } else if (remaining > 0) {
-      // Underlying stream reached EOF but we haven't read the expected
-      // number of bytes.
-      throw new EOFException(
-          "Premature EOF. Expected " + remaining + "more bytes");
-    }
-    return result;
-  }
-  
-  @Override
-  public boolean markSupported() {
-    return false;
-  }
-
-  @Override
-  public void mark(int readlimit) {
-    throw new UnsupportedOperationException();
-  }
-  
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index a52367b..9c53874 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -482,8 +482,6 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-8823. Move replication factor into individual blocks. (wheat9)
 
-    HDFS-8934. Move ShortCircuitShm to hdfs-client. (Mingliang Liu via wheat9)
-
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
index fec6b85..8517173 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
@@ -45,7 +45,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
@@ -592,7 +592,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
         failureInjector.getSupportsReceiptVerification());
     DataInputStream in = new DataInputStream(peer.getInputStream());
     BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
-        PBHelperClient.vintPrefixed(in));
+        PBHelper.vintPrefixed(in));
     DomainSocket sock = peer.getDomainSocket();
     failureInjector.injectRequestFileDescriptorsFailure();
     switch (resp.getStatus()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 12646b5..a7b518e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -149,7 +149,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
@@ -1928,7 +1928,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
           new Sender(out).blockChecksum(block, lb.getBlockToken());
 
           final BlockOpResponseProto reply =
-            BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in));
+            BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
 
           String logInfo = "for block " + block + " from datanode " + datanodes[j];
           DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
@@ -1960,7 +1960,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
           // read crc-type
           final DataChecksum.Type ct;
           if (checksumData.hasCrcType()) {
-            ct = PBHelperClient.convert(checksumData
+            ct = PBHelper.convert(checksumData
                 .getCrcType());
           } else {
             LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
@@ -2088,11 +2088,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
       new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName,
           0, 1, true, CachingStrategy.newDefaultStrategy());
       final BlockOpResponseProto reply =
-          BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in));
+          BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
       String logInfo = "trying to read " + lb.getBlock() + " from datanode " + dn;
       DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
 
-      return PBHelperClient.convert(reply.getReadOpChecksumInfo().getChecksum().getType());
+      return PBHelper.convert(reply.getReadOpChecksumInfo().getChecksum().getType());
     } finally {
       IOUtils.cleanup(null, pair.in, pair.out);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index a975312..8dd85b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -67,7 +67,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
@@ -1245,7 +1245,7 @@ class DataStreamer extends Daemon {
 
       //ack
       BlockOpResponseProto response =
-          BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in));
+          BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
       if (SUCCESS != response.getStatus()) {
         throw new IOException("Failed to add a datanode");
       }
@@ -1524,7 +1524,7 @@ class DataStreamer extends Daemon {
 
         // receive ack for connect
         BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
-            PBHelperClient.vintPrefixed(blockReplyStream));
+            PBHelper.vintPrefixed(blockReplyStream));
         pipelineStatus = resp.getStatus();
         firstBadLink = resp.getFirstBadLink();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java
new file mode 100644
index 0000000..7b9e8e3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+
+/**
+ * An immutable key which identifies a block.
+ */
+@InterfaceAudience.Private
+final public class ExtendedBlockId {
+  /**
+   * The block ID for this block.
+   */
+  private final long blockId;
+
+  /**
+   * The block pool ID for this block.
+   */
+  private final String bpId;
+
+  public static ExtendedBlockId fromExtendedBlock(ExtendedBlock block) {
+    return new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
+  }
+  
+  public ExtendedBlockId(long blockId, String bpId) {
+    this.blockId = blockId;
+    this.bpId = bpId;
+  }
+
+  public long getBlockId() {
+    return this.blockId;
+  }
+
+  public String getBlockPoolId() {
+    return this.bpId;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if ((o == null) || (o.getClass() != this.getClass())) {
+      return false;
+    }
+    ExtendedBlockId other = (ExtendedBlockId)o;
+    return new EqualsBuilder().
+        append(blockId, other.blockId).
+        append(bpId, other.bpId).
+        isEquals();
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().
+        append(this.blockId).
+        append(this.bpId).
+        toHashCode();
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder().append(blockId).
+        append("_").append(bpId).toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
index 05a9f2c..d70f419 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
@@ -414,7 +414,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
         new BufferedInputStream(peer.getInputStream(), bufferSize));
     
     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
-        PBHelperClient.vintPrefixed(in));
+        PBHelper.vintPrefixed(in));
     RemoteBlockReader2.checkSuccess(status, peer, block, file);
     ReadOpChecksumInfoProto checksumInfo =
       status.getReadOpChecksumInfo();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
index 4c23d36..c368d65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
@@ -417,7 +417,7 @@ public class RemoteBlockReader2  implements BlockReader {
     DataInputStream in = new DataInputStream(peer.getInputStream());
 
     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
-        PBHelperClient.vintPrefixed(in));
+        PBHelper.vintPrefixed(in));
     checkSuccess(status, peer, block, file);
     ReadOpChecksumInfoProto checksumInfo =
       status.getReadOpChecksumInfo();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java
new file mode 100644
index 0000000..4792b0e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.net;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.ReadableByteChannel;
+
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Represents a peer that we communicate with by using blocking I/O 
+ * on a UNIX domain socket.
+ */
+@InterfaceAudience.Private
+public class DomainPeer implements Peer {
+  private final DomainSocket socket;
+  private final OutputStream out;
+  private final InputStream in;
+  private final ReadableByteChannel channel;
+
+  public DomainPeer(DomainSocket socket) {
+    this.socket = socket;
+    this.out = socket.getOutputStream();
+    this.in = socket.getInputStream();
+    this.channel = socket.getChannel();
+  }
+
+  @Override
+  public ReadableByteChannel getInputStreamChannel() {
+    return channel;
+  }
+
+  @Override
+  public void setReadTimeout(int timeoutMs) throws IOException {
+    socket.setAttribute(DomainSocket.RECEIVE_TIMEOUT, timeoutMs);
+  }
+
+  @Override
+  public int getReceiveBufferSize() throws IOException {
+    return socket.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE);
+  }
+
+  @Override
+  public boolean getTcpNoDelay() throws IOException {
+    /* No TCP, no TCP_NODELAY. */
+    return false;
+  }
+
+  @Override
+  public void setWriteTimeout(int timeoutMs) throws IOException {
+    socket.setAttribute(DomainSocket.SEND_TIMEOUT, timeoutMs);
+  }
+
+  @Override
+  public boolean isClosed() {
+    return !socket.isOpen();
+  }
+
+  @Override
+  public void close() throws IOException {
+    socket.close();
+  }
+
+  @Override
+  public String getRemoteAddressString() {
+    return "unix:" + socket.getPath();
+  }
+
+  @Override
+  public String getLocalAddressString() {
+    return "<local>";
+  }
+
+  @Override
+  public InputStream getInputStream() throws IOException {
+    return in;
+  }
+
+  @Override
+  public OutputStream getOutputStream() throws IOException {
+    return out;
+  }
+
+  @Override
+  public boolean isLocal() {
+    /* UNIX domain sockets can only be used for local communication. */
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return "DomainPeer(" + getRemoteAddressString() + ")";
+  }
+
+  @Override
+  public DomainSocket getDomainSocket() {
+    return socket;
+  }
+
+  @Override
+  public boolean hasSecureChannel() {
+    //
+    // Communication over domain sockets is assumed to be secure, since it
+    // doesn't pass over any network.  We also carefully control the privileges
+    // that can be used on the domain socket inode and its parent directories.
+    // See org.apache.hadoop.net.unix.DomainSocket#validateSocketPathSecurity0
+    // for details.
+    //
+    // So unless you are running as root or the hdfs superuser, you cannot
+    // launch a man-in-the-middle attack on UNIX domain socket traffic.
+    //
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java
new file mode 100644
index 0000000..42cf287
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.net;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.ReadableByteChannel;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.net.unix.DomainSocket;
+
+/**
+ * Represents a connection to a peer.
+ */
+@InterfaceAudience.Private
+public interface Peer extends Closeable {
+  /**
+   * @return                The input stream channel associated with this
+   *                        peer, or null if it has none.
+   */
+  public ReadableByteChannel getInputStreamChannel();
+
+  /**
+   * Set the read timeout on this peer.
+   *
+   * @param timeoutMs       The timeout in milliseconds.
+   */
+  public void setReadTimeout(int timeoutMs) throws IOException;
+
+  /**
+   * @return                The receive buffer size.
+   */
+  public int getReceiveBufferSize() throws IOException;
+
+  /**
+   * @return                True if TCP_NODELAY is turned on.
+   */
+  public boolean getTcpNoDelay() throws IOException;
+
+  /**
+   * Set the write timeout on this peer.
+   *
+   * Note: this is not honored for BasicInetPeer.
+   * See {@link BasicInetPeer#setWriteTimeout} for details.
+   * 
+   * @param timeoutMs       The timeout in milliseconds.
+   */
+  public void setWriteTimeout(int timeoutMs) throws IOException;
+
+  /**
+   * @return                true only if the peer is closed.
+   */
+  public boolean isClosed();
+  
+  /**
+   * Close the peer.
+   *
+   * It's safe to re-close a Peer that is already closed.
+   */
+  public void close() throws IOException;
+
+  /**
+   * @return               A string representing the remote end of our 
+   *                       connection to the peer.
+   */
+  public String getRemoteAddressString();
+
+  /**
+   * @return               A string representing the local end of our 
+   *                       connection to the peer.
+   */
+  public String getLocalAddressString();
+  
+  /**
+   * @return               An InputStream associated with the Peer.
+   *                       This InputStream will be valid until you close
+   *                       this peer with Peer#close.
+   */
+  public InputStream getInputStream() throws IOException;
+  
+  /**
+   * @return               An OutputStream associated with the Peer.
+   *                       This OutputStream will be valid until you close
+   *                       this peer with Peer#close.
+   */
+  public OutputStream getOutputStream() throws IOException;
+
+  /**
+   * @return               True if the peer resides on the same
+   *                       computer as we do.
+   */
+  public boolean isLocal();
+
+  /**
+   * @return               The DomainSocket associated with the current
+   *                       peer, or null if there is none.
+   */
+  public DomainSocket getDomainSocket();
+  
+  /**
+   * Return true if the channel is secure.
+   *
+   * @return               True if our channel to this peer is not
+   *                       susceptible to man-in-the-middle attacks.
+   */
+  public boolean hasSecureChannel();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java
new file mode 100644
index 0000000..5f86e52
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/** Block Construction Stage */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public enum BlockConstructionStage {
+  /** Each regular stage is listed immediately before its recovery stage
+   * (if any), so the recovery stage's ordinal is the regular ordinal | 1.
+   * Changing this order will break getRecoveryStage.
+   */
+  // pipeline set up for block append
+  PIPELINE_SETUP_APPEND,
+  // pipeline set up for failed PIPELINE_SETUP_APPEND recovery
+  PIPELINE_SETUP_APPEND_RECOVERY,
+  // data streaming
+  DATA_STREAMING,
+  // pipeline setup for failed data streaming recovery
+  PIPELINE_SETUP_STREAMING_RECOVERY,
+  // close the block and pipeline
+  PIPELINE_CLOSE,
+  // Recover a failed PIPELINE_CLOSE
+  PIPELINE_CLOSE_RECOVERY,
+  // pipeline set up for block creation
+  PIPELINE_SETUP_CREATE,
+  // transfer RBW for adding datanodes
+  TRANSFER_RBW,
+  // transfer Finalized for adding datanodes
+  TRANSFER_FINALIZED;
+  
+  final static private byte RECOVERY_BIT = (byte)1;
+  
+  /**
+   * get the recovery stage of this stage
+   */
+  public BlockConstructionStage getRecoveryStage() {
+    if (this == PIPELINE_SETUP_CREATE) {
+      throw new IllegalArgumentException( "Unexpected blockStage " + this);
+    } else {
+      return values()[ordinal()|RECOVERY_BIT];
+    }
+  }
+}    

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
new file mode 100644
index 0000000..284281a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.htrace.Span;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceInfo;
+import org.apache.htrace.TraceScope;
+
+/**
+ * Static utilities for dealing with the protocol buffers used by the
+ * Data Transfer Protocol.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class DataTransferProtoUtil {
+  static BlockConstructionStage fromProto(
+      OpWriteBlockProto.BlockConstructionStage stage) {
+    return BlockConstructionStage.valueOf(stage.name());
+  }
+
+  static OpWriteBlockProto.BlockConstructionStage toProto(
+      BlockConstructionStage stage) {
+    return OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name());
+  }
+
+  public static ChecksumProto toProto(DataChecksum checksum) {
+    ChecksumTypeProto type = PBHelper.convert(checksum.getChecksumType());
+    // ChecksumType#valueOf never returns null
+    return ChecksumProto.newBuilder()
+      .setBytesPerChecksum(checksum.getBytesPerChecksum())
+      .setType(type)
+      .build();
+  }
+
+  public static DataChecksum fromProto(ChecksumProto proto) {
+    if (proto == null) return null;
+
+    int bytesPerChecksum = proto.getBytesPerChecksum();
+    DataChecksum.Type type = PBHelper.convert(proto.getType());
+    return DataChecksum.newDataChecksum(type, bytesPerChecksum);
+  }
+
+  static ClientOperationHeaderProto buildClientHeader(ExtendedBlock blk,
+      String client, Token<BlockTokenIdentifier> blockToken) {
+    ClientOperationHeaderProto header =
+      ClientOperationHeaderProto.newBuilder()
+        .setBaseHeader(buildBaseHeader(blk, blockToken))
+        .setClientName(client)
+        .build();
+    return header;
+  }
+
+  static BaseHeaderProto buildBaseHeader(ExtendedBlock blk,
+      Token<BlockTokenIdentifier> blockToken) {
+    BaseHeaderProto.Builder builder =  BaseHeaderProto.newBuilder()
+      .setBlock(PBHelper.convert(blk))
+      .setToken(PBHelper.convert(blockToken));
+    if (Trace.isTracing()) {
+      Span s = Trace.currentSpan();
+      builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()
+          .setTraceId(s.getTraceId())
+          .setParentId(s.getSpanId()));
+    }
+    return builder.build();
+  }
+
+  public static TraceInfo fromProto(DataTransferTraceInfoProto proto) {
+    if (proto == null) return null;
+    if (!proto.hasTraceId()) return null;
+    return new TraceInfo(proto.getTraceId(), proto.getParentId());
+  }
+
+  public static TraceScope continueTraceSpan(ClientOperationHeaderProto header,
+      String description) {
+    return continueTraceSpan(header.getBaseHeader(), description);
+  }
+
+  public static TraceScope continueTraceSpan(BaseHeaderProto header,
+      String description) {
+    return continueTraceSpan(header.getTraceInfo(), description);
+  }
+
+  public static TraceScope continueTraceSpan(DataTransferTraceInfoProto proto,
+      String description) {
+    TraceScope scope = null;
+    TraceInfo info = fromProto(proto);
+    if (info != null) {
+      scope = Trace.startSpan(description, info);
+    }
+    return scope;
+  }
+
+  public static void checkBlockOpStatus(
+          BlockOpResponseProto response,
+          String logInfo) throws IOException {
+    if (response.getStatus() != Status.SUCCESS) {
+      if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) {
+        throw new InvalidBlockTokenException(
+          "Got access token error"
+          + ", status message " + response.getMessage()
+          + ", " + logInfo
+        );
+      } else {
+        throw new IOException(
+          "Got error"
+          + ", status message " + response.getMessage()
+          + ", " + logInfo
+        );
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
new file mode 100644
index 0000000..48e931d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+
+/**
+ * Transfer data to/from datanode using a streaming protocol.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface DataTransferProtocol {
+  public static final Log LOG = LogFactory.getLog(DataTransferProtocol.class);
+  
+  /** Version for data transfers between clients and datanodes.
+   * This should change whenever the serialization of DatanodeInfo changes,
+   * not just when the protocol changes. It is not very obvious.
+   */
+  /*
+   * Version 28:
+   *    Declare methods in DataTransferProtocol interface.
+   */
+  public static final int DATA_TRANSFER_VERSION = 28;
+
+  /** 
+   * Read a block.
+   * 
+   * @param blk the block being read.
+   * @param blockToken security token for accessing the block.
+   * @param clientName client's name.
+   * @param blockOffset offset of the block.
+   * @param length maximum number of bytes for this read.
+   * @param sendChecksum if false, the DN should skip reading and sending
+   *        checksums
+   * @param cachingStrategy  The caching strategy to use.
+   */
+  public void readBlock(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken,
+      final String clientName,
+      final long blockOffset,
+      final long length,
+      final boolean sendChecksum,
+      final CachingStrategy cachingStrategy) throws IOException;
+
+  /**
+   * Write a block to a datanode pipeline.
+   * The receiver datanode of this call is the next datanode in the pipeline.
+   * The other downstream datanodes are specified by the targets parameter.
+   * Note that the receiver {@link DatanodeInfo} is not required in the
+   * parameter list since the receiver datanode knows its info.  However, the
+   * {@link StorageType} for storing the replica in the receiver datanode is a 
+   * parameter since the receiver datanode may support multiple storage types.
+   *
+   * @param blk the block being written.
+   * @param storageType for storing the replica in the receiver datanode.
+   * @param blockToken security token for accessing the block.
+   * @param clientName client's name.
+   * @param targets other downstream datanodes in the pipeline.
+   * @param targetStorageTypes target {@link StorageType}s corresponding
+   *                           to the target datanodes.
+   * @param source source datanode.
+   * @param stage pipeline stage.
+   * @param pipelineSize the size of the pipeline.
+   * @param minBytesRcvd minimum number of bytes received.
+   * @param maxBytesRcvd maximum number of bytes received.
+   * @param latestGenerationStamp the latest generation stamp of the block.
+   * @param pinning whether to pin the block, so Balancer won't move it.
+   * @param targetPinnings whether to pin the block on target datanode
+   */
+  public void writeBlock(final ExtendedBlock blk,
+      final StorageType storageType, 
+      final Token<BlockTokenIdentifier> blockToken,
+      final String clientName,
+      final DatanodeInfo[] targets,
+      final StorageType[] targetStorageTypes, 
+      final DatanodeInfo source,
+      final BlockConstructionStage stage,
+      final int pipelineSize,
+      final long minBytesRcvd,
+      final long maxBytesRcvd,
+      final long latestGenerationStamp,
+      final DataChecksum requestedChecksum,
+      final CachingStrategy cachingStrategy,
+      final boolean allowLazyPersist,
+      final boolean pinning,
+      final boolean[] targetPinnings) throws IOException;
+  /**
+   * Transfer a block to another datanode.
+   * The block stage must be
+   * either {@link BlockConstructionStage#TRANSFER_RBW}
+   * or {@link BlockConstructionStage#TRANSFER_FINALIZED}.
+   * 
+   * @param blk the block being transferred.
+   * @param blockToken security token for accessing the block.
+   * @param clientName client's name.
+   * @param targets target datanodes.
+   */
+  public void transferBlock(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken,
+      final String clientName,
+      final DatanodeInfo[] targets,
+      final StorageType[] targetStorageTypes) throws IOException;
+
+  /**
+   * Request short circuit access file descriptors from a DataNode.
+   *
+   * @param blk             The block to get file descriptors for.
+   * @param blockToken      Security token for accessing the block.
+   * @param slotId          The shared memory slot id to use, or null 
+   *                          to use no slot id.
+   * @param maxVersion      Maximum version of the block data the client 
+   *                          can understand.
+   * @param supportsReceiptVerification  True if the client supports
+   *                          receipt verification.
+   */
+  public void requestShortCircuitFds(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken,
+      SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
+        throws IOException;
+
+  /**
+   * Release a pair of short-circuit FDs requested earlier.
+   *
+   * @param slotId          SlotID used by the earlier file descriptors.
+   */
+  public void releaseShortCircuitFds(final SlotId slotId) throws IOException;
+
+  /**
+   * Request a short circuit shared memory area from a DataNode.
+   * 
+   * @param clientName       The name of the client.
+   */
+  public void requestShortCircuitShm(String clientName) throws IOException;
+  
+  /**
+   * Receive a block from a source datanode
+   * and then notify the namenode
+   * to remove the copy from the original datanode.
+   * Note that the source datanode and the original datanode can be different.
+   * It is used for balancing purposes.
+   * 
+   * @param blk the block being replaced.
+   * @param storageType the {@link StorageType} for storing the block.
+   * @param blockToken security token for accessing the block.
+   * @param delHint the hint for deleting the block in the original datanode.
+   * @param source the source datanode for receiving the block.
+   */
+  public void replaceBlock(final ExtendedBlock blk,
+      final StorageType storageType, 
+      final Token<BlockTokenIdentifier> blockToken,
+      final String delHint,
+      final DatanodeInfo source) throws IOException;
+
+  /**
+   * Copy a block. 
+   * It is used for balancing purposes.
+   * 
+   * @param blk the block being copied.
+   * @param blockToken security token for accessing the block.
+   */
+  public void copyBlock(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken) throws IOException;
+
+  /**
+   * Get block checksum (MD5 of CRC32).
+   * 
+   * @param blk a block.
+   * @param blockToken security token for accessing the block.
+   * @throws IOException
+   */
+  public void blockChecksum(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
new file mode 100644
index 0000000..3077498
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Operation codes of the data transfer protocol. Each constant carries the
+ * one-byte code that {@link #write(DataOutput)} emits and that
+ * {@link #read(DataInput)} decodes.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public enum Op {
+  WRITE_BLOCK((byte)80),
+  READ_BLOCK((byte)81),
+  READ_METADATA((byte)82),
+  REPLACE_BLOCK((byte)83),
+  COPY_BLOCK((byte)84),
+  BLOCK_CHECKSUM((byte)85),
+  TRANSFER_BLOCK((byte)86),
+  REQUEST_SHORT_CIRCUIT_FDS((byte)87),
+  RELEASE_SHORT_CIRCUIT_FDS((byte)88),
+  REQUEST_SHORT_CIRCUIT_SHM((byte)89),
+  CUSTOM((byte)127);
+
+  /** The code for this operation. */
+  public final byte code;
+
+  private Op(byte code) {
+    this.code = code;
+  }
+
+  /** Constants cached once so lookups do not call values() repeatedly. */
+  private static final Op[] VALUES = values();
+
+  /**
+   * Return the object represented by the code.
+   * <p>
+   * The lookup compares against each constant's declared code instead of
+   * indexing by offset from the first code: the codes are not contiguous
+   * (CUSTOM is 127), so offset indexing resolves 90 to CUSTOM and 127 to
+   * null.
+   *
+   * @param code the operation code.
+   * @return the matching operation, or null if no operation has this code.
+   */
+  private static Op valueOf(byte code) {
+    for (Op op : VALUES) {
+      if (op.code == code) {
+        return op;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Read from in
+   *
+   * @param in the input to read one code byte from.
+   * @return the operation for that byte, or null if the byte is not a known
+   *         code.
+   * @throws IOException if the byte cannot be read.
+   */
+  public static Op read(DataInput in) throws IOException {
+    return valueOf(in.readByte());
+  }
+
+  /**
+   * Write to out
+   *
+   * @param out the output that receives the one-byte code.
+   * @throws IOException if the byte cannot be written.
+   */
+  public void write(DataOutput out) throws IOException {
+    out.write(code);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
index 44f38c6..a811f39 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hdfs.protocol.datatransfer;
 
-import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
+import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
 
 import java.io.IOException;
 import java.io.InputStream;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index 694f521..d435543 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hdfs.protocol.datatransfer;
 
 import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.fromProto;
 import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.continueTraceSpan;
-import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
+import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
 
 import java.io.DataInputStream;
 import java.io.IOException;
@@ -39,7 +39,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProt
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmRequestProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
 import org.apache.htrace.TraceScope;
@@ -137,7 +136,7 @@ public abstract class Receiver implements DataTransferProtocol {
         proto.getClass().getSimpleName());
     try {
       writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
-          PBHelperClient.convertStorageType(proto.getStorageType()),
+          PBHelper.convertStorageType(proto.getStorageType()),
           PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
           proto.getHeader().getClientName(),
           targets,
@@ -229,7 +228,7 @@ public abstract class Receiver implements DataTransferProtocol {
         proto.getClass().getSimpleName());
     try {
       replaceBlock(PBHelper.convert(proto.getHeader().getBlock()),
-          PBHelperClient.convertStorageType(proto.getStorageType()),
+          PBHelper.convertStorageType(proto.getStorageType()),
           PBHelper.convert(proto.getHeader().getToken()),
           proto.getDelHint(),
           PBHelper.convert(proto.getSource()));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
new file mode 100644
index 0000000..df69125
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.toProto;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmRequestProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+
+import org.apache.htrace.Trace;
+import org.apache.htrace.Span;
+
+import com.google.protobuf.Message;
+
+/**
+ * Implementation of {@link DataTransferProtocol} that encodes each operation
+ * as a protobuf message and writes it to the output stream given at
+ * construction.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class Sender implements DataTransferProtocol {
+  private final DataOutputStream out;
+
+  /** Create a sender for DataTransferProtocol with a output stream. */
+  public Sender(final DataOutputStream out) {
+    this.out = out;
+  }
+
+  /** Write the data transfer version followed by the operation code. */
+  private static void writeOpHeader(final DataOutput stream, final Op opcode)
+      throws IOException {
+    stream.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
+    opcode.write(stream);
+  }
+
+  /**
+   * Send one request: the operation header, then the message via
+   * writeDelimitedTo, then a flush of the stream.
+   */
+  private void send(final Op opcode, final Message request)
+      throws IOException {
+    if (LOG.isTraceEnabled()) {
+      final String type = request.getClass().getSimpleName();
+      LOG.trace("Sending DataTransferOp " + type + ": " + request);
+    }
+    writeOpHeader(out, opcode);
+    request.writeDelimitedTo(out);
+    out.flush();
+  }
+
+  /** Convert a caching strategy to its proto, leaving null settings unset. */
+  private static CachingStrategyProto toCachingStrategyProto(
+      final CachingStrategy strategy) {
+    final CachingStrategyProto.Builder result =
+        CachingStrategyProto.newBuilder();
+    if (strategy.getReadahead() != null) {
+      result.setReadahead(strategy.getReadahead().longValue());
+    }
+    if (strategy.getDropBehind() != null) {
+      result.setDropBehind(strategy.getDropBehind().booleanValue());
+    }
+    return result.build();
+  }
+
+  /**
+   * Describe the current trace span so that a request can carry it along.
+   * Callers check {@link Trace#isTracing()} first.
+   */
+  private static DataTransferTraceInfoProto.Builder currentSpanInfo() {
+    final Span span = Trace.currentSpan();
+    return DataTransferTraceInfoProto.newBuilder()
+        .setTraceId(span.getTraceId())
+        .setParentId(span.getSpanId());
+  }
+
+  /** Send a READ_BLOCK request. */
+  @Override
+  public void readBlock(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken,
+      final String clientName,
+      final long blockOffset,
+      final long length,
+      final boolean sendChecksum,
+      final CachingStrategy cachingStrategy) throws IOException {
+    final OpReadBlockProto.Builder request = OpReadBlockProto.newBuilder();
+    request.setHeader(
+        DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken));
+    request.setOffset(blockOffset);
+    request.setLen(length);
+    request.setSendChecksums(sendChecksum);
+    request.setCachingStrategy(toCachingStrategyProto(cachingStrategy));
+    send(Op.READ_BLOCK, request.build());
+  }
+
+  /** Send a WRITE_BLOCK request; source is set only when non-null. */
+  @Override
+  public void writeBlock(final ExtendedBlock blk,
+      final StorageType storageType,
+      final Token<BlockTokenIdentifier> blockToken,
+      final String clientName,
+      final DatanodeInfo[] targets,
+      final StorageType[] targetStorageTypes,
+      final DatanodeInfo source,
+      final BlockConstructionStage stage,
+      final int pipelineSize,
+      final long minBytesRcvd,
+      final long maxBytesRcvd,
+      final long latestGenerationStamp,
+      DataChecksum requestedChecksum,
+      final CachingStrategy cachingStrategy,
+      final boolean allowLazyPersist,
+      final boolean pinning,
+      final boolean[] targetPinnings) throws IOException {
+    final ClientOperationHeaderProto clientHeader =
+        DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken);
+    final ChecksumProto checksum =
+        DataTransferProtoUtil.toProto(requestedChecksum);
+
+    final OpWriteBlockProto.Builder request = OpWriteBlockProto.newBuilder();
+    request.setHeader(clientHeader);
+    request.setStorageType(PBHelper.convertStorageType(storageType));
+    // NOTE(review): the 1 passed to the PBHelper converters is presumably a
+    // start index that skips the first target -- confirm in PBHelper.
+    request.addAllTargets(PBHelper.convert(targets, 1));
+    request.addAllTargetStorageTypes(
+        PBHelper.convertStorageTypes(targetStorageTypes, 1));
+    request.setStage(toProto(stage));
+    request.setPipelineSize(pipelineSize);
+    request.setMinBytesRcvd(minBytesRcvd);
+    request.setMaxBytesRcvd(maxBytesRcvd);
+    request.setLatestGenerationStamp(latestGenerationStamp);
+    request.setRequestedChecksum(checksum);
+    request.setCachingStrategy(toCachingStrategyProto(cachingStrategy));
+    request.setAllowLazyPersist(allowLazyPersist);
+    request.setPinning(pinning);
+    request.addAllTargetPinnings(PBHelper.convert(targetPinnings, 1));
+    if (source != null) {
+      request.setSource(PBHelper.convertDatanodeInfo(source));
+    }
+    send(Op.WRITE_BLOCK, request.build());
+  }
+
+  /** Send a TRANSFER_BLOCK request. */
+  @Override
+  public void transferBlock(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken,
+      final String clientName,
+      final DatanodeInfo[] targets,
+      final StorageType[] targetStorageTypes) throws IOException {
+    final OpTransferBlockProto.Builder request =
+        OpTransferBlockProto.newBuilder();
+    request.setHeader(
+        DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken));
+    request.addAllTargets(PBHelper.convert(targets));
+    request.addAllTargetStorageTypes(
+        PBHelper.convertStorageTypes(targetStorageTypes));
+    send(Op.TRANSFER_BLOCK, request.build());
+  }
+
+  /**
+   * Send a REQUEST_SHORT_CIRCUIT_FDS request; the slot id is set only when
+   * it is non-null.
+   */
+  @Override
+  public void requestShortCircuitFds(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken,
+      SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
+        throws IOException {
+    final OpRequestShortCircuitAccessProto.Builder request =
+        OpRequestShortCircuitAccessProto.newBuilder();
+    request.setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken));
+    request.setMaxVersion(maxVersion);
+    if (slotId != null) {
+      request.setSlotId(PBHelper.convert(slotId));
+    }
+    request.setSupportsReceiptVerification(supportsReceiptVerification);
+    send(Op.REQUEST_SHORT_CIRCUIT_FDS, request.build());
+  }
+
+  /** Send a RELEASE_SHORT_CIRCUIT_FDS request, with trace info if tracing. */
+  @Override
+  public void releaseShortCircuitFds(SlotId slotId) throws IOException {
+    final ReleaseShortCircuitAccessRequestProto.Builder request =
+        ReleaseShortCircuitAccessRequestProto.newBuilder();
+    request.setSlotId(PBHelper.convert(slotId));
+    if (Trace.isTracing()) {
+      request.setTraceInfo(currentSpanInfo());
+    }
+    send(Op.RELEASE_SHORT_CIRCUIT_FDS, request.build());
+  }
+
+  /** Send a REQUEST_SHORT_CIRCUIT_SHM request, with trace info if tracing. */
+  @Override
+  public void requestShortCircuitShm(String clientName) throws IOException {
+    final ShortCircuitShmRequestProto.Builder request =
+        ShortCircuitShmRequestProto.newBuilder();
+    request.setClientName(clientName);
+    if (Trace.isTracing()) {
+      request.setTraceInfo(currentSpanInfo());
+    }
+    send(Op.REQUEST_SHORT_CIRCUIT_SHM, request.build());
+  }
+
+  /** Send a REPLACE_BLOCK request. */
+  @Override
+  public void replaceBlock(final ExtendedBlock blk,
+      final StorageType storageType,
+      final Token<BlockTokenIdentifier> blockToken,
+      final String delHint,
+      final DatanodeInfo source) throws IOException {
+    final OpReplaceBlockProto.Builder request =
+        OpReplaceBlockProto.newBuilder();
+    request.setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken));
+    request.setStorageType(PBHelper.convertStorageType(storageType));
+    request.setDelHint(delHint);
+    request.setSource(PBHelper.convertDatanodeInfo(source));
+    send(Op.REPLACE_BLOCK, request.build());
+  }
+
+  /** Send a COPY_BLOCK request. */
+  @Override
+  public void copyBlock(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken) throws IOException {
+    final OpCopyBlockProto.Builder request = OpCopyBlockProto.newBuilder();
+    request.setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken));
+    send(Op.COPY_BLOCK, request.build());
+  }
+
+  /** Send a BLOCK_CHECKSUM request. */
+  @Override
+  public void blockChecksum(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken) throws IOException {
+    final OpBlockChecksumProto.Builder request =
+        OpBlockChecksumProto.newBuilder();
+    request.setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken));
+    send(Op.BLOCK_CHECKSUM, request.build());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
index 852819f..398d44c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
@@ -24,7 +24,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
-import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
+import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
 
 import java.io.IOException;
 import java.io.InputStream;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
index a628287..2bc6a18 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
@@ -137,7 +137,7 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
       throw new ServiceException(e);
     }
     return GetBlockLocalPathInfoResponseProto.newBuilder()
-        .setBlock(PBHelperClient.convert(resp.getBlock()))
+        .setBlock(PBHelper.convert(resp.getBlock()))
         .setLocalPath(resp.getBlockPath()).setLocalMetaPath(resp.getMetaPath())
         .build();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a727c6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
index 53ca147..9d6375b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
@@ -185,7 +185,7 @@ public class ClientDatanodeProtocolTranslatorPB implements
   @Override
   public long getReplicaVisibleLength(ExtendedBlock b) throws IOException {
     GetReplicaVisibleLengthRequestProto req = GetReplicaVisibleLengthRequestProto
-        .newBuilder().setBlock(PBHelperClient.convert(b)).build();
+        .newBuilder().setBlock(PBHelper.convert(b)).build();
     try {
       return rpcProxy.getReplicaVisibleLength(NULL_CONTROLLER, req).getLength();
     } catch (ServiceException e) {
@@ -218,8 +218,8 @@ public class ClientDatanodeProtocolTranslatorPB implements
       Token<BlockTokenIdentifier> token) throws IOException {
     GetBlockLocalPathInfoRequestProto req =
         GetBlockLocalPathInfoRequestProto.newBuilder()
-        .setBlock(PBHelperClient.convert(block))
-        .setToken(PBHelperClient.convert(token)).build();
+        .setBlock(PBHelper.convert(block))
+        .setToken(PBHelper.convert(token)).build();
     GetBlockLocalPathInfoResponseProto resp;
     try {
       resp = rpcProxy.getBlockLocalPathInfo(NULL_CONTROLLER, req);


Mime
View raw message