hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l..@apache.org
Subject hadoop git commit: HDFS-8924. Add pluggable interface for reading replicas in DFSClient. (Colin Patrick McCabe via Lei Xu)
Date Sat, 22 Aug 2015 00:03:10 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk caa636bf1 -> 7087e700e


HDFS-8924. Add pluggable interface for reading replicas in DFSClient. (Colin Patrick McCabe
via Lei Xu)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7087e700
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7087e700
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7087e700

Branch: refs/heads/trunk
Commit: 7087e700e032dabc174ecc12b62c12e7d49b995f
Parents: caa636b
Author: Lei Xu <lei@cloudera.com>
Authored: Fri Aug 21 17:02:00 2015 -0700
Committer: Lei Xu <lei@cloudera.com>
Committed: Fri Aug 21 17:02:00 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/ReplicaAccessor.java |  88 ++++++
 .../hadoop/hdfs/ReplicaAccessorBuilder.java     | 101 +++++++
 .../hdfs/client/HdfsClientConfigKeys.java       |   3 +
 .../hadoop/hdfs/client/impl/DfsClientConf.java  |  50 +++-
 .../src/main/proto/datatransfer.proto           |   4 +
 .../apache/hadoop/hdfs/BlockReaderFactory.java  |  48 +++
 .../apache/hadoop/hdfs/ExternalBlockReader.java | 120 ++++++++
 .../hadoop/hdfs/protocol/datatransfer/Op.java   |   3 +-
 .../hdfs/protocol/datatransfer/Receiver.java    |   1 +
 .../hadoop/hdfs/TestExternalBlockReader.java    | 298 +++++++++++++++++++
 10 files changed, 712 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7087e700/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessor.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessor.java
