hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bra...@apache.org
Subject [03/18] hadoop git commit: HDDS-675. Add blocking buffer and use watchApi for flush/close in OzoneClient. Contributed by Shashikant Banerjee.
Date Wed, 14 Nov 2018 13:07:38 GMT
HDDS-675. Add blocking buffer and use watchApi for flush/close in OzoneClient. Contributed by Shashikant Banerjee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/671fd652
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/671fd652
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/671fd652

Branch: refs/heads/HDFS-13891
Commit: 671fd6524b2640474de2bc3b8dbaa0a3cf7fcf01
Parents: 75291e6
Author: Shashikant Banerjee <shashikant@apache.org>
Authored: Tue Nov 13 23:39:14 2018 +0530
Committer: Shashikant Banerjee <shashikant@apache.org>
Committed: Tue Nov 13 23:39:14 2018 +0530

----------------------------------------------------------------------
 .../hadoop/hdds/scm/XceiverClientGrpc.java      |  28 +-
 .../hadoop/hdds/scm/XceiverClientRatis.java     |  65 ++-
 .../hdds/scm/storage/ChunkOutputStream.java     | 448 +++++++++++++++----
 .../hdds/scm/XceiverClientAsyncReply.java       |  98 ++++
 .../hadoop/hdds/scm/XceiverClientSpi.java       |  12 +-
 .../scm/storage/ContainerProtocolCalls.java     |  57 ++-
 .../apache/hadoop/ozone/OzoneConfigKeys.java    |  24 +-
 .../common/src/main/resources/ozone-default.xml |  26 +-
 .../keyvalue/impl/BlockManagerImpl.java         |   3 +
 .../hadoop/ozone/client/OzoneClientUtils.java   |  27 --
 .../ozone/client/io/ChunkGroupOutputStream.java | 337 +++++++-------
 .../hadoop/ozone/client/rpc/RpcClient.java      |  27 +-
 .../apache/hadoop/ozone/MiniOzoneCluster.java   |  45 +-
 .../hadoop/ozone/MiniOzoneClusterImpl.java      |  19 +
 .../apache/hadoop/ozone/RatisTestHelper.java    |   2 +-
 .../rpc/TestCloseContainerHandlingByClient.java | 252 +++--------
 .../rpc/TestContainerStateMachineFailures.java  |  20 +-
 .../client/rpc/TestFailureHandlingByClient.java | 213 +++++++++
 .../ozone/container/ContainerTestHelper.java    |  34 ++
 .../container/ozoneimpl/TestOzoneContainer.java |   2 +-
 .../ozone/scm/TestXceiverClientMetrics.java     |   3 +-
 .../ozone/web/TestOzoneRestWithMiniCluster.java |   2 +-
 .../web/storage/DistributedStorageHandler.java  |  42 +-
 .../hadoop/ozone/freon/TestDataValidate.java    |   6 +
 .../ozone/freon/TestRandomKeyGenerator.java     |   6 +
 25 files changed, 1248 insertions(+), 550 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index cc34e27..9acd832 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
@@ -47,6 +48,7 @@ import java.util.Map;
 import java.util.HashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
  * A Client for the storageContainer protocol.
@@ -163,7 +165,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
         // In case the command gets retried on a 2nd datanode,
         // sendCommandAsyncCall will create a new channel and async stub
         // in case these don't exist for the specific datanode.
-        responseProto = sendCommandAsync(request, dn).get();
+        responseProto = sendCommandAsync(request, dn).getResponse().get();
         if (responseProto.getResult() == ContainerProtos.Result.SUCCESS) {
           break;
         }
@@ -197,13 +199,23 @@ public class XceiverClientGrpc extends XceiverClientSpi {
    * @throws IOException
    */
   @Override
-  public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
+  public XceiverClientAsyncReply sendCommandAsync(
       ContainerCommandRequestProto request)
       throws IOException, ExecutionException, InterruptedException {
-    return sendCommandAsync(request, pipeline.getFirstNode());
+    XceiverClientAsyncReply asyncReply =
+        sendCommandAsync(request, pipeline.getFirstNode());
+
+    // TODO : for now make this API sync in nature as async requests are
+    // served out of order over XceiverClientGrpc. This needs to be fixed
+    // if this API is to be used for I/O path. Currently, this is not
+    // used for Read/Write Operation but for tests.
+    if (!HddsUtils.isReadOnly(request)) {
+      asyncReply.getResponse().get();
+    }
+    return asyncReply;
   }
 
-  private CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
+  private XceiverClientAsyncReply sendCommandAsync(
       ContainerCommandRequestProto request, DatanodeDetails dn)
       throws IOException, ExecutionException, InterruptedException {
     if (closed) {
@@ -257,7 +269,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
             });
     requestObserver.onNext(request);
     requestObserver.onCompleted();
-    return replyFuture;
+    return new XceiverClientAsyncReply(replyFuture);
   }
 
   private void reconnect(DatanodeDetails dn)
@@ -288,6 +300,12 @@ public class XceiverClientGrpc extends XceiverClientSpi {
     // For stand alone pipeline, there is no notion called destroy pipeline.
   }
 
+  @Override
+  public void watchForCommit(long index, long timeout)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    // there is no notion of watch for commit index in standalone pipeline
+  };
+
   /**
    * Returns pipeline Type.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index f38fd3b..e4b711a 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -50,9 +50,12 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -192,9 +195,22 @@ public final class XceiverClientRatis extends XceiverClientSpi {
         getClient().sendAsync(() -> byteString);
   }
 
-  public void watchForCommit(long index, long timeout) throws Exception {
-    getClient().sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED)
-        .get(timeout, TimeUnit.MILLISECONDS);
+  @Override
+  public void watchForCommit(long index, long timeout)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    // TODO: Create a new Raft client instance to watch
+    CompletableFuture<RaftClientReply> replyFuture = getClient()
+        .sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
+    try {
+      replyFuture.get(timeout, TimeUnit.MILLISECONDS);
+    } catch (TimeoutException toe) {
+      LOG.warn("3 way commit failed ", toe);
+      getClient()
+          .sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
+          .get(timeout, TimeUnit.MILLISECONDS);
+      LOG.info("Could not commit " + index + " to all the nodes."
+          + "Committed by majority.");
+    }
   }
   /**
    * Sends a given command to server gets a waitable future back.
@@ -204,18 +220,37 @@ public final class XceiverClientRatis extends XceiverClientSpi {
    * @throws IOException
    */
   @Override
