hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asur...@apache.org
Subject [19/50] [abbrv] hadoop git commit: HADOOP-14520. WASB: Block compaction for Azure Block Blobs. Contributed by Georgi Chalakov
Date Mon, 11 Sep 2017 06:48:44 GMT
HADOOP-14520. WASB: Block compaction for Azure Block Blobs.
Contributed by Georgi Chalakov


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/13eda500
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/13eda500
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/13eda500

Branch: refs/heads/YARN-5972
Commit: 13eda5000304099d1145631f9be13ce8a00b600d
Parents: d77ed23
Author: Steve Loughran <stevel@apache.org>
Authored: Thu Sep 7 18:35:03 2017 +0100
Committer: Steve Loughran <stevel@apache.org>
Committed: Thu Sep 7 18:35:03 2017 +0100

----------------------------------------------------------------------
 .../fs/azure/AzureNativeFileSystemStore.java    |   73 +-
 .../hadoop/fs/azure/BlockBlobAppendStream.java  | 1301 +++++++++++-------
 .../hadoop/fs/azure/NativeAzureFileSystem.java  |   77 +-
 .../hadoop/fs/azure/NativeFileSystemStore.java  |    5 +-
 .../fs/azure/SecureStorageInterfaceImpl.java    |   10 +-
 .../hadoop/fs/azure/SelfRenewingLease.java      |   10 +-
 .../hadoop/fs/azure/StorageInterface.java       |    3 +-
 .../hadoop/fs/azure/StorageInterfaceImpl.java   |   12 +-
 .../fs/azure/SyncableDataOutputStream.java      |   11 +
 .../hadoop-azure/src/site/markdown/index.md     |   34 +
 .../hadoop/fs/azure/MockStorageInterface.java   |    3 +-
 .../azure/TestAzureConcurrentOutOfBandIo.java   |    6 +-
 ...estNativeAzureFileSystemBlockCompaction.java |  266 ++++
 .../src/test/resources/log4j.properties         |    1 +
 14 files changed, 1273 insertions(+), 539 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/13eda500/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index bd8ac68..639862f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -203,6 +203,23 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   private Set<String> pageBlobDirs;
 
   /**
+   * Configuration key to indicate the set of directories in WASB where we
+   * should store files as block blobs with block compaction enabled.
+   *
+   * Entries can be directory paths relative to the container (e.g. "/path") or
+   * fully qualified wasb:// URIs (e.g.
+   * wasb://container@example.blob.core.windows.net/path)
+   */
+  public static final String KEY_BLOCK_BLOB_WITH_COMPACTION_DIRECTORIES =
+          "fs.azure.block.blob.with.compaction.dir";
+
+  /**
+   * The set of directories where we should store files as block blobs with
+   * block compaction enabled.
+   */
+  private Set<String> blockBlobWithCompationDirs;
+
+  /**
    * Configuration key to indicate the set of directories in WASB where
    * we should do atomic folder rename synchronized with createNonRecursive.
    */
@@ -527,6 +544,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
     // User-agent
     userAgentId = conf.get(USER_AGENT_ID_KEY, USER_AGENT_ID_DEFAULT);
 
+    // Extract the directories that should contain block blobs with compaction
+    blockBlobWithCompationDirs = getDirectorySet(
+        KEY_BLOCK_BLOB_WITH_COMPACTION_DIRECTORIES);
+    LOG.debug("Block blobs with compaction directories:  {}",
+        setToString(blockBlobWithCompationDirs));
+
     // Extract directories that should have atomic rename applied.
     atomicRenameDirs = getDirectorySet(KEY_ATOMIC_RENAME_DIRECTORIES);
     String hbaseRoot;
@@ -1165,6 +1188,17 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   }
 
   /**
+   * Checks if the given key in Azure Storage should be stored as a block blob
+   * with compaction enabled instead of a normal block blob.
+   *
+   * @param key blob name
+   * @return true, if the file is in directory with block compaction enabled.
+   */
+  public boolean isBlockBlobWithCompactionKey(String key) {
+    return isKeyForDirectorySet(key, blockBlobWithCompationDirs);
+  }
+
+  /**
    * Checks if the given key in Azure storage should have synchronized
    * atomic folder rename createNonRecursive implemented.
    */
@@ -1356,7 +1390,9 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   }
 
   @Override
