hadoop-common-commits mailing list archives

From bra...@apache.org
Subject [02/18] hadoop git commit: HDDS-675. Add blocking buffer and use watchApi for flush/close in OzoneClient. Contributed by Shashikant Banerjee.
Date Wed, 14 Nov 2018 13:07:37 GMT
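
For context on the mechanism this patch introduces: the client keeps written data in a blocking buffer and, on flush/close, issues a Ratis watch request so the buffer is only released once the data is committed on the datanodes. A minimal sketch of that pattern follows; the names here (BufferedBlockWriter, submitChunk, watchForCommit) are illustrative stand-ins, not the actual ChunkGroupOutputStream/XceiverClientRatis internals, which are not shown in this excerpt.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    // Hedged sketch of the blocking-buffer + watch-for-commit pattern; not
    // the actual HDDS-675 implementation.
    class BufferedBlockWriter {
      private final List<ByteBuffer> buffered = new ArrayList<>();
      private long nextLogIndex;       // simulated Raft log index counter
      private long lastSubmittedIndex; // index of the most recent write

      void write(ByteBuffer chunk) {
        buffered.add(chunk.duplicate()); // hold a copy until commit is confirmed
        lastSubmittedIndex = submitChunk(chunk);
      }

      void flush(long watchTimeoutMs) throws IOException {
        // Block until everything written so far is committed on all replicas;
        // only then are the buffered copies no longer needed for retries.
        watchForCommit(lastSubmittedIndex, watchTimeoutMs);
        buffered.clear();
      }

      // Stand-in for the async chunk write through the Ratis client; returns
      // the log index assigned to the write.
      private long submitChunk(ByteBuffer chunk) {
        return ++nextLogIndex;
      }

      // Stand-in for the Ratis watch request; the real client would issue a
      // watch RPC here and surface a timeout as an IOException.
      private void watchForCommit(long index, long timeoutMs)
          throws IOException {
      }
    }
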
http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
index 43517ae..935423d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
@@ -17,7 +17,6 @@
 
 package org.apache.hadoop.ozone.client.rpc;
 
-import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -27,11 +26,6 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
-import org.apache.hadoop.ozone.HddsDatanodeService;
-import org.apache.hadoop.hdds.scm.container.common.helpers.
-    StorageContainerException;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -55,15 +49,17 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
-import org.slf4j.event.Level;
 
 import java.io.IOException;
-import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
 
 /**
  * Tests Close Container Exception handling by Ozone Client.
@@ -79,7 +75,6 @@ public class TestCloseContainerHandlingByClient {
   private static String volumeName;
   private static String bucketName;
   private static String keyString;
-  private static int maxRetries;
 
   /**
    * Create a MiniOzoneCluster for testing.
@@ -91,15 +86,14 @@ public class TestCloseContainerHandlingByClient {
   @BeforeClass
   public static void init() throws Exception {
     conf = new OzoneConfiguration();
-    maxRetries = 100;
-    conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, maxRetries);
-    conf.set(OzoneConfigKeys.OZONE_CLIENT_RETRY_INTERVAL, "200ms");
     chunkSize = (int) OzoneConsts.MB;
     blockSize = 4 * chunkSize;
-    conf.setInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, chunkSize);
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
+    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.setQuietMode(false);
     conf.setLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, (4));
-    cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(3).build();
+    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(7).build();
     cluster.waitForClusterToBeReady();
     // the easiest way to create an open container is to create a key
     client = OzoneClientFactory.getClient(conf);
@@ -121,44 +115,29 @@ public class TestCloseContainerHandlingByClient {
     }
   }
 
-  private static String fixedLengthString(String string, int length) {
-    return String.format("%1$" + length + "s", string);
-  }
-
   @Test
   public void testBlockWritesWithFlushAndClose() throws Exception {
     String keyName = "standalone";
-    OzoneOutputStream key =
-        createKey(keyName, ReplicationType.STAND_ALONE, 0);
+    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
     // write data more than 1 chunk
-    byte[] data =
-        fixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes();
+    byte[] data = ContainerTestHelper
+        .getFixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes();
     key.write(data);
 
     Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
     //get the name of a valid container
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
-        .setBucketName(bucketName)
-        .setType(HddsProtos.ReplicationType.STAND_ALONE)
+        .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
         .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
         .build();
 
-    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE);
+    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS);
     key.write(data);
     key.flush();
     key.close();
     // read the key from OM again and match the length. The length will still
     // be equal to the original data size.
     OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
-    List<OmKeyLocationInfo> keyLocationInfos =
-        keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
-    //we have written two blocks
-    Assert.assertEquals(2, keyLocationInfos.size());
-    OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(0);
-    Assert.assertEquals(data.length - (data.length % chunkSize),
-        omKeyLocationInfo.getLength());
-    Assert.assertEquals(data.length + (data.length % chunkSize),
-        keyLocationInfos.get(1).getLength());
     Assert.assertEquals(2 * data.length, keyInfo.getDataSize());
 
     // Written the same data twice
@@ -170,37 +149,24 @@ public class TestCloseContainerHandlingByClient {
   @Test
   public void testBlockWritesCloseConsistency() throws Exception {
     String keyName = "standalone2";
-    OzoneOutputStream key = createKey(keyName, ReplicationType.STAND_ALONE, 0);
+    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
     // write data more than 1 chunk
-    byte[] data =
-        fixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes();
+    byte[] data = ContainerTestHelper
+        .getFixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes();
     key.write(data);
 
     Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
     //get the name of a valid container
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
-        .setBucketName(bucketName)
-        .setType(HddsProtos.ReplicationType.STAND_ALONE)
+        .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
         .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
         .build();
 
-    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE);
+    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS);
     key.close();
     // read the key from OM again and match the length. The length will still
     // be equal to the original data size.
     OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
-    List<OmKeyLocationInfo> keyLocationInfos =
-        keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
-    // Though we have written only block initially, the close will hit
-    // closeContainerException and remaining data in the chunkOutputStream
-    // buffer will be copied into a different allocated block and will be
-    // committed.
-    Assert.assertEquals(2, keyLocationInfos.size());
-    OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(0);
-    Assert.assertEquals(data.length - (data.length % chunkSize),
-        omKeyLocationInfo.getLength());
-    Assert.assertEquals(data.length % chunkSize,
-        keyLocationInfos.get(1).getLength());
     Assert.assertEquals(data.length, keyInfo.getDataSize());
     validateData(keyName, data);
   }
@@ -210,29 +176,30 @@ public class TestCloseContainerHandlingByClient {
 
     String keyName = "standalone3";
     OzoneOutputStream key =
-        createKey(keyName, ReplicationType.STAND_ALONE, (4 * blockSize));
+        createKey(keyName, ReplicationType.RATIS, (4 * blockSize));
     ChunkGroupOutputStream groupOutputStream =
         (ChunkGroupOutputStream) key.getOutputStream();
     // With the initial size provided, it should have preallocated 4 blocks
     Assert.assertEquals(4, groupOutputStream.getStreamEntries().size());
-    // write data for 3 blocks and 1 more chunk
-    byte[] data = fixedLengthString(keyString, (3 * blockSize)).getBytes();
+    // write data for 3 blocks
+    byte[] data =
+        ContainerTestHelper.getFixedLengthString(keyString, (3 * blockSize))
+            .getBytes();
     Assert.assertEquals(data.length, 3 * blockSize);
     key.write(data);
 
     Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
     //get the name of a valid container
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
-        .setBucketName(bucketName)
-        .setType(HddsProtos.ReplicationType.STAND_ALONE)
+        .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
         .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
         .build();
 
-    waitForContainerClose(keyName, key,
-        HddsProtos.ReplicationType.STAND_ALONE);
+    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS);
     // write 1 more block worth of data. It will fail and new block will be
     // allocated
-    key.write(fixedLengthString(keyString, blockSize).getBytes());
+    key.write(ContainerTestHelper.getFixedLengthString(keyString, blockSize)
+        .getBytes());
 
     key.close();
     // read the key from OM again and match the length. The length will still
@@ -253,10 +220,10 @@ public class TestCloseContainerHandlingByClient {
 
   @Test
   public void testMultiBlockWrites2() throws Exception {
-    String keyName = "standalone4";
+    String keyName = "ratis2";
     long dataLength;
     OzoneOutputStream key =
-        createKey(keyName, ReplicationType.STAND_ALONE, 4 * blockSize);
+        createKey(keyName, ReplicationType.RATIS, 4 * blockSize);
     ChunkGroupOutputStream groupOutputStream =
         (ChunkGroupOutputStream) key.getOutputStream();
 
@@ -264,21 +231,21 @@ public class TestCloseContainerHandlingByClient {
     // With the initial size provided, it should have pre allocated 4 blocks
     Assert.assertEquals(4, groupOutputStream.getStreamEntries().size());
     String dataString =
-        fixedLengthString(keyString, (3 * blockSize + chunkSize));
+        ContainerTestHelper.getFixedLengthString(keyString, (2 * blockSize));
     byte[] data = dataString.getBytes();
     key.write(data);
     // 3 blocks are completely written to the DataNode in 3 blocks.
     // Data of length half of chunkSize resides in the chunkOutput stream buffer
-    String dataString2 = fixedLengthString(keyString, chunkSize * 1 / 2);
+    String dataString2 =
+        ContainerTestHelper.getFixedLengthString(keyString, chunkSize * 1 / 2);
     key.write(dataString2.getBytes());
     //get the name of a valid container
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
-        .setBucketName(bucketName)
-        .setType(HddsProtos.ReplicationType.STAND_ALONE)
-        .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
+        .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
         .build();
 
-    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE);
+    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS);
 
     key.close();
     // read the key from OM again and match the length. The length will still
@@ -290,9 +257,8 @@ public class TestCloseContainerHandlingByClient {
     // closeContainerException and remaining data in the chunkOutputStream
     // buffer will be copied into a different allocated block and will be
     // committed.
-    Assert.assertEquals(5, keyLocationInfos.size());
-    dataLength = 3 * blockSize + (long) (1.5 * chunkSize);
-    Assert.assertEquals(dataLength, keyInfo.getDataSize());
+    Assert.assertEquals(dataString.concat(dataString2).getBytes().length,
+        keyInfo.getDataSize());
     validateData(keyName, dataString.concat(dataString2).getBytes());
   }
 
@@ -301,14 +267,14 @@ public class TestCloseContainerHandlingByClient {
 
     String keyName = "standalone5";
     int keyLen = 4 * blockSize;
-    OzoneOutputStream key =
-        createKey(keyName, ReplicationType.RATIS, keyLen);
+    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, keyLen);
     ChunkGroupOutputStream groupOutputStream =
         (ChunkGroupOutputStream) key.getOutputStream();
     // With the initial size provided, it should have preallocated 4 blocks
     Assert.assertEquals(4, groupOutputStream.getStreamEntries().size());
     // write data 3 blocks and one more chunk
-    byte[] writtenData = fixedLengthString(keyString, keyLen).getBytes();
+    byte[] writtenData =
+        ContainerTestHelper.getFixedLengthString(keyString, keyLen).getBytes();
     byte[] data = Arrays.copyOfRange(writtenData, 0, 3 * blockSize + chunkSize);
     Assert.assertEquals(data.length, 3 * blockSize + chunkSize);
     key.write(data);
@@ -316,17 +282,14 @@ public class TestCloseContainerHandlingByClient {
     Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
     //get the name of a valid container
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
-        .setBucketName(bucketName)
-        .setType(HddsProtos.ReplicationType.RATIS)
+        .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
         .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
         .build();
 
-    waitForContainerClose(keyName, key,
-        HddsProtos.ReplicationType.RATIS);
+    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS);
     // write 3 more chunks worth of data. It will fail and new block will be
     // allocated. This write completes 4 blocks worth of data written to key
-    data = Arrays
-        .copyOfRange(writtenData, 3 * blockSize + chunkSize, keyLen);
+    data = Arrays.copyOfRange(writtenData, 3 * blockSize + chunkSize, keyLen);
     key.write(data);
 
     key.close();
@@ -345,8 +308,6 @@ public class TestCloseContainerHandlingByClient {
     // closeContainerException and remaining data in the chunkOutputStream
     // buffer will be copied into a different allocated block and will be
     // committed.
-    Assert.assertEquals(5, keyLocationInfos.size());
-    Assert.assertEquals(4 * blockSize, keyInfo.getDataSize());
     long length = 0;
     for (OmKeyLocationInfo locationInfo : keyLocationInfos) {
       length += locationInfo.getLength();
@@ -378,9 +339,9 @@ public class TestCloseContainerHandlingByClient {
       cluster.getStorageContainerManager().getEventQueue()
           .fireEvent(SCMEvents.CLOSE_CONTAINER,
               ContainerID.valueof(containerID));
-      ContainerInfo container = cluster.getStorageContainerManager()
-          .getContainerManager()
-          .getContainer(ContainerID.valueof(containerID));
+      ContainerInfo container =
+          cluster.getStorageContainerManager().getContainerManager()
+              .getContainer(ContainerID.valueof(containerID));
       Pipeline pipeline =
           cluster.getStorageContainerManager().getPipelineManager()
               .getPipeline(container.getPipelineID());
@@ -406,8 +367,8 @@ public class TestCloseContainerHandlingByClient {
           .isContainerPresent(cluster, containerID, dn))) {
         for (DatanodeDetails datanodeDetails : datanodes) {
           GenericTestUtils.waitFor(() -> ContainerTestHelper
-                  .isContainerClosed(cluster, containerID, datanodeDetails),
-              500, 15 * 1000);
+                  .isContainerClosed(cluster, containerID, datanodeDetails), 500,
+              15 * 1000);
           //double check if it's really closed
           // (waitFor also throws an exception)
           Assert.assertTrue(ContainerTestHelper
@@ -425,29 +386,31 @@ public class TestCloseContainerHandlingByClient {
   public void testDiscardPreallocatedBlocks() throws Exception {
     String keyName = "discardpreallocatedblocks";
     OzoneOutputStream key =
-        createKey(keyName, ReplicationType.STAND_ALONE, 2 * blockSize);
+        createKey(keyName, ReplicationType.RATIS, 2 * blockSize);
     ChunkGroupOutputStream groupOutputStream =
         (ChunkGroupOutputStream) key.getOutputStream();
 
     Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
     // With the initial size provided, it should have pre allocated 4 blocks
     Assert.assertEquals(2, groupOutputStream.getStreamEntries().size());
-    String dataString = fixedLengthString(keyString, (1 * blockSize));
+    String dataString =
+        ContainerTestHelper.getFixedLengthString(keyString, (1 * blockSize));
     byte[] data = dataString.getBytes();
     key.write(data);
     List<OmKeyLocationInfo> locationInfos =
         new ArrayList<>(groupOutputStream.getLocationInfoList());
     long containerID = locationInfos.get(0).getContainerID();
-    ContainerInfo container = cluster.getStorageContainerManager()
-        .getContainerManager()
-        .getContainer(ContainerID.valueof(containerID));
+    ContainerInfo container =
+        cluster.getStorageContainerManager().getContainerManager()
+            .getContainer(ContainerID.valueof(containerID));
     Pipeline pipeline =
         cluster.getStorageContainerManager().getPipelineManager()
             .getPipeline(container.getPipelineID());
     List<DatanodeDetails> datanodes = pipeline.getNodes();
     Assert.assertEquals(1, datanodes.size());
-    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE);
-    dataString = fixedLengthString(keyString, (1 * blockSize));
+    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS);
+    dataString =
+        ContainerTestHelper.getFixedLengthString(keyString, (1 * blockSize));
     data = dataString.getBytes();
     key.write(data);
     Assert.assertEquals(2, groupOutputStream.getStreamEntries().size());
@@ -466,40 +429,28 @@ public class TestCloseContainerHandlingByClient {
 
   private OzoneOutputStream createKey(String keyName, ReplicationType type,
       long size) throws Exception {
-    ReplicationFactor factor =
-        type == ReplicationType.STAND_ALONE ? ReplicationFactor.ONE :
-            ReplicationFactor.THREE;
-    return objectStore.getVolume(volumeName).getBucket(bucketName)
-        .createKey(keyName, size, type, factor);
+    return ContainerTestHelper
+        .createKey(keyName, type, size, objectStore, volumeName, bucketName);
   }
 
   private void validateData(String keyName, byte[] data) throws Exception {
-    byte[] readData = new byte[data.length];
-    OzoneInputStream is =
-        objectStore.getVolume(volumeName).getBucket(bucketName)
-            .readKey(keyName);
-    is.read(readData);
-    MessageDigest sha1 = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
-    sha1.update(data);
-    MessageDigest sha2 = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
-    sha2.update(readData);
-    Assert.assertTrue(Arrays.equals(sha1.digest(), sha2.digest()));
-    is.close();
+    ContainerTestHelper
+        .validateData(keyName, data, objectStore, volumeName, bucketName);
   }
 
   @Test
   public void testBlockWriteViaRatis() throws Exception {
     String keyName = "ratis";
     OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
-    byte[] data =
-        fixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes();
+    byte[] data = ContainerTestHelper
+        .getFixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes();
     key.write(data);
 
     //get the name of a valid container
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName).
         setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
-        .setFactor(HddsProtos.ReplicationFactor.THREE)
-        .setKeyName(keyName).build();
+        .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
+        .build();
 
     Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
     waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS);
@@ -510,79 +461,10 @@ public class TestCloseContainerHandlingByClient {
     // The write will fail but exception will be handled and length will be
     // updated correctly in OzoneManager once the stream is closed
     key.close();
-    // read the key from OM again and match the length.The length will still
-    // be the equal to the original data size.
     OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
-    List<OmKeyLocationInfo> keyLocationInfos =
-        keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
-    //we have written two blocks
-    Assert.assertEquals(2, keyLocationInfos.size());
-    OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(0);
-    Assert.assertEquals(data.length - (data.length % chunkSize),
-        omKeyLocationInfo.getLength());
-    Assert.assertEquals(data.length + (data.length % chunkSize),
-        keyLocationInfos.get(1).getLength());
-    Assert.assertEquals(2 * data.length, keyInfo.getDataSize());
     String dataString = new String(data);
     dataString = dataString.concat(dataString);
+    Assert.assertEquals(2 * data.length, keyInfo.getDataSize());
     validateData(keyName, dataString.getBytes());
   }
-
-  @Test
-  public void testRetriesOnBlockNotCommittedException() throws Exception {
-    String keyName = "blockcommitexceptiontest";
-    OzoneOutputStream key = createKey(keyName, ReplicationType.STAND_ALONE, 0);
-    ChunkGroupOutputStream groupOutputStream =
-        (ChunkGroupOutputStream) key.getOutputStream();
-    GenericTestUtils.setLogLevel(ChunkGroupOutputStream.LOG, Level.TRACE);
-    GenericTestUtils.LogCapturer logCapturer =
-        GenericTestUtils.LogCapturer.captureLogs(ChunkGroupOutputStream.LOG);
-
-    Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
-    String dataString = fixedLengthString(keyString, (3 * chunkSize));
-    key.write(dataString.getBytes());
-    List<OmKeyLocationInfo> locationInfos =
-        groupOutputStream.getLocationInfoList();
-    long containerID = locationInfos.get(0).getContainerID();
-    ContainerInfo container = cluster.getStorageContainerManager()
-        .getContainerManager()
-        .getContainer(ContainerID.valueof(containerID));
-    Pipeline pipeline =
-        cluster.getStorageContainerManager().getPipelineManager()
-            .getPipeline(container.getPipelineID());
-    List<DatanodeDetails> datanodes = pipeline.getNodes();
-    Assert.assertEquals(1, datanodes.size());
-    // move the container on the datanode to Closing state, this will ensure
-    // closing the key will hit BLOCK_NOT_COMMITTED_EXCEPTION while trying
-    // to fetch the committed length
-    for (HddsDatanodeService datanodeService : cluster.getHddsDatanodes()) {
-      if (datanodes.get(0).equals(datanodeService.getDatanodeDetails())) {
-        datanodeService.getDatanodeStateMachine().getContainer()
-            .getContainerSet().getContainer(containerID).getContainerData()
-            .setState(ContainerProtos.ContainerDataProto.State.CLOSED);
-      }
-    }
-    dataString = fixedLengthString(keyString, (chunkSize * 1 / 2));
-    key.write(dataString.getBytes());
-    try {
-      key.close();
-      Assert.fail("Expected Exception not thrown");
-    } catch (IOException ioe) {
-      Assert.assertTrue(ioe instanceof StorageContainerException);
-      Assert.assertTrue(((StorageContainerException) ioe).getResult()
-          == ContainerProtos.Result.BLOCK_NOT_COMMITTED);
-    }
-    // It should retry only for max retries times
-    for (int i = 1; i <= maxRetries; i++) {
-      Assert.assertTrue(logCapturer.getOutput()
-          .contains("Retrying GetCommittedBlockLength request"));
-      Assert.assertTrue(logCapturer.getOutput().contains("Already tried " + i));
-    }
-    Assert.assertTrue(logCapturer.getOutput()
-        .contains("GetCommittedBlockLength request failed."));
-    Assert.assertTrue(logCapturer.getOutput().contains(
-        "retries get failed due to exceeded maximum allowed retries number"
-            + ": " + maxRetries));
-    logCapturer.stopCapturing();
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index 73bff6f..c1827c9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -23,8 +23,6 @@ import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.container.
-    common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneClient;
@@ -141,15 +139,8 @@ public class TestContainerStateMachineFailures {
             .getContainer().getContainerSet()
             .getContainer(omKeyLocationInfo.getContainerID()).getContainerData()
             .getContainerPath()));
-    try {
-      // flush will throw an exception for the second write as the container
-      // dir has been deleted.
-      key.flush();
-      Assert.fail("Expected exception not thrown");
-    } catch (IOException ioe) {
-      Assert.assertTrue(ioe.getCause() instanceof StorageContainerException);
-    }
 
+    key.close();
     // Make sure the container is marked unhealthy
     Assert.assertTrue(
         cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
@@ -157,14 +148,5 @@ public class TestContainerStateMachineFailures {
             .getContainer(omKeyLocationInfo.getContainerID())
             .getContainerState()
             == ContainerProtos.ContainerDataProto.State.UNHEALTHY);
-    try {
-      // subsequent requests will fail with unhealthy container exception
-      key.close();
-      Assert.fail("Expected exception not thrown");
-    } catch (IOException ioe) {
-      Assert.assertTrue(ioe instanceof StorageContainerException);
-      Assert.assertTrue(((StorageContainerException) ioe).getResult()
-          == ContainerProtos.Result.BLOCK_NOT_COMMITTED);
-    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
new file mode 100644
index 0000000..dc6747f
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * Tests datanode failure handling by Ozone Client.
+ */
+public class TestFailureHandlingByClient {
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+  private static OzoneClient client;
+  private static ObjectStore objectStore;
+  private static int chunkSize;
+  private static int blockSize;
+  private static String volumeName;
+  private static String bucketName;
+  private static String keyString;
+  private static int maxRetries;
+
+  /**
+   * TODO: we spawn a new MiniOzoneCluster for each unit test
+   * invocation. Need to use the same instance for all tests.
+   */
+  /**
+   * Create a MiniOzoneCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true
+   *
+   * @throws IOException
+   */
+  @Before
+  public void init() throws Exception {
+    conf = new OzoneConfiguration();
+    maxRetries = 100;
+    chunkSize = (int) OzoneConsts.MB;
+    blockSize = 4 * chunkSize;
+    conf.setInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, chunkSize);
+    conf.setInt(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE, 1);
+    conf.setInt(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE, 2);
+    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 5,
+        TimeUnit.SECONDS);
+    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.setQuietMode(false);
+    conf.setLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, (4));
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(6).build();
+    cluster.waitForClusterToBeReady();
+    // the easiest way to create an open container is to create a key
+    client = OzoneClientFactory.getClient(conf);
+    objectStore = client.getObjectStore();
+    keyString = UUID.randomUUID().toString();
+    volumeName = "datanodefailurehandlingtest";
+    bucketName = volumeName;
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+  }
+
+  /**
+   * Shutdown MiniOzoneCluster.
+   */
+  @After
+  public void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  // TODO: currently, shutting down 2 datanodes in Ratis causes the
+  // watchForCommit API in RaftClient to hang forever. Once that gets
+  // fixed, we need to execute the tests with 2 node failures.
+
+  @Test
+  public void testBlockWritesWithDnFailures() throws Exception {
+    String keyName = "ratis3";
+    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+    byte[] data =
+        ContainerTestHelper
+        .getFixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes();
+    key.write(data);
+
+    // get the name of a valid container
+    Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
+    ChunkGroupOutputStream groupOutputStream =
+        (ChunkGroupOutputStream) key.getOutputStream();
+    List<OmKeyLocationInfo> locationInfoList =
+        groupOutputStream.getLocationInfoList();
+    Assert.assertTrue(locationInfoList.size() == 1);
+    long containerId = locationInfoList.get(0).getContainerID();
+    ContainerInfo container = cluster.getStorageContainerManager()
+        .getContainerManager()
+        .getContainer(ContainerID.valueof(containerId));
+    Pipeline pipeline =
+        cluster.getStorageContainerManager().getPipelineManager()
+            .getPipeline(container.getPipelineID());
+    List<DatanodeDetails> datanodes = pipeline.getNodes();
+    cluster.shutdownHddsDatanode(datanodes.get(0));
+    // cluster.shutdownHddsDatanode(datanodes.get(1));
+    // The write will fail but exception will be handled and length will be
+    // updated correctly in OzoneManager once the stream is closed
+    key.close();
+    //get the name of a valid container
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
+        .build();
+    OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+    Assert.assertEquals(data.length, keyInfo.getDataSize());
+    validateData(keyName, data);
+    cluster.restartHddsDatanode(datanodes.get(0), true);
+  }
+
+  @Test
+  public void testMultiBlockWritesWithDnFailures() throws Exception {
+    String keyName = "ratis3";
+    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+    String data =
+        ContainerTestHelper
+        .getFixedLengthString(keyString, blockSize + chunkSize);
+    key.write(data.getBytes());
+
+    // get the name of a valid container
+    Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
+    ChunkGroupOutputStream groupOutputStream =
+        (ChunkGroupOutputStream) key.getOutputStream();
+    List<OmKeyLocationInfo> locationInfoList =
+        groupOutputStream.getLocationInfoList();
+    Assert.assertTrue(locationInfoList.size() == 2);
+    long containerId = locationInfoList.get(1).getContainerID();
+    ContainerInfo container = cluster.getStorageContainerManager()
+        .getContainerManager()
+        .getContainer(ContainerID.valueof(containerId));
+    Pipeline pipeline =
+        cluster.getStorageContainerManager().getPipelineManager()
+            .getPipeline(container.getPipelineID());
+    List<DatanodeDetails> datanodes = pipeline.getNodes();
+    cluster.shutdownHddsDatanode(datanodes.get(0));
+
+    //  cluster.shutdownHddsDatanode(datanodes.get(1));
+    // The write will fail but exception will be handled and length will be
+    // updated correctly in OzoneManager once the stream is closed
+    key.write(data.getBytes());
+    key.close();
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
+        .build();
+    OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+    Assert.assertEquals(2 * data.getBytes().length, keyInfo.getDataSize());
+    validateData(keyName, data.concat(data).getBytes());
+    cluster.restartHddsDatanode(datanodes.get(0), true);
+  }
+
+  private OzoneOutputStream createKey(String keyName, ReplicationType type,
+      long size) throws Exception {
+    return ContainerTestHelper
+        .createKey(keyName, type, size, objectStore, volumeName, bucketName);
+  }
+
+  private void validateData(String keyName, byte[] data) throws Exception {
+    ContainerTestHelper
+        .validateData(keyName, data, objectStore, volumeName, bucketName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index 7d002c3..7e9bab5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -21,10 +21,14 @@ package org.apache.hadoop.ozone.container;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -632,4 +636,34 @@ public final class ContainerTestHelper {
     return false;
   }
 
+  public static OzoneOutputStream createKey(String keyName,
+      ReplicationType type, long size, ObjectStore objectStore,
+      String volumeName, String bucketName) throws Exception {
+    org.apache.hadoop.hdds.client.ReplicationFactor factor =
+        type == ReplicationType.STAND_ALONE ?
+            org.apache.hadoop.hdds.client.ReplicationFactor.ONE :
+            org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
+    return objectStore.getVolume(volumeName).getBucket(bucketName)
+        .createKey(keyName, size, type, factor);
+  }
+
+  public static void validateData(String keyName, byte[] data,
+      ObjectStore objectStore, String volumeName, String bucketName)
+      throws Exception {
+    byte[] readData = new byte[data.length];
+    OzoneInputStream is =
+        objectStore.getVolume(volumeName).getBucket(bucketName)
+            .readKey(keyName);
+    is.read(readData);
+    MessageDigest sha1 = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+    sha1.update(data);
+    MessageDigest sha2 = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+    sha2.update(readData);
+    Assert.assertTrue(Arrays.equals(sha1.digest(), sha2.digest()));
+    is.close();
+  }
+
+  public static String getFixedLengthString(String string, int length) {
+    return String.format("%1$" + length + "s", string);
+  }
 }
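
The three helpers added above centralize logic that the first diff removes from TestCloseContainerHandlingByClient. A sketch of how a test would call them, assuming the usual fixtures (objectStore, volumeName, bucketName, keyString, chunkSize) initialized as in the init() methods elsewhere in this patch:

    @Test
    public void testWriteAndValidate() throws Exception {
      byte[] data = ContainerTestHelper
          .getFixedLengthString(keyString, chunkSize).getBytes();
      OzoneOutputStream key = ContainerTestHelper.createKey(
          "example-key", ReplicationType.RATIS, 0, objectStore, volumeName,
          bucketName);
      key.write(data);
      key.close();
      // validateData re-reads the key and compares SHA digests of the
      // written and read bytes.
      ContainerTestHelper.validateData(
          "example-key", data, objectStore, volumeName, bucketName);
    }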

http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
index f7ba979..3a15b21 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
@@ -468,7 +468,7 @@ public class TestOzoneContainer {
             client.getPipeline(), blockID, 1024);
 
         CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
-            response = client.sendCommandAsync(smallFileRequest);
+            response = client.sendCommandAsync(smallFileRequest).getResponse();
         computeResults.add(response);
       }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java
index aa1df4c..ce91dbd 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java
@@ -119,7 +119,8 @@ public class TestXceiverClientMetrics {
             smallFileRequest = ContainerTestHelper.getWriteSmallFileRequest(
                 client.getPipeline(), blockID, 1024);
             CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
-                response = client.sendCommandAsync(smallFileRequest);
+                response =
+                client.sendCommandAsync(smallFileRequest).getResponse();
             computeResults.add(response);
           }
 

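Both hunks above adapt callers to the same client API change: sendCommandAsync now returns a reply object that exposes the response future through getResponse() rather than returning the future directly. A hedged fragment of the resulting caller pattern (the reply wrapper's type name is not visible in this excerpt):

    // The future is now obtained from the reply wrapper via getResponse().
    CompletableFuture<ContainerProtos.ContainerCommandResponseProto> response =
        client.sendCommandAsync(smallFileRequest).getResponse();
    computeResults.add(response);
    // Or, when a blocking call is acceptable in a test:
    ContainerProtos.ContainerCommandResponseProto reply =
        response.get(5, TimeUnit.SECONDS);
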
http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/TestOzoneRestWithMiniCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/TestOzoneRestWithMiniCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/TestOzoneRestWithMiniCluster.java
index 0f49ade..9680243 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/TestOzoneRestWithMiniCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/TestOzoneRestWithMiniCluster.java
@@ -62,7 +62,7 @@ public class TestOzoneRestWithMiniCluster {
   private static OzoneConfiguration conf;
   private static ClientProtocol client;
   private static ReplicationFactor replicationFactor = ReplicationFactor.ONE;
-  private static ReplicationType replicationType = ReplicationType.STAND_ALONE;
+  private static ReplicationType replicationType = ReplicationType.RATIS;
 
   @Rule
   public ExpectedException exception = ExpectedException.none();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
index a8df114..6a433b5 100644
--- a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
+++ b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
@@ -22,8 +22,7 @@ import com.google.common.base.Strings;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.ozone.client.OzoneClientUtils;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.io.LengthInputStream;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -33,7 +32,6 @@ import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.OzoneConsts.Versioning;
 import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;
@@ -63,6 +61,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A {@link StorageHandler} implementation that distributes object storage
@@ -80,10 +79,10 @@ public final class DistributedStorageHandler implements StorageHandler {
   private final OzoneAcl.OzoneACLRights userRights;
   private final OzoneAcl.OzoneACLRights groupRights;
   private int chunkSize;
-  private final boolean useRatis;
-  private final HddsProtos.ReplicationType type;
-  private final HddsProtos.ReplicationFactor factor;
-  private final RetryPolicy retryPolicy;
+  private final long streamBufferFlushSize;
+  private final long streamBufferMaxSize;
+  private final long watchTimeout;
+  private final long blockSize;
 
   /**
    * Creates a new DistributedStorageHandler.
@@ -100,17 +99,6 @@ public final class DistributedStorageHandler implements StorageHandler {
     this.ozoneManagerClient = ozoneManagerClient;
     this.storageContainerLocationClient = storageContainerLocation;
     this.xceiverClientManager = new XceiverClientManager(conf);
-    this.useRatis = conf.getBoolean(
-        ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
-        ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
-
-    if(useRatis) {
-      type = HddsProtos.ReplicationType.RATIS;
-      factor = HddsProtos.ReplicationFactor.THREE;
-    } else {
-      type = HddsProtos.ReplicationType.STAND_ALONE;
-      factor = HddsProtos.ReplicationFactor.ONE;
-    }
 
     chunkSize = conf.getInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
         ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT);
@@ -118,7 +106,6 @@ public final class DistributedStorageHandler implements StorageHandler {
         OMConfigKeys.OZONE_OM_USER_RIGHTS_DEFAULT);
     groupRights = conf.getEnum(OMConfigKeys.OZONE_OM_GROUP_RIGHTS,
         OMConfigKeys.OZONE_OM_GROUP_RIGHTS_DEFAULT);
-    retryPolicy = OzoneClientUtils.createRetryPolicy(conf);
     if(chunkSize > ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE) {
       LOG.warn("The chunk size ({}) is not allowed to be more than"
               + " the maximum size ({}),"
@@ -126,6 +113,18 @@ public final class DistributedStorageHandler implements StorageHandler {
           chunkSize, ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE);
       chunkSize = ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE;
     }
+    streamBufferFlushSize =
+        conf.getLong(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE,
+            OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE_DEFAULT);
+    streamBufferMaxSize =
+        conf.getLong(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE,
+            OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE_DEFAULT);
+    blockSize = conf.getLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB,
+        OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT);
+    watchTimeout =
+        conf.getTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT,
+            OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT,
+            TimeUnit.MILLISECONDS);
   }
 
   @Override
@@ -420,7 +419,10 @@ public final class DistributedStorageHandler implements StorageHandler {
             .setRequestID(args.getRequestID())
             .setType(xceiverClientManager.getType())
             .setFactor(xceiverClientManager.getFactor())
-            .setRetryPolicy(retryPolicy)
+            .setStreamBufferFlushSize(streamBufferFlushSize)
+            .setStreamBufferMaxSize(streamBufferMaxSize)
+            .setBlockSize(blockSize)
+            .setWatchTimeout(watchTimeout)
             .build();
     groupOutputStream.addPreallocateBlocks(
         openKey.getKeyInfo().getLatestVersionLocations(),

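DistributedStorageHandler now reads the stream-buffer and watch settings from configuration and passes them to the ChunkGroupOutputStream builder in place of the removed retry policy. The knobs can be tuned the same way the test init() methods in this patch do; for example (values are the ones used in TestFailureHandlingByClient, not recommendations):

    OzoneConfiguration conf = new OzoneConfiguration();
    // thresholds for the new client-side blocking buffer
    conf.setInt(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE, 1);
    conf.setInt(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE, 2);
    // how long a watch request may block before it fails
    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 5,
        TimeUnit.SECONDS);
    conf.setLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, 4);
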
http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
index a2df50d..3cf4416 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -44,6 +45,7 @@ public class TestDataValidate {
   @BeforeClass
   public static void init() throws Exception {
     conf = new OzoneConfiguration();
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(5).build();
     cluster.waitForClusterToBeReady();
@@ -86,6 +88,8 @@ public class TestDataValidate {
     randomKeyGenerator.setNumOfKeys(1);
     randomKeyGenerator.setKeySize(20971520);
     randomKeyGenerator.setValidateWrites(true);
+    randomKeyGenerator.setType(ReplicationType.RATIS);
+    randomKeyGenerator.setFactor(ReplicationFactor.THREE);
     randomKeyGenerator.call();
     Assert.assertEquals(1, randomKeyGenerator.getNumberOfVolumesCreated());
     Assert.assertEquals(1, randomKeyGenerator.getNumberOfBucketsCreated());
@@ -101,6 +105,8 @@ public class TestDataValidate {
     randomKeyGenerator.setNumOfBuckets(5);
     randomKeyGenerator.setNumOfKeys(10);
     randomKeyGenerator.setValidateWrites(true);
+    randomKeyGenerator.setType(ReplicationType.RATIS);
+    randomKeyGenerator.setFactor(ReplicationFactor.THREE);
     randomKeyGenerator.call();
     Assert.assertEquals(2, randomKeyGenerator.getNumberOfVolumesCreated());
     Assert.assertEquals(10, randomKeyGenerator.getNumberOfBucketsCreated());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java
index d21d399..e5bb8ae 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -44,6 +45,7 @@ public class TestRandomKeyGenerator {
   @BeforeClass
   public static void init() throws Exception {
     conf = new OzoneConfiguration();
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(5).build();
     cluster.waitForClusterToBeReady();
   }
@@ -65,6 +67,8 @@ public class TestRandomKeyGenerator {
     randomKeyGenerator.setNumOfVolumes(2);
     randomKeyGenerator.setNumOfBuckets(5);
     randomKeyGenerator.setNumOfKeys(10);
+    randomKeyGenerator.setFactor(ReplicationFactor.THREE);
+    randomKeyGenerator.setType(ReplicationType.RATIS);
     randomKeyGenerator.call();
     Assert.assertEquals(2, randomKeyGenerator.getNumberOfVolumesCreated());
     Assert.assertEquals(10, randomKeyGenerator.getNumberOfBucketsCreated());
@@ -81,6 +85,8 @@ public class TestRandomKeyGenerator {
     randomKeyGenerator.setNumOfKeys(10);
     randomKeyGenerator.setNumOfThreads(10);
     randomKeyGenerator.setKeySize(10240);
+    randomKeyGenerator.setFactor(ReplicationFactor.THREE);
+    randomKeyGenerator.setType(ReplicationType.RATIS);
     randomKeyGenerator.call();
     Assert.assertEquals(10, randomKeyGenerator.getNumberOfVolumesCreated());
     Assert.assertEquals(10, randomKeyGenerator.getNumberOfBucketsCreated());

