hadoop-common-issues mailing list archives

From "Aaron Fabbri (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (HADOOP-12666) Support Microsoft Azure Data Lake - as a file system in Hadoop
Date Tue, 23 Feb 2016 06:12:19 GMT

    [ https://issues.apache.org/jira/browse/HADOOP-12666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15158357#comment-15158357 ] 

Aaron Fabbri commented on HADOOP-12666:
---------------------------------------

Thank you for your contributions.  This is a large patch with some dense spots,
which makes it hard for folks to find time to review it properly.  In the future
you should break up the work into multiple commits and associate patches with
JIRA subtasks.  This will make your life easier as well.

Summary of issues, this round:

1. Still some parts I haven't carefully reviewed due to size of patch.
2. FileStatusCacheManager seems to have local race conditions and zero
cross-node coherency.
3. Seems like abuse of volatile / lack of locking in BatchByteArrayInputStream.
4. How do Hadoop folks feel about this hadoop-tools/hadoop-azure-datalake code
declaring classes in the hadoop.hdfs.web package?  I feel it needs cleanup.
5. Still need to put config params in core-default.xml and make the names
lowercase.

There are a bunch of other comments / questions inline below.  Search for "AF>".

{quote}
<snip>
<snip>
diff --git hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/FileStatusCacheManager.java hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/FileStatusCacheManager.java
new file mode 100644
index 0000000..fd6a2ff
--- /dev/null
+++ hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/FileStatusCacheManager.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
<snip>
+ * ACID properties are maintained in overloaded api in @see
+ * PrivateAzureDataLakeFileSystem class.
+ */
+public final class FileStatusCacheManager {
+  private static final FileStatusCacheManager FILE_STATUS_CACHE_MANAGER = new
+      FileStatusCacheManager();
+  private Map<String, FileStatusCacheObject> syncMap = null;
+
+  /**
+   * Constructor.
+   */
+  private FileStatusCacheManager() {

AF> This class seems to have serious issues that need addressing:

1. Local race conditions in caller PrivateAzureDataLakeFileSystem
2. No mechanism for cache invalidation across nodes in the cluster.

+    LinkedHashMap<String, FileStatusCacheObject> map = new
+        LinkedHashMap<String, FileStatusCacheObject>() {
+
+      private static final int MAX_ENTRIES = 5000;
+
+      @Override
+      protected boolean removeEldestEntry(Map.Entry eldest) {
<snip>
diff --git hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/FileStatusCacheObject.java hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/FileStatusCacheObject.java
new file mode 100644
index 0000000..5316443
--- /dev/null
+++ hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/FileStatusCacheObject.java
@@ -0,0 +1,59 @@
<snip>
diff --git hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/PrivateAzureDataLake.java hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/PrivateAzureDataLake.java
new file mode 100644
index 0000000..a0ca4a9
--- /dev/null
+++ hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/PrivateAzureDataLake.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
<snip>
+
+/**
+ * Create ADL filesystem delegation with Swebhdfs scheme. Intent to use by
+ * AdlFileSystem only.
+ */

AF> Update comment?  This uses "adl" scheme, right?

+public class PrivateAzureDataLake extends DelegateToFileSystem {
+  public static final int DEFAULT_PORT = 443;

AF> What is this class used for?  I didn't see any uses.

+
+  PrivateAzureDataLake(URI theUri, Configuration conf)
+      throws IOException, URISyntaxException {
+    super(theUri, createFileSystem(conf), conf,
+        PrivateAzureDataLakeFileSystem.SCHEME, false);
<snip>
diff --git hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/PrivateAzureDataLakeFileSystem.java hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/PrivateAzureDataLakeFileSystem.java
new file mode 100644
index 0000000..db4a83c
--- /dev/null
+++ hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/PrivateAzureDataLakeFileSystem.java
@@ -0,0 +1,1516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
<snip>
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.web;
+
AF> Care to comment why this is in the ..hdfs.web package instead of fs.adl?
It lives in hadoop-tools/hadoop-azure-datalake in the source tree.

+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
<snip>
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Extended @see SWebHdfsFileSystem API. This class contains Azure data lake
+ * specific stability, Reliability and performance improvement.
+ * <p>
+ * Motivation behind PrivateAzureDataLakeFileSystem

AF> ?

+ */
+public class PrivateAzureDataLakeFileSystem extends SWebHdfsFileSystem {
+
+  public static final String SCHEME = "adl";
+  /**
+   * Process wide thread pool for data lake file system.
+   * Threads need to be daemon so that they dont prevent the process from
+   * exiting
+   */
+  private static final ExecutorService EXECUTOR = Executors
+      .newCachedThreadPool(new ThreadFactory() {
+        public Thread newThread(Runnable r) {
+          Thread t = Executors.defaultThreadFactory().newThread(r);
+          t.setDaemon(true);
+          return t;
+        }
+      });
+  private static String hostName = null;
+  private static AtomicInteger metricsSourceNameCounter = new AtomicInteger();
+  // Feature configuration
+  // Publicly Exposed
<snip>
+
+  /**
+   * Need to override default getHomeDirectory implementation due to
+   * HDFS-8542 causing MR jobs to fail in initial

AF> Due to the bug or due to the fix?  The fix was merged in 2.8.0, right?

+   * phase. Constructing home directory locally is fine as long as hadoop
+   * local user name and ADL user name relation
+   * ship is not agreed upon.

AF> I don't understand this last sentence.  Can you explain?

+   *
+   * @return Hadoop user home directory.
+   */
+  @Override
+  public final Path getHomeDirectory() {
+    try {
+      return makeQualified(new Path(
+          "/user/" + UserGroupInformation.getCurrentUser().getShortUserName()));
+    } catch (IOException e) {
+    }
+
+    return new Path("/user/" + userName);
+  }
+
<snip>
+   */
+  @Override
+  public final boolean setReplication(final Path p, final short replication)
+      throws IOException {
+    return true;
+  }
+
+  /**
+   * Invoked parent setTimes default implementation only.
+   *
+   * Removes cached FileStatus entry to maintain latest information on the
+   * FileStatus instance
+   *
+   * @param p     File/Folder path
+   * @param mtime Modification time
+   * @param atime Access time
+   * @throws IOException when system error, internal server error or user error
+   */
+  @Override
+  public final void setTimes(final Path p, final long mtime, final long atime)
+      throws IOException {
+    if (featureCacheFileStatus) {
+      String filePath = p.isAbsoluteAndSchemeAuthorityNull() ?
+          getUri() + p.toString() :
+          p.toString();
+      fileStatusCacheManager.remove(new Path(filePath));
+    }
+    super.setTimes(p, mtime, atime);
+  }
+
+  /**
+   * Invokes parent setPermission default implementation only.
+   *
+   * Removes cached FileStatus entry to maintain latest information on the
+   * FileStatus instance
+   *
+   * @param p          File/Folder path
+   * @param permission Instance FsPermission. Octal values
+   * @throws IOException when system error, internal server error or user error
+   */
+  @Override
+  public final void setPermission(final Path p, final FsPermission permission)
+      throws IOException {
+    if (featureCacheFileStatus) {
+      String filePath = p.isAbsoluteAndSchemeAuthorityNull() ?
+          getUri() + p.toString() :
+          p.toString();
+      fileStatusCacheManager.remove(new Path(filePath));
+    }
+    super.setPermission(p, permission);
+  }
+
+  /**
+   * Avoid call to Azure data lake backend system. Look in the local cache if
+   * FileStatus from the previous call has
+   * already been cached.
+   *
+   * Cache lookup is default enable. and can be set using configuration.
+   *
+   * @param f File/Folder path
+   * @return FileStatus instance containing metadata information of f
+   * @throws IOException For any system error
+   */
+  @Override
+  public FileStatus getFileStatus(Path f) throws IOException {
+    statistics.incrementReadOps(1);
+    FileStatus status = null;
+    if (featureCacheFileStatus) {
+      status = fileStatusCacheManager.get(makeQualified(f));
+    }
+
+    if (status == null) {
+      status = super.getFileStatus(f);
+    } else {
+      ADLLogger.log("Cached Instance Found : " + status.getPath());
+    }
+
+    if (featureCacheFileStatus) {
+      if (fileStatusCacheManager.get(makeQualified(f)) == null) {
+        fileStatusCacheManager.put(status, featureCacheFileStatusDuration);
+      }
+    }

AF> Is this a race condition?

thread 1> getFileStatus(), cache miss
          super.getStatus -> s1
          cache.get() -> null

thread 2> delete()
          cache.clear()

thread 1> cache.put(s1)

Maybe provide an atomic putIfAbsent() for FileStatusCacheManager.  You can
synchronize on the underlying map object I believe (see
Collections.synchronizedMap()).
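
A minimal sketch of the putIfAbsent() idea, for illustration only.  The class
name StatusCacheSketch and its methods are hypothetical stand-ins, not the
patch's FileStatusCacheManager, and the eviction rule (size() > MAX_ENTRIES)
is an assumption because that part of the patch is snipped above.  It relies
on the documented behaviour that the map returned by
Collections.synchronizedMap() locks on itself.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for FileStatusCacheManager; names are illustrative.
final class StatusCacheSketch<K, V> {
  private static final int MAX_ENTRIES = 5000;

  // Bounded map that evicts the oldest inserted entry (assumed policy),
  // wrapped so that each single operation is thread-safe.
  private final Map<K, V> syncMap = Collections.synchronizedMap(
      new LinkedHashMap<K, V>() {
        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
          return size() > MAX_ENTRIES;
        }
      });

  /** Inserts value only if key is absent; returns the previous value or null. */
  V putIfAbsent(K key, V value) {
    // The synchronized wrapper uses itself as its mutex, so holding this
    // monitor makes the check and the insert one atomic step.
    synchronized (syncMap) {
      V existing = syncMap.get(key);
      if (existing == null) {
        syncMap.put(key, value);
      }
      return existing;
    }
  }

  V get(K key) {
    return syncMap.get(key);
  }

  void remove(K key) {
    syncMap.remove(key);
  }

  void clear() {
    syncMap.clear();
  }
}
```

This only makes the check and the insert a single step.  Whether that is
enough to close the interleaving shown above also depends on how invalidation
is ordered relative to the backend call, since a status fetched before a
concurrent clear() could still be inserted after it.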
+
+    if (overrideOwner) {
+      FileStatus proxiedStatus = new FileStatus(status.getLen(),
+          status.isDirectory(), status.getReplication(), status.getBlockSize(),
+          status.getModificationTime(), status.getAccessTime(),
+          status.getPermission(), userName, "hdfs", status.getPath());
+      return proxiedStatus;
+    } else {
+      return status;
+    }
+  }
+
+  /**
+   * Invokes parent delete() default implementation only.
+   *
+   * Removes cached FileStatus entry to maintain latest information on the
+   * FileStatus instance
+   *
+   * @param f         File/Folder path
+   * @param recursive true if the contents within folder needs to be removed
+   *                  as well
+   * @return true if the delete operation is successful other false.
+   * @throws IOException For any system exception
+   */
+  @Override
+  public boolean delete(Path f, boolean recursive) throws IOException {
+    if (featureCacheFileStatus) {
+      FileStatus fs = fileStatusCacheManager.get(makeQualified(f));
+      if (fs != null && fs.isFile()) {
+        fileStatusCacheManager.remove(makeQualified(f));
+      } else {
+        fileStatusCacheManager.clear();

AF> Seems like there is a less-likely race condition here.  (f is replaced by
a directory after checking fs.isFile())

+      }
+    }
+    instrumentation.fileDeleted();
+    return super.delete(f, recursive);
+  }
+
+  /**
+   * Invokes parent rename default implementation only.
+   *
+   * Removes cached FileStatus entry to maintain latest information on the
+   * FileStatus instance     *
+   *
+   * @param src Source path
+   * @param dst Destination path
+   * @return True if the rename operation is successful otherwise false
+   * @throws IOException For any system error.
+   */
+  @Override
+  public boolean rename(final Path src, final Path dst) throws IOException {
+    if (featureCacheFileStatus) {
+      FileStatus fsSrc = fileStatusCacheManager.get(makeQualified(src));
+      FileStatus fsDst = fileStatusCacheManager.get(makeQualified(dst));
+
+      if ((fsSrc != null && !fsSrc.isFile()) || (fsDst != null && !fsDst
+          .isFile())) {
+        fileStatusCacheManager.clear();
+      } else {
+        fileStatusCacheManager.remove(makeQualified(src));
+        fileStatusCacheManager.remove(makeQualified(dst));
+      }
+    }
+    return super.rename(src, dst);

AF> The same non-atomic get-then-mutate pattern repeats here and below.

+  }
+
+  /**
+   * Overloaded version of rename. Invokes parent rename implementation only.
+   *
+   * Removes cached FileStatus entry to maintain latest information on the
+   * FileStatus instance
+   *
+   * @param src     Source path
+   * @param dst     Desitnation path
+   * @param options Defined in webhdfs specification
+   * @throws IOException For system error
+   */
+  @Override
+  public void rename(Path src, Path dst, Options.Rename... options)
+      throws IOException {
+    if (featureCacheFileStatus) {
+      FileStatus fsSrc = fileStatusCacheManager.get(makeQualified(src));
+      FileStatus fsDst = fileStatusCacheManager.get(makeQualified(dst));
+
+      if ((fsSrc != null && !fsSrc.isFile()) || (fsDst != null && !fsDst
+          .isFile())) {
+        fileStatusCacheManager.clear();
+      } else {
+        fileStatusCacheManager.remove(makeQualified(src));
+        fileStatusCacheManager.remove(makeQualified(dst));
+      }
+    }
+    super.rename(src, dst, options);
+  }
+
+  /**
+   * Invokes parent append default implementation
+   *
+   * Removes cached FileStatus entry to maintain latest information on the
+   * FileStatus instance.
+   *
+   * @param f Stream path
+   * @return Output stream.
+   * @throws IOException For system error
+   */
+  @Override
+  public FSDataOutputStream append(Path f) throws IOException {
+    String filePath = makeQualified(f).toString();
+    fileStatusCacheManager.remove(new Path(filePath));
+    return super.append(f);
+  }
+
+  /**
+   * Removes cached FileStatus entry to maintain latest information on the
+   * FileStatus instance.
+   *
+   * @param f          Existing file path
+   * @param bufferSize Size of the buffer
+   * @param progress   Progress indicator
+   * @return FSDataOutputStream OutputStream on which application can push
+   * stream of bytes
+   * @throws IOException For any system exception
+   */
+  @Override
+  public FSDataOutputStream append(Path f, int bufferSize,
+      Progressable progress) throws IOException {
+    String filePath = makeQualified(f).toString();
+    fileStatusCacheManager.remove(new Path(filePath));
+    return super.append(f, bufferSize, progress);
+  }
+
+  /**
+   * Removes cached FileStatus entry to maintain latest information on the
+   * FileStatus instance.
+   *
+   * @param trg  Target file path
+   * @param srcs List of sources to be concatinated. ADL concatinate in the
+   *             same order passed as parameter.
+   * @throws IOException For any system exception
+   */
+  @Override
+  public void concat(final Path trg, final Path[] srcs) throws IOException {
+    if (featureCacheFileStatus) {
+      String filePath = trg.isAbsoluteAndSchemeAuthorityNull() ?
+          getUri() + trg.toString() :
+          trg.toString();
+      fileStatusCacheManager.remove(new Path(filePath));
+      for (int i = 0; i < srcs.length; ++i) {
+        filePath = srcs[0].isAbsoluteAndSchemeAuthorityNull() ?
+            getUri() + srcs[0].toString() :
+            srcs[0].toString();
+        fileStatusCacheManager.remove(new Path(filePath));
+      }
+    }
+    super.concat(trg, srcs);
+  }
+
<snip>
+   * failures.
+   * 2. Performance boost to jobs which are slow writer, avoided network latency
+   * 3. ADL equally better performing with multiple of 4MB chunk as append
+   * calls.
+   *
+   * @param f           File path
+   * @param permission  Access perfrmission for the newly created file

AF> typo

+   * @param overwrite   Remove existing file and recreate new one if true
+   *                    otherwise throw error if file exist
+   * @param bufferSize  Buffer size, ADL backend does not honour
+   * @param replication Replication count, ADL backen does not hounour
+   * @param blockSize   Block size, ADL backend does not honour
+   * @param progress    Progress indicator
+   * @return FSDataOutputStream OutputStream on which application can push
+   * stream of bytes
+   * @throws IOException when system error, internal server error or user error
+   */
+  @Override
+  public FSDataOutputStream create(final Path f, final FsPermission permission,
+      final boolean overwrite, final int bufferSize, final short replication,
+      final long blockSize, final Progressable progress) throws IOException {
+    statistics.incrementWriteOps(1);
+    // Increment the counter
+    instrumentation.fileCreated();
+
+    if (featureCacheFileStatus) {
+      fileStatusCacheManager.remove(makeQualified(f));
+    }
+
+    return new FSDataOutputStream(new BatchAppendOutputStream(f, bufferSize,
+        new PermissionParam(applyUMask(permission)),
+        new OverwriteParam(overwrite), new BufferSizeParam(bufferSize),
+        new ReplicationParam(replication), new BlockSizeParam(blockSize),
+        new ADLVersionInfo(getADLEnabledFeatureSet())), statistics) {
+    };
+  }
+
+  @Override
+  @SuppressWarnings("deprecation")
+  public FSDataOutputStream createNonRecursive(final Path f,
+      final FsPermission permission, final EnumSet<CreateFlag> flag,
+      final int bufferSize, final short replication, final long blockSize,
+      final Progressable progress) throws IOException {
+    statistics.incrementWriteOps(1);
+    // Increment the counter
+    instrumentation.fileCreated();
+
+    if (featureCacheFileStatus) {
+      String filePath = makeQualified(f).toString();
+      fileStatusCacheManager.remove(new Path(filePath));
+    }
+
+    String leaseId = java.util.UUID.randomUUID().toString();
+    return new FSDataOutputStream(new BatchAppendOutputStream(f, bufferSize,
+        new PermissionParam(applyUMask(permission)), new CreateFlagParam(flag),
+        new CreateParentParam(false), new BufferSizeParam(bufferSize),
+        new ReplicationParam(replication), new LeaseParam(leaseId),
+        new BlockSizeParam(blockSize),
+        new ADLVersionInfo(getADLEnabledFeatureSet())), statistics) {
+    };
+  }
+
+  /**
+   * Since defined as private in parent class, redefined to pass through
+   * Create api implementation.
+   *
+   * @param permission
+   * @return FsPermission list
+   */
+  private FsPermission applyUMask(FsPermission permission) {
+    FsPermission fsPermission = permission;
+    if (fsPermission == null) {
+      fsPermission = FsPermission.getDefault();
+    }
+    return fsPermission.applyUMask(FsPermission.getUMask(getConf()));
+  }
+
+  /**
+   * Open call semantic is handled differently in case of ADL. Instead of
+   * network stream is returned to the user,
+   * Overridden FsInputStream is returned.
+   *
+   * 1. No dedicated connection to server.
+   * 2. Process level concurrent read ahead Buffering is done, This allows
+   * data to be available for caller quickly.
+   * 3. Number of byte to read ahead is configurable.
+   *
+   * Advantage of Process level concurrent read ahead Buffering semantics is
+   * 1. ADL backend server does not allow idle connection for longer duration
+   * . In case of slow reader scenario,
+   * observed connection timeout/Connection reset causing occasional job
+   * failures.

AF> Did you guys consider handling this by transparently reconnecting, instead
of opening a separate connection for each op?  Seems like performance would be a
lot better.
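
To make the "transparently reconnecting" alternative concrete, here is a rough
sketch under stated assumptions.  StreamOpener and
ReconnectingInputStreamSketch are hypothetical names, not part of the patch or
of WebHDFS, and the sketch assumes the backend can open a stream at an
arbitrary byte offset.

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical hook: opens a fresh connection positioned at the given offset.
interface StreamOpener {
  InputStream openAt(long offset) throws IOException;
}

// Keeps one connection open and reopens it at the current position if a
// read fails, instead of opening a new connection for every operation.
final class ReconnectingInputStreamSketch extends InputStream {
  private final StreamOpener opener;
  private final int maxRetries;
  private InputStream in;      // current connection, or null if none is open
  private long pos;            // bytes delivered to the caller so far
  private boolean closed;

  ReconnectingInputStreamSketch(StreamOpener opener, int maxRetries) {
    this.opener = opener;
    this.maxRetries = maxRetries;
  }

  @Override
  public int read() throws IOException {
    if (closed) {
      throw new IOException("stream closed");
    }
    for (int attempt = 0; ; attempt++) {
      try {
        if (in == null) {
          in = opener.openAt(pos);   // (re)connect at the last good offset
        }
        int b = in.read();
        if (b >= 0) {
          pos++;
        }
        return b;
      } catch (IOException e) {
        dropConnection();
        if (attempt >= maxRetries) {
          throw e;                   // give up after maxRetries reconnects
        }
      }
    }
  }

  @Override
  public void close() {
    closed = true;
    dropConnection();
  }

  private void dropConnection() {
    if (in != null) {
      try {
        in.close();
      } catch (IOException ignored) {
        // The connection is being discarded anyway.
      }
      in = null;
    }
  }
}
```

The caller sees a single continuous stream; an idle timeout or connection
reset costs one reconnect rather than failing the job.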

+   * 2. Performance boost to jobs which are slow reader, avoided network latency

AF> I'd expect you to want a connection per-thread, instead of per-op.

+   * 3. Compressed format support like ORC, and large data files gains the
+   * most out of this implementation.
+   *
+   * Read ahead feature is configurable.
+   *
+   * @param f          File path
+   * @param buffersize Buffer size
+   * @return FSDataInputStream InputStream on which application can read
+   * stream of bytes
+   * @throws IOException when system error, internal server error or user error
+   */
+  @Override
+  public FSDataInputStream open(final Path f, final int buffersize)
+      throws IOException {
+    long statContructionTime = System.currentTimeMillis();
+    statistics.incrementReadOps(1);
+
+    ADLLogger.log("statistics report Time " + (System.currentTimeMillis()
+        - statContructionTime));
+
+    final HttpOpParam.Op op = GetOpParam.Op.OPEN;
+    // use a runner so the open can recover from an invalid token
+    FsPathConnectionRunner runner = null;
+
+    if (featureConcurrentReadWithReadAhead) {
+      long urlContructionTime = System.currentTimeMillis();
+      URL url = this.toUrl(op, f, new BufferSizeParam(buffersize),
+          new ReadADLNoRedirectParam(true),
+          new ADLVersionInfo(getADLEnabledFeatureSet()));
+      ADLLogger.log("URL Construction Time " + (System.currentTimeMillis()
+          - urlContructionTime));
+
+      long bbContructionTime = System.currentTimeMillis();
+      BatchByteArrayInputStream bb = new BatchByteArrayInputStream(url, f,
+          maxBufferSize, maxConcurrentConnection);
+      ADLLogger.log("BatchByteArrayInputStream Construction Time " + (
+          System.currentTimeMillis() - bbContructionTime));
+
+      long finContructionTime = System.currentTimeMillis();
+      FSDataInputStream fin = new FSDataInputStream(bb);
+      ADLLogger.log(
+          "FSDataInputStream Construction Time " + (System.currentTimeMillis()
+              - finContructionTime));

AF> This case could use some perf optimization.  e.g. Three calls to get system
time.

+      return fin;
+    } else {
+      if (featureRedirectOff) {
+        long urlContructionTime = System.currentTimeMillis();
+        runner = new FsPathConnectionRunner(ADLGetOpParam.Op.OPEN, f,
+            new BufferSizeParam(buffersize), new ReadADLNoRedirectParam(true),
+            new ADLVersionInfo(getADLEnabledFeatureSet()));
+        ADLLogger.log("Runner Construction Time " + (System.currentTimeMillis()
+            - urlContructionTime));

AF> How about adding ADLLogger.logWithTimestamp().  That way, if the logger is
disabled, you don't keep getting system time.
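
One possible shape for that helper, as a sketch only.  ADLLogger's real API is
not shown in this excerpt, so TimedLogSketch, start() and logElapsed() are
hypothetical names; the clock is injected here purely so the behaviour can be
checked, and in real use it could be System::currentTimeMillis.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical helper; not the patch's ADLLogger.
final class TimedLogSketch {
  private final boolean enabled;
  private final LongSupplier clock;
  private final List<String> lines = new ArrayList<>();

  TimedLogSketch(boolean enabled, LongSupplier clock) {
    this.enabled = enabled;
    this.clock = clock;
  }

  /** Returns a start mark, or 0 without reading the clock when disabled. */
  long start() {
    return enabled ? clock.getAsLong() : 0L;
  }

  /** Records "label elapsedMillis" only when logging is enabled. */
  void logElapsed(String label, long startMark) {
    if (!enabled) {
      return;   // no clock read and no string building on the fast path
    }
    lines.add(label + " " + (clock.getAsLong() - startMark));
  }

  /** Captured log lines; a real logger would write them out instead. */
  List<String> lines() {
    return lines;
  }
}
```

With logging disabled, neither start() nor logElapsed() touches the system
clock, so the timing code in open() would cost almost nothing.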

+      } else {
+        runner = new FsPathConnectionRunner(op, f,
+            new BufferSizeParam(buffersize));
+      }
+
+      return new FSDataInputStream(
+          new OffsetUrlInputStream(new UnresolvedUrlOpener(runner),
+              new OffsetUrlOpener(null)));
+    }
+  }
+
+  /**
+   * On successful response from the server, @see FileStatusCacheManger is
+   * updated with FileStatus objects.
+   *
+   * @param f File/Folder path
+   * @return FileStatus array list
+   * @throws IOException For system error
+   */
+  @Override
+  public FileStatus[] listStatus(final Path f) throws IOException {
+    FileStatus[] fileStatuses = super.listStatus(f);
+    for (int i = 0; i < fileStatuses.length; i++) {
+      if (featureCacheFileStatus) {
+        fileStatusCacheManager
+            .put(fileStatuses[i], featureCacheFileStatusDuration);
+      }
+
+      if (overrideOwner) {
+        fileStatuses[i] = new FileStatus(fileStatuses[i].getLen(),
+            fileStatuses[i].isDirectory(), fileStatuses[i].getReplication(),
+            fileStatuses[i].getBlockSize(),
+            fileStatuses[i].getModificationTime(),
+            fileStatuses[i].getAccessTime(), fileStatuses[i].getPermission(),
+            userName, "hdfs", fileStatuses[i].getPath());
+      }
+    }
+    return fileStatuses;
+  }
+
+  @Override
+  public BlockLocation[] getFileBlockLocations(final FileStatus status,
+      final long offset, final long length) throws IOException {
+    if (status == null) {
+      return null;
+    }
+
+    if (featureGetBlockLocationLocallyBundled) {
+      if ((offset < 0) || (length < 0)) {
+        throw new IllegalArgumentException("Invalid start or len parameter");
+      }
+
+      if (status.getLen() < offset) {
+        if (ADLLogger.isLogEnabled()) {

AF> Redundant check of isLogEnabled()

+          ADLLogger.log("getFileBlockLocations : Returning 1 block");
+        }
+        return new BlockLocation[0];
+      }
+
+      final String[] name = {"localhost"};
+      final String[] host = {"localhost"};

AF> Just use "name" twice instead of defining host?

+      long blockSize = ADLConfKeys.DEFAULT_EXTENT_SIZE;
+      if (blockSize <= 0) {

AF> Why the runtime check of a compile-time constant?  How about just adding a
comment near the definition: "must be non-zero"?

+        throw new IllegalArgumentException(
+            "The block size for the given file is not a positive number: "
+                + blockSize);
+      }
+      int numberOfLocations =
+          (int) (length / blockSize) + ((length % blockSize == 0) ? 0 : 1);
+      BlockLocation[] locations = new BlockLocation[numberOfLocations];
+      for (int i = 0; i < locations.length; i++) {
+        long currentOffset = offset + (i * blockSize);
+        long currentLength = Math
+            .min(blockSize, offset + length - currentOffset);
+        locations[i] = new BlockLocation(name, host, currentOffset,
+            currentLength);
+      }
+
+      if (ADLLogger.isLogEnabled()) {

AF> Redundant check of isLogEnabled()

+        ADLLogger.log("getFileBlockLocations : Returning " + locations.length
+            + " Blocks");
+      }
+
+      return locations;
+
+    } else {
+      return getFileBlockLocations(status.getPath(), offset, length);
+    }
+  }
+
+  @Override
+  public BlockLocation[] getFileBlockLocations(final Path p, final long offset,
+      final long length) throws IOException {
+    statistics.incrementReadOps(1);
+
+    if (featureGetBlockLocationLocallyBundled) {
+      FileStatus fileStatus = getFileStatus(p);
+      return getFileBlockLocations(fileStatus, offset, length);
+    } else {
+      return super.getFileBlockLocations(p, offset, length);
+    }
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    super.close();
+    AdlFileSystemMetricsSystem.unregisterSource(metricsSourceName);
+    AdlFileSystemMetricsSystem.fileSystemClosed();
+  }
+
+  private String getADLEnabledFeatureSet() {
+    // TODO : Implement current feature set enabed for the instance.
+    // example cache file status, reah ahead ..
+    return ADLConfKeys.LOG_VERSION;
+  }
+
+  enum StreamState {
+    Initial,
+    DataCachedInLocalBuffer,
+    StreamEnd
+  }
+
+  class BatchAppendOutputStream extends OutputStream {
+    private Path fsPath;
+    private Param<?, ?>[] parameters;
+    private byte[] data = null;
+    private int offset = 0;
+    private long length = 0;
+    private boolean eof = false;
+    private boolean hadError = false;
+    private int bufferIndex = 0;
+    private byte[][] dataBuffers = new byte[2][];
+    private int bufSize = 0;
+    private Future<Object> flushTask = null;
+
+    public BatchAppendOutputStream(Path path, int bufferSize,
+        Param<?, ?>... param) throws IOException {
+      if (bufferSize < (ADLConfKeys.DEFAULT_BLOCK_SIZE)) {
+        bufSize = ADLConfKeys.DEFAULT_BLOCK_SIZE;
+      } else {
+        bufSize = bufferSize;
+      }
+
+      this.fsPath = path;
+      this.parameters = param;
+      this.data = getBuffer();
+      FSDataOutputStream createStream = null;
+      try {
+        if (featureRedirectOff) {
+          CreateADLNoRedirectParam skipRedirect = new CreateADLNoRedirectParam(
+              true);
+          Param<?, ?>[] tmpParam = featureFlushWhenEOF ?
+              new Param<?, ?>[param.length + 2] :
+              new Param<?, ?>[param.length + 1];
+          System.arraycopy(param, 0, tmpParam, 0, param.length);
+          tmpParam[param.length] = skipRedirect;
+          if (featureFlushWhenEOF) {
+            tmpParam[param.length + 1] = new ADLFlush(false);
+          }
+          createStream = new FsPathOutputStreamRunner(ADLPutOpParam.Op.CREATE,
+              fsPath, 1, tmpParam).run();
+        } else {
+          createStream = new FsPathOutputStreamRunner(PutOpParam.Op.CREATE,
+              fsPath, 1, param).run();
+        }
+      } finally {
+        if (createStream != null) {
+          createStream.close();
+        }
+      }
+    }
+
+    @Override
+    public final synchronized void write(int b) throws IOException {
+      if (offset == (data.length)) {
+        flush();
+      }
+
+      data[offset] = (byte) b;
+      offset++;
+
+      // Statistics will get incremented again as part of the batch updates,
+      // decrement here to avoid double value
+      if (statistics != null) {
+        statistics.incrementBytesWritten(-1);
+      }
+    }
+
+    @Override
+    public final synchronized void write(byte[] buf, int off, int len)
+        throws IOException {
+      int bytesToWrite = len;
+      int localOff = off;
+      int localLen = len;
+      if (localLen >= data.length) {
+        // Flush data that is already in our internal buffer
+        flush();
+
+        // Keep committing data until we have less than our internal buffers
+        // length left
+        do {
+          try {
+            commit(buf, localOff, data.length, eof);
+          } catch (IOException e) {
+            hadError = true;
+            throw e;
+          }
+          localOff += data.length;
+          localLen -= data.length;
+        } while (localLen >= data.length);
+      }
+
+      // At this point, we have less than data.length left to copy from users
+      // buffer
+      if (offset + localLen >= data.length) {
+        // Users buffer has enough data left to fill our internal buffer
+        int bytesToCopy = data.length - offset;
+        System.arraycopy(buf, localOff, data, offset, bytesToCopy);
+        offset += bytesToCopy;
+
+        // Flush our internal buffer asynchronously
+        flushAsync();
+        localOff += bytesToCopy;
+        localLen -= bytesToCopy;
+      }
+
+      if (localLen > 0) {
+        // Simply copy the remainder from the users buffer into our internal
+        // buffer
+        System.arraycopy(buf, localOff, data, offset, localLen);
+        offset += localLen;
+      }
+
+      // Statistics will get incremented again as part of the batch updates,
+      // decrement here to avoid double value
+      if (statistics != null) {
+        statistics.incrementBytesWritten(-bytesToWrite);
+      }
+      instrumentation.rawBytesUploaded(bytesToWrite);
+    }
+
+    @Override
+    public final synchronized void flush() throws IOException {
+      waitForOutstandingFlush();
+      if (offset > 0) {
+        try {
+          commit(data, 0, offset, eof);
+        } catch (IOException e) {
+          hadError = true;
+          throw e;
+        }
+      }
+
+      offset = 0;
+    }
+
+    @Override
+    public final synchronized void close() throws IOException {
+      //TODO : 2ns call should not cause any error and no network calls.
+      if (featureRedirectOff) {
+        eof = true;
+      }
+
+      boolean flushedSomething = false;
+      if (hadError) {
+        // No point proceeding further since the error has occurered and
+        // stream would be required to upload again.
+        return;
+      } else {
+        flushedSomething = offset > 0;
+        flush();
+      }
+
+      if (featureRedirectOff) {
+        // If we didn't flush anything from our internal buffer, we have to
+        // call the service again
+        // with an empty payload and flush=true in the url
+        if (!flushedSomething) {
+          commit(null, 0, ADLConfKeys.KB, true);
+        }
+      }
+
+      ADLLogger.log(" Total bytes Written : " + (length) + " [" + fsPath + "]");
+    }
+
+    private void commit(byte[] buffer, int off, int len, boolean endOfFile)
+        throws IOException {
+      OutputStream out = null;
+      try {
+        if (featureRedirectOff) {
+          AppendADLNoRedirectParam skipRedirect = new AppendADLNoRedirectParam(
+              true);
+          Param<?, ?>[] tmpParam = featureFlushWhenEOF ?
+              new Param<?, ?>[parameters.length + 3] :
+              new Param<?, ?>[parameters.length + 1];
+          System.arraycopy(parameters, 0, tmpParam, 0, parameters.length);
+          tmpParam[parameters.length] = skipRedirect;
+          if (featureFlushWhenEOF) {
+            tmpParam[parameters.length + 1] = new ADLFlush(endOfFile);
+            tmpParam[parameters.length + 2] = new OffsetParam(length);
+          }
+
+          out = new FsPathOutputStreamRunner(ADLPostOpParam.Op.APPEND, fsPath,
+              len, tmpParam).run();
+        } else {
+          out = new FsPathOutputStreamRunner(ADLPostOpParam.Op.APPEND, fsPath,
+              len, parameters).run();
+        }
+
+        if (buffer != null) {
+          fileStatusCacheManager.remove(fsPath);
+          out.write(buffer, off, len);
+          length += len;
+        }
+      } finally {
+        if (out != null) {
+          out.close();
+        }
+      }
+    }
+
+    private void flushAsync() throws IOException {
+      if (offset > 0) {
+        waitForOutstandingFlush();
+
+        // Submit the new flush task to the executor
+        flushTask = EXECUTOR.submit(new CommitTask(data, offset, eof));
+
+        // Get a new internal buffer for the user to write
+        data = getBuffer();
+        offset = 0;
+      }
+    }
+
+    private void waitForOutstandingFlush() throws IOException {
+      if (flushTask != null) {
+        try {
+          flushTask.get();
+        } catch (InterruptedException ex) {
+          throw new IOException(ex);
+        } catch (ExecutionException ex) {
+          // Wrap the ExecutionException in an IOException for callers can
+          // only handle IOException
+          throw new IOException(ex);
+        } finally {
+          flushTask = null;
+        }
+      }
+    }
+
+    private byte[] getBuffer() {
+      // Switch between the first and second buffer
+      if (bufferIndex == 0) {
+        if (dataBuffers[0] == null) {
+          dataBuffers[0] = new byte[bufSize];
+        }
+
+        bufferIndex = 1;
+        return dataBuffers[0];
+      } else {
+        if (dataBuffers[1] == null) {
+          dataBuffers[1] = new byte[bufSize];
+        }
+
+        bufferIndex = 0;
+        return dataBuffers[1];
+      }
+    }
+
+    public class CommitTask implements Callable<Object> {
+      private byte[] buff;
+      private int len;
+      private boolean eof;
+
+      public CommitTask(byte[] buffer, int size, boolean isEnd) {
+        buff = buffer;
+        len = size;
+        eof = isEnd;
+      }
+
+      public final Object call() throws IOException {
+        commit(buff, 0, len, eof);
+        return null;
+      }
+    }
+  }
+
+  @SuppressWarnings("checkstyle:javadocmethod")
+  /**
+   * Read data from backend in chunks instead of persistent connection. This
+   * is to avoid slow reader causing socket
+   * timeout.
+   */ protected class BatchByteArrayInputStream extends FSInputStream {

AF> Formatting.  Missing newline above.

+
+    private static final int SIZE4MB = 4 * 1024 * 1024;
+    private final URL runner;
+    private volatile byte[] data = null;
+    private volatile long validDataHoldingSize = 0;
+    private volatile int bufferOffset = 0;

AF> Why volatile here?  Needs comments.

I have a feeling this is wrong and you need some synchronized blocks below
instead.

+    private volatile long currentFileOffset = 0;
+    private volatile long nextFileOffset = 0;
+    private long fileSize = 0;
+    private String guid;
+    private StreamState state = StreamState.Initial;
+    private int maxBufferSize;
+    private int maxConcurrentConnection;
+    private Path fsPath;
+    private boolean streamIsClosed;
+    private Future[] subtasks = null;
+
+    BatchByteArrayInputStream(URL url, Path p, int bufferSize,
+        int concurrentConnection) throws IOException {
+      this.runner = url;
+      fsPath = p;
+      FileStatus fStatus = getFileStatus(fsPath);
+      if (!fStatus.isFile()) {
+        throw new IOException("Cannot open the directory " + p + " for " +
+            "reading");
+      }
+      fileSize = fStatus.getLen();
+      guid = getMachineName() + System.currentTimeMillis();

AF> Probably unique.  What is impact if this collides?  Why not use a random
java UUID?
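
A minimal sketch of the UUID alternative, assuming the id only has to be unique per stream and the service accepts an arbitrary string (not verified here). The class and method names are hypothetical and not part of the patch:

```java
import java.util.UUID;

/** Hypothetical helper, not part of the patch. */
class StreamIdSketch {
  /** Builds a per-stream id that does not depend on host name or clock. */
  static String newStreamId() {
    // randomUUID() returns a type 4 (pseudo-randomly generated) UUID.
    return UUID.randomUUID().toString();
  }

  public static void main(String[] args) {
    System.out.println(newStreamId());
  }
}
```

Two streams opened on the same host within the same millisecond would then still get distinct ids.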

+      this.maxBufferSize = bufferSize;
+      this.maxConcurrentConnection = concurrentConnection;
+      this.streamIsClosed = false;
+    }
+
+    @Override
+    public final int read(long position, byte[] buffer, int offset, int length)
+        throws IOException {
+      if (streamIsClosed) {
+        throw new IOException("Stream already closed");
+      }
+      long oldPos = this.getPos();
+
+      int nread1;
+      try {
+        this.seek(position);
+        nread1 = this.read(buffer, offset, length);
+      } finally {
+        this.seek(oldPos);
+      }
+
+      return nread1;
+    }
+
+    @Override
+    public final int read() throws IOException {
+      if (streamIsClosed) {
+        throw new IOException("Stream already closed");
+      }
+      int status = doBufferAvailabilityCheck();
+      if (status == -1) {
+        return status;
+      }
+      int ch = data[bufferOffset++] & (0xff);

AF> This is not thread safe, but looking at FSInputStream, it appears read() is
supposed to be.

Seems this class is racy in general?
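
For illustration, a sketch of the locking I have in mind. volatile only gives visibility; data[bufferOffset++] is still a read-modify-write, so two threads can see the same offset. The class below is a hypothetical stand-in for the buffer cursor and leaves out refill logic entirely:

```java
/** Hypothetical stand-in for the buffer cursor, not the patch's class. */
class GuardedBufferSketch {
  private final byte[] data; // guarded by "this"
  private int bufferOffset;  // guarded by "this"

  GuardedBufferSketch(byte[] data) {
    this.data = data;
  }

  /** Returns the next byte as 0..255, or -1 when the buffer is exhausted. */
  synchronized int read() {
    if (bufferOffset >= data.length) {
      return -1;
    }
    // The bounds check and the increment happen under one lock, so two
    // callers can never be handed the same offset.
    return data[bufferOffset++] & 0xff;
  }

  /** Current position within the buffer. */
  synchronized int position() {
    return bufferOffset;
  }
}
```

With every method that touches the shared fields synchronized, the volatile modifiers are no longer needed.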

+      if (statistics != null) {
+        statistics.incrementBytesRead(1);
+      }
+      return ch;
+    }
+
+    @Override
+    public final void readFully(long position, byte[] buffer, int offset,
+        int length) throws IOException {
+      if (streamIsClosed) {
+        throw new IOException("Stream already closed");
+      }
+      long startTime = System.currentTimeMillis();
+      super.readFully(position, buffer, offset, length);
+      ADLLogger.log("ReadFully1 Time Taken : " + (System.currentTimeMillis()
+          - startTime));
+      if (statistics != null) {
+        statistics.incrementBytesRead(length);
+      }
+      instrumentation.rawBytesDownloaded(length);
+    }
+
+    @Override
+    public final int read(byte[] b, int off, int len) throws IOException {
+      if (streamIsClosed) {
+        throw new IOException("Stream already closed");
+      }
+      int status = doBufferAvailabilityCheck();
+      if (status == -1) {
+        return status;
+      }
+
+      long startTime = System.currentTimeMillis();
+      int byteRead = 0;
+      long availableBytes = validDataHoldingSize - off;
+      long requestedBytes = bufferOffset + len - off;
+      if (requestedBytes <= availableBytes) {
+        if (b == null) {
+          throw new NullPointerException();
+        } else if (off < 0 || len < 0 || len > b.length - off) {
+          throw new IndexOutOfBoundsException();
+        } else if (len == 0) {
+          return 0;
+        }
+
+        ADLLogger.log("AC - [BufferOffset : " + bufferOffset + " " +
+            "CurrentFileSite : " + currentFileOffset + "] Offset : " + off
+            + " Length : " + len + " Read Time Taken : " + (
+            System.currentTimeMillis() - startTime));
+        try {
+          System.arraycopy(data, bufferOffset, b, off, len);
+        } catch (ArrayIndexOutOfBoundsException e) {
+          ADLLogger.log("ArrayIndexOutOfBoundsException AC - [BufferOffset : " +
+              "" + bufferOffset + " CurrentFileSite : " + currentFileOffset
+              + "] Offset : " + off + " Length : " + len + " Read Time Taken : "
+              + (System.currentTimeMillis() - startTime));
+          throw e;
+        }
+        bufferOffset += len;
+        byteRead = len;
+      } else {
+        ADLLogger.log("HC - [BufferOffset : " + bufferOffset + " " +
+            "CurrentFileSite : " + currentFileOffset + "] Offset : " + off
+            + " Length : " + len + " Read Time Taken : " + (
+            System.currentTimeMillis() - startTime));
+        byteRead = super.read(b, off, len);
+      }
+      if (statistics != null) {
+        statistics.incrementBytesRead(byteRead);
+      }
+      instrumentation.rawBytesDownloaded(byteRead);
+      return byteRead;
+    }
+
+    private int doBufferAvailabilityCheck() throws IOException {
+      if (state == StreamState.Initial) {
+        ADLLogger.log("Initial Fill");
+        validDataHoldingSize = fill(nextFileOffset);
+      }
+
+      long dataReloadSize = 0;
+      switch ((int) validDataHoldingSize) {
+      case -1:
+        state = StreamState.StreamEnd;
+        return -1;
+      case 0:
+        dataReloadSize = fill(nextFileOffset);
+        if (dataReloadSize <= 0) {
+          state = StreamState.StreamEnd;
+          return (int) dataReloadSize;
+        } else {
+          validDataHoldingSize = dataReloadSize;
+        }
+        break;
+      default:
+        break;
+      }
+
+      if (bufferOffset >= validDataHoldingSize) {
+        dataReloadSize = fill(nextFileOffset);
+      }
+
+      if (bufferOffset >= ((dataReloadSize == 0) ?
+          validDataHoldingSize :
+          dataReloadSize)) {
+        state = StreamState.StreamEnd;
+        return -1;
+      }
+
+      validDataHoldingSize = ((dataReloadSize == 0) ?
+          validDataHoldingSize :
+          dataReloadSize);
+      state = StreamState.DataCachedInLocalBuffer;
+      return 0;

AF> I didn't have time to review this part thoroughly.  I can take a
better look next round.

+    }
+
+    public final long fill(final long off) throws IOException {
+      ADLLogger.log("Fill from " + off);
+      long startTime = System.currentTimeMillis();
+      if (state == StreamState.StreamEnd) {
+        return -1;
+      }
+
+      if (fileSize <= off) {
+        state = StreamState.StreamEnd;
+        return -1;
+      }
+      int len = maxBufferSize;
+      long fileOffset = 0;
+      boolean isEntireFileCached = true;
+      if ((fileSize < maxBufferSize)) {
+        len = (int) fileSize;
+        currentFileOffset = 0;
+        nextFileOffset = 0;
+      } else {
+        if (len > (fileSize - off)) {
+          len = (int) (fileSize - off);
+        }
+
+        if (BufferManager.getInstance()
+            .hasValidDataForOffset(fsPath.toString(), off)) {
+          len = (int) (
+              BufferManager.getInstance().getBufferOffset() + BufferManager
+                  .getInstance().getBufferSize() - (int) off);
+        }
+
+        if (len <= 0) {
+          len = maxBufferSize;
+        }
+        fileOffset = off;
+        isEntireFileCached = false;
+      }
+
+      data = null;
+      BufferManager bm = BufferManager.getInstance();
+      data = bm.getEmpty(len);
+      boolean fetchDataOverNetwork = false;
+      if (bm.hasData(fsPath.toString(), fileOffset, len)) {
+        try {
+          bm.get(data, fileOffset);
+          validDataHoldingSize = data.length;
+          currentFileOffset = fileOffset;
+        } catch (ArrayIndexOutOfBoundsException e) {
+          fetchDataOverNetwork = true;
+        }
+      } else {
+        fetchDataOverNetwork = true;
+      }
+
+      if (fetchDataOverNetwork) {
+        int splitSize = getSplitSize(len);
+        try {
+          validDataHoldingSize = fillDataConcurrently(data, len, fileOffset,
+              splitSize);
+        } catch (InterruptedException e) {
+          throw new IOException(e.getMessage());
+        }
+        bm.add(data, fsPath.toString(), fileOffset);
+        currentFileOffset = nextFileOffset;
+      }
+
+      nextFileOffset += validDataHoldingSize;
+      state = StreamState.DataCachedInLocalBuffer;
+      bufferOffset = isEntireFileCached ? (int) off : 0;
+      ADLLogger.log("Buffer Refill Time Taken : " + (System.currentTimeMillis()
+          - startTime));
+      return validDataHoldingSize;
+    }
+
+    int getSplitSize(int size) {
+      if (size <= SIZE4MB) {
+        return 1;
+      }
+
+      // Not practical
+      if (size > maxBufferSize) {
+        size = maxBufferSize;
+      }
+
+      int equalBufferSplit = size / SIZE4MB;
+      int splitSize = Math.min(equalBufferSplit, maxConcurrentConnection);
+      return splitSize;
+    }
+
+    @Override
+    public final void seek(long pos) throws IOException {
+      if (pos == -1) {
+        throw new IOException("Bad offset, cannot seek to " + pos);
+      }
+
+      BufferManager bm = BufferManager.getInstance();
+      if (bm.hasValidDataForOffset(fsPath.toString(), pos)) {
+        state = StreamState.DataCachedInLocalBuffer;
+      } else if (pos >= 0) {
+        state = StreamState.Initial;
+      }
+
+      long availableBytes = (currentFileOffset + validDataHoldingSize);
+      ADLLogger.log("SEEK  : " + pos + " Available " + currentFileOffset + " " +
+          "Count " + availableBytes);
+
+      // Check if this position falls under buffered data
+      if (pos < currentFileOffset || availableBytes <= 0) {
+        validDataHoldingSize = 0;
+        currentFileOffset = pos;
+        nextFileOffset = pos;
+        bufferOffset = 0;
+        return;
+      }
+
+      if (pos < availableBytes && pos >= currentFileOffset) {
+        state = StreamState.DataCachedInLocalBuffer;
+        bufferOffset = (int) (pos - currentFileOffset);
+      } else {
+        validDataHoldingSize = 0;
+        currentFileOffset = pos;
+        nextFileOffset = pos;
+        bufferOffset = 0;
+      }
+    }
+
+    @Override
+    public final long getPos() throws IOException {
+      if (streamIsClosed) {
+        throw new IOException("Stream already closed");
+      }
+      return bufferOffset + currentFileOffset;
+    }
+
+    @Override
+    public final int available() throws IOException {
+      if (streamIsClosed) {
+        throw new IOException("Stream already closed");
+      }
+      return Integer.MAX_VALUE;
+    }
+
+    @Override
+    public final boolean seekToNewSource(long targetPos) throws IOException {
+      return false;
+    }
+
+    @SuppressWarnings("unchecked")
+    public final int fillDataConcurrently(byte[] byteArray, int length,
+        long globalOffset, int splitSize)
+        throws IOException, InterruptedException {
+      ADLLogger.log("Fill up Data from " + globalOffset + " len : " + length);
+      ExecutorService executor = Executors.newFixedThreadPool(splitSize);
+      subtasks = new Future[splitSize];
+      for (int i = 0; i < splitSize; i++) {
+        int offset = i * (length / splitSize);
+        int splitLength = (splitSize == (i + 1)) ?
+            (length / splitSize) + (length % splitSize) :
+            (length / splitSize);
+        subtasks[i] = executor.submit(
+            new BackgroundReadThread(byteArray, offset, splitLength,
+                globalOffset + offset));
+      }
+
+      executor.shutdown();

AF> Is this shutdown() needed?

+      // wait until all tasks are finished
+      try {
+        executor.awaitTermination(ADLConfKeys.DEFAULT_TIMEOUT_IN_SECONDS,
+            TimeUnit.SECONDS);

AF> Curious, why don't you care about timeout expiring?  Can't you just remove
this whole try-catch block?  The futures are what are ensuring tasks finish
below, no?
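
A sketch of the simpler shape, assuming a non-empty task list. Future.get() blocks until its task completes, so awaitTermination adds nothing as far as I can tell. shutdown() still seems worth keeping while the pool is created per call, since its threads would otherwise linger. Names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical helper, not part of the patch. */
class FutureWaitSketch {
  /** Runs all tasks in parallel and returns the sum of their results. */
  static int sumResults(List<Callable<Integer>> tasks)
      throws InterruptedException, ExecutionException {
    // Assumes tasks is non-empty: newFixedThreadPool rejects a size of 0.
    ExecutorService executor = Executors.newFixedThreadPool(tasks.size());
    try {
      List<Future<Integer>> futures = new ArrayList<>();
      for (Callable<Integer> task : tasks) {
        futures.add(executor.submit(task));
      }
      int total = 0;
      for (Future<Integer> future : futures) {
        total += future.get(); // blocks until this task has finished
      }
      return total;
    } finally {
      executor.shutdown(); // release the per-call worker threads
    }
  }
}
```

The real method would map the two exceptions to IOException as it does today.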

+      } catch (InterruptedException e) {
+        ADLLogger.log("Interupted : " + e.getMessage());
+        throw e;
+      }
+
+      int totalBytePainted = 0;
+      for (int i = 0; i < splitSize; ++i) {
+        try {
+          totalBytePainted += (Integer) subtasks[i].get();
+        } catch (InterruptedException e) {
+          throw new IOException(e.getCause());
+        } catch (ExecutionException e) {
+          throw new IOException(e.getCause());
+        }
+      }
+
+      if (totalBytePainted != length) {
+        throw new IOException("Expected " + length + " bytes, Got " +
+            totalBytePainted + " bytes");
+      }
+
+      return totalBytePainted;
+    }
+
+    @Override
+    public final void close() throws IOException {
+      BufferManager.getInstance().clear();
+
+      //need to cleanup the above code the stream and connection close doesnt
+      // happen here
+      //flag set to mark close happened, cannot use the stream once closed
+      streamIsClosed = true;
+    }
+

AF> Add function comment w/ params and @return explanation?

+    private int fillUpData(byte[] buffer, int offset, int length,
+        long globalOffset) throws IOException {
+      int totalBytesRead = 0;
+      final URL offsetUrl = new URL(
+          runner + "&" + new OffsetParam(String.valueOf(globalOffset)) + "&"
+              + new LengthParam(String.valueOf(length)) + "&openid=" + guid);

AF> Can you explain what this openid does?

+      HttpURLConnection conn = new URLRunner(GetOpParam.Op.OPEN, offsetUrl,
+          true).run();
+      InputStream in = conn.getInputStream();
+      try {
+        int bytesRead = 0;
+        while ((bytesRead = in.read(buffer, (int) offset + totalBytesRead,
+            (int) (length - totalBytesRead))) > 0) {
+          totalBytesRead += bytesRead;
+        }
+
+        // InputStream must be fully consumed to enable http keep-alive
+        if (bytesRead == 0) {
+          // Looking for EOF marker byte needs to be read.
+          if (in.read() != -1) {
+            throw new SocketException(
+                "Server returned more than requested " + "data.");

AF> Hmm.. so we expect in.read(buffer, ...) to return 0, and then in.read() to
return -1?

Also, you can remove the string concatenation in the exception message.
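
A sketch of a loop that makes the two exits explicit. InputStream.read(byte[], int, int) returns 0 only when asked for 0 bytes, and -1 at end of stream, so the loop below never issues a zero-length read. The helper name is hypothetical, and returning a short count on early EOF is my assumption about the desired behaviour:

```java
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical helper, not part of the patch. */
class BoundedReadSketch {
  /** Reads up to length bytes into buffer; returns the count actually read. */
  static int readExactly(InputStream in, byte[] buffer, int offset, int length)
      throws IOException {
    int total = 0;
    while (total < length) {
      int n = in.read(buffer, offset + total, length - total);
      if (n < 0) {
        return total; // EOF before the requested length: short read
      }
      total += n;
    }
    // The requested range is full; any further byte means the server
    // sent more than was asked for.
    if (in.read() != -1) {
      throw new IOException("Server returned more than requested data.");
    }
    return total;
  }
}
```

The caller closes the stream, as the existing finally block already does.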

+          }
+        }
+      } finally {
+        in.close();
+        conn.disconnect();
+      }
+
+      return totalBytesRead;
+    }
+
+    private class BackgroundReadThread implements Callable {
+
+      private final byte[] data;
+      private int offset;
+      private int length;
+      private long globalOffset;
+
+      BackgroundReadThread(byte[] buffer, int off, int size, long position) {
+        this.data = buffer;
+        this.offset = off;
+        this.length = size;
+        this.globalOffset = position;
+      }
+
+      public Object call() throws IOException {
+        return fillUpData(data, offset, length, globalOffset);
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/PrivateDebugAzureDataLake.java hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/PrivateDebugAzureDataLake.java
new file mode 100644
index 0000000..a3af0f3
--- /dev/null
+++ hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/PrivateDebugAzureDataLake.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
<snip>
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.web;

AF> As noted before, using filesystem types to achieve logging tags is awkward.
Also, why is this not in org.apache.hadoop.fs.adl?

+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.DelegateToFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * Use this class implementation to log debug information as part of the
+ * debug logs. Used during development.
+ */
+public class PrivateDebugAzureDataLake extends DelegateToFileSystem {
<snip>
<snip>
diff --git hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLFlush.java hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLFlush.java
new file mode 100644
index 0000000..c3e14f6
--- /dev/null
+++ hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLFlush.java
@@ -0,0 +1,63 @@
+/*
<snip>
+/**
+ * Query parameter to notify backend server that the all the data has been
+ * pushed to over the stream.
+ *
+ * Used in operation code Create and Append.
+ */
+public class ADLFlush extends BooleanParam {
+  /**

AF> I did not review all the HTTP Op Param stuff... someone should.
<snip>
diff --git hadoop-tools/hadoop-azure-datalake/src/site/markdown/index.md hadoop-tools/hadoop-azure-datalake/src/site/markdown/index.md
new file mode 100644
index 0000000..4dbc27f
--- /dev/null
+++ hadoop-tools/hadoop-azure-datalake/src/site/markdown/index.md
@@ -0,0 +1,246 @@
<snip>
+## <a name="Introduction" />Introduction
+
+The hadoop-azure-datalake module provides support for integration with
+[Azure Data Lake Store](https://azure.microsoft.com/en-in/services/data-lake-store/).
+The jar file is named azure-datalake-store.jar.
+

AF> Thank you for the documentation.

<snip>
+#### <a name="#FileStatus_Cache_Management" />FileStatus Cache Management
+Performance is one of the key features of Azure Data Lake Storage service. In order to gain a performance boost, hadoop-azure-datalake module provides basic FileStatus cache management on the client. This reduces the number of REST calls to the backend service.
+
+FileStatus cache scope is per process and shared between multiple `AdlFileSystem` instances within the process. FileStatus cache is built using ListStatus and GetFileStatus calls to Azure Data Lake Storage. The life of each FileStatus cached object is limited and default is 5 seconds. Time to live FileStatus cached object is configurable through core-site.xml.
+
+**This is an expermental feature and should be turned off for an unexpected behaviour observed during ListStatus and GetFileStatus operation.**
+

AF> Seems more than experimental.

+To Enable/Disable FileStatus cache management
+
+    <property>
+        <name>adl.feature.override.cache.filestatus</name>
+        <value>true</value>
<snip>
+        <property>
+            <name>dfs.webhdfs.oauth2.refresh.token</name>
+            <value></value>
+        </property>
+
+For ADL FileSystem to take effect. Update core-site.xml with
+
+        <property>
+            <name>fs.adl.impl</name>
+            <value>org.apache.hadoop.fs.adl.AdlFileSystem</value>
+        </property>
+
+        <property>
+            <name>fs.AbstractFileSystem.adl.impl</name>
+            <value>org.apache.hadoop.fs.adl.Adl</value>
+        </property>

AF> Again, need to put these in core-default.xml with documentation and
defaults, and always use lowercase in the property names.

+
+
+### <a name="Accessing_adl_URLs" />Accessing adl URLs
+
+After credentials are configured in core-site.xml, any Hadoop component may
<snip>
diff --git hadoop-tools/hadoop-azure-datalake/src/test/java/org/apache/hadoop/fs/adl/TestADLResponseData.java hadoop-tools/hadoop-azure-datalake/src/test/java/org/apache/hadoop/fs/adl/TestADLResponseData.java
new file mode 100644
index 0000000..1aec96c
--- /dev/null
+++ hadoop-tools/hadoop-azure-datalake/src/test/java/org/apache/hadoop/fs/adl/TestADLResponseData.java

AF> Thank you for including tests. I did not have time to review the test code
this round.
{quote}

> Support Microsoft Azure Data Lake - as a file system in Hadoop
> --------------------------------------------------------------
>
>                 Key: HADOOP-12666
>                 URL: https://issues.apache.org/jira/browse/HADOOP-12666
>             Project: Hadoop Common
>          Issue Type: New Feature
>          Components: fs, fs/azure, tools
>            Reporter: Vishwajeet Dusane
>            Assignee: Vishwajeet Dusane
>         Attachments: HADOOP-12666-002.patch, HADOOP-12666-003.patch, HADOOP-12666-004.patch, HADOOP-12666-005.patch, HADOOP-12666-006.patch, HADOOP-12666-1.patch
>
>   Original Estimate: 336h
>          Time Spent: 336h
>  Remaining Estimate: 0h
>
> h2. Description
> This JIRA describes a new file system implementation for accessing Microsoft Azure Data Lake Store (ADL) from within Hadoop. This would enable existing Hadoop applications such has MR, HIVE, Hbase etc..,  to use ADL store as input or output.
>  
> ADL is ultra-high capacity, Optimized for massive throughput with rich management and security features. More details available at https://azure.microsoft.com/en-us/services/data-lake-store/



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
