hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cnaur...@apache.org
Subject [6/6] hadoop git commit: HADOOP-12635. Adding Append API support for WASB. Contributed by Dushyanth.
Date Mon, 18 Jan 2016 17:22:15 GMT
HADOOP-12635. Adding Append API support for WASB. Contributed by Dushyanth.

(cherry picked from commit 8bc93db2e7c64830b6a662f28c8917a9eef4e7c9)
(cherry picked from commit 62d616621134cc5e68f4d5fd32f49ea4d731417c)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ffc0d988
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ffc0d988
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ffc0d988

Branch: refs/heads/branch-2.8
Commit: ffc0d988869fb964fb052e8d109821807ca75ee1
Parents: 23d729f
Author: cnauroth <cnauroth@apache.org>
Authored: Mon Jan 18 09:08:53 2016 -0800
Committer: cnauroth <cnauroth@apache.org>
Committed: Mon Jan 18 09:13:31 2016 -0800

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |   2 +
 .../fs/azure/AzureNativeFileSystemStore.java    |  23 +-
 .../hadoop/fs/azure/BlockBlobAppendStream.java  | 775 +++++++++++++++++++
 .../hadoop/fs/azure/NativeAzureFileSystem.java  | 216 +++---
 .../fs/azure/NativeAzureFileSystemHelper.java   | 107 +++
 .../hadoop/fs/azure/NativeFileSystemStore.java  |   2 +
 .../hadoop/fs/azure/PageBlobOutputStream.java   |  17 +-
 .../hadoop/fs/azure/StorageInterface.java       |  89 ++-
 .../hadoop/fs/azure/StorageInterfaceImpl.java   |  33 +-
 .../hadoop-azure/src/site/markdown/index.md     |  20 +-
 .../hadoop/fs/azure/MockStorageInterface.java   |  34 +-
 .../azure/TestNativeAzureFileSystemAppend.java  | 362 +++++++++
 12 files changed, 1550 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffc0d988/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 1eba762..3711b43 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -46,6 +46,8 @@ Release 2.8.0 - UNRELEASED
     HADOOP-12691. Add CSRF Filter for REST APIs to Hadoop Common.
     (Larry McCay via cnauroth)
 
+    HADOOP-12635. Adding Append API support for WASB. (Dushyanth via cnauroth)
+
   IMPROVEMENTS
 
     HADOOP-12458. Retries is typoed to spell Retires in parts of

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffc0d988/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index a936cd6..0097912 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -33,13 +33,11 @@ import java.net.URLEncoder;
 import java.security.InvalidKeyException;
 import java.util.ArrayList;
 import java.util.Calendar;
-import java.util.Collections;
 import java.util.Date;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
@@ -64,6 +62,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.mortbay.util.ajax.JSON;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.microsoft.azure.storage.CloudStorageAccount;
 import com.microsoft.azure.storage.OperationContext;
@@ -2681,4 +2680,24 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
     close();
     super.finalize();
   }