-  public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
+  public XceiverClientAsyncReply sendCommandAsync(
       ContainerCommandRequestProto request) {
-    return sendRequestAsync(request).whenComplete((reply, e) ->
-          LOG.debug("received reply {} for request: {} exception: {}", request,
-              reply, e))
-        .thenApply(reply -> {
-          try {
-            return ContainerCommandResponseProto.parseFrom(
-                reply.getMessage().getContent());
-          } catch (InvalidProtocolBufferException e) {
-            throw new CompletionException(e);
-          }
-        });
+    XceiverClientAsyncReply asyncReply = new XceiverClientAsyncReply(null);
+    CompletableFuture<RaftClientReply> raftClientReply =
+        sendRequestAsync(request);
+    Collection<XceiverClientAsyncReply.CommitInfo> commitInfos =
+        new ArrayList<>();
+    CompletableFuture<ContainerCommandResponseProto> containerCommandResponse =
+        raftClientReply.whenComplete((reply, e) -> LOG
+            .debug("received reply {} for request: {} exception: {}", request,
+                reply, e))
+            .thenApply(reply -> {
+              try {
+                ContainerCommandResponseProto response =
+                    ContainerCommandResponseProto
+                        .parseFrom(reply.getMessage().getContent());
+                reply.getCommitInfos().forEach(e -> {
+                  XceiverClientAsyncReply.CommitInfo commitInfo =
+                      new XceiverClientAsyncReply.CommitInfo(
+                          e.getServer().getAddress(), e.getCommitIndex());
+                  commitInfos.add(commitInfo);
+                  asyncReply.setCommitInfos(commitInfos);
+                  asyncReply.setLogIndex(reply.getLogIndex());
+                });
+                return response;
+              } catch (InvalidProtocolBufferException e) {
+                throw new CompletionException(e);
+              }
+            });
+    asyncReply.setResponse(containerCommandResponse);
+    return asyncReply;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java
index 4e881c4..bdc6a83 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java
@@ -17,10 +17,10 @@
  */
 
 package org.apache.hadoop.hdds.scm.storage;
-
-
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientAsyncReply;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
@@ -29,16 +29,24 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.UUID;
-
+import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
-    .putBlock;
+    .putBlockAsync;
 import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
-    .writeChunk;
+    .writeChunkAsync;
 
 /**
  * An {@link OutputStream} used by the REST service in combination with the
@@ -57,6 +65,8 @@ import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
  * through to the container.
  */
 public class ChunkOutputStream extends OutputStream {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ChunkOutputStream.class);
 
   private BlockID blockID;
   private final String key;
@@ -64,67 +74,97 @@ public class ChunkOutputStream extends OutputStream {
   private final BlockData.Builder containerBlockData;
   private XceiverClientManager xceiverClientManager;
   private XceiverClientSpi xceiverClient;
-  private ByteBuffer buffer;
   private final String streamId;
   private int chunkIndex;
   private int chunkSize;
+  private final long streamBufferFlushSize;
+  private final long streamBufferMaxSize;
+  private final long watchTimeout;
+  private ByteBuffer buffer;
+  // The IOException will be set by response handling thread in case there is an
+  // exception received in the response. If the exception is set, the next
+  // request will fail upfront.
+  private IOException ioException;
+  private ExecutorService responseExecutor;
+
+  // position of the buffer where the last flush was attempted
+  private int lastFlushPos;
+
+  // position of the buffer till which the flush was successfully
+  // acknowledged by all nodes in pipeline
+  private int lastSuccessfulFlushIndex;
+
+  // list to hold up all putBlock futures
+  private List<CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
+      futureList;
+  // list maintaining commit indexes for putBlocks
+  private List<Long> commitIndexList;
 
   /**
    * Creates a new ChunkOutputStream.
    *
-   * @param blockID block ID
-   * @param key chunk key
+   * @param blockID              block ID
+   * @param key                  chunk key
    * @param xceiverClientManager client manager that controls client
-   * @param xceiverClient client to perform container calls
-   * @param traceID container protocol call args
-   * @param chunkSize chunk size
+   * @param xceiverClient        client to perform container calls
+   * @param traceID              container protocol call args
+   * @param chunkSize            chunk size
    */
   public ChunkOutputStream(BlockID blockID, String key,
-       XceiverClientManager xceiverClientManager,
-       XceiverClientSpi xceiverClient, String traceID, int chunkSize) {
+      XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient,
+      String traceID, int chunkSize, long streamBufferFlushSize,
+      long streamBufferMaxSize, long watchTimeout, ByteBuffer buffer) {
     this.blockID = blockID;
     this.key = key;
     this.traceID = traceID;
     this.chunkSize = chunkSize;
-    KeyValue keyValue = KeyValue.newBuilder()
-        .setKey("TYPE").setValue("KEY").build();
-    this.containerBlockData = BlockData.newBuilder()
-        .setBlockID(blockID.getDatanodeBlockIDProtobuf())
-        .addMetadata(keyValue);
+    KeyValue keyValue =
+        KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build();
+    this.containerBlockData =
+        BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf())
+            .addMetadata(keyValue);
     this.xceiverClientManager = xceiverClientManager;
     this.xceiverClient = xceiverClient;
-    this.buffer = ByteBuffer.allocate(chunkSize);
     this.streamId = UUID.randomUUID().toString();
     this.chunkIndex = 0;
-  }
+    this.streamBufferFlushSize = streamBufferFlushSize;
+    this.streamBufferMaxSize = streamBufferMaxSize;
+    this.watchTimeout = watchTimeout;
+    this.buffer = buffer;
+    this.ioException = null;
 
-  public ByteBuffer getBuffer() {
-    return buffer;
+    // A single thread executor handles the responses of async requests
+    responseExecutor = Executors.newSingleThreadExecutor();
+    commitIndexList = new ArrayList<>();
+    lastSuccessfulFlushIndex = 0;
+    futureList = new ArrayList<>();
+    lastFlushPos = 0;
   }
 
   public BlockID getBlockID() {
     return blockID;
   }
 
+  public int getLastSuccessfulFlushIndex() {
+    return lastSuccessfulFlushIndex;
+  }
+
+
   @Override
   public void write(int b) throws IOException {
     checkOpen();
-    int rollbackPosition = buffer.position();
-    int rollbackLimit = buffer.limit();
-    buffer.put((byte)b);
-    if (buffer.position() == chunkSize) {
-      flushBufferToChunk(rollbackPosition, rollbackLimit);
-    }
+    byte[] buf = new byte[1];
+    buf[0] = (byte) b;
+    write(buf, 0, 1);
   }
 
   @Override
-  public void write(byte[] b, int off, int len)
-      throws IOException {
+  public void write(byte[] b, int off, int len) throws IOException {
     if (b == null) {
       throw new NullPointerException();
     }
-    if ((off < 0) || (off > b.length) || (len < 0) ||
-        ((off + len) > b.length) || ((off + len) < 0)) {
+    if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
+        || ((off + len) < 0)) {
       throw new IndexOutOfBoundsException();
     }
     if (len == 0) {
@@ -132,93 +172,300 @@ public class ChunkOutputStream extends OutputStream {
     }
     checkOpen();
     while (len > 0) {
-      int writeLen = Math.min(chunkSize - buffer.position(), len);
-      int rollbackPosition = buffer.position();
-      int rollbackLimit = buffer.limit();
+      int writeLen;
+      writeLen = Math.min(chunkSize - buffer.position() % chunkSize, len);
       buffer.put(b, off, writeLen);
-      if (buffer.position() == chunkSize) {
-        flushBufferToChunk(rollbackPosition, rollbackLimit);
+      if (buffer.position() % chunkSize == 0) {
+        int pos = buffer.position() - chunkSize;
+        int limit = buffer.position();
+        writeChunk(pos, limit);
       }
       off += writeLen;
       len -= writeLen;
+      if (buffer.position() >= streamBufferFlushSize
+          && buffer.position() % streamBufferFlushSize == 0) {
+
+        lastFlushPos = buffer.position();
+        futureList.add(handlePartialFlush());
+      }
+      if (buffer.position() >= streamBufferMaxSize
+          && buffer.position() % streamBufferMaxSize == 0) {
+        handleFullBuffer();
+      }
+    }
+  }
+
+  /**
+   * Will be called on the retryPath in case of closedContainerException/
+   * TimeoutException.
+   * @param len length of data to write
+   * @throws IOException if an error occurred
+   */
+
+  // In this case, the data is already cached in the buffer.
+  public void writeOnRetry(int len) throws IOException {
+    if (len == 0) {
+      return;
+    }
+    int off = 0;
+    checkOpen();
+    while (len > 0) {
+      int writeLen;
+      writeLen = Math.min(chunkSize, len);
+      if (writeLen == chunkSize) {
+        int pos = off;
+        int limit = pos + chunkSize;
+        writeChunk(pos, limit);
+      }
+      off += writeLen;
+      len -= writeLen;
+      if (off % streamBufferFlushSize == 0) {
+        lastFlushPos = off;
+        futureList.add(handlePartialFlush());
+      }
+      if (off % streamBufferMaxSize == 0) {
+        handleFullBuffer();
+      }
+    }
+  }
+
+  private void handleResponse(
+      ContainerProtos.ContainerCommandResponseProto response,
+      XceiverClientAsyncReply asyncReply) {
+    validateResponse(response);
+    discardBuffer(asyncReply);
+  }
+
+  private void discardBuffer(XceiverClientAsyncReply asyncReply) {
+    if (!commitIndexList.isEmpty()) {
+      long index = commitIndexList.get(0);
+      if (checkIfBufferDiscardRequired(asyncReply, index)) {
+        updateFlushIndex();
+      }
+    }
+  }
+
+  /**
+   * just update the lastSuccessfulFlushIndex. Since we have allocated
+   * the buffer more than the streamBufferMaxSize, we can keep on writing
+   * to the buffer. In case of failure, we will read the data starting from
+   * lastSuccessfulFlushIndex.
+   */
+  private void updateFlushIndex() {
+    lastSuccessfulFlushIndex += streamBufferFlushSize;
+    LOG.debug("Discarding buffer till pos " + lastSuccessfulFlushIndex);
+    if (!commitIndexList.isEmpty()) {
+      commitIndexList.remove(0);
+      futureList.remove(0);
+    }
+
+  }
+  /**
+   * Check if the last commitIndex stored at the beginning of the
+   * commitIndexList is less than or equal to current commitInfo indexes.
+   * If it is true, the buffer has been successfully flushed till the
+   * last position where flush happened.
+   */
+  private boolean checkIfBufferDiscardRequired(
+      XceiverClientAsyncReply asyncReply, long commitIndex) {
+    if (asyncReply.getCommitInfos() != null) {
+      for (XceiverClientAsyncReply.CommitInfo info : asyncReply
+          .getCommitInfos()) {
+        if (info.getCommitIndex() < commitIndex) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * This is a blocking call. It will wait for the flush till the commit index
+   * at the head of the commitIndexList gets replicated to all or majority.
+   * @throws IOException
+   */
+  private void handleFullBuffer() throws IOException {
+    if (!commitIndexList.isEmpty()) {
+      watchForCommit(commitIndexList.get(0));
+    }
+  }
+
+  /**
+   * calls watchForCommit API of the Ratis Client. For Standalone client,
+   * it is a no op.
+   * @param commitIndex log index to watch for
+   * @throws IOException IOException in case watch gets timed out
+   */
+  private void watchForCommit(long commitIndex) throws IOException {
+    checkOpen();
+    Preconditions.checkState(!commitIndexList.isEmpty());
+    try {
+      xceiverClient.watchForCommit(commitIndex, watchTimeout);
+    } catch (TimeoutException | InterruptedException | ExecutionException e) {
+      LOG.warn("watchForCommit failed for index " + commitIndex, e);
+      throw new IOException(
+          "Unexpected Storage Container Exception: " + e.toString(), e);
+    }
+  }
+
+  private CompletableFuture<ContainerProtos.
+      ContainerCommandResponseProto> handlePartialFlush()
+      throws IOException {
+    String requestId =
+        traceID + ContainerProtos.Type.PutBlock + chunkIndex + blockID;
+    try {
+      XceiverClientAsyncReply asyncReply =
+          putBlockAsync(xceiverClient, containerBlockData.build(), requestId);
+      CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
+          asyncReply.getResponse();
+
+      return future.thenApplyAsync(e -> {
+        handleResponse(e, asyncReply);
+        // if the ioException is not set, putBlock is successful
+        if (ioException == null) {
+          LOG.debug(
+              "Adding index " + asyncReply.getLogIndex() + " commitList size "
+                  + commitIndexList.size());
+          BlockID responseBlockID = BlockID.getFromProtobuf(
+              e.getPutBlock().getCommittedBlockLength().getBlockID());
+          Preconditions.checkState(blockID.getContainerBlockID()
+              .equals(responseBlockID.getContainerBlockID()));
+          // updates the bcsId of the block
+          blockID = responseBlockID;
+          long index = asyncReply.getLogIndex();
+          // for standalone protocol, logIndex will always be 0.
+          if (index != 0) {
+            commitIndexList.add(index);
+          } else {
+            updateFlushIndex();
+          }
+        }
+        return e;
+      }, responseExecutor);
+    } catch (IOException | InterruptedException | ExecutionException e) {
+      throw new IOException(
+          "Unexpected Storage Container Exception: " + e.toString(), e);
     }
   }
 
   @Override
   public void flush() throws IOException {
-    checkOpen();
-    if (buffer.position() > 0) {
-      int rollbackPosition = buffer.position();
-      int rollbackLimit = buffer.limit();
-      flushBufferToChunk(rollbackPosition, rollbackLimit);
+    if (xceiverClientManager != null && xceiverClient != null
+        && buffer != null) {
+      checkOpen();
+      if (buffer.position() > 0 && lastSuccessfulFlushIndex != buffer
+          .position()) {
+        try {
+
+          // flush the last chunk data residing on the buffer
+          if (buffer.position() % chunkSize > 0) {
+            int pos = buffer.position() - (buffer.position() % chunkSize);
+            writeChunk(pos, buffer.position());
+          }
+          if (lastFlushPos != buffer.position()) {
+            lastFlushPos = buffer.position();
+            handlePartialFlush();
+          }
+          CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
+              futureList.toArray(new CompletableFuture[futureList.size()]));
+          combinedFuture.get();
+          // just check again if the exception is hit while waiting for the
+          // futures to ensure flush has indeed succeeded
+          checkOpen();
+        } catch (InterruptedException | ExecutionException e) {
+          throw new IOException(
+              "Unexpected Storage Container Exception: " + e.toString(), e);
+        }
+      }
     }
   }
 
+  private void writeChunk(int pos, int limit) throws IOException {
+    // Please note : We are not flipping the slice when we write since
+    // the slices are pointing to the buffer start and end as needed for
+    // the chunk write. Also please note, Duplicate does not create a
+    // copy of data, it only creates metadata that points to the data
+    // stream.
+    ByteBuffer chunk = buffer.duplicate();
+    chunk.position(pos);
+    chunk.limit(limit);
+    writeChunkToContainer(chunk);
+  }
+
   @Override
   public void close() throws IOException {
     if (xceiverClientManager != null && xceiverClient != null
         && buffer != null) {
-      if (buffer.position() > 0) {
-        writeChunkToContainer();
-      }
       try {
-        ContainerProtos.PutBlockResponseProto responseProto =
-            putBlock(xceiverClient, containerBlockData.build(), traceID);
-        BlockID responseBlockID = BlockID.getFromProtobuf(
-            responseProto.getCommittedBlockLength().getBlockID());
-        Preconditions.checkState(blockID.getContainerBlockID()
-            .equals(responseBlockID.getContainerBlockID()));
-        // updates the bcsId of the block
-        blockID = responseBlockID;
-      } catch (IOException e) {
+        if (buffer.position() > lastFlushPos) {
+          int pos = buffer.position() - (buffer.position() % chunkSize);
+          writeChunk(pos, buffer.position());
+          futureList.add(handlePartialFlush());
+        }
+        CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
+            futureList.toArray(new CompletableFuture[futureList.size()]));
+
+        // wait for all the transactions to complete
+        combinedFuture.get();
+
+        // irrespective of whether the commitIndexList is empty or not,
+        // ensure there is no exception set(For Standalone Protocol)
+        checkOpen();
+        if (!commitIndexList.isEmpty()) {
+          // wait for the last commit index in the commitIndexList to get
+          // committed to all or majority of nodes in case timeout happens.
+          long lastIndex = commitIndexList.get(commitIndexList.size() - 1);
+          LOG.debug(
+              "waiting for last flush Index " + lastIndex + " to catch up");
+          watchForCommit(lastIndex);
+          updateFlushIndex();
+        }
+      } catch (InterruptedException | ExecutionException e) {
         throw new IOException(
             "Unexpected Storage Container Exception: " + e.toString(), e);
       } finally {
         cleanup();
       }
     }
+    // clear the buffer
+    buffer.clear();
+  }
+
+  private void validateResponse(
+      ContainerProtos.ContainerCommandResponseProto responseProto) {
+    try {
+      ContainerProtocolCalls.validateContainerResponse(responseProto);
+    } catch (StorageContainerException sce) {
+      ioException = new IOException(
+          "Unexpected Storage Container Exception: " + sce.toString(), sce);
+    }
   }
 
   public void cleanup() {
-    xceiverClientManager.releaseClient(xceiverClient);
+    if (xceiverClientManager != null) {
+      xceiverClientManager.releaseClient(xceiverClient);
+    }
     xceiverClientManager = null;
     xceiverClient = null;
-    buffer = null;
+    if (futureList != null) {
+      futureList.clear();
+    }
+    futureList = null;
+    commitIndexList = null;
+    responseExecutor.shutdown();
   }
 
   /**
-   * Checks if the stream is open.  If not, throws an exception.
+   * Checks that the stream is open and no exception has occurred.
+   * Otherwise, throws an exception.
    *
    * @throws IOException if stream is closed
    */
   private void checkOpen() throws IOException {
     if (xceiverClient == null) {
       throw new IOException("ChunkOutputStream has been closed.");
-    }
-  }
-
-  /**
-   * Attempts to flush buffered writes by writing a new chunk to the container.
-   * If successful, then clears the buffer to prepare to receive writes for a
-   * new chunk.
-   *
-   * @param rollbackPosition position to restore in buffer if write fails
-   * @param rollbackLimit limit to restore in buffer if write fails
-   * @throws IOException if there is an I/O error while performing the call
-   */
-  private void flushBufferToChunk(int rollbackPosition,
-      int rollbackLimit) throws IOException {
-    boolean success = false;
-    try {
-      writeChunkToContainer();
-      success = true;
-    } finally {
-      if (success) {
-        buffer.clear();
-      } else {
-        buffer.position(rollbackPosition);
-        buffer.limit(rollbackLimit);
-      }
+    } else if (ioException != null) {
+      throw ioException;
     }
   }
 
@@ -228,23 +475,32 @@ public class ChunkOutputStream extends OutputStream {
    *
    * @throws IOException if there is an I/O error while performing the call
    */
-  private void writeChunkToContainer() throws IOException {
-    buffer.flip();
-    ByteString data = ByteString.copyFrom(buffer);
-    ChunkInfo chunk = ChunkInfo
-        .newBuilder()
-        .setChunkName(
-            DigestUtils.md5Hex(key) + "_stream_"
-                + streamId + "_chunk_" + ++chunkIndex)
-        .setOffset(0)
-        .setLen(data.size())
-        .build();
+  private void writeChunkToContainer(ByteBuffer chunk) throws IOException {
+    int effectiveChunkSize = chunk.remaining();
+    ByteString data = ByteString.copyFrom(chunk);
+    ChunkInfo chunkInfo = ChunkInfo.newBuilder().setChunkName(
+        DigestUtils.md5Hex(key) + "_stream_" + streamId + "_chunk_"
+            + ++chunkIndex).setOffset(0).setLen(effectiveChunkSize).build();
+    // generate a unique requestId
+    String requestId =
+        traceID + ContainerProtos.Type.WriteChunk + chunkIndex + chunkInfo
+            .getChunkName();
     try {
-      writeChunk(xceiverClient, chunk, blockID, data, traceID);
-    } catch (IOException e) {
+      XceiverClientAsyncReply asyncReply =
+          writeChunkAsync(xceiverClient, chunkInfo, blockID, data, requestId);
+      CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
+          asyncReply.getResponse();
+      future.thenApplyAsync(e -> {
+        handleResponse(e, asyncReply);
+        return e;
+      }, responseExecutor);
+    } catch (IOException | InterruptedException | ExecutionException e) {
       throw new IOException(
           "Unexpected Storage Container Exception: " + e.toString(), e);
     }
-    containerBlockData.addChunks(chunk);
+    // copyFrom() drained the buffer, so log the size captured beforehand.
+    LOG.debug("writing chunk " + chunkInfo.getChunkName() + " blockID "
+        + blockID + " length " + effectiveChunkSize);
+    containerBlockData.addChunks(chunkInfo);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientAsyncReply.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientAsyncReply.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientAsyncReply.java
new file mode 100644
index 0000000..0d7e1bc
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientAsyncReply.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm;
+
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This class represents the Async reply from XceiverClient.
+ */
+public class XceiverClientAsyncReply {
+
+  // Future that completes with the response to the submitted command.
+  private CompletableFuture<ContainerCommandResponseProto> response;
+  // Log index for this reply; stays 0 unless one is supplied.
+  private long logIndex;
+  // Per-server commit details; null when none were supplied.
+  private Collection<CommitInfo> commitInfos;
+
+  /**
+   * Creates a reply with log index 0 and no commit information.
+   *
+   * @param response future for the command response
+   */
+  public XceiverClientAsyncReply(
+      CompletableFuture<ContainerCommandResponseProto> response) {
+    this(response, 0, null);
+  }
+
+  /**
+   * Creates a reply with an explicit log index and commit information.
+   *
+   * @param response future for the command response
+   * @param index log index to associate with this reply
+   * @param commitInfos per-server commit details, may be null
+   */
+  public XceiverClientAsyncReply(
+      CompletableFuture<ContainerCommandResponseProto> response, long index,
+      Collection<CommitInfo> commitInfos) {
+    this.commitInfos = commitInfos;
+    this.logIndex = index;
+    this.response = response;
+  }
+
+  /**
+   * A class having details about latest commitIndex for each server in the
+   * Ratis pipeline. For Standalone pipeline, commitInfo will be null.
+   */
+  public static class CommitInfo {
+
+    private final String server;
+
+    // Stored as a primitive: it is only ever set from and read as a long.
+    private final long commitIndex;
+
+    public CommitInfo(String server, long commitIndex) {
+      this.server = server;
+      this.commitIndex = commitIndex;
+    }
+
+    public String getServer() {
+      return server;
+    }
+
+    public long getCommitIndex() {
+      return commitIndex;
+    }
+  }
+
+  public Collection<CommitInfo> getCommitInfos() {
+    return commitInfos;
+  }
+
+  public CompletableFuture<ContainerCommandResponseProto> getResponse() {
+    return response;
+  }
+
+  public long getLogIndex() {
+    return logIndex;
+  }
+
+  public void setCommitInfos(Collection<CommitInfo> commitInfos) {
+    this.commitInfos = commitInfos;
+  }
+
+  /**
+   * Sets the log index for this reply.
+   *
+   * @param logIndex new log index, must not be null
+   * @throws NullPointerException if logIndex is null
+   */
+  public void setLogIndex(Long logIndex) {
+    this.logIndex = logIndex;
+  }
+
+  public void setResponse(
+      CompletableFuture<ContainerCommandResponseProto> response) {
+    this.response = response;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
index b36315e..9eb49ae 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
@@ -28,8 +28,8 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -98,7 +98,10 @@ public abstract class XceiverClientSpi implements Closeable {
   public ContainerCommandResponseProto sendCommand(
       ContainerCommandRequestProto request) throws IOException {
     try {
-      return sendCommandAsync(request).get();
+      XceiverClientAsyncReply reply;
+      reply = sendCommandAsync(request);
+      ContainerCommandResponseProto responseProto = reply.getResponse().get();
+      return responseProto;
     } catch (ExecutionException | InterruptedException e) {
       throw new IOException("Failed to command " + request, e);
     }
@@ -111,7 +114,7 @@ public abstract class XceiverClientSpi implements Closeable {
    * @return Response to the command
    * @throws IOException
    */
-  public abstract CompletableFuture<ContainerCommandResponseProto>
+  public abstract XceiverClientAsyncReply
       sendCommandAsync(ContainerCommandRequestProto request)
       throws IOException, ExecutionException, InterruptedException;
 
@@ -132,4 +135,7 @@ public abstract class XceiverClientSpi implements Closeable {
    * @return - {Stand_Alone, Ratis or Chained}
    */
   public abstract HddsProtos.ReplicationType getPipelineType();
+
+  public abstract void watchForCommit(long index, long timeout)
+      throws InterruptedException, ExecutionException, TimeoutException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index c1d90a5..04f4cbc 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdds.scm.storage;
 
+import org.apache.hadoop.hdds.scm.XceiverClientAsyncReply;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .BlockNotCommittedException;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -64,6 +65,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
 import org.apache.hadoop.hdds.client.BlockID;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutionException;
 
 /**
  * Implementation of all container protocol calls performed by Container
@@ -163,6 +165,31 @@ public final class ContainerProtocolCalls  {
   }
 
   /**
+   * Calls the container protocol to put a container block asynchronously.
+   *
+   * @param xceiverClient client to perform call
+   * @param containerBlockData block data to identify container
+   * @param traceID container protocol call args
+   * @return reply returned by sendCommandAsync for the PutBlock request
+   * @throws IOException if there is an error while performing the call
+   */
+  public static XceiverClientAsyncReply putBlockAsync(
+      XceiverClientSpi xceiverClient, BlockData containerBlockData,
+      String traceID)
+      throws IOException, InterruptedException, ExecutionException {
+    PutBlockRequestProto.Builder createBlockRequest =
+        PutBlockRequestProto.newBuilder().setBlockData(containerBlockData);
+    String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
+    ContainerCommandRequestProto request =
+        ContainerCommandRequestProto.newBuilder().setCmdType(Type.PutBlock)
+            .setContainerID(containerBlockData.getBlockID().getContainerID())
+            .setTraceID(traceID).setDatanodeUuid(id)
+            .setPutBlock(createBlockRequest).build();
+    // Send the request once; an extra blocking sendCommand would repeat it.
+    return xceiverClient.sendCommandAsync(request);
+  }
+
+  /**
    * Calls the container protocol to read a chunk.
    *
    * @param xceiverClient client to perform call
@@ -200,7 +227,7 @@ public final class ContainerProtocolCalls  {
    * @param blockID ID of the block
    * @param data the data of the chunk to write
    * @param traceID container protocol call args
-   * @throws IOException if there is an I/O error while performing the call
+   * @throws Exception if there is an error while performing the call
    */
   public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk,
       BlockID blockID, ByteString data, String traceID)
@@ -224,6 +251,32 @@ public final class ContainerProtocolCalls  {
   }
 
   /**
+   * Builds a WriteChunk command and submits it via sendCommandAsync.
+   *
+   * @param xceiverClient client to perform call
+   * @param chunk information about chunk to write
+   * @param blockID ID of the block
+   * @param data the data of the chunk to write
+   * @param traceID container protocol call args
+   * @throws IOException if there is an I/O error while performing the call
+   */
+  public static XceiverClientAsyncReply writeChunkAsync(
+      XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID,
+      ByteString data, String traceID)
+      throws IOException, ExecutionException, InterruptedException {
+    WriteChunkRequestProto.Builder chunkPayload = WriteChunkRequestProto
+        .newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf())
+        .setChunkData(chunk).setData(data);
+    String datanodeUuid =
+        xceiverClient.getPipeline().getFirstNode().getUuidString();
+    ContainerCommandRequestProto command = ContainerCommandRequestProto
+        .newBuilder().setCmdType(Type.WriteChunk)
+        .setContainerID(blockID.getContainerID()).setTraceID(traceID)
+        .setDatanodeUuid(datanodeUuid).setWriteChunk(chunkPayload).build();
+    return xceiverClient.sendCommandAsync(command);
+  }
+
+  /**
    * Allows writing a small file using single RPC. This takes the container
    * name, block name and data to write sends all that data to the container
    * using a single RPC. This API is designed to be used for files which are
@@ -420,7 +473,7 @@ public final class ContainerProtocolCalls  {
    * @param response container protocol call response
    * @throws IOException if the container protocol call failed
    */
-  private static void validateContainerResponse(
+  public static void validateContainerResponse(
       ContainerCommandResponseProto response
   ) throws StorageContainerException {
     if (response.getResult() == ContainerProtos.Result.SUCCESS) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 9776817..8a5762f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -112,6 +112,22 @@ public final class OzoneConfigKeys {
   public static final String OZONE_CLIENT_PROTOCOL =
       "ozone.client.protocol";
 
+  public static final String OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE =
+      "ozone.client.stream.buffer.flush.size";
+
+  public static final long OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE_DEFAULT = 64;
+
+  public static final String OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE =
+      "ozone.client.stream.buffer.max.size";
+
+  public static final long OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE_DEFAULT = 128;
+
+  public static final String OZONE_CLIENT_WATCH_REQUEST_TIMEOUT =
+      "ozone.client.watch.request.timeout";
+
+  public static final String OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT =
+      "30s";
+
   // This defines the overall connection limit for the connection pool used in
   // RestClient.
   public static final String OZONE_REST_CLIENT_HTTP_CONNECTION_MAX =
@@ -192,14 +208,6 @@ public final class OzoneConfigKeys {
   public static final int
       OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT = 10;
 
-  public static final String OZONE_CLIENT_MAX_RETRIES =
-      "ozone.client.max.retries";
-  public static final int OZONE_CLIENT_MAX_RETRIES_DEFAULT = 50;
-
-  public static final String OZONE_CLIENT_RETRY_INTERVAL =
-      "ozone.client.retry.interval";
-  public static final String OZONE_CLIENT_RETRY_INTERVAL_DEFAULT = "200ms";
-
   public static final String DFS_CONTAINER_RATIS_ENABLED_KEY
       = ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY;
   public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT

http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-hdds/common/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 2ffc2ab..54bffd5 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -335,19 +335,29 @@
     </description>
   </property>
   <property>
-    <name>ozone.client.max.retries</name>
-    <value>50</value>
+    <name>ozone.client.stream.buffer.flush.size</name>
+    <value>64</value>
     <tag>OZONE, CLIENT</tag>
-    <description>Maximum number of retries by Ozone Client on encountering
-      exception while fetching committed block length.
+    <description>Size in MB which determines at what buffer position a partial
+      flush will be initiated during write. It must be a multiple of
+      chunkSize.
     </description>
   </property>
   <property>
-    <name>ozone.client.retry.interval</name>
-    <value>200ms</value>
+    <name>ozone.client.stream.buffer.max.size</name>
+    <value>128</value>
+    <tag>OZONE, CLIENT</tag>
+    <description>Size in MB which determines at what buffer position the
+      write call will be blocked until the first partial flush has been
+      acknowledged by all servers.
+    </description>
+  </property>
+  <property>
+    <name>ozone.client.watch.request.timeout</name>
+    <value>30s</value>
     <tag>OZONE, CLIENT</tag>
-    <description>Interval between retries by Ozone Client on encountering
-      exception while fetching committed block length.
+    <description>Timeout for the watch API in Ratis client to acknowledge
+      a particular request getting replayed to all servers.
     </description>
   </property>
   <property>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
index e2e5700..ea0e819 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
@@ -121,6 +121,9 @@ public class BlockManagerImpl implements BlockManager {
     container.updateBlockCommitSequenceId(bcsId);
     // Increment keycount here
     container.getContainerData().incrKeyCount();
+    LOG.debug(
+        "Block " + data.getBlockID() + " successfully committed with bcsId "
+            + bcsId + " chunk size " + data.getChunks().size());
     return data.getSize();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
index 40e4d83..be1449f 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
@@ -17,23 +17,14 @@
  */
 package org.apache.hadoop.ozone.client;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.client.OzoneQuota;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.scm.container.common.helpers.BlockNotCommittedException;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.rest.response.*;
 
 import java.util.ArrayList;
 import java.util.List;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
 /** A utility class for OzoneClient. */
 public final class OzoneClientUtils {
 
@@ -94,24 +85,6 @@ public final class OzoneClientUtils {
     return keyInfo;
   }
 
-  public static RetryPolicy createRetryPolicy(Configuration conf) {
-    int maxRetryCount =
-        conf.getInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, OzoneConfigKeys.
-            OZONE_CLIENT_MAX_RETRIES_DEFAULT);
-    long retryInterval = conf.getTimeDuration(OzoneConfigKeys.
-        OZONE_CLIENT_RETRY_INTERVAL, OzoneConfigKeys.
-        OZONE_CLIENT_RETRY_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
-    RetryPolicy basePolicy = RetryPolicies
-        .retryUpToMaximumCountWithFixedSleep(maxRetryCount, retryInterval,
-            TimeUnit.MILLISECONDS);
-    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
-        new HashMap<Class<? extends Exception>, RetryPolicy>();
-    exceptionToPolicyMap.put(BlockNotCommittedException.class, basePolicy);
-    RetryPolicy retryPolicy = RetryPolicies
-        .retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL,
-            exceptionToPolicyMap);
-    return retryPolicy;
-  }
   /**
    * Returns a KeyInfoDetails object constructed using fields of the input
    * OzoneKeyDetails object.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
index 450e2dc..5dbe9f6 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
@@ -24,11 +24,10 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -41,18 +40,17 @@ import org.apache.hadoop.hdds.scm.container.common.helpers
 import org.apache.hadoop.hdds.scm.protocolPB
     .StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.storage.ChunkOutputStream;
-import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.ListIterator;
+import java.util.concurrent.TimeoutException;
 
 /**
  * Maintaining a list of ChunkInputStream. Write based on offset.
@@ -71,7 +69,6 @@ public class ChunkGroupOutputStream extends OutputStream {
   // array list's get(index) is O(1)
   private final ArrayList<ChunkOutputStreamEntry> streamEntries;
   private int currentStreamIndex;
-  private long byteOffset;
   private final OzoneManagerProtocolClientSideTranslatorPB omClient;
   private final
       StorageContainerLocationProtocolClientSideTranslatorPB scmClient;
@@ -81,7 +78,11 @@ public class ChunkGroupOutputStream extends OutputStream {
   private final int chunkSize;
   private final String requestID;
   private boolean closed;
-  private final RetryPolicy retryPolicy;
+  private final long streamBufferFlushSize;
+  private final long streamBufferMaxSize;
+  private final long watchTimeout;
+  private final long blockSize;
+  private ByteBuffer buffer;
   /**
    * A constructor for testing purpose only.
    */
@@ -96,7 +97,11 @@ public class ChunkGroupOutputStream extends OutputStream {
     chunkSize = 0;
     requestID = null;
     closed = false;
-    retryPolicy = null;
+    streamBufferFlushSize = 0;
+    streamBufferMaxSize = 0;
+    buffer = ByteBuffer.allocate(1);
+    watchTimeout = 0;
+    blockSize = 0;
   }
 
   /**
@@ -127,35 +132,54 @@ public class ChunkGroupOutputStream extends OutputStream {
           new OmKeyLocationInfo.Builder().setBlockID(streamEntry.blockID)
               .setLength(streamEntry.currentPosition).setOffset(0)
               .build();
+      LOG.debug("block written " + streamEntry.blockID + ", length "
+          + streamEntry.currentPosition + " bcsID " + streamEntry.blockID
+          .getBlockCommitSequenceId());
       locationInfoList.add(info);
     }
     return locationInfoList;
   }
 
-  public ChunkGroupOutputStream(
-      OpenKeySession handler, XceiverClientManager xceiverClientManager,
+  public ChunkGroupOutputStream(OpenKeySession handler,
+      XceiverClientManager xceiverClientManager,
       StorageContainerLocationProtocolClientSideTranslatorPB scmClient,
-      OzoneManagerProtocolClientSideTranslatorPB omClient,
-      int chunkSize, String requestId, ReplicationFactor factor,
-      ReplicationType type, RetryPolicy retryPolicy) throws IOException {
+      OzoneManagerProtocolClientSideTranslatorPB omClient, int chunkSize,
+      String requestId, ReplicationFactor factor, ReplicationType type,
+      long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout) {
     this.streamEntries = new ArrayList<>();
     this.currentStreamIndex = 0;
-    this.byteOffset = 0;
     this.omClient = omClient;
     this.scmClient = scmClient;
     OmKeyInfo info = handler.getKeyInfo();
-    this.keyArgs = new OmKeyArgs.Builder()
-        .setVolumeName(info.getVolumeName())
-        .setBucketName(info.getBucketName())
-        .setKeyName(info.getKeyName())
-        .setType(type)
-        .setFactor(factor)
-        .setDataSize(info.getDataSize()).build();
+    this.keyArgs = new OmKeyArgs.Builder().setVolumeName(info.getVolumeName())
+        .setBucketName(info.getBucketName()).setKeyName(info.getKeyName())
+        .setType(type).setFactor(factor).setDataSize(info.getDataSize())
+        .build();
     this.openID = handler.getId();
     this.xceiverClientManager = xceiverClientManager;
     this.chunkSize = chunkSize;
     this.requestID = requestId;
-    this.retryPolicy = retryPolicy;
+    this.streamBufferFlushSize = bufferFlushSize * OzoneConsts.MB;
+    this.streamBufferMaxSize = bufferMaxSize * OzoneConsts.MB;
+    this.blockSize = size * OzoneConsts.MB;
+    this.watchTimeout = watchTimeout;
+
+    Preconditions.checkState(chunkSize > 0);
+    Preconditions.checkState(streamBufferFlushSize > 0);
+    Preconditions.checkState(streamBufferMaxSize > 0);
+    Preconditions.checkState(blockSize > 0);
+    Preconditions.checkState(streamBufferFlushSize % chunkSize == 0);
+    Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0);
+    Preconditions.checkState(blockSize % streamBufferMaxSize == 0);
+
+    // This byteBuffer will be used to cache data until all the blockCommits
+    // (putBlock) gets replicated to all/majority servers. The idea here is to
+    // allocate the buffer of size blockSize so that as and when a chunk
+    // is replicated to all servers, as a part of discarding the buffer, we
+    // don't necessarily need to run compaction (buffer.compact()) on the buffer
+    // to actually discard the acknowledged data. Compaction is inefficient so
+    // it would be a better choice to avoid compaction on the happy I/O path.
+    this.buffer = ByteBuffer.allocate((int) blockSize);
   }
 
   /**
@@ -191,12 +215,13 @@ public class ChunkGroupOutputStream extends OutputStream {
         xceiverClientManager.acquireClient(containerWithPipeline.getPipeline());
     streamEntries.add(new ChunkOutputStreamEntry(subKeyInfo.getBlockID(),
         keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID,
-        chunkSize, subKeyInfo.getLength()));
+        chunkSize, subKeyInfo.getLength(), streamBufferFlushSize,
+        streamBufferMaxSize, watchTimeout, buffer));
   }
 
   @VisibleForTesting
   public long getByteOffset() {
-    return byteOffset;
+    return getKeyLength();
   }
 
 
@@ -223,21 +248,23 @@ public class ChunkGroupOutputStream extends OutputStream {
   public void write(byte[] b, int off, int len)
       throws IOException {
     checkNotClosed();
-    handleWrite(b, off, len);
+    handleWrite(b, off, len, false, buffer.position());
   }
 
-  private void handleWrite(byte[] b, int off, int len) throws IOException {
+  private void handleWrite(byte[] b, int off, int len, boolean retry,
+      int pos) throws IOException {
     if (b == null) {
       throw new NullPointerException();
     }
-    if ((off < 0) || (off > b.length) || (len < 0) ||
-        ((off + len) > b.length) || ((off + len) < 0)) {
+    if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
+        || ((off + len) < 0)) {
       throw new IndexOutOfBoundsException();
     }
     if (len == 0) {
       return;
     }
     int succeededAllocates = 0;
+    int initialPos;
     while (len > 0) {
       if (streamEntries.size() <= currentStreamIndex) {
         Preconditions.checkNotNull(omClient);
@@ -247,8 +274,8 @@ public class ChunkGroupOutputStream extends OutputStream {
           allocateNewBlock(currentStreamIndex);
           succeededAllocates += 1;
         } catch (IOException ioe) {
-          LOG.error("Try to allocate more blocks for write failed, already " +
-              "allocated " + succeededAllocates + " blocks for this write.");
+          LOG.error("Try to allocate more blocks for write failed, already "
+              + "allocated " + succeededAllocates + " blocks for this write.");
           throw ioe;
         }
       }
@@ -257,12 +284,19 @@ public class ChunkGroupOutputStream extends OutputStream {
       Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
       ChunkOutputStreamEntry current = streamEntries.get(currentStreamIndex);
       int writeLen = Math.min(len, (int) current.getRemaining());
+      initialPos = pos < buffer.position() ? pos : buffer.position();
       try {
-        current.write(b, off, writeLen);
+        if (retry) {
+          current.writeOnRetry(len);
+        } else {
+          current.write(b, off, writeLen);
+        }
       } catch (IOException ioe) {
-        if (checkIfContainerIsClosed(ioe)) {
-          handleCloseContainerException(current, currentStreamIndex);
-          continue;
+        if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe)) {
+          // for the current iteration, current pos - initialPos gives the
+          // amount of data already written to the buffer
+          writeLen = buffer.position() - initialPos;
+          handleException(current, currentStreamIndex);
         } else {
           throw ioe;
         }
@@ -274,57 +308,6 @@ public class ChunkGroupOutputStream extends OutputStream {
       }
       len -= writeLen;
       off += writeLen;
-      byteOffset += writeLen;
-    }
-  }
-
-  private long getCommittedBlockLength(ChunkOutputStreamEntry streamEntry)
-      throws IOException {
-    long blockLength;
-    ContainerProtos.GetCommittedBlockLengthResponseProto responseProto;
-    RetryPolicy.RetryAction action;
-    int numRetries = 0;
-    while (true) {
-      try {
-        responseProto = ContainerProtocolCalls
-            .getCommittedBlockLength(streamEntry.xceiverClient,
-                streamEntry.blockID, requestID);
-        blockLength = responseProto.getBlockLength();
-        return blockLength;
-      } catch (StorageContainerException sce) {
-        try {
-          action = retryPolicy.shouldRetry(sce, numRetries, 0, true);
-        } catch (Exception e) {
-          throw e instanceof IOException ? (IOException) e : new IOException(e);
-        }
-        if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
-          if (action.reason != null) {
-            LOG.error(
-                "GetCommittedBlockLength request failed. " + action.reason,
-                sce);
-          }
-          throw sce;
-        }
-
-        // Throw the exception if the thread is interrupted
-        if (Thread.currentThread().isInterrupted()) {
-          LOG.warn("Interrupted while trying for connection");
-          throw sce;
-        }
-        Preconditions.checkArgument(
-            action.action == RetryPolicy.RetryAction.RetryDecision.RETRY);
-        try {
-          Thread.sleep(action.delayMillis);
-        } catch (InterruptedException e) {
-          throw (IOException) new InterruptedIOException(
-              "Interrupted: action=" + action + ", retry policy=" + retryPolicy)
-              .initCause(e);
-        }
-        numRetries++;
-        LOG.trace("Retrying GetCommittedBlockLength request. Already tried "
-            + numRetries + " time(s); retry policy is " + retryPolicy);
-        continue;
-      }
     }
   }
 
@@ -373,55 +356,35 @@ public class ChunkGroupOutputStream extends OutputStream {
    *
    * @param streamEntry StreamEntry
    * @param streamIndex Index of the entry
-   * @throws IOException Throws IOexception if Write fails
+   * @throws IOException Throws IOException if Write fails
    */
-  private void handleCloseContainerException(ChunkOutputStreamEntry streamEntry,
+  private void handleException(ChunkOutputStreamEntry streamEntry,
       int streamIndex) throws IOException {
-    long committedLength = 0;
-    ByteBuffer buffer = streamEntry.getBuffer();
-    if (buffer == null) {
-      // the buffer here will be null only when closeContainerException is
-      // hit while calling putKey during close on chunkOutputStream.
-      // Since closeContainer auto commit pending keys, no need to do
-      // anything here.
-      return;
-    }
-
-    // update currentStreamIndex in case of closed container exception. The
-    // current stream entry cannot be used for further writes because
-    // container is closed.
-    currentStreamIndex += 1;
+    int lastSuccessfulFlushIndex = streamEntry.getLastSuccessfulFlushIndex();
+    int currentPos = buffer.position();
 
-    // In case where not a single chunk of data has been written to the Datanode
-    // yet. This block does not yet exist on the datanode but cached on the
-    // outputStream buffer. No need to call GetCommittedBlockLength here
-    // for this block associated with the stream here.
-    if (streamEntry.currentPosition >= chunkSize
-        || streamEntry.currentPosition != buffer.position()) {
-      committedLength = getCommittedBlockLength(streamEntry);
-      // update the length of the current stream
-      streamEntry.currentPosition = committedLength;
+    // In case of a failure, retain only the data written after the last
+    // successfully acknowledged flush so that it can be written again.
+    if (lastSuccessfulFlushIndex > 0) {
+      buffer.position(lastSuccessfulFlushIndex);
+      buffer.limit(currentPos);
+      buffer.compact();
     }
 
     if (buffer.position() > 0) {
+      //set the correct length for the current stream
+      streamEntry.currentPosition = lastSuccessfulFlushIndex;
       // If the data is still cached in the underlying stream, we need to
-      // allocate new block and write this data in the datanode. The cached
-      // data in the buffer does not exceed chunkSize.
-      Preconditions.checkState(buffer.position() < chunkSize);
-      // readjust the byteOffset value to the length actually been written.
-      byteOffset -= buffer.position();
-      handleWrite(buffer.array(), 0, buffer.position());
+      // allocate new block and write this data in the datanode.
+      currentStreamIndex += 1;
+      handleWrite(buffer.array(), 0, buffer.position(), true,
+          lastSuccessfulFlushIndex);
     }
 
-    // just clean up the current stream. Since the container is already closed,
-    // it will be auto committed. No need to call close again here.
+    // just clean up the current stream.
     streamEntry.cleanup();
-    // This case will arise when while writing the first chunk itself fails.
-    // In such case, the current block associated with the stream has no data
-    // written. Remove it from the current stream list.
-    if (committedLength == 0) {
+    if (lastSuccessfulFlushIndex == 0) {
       streamEntries.remove(streamIndex);
-      Preconditions.checkArgument(currentStreamIndex != 0);
       currentStreamIndex -= 1;
     }
     // discard subsequent pre allocated blocks from the streamEntries list
@@ -430,11 +393,15 @@ public class ChunkGroupOutputStream extends OutputStream {
   }
 
   private boolean checkIfContainerIsClosed(IOException ioe) {
-    return checkIfContainerNotOpenException(ioe) || Optional.of(ioe.getCause())
-        .filter(e -> e instanceof StorageContainerException)
-        .map(e -> (StorageContainerException) e)
-        .filter(sce -> sce.getResult() == Result.CLOSED_CONTAINER_IO)
-        .isPresent();
+    if (ioe.getCause() != null) {
+      return checkIfContainerNotOpenException(ioe) || Optional
+          .of(ioe.getCause())
+          .filter(e -> e instanceof StorageContainerException)
+          .map(e -> (StorageContainerException) e)
+          .filter(sce -> sce.getResult() == Result.CLOSED_CONTAINER_IO)
+          .isPresent();
+    }
+    return false;
   }
 
   private boolean checkIfContainerNotOpenException(IOException ioe) {
@@ -448,6 +415,15 @@ public class ChunkGroupOutputStream extends OutputStream {
     return false;
   }
 
+  private boolean checkIfTimeoutException(IOException ioe) {
+    if (ioe.getCause() != null) {
+      return Optional.of(ioe.getCause())
+          .filter(e -> e instanceof TimeoutException).isPresent();
+    } else {
+      return false;
+    }
+  }
+
   private long getKeyLength() {
     return streamEntries.parallelStream().mapToLong(e -> e.currentPosition)
         .sum();
@@ -495,11 +471,11 @@ public class ChunkGroupOutputStream extends OutputStream {
           entry.flush();
         }
       } catch (IOException ioe) {
-        if (checkIfContainerIsClosed(ioe)) {
+        if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe)) {
           // This call will allocate a new streamEntry and write the Data.
           // Close needs to be retried on the newly allocated streamEntry as
           // well.
-          handleCloseContainerException(entry, streamIndex);
+          handleException(entry, streamIndex);
           handleFlushOrClose(close);
         } else {
           throw ioe;
@@ -519,16 +495,24 @@ public class ChunkGroupOutputStream extends OutputStream {
       return;
     }
     closed = true;
-    handleFlushOrClose(true);
-    if (keyArgs != null) {
-      // in test, this could be null
-      removeEmptyBlocks();
-      Preconditions.checkState(byteOffset == getKeyLength());
-      keyArgs.setDataSize(byteOffset);
-      keyArgs.setLocationInfoList(getLocationInfoList());
-      omClient.commitKey(keyArgs, openID);
-    } else {
-      LOG.warn("Closing ChunkGroupOutputStream, but key args is null");
+    try {
+      handleFlushOrClose(true);
+      if (keyArgs != null) {
+        // in test, this could be null
+        removeEmptyBlocks();
+        keyArgs.setDataSize(getKeyLength());
+        keyArgs.setLocationInfoList(getLocationInfoList());
+        omClient.commitKey(keyArgs, openID);
+      } else {
+        LOG.warn("Closing ChunkGroupOutputStream, but key args is null");
+      }
+    } catch (IOException ioe) {
+      throw ioe;
+    } finally {
+      if (buffer != null) {
+        buffer.clear();
+      }
+      buffer = null;
     }
   }
 
@@ -544,7 +528,10 @@ public class ChunkGroupOutputStream extends OutputStream {
     private String requestID;
     private ReplicationType type;
     private ReplicationFactor factor;
-    private RetryPolicy retryPolicy;
+    private long streamBufferFlushSize;
+    private long streamBufferMaxSize;
+    private long blockSize;
+    private long watchTimeout;
 
     public Builder setHandler(OpenKeySession handler) {
       this.openHandler = handler;
@@ -588,16 +575,31 @@ public class ChunkGroupOutputStream extends OutputStream {
       return this;
     }
 
-    public ChunkGroupOutputStream build() throws IOException {
-      return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient,
-          omClient, chunkSize, requestID, factor, type, retryPolicy);
+    public Builder setStreamBufferFlushSize(long size) {
+      this.streamBufferFlushSize = size;
+      return this;
+    }
+
+    public Builder setStreamBufferMaxSize(long size) {
+      this.streamBufferMaxSize = size;
+      return this;
+    }
+
+    public Builder setBlockSize(long size) {
+      this.blockSize = size;
+      return this;
     }
 
-    public Builder setRetryPolicy(RetryPolicy rPolicy) {
-      this.retryPolicy = rPolicy;
+    public Builder setWatchTimeout(long timeout) {
+      this.watchTimeout = timeout;
       return this;
     }
 
+    public ChunkGroupOutputStream build() throws IOException {
+      return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient,
+          omClient, chunkSize, requestID, factor, type, streamBufferFlushSize,
+          streamBufferMaxSize, blockSize, watchTimeout);
+    }
   }
 
   private static class ChunkOutputStreamEntry extends OutputStream {
@@ -613,10 +615,16 @@ public class ChunkGroupOutputStream extends OutputStream {
     // the current position of this stream 0 <= currentPosition < length
     private long currentPosition;
 
+    private final long streamBufferFlushSize;
+    private final long streamBufferMaxSize;
+    private final long watchTimeout;
+    private ByteBuffer buffer;
+
     ChunkOutputStreamEntry(BlockID blockID, String key,
         XceiverClientManager xceiverClientManager,
         XceiverClientSpi xceiverClient, String requestId, int chunkSize,
-        long length) {
+        long length, long streamBufferFlushSize, long streamBufferMaxSize,
+        long watchTimeout, ByteBuffer buffer) {
       this.outputStream = null;
       this.blockID = blockID;
       this.key = key;
@@ -627,6 +635,10 @@ public class ChunkGroupOutputStream extends OutputStream {
 
       this.length = length;
       this.currentPosition = 0;
+      this.streamBufferFlushSize = streamBufferFlushSize;
+      this.streamBufferMaxSize = streamBufferMaxSize;
+      this.watchTimeout = watchTimeout;
+      this.buffer = buffer;
     }
 
     /**
@@ -645,6 +657,10 @@ public class ChunkGroupOutputStream extends OutputStream {
 
       this.length = length;
       this.currentPosition = 0;
+      streamBufferFlushSize = 0;
+      streamBufferMaxSize = 0;
+      buffer = null;
+      watchTimeout = 0;
     }
 
     long getLength() {
@@ -657,9 +673,10 @@ public class ChunkGroupOutputStream extends OutputStream {
 
     private void checkStream() {
       if (this.outputStream == null) {
-        this.outputStream = new ChunkOutputStream(blockID,
-            key, xceiverClientManager, xceiverClient,
-            requestId, chunkSize);
+        this.outputStream =
+            new ChunkOutputStream(blockID, key, xceiverClientManager,
+                xceiverClient, requestId, chunkSize, streamBufferFlushSize,
+                streamBufferMaxSize, watchTimeout, buffer);
       }
     }
 
@@ -696,15 +713,21 @@ public class ChunkGroupOutputStream extends OutputStream {
       }
     }
 
-    ByteBuffer getBuffer() throws IOException {
+    int getLastSuccessfulFlushIndex() throws IOException {
       if (this.outputStream instanceof ChunkOutputStream) {
         ChunkOutputStream out = (ChunkOutputStream) this.outputStream;
-        return out.getBuffer();
+        blockID = out.getBlockID();
+        return out.getLastSuccessfulFlushIndex();
+      } else if (outputStream == null) {
+        // For a pre allocated block for which no write has been initiated,
+        // the OutputStream will be null here.
+        // In such cases, the last successful flush index defaults to 0.
+        return 0;
       }
       throw new IOException("Invalid Output Stream for Key: " + key);
     }
 
-    public void cleanup() {
+    void cleanup() {
       checkStream();
       if (this.outputStream instanceof ChunkOutputStream) {
         ChunkOutputStream out = (ChunkOutputStream) this.outputStream;
@@ -712,6 +735,16 @@ public class ChunkGroupOutputStream extends OutputStream {
       }
     }
 
+    void writeOnRetry(int len) throws IOException {
+      checkStream();
+      if (this.outputStream instanceof ChunkOutputStream) {
+        ChunkOutputStream out = (ChunkOutputStream) this.outputStream;
+        out.writeOnRetry(len);
+        this.currentPosition += len;
+      } else {
+        throw new IOException("Invalid Output Stream for Key: " + key);
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index cbb2e49..826f04b 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -24,18 +24,17 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.StorageType;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.*;
 import org.apache.hadoop.hdds.client.OzoneQuota;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.ozone.client.VolumeArgs;
-import org.apache.hadoop.ozone.client.OzoneClientUtils;
 import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;
 import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
 import org.apache.hadoop.ozone.client.io.LengthInputStream;
@@ -72,6 +71,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /**
@@ -94,7 +94,10 @@ public class RpcClient implements ClientProtocol {
   private final UserGroupInformation ugi;
   private final OzoneAcl.OzoneACLRights userRights;
   private final OzoneAcl.OzoneACLRights groupRights;
-  private final RetryPolicy retryPolicy;
+  private final long streamBufferFlushSize;
+  private final long streamBufferMaxSize;
+  private final long blockSize;
+  private final long watchTimeout;
 
    /**
     * Creates RpcClient instance with the given configuration.
@@ -135,7 +138,6 @@ public class RpcClient implements ClientProtocol {
                 Client.getRpcTimeout(conf)));
 
     this.xceiverClientManager = new XceiverClientManager(conf);
-    retryPolicy = OzoneClientUtils.createRetryPolicy(conf);
 
     int configuredChunkSize = conf.getInt(
         ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
@@ -149,6 +151,18 @@ public class RpcClient implements ClientProtocol {
     } else {
       chunkSize = configuredChunkSize;
     }
+    streamBufferFlushSize =
+        conf.getLong(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE,
+            OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE_DEFAULT);
+    streamBufferMaxSize =
+        conf.getLong(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE,
+            OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE_DEFAULT);
+    blockSize = conf.getLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB,
+        OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT);
+    watchTimeout =
+        conf.getTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT,
+            OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT,
+            TimeUnit.MILLISECONDS);
   }
 
   private InetSocketAddress getScmAddressForClient() throws IOException {
@@ -468,7 +482,10 @@ public class RpcClient implements ClientProtocol {
             .setRequestID(requestId)
             .setType(HddsProtos.ReplicationType.valueOf(type.toString()))
             .setFactor(HddsProtos.ReplicationFactor.valueOf(factor.getValue()))
-            .setRetryPolicy(retryPolicy)
+            .setStreamBufferFlushSize(streamBufferFlushSize)
+            .setStreamBufferMaxSize(streamBufferMaxSize)
+            .setWatchTimeout(watchTimeout)
+            .setBlockSize(blockSize)
             .build();
     groupOutputStream.addPreallocateBlocks(
         openKey.getKeyInfo().getLatestVersionLocations(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 15bf8d0..b352e36 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -230,7 +230,10 @@ public interface MiniOzoneCluster {
 
     protected Boolean ozoneEnabled = true;
     protected Boolean randomContainerPort = true;
-
+    protected Optional<Integer> chunkSize = Optional.empty();
+    protected Optional<Long> streamBufferFlushSize = Optional.empty();
+    protected Optional<Long> streamBufferMaxSize = Optional.empty();
+    protected Optional<Long> blockSize = Optional.empty();
     // Use relative smaller number of handlers for testing
     protected int numOfOmHandlers = 20;
     protected int numOfScmHandlers = 20;
@@ -359,6 +362,46 @@ public interface MiniOzoneCluster {
     }
 
     /**
+     * Sets the chunk size.
+     *
+     * @return MiniOzoneCluster.Builder
+     */
+    public Builder setChunkSize(int size) {
+      chunkSize = Optional.of(size);
+      return this;
+    }
+
+    /**
+     * Sets the flush size for stream buffer.
+     *
+     * @return MiniOzoneCluster.Builder
+     */
+    public Builder setStreamBufferFlushSize(long size) {
+      streamBufferFlushSize = Optional.of(size);
+      return this;
+    }
+
+    /**
+     * Sets the max size for stream buffer.
+     *
+     * @return MiniOzoneCluster.Builder
+     */
+    public Builder setStreamBufferMaxSize(long size) {
+      streamBufferMaxSize = Optional.of(size);
+      return this;
+    }
+
+    /**
+     * Sets the block size.
+     *
+     * @return MiniOzoneCluster.Builder
+     */
+    public Builder setBlockSize(long size) {
+      blockSize = Optional.of(size);
+      return this;
+    }
+
+    /**
      * Constructs and returns MiniOzoneCluster.
      *
      * @return {@link MiniOzoneCluster}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 37b6fdc..324e17b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -391,6 +391,25 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
       Path metaDir = Paths.get(path, "ozone-meta");
       Files.createDirectories(metaDir);
       conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, metaDir.toString());
+      if (!chunkSize.isPresent()) {
+        chunkSize = Optional.of(1);
+      }
+      if (!streamBufferFlushSize.isPresent()) {
+        streamBufferFlushSize = Optional.of((long)chunkSize.get());
+      }
+      if (!streamBufferMaxSize.isPresent()) {
+        streamBufferMaxSize = Optional.of(2 * streamBufferFlushSize.get());
+      }
+      if (!blockSize.isPresent()) {
+        blockSize = Optional.of(2 * streamBufferMaxSize.get());
+      }
+      conf.setInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
+          (int) (chunkSize.get() * OzoneConsts.MB));
+      conf.setLong(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE,
+          streamBufferFlushSize.get());
+      conf.setLong(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE,
+          streamBufferMaxSize.get());
+      conf.setLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, blockSize.get());
       configureTrace();
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
index 871f389..0197304 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
@@ -50,7 +50,7 @@ public interface RatisTestHelper {
 
   /** For testing Ozone with Ratis. */
   class RatisTestSuite implements Closeable {
-    static final RpcType RPC = SupportedRpcType.NETTY;
+    static final RpcType RPC = SupportedRpcType.GRPC;
     static final int NUM_DATANODES = 3;
 
     private final OzoneConfiguration conf;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message