new file mode 100644
index 0000000..720e6a1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessor.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Public API for objects that read the bytes of a single replica on behalf
+ * of the DFSClient.  Instances are produced by the ReplicaAccessorBuilder
+ * plugins listed in the client configuration.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public abstract class ReplicaAccessor {
+  /**
+   * Copy bytes from the replica into a byte array.
+   *
+   * @param pos    Offset within the replica at which reading begins.
+   *                 Must be non-negative.
+   * @param buf    Destination array.
+   * @param off    Index in buf at which the first byte is stored.
+   * @param len    Upper bound on the number of bytes to copy.
+   *
+   * @return       How many bytes were copied; never negative.  A count
+   *                 smaller than len is returned only when the read reaches
+   *                 the end of the replica.
+   * @throws IOException if reading from the replica fails.
+   */
+  public abstract int read(long pos, byte[] buf, int off, int len)
+      throws IOException;
+
+  /**
+   * Copy bytes from the replica into a ByteBuffer.
+   *
+   * @param pos    Offset within the replica at which reading begins.
+   *                 Must be non-negative.
+   * @param buf    Destination buffer, which may be heap or direct.  At most
+   *                 buf.remaining() bytes (from its position up to its
+   *                 limit) are read.
+   *
+   * @return       How many bytes were copied; never negative.  A count
+   *                 smaller than buf.remaining() is returned only when the
+   *                 read reaches the end of the replica.
+   * @throws IOException if reading from the replica fails.
+   */
+  public abstract int read(long pos, ByteBuffer buf) throws IOException;
+
+  /**
+   * Whether bytes read through this accessor should be included in the
+   * local byte count statistics.
+   */
+  public abstract boolean isLocal();
+
+  /**
+   * Whether bytes read through this accessor should be included in the
+   * short-circuit byte count statistics.
+   */
+  public abstract boolean isShortCircuit();
+
+  /**
+   * Free every resource held by this ReplicaAccessor.
+   *
+   * Implementations are encouraged not to throw IOException; the checked
+   * exception is declared only for compatibility with java.io.Closeable.
+   * Even when an exception is thrown, the accessor must be fully closed by
+   * the time this method returns, so that no resources leak.
+   */
+  public abstract void close() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7087e700/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessorBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessorBuilder.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessorBuilder.java
new file mode 100644
index 0000000..2905df1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessorBuilder.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Public API for building ReplicaAccessor objects.  Setters return a
+ * ReplicaAccessorBuilder so that calls can be chained, and build() produces
+ * the accessor (or null).
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public abstract class ReplicaAccessorBuilder {
+  /**
+   * Specify whether short-circuit reads are enabled.  They may be disabled
+   * because the user set dfs.client.read.shortcircuit to false, or because
+   * the block being read is still under construction.  Even when this flag
+   * is true, the user is not necessarily allowed to read the replica or to
+   * use short-circuit reads; the ReplicaAccessorBuilder implementation must
+   * check those permissions on its own.
+   */
+  public abstract ReplicaAccessorBuilder
+      setAllowShortCircuitReads(boolean allowShortCircuit);
+
+  /** Specify the block ID and block pool ID of the replica being opened. */
+  public abstract ReplicaAccessorBuilder
+      setBlock(long blockId, String blockPoolId);
+
+  /**
+   * Specify the block access token to present.
+   */
+  public abstract ReplicaAccessorBuilder setBlockAccessToken(byte[] token);
+
+  /** Specify the HDFS client name.  This is supplied for debugging only. */
+  public abstract ReplicaAccessorBuilder setClientName(String clientName);
+
+  /**
+   * Specify the Configuration to read settings from.  Each
+   * ReplicaAccessorBuilder subclass should use its own key prefix; a
+   * hypothetical "foobar" plugin, for instance, might read keys such as
+   * foo.bar.parameter1 and foo.bar.parameter2.
+   */
+  public abstract ReplicaAccessorBuilder setConfiguration(Configuration conf);
+
+  /**
+   * Specify the name of the file being opened.  This is supplied for
+   * debugging only.
+   */
+  public abstract ReplicaAccessorBuilder setFileName(String fileName);
+
+  /**
+   * Specify whether checksums have to be verified.  Verification should be
+   * skipped when the user has turned it off in the configuration, which
+   * users may do when their own software verifies checksums at a higher
+   * level than HDFS.
+   */
+  public abstract ReplicaAccessorBuilder
+      setVerifyChecksum(boolean verifyChecksum);
+
+  /**
+   * Specify how much of the replica is visible to this client.  Bytes
+   * appended afterwards stay invisible to the ReplicaAccessor being built;
+   * the client has to re-open the HDFS file to see them.  This value is an
+   * upper bound only: if the replica is truncated or deleted, fewer bytes
+   * than this may actually be readable.
+   */
+  public abstract ReplicaAccessorBuilder setVisibleLength(long visibleLength);
+
+  /**
+   * Construct a new ReplicaAccessor.
+   *
+   * Implementations must carry out any required access checks before the
+   * ReplicaAccessor is created, and should also perform any hardware- or
+   * network-level setup that might fail here.  Once a ReplicaAccessor is
+   * returned, it is assumed to work, and no normal BlockReader is tried.
+   *
+   * When no ReplicaAccessor can be built, implementations may want to log
+   * the reason at TRACE level.
+   *
+   * @return    the new ReplicaAccessor, or null if one could not be built.
+   */
+  public abstract ReplicaAccessor build();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7087e700/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index ccf5f4a..57bac8a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -113,6 +113,9 @@ public interface HdfsClientConfigKeys {
       "dfs.datanode.hdfs-blocks-metadata.enabled";
   boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false;
 
+  static final String REPLICA_ACCESSOR_BUILDER_CLASSES_KEY =
+      PREFIX + "replica.accessor.builder.classes";
+
   /** dfs.client.retry configuration properties */
   interface Retry {
     String PREFIX = HdfsClientConfigKeys.PREFIX + "retry.";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7087e700/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index 52aed2f..869fdc1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@ -23,7 +23,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.ReplicaAccessorBuilder;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.util.ByteArrayManager;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.util.DataChecksum;
@@ -83,6 +85,11 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Retry;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.ShortCircuit;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write;
 
+import java.lang.Class;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
 /**
  * DFSClient configuration.
  */
@@ -126,6 +133,8 @@ public class DfsClientConf {
 
   private final long hedgedReadThresholdMillis;
   private final int hedgedReadThreadpoolSize;
+  private final List<Class<? extends ReplicaAccessorBuilder>>
+      replicaAccessorBuilderClasses;  // configured builder classes, in order
 
   public DfsClientConf(Configuration conf) {
     // The hdfsTimeout is currently the same as the ipc timeout
@@ -231,8 +240,35 @@ public class DfsClientConf {
         HedgedRead.THRESHOLD_MILLIS_KEY,
         HedgedRead.THRESHOLD_MILLIS_DEFAULT);
     hedgedReadThreadpoolSize = conf.getInt(
-        HedgedRead.THREADPOOL_SIZE_KEY,
-        HedgedRead.THREADPOOL_SIZE_DEFAULT);
+        HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY,
+        HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_DEFAULT);
+
+    replicaAccessorBuilderClasses = loadReplicaAccessorBuilderClasses(conf);
+  }
+
+  @SuppressWarnings("unchecked")
+  private List<Class<? extends ReplicaAccessorBuilder>>
+      loadReplicaAccessorBuilderClasses(Configuration conf)
+  {
+    String classNames[] = conf.getTrimmedStrings(
+        HdfsClientConfigKeys.REPLICA_ACCESSOR_BUILDER_CLASSES_KEY);
+    if (classNames.length == 0) {
+      return Collections.emptyList();
+    }
+    ArrayList<Class<? extends ReplicaAccessorBuilder>> classes =
+        new ArrayList<Class<? extends ReplicaAccessorBuilder>>();
+    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+    for (String className: classNames) {
+      try {
+        Class<? extends ReplicaAccessorBuilder> cls =
+          (Class<? extends ReplicaAccessorBuilder>)
+            classLoader.loadClass(className);
+        classes.add(cls);
+      } catch (Throwable t) {
+        LOG.warn("Unable to load " + className, t);
+      }
+    }
+    return classes;
   }
 
   private DataChecksum.Type getChecksumType(Configuration conf) {
@@ -489,6 +525,14 @@ public class DfsClientConf {
   }
 
   /**
+   * @return a read-only view of the builder classes, in configured order
+   */
+  public List<Class<? extends ReplicaAccessorBuilder>>
+      getReplicaAccessorBuilderClasses() {
+    return Collections.unmodifiableList(replicaAccessorBuilderClasses);
+  }
+
+  /**
    * @return the shortCircuitConf
    */
   public ShortCircuitConf getShortCircuitConf() {
@@ -738,4 +782,4 @@ public class DfsClientConf {
       return builder.toString();
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7087e700/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
index 5071d15..a091d41 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
@@ -302,3 +302,7 @@ message OpBlockChecksumResponseProto {
   required bytes md5 = 3;
   optional ChecksumTypeProto crcType = 4;
 }
+
+message OpCustomProto {  // NOTE(review): presumably the body of Op.CUSTOM
+  required string customId = 1;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7087e700/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
index 96044e9..8517173 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
@@ -24,8 +24,12 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.net.InetSocketAddress;
+import java.util.List;
 
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
 import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -52,6 +56,7 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
+import org.apache.hadoop.io.ByteWritable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.unix.DomainSocket;
@@ -326,6 +331,10 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator
{
     BlockReader reader = null;
 
     Preconditions.checkNotNull(configuration);
+    reader = tryToCreateExternalBlockReader();
+    if (reader != null) {
+      return reader;
+    }
     final ShortCircuitConf scConf = conf.getShortCircuitConf();
     if (scConf.isShortCircuitLocalReads() && allowShortCircuitLocalReads) {
       if (clientContext.getUseLegacyBlockReaderLocal()) {
@@ -362,6 +371,45 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator
{
     return getRemoteBlockReaderFromTcp();
   }
 
+  private BlockReader tryToCreateExternalBlockReader() {
+    List<Class<? extends ReplicaAccessorBuilder>> clses =
+        conf.getReplicaAccessorBuilderClasses();
+    for (Class<? extends ReplicaAccessorBuilder> cls : clses) {
+      try {
+        ByteArrayDataOutput bado = ByteStreams.newDataOutput();
+        token.write(bado);
+        byte tokenBytes[] = bado.toByteArray();
+
+        Constructor<? extends ReplicaAccessorBuilder> ctor =
+            cls.getConstructor();
+        ReplicaAccessorBuilder builder = ctor.newInstance();
+        ReplicaAccessor accessor = builder.
+            setAllowShortCircuitReads(allowShortCircuitLocalReads).
+            setBlock(block.getBlockId(), block.getBlockPoolId()).
+            setBlockAccessToken(tokenBytes).
+            setClientName(clientName).
+            setConfiguration(configuration).
+            setFileName(fileName).
+            setVerifyChecksum(verifyChecksum).
+            setVisibleLength(length).
+            build();
+        if (accessor == null) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": No ReplicaAccessor created by " +
+                cls.getName());
+          }
+        } else {
+          return new ExternalBlockReader(accessor, length, startOffset);
+        }
+      } catch (Throwable t) {
+        LOG.warn("Failed to construct new object of type " +
+            cls.getName(), t);
+      }
+    }
+    return null;
+  }
+
+
   /**
    * Get {@link BlockReaderLocalLegacy} for short circuited local reads.
    * This block reader implements the path-based style of local reads

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7087e700/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
new file mode 100644
index 0000000..e135d8e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+
+/**
+ * An ExternalBlockReader uses pluggable ReplicaAccessor objects to read from
+ * replicas.  pos is an absolute replica offset starting at startOffset;
+ * skip() and available() treat visibleLength as the absolute end offset.
+ */
+@InterfaceAudience.Private
+public final class ExternalBlockReader implements BlockReader {
+  private final ReplicaAccessor accessor;
+  private final long visibleLength;
+  private long pos;
+
+  ExternalBlockReader(ReplicaAccessor accessor, long visibleLength,
+                      long startOffset) {
+    this.accessor = accessor;
+    this.visibleLength = visibleLength;
+    this.pos = startOffset;
+  }
+
+  @Override
+  public int read(byte[] buf, int off, int len) throws IOException {
+    int nread = accessor.read(pos, buf, off, len);
+    // Never let a misbehaving accessor move the position backwards.
+    if (nread > 0) {
+      pos += nread;
+    }
+    return nread;
+  }
+
+  @Override
+  public int read(ByteBuffer buf) throws IOException {
+    int nread = accessor.read(pos, buf);
+    if (nread > 0) {
+      pos += nread;
+    }
+    return nread;
+  }
+
+  @Override
+  public long skip(long n) throws IOException {
+    // You cannot skip backwards, or past the end of the visible range.
+    // Work from the remaining distance so that a huge n cannot overflow pos
+    // and a pos already beyond visibleLength is never moved backwards.
+    if (n <= 0) {
+      return 0;
+    }
+    long skipped = Math.min(n, Math.max(0L, visibleLength - pos));
+    pos += skipped;
+    return skipped;
+  }
+
+  @Override
+  public int available() throws IOException {
+    // We return the amount of bytes that we haven't read yet from the
+    // replica, based on our current position, clamped to [0, MAX_VALUE].
+    // Some of the other block readers return a shorter length than that.
+    // The only advantage to returning a shorter length is that the
+    // DFSInputStream will trash your block reader and create a new one if
+    // someone tries to seek() beyond the available() region.
+    long remaining = visibleLength - pos;
+    return (int) Math.max(0L, Math.min(remaining, Integer.MAX_VALUE));
+  }
+
+  @Override
+  public void close() throws IOException {
+    accessor.close();
+  }
+
+  @Override
+  public void readFully(byte[] buf, int offset, int len) throws IOException {
+    BlockReaderUtil.readFully(this, buf, offset, len);
+  }
+
+  @Override
+  public int readAll(byte[] buf, int offset, int len) throws IOException {
+    return BlockReaderUtil.readAll(this, buf, offset, len);
+  }
+
+  @Override
+  public boolean isLocal() {
+    return accessor.isLocal();
+  }
+
+  @Override
+  public boolean isShortCircuit() {
+    return accessor.isShortCircuit();
+  }
+
+  @Override
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
+    // For now, pluggable ReplicaAccessors do not support zero-copy.
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7087e700/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
index cf8addf..3077498 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
@@ -37,7 +37,8 @@ public enum Op {
   TRANSFER_BLOCK((byte)86),
   REQUEST_SHORT_CIRCUIT_FDS((byte)87),
   RELEASE_SHORT_CIRCUIT_FDS((byte)88),
-  REQUEST_SHORT_CIRCUIT_SHM((byte)89);
+  REQUEST_SHORT_CIRCUIT_SHM((byte)89),
+  CUSTOM((byte)127);
 
   /** The code for this operation. */
   public final byte code;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7087e700/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index a6fbb29..d435543 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCustomProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7087e700/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java
new file mode 100644
index 0000000..48d337b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import com.google.common.primitives.Ints;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSInputStream.ReadStatistics;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.UUID;
+
+/**
+ * Tests for the pluggable replica-reading interface in DFSClient: a
+ * {@link ReplicaAccessorBuilder} named in
+ * {@link HdfsClientConfigKeys#REPLICA_ACCESSOR_BUILDER_CLASSES_KEY} may
+ * supply a {@link ReplicaAccessor} that serves a block's bytes.
+ */
+public class TestExternalBlockReader {
+  private static final Log LOG = LogFactory.getLog(TestExternalBlockReader.class);
+
+  /** Seed used both to write the test file and to compute expected bytes. */
+  private static final long SEED = 1234;
+
+  /**
+   * Verifies that naming a nonexistent builder class in the configuration
+   * does not break reads: the file must still come back intact.
+   */
+  @Test
+  public void testMisconfiguredExternalBlockReader() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(HdfsClientConfigKeys.REPLICA_ACCESSOR_BUILDER_CLASSES_KEY,
+        "org.apache.hadoop.hdfs.NonExistentReplicaAccessorBuilderClass");
+    conf.setLong(HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1)
+        .build();
+    final int TEST_LENGTH = 2048;
+    DistributedFileSystem dfs = null;
+    FSDataInputStream stream = null;
+    try {
+      dfs = cluster.getFileSystem();
+      DFSTestUtil.createFile(dfs, new Path("/a"), TEST_LENGTH, (short)1, SEED);
+      stream = dfs.open(new Path("/a"));
+      byte buf[] = new byte[TEST_LENGTH];
+      IOUtils.readFully(stream, buf, 0, TEST_LENGTH);
+      byte expected[] = DFSTestUtil.
+          calculateFileContentsFromSeed(SEED, TEST_LENGTH);
+      Assert.assertArrayEquals(expected, buf);
+    } finally {
+      // Clean up even when an assertion fails, so a failing test does not
+      // leak an open stream, a client, or a running mini cluster.
+      IOUtils.cleanup(LOG, stream, dfs);
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Configuration key under which each test run stores a unique id, so the
+   * accessors built during that run can be found in {@link #accessors}.
+   */
+  private static final String SYNTHETIC_BLOCK_READER_TEST_UUID_KEY =
+      "synthetic.block.reader.test.uuid.key";
+
+  /** Every SyntheticReplicaAccessor constructed, grouped by test-run id. */
+  private static final HashMap<String, LinkedList<SyntheticReplicaAccessor>>
+      accessors = new HashMap<String, LinkedList<SyntheticReplicaAccessor>>(1);
+
+  /**
+   * Builder that records every value the client passes to it, so the test
+   * can assert on them, and builds a {@link SyntheticReplicaAccessor} for
+   * replicas whose visible length is at least 1024 bytes.
+   */
+  public static class SyntheticReplicaAccessorBuilder
+      extends ReplicaAccessorBuilder {
+    String fileName;
+    long blockId;
+    String blockPoolId;
+    boolean verifyChecksum;
+    String clientName;
+    boolean allowShortCircuit;
+    long visibleLength;
+    Configuration conf;
+
+    @Override
+    public ReplicaAccessorBuilder setFileName(String fileName) {
+      this.fileName = fileName;
+      return this;
+    }
+
+    @Override
+    public ReplicaAccessorBuilder setBlock(long blockId, String blockPoolId) {
+      this.blockId = blockId;
+      this.blockPoolId = blockPoolId;
+      return this;
+    }
+
+    @Override
+    public ReplicaAccessorBuilder setVerifyChecksum(boolean verifyChecksum) {
+      this.verifyChecksum = verifyChecksum;
+      return this;
+    }
+
+    @Override
+    public ReplicaAccessorBuilder setClientName(String clientName) {
+      this.clientName = clientName;
+      return this;
+    }
+
+    @Override
+    public ReplicaAccessorBuilder setAllowShortCircuitReads(boolean allowShortCircuit) {
+      this.allowShortCircuit = allowShortCircuit;
+      return this;
+    }
+
+    @Override
+    public ReplicaAccessorBuilder setVisibleLength(long visibleLength) {
+      this.visibleLength = visibleLength;
+      return this;
+    }
+
+    @Override
+    public ReplicaAccessorBuilder setConfiguration(Configuration conf) {
+      this.conf = conf;
+      return this;
+    }
+
+    /** The token is not stored: the synthetic accessor has no use for it. */
+    @Override
+    public ReplicaAccessorBuilder setBlockAccessToken(byte[] token) {
+      return this;
+    }
+
+    /**
+     * Returns null for replicas shorter than 1024 bytes (declining them,
+     * presumably so the client falls back to another reader -- TODO confirm
+     * against BlockReaderFactory); otherwise a new synthetic accessor.
+     */
+    @Override
+    public ReplicaAccessor build() {
+      if (visibleLength < 1024) {
+        LOG.info("SyntheticReplicaAccessorFactory returning null for a " +
+            "smaller replica with size " + visibleLength); //trace
+        return null;
+      }
+      return new SyntheticReplicaAccessor(this);
+    }
+  }
+
+  /**
+   * In-memory {@link ReplicaAccessor} that serves
+   * {@code DFSTestUtil.calculateFileContentsFromSeed(SEED, visibleLength)};
+   * the test expects this to match the first block of a file written with
+   * {@link #SEED}. It records how it was used (bytes read, close calls,
+   * invalid requests) so the test can assert on it afterwards.
+   */
+  public static class SyntheticReplicaAccessor extends ReplicaAccessor {
+    final long length;
+    final byte contents[];
+    final SyntheticReplicaAccessorBuilder builder;
+    long totalRead = 0;
+    int numCloses = 0;
+    String error = "";
+    String prefix = "";
+
+    SyntheticReplicaAccessor(SyntheticReplicaAccessorBuilder builder) {
+      this.length = builder.visibleLength;
+      this.contents = DFSTestUtil.
+          calculateFileContentsFromSeed(SEED, Ints.checkedCast(length));
+      this.builder = builder;
+      // Register under the id of the test run that configured the client.
+      String uuid = this.builder.conf.
+          get(SYNTHETIC_BLOCK_READER_TEST_UUID_KEY);
+      LinkedList<SyntheticReplicaAccessor> accessorsList =
+          accessors.get(uuid);
+      if (accessorsList == null) {
+        accessorsList = new LinkedList<SyntheticReplicaAccessor>();
+        accessors.put(uuid, accessorsList);
+      }
+      accessorsList.add(this);
+    }
+
+    /**
+     * Copies up to {@code len} bytes, starting at replica offset {@code pos},
+     * into {@code buf} starting at index {@code off}.
+     *
+     * @return the number of bytes copied: 0 at or past the end of the
+     *         replica, or when {@code pos} is negative (also recorded as an
+     *         error).
+     */
+    @Override
+    public synchronized int read(long pos, byte[] buf, int off, int len)
+        throws IOException {
+      if (pos > Integer.MAX_VALUE) {
+        return 0;
+      } else if (pos < 0) {
+        addError("Attempted to read from a location that was less " +
+            "than 0 at " + pos);
+        return 0;
+      }
+      int start = (int)pos;
+      int nread = Math.max(0, Math.min(len, contents.length - start));
+      if (nread > 0) {
+        // Write at buf[off], not buf[0]: the caller may be filling the
+        // middle of a larger buffer.
+        System.arraycopy(contents, start, buf, off, nread);
+        totalRead += nread;
+      }
+      LOG.info("read(pos=" + pos + ", off=" + off + ", len=" + len +
+          ") copied " + nread + " of " + contents.length + " bytes");
+      return nread;
+    }
+
+    /**
+     * Copies bytes, starting at replica offset {@code pos}, into {@code buf}
+     * until either the replica or the buffer's remaining space is exhausted.
+     *
+     * @return the number of bytes copied: 0 at or past the end of the
+     *         replica, or when {@code pos} is negative (also recorded as an
+     *         error).
+     */
+    @Override
+    public synchronized int read(long pos, ByteBuffer buf) throws IOException {
+      if (pos > Integer.MAX_VALUE) {
+        return 0;
+      } else if (pos < 0) {
+        addError("Attempted to read from a location that was less " +
+            "than 0 at " + pos);
+        return 0;
+      }
+      int nread = 0;
+      for (int ipos = (int)pos; ipos < contents.length; ipos++) {
+        try {
+          buf.put(contents[ipos]);
+        } catch (BufferOverflowException e) {
+          break; // the buffer is full
+        }
+        nread++;
+        totalRead++;
+      }
+      return nread;
+    }
+
+    /** Counts calls so the test can check the accessor is closed exactly once. */
+    @Override
+    public synchronized void close() throws IOException {
+      numCloses++;
+    }
+
+    /** Always reports the replica as local. */
+    @Override
+    public boolean isLocal() {
+      return true;
+    }
+
+    /**
+     * Always reports short-circuit access; the test expects the bytes served
+     * here to be counted as short-circuit reads.
+     */
+    @Override
+    public boolean isShortCircuit() {
+      return true;
+    }
+
+    /** Returns every error recorded so far, joined by "; ", or "" if none. */
+    synchronized String getError() {
+      return error;
+    }
+
+    /** Logs {@code text} and appends it to the accumulated error string. */
+    synchronized void addError(String text) {
+      LOG.error("SyntheticReplicaAccessor error: " + text);
+      error = error + prefix + text;
+      prefix = "; ";
+    }
+  }
+
+  /**
+   * Reads a 2047-byte file stored in 1024-byte blocks with the synthetic
+   * builder configured. The first, full-size block is expected to be served
+   * by a SyntheticReplicaAccessor; the 1023-byte tail, for which build()
+   * returns null, is expected to be read without one.
+   */
+  @Test
+  public void testExternalBlockReader() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(HdfsClientConfigKeys.REPLICA_ACCESSOR_BUILDER_CLASSES_KEY,
+        SyntheticReplicaAccessorBuilder.class.getName());
+    conf.setLong(HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+    String uuid = UUID.randomUUID().toString();
+    conf.set(SYNTHETIC_BLOCK_READER_TEST_UUID_KEY, uuid);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1)
+        .build();
+    final int TEST_LENGTH = 2047;
+    DistributedFileSystem dfs = null;
+    HdfsDataInputStream stream = null;
+    try {
+      dfs = cluster.getFileSystem();
+      DFSTestUtil.createFile(dfs, new Path("/a"), TEST_LENGTH, (short)1, SEED);
+      stream = (HdfsDataInputStream)dfs.open(new Path("/a"));
+      byte buf[] = new byte[TEST_LENGTH];
+      IOUtils.readFully(stream, buf, 0, TEST_LENGTH);
+      byte expected[] = DFSTestUtil.
+          calculateFileContentsFromSeed(SEED, TEST_LENGTH);
+      ReadStatistics stats = stream.getReadStatistics();
+      Assert.assertEquals(1024, stats.getTotalShortCircuitBytesRead());
+      Assert.assertEquals(2047, stats.getTotalLocalBytesRead());
+      Assert.assertEquals(2047, stats.getTotalBytesRead());
+      Assert.assertArrayEquals(expected, buf);
+      // Close before inspecting the accessors so their close counts are
+      // final; null it out so the cleanup below does not close it twice.
+      stream.close();
+      stream = null;
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(dfs, new Path("/a"));
+      Assert.assertNotNull(block);
+      LinkedList<SyntheticReplicaAccessor> accessorList = accessors.get(uuid);
+      Assert.assertNotNull(accessorList);
+      Assert.assertEquals(2, accessorList.size());
+      SyntheticReplicaAccessor accessor = accessorList.get(0);
+      Assert.assertTrue(accessor.builder.allowShortCircuit);
+      Assert.assertEquals(block.getBlockPoolId(),
+          accessor.builder.blockPoolId);
+      Assert.assertEquals(block.getBlockId(),
+          accessor.builder.blockId);
+      Assert.assertEquals(dfs.getClient().clientName,
+          accessor.builder.clientName);
+      Assert.assertEquals("/a", accessor.builder.fileName);
+      Assert.assertTrue(accessor.builder.verifyChecksum);
+      Assert.assertEquals(1024L, accessor.builder.visibleLength);
+      Assert.assertEquals(1024L, accessor.totalRead);
+      Assert.assertEquals("", accessor.getError());
+      Assert.assertEquals(1, accessor.numCloses);
+    } finally {
+      // Always drop this run's accessors from the static registry and shut
+      // everything down, even when an assertion above fails.
+      accessors.remove(uuid);
+      IOUtils.cleanup(LOG, stream, dfs);
+      cluster.shutdown();
+    }
+  }
+}


Mime
View raw message