+
+  @Override
+  public DataOutputStream retrieveAppendStream(String key, int bufferSize) throws IOException {
+
+    try {
+
+      if (isPageBlobKey(key)) {
+        throw new UnsupportedOperationException("Append not supported for Page Blobs");
+      }
+
+      CloudBlobWrapper blob =  this.container.getBlockBlobReference(key);
+
+      BlockBlobAppendStream appendStream = new BlockBlobAppendStream((CloudBlockBlobWrapper) blob, key, bufferSize, getInstrumentedContext());
+      appendStream.initialize();
+
+      return new DataOutputStream(appendStream);
+    } catch(Exception ex) {
+      throw new AzureException(ex);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffc0d988/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
new file mode 100644
index 0000000..d1ec8df
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
@@ -0,0 +1,775 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.List;
+import java.util.Random;
+import java.util.TimeZone;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
+import org.mortbay.log.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.microsoft.azure.storage.AccessCondition;
+import com.microsoft.azure.storage.OperationContext;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.BlobRequestOptions;
+import com.microsoft.azure.storage.blob.BlockEntry;
+import com.microsoft.azure.storage.blob.BlockListingFilter;
+
+/**
+ * Stream object that implememnts append for Block Blobs in WASB.
+ */
+public class BlockBlobAppendStream extends OutputStream {
+
+  private final String key;
+  private final int bufferSize;
+  private ByteArrayOutputStream outBuffer;
+  private final CloudBlockBlobWrapper blob;
+  private final OperationContext opContext;
+
+  /**
+   * Variable to track if the stream has been closed.
+   */
+  private boolean closed = false;
+
+  /**
+   * Variable to track if the append lease is released.
+   */
+
+  private volatile boolean leaseFreed;
+
+  /**
+   * Variable to track if the append stream has been
+   * initialized.
+   */
+
+  private boolean initialized = false;
+
+  /**
+   *  Last IOException encountered
+   */
+  private volatile IOException lastError = null;
+
+  /**
+   * List to keep track of the uncommitted azure storage
+   * block ids
+   */
+  private final List<BlockEntry> uncommittedBlockEntries;
+
+  private static final int UNSET_BLOCKS_COUNT = -1;
+
+  /**
+   * Variable to hold the next block id to be used for azure
+   * storage blocks.
+   */
+  private long nextBlockCount = UNSET_BLOCKS_COUNT;
+
+  private final Random sequenceGenerator = new Random();
+
+  /**
+   *  Time to wait to renew lease in milliseconds
+   */
+  private static final int LEASE_RENEWAL_PERIOD = 10000;
+
+  /**
+   *  Number of times to retry for lease renewal
+   */
+  private static final int MAX_LEASE_RENEWAL_RETRY_COUNT = 3;
+
+  /**
+   *  Time to wait before retrying to set the lease
+   */
+  private static final int LEASE_RENEWAL_RETRY_SLEEP_PERIOD = 500;
+
+  /**
+   *  Metadata key used on the blob to indicate append lease is active
+   */
+  public static final String APPEND_LEASE = "append_lease";
+
+  /**
+   * Timeout value for the append lease in millisecs. If the lease is not
+   * renewed within 30 seconds then another thread can acquire the append lease
+   * on the blob
+   */
+  public static final int APPEND_LEASE_TIMEOUT = 30000;
+
+  /**
+   *  Metdata key used on the blob to indicate last modified time of append lease
+   */
+  public static final String APPEND_LEASE_LAST_MODIFIED = "append_lease_last_modified";
+
+  /**
+   * Number of times block upload needs is retried.
+   */
+  private static final int MAX_BLOCK_UPLOAD_RETRIES = 3;
+
+  /**
+   * Wait time between block upload retries in millisecs.
+   */
+  private static final int BLOCK_UPLOAD_RETRY_INTERVAL = 1000;
+
+  private static final Logger LOG = LoggerFactory.getLogger(BlockBlobAppendStream.class);
+
+  private static final int MAX_BLOCK_COUNT = 100000;
+
+  private ThreadPoolExecutor ioThreadPool;
+
+  /**
+   * Atomic integer to provide thread id for thread names for uploader threads.
+   */
+  private final AtomicInteger threadSequenceNumber;
+
+  /**
+   * Prefix to be used for thread names for uploader threads.
+   */
+  private static final String THREAD_ID_PREFIX = "BlockBlobAppendStream";
+
+  private static final String UTC_STR = "UTC";
+
+  public BlockBlobAppendStream(final CloudBlockBlobWrapper blob,
+      final String aKey, final int bufferSize, final OperationContext opContext)
+          throws IOException {
+
+    if (null == aKey || 0 == aKey.length()) {
+      throw new IllegalArgumentException(
+          "Illegal argument: The key string is null or empty");
+    }
+
+    if (0 >= bufferSize) {
+      throw new IllegalArgumentException(
+          "Illegal argument bufferSize cannot be zero or negative");
+    }
+
+
+    this.blob = blob;
+    this.opContext = opContext;
+    this.key = aKey;
+    this.bufferSize = bufferSize;
+    this.threadSequenceNumber = new AtomicInteger(0);
+    setBlocksCount();
+
+    this.outBuffer = new ByteArrayOutputStream(bufferSize);
+    this.uncommittedBlockEntries = new ArrayList<BlockEntry>();
+
+    // Acquire append lease on the blob.
+    try {
+      //Set the append lease if the value of the append lease is false
+      if (!updateBlobAppendMetadata(true, false)) {
+        LOG.error("Unable to set Append Lease on the Blob : {} "
+            + "Possibly because another client already has a create or append stream open on the Blob", key);
+        throw new IOException("Unable to set Append lease on the Blob. "
+            + "Possibly because another client already had an append stream open on the Blob.");
+      }
+    } catch (StorageException ex) {
+      LOG.error("Encountered Storage exception while acquiring append "
+          + "lease on blob : {}. Storage Exception : {} ErrorCode : {}",
+          key, ex, ex.getErrorCode());
+
+      throw new IOException(ex);
+    }
+
+    leaseFreed = false;
+  }
+
+  /**
+   * Helper method that starts an Append Lease renewer thread and the
+   * thread pool.
+   */
+  public synchronized void initialize() {
+
+    if (initialized) {
+      return;
+    }
+    /*
+     * Start the thread for  Append lease renewer.
+     */
+    Thread appendLeaseRenewer = new Thread(new AppendRenewer());
+    appendLeaseRenewer.setDaemon(true);
+    appendLeaseRenewer.setName(String.format("%s-AppendLeaseRenewer", key));
+    appendLeaseRenewer.start();
+
+    /*
+     * Parameters to ThreadPoolExecutor:
+     * corePoolSize : the number of threads to keep in the pool, even if they are idle,
+     *                unless allowCoreThreadTimeOut is set
+     * maximumPoolSize : the maximum number of threads to allow in the pool
+     * keepAliveTime - when the number of threads is greater than the core,
+     *                 this is the maximum time that excess idle threads will
+     *                 wait for new tasks before terminating.
+     * unit - the time unit for the keepAliveTime argument
+     * workQueue - the queue to use for holding tasks before they are executed
+     *  This queue will hold only the Runnable tasks submitted by the execute method.
+     */
+    this.ioThreadPool = new ThreadPoolExecutor(4, 4, 2, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<Runnable>(), new UploaderThreadFactory());
+
+    initialized = true;
+  }
+
+  /**
+   * Get the blob name.
+   *
+   * @return String Blob name.
+   */
+  public String getKey() {
+    return key;
+  }
+
+  /**
+   * Get the backing blob.
+   * @return buffer size of the stream.
+   */
+  public int getBufferSize() {
+    return bufferSize;
+  }
+
+  /**
+   * Writes the specified byte to this output stream. The general contract for
+   * write is that one byte is written to the output stream. The byte to be
+   * written is the eight low-order bits of the argument b. The 24 high-order
+   * bits of b are ignored.
+   *
+   * @param byteVal
+   *          the byteValue to write.
+   * @throws IOException
+   *           if an I/O error occurs. In particular, an IOException may be
+   *           thrown if the output stream has been closed.
+   */
+  @Override
+  public void write(final int byteVal) throws IOException {
+    write(new byte[] { (byte) (byteVal & 0xFF) });
+  }
+
+  /**
+   * Writes b.length bytes from the specified byte array to this output stream.
+   *
+   * @param data
+   *          the byte array to write.
+   *
+   * @throws IOException
+   *           if an I/O error occurs. In particular, an IOException may be
+   *           thrown if the output stream has been closed.
+   */
+  @Override
+  public void write(final byte[] data) throws IOException {
+    write(data, 0, data.length);
+  }
+
+  /**
+   * Writes length bytes from the specified byte array starting at offset to
+   * this output stream.
+   *
+   * @param data
+   *          the byte array to write.
+   * @param offset
+   *          the start offset in the data.
+   * @param length
+   *          the number of bytes to write.
+   * @throws IOException
+   *           if an I/O error occurs. In particular, an IOException may be
+   *           thrown if the output stream has been closed.
+   */
+  @Override
+  public void write(final byte[] data, final int offset, final int length)
+      throws IOException {
+
+    if (offset < 0 || length < 0 || length > data.length - offset) {
+      throw new IndexOutOfBoundsException("write API in append stream called with invalid arguments");
+    }
+
+    writeInternal(data, offset, length);
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+
+    if (!initialized) {
+      throw new IOException("Trying to close an uninitialized Append stream");
+    }
+
+    if (closed) {
+      return;
+    }
+
+    if (leaseFreed) {
+      throw new IOException(String.format("Attempting to close an append stream on blob : %s "
+          + " that does not have lease on the Blob. Failing close", key));
+    }
+
+    if (outBuffer.size() > 0) {
+      uploadBlockToStorage(outBuffer.toByteArray());
+    }
+
+    ioThreadPool.shutdown();
+
+    try {
+      if (!ioThreadPool.awaitTermination(10, TimeUnit.MINUTES)) {
+        LOG.error("Time out occured while waiting for IO request to finish in append"
+            + " for blob : {}", key);
+        NativeAzureFileSystemHelper.logAllLiveStackTraces();
+        throw new IOException("Timed out waiting for IO requests to finish");
+      }
+    } catch(InterruptedException intrEx) {
+
+      // Restore the interrupted status
+      Thread.currentThread().interrupt();
+      LOG.error("Upload block operation in append interrupted for blob {}. Failing close", key);
+      throw new IOException("Append Commit interrupted.");
+    }
+
+    // Calling commit after all blocks are succesfully uploaded.
+    if (lastError == null) {
+      commitAppendBlocks();
+    }
+
+    // Perform cleanup.
+    cleanup();
+
+    if (lastError != null) {
+      throw lastError;
+    }
+  }
+
+  /**
+   * Helper method that cleans up the append stream.
+   */
+  private synchronized void cleanup() {
+
+    closed = true;
+
+    try {
+      // Set the value of append lease to false if the value is set to true.
+        updateBlobAppendMetadata(false, true);
+    } catch(StorageException ex) {
+      LOG.debug("Append metadata update on the Blob : {} encountered Storage Exception : {} "
+          + "Error Code : {}",
+          key, ex, ex.getErrorCode());
+      lastError = new IOException(ex);
+    }
+
+    leaseFreed = true;
+  }
+
+  /**
+   * Method to commit all the uncommited blocks to azure storage.
+   * If the commit fails then blocks are automatically cleaned up
+   * by Azure storage.
+   * @throws IOException
+   */
+  private synchronized void commitAppendBlocks() throws IOException {
+
+    SelfRenewingLease lease = null;
+
+    try {
+      if (uncommittedBlockEntries.size() > 0) {
+
+        //Acquiring lease on the blob.
+        lease = new SelfRenewingLease(blob);
+
+        // Downloading existing blocks
+        List<BlockEntry> blockEntries =  blob.downloadBlockList(BlockListingFilter.COMMITTED,
+            new BlobRequestOptions(), opContext);
+
+        // Adding uncommitted blocks.
+        blockEntries.addAll(uncommittedBlockEntries);
+
+        AccessCondition accessCondition = new AccessCondition();
+        accessCondition.setLeaseID(lease.getLeaseID());
+        blob.commitBlockList(blockEntries, accessCondition, new BlobRequestOptions(), opContext);
+        uncommittedBlockEntries.clear();
+      }
+    } catch(StorageException ex) {
+      LOG.error("Storage exception encountered during block commit phase of append for blob"
+          + " : {} Storage Exception : {} Error Code: {}", key, ex, ex.getErrorCode());
+      throw new IOException("Encountered Exception while committing append blocks", ex);
+    } finally {
+      if (lease != null) {
+        try {
+          lease.free();
+        } catch(StorageException ex) {
+          LOG.debug("Exception encountered while releasing lease for "
+              + "blob : {} StorageException : {} ErrorCode : {}", key, ex, ex.getErrorCode());
+          // Swallowing exception here as the lease is cleaned up by the SelfRenewingLease object.
+        }
+      }
+    }
+  }
+
+  /**
+   * Helper method used to generate the blockIDs. The algorithm used is similar to the Azure
+   * storage SDK.
+   */
+  private void setBlocksCount() throws IOException {
+    try {
+
+      if (nextBlockCount == UNSET_BLOCKS_COUNT) {
+
+        nextBlockCount = (long) (sequenceGenerator.nextInt(Integer.MAX_VALUE))
+            + sequenceGenerator.nextInt(Integer.MAX_VALUE - MAX_BLOCK_COUNT);
+
+        List<BlockEntry> blockEntries =
+            blob.downloadBlockList(BlockListingFilter.COMMITTED, new BlobRequestOptions(), opContext);
+
+        nextBlockCount += blockEntries.size();
+
+      }
+    } catch (StorageException ex) {
+      LOG.debug("Encountered storage exception during setting next Block Count."
+          + " StorageException : {} ErrorCode : {}", ex, ex.getErrorCode());
+      throw new IOException(ex);
+    }
+  }
+
+  /**
+   * Helper method that generates the next block id for uploading a block to azure storage.
+   * @return String representing the block ID generated.
+   * @throws IOException
+   */
+  private String generateBlockId() throws IOException {
+
+    if (nextBlockCount == UNSET_BLOCKS_COUNT) {
+      throw new IOException("Append Stream in invalid state. nextBlockCount not set correctly");
+    }
+
+    byte[] blockIdInBytes = getBytesFromLong(nextBlockCount);
+    return new String(Base64.encodeBase64(blockIdInBytes), StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Returns a byte array that represents the data of a <code>long</code> value. This
+   * utility method is copied from com.microsoft.azure.storage.core.Utility class.
+   * This class is marked as internal, hence we clone the method here and not express
+   * dependency on the Utility Class
+   *
+   * @param value
+   *            The value from which the byte array will be returned.
+   *
+   * @return A byte array that represents the data of the specified <code>long</code> value.
+   */
+  private static byte[] getBytesFromLong(final long value) {
+      final byte[] tempArray = new byte[8];
+
+      for (int m = 0; m < 8; m++) {
+          tempArray[7 - m] = (byte) ((value >> (8 * m)) & 0xFF);
+      }
+
+      return tempArray;
+  }
+  /**
+   * Helper method that creates a thread to upload a block to azure storage.
+   * @param payload
+   * @throws IOException
+   */
+  private synchronized void uploadBlockToStorage(byte[] payload) throws IOException {
+
+    // upload payload to azure storage
+    nextBlockCount++;
+    String blockId = generateBlockId();
+    // Since uploads of the Azure storage are done in parallel threads, we go ahead
+    // add the blockId in the uncommitted list. If the upload of the block fails
+    // we don't commit the blockIds.
+    uncommittedBlockEntries.add(new BlockEntry(blockId));
+    ioThreadPool.execute(new WriteRequest(payload, blockId));
+  }
+
+
+  /**
+   * Helper method to updated the Blob metadata during Append lease operations.
+   * Blob metadata is updated to holdLease value only if the current lease
+   * status is equal to testCondition and the last update on the blob metadata
+   * is less that 30 secs old.
+   * @param holdLease
+   * @param testCondition
+   * @return true if the updated lease operation was successful or false otherwise
+   * @throws StorageException
+   */
+  private boolean updateBlobAppendMetadata(boolean holdLease, boolean testCondition)
+      throws StorageException {
+
+    SelfRenewingLease lease = null;
+    StorageException lastStorageException = null;
+    int leaseRenewalRetryCount = 0;
+
+    /*
+     * Updating the Blob metadata honours following algorithm based on
+     *  1) If the append lease metadata is present
+     *  2) Last updated time of the append lease
+     *  3) Previous value of the Append lease metadata.
+     *
+     * The algorithm:
+     *  1) If append lease metadata is not part of the Blob. In this case
+     *     this is the first client to Append so we update the metadata.
+     *  2) If append lease metadata is present and timeout has occurred.
+     *     In this case irrespective of what the value of the append lease is we update the metadata.
+     *  3) If append lease metadata is present and is equal to testCondition value (passed as parameter)
+     *     and timeout has not occurred, we update the metadata.
+     *  4) If append lease metadata is present and is not equal to testCondition value (passed as parameter)
+     *     and timeout has not occurred, we do not update metadata and return false.
+     *
+     */
+    while (leaseRenewalRetryCount < MAX_LEASE_RENEWAL_RETRY_COUNT) {
+
+      lastStorageException = null;
+
+      synchronized(this) {
+        try {
+
+          final Calendar currentCalendar = Calendar
+              .getInstance(Locale.US);
+          currentCalendar.setTimeZone(TimeZone.getTimeZone(UTC_STR));
+          long currentTime = currentCalendar.getTime().getTime();
+
+          // Acquire lease on the blob.
+          lease = new SelfRenewingLease(blob);
+
+          blob.downloadAttributes(opContext);
+          HashMap<String, String> metadata = blob.getMetadata();
+
+          if (metadata.containsKey(APPEND_LEASE)
+              && currentTime - Long.parseLong(
+                  metadata.get(APPEND_LEASE_LAST_MODIFIED)) <= BlockBlobAppendStream.APPEND_LEASE_TIMEOUT
+              && !metadata.get(APPEND_LEASE).equals(Boolean.toString(testCondition))) {
+            return false;
+          }
+
+          metadata.put(APPEND_LEASE, Boolean.toString(holdLease));
+          metadata.put(APPEND_LEASE_LAST_MODIFIED, Long.toString(currentTime));
+          blob.setMetadata(metadata);
+          AccessCondition accessCondition = new AccessCondition();
+          accessCondition.setLeaseID(lease.getLeaseID());
+          blob.uploadMetadata(accessCondition, null, opContext);
+          return true;
+
+        } catch (StorageException ex) {
+
+          lastStorageException = ex;
+          LOG.debug("Lease renewal for Blob : {} encountered Storage Exception : {} "
+              + "Error Code : {}",
+              key, ex, ex.getErrorCode());
+          leaseRenewalRetryCount++;
+
+        } finally {
+
+          if (lease != null) {
+            try {
+              lease.free();
+            } catch(StorageException ex) {
+              LOG.debug("Encountered Storage exception while releasing lease for Blob {} "
+                  + "during Append  metadata operation. Storage Exception {} "
+                  + "Error Code : {} ", key, ex, ex.getErrorCode());
+            } finally {
+              lease = null;
+            }
+          }
+        }
+      }
+
+      if (leaseRenewalRetryCount == MAX_LEASE_RENEWAL_RETRY_COUNT) {
+        throw lastStorageException;
+      } else {
+        try {
+          Thread.sleep(LEASE_RENEWAL_RETRY_SLEEP_PERIOD);
+        } catch(InterruptedException ex) {
+          LOG.debug("Blob append metadata updated method interrupted");
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+
+    // The code should not enter here because the while loop will
+    // always be executed and if the while loop is executed we
+    // would returning from the while loop.
+    return false;
+  }
+
+  /**
+   * This is the only method that should be writing to outBuffer to maintain consistency of the outBuffer.
+   * @param data
+   * @param offset
+   * @param length
+   * @throws IOException
+   */
+  private synchronized void writeInternal(final byte[] data, final int offset, final int length)
+      throws IOException {
+
+    if (!initialized) {
+      throw new IOException("Trying to write to an un-initialized Append stream");
+    }
+
+    if (closed) {
+      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+    }
+
+    if (leaseFreed) {
+      throw new IOException(String.format("Write called on a append stream not holding lease. Failing Write"));
+    }
+
+    byte[] currentData = new byte[length];
+    System.arraycopy(data, offset, currentData, 0, length);
+
+    // check to see if the data to be appended exceeds the
+    // buffer size. If so we upload a block to azure storage.
+    while ((outBuffer.size() + currentData.length) > bufferSize) {
+
+      byte[] payload = new byte[bufferSize];
+
+      // Add data from the existing buffer
+      System.arraycopy(outBuffer.toByteArray(), 0, payload, 0, outBuffer.size());
+
+      // Updating the available size in the payload
+      int availableSpaceInPayload = bufferSize - outBuffer.size();
+
+      // Adding data from the current call
+      System.arraycopy(currentData, 0, payload, outBuffer.size(), availableSpaceInPayload);
+
+      uploadBlockToStorage(payload);
+
+      // updating the currentData buffer
+      byte[] tempBuffer = new byte[currentData.length - availableSpaceInPayload];
+      System.arraycopy(currentData, availableSpaceInPayload,
+          tempBuffer, 0, currentData.length - availableSpaceInPayload);
+      currentData = tempBuffer;
+      outBuffer = new ByteArrayOutputStream(bufferSize);
+    }
+
+    outBuffer.write(currentData);
+  }
+
+  /**
+   * Runnable instance that uploads the block of data to azure storage.
+   *
+   *
+   */
+  private class WriteRequest implements Runnable {
+    private final byte[] dataPayload;
+    private final String blockId;
+
+    public WriteRequest(byte[] dataPayload, String blockId) {
+      this.dataPayload = dataPayload;
+      this.blockId = blockId;
+    }
+
+    @Override
+    public void run() {
+
+      int uploadRetryAttempts = 0;
+      IOException lastLocalException = null;
+      while (uploadRetryAttempts < MAX_BLOCK_UPLOAD_RETRIES) {
+        try {
+
+          blob.uploadBlock(blockId, new ByteArrayInputStream(dataPayload),
+              dataPayload.length, new BlobRequestOptions(), opContext);
+          break;
+        } catch(Exception ioe) {
+          Log.debug("Encountered exception during uploading block for Blob : {} Exception : {}", key, ioe);
+          uploadRetryAttempts++;
+          lastLocalException = new IOException("Encountered Exception while uploading block", ioe);
+          try {
+            Thread.sleep(BLOCK_UPLOAD_RETRY_INTERVAL);
+          } catch(InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            break;
+          }
+        }
+      }
+
+      if (uploadRetryAttempts == MAX_BLOCK_UPLOAD_RETRIES) {
+        lastError = lastLocalException;
+      }
+    }
+  }
+
+  /**
+   * A ThreadFactory that creates uploader thread with
+   * meaningful names helpful for debugging purposes.
+   */
+  class UploaderThreadFactory implements ThreadFactory {
+
+    @Override
+    public Thread newThread(Runnable r) {
+      Thread t = new Thread(r);
+      t.setName(String.format("%s-%s-%d", THREAD_ID_PREFIX, key,
+          threadSequenceNumber.getAndIncrement()));
+      return t;
+    }
+  }
+
+  /**
+   * A deamon thread that renews the Append lease on the blob.
+   * The thread sleeps for LEASE_RENEWAL_PERIOD time before renewing
+   * the lease. If an error is encountered while renewing the lease
+   * then an lease is released by this thread, which fails all other
+   * operations.
+   */
+  private class AppendRenewer implements Runnable {
+
+    @Override
+    public void run() {
+
+      while (!leaseFreed) {
+
+        try {
+          Thread.sleep(LEASE_RENEWAL_PERIOD);
+        } catch (InterruptedException ie) {
+          LOG.debug("Appender Renewer thread interrupted");
+          Thread.currentThread().interrupt();
+        }
+
+        Log.debug("Attempting to renew append lease on {}", key);
+
+        try {
+          if (!leaseFreed) {
+            // Update the blob metadata to renew the append lease
+            if (!updateBlobAppendMetadata(true, true)) {
+              LOG.error("Unable to re-acquire append lease on the Blob {} ", key);
+              leaseFreed = true;
+            }
+          }
+        } catch (StorageException ex) {
+
+          LOG.debug("Lease renewal for Blob : {} encountered "
+              + "Storage Exception : {} Error Code : {}", key, ex, ex.getErrorCode());
+
+          // We swallow the exception here because if the blob metadata is not updated for
+          // APPEND_LEASE_TIMEOUT period, another thread would be able to detect this and
+          // continue forward if it needs to append.
+          leaseFreed = true;
+        }
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffc0d988/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index d2ff705..ed65184 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.azure;
 
 import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -31,7 +32,6 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.EnumSet;
-import java.util.Iterator;
 import java.util.Set;
 import java.util.TimeZone;
 import java.util.TreeSet;
@@ -41,7 +41,6 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -60,8 +59,6 @@ import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.fs.azure.AzureException;
-import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
 import org.slf4j.Logger;
@@ -73,12 +70,8 @@ import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.microsoft.azure.storage.AccessCondition;
-import com.microsoft.azure.storage.OperationContext;
-import com.microsoft.azure.storage.StorageErrorCode;
 import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.CloudBlob;
-import com.microsoft.azure.storage.StorageErrorCodeStrings;
+
 
 import org.apache.hadoop.io.IOUtils;
 
@@ -288,7 +281,7 @@ public class NativeAzureFileSystem extends FileSystem {
         throw new IOException("Unable to write RenamePending file for folder rename from "
             + srcKey + " to " + dstKey, e);
       } finally {
-        NativeAzureFileSystem.cleanup(LOG, output);
+        NativeAzureFileSystemHelper.cleanup(LOG, output);
       }
     }
 
@@ -663,6 +656,11 @@ public class NativeAzureFileSystem extends FileSystem {
 
   public static final String SKIP_AZURE_METRICS_PROPERTY_NAME = "fs.azure.skip.metrics";
 
+  /*
+   * Property to enable Append API.
+   */
+  public static final String APPEND_SUPPORT_ENABLE_PROPERTY_NAME = "fs.azure.enable.append.support";
+
   private class NativeAzureFsInputStream extends FSInputStream {
     private InputStream in;
     private final String key;
@@ -728,7 +726,7 @@ public class NativeAzureFileSystem extends FileSystem {
         return result;
       } catch(IOException e) {
 
-        Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
 
         if (innerException instanceof StorageException) {
 
@@ -736,7 +734,7 @@ public class NativeAzureFileSystem extends FileSystem {
               + " Exception details: {} Error Code : {}",
               key, e, ((StorageException) innerException).getErrorCode());
 
-          if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+          if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
             throw new FileNotFoundException(String.format("%s is not found", key));
           }
         }
@@ -782,7 +780,7 @@ public class NativeAzureFileSystem extends FileSystem {
         return result;
       } catch(IOException e) {
 
-        Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
 
         if (innerException instanceof StorageException) {
 
@@ -790,7 +788,7 @@ public class NativeAzureFileSystem extends FileSystem {
               + " Exception details: {} Error Code : {}",
               key, e, ((StorageException) innerException).getErrorCode());
 
-          if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+          if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
             throw new FileNotFoundException(String.format("%s is not found", key));
           }
         }
@@ -822,10 +820,10 @@ public class NativeAzureFileSystem extends FileSystem {
           this.pos);
       } catch(IOException e) {
 
-        Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
 
         if (innerException instanceof StorageException
-            && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
           throw new FileNotFoundException(String.format("%s is not found", key));
         }
 
@@ -1041,7 +1039,7 @@ public class NativeAzureFileSystem extends FileSystem {
   private static boolean suppressRetryPolicy = false;
   // A counter to create unique (within-process) names for my metrics sources.
   private static AtomicInteger metricsSourceNameCounter = new AtomicInteger();
-
+  private boolean appendSupportEnabled = false;
   
   public NativeAzureFileSystem() {
     // set store in initialize()
@@ -1164,7 +1162,7 @@ public class NativeAzureFileSystem extends FileSystem {
     this.blockSize = conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME,
         MAX_AZURE_BLOCK_SIZE);
 
-
+    this.appendSupportEnabled = conf.getBoolean(APPEND_SUPPORT_ENABLE_PROPERTY_NAME, false);
     LOG.debug("NativeAzureFileSystem. Initializing.");
     LOG.debug("  blockSize  = {}",
         conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE));
@@ -1294,7 +1292,61 @@ public class NativeAzureFileSystem extends FileSystem {
   @Override
   public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
       throws IOException {
-    throw new IOException("Not supported");
+
+    if (!appendSupportEnabled) {
+      throw new UnsupportedOperationException("Append Support not enabled");
+    }
+
+    LOG.debug("Opening file: {} for append", f);
+
+    Path absolutePath = makeAbsolute(f);
+    String key = pathToKey(absolutePath);
+    FileMetadata meta = null;
+    try {
+      meta = store.retrieveMetadata(key);
+    } catch(Exception ex) {
+
+      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
+
+      if (innerException instanceof StorageException
+          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
+
+        throw new FileNotFoundException(String.format("%s is not found", key));
+      } else {
+        throw ex;
+      }
+    }
+
+    if (meta == null) {
+      throw new FileNotFoundException(f.toString());
+    }
+
+    if (meta.isDir()) {
+      throw new FileNotFoundException(f.toString()
+          + " is a directory not a file.");
+    }
+
+    if (store.isPageBlobKey(key)) {
+      throw new IOException("Append not supported for Page Blobs");
+    }
+
+    DataOutputStream appendStream = null;
+
+    try {
+      appendStream = store.retrieveAppendStream(key, bufferSize);
+    } catch (Exception ex) {
+
+      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
+
+      if (innerException instanceof StorageException
+          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
+        throw new FileNotFoundException(String.format("%s is not found", key));
+      } else {
+        throw ex;
+      }
+    }
+
+    return new FSDataOutputStream(appendStream, statistics);
   }
 
   @Override
@@ -1379,7 +1431,7 @@ public class NativeAzureFileSystem extends FileSystem {
           lease.free();
         }
       } catch (Exception e) {
-        NativeAzureFileSystem.cleanup(LOG, out);
+        NativeAzureFileSystemHelper.cleanup(LOG, out);
         String msg = "Unable to free lease on " + parent.toUri();
         LOG.error(msg);
         throw new IOException(msg, e);
@@ -1577,10 +1629,10 @@ public class NativeAzureFileSystem extends FileSystem {
       metaFile = store.retrieveMetadata(key);
     } catch (IOException e) {
 
-      Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
 
       if (innerException instanceof StorageException
-          && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
 
         return false;
       }
@@ -1611,7 +1663,7 @@ public class NativeAzureFileSystem extends FileSystem {
           parentMetadata = store.retrieveMetadata(parentKey);
         } catch (IOException e) {
 
-          Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+          Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
 
           if (innerException instanceof StorageException) {
             // Invalid State.
@@ -1619,7 +1671,7 @@ public class NativeAzureFileSystem extends FileSystem {
             // if the file not present. But not retrieving metadata here is an
             // unrecoverable state and can only happen if there is a race condition
             // hence throwing a IOException
-            if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+            if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
               throw new IOException("File " + f + " has a parent directory "
                   + parentPath + " whose metadata cannot be retrieved. Can't resolve");
             }
@@ -1662,10 +1714,10 @@ public class NativeAzureFileSystem extends FileSystem {
         instrumentation.fileDeleted();
       } catch(IOException e) {
 
-        Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
 
         if (innerException instanceof StorageException
-            && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
           return false;
         }
 
@@ -1684,7 +1736,7 @@ public class NativeAzureFileSystem extends FileSystem {
           parentMetadata = store.retrieveMetadata(parentKey);
         } catch (IOException e) {
 
-          Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+          Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
 
           if (innerException instanceof StorageException) {
             // Invalid State.
@@ -1692,7 +1744,7 @@ public class NativeAzureFileSystem extends FileSystem {
             // if the file not present. But not retrieving metadata here is an
             // unrecoverable state and can only happen if there is a race condition
             // hence throwing a IOException
-            if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+            if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
               throw new IOException("File " + f + " has a parent directory "
                   + parentPath + " whose metadata cannot be retrieved. Can't resolve");
             }
@@ -1728,10 +1780,10 @@ public class NativeAzureFileSystem extends FileSystem {
             priorLastKey);
       } catch(IOException e) {
 
-        Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
 
         if (innerException instanceof StorageException
-            && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
           return false;
         }
 
@@ -1763,10 +1815,10 @@ public class NativeAzureFileSystem extends FileSystem {
             instrumentation.fileDeleted();
           } catch(IOException e) {
 
-            Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+            Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
 
             if (innerException instanceof StorageException
-                && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+                && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
               return false;
             }
 
@@ -1785,10 +1837,10 @@ public class NativeAzureFileSystem extends FileSystem {
         store.delete(key);
       } catch(IOException e) {
 
-        Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
 
         if (innerException instanceof StorageException
-            && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
           return false;
         }
 
@@ -1829,10 +1881,10 @@ public class NativeAzureFileSystem extends FileSystem {
       meta = store.retrieveMetadata(key);
     } catch(Exception ex) {
 
-      Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
 
       if (innerException instanceof StorageException
-          && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
 
           throw new FileNotFoundException(String.format("%s is not found", key));
        }
@@ -1922,10 +1974,10 @@ public class NativeAzureFileSystem extends FileSystem {
       meta = store.retrieveMetadata(key);
     } catch (IOException ex) {
 
-      Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
 
       if (innerException instanceof StorageException
-          && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
 
         throw new FileNotFoundException(String.format("%s is not found", f));
       }
@@ -1948,10 +2000,10 @@ public class NativeAzureFileSystem extends FileSystem {
         listing  = store.list(key, AZURE_LIST_ALL, 1, partialKey);
       } catch (IOException ex) {
 
-        Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
 
         if (innerException instanceof StorageException
-            && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
 
             throw new FileNotFoundException(String.format("%s is not found", key));
         }
@@ -1972,10 +2024,10 @@ public class NativeAzureFileSystem extends FileSystem {
        try {
          listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
        } catch (IOException ex) {
-         Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+         Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
 
          if (innerException instanceof StorageException
-             && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+             && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
 
            throw new FileNotFoundException(String.format("%s is not found", key));
          }
@@ -2196,10 +2248,10 @@ public class NativeAzureFileSystem extends FileSystem {
       meta = store.retrieveMetadata(key);
     } catch(Exception ex) {
 
-      Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
 
       if (innerException instanceof StorageException
-          && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
 
         throw new FileNotFoundException(String.format("%s is not found", key));
       }
@@ -2219,10 +2271,10 @@ public class NativeAzureFileSystem extends FileSystem {
     try {
       inputStream = store.retrieve(key);
     } catch(Exception ex) {
-      Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
 
       if (innerException instanceof StorageException
-          && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
 
         throw new FileNotFoundException(String.format("%s is not found", key));
       }
@@ -2261,14 +2313,14 @@ public class NativeAzureFileSystem extends FileSystem {
       dstMetadata = store.retrieveMetadata(dstKey);
     } catch (IOException ex) {
 
-      Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
 
       // A BlobNotFound storage exception in only thrown from retrieveMetdata API when
       // there is a race condition. If there is another thread which deletes the destination
       // file or folder, then this thread calling rename should be able to continue with
       // rename gracefully. Hence the StorageException is swallowed here.
       if (innerException instanceof StorageException) {
-        if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+        if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
           LOG.debug("BlobNotFound exception encountered for Destination key : {}. "
               + "Swallowin the exception to handle race condition gracefully", dstKey);
         }
@@ -2294,10 +2346,10 @@ public class NativeAzureFileSystem extends FileSystem {
         parentOfDestMetadata = store.retrieveMetadata(pathToKey(absoluteDst.getParent()));
       } catch (IOException ex) {
 
-        Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
 
         if (innerException instanceof StorageException
-            && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
 
           LOG.debug("Parent of destination {} doesn't exists. Failing rename", dst);
           return false;
@@ -2320,10 +2372,10 @@ public class NativeAzureFileSystem extends FileSystem {
     try {
       srcMetadata = store.retrieveMetadata(srcKey);
     } catch (IOException ex) {
-      Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
 
       if (innerException instanceof StorageException
-          && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
 
         LOG.debug("Source {} doesn't exists. Failing rename", src);
         return false;
@@ -2342,10 +2394,10 @@ public class NativeAzureFileSystem extends FileSystem {
         store.rename(srcKey, dstKey);
       } catch(IOException ex) {
 
-        Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
 
         if (innerException instanceof StorageException
-            && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
 
           LOG.debug("BlobNotFoundException encountered. Failing rename", src);
           return false;
@@ -2552,10 +2604,10 @@ public class NativeAzureFileSystem extends FileSystem {
     try {
       metadata = store.retrieveMetadata(key);
     } catch (IOException ex) {
-      Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
 
       if (innerException instanceof StorageException
-          && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
 
         throw new FileNotFoundException(String.format("File %s doesn't exists.", p));
       }
@@ -2591,10 +2643,10 @@ public class NativeAzureFileSystem extends FileSystem {
     try {
       metadata = store.retrieveMetadata(key);
     } catch (IOException ex) {
-      Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
 
       if (innerException instanceof StorageException
-          && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
 
         throw new FileNotFoundException(String.format("File %s doesn't exists.", p));
       }
@@ -2817,52 +2869,4 @@ public class NativeAzureFileSystem extends FileSystem {
     // Return to the caller with the randomized key.
     return randomizedKey;
   }
-
-  private static void cleanup(Logger log, java.io.Closeable closeable) {
-    if (closeable != null) {
-      try {
-        closeable.close();
-      } catch(IOException e) {
-        if (log != null) {
-          log.debug("Exception in closing {}", closeable, e);
-        }
-      }
-    }
-  }
-
-  /*
-   * Helper method to recursively check if the cause of the exception is
-   * a Azure storage exception.
-   */
-  private static Throwable checkForAzureStorageException(Exception e) {
-
-    Throwable innerException = e.getCause();
-
-    while (innerException != null
-            && !(innerException instanceof StorageException)) {
-      innerException = innerException.getCause();
-    }
-
-    return innerException;
-  }
-
-  /*
-   * Helper method to check if the AzureStorageException is
-   * because backing blob was not found.
-   */
-  private static boolean isFileNotFoundException(StorageException e) {
-
-    String errorCode = ((StorageException) e).getErrorCode();
-    if (errorCode != null
-        && (errorCode.equals(StorageErrorCodeStrings.BLOB_NOT_FOUND)
-            || errorCode.equals(StorageErrorCodeStrings.RESOURCE_NOT_FOUND)
-            || errorCode.equals(StorageErrorCode.BLOB_NOT_FOUND.toString())
-            || errorCode.equals(StorageErrorCode.RESOURCE_NOT_FOUND.toString()))) {
-
-      return true;
-    }
-
-    return false;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffc0d988/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
new file mode 100644
index 0000000..40efdc6
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.microsoft.azure.storage.StorageErrorCode;
+import com.microsoft.azure.storage.StorageErrorCodeStrings;
+import com.microsoft.azure.storage.StorageException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+/**
+ * Utility class that has helper methods.
+ *
+ */
+
+@InterfaceAudience.Private
+final class NativeAzureFileSystemHelper {
+
+  private NativeAzureFileSystemHelper() {
+    // Hiding the cosnstructor as this is a utility class.
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(NativeAzureFileSystemHelper.class);
+
+  public static void cleanup(Logger log, java.io.Closeable closeable) {
+    if (closeable != null) {
+      try {
+        closeable.close();
+      } catch(IOException e) {
+        if (log != null) {
+          log.debug("Exception in closing {}", closeable, e);
+        }
+      }
+    }
+  }
+
+  /*
+   * Helper method to recursively check if the cause of the exception is
+   * a Azure storage exception.
+   */
+  public static Throwable checkForAzureStorageException(Exception e) {
+
+    Throwable innerException = e.getCause();
+
+    while (innerException != null
+            && !(innerException instanceof StorageException)) {
+
+      innerException = innerException.getCause();
+    }
+
+    return innerException;
+  }
+
+  /*
+   * Helper method to check if the AzureStorageException is
+   * because backing blob was not found.
+   */
+  public static boolean isFileNotFoundException(StorageException e) {
+
+    String errorCode = e.getErrorCode();
+    if (errorCode != null
+        && (errorCode.equals(StorageErrorCodeStrings.BLOB_NOT_FOUND)
+            || errorCode.equals(StorageErrorCodeStrings.RESOURCE_NOT_FOUND)
+            || errorCode.equals(StorageErrorCode.BLOB_NOT_FOUND.toString())
+            || errorCode.equals(StorageErrorCode.RESOURCE_NOT_FOUND.toString()))) {
+
+      return true;
+    }
+
+    return false;
+  }
+
+  /*
+   * Helper method that logs stack traces from all live threads.
+   */
+  public static void logAllLiveStackTraces() {
+
+    for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
+      LOG.debug("Thread " + entry.getKey().getName());
+      StackTraceElement[] trace = entry.getValue();
+      for (int j = 0; j < trace.length; j++) {
+        LOG.debug("\tat " + trace[j]);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffc0d988/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
index 0229cb7..f052b7f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
@@ -107,4 +107,6 @@ interface NativeFileSystemStore {
   void delete(String key, SelfRenewingLease lease) throws IOException;
       
   SelfRenewingLease acquireLease(String key) throws AzureException;
+
+  DataOutputStream retrieveAppendStream(String key, int bufferSize) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffc0d988/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
index 8689375..b2b34f8 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
@@ -29,8 +29,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -216,7 +214,7 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
       LOG.debug(ioThreadPool.toString());
       if (!ioThreadPool.awaitTermination(10, TimeUnit.MINUTES)) {
         LOG.debug("Timed out after 10 minutes waiting for IO requests to finish");
-        logAllStackTraces();
+        NativeAzureFileSystemHelper.logAllLiveStackTraces();
         LOG.debug(ioThreadPool.toString());
         throw new IOException("Timed out waiting for IO requests to finish");
       }
@@ -230,18 +228,7 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
     closed = true;
   }
 
-  // Log the stacks of all threads.
-  private void logAllStackTraces() {
-    Map liveThreads = Thread.getAllStackTraces();
-    for (Iterator i = liveThreads.keySet().iterator(); i.hasNext(); ) {
-      Thread key = (Thread) i.next();
-      LOG.debug("Thread " + key.getName());
-      StackTraceElement[] trace = (StackTraceElement[]) liveThreads.get(key);
-      for (int j = 0; j < trace.length; j++) {
-        LOG.debug("\tat " + trace[j]);
-      }
-    }
-  }
+
 
   /**
    * A single write request for data to write to Azure storage.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffc0d988/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
index ce5f749..c2169a4 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
@@ -24,11 +24,13 @@ import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.EnumSet;
 import java.util.HashMap;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 
+import com.microsoft.azure.storage.AccessCondition;
 import com.microsoft.azure.storage.CloudStorageAccount;
 import com.microsoft.azure.storage.OperationContext;
 import com.microsoft.azure.storage.RetryPolicyFactory;
@@ -36,6 +38,8 @@ import com.microsoft.azure.storage.StorageCredentials;
 import com.microsoft.azure.storage.StorageException;
 import com.microsoft.azure.storage.blob.BlobListingDetails;
 import com.microsoft.azure.storage.blob.BlobProperties;
+import com.microsoft.azure.storage.blob.BlockEntry;
+import com.microsoft.azure.storage.blob.BlockListingFilter;
 import com.microsoft.azure.storage.blob.BlobRequestOptions;
 import com.microsoft.azure.storage.blob.CloudBlob;
 import com.microsoft.azure.storage.blob.CopyState;
@@ -269,13 +273,13 @@ abstract class StorageInterface {
 
     /**
      * Uploads the container's metadata using the specified operation context.
-     * 
+     *
      * @param opContext
      *          An {@link OperationContext} object that represents the context
      *          for the current operation. This object is used to track requests
      *          to the storage service, and to provide additional runtime
      *          information about the operation.
-     * 
+     *
      * @throws StorageException
      *           If a storage service error occurred.
      */
@@ -545,6 +549,30 @@ abstract class StorageInterface {
     void uploadMetadata(OperationContext opContext)
         throws StorageException;
 
+    /**
+     * Uploads the blob's metadata to the storage service using the specified
+     * lease ID, request options, and operation context.
+     *
+     * @param accessCondition
+     *           A {@link AccessCondition} object that represents the access conditions for the blob.
+     *
+     * @param options
+     *            A {@link BlobRequestOptions} object that specifies any additional options for the request. Specifying
+     *            <code>null</code> will use the default request options from the associated service client (
+     *            {@link CloudBlobClient}).
+     *
+     * @param opContext
+     *          An {@link OperationContext} object that represents the context
+     *          for the current operation. This object is used to track requests
+     *          to the storage service, and to provide additional runtime
+     *          information about the operation.
+     *
+     * @throws StorageException
+     *           If a storage service error occurred.
+     */
+    void uploadMetadata(AccessCondition accessCondition, BlobRequestOptions options,
+        OperationContext opContext) throws StorageException;
+
     void uploadProperties(OperationContext opContext,
         SelfRenewingLease lease)
         throws StorageException;
@@ -602,6 +630,63 @@ abstract class StorageInterface {
     OutputStream openOutputStream(
         BlobRequestOptions options,
         OperationContext opContext) throws StorageException;
+
+    /**
+     *
+     * @param filter    A {@link BlockListingFilter} value that specifies whether to download
+     *                  committed blocks, uncommitted blocks, or all blocks.
+     * @param options   A {@link BlobRequestOptions} object that specifies any additional options for
+     *                  the request. Specifying null will use the default request options from
+     *                  the associated service client ( CloudBlobClient).
+     * @param opContext An {@link OperationContext} object that represents the context for the current
+     *                  operation. This object is used to track requests to the storage service,
+     *                  and to provide additional runtime information about the operation.
+     * @return          An ArrayList object of {@link BlockEntry} objects that represent the list
+     *                  block items downloaded from the block blob.
+     * @throws IOException  If an I/O error occurred.
+     * @throws StorageException If a storage service error occurred.
+     */
+    List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequestOptions options,
+        OperationContext opContext) throws IOException, StorageException;
+
+    /**
+     *
+     * @param blockId      A String that represents the Base-64 encoded block ID. Note for a given blob
+     *                     the length of all Block IDs must be identical.
+     * @param sourceStream An {@link InputStream} object that represents the input stream to write to the
+     *                     block blob.
+     * @param length       A long which represents the length, in bytes, of the stream data,
+     *                     or -1 if unknown.
+     * @param options      A {@link BlobRequestOptions} object that specifies any additional options for the
+     *                     request. Specifying null will use the default request options from the
+     *                     associated service client ( CloudBlobClient).
+     * @param opContext    An {@link OperationContext} object that represents the context for the current operation.
+     *                     This object is used to track requests to the storage service, and to provide
+     *                     additional runtime information about the operation.
+     * @throws IOException  If an I/O error occurred.
+     * @throws StorageException If a storage service error occurred.
+     */
+    void uploadBlock(String blockId, InputStream sourceStream,
+        long length, BlobRequestOptions options,
+        OperationContext opContext) throws IOException, StorageException;
+
+    /**
+     *
+     * @param blockList       An enumerable collection of {@link BlockEntry} objects that represents the list
+     *                        block items being committed. The size field is ignored.
+     * @param accessCondition An {@link AccessCondition} object that represents the access conditions for the blob.
+     * @param options         A {@link BlobRequestOptions} object that specifies any additional options for the
+     *                        request. Specifying null will use the default request options from the associated
+     *                        service client ( CloudBlobClient).
+     * @param opContext       An {@link OperationContext} object that represents the context for the current operation.
+     *                        This object is used to track requests to the storage service, and to provide additional
+     *                        runtime information about the operation.
+     * @throws IOException      If an I/O error occurred.
+     * @throws StorageException If a storage service error occurred.
+     */
+    void commitBlockList(List<BlockEntry> blockList, AccessCondition accessCondition, BlobRequestOptions options,
+        OperationContext opContext) throws IOException, StorageException;
+
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffc0d988/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
index 382ff66..298f3aa 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
@@ -27,7 +27,7 @@ import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
-
+import java.util.List;
 import org.apache.hadoop.classification.InterfaceAudience;
 
 import com.microsoft.azure.storage.AccessCondition;
@@ -40,6 +40,8 @@ import com.microsoft.azure.storage.StorageUri;
 import com.microsoft.azure.storage.blob.BlobListingDetails;
 import com.microsoft.azure.storage.blob.BlobProperties;
 import com.microsoft.azure.storage.blob.BlobRequestOptions;
+import com.microsoft.azure.storage.blob.BlockEntry;
+import com.microsoft.azure.storage.blob.BlockListingFilter;
 import com.microsoft.azure.storage.blob.CloudBlob;
 import com.microsoft.azure.storage.blob.CloudBlobClient;
 import com.microsoft.azure.storage.blob.CloudBlobContainer;
@@ -362,7 +364,13 @@ class StorageInterfaceImpl extends StorageInterface {
     @Override
     public void uploadMetadata(OperationContext opContext)
         throws StorageException {
-      getBlob().uploadMetadata(null, null, opContext);
+      uploadMetadata(null, null, opContext);
+    }
+
+    @Override
+    public void uploadMetadata(AccessCondition accessConditions, BlobRequestOptions options,
+        OperationContext opContext) throws StorageException{
+      getBlob().uploadMetadata(accessConditions, options, opContext);
     }
 
     public void uploadProperties(OperationContext opContext, SelfRenewingLease lease)
@@ -396,7 +404,7 @@ class StorageInterfaceImpl extends StorageInterface {
     public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
         OperationContext opContext)
             throws StorageException, URISyntaxException {
-      getBlob().startCopyFromBlob(((CloudBlobWrapperImpl)sourceBlob).blob,
+      getBlob().startCopyFromBlob(((CloudBlobWrapperImpl) sourceBlob).blob,
           null, null, options, opContext);
     }
 
@@ -440,6 +448,25 @@ class StorageInterfaceImpl extends StorageInterface {
       getBlob().uploadProperties(null, null, opContext);
     }
 
+    @Override
+    public List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequestOptions options,
+        OperationContext opContext) throws IOException, StorageException {
+      return ((CloudBlockBlob) getBlob()).downloadBlockList(filter, null, options, opContext);
+
+    }
+
+    @Override
+    public void uploadBlock(String blockId, InputStream sourceStream,
+        long length, BlobRequestOptions options,
+        OperationContext opContext) throws IOException, StorageException {
+      ((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length, null, options, opContext);
+    }
+
+    @Override
+    public void commitBlockList(List<BlockEntry> blockList, AccessCondition accessCondition, BlobRequestOptions options,
+        OperationContext opContext) throws IOException, StorageException {
+      ((CloudBlockBlob) getBlob()).commitBlockList(blockList, accessCondition, options, opContext);
+    }
   }
 
   static class CloudPageBlobWrapperImpl extends CloudBlobWrapperImpl implements CloudPageBlobWrapper {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffc0d988/hadoop-tools/hadoop-azure/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/index.md b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
index 9d0115a..4402467 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/index.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
@@ -23,6 +23,7 @@
     * [Page Blob Support and Configuration](#Page_Blob_Support_and_Configuration)
     * [Atomic Folder Rename](#Atomic_Folder_Rename)
     * [Accessing wasb URLs](#Accessing_wasb_URLs)
+    * [Append API Support and Configuration](#Append_API_Support_and_Configuration)
 * [Testing the hadoop-azure Module](#Testing_the_hadoop-azure_Module)
 
 ## <a name="Introduction" />Introduction
@@ -51,7 +52,6 @@ on the additional artifacts it requires, notably the
 
 ## <a name="Limitations" />Limitations
 
-* The append operation is not implemented.
 * File owner and group are persisted, but the permissions model is not enforced.
   Authorization occurs at the level of the entire Azure Blob Storage account.
 * File last access time is not tracked.
@@ -199,6 +199,24 @@ It's also possible to configure `fs.defaultFS` to use a `wasb` or `wasbs` URL.
 This causes all bare paths, such as `/testDir/testFile` to resolve automatically
 to that file system.
 
+### <a name="Append_API_Support_and_Configuration" />Append API Support and Configuration
+
+The Azure Blob Storage interface for Hadoop has optional support for Append API for
+single writer by setting the configuration `fs.azure.enable.append.support` to true.
+
+For Example:
+
+    <property>
+      <name>fs.azure.enable.append.support</name>
+      <value>true</value>
+    </property>
+
+It must be noted Append support in Azure Blob Storage interface DIFFERS FROM HDFS SEMANTICS. Append
+support does not enforce single writer internally but requires applications to guarantee this semantic.
+It becomes a responsibility of the application either to ensure single-threaded handling for a particular
+file path, or rely on some external locking mechanism of its own.  Failure to do so will result in
+unexpected behavior.
+
 ## <a name="Testing_the_hadoop-azure_Module" />Testing the hadoop-azure Module
 
 The hadoop-azure module includes a full suite of unit tests.  Most of the tests

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffc0d988/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
index 9f84f4b..2bb2a9a 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
@@ -32,11 +32,12 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.TimeZone;
-
+import java.util.List;
 import org.apache.commons.httpclient.URIException;
 import org.apache.commons.httpclient.util.URIUtil;
 import org.apache.commons.lang.NotImplementedException;
 
+import com.microsoft.azure.storage.AccessCondition;
 import com.microsoft.azure.storage.CloudStorageAccount;
 import com.microsoft.azure.storage.OperationContext;
 import com.microsoft.azure.storage.RetryPolicyFactory;
@@ -46,6 +47,8 @@ import com.microsoft.azure.storage.StorageUri;
 import com.microsoft.azure.storage.blob.BlobListingDetails;
 import com.microsoft.azure.storage.blob.BlobProperties;
 import com.microsoft.azure.storage.blob.BlobRequestOptions;
+import com.microsoft.azure.storage.blob.BlockEntry;
+import com.microsoft.azure.storage.blob.BlockListingFilter;
 import com.microsoft.azure.storage.blob.CloudBlob;
 import com.microsoft.azure.storage.blob.CloudBlobContainer;
 import com.microsoft.azure.storage.blob.CloudBlobDirectory;
@@ -524,6 +527,30 @@ public class MockStorageInterface extends StorageInterface {
     public CloudBlob getBlob() {
       return null;
     }
+
+    @Override
+    public List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequestOptions options,
+        OperationContext opContext) throws IOException, StorageException {
+
+      throw new UnsupportedOperationException("downloadBlockList not used in Mock Tests");
+    }
+    @Override
+    public void uploadBlock(String blockId, InputStream sourceStream,
+        long length, BlobRequestOptions options,
+        OperationContext opContext) throws IOException, StorageException {
+      throw new UnsupportedOperationException("uploadBlock not used in Mock Tests");
+    }
+
+    @Override
+    public void commitBlockList(List<BlockEntry> blockList, AccessCondition accessCondition,
+        BlobRequestOptions options, OperationContext opContext) throws IOException, StorageException {
+      throw new UnsupportedOperationException("commitBlockList not used in Mock Tests");
+    }
+
+    public void uploadMetadata(AccessCondition accessCondition, BlobRequestOptions options,
+        OperationContext opContext) throws StorageException {
+      throw new UnsupportedOperationException("uploadMetadata not used in Mock Tests");
+    }
   }
 
   class MockCloudPageBlobWrapper extends MockCloudBlobWrapper
@@ -580,5 +607,10 @@ public class MockStorageInterface extends StorageInterface {
     public CloudBlob getBlob() {
       return null;
     }
+
+    public void uploadMetadata(AccessCondition accessCondition, BlobRequestOptions options,
+        OperationContext opContext) throws StorageException {
+      throw new UnsupportedOperationException("uploadMetadata not used in Mock Tests");
+    }
   }
 }


Mime
View raw message