-  public DataOutputStream storefile(String key, PermissionStatus permissionStatus)
+  public DataOutputStream storefile(String keyEncoded,
+                                    PermissionStatus permissionStatus,
+                                    String key)
       throws AzureException {
     try {
 
@@ -1417,12 +1453,26 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
 
       // Get the blob reference from the store's container and
       // return it.
-      CloudBlobWrapper blob = getBlobReference(key);
+      CloudBlobWrapper blob = getBlobReference(keyEncoded);
       storePermissionStatus(blob, permissionStatus);
 
       // Create the output stream for the Azure blob.
       //
-      OutputStream outputStream = openOutputStream(blob);
+      OutputStream outputStream;
+
+      if (isBlockBlobWithCompactionKey(key)) {
+        BlockBlobAppendStream blockBlobOutputStream = new BlockBlobAppendStream(
+            (CloudBlockBlobWrapper) blob,
+            keyEncoded,
+            this.uploadBlockSizeBytes,
+            true,
+            getInstrumentedContext());
+
+        outputStream = blockBlobOutputStream;
+      } else {
+        outputStream = openOutputStream(blob);
+      }
+
       DataOutputStream dataOutStream = new SyncableDataOutputStream(outputStream);
       return dataOutStream;
     } catch (Exception e) {
@@ -2869,10 +2919,21 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
 
       CloudBlobWrapper blob =  this.container.getBlockBlobReference(key);
 
-      BlockBlobAppendStream appendStream = new BlockBlobAppendStream((CloudBlockBlobWrapper) blob, key, bufferSize, getInstrumentedContext());
-      appendStream.initialize();
+      OutputStream outputStream;
+
+      BlockBlobAppendStream blockBlobOutputStream = new BlockBlobAppendStream(
+          (CloudBlockBlobWrapper) blob,
+          key,
+          bufferSize,
+          isBlockBlobWithCompactionKey(key),
+          getInstrumentedContext());
+
+      outputStream = blockBlobOutputStream;
+
+      DataOutputStream dataOutStream = new SyncableDataOutputStream(
+          outputStream);
 
-      return new DataOutputStream(appendStream);
+      return dataOutStream;
     } catch(Exception ex) {
       throw new AzureException(ex);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13eda500/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
index afb9379..84342cd 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
@@ -22,122 +22,256 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.HashMap;
-import java.util.Locale;
+import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 import java.util.Random;
-import java.util.TimeZone;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
 
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.Syncable;
 import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
-import org.eclipse.jetty.util.log.Log;
+import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.microsoft.azure.storage.AccessCondition;
 import com.microsoft.azure.storage.OperationContext;
 import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.StorageErrorCodeStrings;
 import com.microsoft.azure.storage.blob.BlobRequestOptions;
 import com.microsoft.azure.storage.blob.BlockEntry;
 import com.microsoft.azure.storage.blob.BlockListingFilter;
+import com.microsoft.azure.storage.blob.BlockSearchMode;
+
+import static org.apache.hadoop.fs.StreamCapabilities.StreamCapability.HFLUSH;
+import static org.apache.hadoop.fs.StreamCapabilities.StreamCapability.HSYNC;
 
 /**
- * Stream object that implememnts append for Block Blobs in WASB.
+ * Stream object that implements append for Block Blobs in WASB.
+ *
+ * The stream object implements hflush/hsync and block compaction. Block
+ * compaction is the process of replacing a sequence of small blocks with one
+ * big block. Azure Block blobs support up to 50000 blocks and every
+ * hflush/hsync generates one block. When the number of blocks is above 32000,
+ * the process of compaction decreases the total number of blocks, if possible.
+ * If compaction is disabled, hflush/hsync are empty functions.
+ *
+ * The stream object uses background threads for uploading the blocks and the
+ * block blob list. Blocks can be uploaded concurrently. However, when the block
+ * list is uploaded, block uploading should stop. If a block is uploaded before
+ * the block list and the block id is not in the list, the block will be lost.
+ * If the block is uploaded after the block list and the block id is in the
+ * list, the block list upload will fail. The exclusive access for the block
+ * list upload is managed by uploadingSemaphore.
  */
-public class BlockBlobAppendStream extends OutputStream {
 
+public class BlockBlobAppendStream extends OutputStream implements Syncable,
+    StreamCapabilities {
+
+  /**
+   * The name of the blob/file.
+   */
   private final String key;
-  private final int bufferSize;
-  private ByteArrayOutputStream outBuffer;
-  private final CloudBlockBlobWrapper blob;
-  private final OperationContext opContext;
 
   /**
-   * Variable to track if the stream has been closed.
+   * This variable tracks if this is a new blob or an existing one.
+   */
+  private boolean blobExist;
+
+  /**
+   * When the blob exists, to prevent concurrent writes we take a lease.
+   * Taking a lease is not necessary for new blobs.
    */
-  private boolean closed = false;
+  private SelfRenewingLease lease = null;
 
   /**
-   * Variable to track if the append lease is released.
+   * The support for process of compaction is optional.
    */
+  private final boolean compactionEnabled;
 
-  private volatile boolean leaseFreed;
+  /**
+   * The number of blocks above which block compaction is triggered.
+   */
+  private static final int DEFAULT_ACTIVATE_COMPACTION_BLOCK_COUNT = 32000;
 
   /**
-   * Variable to track if the append stream has been
-   * initialized.
+   * The number of blocks above which block compaction is triggered.
    */
+  private int activateCompactionBlockCount
+      = DEFAULT_ACTIVATE_COMPACTION_BLOCK_COUNT;
 
-  private boolean initialized = false;
+  /**
+   * The size of the output buffer. Writes store the data in outBuffer until
+   * either the size is above maxBlockSize or hflush/hsync is called.
+   */
+  private final AtomicInteger maxBlockSize;
 
   /**
-   *  Last IOException encountered
+   * The current buffer where writes are stored.
    */
-  private volatile IOException lastError = null;
+  private ByteBuffer outBuffer;
 
   /**
-   * List to keep track of the uncommitted azure storage
-   * block ids
+   * The size of the blob that has been successfully stored in the Azure Blob
+   * service.
    */
-  private final List<BlockEntry> uncommittedBlockEntries;
+  private final AtomicLong committedBlobLength = new AtomicLong(0);
 
-  private static final int UNSET_BLOCKS_COUNT = -1;
+  /**
+   * Position of last block in the blob.
+   */
+  private volatile long blobLength = 0;
 
   /**
-   * Variable to hold the next block id to be used for azure
-   * storage blocks.
+   * Minutes waiting before the close operation timed out.
    */
-  private long nextBlockCount = UNSET_BLOCKS_COUNT;
+  private static final int CLOSE_UPLOAD_DELAY = 10;
 
   /**
-   * Variable to hold the block id prefix to be used for azure
-   * storage blocks from azure-storage-java sdk version 4.2.0 onwards
+   * Keep alive time for the threadpool.
    */
-  private String blockIdPrefix = null;
+  private static final int THREADPOOL_KEEP_ALIVE = 30;
+  /**
+   * Azure Block Blob used for the stream.
+   */
+  private final CloudBlockBlobWrapper blob;
+
+  /**
+   * Azure Storage operation context.
+   */
+  private final OperationContext opContext;
+
+  /**
+   * Commands sent from client calls to the background thread pool.
+   */
+  private abstract class UploadCommand {
+
+    // the blob offset for the command
+    private final long commandBlobOffset;
+
+    // command completion latch
+    private final CountDownLatch completed = new CountDownLatch(1);
+
+    UploadCommand(long offset) {
+      this.commandBlobOffset = offset;
+    }
+
+    long getCommandBlobOffset() {
+      return commandBlobOffset;
+    }
+
+    void await() throws InterruptedException {
+      completed.await();
+    }
+
+    void awaitAsDependent() throws InterruptedException {
+      await();
+    }
+
+    void setCompleted() {
+      completed.countDown();
+    }
 
-  private final Random sequenceGenerator = new Random();
+    void execute() throws InterruptedException, IOException {}
+
+    void dump() {}
+  }
+
+  /**
+   * The list of recent commands. Before block list is committed, all the blocks
+   * listed in the list must be uploaded. activeBlockCommands is used for
+   * enumerating the blocks and waiting on the latch until the block is
+   * uploaded.
+   */
+  private final ConcurrentLinkedQueue<UploadCommand> activeBlockCommands
+      = new ConcurrentLinkedQueue<>();
+
+  /**
+   * Variable to track if the stream has been closed.
+   */
+  private volatile boolean closed = false;
+
+  /**
+   *  First IOException encountered.
+   */
+  private final AtomicReference<IOException> firstError
+          = new AtomicReference<>();
+
+  /**
+   * Flag set when the first error has been thrown.
+   */
+  private boolean firstErrorThrown = false;
 
   /**
-   *  Time to wait to renew lease in milliseconds
+   * Semaphore for serializing block uploads with NativeAzureFileSystem.
+   *
+   * The semaphore starts with number of permits equal to the number of block
+   * upload threads. Each block upload thread needs one permit to start the
+   * upload. The put block list acquires all the permits before the block list
+   * is committed.
    */
-  private static final int LEASE_RENEWAL_PERIOD = 10000;
+  private final Semaphore uploadingSemaphore = new Semaphore(
+      MAX_NUMBER_THREADS_IN_THREAD_POOL,
+      true);
 
   /**
-   *  Number of times to retry for lease renewal
+   * Queue storing buffers with the size of the Azure block ready for
+   * reuse. The pool allows reusing the blocks instead of allocating new
+   * blocks. After the data is sent to the service, the buffer is returned
+   * back to the queue
    */
-  private static final int MAX_LEASE_RENEWAL_RETRY_COUNT = 3;
+  private final ElasticByteBufferPool poolReadyByteBuffers
+          = new ElasticByteBufferPool();
 
   /**
-   *  Time to wait before retrying to set the lease
+   * The blob's block list.
    */
-  private static final int LEASE_RENEWAL_RETRY_SLEEP_PERIOD = 500;
+  private final List<BlockEntry> blockEntries = new ArrayList<>(
+      DEFAULT_CAPACITY_BLOCK_ENTRIES);
+
+  private static final int DEFAULT_CAPACITY_BLOCK_ENTRIES = 1024;
 
   /**
-   *  Metadata key used on the blob to indicate append lease is active
+   * The uncommitted blob's block list.
    */
-  public static final String APPEND_LEASE = "append_lease";
+  private final ConcurrentLinkedDeque<BlockEntry> uncommittedBlockEntries
+      = new ConcurrentLinkedDeque<>();
+
+  /**
+   * Variable to hold the next block id to be used for azure storage blocks.
+   */
+  private static final int UNSET_BLOCKS_COUNT = -1;
+  private long nextBlockCount = UNSET_BLOCKS_COUNT;
 
   /**
-   * Timeout value for the append lease in millisecs. If the lease is not
-   * renewed within 30 seconds then another thread can acquire the append lease
-   * on the blob
+   * Variable to hold the block id prefix to be used for azure storage blocks.
    */
-  public static final int APPEND_LEASE_TIMEOUT = 30000;
+  private String blockIdPrefix = null;
 
   /**
-   *  Metdata key used on the blob to indicate last modified time of append lease
+   *  Maximum number of threads in block upload thread pool.
    */
-  public static final String APPEND_LEASE_LAST_MODIFIED = "append_lease_last_modified";
+  private static final int MAX_NUMBER_THREADS_IN_THREAD_POOL = 4;
 
   /**
    * Number of times block upload needs is retried.
@@ -145,17 +279,33 @@ public class BlockBlobAppendStream extends OutputStream {
   private static final int MAX_BLOCK_UPLOAD_RETRIES = 3;
 
   /**
-   * Wait time between block upload retries in millisecs.
+   * Wait time between block upload retries in milliseconds.
    */
   private static final int BLOCK_UPLOAD_RETRY_INTERVAL = 1000;
 
-  private static final Logger LOG = LoggerFactory.getLogger(BlockBlobAppendStream.class);
+  /**
+   * Logger.
+   */
+  private static final Logger LOG =
+          LoggerFactory.getLogger(BlockBlobAppendStream.class);
 
+  /**
+   * The absolute maximum of blocks for a blob. It includes committed and
+   * temporary blocks.
+   */
   private static final int MAX_BLOCK_COUNT = 100000;
 
+  /**
+   * The upload thread pool executor.
+   */
   private ThreadPoolExecutor ioThreadPool;
 
   /**
+   * Azure Storage access conditions for the blob.
+   */
+  private final AccessCondition accessCondition = new AccessCondition();
+
+  /**
    * Atomic integer to provide thread id for thread names for uploader threads.
    */
   private final AtomicInteger threadSequenceNumber;
@@ -163,106 +313,123 @@ public class BlockBlobAppendStream extends OutputStream {
   /**
    * Prefix to be used for thread names for uploader threads.
    */
-  private static final String THREAD_ID_PREFIX = "BlockBlobAppendStream";
-
-  private static final String UTC_STR = "UTC";
+  private static final String THREAD_ID_PREFIX = "append-blockblob";
 
+  /**
+   * BlockBlobAppendStream constructor.
+   *
+   * @param blob
+   *          Azure Block Blob
+   * @param aKey
+   *          blob's name
+   * @param bufferSize
+   *          the maximum size of a blob block.
+   * @param compactionEnabled
+   *          is the compaction process enabled for this blob
+   * @param opContext
+   *          Azure Store operation context for the blob
+   * @throws IOException
+   *           if an I/O error occurs. In particular, an IOException may be
+   *           thrown if the output stream cannot be used for append operations
+   */
   public BlockBlobAppendStream(final CloudBlockBlobWrapper blob,
-      final String aKey, final int bufferSize, final OperationContext opContext)
+                               final String aKey,
+                               final int bufferSize,
+                               final boolean compactionEnabled,
+                               final OperationContext opContext)
           throws IOException {
 
-    if (null == aKey || 0 == aKey.length()) {
-      throw new IllegalArgumentException(
-          "Illegal argument: The key string is null or empty");
-    }
-
-    if (0 >= bufferSize) {
-      throw new IllegalArgumentException(
-          "Illegal argument bufferSize cannot be zero or negative");
-    }
-
+    Preconditions.checkArgument(StringUtils.isNotEmpty(aKey));
+    Preconditions.checkArgument(bufferSize >= 0);
 
     this.blob = blob;
     this.opContext = opContext;
     this.key = aKey;
-    this.bufferSize = bufferSize;
+    this.maxBlockSize = new AtomicInteger(bufferSize);
     this.threadSequenceNumber = new AtomicInteger(0);
     this.blockIdPrefix = null;
-    setBlocksCountAndBlockIdPrefix();
+    this.compactionEnabled = compactionEnabled;
+    this.blobExist = true;
+    this.outBuffer = poolReadyByteBuffers.getBuffer(false, maxBlockSize.get());
 
-    this.outBuffer = new ByteArrayOutputStream(bufferSize);
-    this.uncommittedBlockEntries = new ArrayList<BlockEntry>();
-
-    // Acquire append lease on the blob.
     try {
-      //Set the append lease if the value of the append lease is false
-      if (!updateBlobAppendMetadata(true, false)) {
-        LOG.error("Unable to set Append Lease on the Blob : {} "
-            + "Possibly because another client already has a create or append stream open on the Blob", key);
-        throw new IOException("Unable to set Append lease on the Blob. "
-            + "Possibly because another client already had an append stream open on the Blob.");
-      }
-    } catch (StorageException ex) {
-      LOG.error("Encountered Storage exception while acquiring append "
-          + "lease on blob : {}. Storage Exception : {} ErrorCode : {}",
-          key, ex, ex.getErrorCode());
+      // download the block list
+      blockEntries.addAll(
+          blob.downloadBlockList(
+              BlockListingFilter.COMMITTED,
+              new BlobRequestOptions(),
+              opContext));
+
+      blobLength = blob.getProperties().getLength();
+
+      committedBlobLength.set(blobLength);
 
-      throw new IOException(ex);
+      // Acquiring lease on the blob.
+      lease = new SelfRenewingLease(blob, true);
+      accessCondition.setLeaseID(lease.getLeaseID());
+
+    } catch (StorageException ex) {
+      if (ex.getErrorCode().equals(StorageErrorCodeStrings.BLOB_NOT_FOUND)) {
+        blobExist = false;
+      }
+      else if (ex.getErrorCode().equals(
+              StorageErrorCodeStrings.LEASE_ALREADY_PRESENT)) {
+        throw new AzureException(
+                "Unable to set Append lease on the Blob: " + ex, ex);
+      }
+      else {
+        LOG.debug(
+            "Encountered storage exception."
+                + " StorageException : {} ErrorCode : {}",
+            ex,
+            ex.getErrorCode());
+        throw new AzureException(ex);
+      }
     }
 
-    leaseFreed = false;
+    setBlocksCountAndBlockIdPrefix(blockEntries);
+
+    this.ioThreadPool = new ThreadPoolExecutor(
+        MAX_NUMBER_THREADS_IN_THREAD_POOL,
+        MAX_NUMBER_THREADS_IN_THREAD_POOL,
+        THREADPOOL_KEEP_ALIVE,
+        TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(),
+        new UploaderThreadFactory());
   }
 
   /**
-   * Helper method that starts an Append Lease renewer thread and the
-   * thread pool.
+   * Set payload size of the stream.
+   * It is intended to be used for unit testing purposes only.
    */
-  public synchronized void initialize() {
-
-    if (initialized) {
-      return;
-    }
-    /*
-     * Start the thread for  Append lease renewer.
-     */
-    Thread appendLeaseRenewer = new Thread(new AppendRenewer());
-    appendLeaseRenewer.setDaemon(true);
-    appendLeaseRenewer.setName(String.format("%s-AppendLeaseRenewer", key));
-    appendLeaseRenewer.start();
-
-    /*
-     * Parameters to ThreadPoolExecutor:
-     * corePoolSize : the number of threads to keep in the pool, even if they are idle,
-     *                unless allowCoreThreadTimeOut is set
-     * maximumPoolSize : the maximum number of threads to allow in the pool
-     * keepAliveTime - when the number of threads is greater than the core,
-     *                 this is the maximum time that excess idle threads will
-     *                 wait for new tasks before terminating.
-     * unit - the time unit for the keepAliveTime argument
-     * workQueue - the queue to use for holding tasks before they are executed
-     *  This queue will hold only the Runnable tasks submitted by the execute method.
-     */
-    this.ioThreadPool = new ThreadPoolExecutor(4, 4, 2, TimeUnit.SECONDS,
-        new LinkedBlockingQueue<Runnable>(), new UploaderThreadFactory());
+  @VisibleForTesting
+  synchronized void setMaxBlockSize(int size) {
+    maxBlockSize.set(size);
 
-    initialized = true;
+    // it is for testing only so we can abandon the previously allocated
+    // payload
+    this.outBuffer = ByteBuffer.allocate(maxBlockSize.get());
   }
 
   /**
-   * Get the blob name.
-   *
-   * @return String Blob name.
+   * Set compaction parameters.
+   * It is intended to be used for unit testing purposes only.
    */
-  public String getKey() {
-    return key;
+  @VisibleForTesting
+  void setCompactionBlockCount(int activationCount) {
+    activateCompactionBlockCount = activationCount;
   }
 
   /**
-   * Get the backing blob.
-   * @return buffer size of the stream.
+   * Get the list of block entries. It is used for testing purposes only.
+   * @return List of block entries.
    */
-  public int getBufferSize() {
-    return bufferSize;
+  @VisibleForTesting
+  List<BlockEntry> getBlockList() throws StorageException, IOException {
+    return blob.downloadBlockList(
+        BlockListingFilter.COMMITTED,
+        new BlobRequestOptions(),
+        opContext);
   }
 
   /**
@@ -283,21 +450,6 @@ public class BlockBlobAppendStream extends OutputStream {
   }
 
   /**
-   * Writes b.length bytes from the specified byte array to this output stream.
-   *
-   * @param data
-   *          the byte array to write.
-   *
-   * @throws IOException
-   *           if an I/O error occurs. In particular, an IOException may be
-   *           thrown if the output stream has been closed.
-   */
-  @Override
-  public void write(final byte[] data) throws IOException {
-    write(data, 0, data.length);
-  }
-
-  /**
    * Writes length bytes from the specified byte array starting at offset to
    * this output stream.
    *
@@ -312,529 +464,678 @@ public class BlockBlobAppendStream extends OutputStream {
    *           thrown if the output stream has been closed.
    */
   @Override
-  public void write(final byte[] data, final int offset, final int length)
+  public synchronized void write(final byte[] data, int offset, int length)
       throws IOException {
+    Preconditions.checkArgument(data != null, "null data");
 
     if (offset < 0 || length < 0 || length > data.length - offset) {
-      throw new IndexOutOfBoundsException("write API in append stream called with invalid arguments");
-    }
-
-    writeInternal(data, offset, length);
-  }
-
-  @Override
-  public synchronized void close() throws IOException {
-
-    if (!initialized) {
-      throw new IOException("Trying to close an uninitialized Append stream");
+      throw new IndexOutOfBoundsException();
     }
 
     if (closed) {
-      return;
+      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
     }
 
-    if (leaseFreed) {
-      throw new IOException(String.format("Attempting to close an append stream on blob : %s "
-          + " that does not have lease on the Blob. Failing close", key));
-    }
+    while (outBuffer.remaining() < length) {
+
+      int remaining = outBuffer.remaining();
+      outBuffer.put(data, offset, remaining);
+
+      // upload payload to azure storage
+      addBlockUploadCommand();
 
-    if (outBuffer.size() > 0) {
-      uploadBlockToStorage(outBuffer.toByteArray());
+      offset += remaining;
+      length -= remaining;
     }
 
-    ioThreadPool.shutdown();
+    outBuffer.put(data, offset, length);
+  }
 
-    try {
-      if (!ioThreadPool.awaitTermination(10, TimeUnit.MINUTES)) {
-        LOG.error("Time out occurred while waiting for IO request to finish in append"
-            + " for blob : {}", key);
-        NativeAzureFileSystemHelper.logAllLiveStackTraces();
-        throw new IOException("Timed out waiting for IO requests to finish");
-      }
-    } catch(InterruptedException intrEx) {
 
-      // Restore the interrupted status
-      Thread.currentThread().interrupt();
-      LOG.error("Upload block operation in append interrupted for blob {}. Failing close", key);
-      throw new IOException("Append Commit interrupted.");
-    }
+  /**
+   * Flushes this output stream and forces any buffered output bytes to be
+   * written out. If any data remains in the payload it is committed to the
+   * service. Data is queued for writing and forced out to the service
+   * before the call returns.
+   */
+  @Override
+  public void flush() throws IOException {
 
-    // Calling commit after all blocks are succesfully uploaded.
-    if (lastError == null) {
-      commitAppendBlocks();
+    if (closed) {
+      // calling close() after the stream is closed starts with call to flush()
+      return;
     }
 
-    // Perform cleanup.
-    cleanup();
+    addBlockUploadCommand();
 
-    if (lastError != null) {
-      throw lastError;
+    if (committedBlobLength.get() < blobLength) {
+      try {
+        // wait until the block list is committed
+        addFlushCommand().await();
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+      }
     }
   }
 
   /**
-   * Helper method that cleans up the append stream.
+   * Force all data in the output stream to be written to Azure storage.
+   * Wait to return until this is complete. No-op if compaction is disabled.
    */
-  private synchronized void cleanup() {
-
-    closed = true;
-
-    try {
-      // Set the value of append lease to false if the value is set to true.
-        updateBlobAppendMetadata(false, true);
-    } catch(StorageException ex) {
-      LOG.debug("Append metadata update on the Blob : {} encountered Storage Exception : {} "
-          + "Error Code : {}",
-          key, ex, ex.getErrorCode());
-      lastError = new IOException(ex);
+  @Override
+  public void hsync() throws IOException {
+    // when block compaction is disabled, hsync is empty function
+    if (compactionEnabled) {
+      flush();
     }
+  }
 
-    leaseFreed = true;
+  /**
+   * Force all data in the output stream to be written to Azure storage.
+   * Wait to return until this is complete. No-op if compaction is disabled.
+   */
+  @Override
+  public void hflush() throws IOException {
+    // when block compaction is disabled, hflush is empty function
+    if (compactionEnabled) {
+      flush();
+    }
   }
 
   /**
-   * Method to commit all the uncommited blocks to azure storage.
-   * If the commit fails then blocks are automatically cleaned up
-   * by Azure storage.
-   * @throws IOException
+   * The Synchronization capabilities of this stream depend upon the compaction
+   * policy.
+   * @param capability string to query the stream support for.
+   * @return true for hsync and hflush when compaction is enabled.
    */
-  private synchronized void commitAppendBlocks() throws IOException {
+  @Override
+  public boolean hasCapability(String capability) {
+    return compactionEnabled
+        && (capability.equalsIgnoreCase(HSYNC.getValue())
+        || capability.equalsIgnoreCase((HFLUSH.getValue())));
+  }
 
-    SelfRenewingLease lease = null;
+  /**
+   * Force all data in the output stream to be written to Azure storage.
+   * Wait to return until this is complete. Close the access to the stream and
+   * shutdown the upload thread pool.
+   * If the blob was created, its lease will be released.
+   * Any error caught in threads and stored will be rethrown here
+   * after cleanup.
+   */
+  @Override
+  public synchronized void close() throws IOException {
 
-    try {
-      if (uncommittedBlockEntries.size() > 0) {
+    LOG.debug("close {} ", key);
 
-        //Acquiring lease on the blob.
-        lease = new SelfRenewingLease(blob);
+    if (closed) {
+      return;
+    }
 
-        // Downloading existing blocks
-        List<BlockEntry> blockEntries =  blob.downloadBlockList(BlockListingFilter.COMMITTED,
-            new BlobRequestOptions(), opContext);
+    // Upload the last block regardless of compactionEnabled flag
+    flush();
 
-        // Adding uncommitted blocks.
-        blockEntries.addAll(uncommittedBlockEntries);
+    // Initiates an orderly shutdown in which previously submitted tasks are
+    // executed.
+    ioThreadPool.shutdown();
 
-        AccessCondition accessCondition = new AccessCondition();
-        accessCondition.setLeaseID(lease.getLeaseID());
-        blob.commitBlockList(blockEntries, accessCondition, new BlobRequestOptions(), opContext);
-        uncommittedBlockEntries.clear();
+    try {
+      // wait up to CLOSE_UPLOAD_DELAY minutes to upload all the blocks
+      if (!ioThreadPool.awaitTermination(CLOSE_UPLOAD_DELAY, TimeUnit.MINUTES)) {
+        LOG.error("Time out occurred while close() is waiting for IO request to"
+            + " finish in append"
+            + " for blob : {}",
+            key);
+        NativeAzureFileSystemHelper.logAllLiveStackTraces();
+        throw new AzureException("Timed out waiting for IO requests to finish");
       }
-    } catch(StorageException ex) {
-      LOG.error("Storage exception encountered during block commit phase of append for blob"
-          + " : {} Storage Exception : {} Error Code: {}", key, ex, ex.getErrorCode());
-      throw new IOException("Encountered Exception while committing append blocks", ex);
-    } finally {
-      if (lease != null) {
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    // release the lease
+    if (firstError.get() == null && blobExist) {
         try {
           lease.free();
-        } catch(StorageException ex) {
-          LOG.debug("Exception encountered while releasing lease for "
-              + "blob : {} StorageException : {} ErrorCode : {}", key, ex, ex.getErrorCode());
-          // Swallowing exception here as the lease is cleaned up by the SelfRenewingLease object.
+        } catch (StorageException ex) {
+          LOG.debug("Lease free update blob {} encountered Storage Exception:"
+              + " {} Error Code : {}",
+              key,
+              ex,
+              ex.getErrorCode());
+          maybeSetFirstError(new AzureException(ex));
         }
-      }
+    }
+
+    closed = true;
+
+    // finally, throw the first exception raised if it has not
+    // been thrown elsewhere.
+    if (firstError.get() != null && !firstErrorThrown) {
+      throw firstError.get();
     }
   }
 
   /**
-   * Helper method used to generate the blockIDs. The algorithm used is similar to the Azure
-   * storage SDK.
+   * Helper method used to generate the blockIDs. The algorithm used is similar
+   * to the Azure storage SDK.
    */
-  private void setBlocksCountAndBlockIdPrefix() throws IOException {
+  private void setBlocksCountAndBlockIdPrefix(List<BlockEntry> blockEntries) {
 
-    try {
+    if (nextBlockCount == UNSET_BLOCKS_COUNT && blockIdPrefix == null) {
 
-      if (nextBlockCount == UNSET_BLOCKS_COUNT && blockIdPrefix==null) {
+      Random sequenceGenerator = new Random();
 
-        List<BlockEntry> blockEntries =
-            blob.downloadBlockList(BlockListingFilter.COMMITTED, new BlobRequestOptions(), opContext);
+      String blockZeroBlockId = (!blockEntries.isEmpty())
+          ? blockEntries.get(0).getId()
+          : "";
+      String prefix = UUID.randomUUID().toString() + "-";
+      String sampleNewerVersionBlockId = generateNewerVersionBlockId(prefix,
+          0);
 
-        String blockZeroBlockId = (blockEntries.size() > 0) ? blockEntries.get(0).getId() : "";
-        String prefix = UUID.randomUUID().toString() + "-";
-        String sampleNewerVersionBlockId = generateNewerVersionBlockId(prefix, 0);
+      if (!blockEntries.isEmpty()
+          && blockZeroBlockId.length() < sampleNewerVersionBlockId.length()) {
 
-        if (blockEntries.size() > 0 && blockZeroBlockId.length() < sampleNewerVersionBlockId.length()) {
+        // If blob has already been created with 2.2.0, append subsequent blocks
+        // with older version (2.2.0) blockId. Compute nextBlockCount the way it
+        // was done before, and don't use blockIdPrefix
+        this.blockIdPrefix = "";
+        nextBlockCount = (long) (sequenceGenerator.nextInt(Integer.MAX_VALUE))
+            + sequenceGenerator.nextInt(
+                Integer.MAX_VALUE - MAX_BLOCK_COUNT);
+        nextBlockCount += blockEntries.size();
 
-          // If blob has already been created with 2.2.0, append subsequent blocks with older version (2.2.0) blockId
-          // compute nextBlockCount, the way it was done before; and don't use blockIdPrefix
-          this.blockIdPrefix = "";
-          nextBlockCount = (long) (sequenceGenerator.nextInt(Integer.MAX_VALUE))
-              + sequenceGenerator.nextInt(Integer.MAX_VALUE - MAX_BLOCK_COUNT);
-          nextBlockCount += blockEntries.size();
-
-        } else {
-
-          // If there are no existing blocks, create the first block with newer version (4.2.0) blockId
-          // If blob has already been created with 4.2.0, append subsequent blocks with newer version (4.2.0) blockId
-          this.blockIdPrefix = prefix;
-          nextBlockCount = blockEntries.size();
-
-        }
+      } else {
 
+        // If there are no existing blocks, create the first block with newer
+        // version (4.2.0) blockId. If blob has already been created with 4.2.0,
+        // append subsequent blocks with newer version (4.2.0) blockId
+        this.blockIdPrefix = prefix;
+        nextBlockCount = blockEntries.size();
       }
-
-    } catch (StorageException ex) {
-      LOG.debug("Encountered storage exception during setting next Block Count and BlockId prefix."
-          + " StorageException : {} ErrorCode : {}", ex, ex.getErrorCode());
-      throw new IOException(ex);
     }
   }
 
   /**
-   * Helper method that generates the next block id for uploading a block to azure storage.
+   * Helper method that generates the next block id for uploading a block to
+   * azure storage.
    * @return String representing the block ID generated.
-   * @throws IOException
+   * @throws IOException if the stream is in invalid state
    */
   private String generateBlockId() throws IOException {
 
-    if (nextBlockCount == UNSET_BLOCKS_COUNT) {
-      throw new IOException("Append Stream in invalid state. nextBlockCount not set correctly");
-    }
-
-    if (this.blockIdPrefix == null) {
-      throw new IOException("Append Stream in invalid state. blockIdPrefix not set correctly");
-    }
-
-    if (!this.blockIdPrefix.equals("")) {
-
-      return generateNewerVersionBlockId(this.blockIdPrefix, nextBlockCount++);
-
-    } else {
-
-      return generateOlderVersionBlockId(nextBlockCount++);
-
+    if (nextBlockCount == UNSET_BLOCKS_COUNT || blockIdPrefix == null) {
+      throw new AzureException(
+            "Append Stream in invalid state. nextBlockCount not set correctly");
     }
 
+    return (!blockIdPrefix.isEmpty())
+        ? generateNewerVersionBlockId(blockIdPrefix, nextBlockCount++)
+        : generateOlderVersionBlockId(nextBlockCount++);
   }
 
   /**
-   * Helper method that generates an older (2.2.0) version blockId
+   * Helper method that generates an older (2.2.0) version blockId.
    * @return String representing the block ID generated.
    */
   private String generateOlderVersionBlockId(long id) {
 
-    byte[] blockIdInBytes = getBytesFromLong(id);
-    return new String(Base64.encodeBase64(blockIdInBytes), StandardCharsets.UTF_8);
+    byte[] blockIdInBytes = new byte[8];
+    for (int m = 0; m < 8; m++) {
+      blockIdInBytes[7 - m] = (byte) ((id >> (8 * m)) & 0xFF);
+    }
+
+    return new String(
+            Base64.encodeBase64(blockIdInBytes),
+            StandardCharsets.UTF_8);
   }
 
   /**
-   * Helper method that generates an newer (4.2.0) version blockId
+   * Helper method that generates a newer (4.2.0) version blockId.
    * @return String representing the block ID generated.
    */
   private String generateNewerVersionBlockId(String prefix, long id) {
 
     String blockIdSuffix  = String.format("%06d", id);
-    byte[] blockIdInBytes = (prefix + blockIdSuffix).getBytes(StandardCharsets.UTF_8);
+    byte[] blockIdInBytes =
+            (prefix + blockIdSuffix).getBytes(StandardCharsets.UTF_8);
     return new String(Base64.encodeBase64(blockIdInBytes), StandardCharsets.UTF_8);
   }
 
   /**
-   * Returns a byte array that represents the data of a <code>long</code> value. This
-   * utility method is copied from com.microsoft.azure.storage.core.Utility class.
-   * This class is marked as internal, hence we clone the method here and not express
-   * dependency on the Utility Class
-   *
-   * @param value
-   *            The value from which the byte array will be returned.
-   *
-   * @return A byte array that represents the data of the specified <code>long</code> value.
+   * This is shared between upload block Runnable and CommitBlockList. The
+   * method captures retry logic.
+   * @param blockId block name
+   * @param dataPayload block content
    */
-  private static byte[] getBytesFromLong(final long value) {
+  private void writeBlockRequestInternal(String blockId,
+                                         ByteBuffer dataPayload,
+                                         boolean bufferPoolBuffer) {
+    IOException lastLocalException = null;
+
+    int uploadRetryAttempts = 0;
+    while (uploadRetryAttempts < MAX_BLOCK_UPLOAD_RETRIES) {
+      try {
+        long startTime = System.nanoTime();
+
+        blob.uploadBlock(blockId, accessCondition, new ByteArrayInputStream(
+            dataPayload.array()), dataPayload.position(),
+            new BlobRequestOptions(), opContext);
 
-    final byte[] tempArray = new byte[8];
+        LOG.debug("upload block finished for {} ms. block {} ",
+            TimeUnit.NANOSECONDS.toMillis(
+                System.nanoTime() - startTime), blockId);
+        break;
+
+      } catch(Exception ioe) {
+        LOG.debug("Encountered exception during uploading block for Blob {}"
+            + " Exception : {}", key, ioe);
+        uploadRetryAttempts++;
+        lastLocalException = new AzureException(
+            "Encountered Exception while uploading block: " + ioe, ioe);
+        try {
+          Thread.sleep(
+              BLOCK_UPLOAD_RETRY_INTERVAL * (uploadRetryAttempts + 1));
+        } catch(InterruptedException ie) {
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+    }
 
-    for (int m = 0; m < 8; m++) {
-      tempArray[7 - m] = (byte) ((value >> (8 * m)) & 0xFF);
+    if (bufferPoolBuffer) {
+      poolReadyByteBuffers.putBuffer(dataPayload);
     }
 
-    return tempArray;
+    if (uploadRetryAttempts == MAX_BLOCK_UPLOAD_RETRIES) {
+      maybeSetFirstError(lastLocalException);
+    }
   }
 
   /**
-   * Helper method that creates a thread to upload a block to azure storage.
-   * @param payload
-   * @throws IOException
+   * Set {@link #firstError} to the exception if it is not already set.
+   * @param exception exception to save
    */
-  private synchronized void uploadBlockToStorage(byte[] payload)
-      throws IOException {
-
-    // upload payload to azure storage
-    String blockId = generateBlockId();
-
-    // Since uploads of the Azure storage are done in parallel threads, we go ahead
-    // add the blockId in the uncommitted list. If the upload of the block fails
-    // we don't commit the blockIds.
-    BlockEntry blockEntry = new BlockEntry(blockId);
-    blockEntry.setSize(payload.length);
-    uncommittedBlockEntries.add(blockEntry);
-    ioThreadPool.execute(new WriteRequest(payload, blockId));
+  private void maybeSetFirstError(IOException exception) {
+    firstError.compareAndSet(null, exception);
   }
 
 
   /**
-   * Helper method to updated the Blob metadata during Append lease operations.
-   * Blob metadata is updated to holdLease value only if the current lease
-   * status is equal to testCondition and the last update on the blob metadata
-   * is less that 30 secs old.
-   * @param holdLease
-   * @param testCondition
-   * @return true if the updated lease operation was successful or false otherwise
-   * @throws StorageException
+   * Throw the first error caught if it has not been raised already.
+   * @throws IOException if one is caught and needs to be thrown.
    */
-  private boolean updateBlobAppendMetadata(boolean holdLease, boolean testCondition)
-      throws StorageException {
-
-    SelfRenewingLease lease = null;
-    StorageException lastStorageException = null;
-    int leaseRenewalRetryCount = 0;
-
-    /*
-     * Updating the Blob metadata honours following algorithm based on
-     *  1) If the append lease metadata is present
-     *  2) Last updated time of the append lease
-     *  3) Previous value of the Append lease metadata.
-     *
-     * The algorithm:
-     *  1) If append lease metadata is not part of the Blob. In this case
-     *     this is the first client to Append so we update the metadata.
-     *  2) If append lease metadata is present and timeout has occurred.
-     *     In this case irrespective of what the value of the append lease is we update the metadata.
-     *  3) If append lease metadata is present and is equal to testCondition value (passed as parameter)
-     *     and timeout has not occurred, we update the metadata.
-     *  4) If append lease metadata is present and is not equal to testCondition value (passed as parameter)
-     *     and timeout has not occurred, we do not update metadata and return false.
-     *
-     */
-    while (leaseRenewalRetryCount < MAX_LEASE_RENEWAL_RETRY_COUNT) {
-
-      lastStorageException = null;
-
-      synchronized(this) {
-        try {
-
-          final Calendar currentCalendar = Calendar
-              .getInstance(Locale.US);
-          currentCalendar.setTimeZone(TimeZone.getTimeZone(UTC_STR));
-          long currentTime = currentCalendar.getTime().getTime();
+  private void maybeThrowFirstError() throws IOException {
+    if (firstError.get() != null) {
+      firstErrorThrown = true;
+      throw firstError.get();
+    }
+  }
 
-          // Acquire lease on the blob.
-          lease = new SelfRenewingLease(blob);
+  /**
+   * Write block list. The method captures retry logic.
+   */
+  private void writeBlockListRequestInternal() {
 
-          blob.downloadAttributes(opContext);
-          HashMap<String, String> metadata = blob.getMetadata();
+    IOException lastLocalException = null;
 
-          if (metadata.containsKey(APPEND_LEASE)
-              && currentTime - Long.parseLong(
-                  metadata.get(APPEND_LEASE_LAST_MODIFIED)) <= BlockBlobAppendStream.APPEND_LEASE_TIMEOUT
-              && !metadata.get(APPEND_LEASE).equals(Boolean.toString(testCondition))) {
-            return false;
-          }
+    int uploadRetryAttempts = 0;
+    while (uploadRetryAttempts < MAX_BLOCK_UPLOAD_RETRIES) {
+      try {
 
-          metadata.put(APPEND_LEASE, Boolean.toString(holdLease));
-          metadata.put(APPEND_LEASE_LAST_MODIFIED, Long.toString(currentTime));
-          blob.setMetadata(metadata);
-          AccessCondition accessCondition = new AccessCondition();
-          accessCondition.setLeaseID(lease.getLeaseID());
-          blob.uploadMetadata(accessCondition, null, opContext);
-          return true;
+        long startTime = System.nanoTime();
 
-        } catch (StorageException ex) {
-
-          lastStorageException = ex;
-          LOG.debug("Lease renewal for Blob : {} encountered Storage Exception : {} "
-              + "Error Code : {}",
-              key, ex, ex.getErrorCode());
-          leaseRenewalRetryCount++;
-
-        } finally {
-
-          if (lease != null) {
-            try {
-              lease.free();
-            } catch(StorageException ex) {
-              LOG.debug("Encountered Storage exception while releasing lease for Blob {} "
-                  + "during Append  metadata operation. Storage Exception {} "
-                  + "Error Code : {} ", key, ex, ex.getErrorCode());
-            } finally {
-              lease = null;
-            }
-          }
-        }
-      }
+        blob.commitBlockList(blockEntries, accessCondition,
+            new BlobRequestOptions(), opContext);
 
-      if (leaseRenewalRetryCount == MAX_LEASE_RENEWAL_RETRY_COUNT) {
-        throw lastStorageException;
-      } else {
+        LOG.debug("Upload block list took {} ms for blob {} ",
+                TimeUnit.NANOSECONDS.toMillis(
+                    System.nanoTime() - startTime), key);
+        break;
+
+      } catch(Exception ioe) {
+        LOG.debug("Encountered exception during uploading block for Blob {}"
+            + " Exception : {}", key, ioe);
+        uploadRetryAttempts++;
+        lastLocalException = new AzureException(
+            "Encountered Exception while uploading block: " + ioe, ioe);
         try {
-          Thread.sleep(LEASE_RENEWAL_RETRY_SLEEP_PERIOD);
-        } catch(InterruptedException ex) {
-          LOG.debug("Blob append metadata updated method interrupted");
+          Thread.sleep(
+              BLOCK_UPLOAD_RETRY_INTERVAL * (uploadRetryAttempts + 1));
+        } catch(InterruptedException ie) {
           Thread.currentThread().interrupt();
+          break;
         }
       }
     }
 
-    // The code should not enter here because the while loop will
-    // always be executed and if the while loop is executed we
-    // would returning from the while loop.
-    return false;
+    if (uploadRetryAttempts == MAX_BLOCK_UPLOAD_RETRIES) {
+      maybeSetFirstError(lastLocalException);
+    }
   }
 
   /**
-   * This is the only method that should be writing to outBuffer to maintain consistency of the outBuffer.
-   * @param data
-   * @param offset
-   * @param length
-   * @throws IOException
+   * A ThreadFactory that creates uploader threads with
+   * meaningful names helpful for debugging purposes.
    */
-  private synchronized void writeInternal(final byte[] data, final int offset, final int length)
-      throws IOException {
+  class UploaderThreadFactory implements ThreadFactory {
 
-    if (!initialized) {
-      throw new IOException("Trying to write to an un-initialized Append stream");
+    @Override
+    public Thread newThread(Runnable r) {
+      Thread t = new Thread(r);
+      t.setName(String.format("%s-%d", THREAD_ID_PREFIX,
+          threadSequenceNumber.getAndIncrement()));
+      return t;
     }
+  }
 
-    if (closed) {
-      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
-    }
+  /**
+   * Upload block commands.
+   */
+  private class UploadBlockCommand extends UploadCommand {
 
-    if (leaseFreed) {
-      throw new IOException(String.format("Write called on a append stream not holding lease. Failing Write"));
-    }
+    // the block content for upload
+    private final ByteBuffer payload;
 
-    byte[] currentData = new byte[length];
-    System.arraycopy(data, offset, currentData, 0, length);
+    // description of the block
+    private final BlockEntry entry;
 
-    // check to see if the data to be appended exceeds the
-    // buffer size. If so we upload a block to azure storage.
-    while ((outBuffer.size() + currentData.length) > bufferSize) {
+    UploadBlockCommand(String blockId, ByteBuffer payload) {
 
-      byte[] payload = new byte[bufferSize];
+      super(blobLength);
 
-      // Add data from the existing buffer
-      System.arraycopy(outBuffer.toByteArray(), 0, payload, 0, outBuffer.size());
+      BlockEntry blockEntry = new BlockEntry(blockId);
+      blockEntry.setSize(payload.position());
+      blockEntry.setSearchMode(BlockSearchMode.LATEST);
 
-      // Updating the available size in the payload
-      int availableSpaceInPayload = bufferSize - outBuffer.size();
+      this.payload = payload;
+      this.entry = blockEntry;
 
-      // Adding data from the current call
-      System.arraycopy(currentData, 0, payload, outBuffer.size(), availableSpaceInPayload);
+      uncommittedBlockEntries.add(blockEntry);
+    }
 
-      uploadBlockToStorage(payload);
+    /**
+     * Execute command.
+     */
+    void execute() throws InterruptedException {
+
+      uploadingSemaphore.acquire(1);
+      writeBlockRequestInternal(entry.getId(), payload, true);
+      uploadingSemaphore.release(1);
 
-      // updating the currentData buffer
-      byte[] tempBuffer = new byte[currentData.length - availableSpaceInPayload];
-      System.arraycopy(currentData, availableSpaceInPayload,
-          tempBuffer, 0, currentData.length - availableSpaceInPayload);
-      currentData = tempBuffer;
-      outBuffer = new ByteArrayOutputStream(bufferSize);
     }
 
-    outBuffer.write(currentData);
+    void dump() {
+      LOG.debug("upload block {} size: {} for blob {}",
+          entry.getId(),
+          entry.getSize(),
+          key);
+    }
   }
 
   /**
-   * Runnable instance that uploads the block of data to azure storage.
-   *
-   *
+   * Upload blob block list commands.
    */
-  private class WriteRequest implements Runnable {
-    private final byte[] dataPayload;
-    private final String blockId;
+  private class UploadBlockListCommand extends UploadCommand {
+
+    private BlockEntry lastBlock = null;
+
+    UploadBlockListCommand() {
+      super(blobLength);
 
-    public WriteRequest(byte[] dataPayload, String blockId) {
-      this.dataPayload = dataPayload;
-      this.blockId = blockId;
+      if (!uncommittedBlockEntries.isEmpty()) {
+        lastBlock = uncommittedBlockEntries.getLast();
+      }
     }
 
-    @Override
-    public void run() {
+    void awaitAsDependent() throws InterruptedException {
+      // Empty: a later commit of the block list does not need to wait for
+      // previous block list commits.
+    }
 
-      int uploadRetryAttempts = 0;
-      IOException lastLocalException = null;
-      while (uploadRetryAttempts < MAX_BLOCK_UPLOAD_RETRIES) {
-        try {
+    void dump() {
+      LOG.debug("commit block list with {} blocks for blob {}",
+          uncommittedBlockEntries.size(), key);
+    }
+
+    /**
+     * Execute command.
+     */
+    public void execute() throws InterruptedException, IOException {
+
+      if (committedBlobLength.get() >= getCommandBlobOffset()) {
+        LOG.debug("commit already applied for {}", key);
+        return;
+      }
+
+      if (lastBlock == null) {
+        LOG.debug("nothing to commit for {}", key);
+        return;
+      }
+
+      LOG.debug("active commands: {} for {}", activeBlockCommands.size(), key);
+
+      for (UploadCommand activeCommand : activeBlockCommands) {
+        if (activeCommand.getCommandBlobOffset() < getCommandBlobOffset()) {
+          activeCommand.dump();
+          activeCommand.awaitAsDependent();
+        } else {
+          break;
+        }
+      }
+
+      // stop all uploads until the block list is committed
+      uploadingSemaphore.acquire(MAX_NUMBER_THREADS_IN_THREAD_POOL);
+
+      BlockEntry uncommittedBlock;
+      do  {
+        uncommittedBlock = uncommittedBlockEntries.poll();
+        blockEntries.add(uncommittedBlock);
+      } while (uncommittedBlock != lastBlock);
+
+      if (blockEntries.size() > activateCompactionBlockCount) {
+        LOG.debug("Block compaction: activated with {} blocks for {}",
+            blockEntries.size(), key);
+
+        // Block compaction
+        long startCompaction = System.nanoTime();
+        blockCompaction();
+        LOG.debug("Block compaction finished for {} ms with {} blocks for {}",
+                TimeUnit.NANOSECONDS.toMillis(
+                    System.nanoTime() - startCompaction),
+                blockEntries.size(), key);
+      }
+
+      writeBlockListRequestInternal();
+
+      uploadingSemaphore.release(MAX_NUMBER_THREADS_IN_THREAD_POOL);
 
-          blob.uploadBlock(blockId, new ByteArrayInputStream(dataPayload),
-              dataPayload.length, new BlobRequestOptions(), opContext);
+      // remove the previous block commands
+      for (Iterator<UploadCommand> it = activeBlockCommands.iterator();
+           it.hasNext();) {
+        UploadCommand activeCommand = it.next();
+        if (activeCommand.getCommandBlobOffset() <= getCommandBlobOffset()) {
+          it.remove();
+        } else {
           break;
-        } catch(Exception ioe) {
-          Log.getLog().debug("Encountered exception during uploading block for Blob : {} Exception : {}", key, ioe);
-          uploadRetryAttempts++;
-          lastLocalException = new IOException("Encountered Exception while uploading block", ioe);
-          try {
-            Thread.sleep(BLOCK_UPLOAD_RETRY_INTERVAL);
-          } catch(InterruptedException ie) {
-            Thread.currentThread().interrupt();
-            break;
+        }
+      }
+
+      committedBlobLength.set(getCommandBlobOffset());
+    }
+
+    /**
+     * Internal output stream with read access to the internal buffer.
+     */
+    private class ByteArrayOutputStreamInternal extends ByteArrayOutputStream {
+
+      ByteArrayOutputStreamInternal(int size) {
+        super(size);
+      }
+
+      byte[] getByteArray() {
+        return buf;
+      }
+    }
+
+    /**
+     * Block compaction process.
+     *
+     * Block compaction is only enabled when the number of blocks exceeds
+     * activateCompactionBlockCount. The algorithm searches for the longest
+     * segment [b..e) where (e-b) > 2 && |b| + |b+1| ... |e-1| < maxBlockSize
+     * such that size(b1) + size(b2) + ... + size(bn) < maximum-block-size.
+     * It then downloads the blocks in the sequence, concatenates the data to
+     * form a single block, uploads this new block, and updates the block
+     * list to replace the sequence of blocks with the new block.
+     */
+    private void blockCompaction() throws IOException {
+      //current segment [segmentBegin, segmentEnd) and file offset/size of the
+      // current segment
+      int segmentBegin = 0, segmentEnd = 0;
+      long segmentOffsetBegin = 0, segmentOffsetEnd = 0;
+
+      //longest segment [maxSegmentBegin, maxSegmentEnd) and file offset/size of
+      // the longest segment
+      int maxSegmentBegin = 0, maxSegmentEnd = 0;
+      long maxSegmentOffsetBegin = 0, maxSegmentOffsetEnd = 0;
+
+      for (BlockEntry block : blockEntries) {
+        segmentEnd++;
+        segmentOffsetEnd += block.getSize();
+        if (segmentOffsetEnd - segmentOffsetBegin > maxBlockSize.get()) {
+          if (segmentEnd - segmentBegin > 2) {
+            if (maxSegmentEnd - maxSegmentBegin < segmentEnd - segmentBegin) {
+              maxSegmentBegin = segmentBegin;
+              maxSegmentEnd = segmentEnd;
+              maxSegmentOffsetBegin = segmentOffsetBegin;
+              maxSegmentOffsetEnd = segmentOffsetEnd - block.getSize();
+            }
           }
+          segmentBegin = segmentEnd - 1;
+          segmentOffsetBegin = segmentOffsetEnd - block.getSize();
         }
       }
 
-      if (uploadRetryAttempts == MAX_BLOCK_UPLOAD_RETRIES) {
-        lastError = lastLocalException;
+      if (maxSegmentEnd - maxSegmentBegin > 1) {
+
+        LOG.debug("Block compaction: {} blocks for {}",
+            maxSegmentEnd - maxSegmentBegin, key);
+
+        // download synchronously all the blocks from the azure storage
+        ByteArrayOutputStreamInternal blockOutputStream
+            = new ByteArrayOutputStreamInternal(maxBlockSize.get());
+
+        try {
+          long length = maxSegmentOffsetEnd - maxSegmentOffsetBegin;
+          blob.downloadRange(maxSegmentOffsetBegin, length, blockOutputStream,
+              new BlobRequestOptions(), opContext);
+        } catch(StorageException ex) {
+          LOG.error(
+              "Storage exception encountered during block compaction phase"
+                  + " : {} Storage Exception : {} Error Code: {}",
+              key, ex, ex.getErrorCode());
+          throw new AzureException(
+              "Encountered Exception while committing append blocks " + ex, ex);
+        }
+
+        // upload synchronously new block to the azure storage
+        String blockId = generateBlockId();
+
+        ByteBuffer byteBuffer = ByteBuffer.wrap(
+            blockOutputStream.getByteArray());
+        byteBuffer.position(blockOutputStream.size());
+
+        writeBlockRequestInternal(blockId, byteBuffer, false);
+
+        // replace blocks from the longest segment with new block id
+        blockEntries.subList(maxSegmentBegin + 1, maxSegmentEnd - 1).clear();
+        BlockEntry newBlock = blockEntries.get(maxSegmentBegin);
+        newBlock.setId(blockId);
+        newBlock.setSearchMode(BlockSearchMode.LATEST);
+        newBlock.setSize(maxSegmentOffsetEnd - maxSegmentOffsetBegin);
       }
     }
   }
 
   /**
-   * A ThreadFactory that creates uploader thread with
-   * meaningful names helpful for debugging purposes.
+   * Prepare block upload command and queue the command in thread pool executor.
    */
-  class UploaderThreadFactory implements ThreadFactory {
+  private synchronized void addBlockUploadCommand() throws IOException {
+
+    maybeThrowFirstError();
+
+    if (blobExist && lease.isFreed()) {
+      throw new AzureException(String.format(
+          "Attempting to upload a block on blob : %s "
+              + " that does not have lease on the Blob. Failing upload", key));
+    }
+
+    int blockSize = outBuffer.position();
+    if (blockSize > 0) {
+      UploadCommand command = new UploadBlockCommand(generateBlockId(),
+          outBuffer);
+      activeBlockCommands.add(command);
+
+      blobLength += blockSize;
+      outBuffer = poolReadyByteBuffers.getBuffer(false, maxBlockSize.get());
+
+      ioThreadPool.execute(new WriteRequest(command));
 
-    @Override
-    public Thread newThread(Runnable r) {
-      Thread t = new Thread(r);
-      t.setName(String.format("%s-%s-%d", THREAD_ID_PREFIX, key,
-          threadSequenceNumber.getAndIncrement()));
-      return t;
     }
   }
 
   /**
-   * A deamon thread that renews the Append lease on the blob.
-   * The thread sleeps for LEASE_RENEWAL_PERIOD time before renewing
-   * the lease. If an error is encountered while renewing the lease
-   * then an lease is released by this thread, which fails all other
-   * operations.
+   * Prepare block list commit command and queue the command in thread pool
+   * executor.
    */
-  private class AppendRenewer implements Runnable {
+  private synchronized UploadCommand addFlushCommand() throws IOException {
 
-    @Override
-    public void run() {
+    maybeThrowFirstError();
 
-      while (!leaseFreed) {
+    if (blobExist && lease.isFreed()) {
+      throw new AzureException(
+          String.format("Attempting to upload block list on blob : %s"
+              + " that does not have lease on the Blob. Failing upload", key));
+    }
 
-        try {
-          Thread.sleep(LEASE_RENEWAL_PERIOD);
-        } catch (InterruptedException ie) {
-          LOG.debug("Appender Renewer thread interrupted");
-          Thread.currentThread().interrupt();
-        }
+    UploadCommand command = new UploadBlockListCommand();
+    activeBlockCommands.add(command);
 
-        Log.getLog().debug("Attempting to renew append lease on {}", key);
+    ioThreadPool.execute(new WriteRequest(command));
 
-        try {
-          if (!leaseFreed) {
-            // Update the blob metadata to renew the append lease
-            if (!updateBlobAppendMetadata(true, true)) {
-              LOG.error("Unable to re-acquire append lease on the Blob {} ", key);
-              leaseFreed = true;
-            }
-          }
-        } catch (StorageException ex) {
+    return command;
+  }
 
-          LOG.debug("Lease renewal for Blob : {} encountered "
-              + "Storage Exception : {} Error Code : {}", key, ex, ex.getErrorCode());
+  /**
+   * Runnable instance that uploads the block of data to azure storage.
+   */
+  private class WriteRequest implements Runnable {
+    private final UploadCommand command;
 
-          // We swallow the exception here because if the blob metadata is not updated for
-          // APPEND_LEASE_TIMEOUT period, another thread would be able to detect this and
-          // continue forward if it needs to append.
-          leaseFreed = true;
-        }
+    WriteRequest(UploadCommand command) {
+      this.command = command;
+    }
+
+    @Override
+    public void run() {
+
+      try {
+        command.dump();
+        long startTime = System.nanoTime();
+        command.execute();
+        command.setCompleted();
+        LOG.debug("command finished for {} ms",
+            TimeUnit.NANOSECONDS.toMillis(
+                    System.nanoTime() - startTime));
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+      } catch (Exception ex) {
+        LOG.debug(
+                "Encountered exception during execution of command for Blob :"
+                        + " {} Exception : {}", key, ex);
+        firstError.compareAndSet(null, new AzureException(ex));
       }
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13eda500/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index 0bde124..280c0e0 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -62,6 +62,8 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.Syncable;
 import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
 import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
 import org.apache.hadoop.fs.azure.security.Constants;
@@ -352,9 +354,9 @@ public class NativeAzureFileSystem extends FileSystem {
     }
 
     /**
-     * This is an exact copy of org.codehaus.jettison.json.JSONObject.quote 
+     * This is an exact copy of org.codehaus.jettison.json.JSONObject.quote
      * method.
-     * 
+     *
      * Produce a string in double quotes with backslash sequences in all the
      * right places. A backslash will be inserted within </, allowing JSON
      * text to be delivered in HTML. In JSON text, a string cannot contain a
@@ -947,11 +949,11 @@ public class NativeAzureFileSystem extends FileSystem {
     }
   }
 
-  private class NativeAzureFsOutputStream extends OutputStream {
-    // We should not override flush() to actually close current block and flush
-    // to DFS, this will break applications that assume flush() is a no-op.
-    // Applications are advised to use Syncable.hflush() for that purpose.
-    // NativeAzureFsOutputStream needs to implement Syncable if needed.
+  /**
+   * Azure output stream; wraps an inner stream of different types.
+   */
+  public class NativeAzureFsOutputStream extends OutputStream
+      implements Syncable, StreamCapabilities {
     private String key;
     private String keyEncoded;
     private OutputStream out;
@@ -983,6 +985,48 @@ public class NativeAzureFileSystem extends FileSystem {
       setEncodedKey(anEncodedKey);
     }
 
+    /**
+     * Get a reference to the wrapped output stream.
+     *
+     * @return the underlying output stream
+     */
+    @InterfaceAudience.LimitedPrivate({"HDFS"})
+    public OutputStream getOutStream() {
+      return out;
+    }
+
+    @Override  // Syncable
+    public void hflush() throws IOException {
+      if (out instanceof Syncable) {
+        ((Syncable) out).hflush();
+      } else {
+        flush();
+      }
+    }
+
+    @Override  // Syncable
+    public void hsync() throws IOException {
+      if (out instanceof Syncable) {
+        ((Syncable) out).hsync();
+      } else {
+        flush();
+      }
+    }
+
+    /**
+     * Propagate probe of stream capabilities to nested stream
+     * (if supported), else return false.
+     * @param capability string to query the stream support for.
+     * @return true if the nested stream supports the specific capability.
+     */
+    @Override // StreamCapability
+    public boolean hasCapability(String capability) {
+      if (out instanceof StreamCapabilities) {
+        return ((StreamCapabilities) out).hasCapability(capability);
+      }
+      return false;
+    }
+
     @Override
     public synchronized void close() throws IOException {
       if (out != null) {
@@ -990,8 +1034,11 @@ public class NativeAzureFileSystem extends FileSystem {
         // before returning to the caller.
         //
         out.close();
-        restoreKey();
-        out = null;
+        try {
+          restoreKey();
+        } finally {
+          out = null;
+        }
       }
     }
 
@@ -1045,10 +1092,10 @@ public class NativeAzureFileSystem extends FileSystem {
     /**
      * Writes <code>len</code> from the specified byte array starting at offset
      * <code>off</code> to the output stream. The general contract for write(b,
-     * off, len) is that some of the bytes in the array <code>
-     * b</code b> are written to the output stream in order; element
-     * <code>b[off]</code> is the first byte written and
-     * <code>b[off+len-1]</code> is the last byte written by this operation.
+     * off, len) is that some of the bytes in the array <code>b</code>
+     * are written to the output stream in order; element <code>b[off]</code>
+     * is the first byte written and <code>b[off+len-1]</code> is the last
+     * byte written by this operation.
      * 
      * @param b
      *          Byte array to be written.
@@ -1749,7 +1796,7 @@ public class NativeAzureFileSystem extends FileSystem {
     OutputStream bufOutStream;
     if (store.isPageBlobKey(key)) {
       // Store page blobs directly in-place without renames.
-      bufOutStream = store.storefile(key, permissionStatus);
+      bufOutStream = store.storefile(key, permissionStatus, key);
     } else {
       // This is a block blob, so open the output blob stream based on the
       // encoded key.
@@ -1777,7 +1824,7 @@ public class NativeAzureFileSystem extends FileSystem {
       // these
       // blocks.
       bufOutStream = new NativeAzureFsOutputStream(store.storefile(
-          keyEncoded, permissionStatus), key, keyEncoded);
+          keyEncoded, permissionStatus, key), key, keyEncoded);
     }
     // Construct the data output stream from the buffered output stream.
     FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, statistics);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13eda500/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
index 1c7309f..57a729d 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
@@ -50,8 +50,9 @@ interface NativeFileSystemStore {
 
   InputStream retrieve(String key, long byteRangeStart) throws IOException;
 
-  DataOutputStream storefile(String key, PermissionStatus permissionStatus)
-      throws AzureException;
+  DataOutputStream storefile(String keyEncoded,
+      PermissionStatus permissionStatus,
+      String key) throws AzureException;
 
   boolean isPageBlobKey(String key);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13eda500/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
index 5dbb6bc..7c2722e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
@@ -519,7 +519,7 @@ public class SecureStorageInterfaceImpl extends StorageInterface {
 
     @Override
     public SelfRenewingLease acquireLease() throws StorageException {
-      return new SelfRenewingLease(this);
+      return new SelfRenewingLease(this, false);
     }
   }
 
@@ -557,10 +557,12 @@ public class SecureStorageInterfaceImpl extends StorageInterface {
     }
 
     @Override
-    public void uploadBlock(String blockId, InputStream sourceStream,
+    public void uploadBlock(String blockId, AccessCondition accessCondition,
+        InputStream sourceStream,
         long length, BlobRequestOptions options,
         OperationContext opContext) throws IOException, StorageException {
-      ((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length, null, options, opContext);
+      ((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length,
+          accessCondition, options, opContext);
     }
 
     @Override
@@ -593,4 +595,4 @@ public class SecureStorageInterfaceImpl extends StorageInterface {
           null, options, opContext);
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13eda500/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java
index 00d5e99..10956f7 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java
@@ -30,6 +30,8 @@ import com.microsoft.azure.storage.blob.CloudBlob;
 
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static com.microsoft.azure.storage.StorageErrorCodeStrings.LEASE_ALREADY_PRESENT;
+
 /**
  * An Azure blob lease that automatically renews itself indefinitely
  * using a background thread. Use it to synchronize distributed processes,
@@ -66,7 +68,7 @@ public class SelfRenewingLease {
   @VisibleForTesting
   static final int LEASE_ACQUIRE_RETRY_INTERVAL = 2000;
 
-  public SelfRenewingLease(CloudBlobWrapper blobWrapper)
+  public SelfRenewingLease(CloudBlobWrapper blobWrapper, boolean throwIfPresent)
       throws StorageException {
 
     this.leaseFreed = false;
@@ -79,10 +81,14 @@ public class SelfRenewingLease {
         leaseID = blob.acquireLease(LEASE_TIMEOUT, null);
       } catch (StorageException e) {
 
+        if (throwIfPresent && e.getErrorCode().equals(LEASE_ALREADY_PRESENT)) {
+          throw e;
+        }
+
         // Throw again if we don't want to keep waiting.
         // We expect it to be that the lease is already present,
         // or in some cases that the blob does not exist.
-        if (!"LeaseAlreadyPresent".equals(e.getErrorCode())) {
+        if (!LEASE_ALREADY_PRESENT.equals(e.getErrorCode())) {
           LOG.info(
             "Caught exception when trying to get lease on blob "
             + blobWrapper.getUri().toString() + ". " + e.getMessage());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13eda500/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
index 8b6b082..e03d731 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
@@ -665,6 +665,7 @@ abstract class StorageInterface {
      *
      * @param blockId      A String that represents the Base-64 encoded block ID. Note for a given blob
      *                     the length of all Block IDs must be identical.
+     * @param accessCondition An {@link AccessCondition} object that represents the access conditions for the blob.
      * @param sourceStream An {@link InputStream} object that represents the input stream to write to the
      *                     block blob.
      * @param length       A long which represents the length, in bytes, of the stream data,
@@ -678,7 +679,7 @@ abstract class StorageInterface {
      * @throws IOException  If an I/O error occurred.
      * @throws StorageException If a storage service error occurred.
      */
-    void uploadBlock(String blockId, InputStream sourceStream,
+    void uploadBlock(String blockId, AccessCondition accessCondition, InputStream sourceStream,
         long length, BlobRequestOptions options,
         OperationContext opContext) throws IOException, StorageException;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13eda500/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
index d3d0370..41a4dbb 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
@@ -277,7 +277,7 @@ class StorageInterfaceImpl extends StorageInterface {
 
       return new CloudBlockBlobWrapperImpl(container.getBlockBlobReference(relativePath));
     }
-    
+
     @Override
     public CloudBlobWrapper getPageBlobReference(String relativePath)
         throws URISyntaxException, StorageException {
@@ -286,7 +286,7 @@ class StorageInterfaceImpl extends StorageInterface {
     }
 
   }
-  
+
   abstract static class CloudBlobWrapperImpl implements CloudBlobWrapper {
     private final CloudBlob blob;
 
@@ -441,10 +441,10 @@ class StorageInterfaceImpl extends StorageInterface {
 
     @Override
     public SelfRenewingLease acquireLease() throws StorageException {
-      return new SelfRenewingLease(this);
+      return new SelfRenewingLease(this, false);
     }
   }
-  
+
 
   //
   // CloudBlockBlobWrapperImpl
@@ -479,10 +479,10 @@ class StorageInterfaceImpl extends StorageInterface {
     }
 
     @Override
-    public void uploadBlock(String blockId, InputStream sourceStream,
+    public void uploadBlock(String blockId, AccessCondition accessCondition, InputStream sourceStream,
         long length, BlobRequestOptions options,
         OperationContext opContext) throws IOException, StorageException {
-      ((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length, null, options, opContext);
+      ((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length, accessCondition, options, opContext);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13eda500/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java
index a52fdb7..fc8796b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java
@@ -24,6 +24,7 @@ import java.io.OutputStream;
 
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.classification.InterfaceAudience;
 
 /**
  * Support the Syncable interface on top of a DataOutputStream.
@@ -38,6 +39,16 @@ public class SyncableDataOutputStream extends DataOutputStream
     super(out);
   }
 
+  /**
+   * Get a reference to the wrapped output stream.
+   *
+   * @return the underlying output stream
+   */
+  @InterfaceAudience.LimitedPrivate({"HDFS"})
+  public OutputStream getOutStream() {
+    return out;
+  }
+
   @Override
   public boolean hasCapability(String capability) {
     if (out instanceof StreamCapabilities) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13eda500/hadoop-tools/hadoop-azure/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/index.md b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
index 758650d..466bf0b 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/index.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
@@ -153,6 +153,40 @@ line argument:
 
 ```
 
+### Block Blob with Compaction Support and Configuration
+
+Block blobs are the default kind of blob and are good for most big-data use
+cases. However, block blobs have a strict limit of 50,000 blocks per blob.
+To prevent reaching the limit, WASB, by default, does not upload a new block
+to the service after every `hflush()` or `hsync()`.
+
+For most cases, combining data from multiple `write()` calls into
+blocks of 4 MB is a good optimization. But in other cases, like HBase log files,
+every call to `hflush()` or `hsync()` must upload the data to the service.
+
+Block blobs with compaction upload the data to the cloud service after every
+`hflush()`/`hsync()`. To mitigate the limit of 50,000 blocks,
+`hflush()`/`hsync()` runs the compaction process once if the number of blocks
+in the blob is above 32,000.
+
+Block compaction searches for and replaces a sequence of small blocks with one
+big block. That means there is a cost associated with block compaction: reading
+the small blocks back to the client and writing them again as one big block.
+
+In order to have the files you create be block blobs with block compaction
+enabled, the client must set the configuration variable
+`fs.azure.block.blob.with.compaction.dir` to a comma-separated list of
+folder names.
+
+For example:
+
+```xml
+<property>
+  <name>fs.azure.block.blob.with.compaction.dir</name>
+  <value>/hbase/WALs,/data/myblobfiles</value>
+</property>
+```
+
 ### Page Blob Support and Configuration
 
 The Azure Blob Storage interface for Hadoop supports two kinds of blobs,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13eda500/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
index 4f26d9f..e0ae7b4 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
@@ -551,7 +551,8 @@ public class MockStorageInterface extends StorageInterface {
       throw new UnsupportedOperationException("downloadBlockList not used in Mock Tests");
     }
     @Override
-    public void uploadBlock(String blockId, InputStream sourceStream,
+    public void uploadBlock(String blockId, AccessCondition accessCondition,
+        InputStream sourceStream,
         long length, BlobRequestOptions options,
         OperationContext opContext) throws IOException, StorageException {
       throw new UnsupportedOperationException("uploadBlock not used in Mock Tests");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13eda500/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java
index 7ea7534..a10a366 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java
@@ -107,7 +107,8 @@ public class TestAzureConcurrentOutOfBandIo {
           //
           outputStream = writerStorageAccount.getStore().storefile(
               key,
-              new PermissionStatus("", "", FsPermission.getDefault()));
+              new PermissionStatus("", "", FsPermission.getDefault()),
+              key);
 
           Arrays.fill(dataBlockWrite, (byte) (i % 256));
           for (int j = 0; j < NUMBER_OF_BLOCKS; j++) {
@@ -141,7 +142,8 @@ public class TestAzureConcurrentOutOfBandIo {
    // reading.  This eliminates the race between the reader and writer threads.
    OutputStream outputStream = testAccount.getStore().storefile(
        "WASB_String.txt",
-       new PermissionStatus("", "", FsPermission.getDefault()));
+       new PermissionStatus("", "", FsPermission.getDefault()),
+           "WASB_String.txt");
    Arrays.fill(dataBlockWrite, (byte) 255);
    for (int i = 0; i < NUMBER_OF_BLOCKS; i++) {
      outputStream.write(dataBlockWrite);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message