hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cli...@apache.org
Subject hadoop git commit: HDFS-11920. Ozone : add key partition. Contributed by Chen Liang.
Date Mon, 31 Jul 2017 23:01:53 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 1f5353d7b -> 42ab44d34


HDFS-11920. Ozone : add key partition. Contributed by Chen Liang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/42ab44d3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/42ab44d3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/42ab44d3

Branch: refs/heads/HDFS-7240
Commit: 42ab44d34e7f8b9e39168ed68a178e5ba68f46b8
Parents: 1f5353d
Author: Chen Liang <cliang@apache.org>
Authored: Mon Jul 31 16:01:39 2017 -0700
Committer: Chen Liang <cliang@apache.org>
Committed: Mon Jul 31 16:01:39 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/ksm/helpers/KsmKeyInfo.java   |  62 ++--
 .../hadoop/ksm/helpers/KsmKeyLocationInfo.java  | 137 +++++++++
 .../main/proto/KeySpaceManagerProtocol.proto    |  17 +-
 .../apache/hadoop/ozone/OzoneClientImpl.java    |  80 +----
 .../apache/hadoop/ozone/OzoneConfigKeys.java    |   4 +
 .../java/org/apache/hadoop/ozone/OzoneKey.java  |  43 +--
 .../hadoop/ozone/io/OzoneInputStream.java       |   5 +-
 .../hadoop/ozone/io/OzoneOutputStream.java      |  10 +-
 .../apache/hadoop/ozone/ksm/KeyManagerImpl.java |  41 ++-
 .../hadoop/ozone/ksm/KeySpaceManager.java       |   3 +-
 .../org/apache/hadoop/ozone/scm/cli/SQLCLI.java |   6 +-
 .../hadoop/ozone/web/handlers/KeyHandler.java   |   1 +
 .../web/storage/ChunkGroupInputStream.java      | 211 +++++++++++++
 .../web/storage/ChunkGroupOutputStream.java     | 304 +++++++++++++++++++
 .../web/storage/DistributedStorageHandler.java  | 107 +------
 .../src/main/resources/ozone-default.xml        |   8 +
 .../hadoop/ozone/ksm/TestChunkStreams.java      | 174 +++++++++++
 .../ksm/TestMultipleContainerReadWrite.java     | 210 +++++++++++++
 18 files changed, 1171 insertions(+), 252 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyInfo.java
index 05748e8..f46ec89 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyInfo.java
@@ -17,9 +17,11 @@
  */
 package org.apache.hadoop.ksm.helpers;
 
-
 import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
 
+import java.util.List;
+import java.util.stream.Collectors;
+
 /**
  * Args for key block. The block instance for the key requested in putKey.
  * This is returned from KSM to client, and client use class to talk to
@@ -30,25 +32,19 @@ public final class KsmKeyInfo {
   private final String bucketName;
   // name of key client specified
   private final String keyName;
-  private final String containerName;
-  // name of the block id SCM assigned for the key
-  private final String blockID;
   private final long dataSize;
-  private final boolean shouldCreateContainer;
+  private List<KsmKeyLocationInfo> keyLocationList;
   private final long creationTime;
   private final long modificationTime;
 
   private KsmKeyInfo(String volumeName, String bucketName, String keyName,
-      long dataSize, String blockID, String containerName,
-      boolean shouldCreateContainer, long creationTime,
+      List<KsmKeyLocationInfo> locationInfos, long dataSize, long creationTime,
       long modificationTime) {
     this.volumeName = volumeName;
     this.bucketName = bucketName;
     this.keyName = keyName;
-    this.containerName = containerName;
-    this.blockID = blockID;
     this.dataSize = dataSize;
-    this.shouldCreateContainer = shouldCreateContainer;
+    this.keyLocationList = locationInfos;
     this.creationTime = creationTime;
     this.modificationTime = modificationTime;
   }
@@ -65,20 +61,12 @@ public final class KsmKeyInfo {
     return keyName;
   }
 
-  public String getBlockID() {
-    return blockID;
-  }
-
-  public String getContainerName() {
-    return containerName;
-  }
-
   public long getDataSize() {
     return dataSize;
   }
 
-  public boolean getShouldCreateContainer() {
-    return shouldCreateContainer;
+  public List<KsmKeyLocationInfo> getKeyLocationList() {
+    return keyLocationList;
   }
 
   public long getCreationTime() {
@@ -96,10 +84,8 @@ public final class KsmKeyInfo {
     private String volumeName;
     private String bucketName;
     private String keyName;
-    private String containerName;
-    private String blockID;
     private long dataSize;
-    private boolean shouldCreateContainer;
+    private List<KsmKeyLocationInfo> ksmKeyLocationInfos;
     private long creationTime;
     private long modificationTime;
 
@@ -118,13 +104,9 @@ public final class KsmKeyInfo {
       return this;
     }
 
-    public Builder setBlockID(String block) {
-      this.blockID = block;
-      return this;
-    }
-
-    public Builder setContainerName(String container) {
-      this.containerName = container;
+    public Builder setKsmKeyLocationInfos(
+        List<KsmKeyLocationInfo> ksmKeyLocationInfoList) {
+      this.ksmKeyLocationInfos = ksmKeyLocationInfoList;
       return this;
     }
 
@@ -133,11 +115,6 @@ public final class KsmKeyInfo {
       return this;
     }
 
-    public Builder setShouldCreateContainer(boolean create) {
-      this.shouldCreateContainer = create;
-      return this;
-    }
-
     public Builder setCreationTime(long creationTime) {
       this.creationTime = creationTime;
       return this;
@@ -150,8 +127,8 @@ public final class KsmKeyInfo {
 
     public KsmKeyInfo build() {
       return new KsmKeyInfo(
-          volumeName, bucketName, keyName, dataSize, blockID, containerName,
-          shouldCreateContainer, creationTime, modificationTime);
+          volumeName, bucketName, keyName, ksmKeyLocationInfos,
+          dataSize, creationTime, modificationTime);
     }
   }
 
@@ -161,9 +138,8 @@ public final class KsmKeyInfo {
         .setBucketName(bucketName)
         .setKeyName(keyName)
         .setDataSize(dataSize)
-        .setBlockKey(blockID)
-        .setContainerName(containerName)
-        .setShouldCreateContainer(shouldCreateContainer)
+        .addAllKeyLocationList(keyLocationList.stream()
+            .map(KsmKeyLocationInfo::getProtobuf).collect(Collectors.toList()))
         .setCreationTime(creationTime)
         .setModificationTime(modificationTime)
         .build();
@@ -174,10 +150,10 @@ public final class KsmKeyInfo {
         keyInfo.getVolumeName(),
         keyInfo.getBucketName(),
         keyInfo.getKeyName(),
+        keyInfo.getKeyLocationListList().stream()
+            .map(KsmKeyLocationInfo::getFromProtobuf)
+            .collect(Collectors.toList()),
         keyInfo.getDataSize(),
-        keyInfo.getBlockKey(),
-        keyInfo.getContainerName(),
-        keyInfo.getShouldCreateContainer(),
         keyInfo.getCreationTime(),
         keyInfo.getModificationTime());
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyLocationInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyLocationInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyLocationInfo.java
new file mode 100644
index 0000000..62d20f6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyLocationInfo.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ksm.helpers;
+
+import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyLocation;
+
+/**
+ * One key can be too large to fit in one container, in which case it is split
+ * into a number of subkeys. This class represents one such subkey instance.
+ */
+public final class KsmKeyLocationInfo {
+  private final String containerName;
+  // name of the block id SCM assigned for the key
+  private final String blockID;
+  private final boolean shouldCreateContainer;
+  // the id of this subkey in all the subkeys.
+  private final int index;
+  private final long length;
+  private final long offset;
+
+  private KsmKeyLocationInfo(String containerName,
+      String blockID, boolean shouldCreateContainer, int index,
+      long length, long offset) {
+    this.containerName = containerName;
+    this.blockID = blockID;
+    this.shouldCreateContainer = shouldCreateContainer;
+    this.index = index;
+    this.length = length;
+    this.offset = offset;
+  }
+
+  public String getContainerName() {
+    return containerName;
+  }
+
+  public String getBlockID() {
+    return blockID;
+  }
+
+  public boolean getShouldCreateContainer() {
+    return shouldCreateContainer;
+  }
+
+  public int getIndex() {
+    return index;
+  }
+
+  public long getLength() {
+    return length;
+  }
+
+  public long getOffset() {
+    return offset;
+  }
+
+  /**
+   * Builder of KsmKeyLocationInfo.
+   */
+  public static class Builder {
+    private String containerName;
+    private String blockID;
+    private boolean shouldCreateContainer;
+    // the id of this subkey in all the subkeys.
+    private int index;
+    private long length;
+    private long offset;
+    public Builder setContainerName(String container) {
+      this.containerName = container;
+      return this;
+    }
+
+    public Builder setBlockID(String block) {
+      this.blockID = block;
+      return this;
+    }
+
+    public Builder setShouldCreateContainer(boolean create) {
+      this.shouldCreateContainer = create;
+      return this;
+    }
+
+    public Builder setIndex(int id) {
+      this.index = id;
+      return this;
+    }
+
+    public Builder setLength(long len) {
+      this.length = len;
+      return this;
+    }
+
+    public Builder setOffset(long off) {
+      this.offset = off;
+      return this;
+    }
+
+    public KsmKeyLocationInfo build() {
+      return new KsmKeyLocationInfo(containerName, blockID,
+          shouldCreateContainer, index, length, offset);
+    }
+  }
+
+  public KeyLocation getProtobuf() {
+    return KeyLocation.newBuilder()
+        .setContainerName(containerName)
+        .setBlockID(blockID)
+        .setShouldCreateContainer(shouldCreateContainer)
+        .setIndex(index)
+        .setLength(length)
+        .setOffset(offset)
+        .build();
+  }
+
+  public static KsmKeyLocationInfo getFromProtobuf(KeyLocation keyLocation) {
+    return new KsmKeyLocationInfo(
+        keyLocation.getContainerName(),
+        keyLocation.getBlockID(),
+        keyLocation.getShouldCreateContainer(),
+        keyLocation.getIndex(),
+        keyLocation.getLength(),
+        keyLocation.getOffset());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
index f641898..956e7fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
@@ -225,16 +225,23 @@ message KeyArgs {
     optional uint64 dataSize = 4;
 }
 
+message KeyLocation {
+    required string blockID = 1;
+    required string containerName = 2;
+    required bool shouldCreateContainer = 3;
+    required uint64 offset = 4;
+    required uint64 length = 5;
+    required uint32 index = 6;
+}
+
 message KeyInfo {
     required string volumeName = 1;
     required string bucketName = 2;
     required string keyName = 3;
     required uint64 dataSize = 4;
-    required string blockKey = 5;
-    required string containerName = 6;
-    required bool shouldCreateContainer = 7;
-    required uint64 creationTime = 8;
-    required uint64 modificationTime = 9;
+    repeated KeyLocation keyLocationList = 5;
+    required uint64 creationTime = 6;
+    required uint64 modificationTime = 7;
 }
 
 message LocateKeyRequest {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java
index 3c79778..3bd74a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java
@@ -20,12 +20,7 @@ package org.apache.hadoop.ozone;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.ozone.protocol.proto
-    .ContainerProtos.ChunkInfo;
-import org.apache.hadoop.hdfs.ozone.protocol.proto
-    .ContainerProtos.GetKeyResponseProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto
-    .ContainerProtos.KeyData;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
@@ -39,24 +34,20 @@ import org.apache.hadoop.ksm.protocolPB
 import org.apache.hadoop.ksm.protocolPB
     .KeySpaceManagerProtocolPB;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.web.storage.ChunkGroupInputStream;
 import org.apache.hadoop.ozone.io.OzoneInputStream;
 import org.apache.hadoop.ozone.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts.Versioning;
 import org.apache.hadoop.ozone.protocolPB.KSMPBHelper;
+import org.apache.hadoop.ozone.web.storage.ChunkGroupOutputStream;
 import org.apache.hadoop.scm.ScmConfigKeys;
 import org.apache.hadoop.scm.XceiverClientManager;
-import org.apache.hadoop.scm.XceiverClientSpi;
-import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.scm.protocolPB
     .StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.scm.protocolPB
     .StorageContainerLocationProtocolPB;
-import org.apache.hadoop.scm.storage.ChunkInputStream;
-import org.apache.hadoop.scm.storage.ChunkOutputStream;
-import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -480,39 +471,11 @@ public class OzoneClientImpl implements OzoneClient, Closeable {
         .setDataSize(size)
         .build();
 
-    String containerKey = buildContainerKey(volumeName, bucketName, keyName);
     KsmKeyInfo keyInfo = keySpaceManagerClient.allocateKey(keyArgs);
-    // TODO: the following createContainer and key writes may fail, in which
-    // case we should revert the above allocateKey to KSM.
-    String containerName = keyInfo.getContainerName();
-    XceiverClientSpi xceiverClient = getContainer(containerName);
-    if (keyInfo.getShouldCreateContainer()) {
-      LOG.debug("Need to create container {} for key: {}/{}/{}", containerName,
-          volumeName, bucketName, keyName);
-      ContainerProtocolCalls.createContainer(xceiverClient, requestId);
-    }
-    // establish a connection to the container to write the key
-    ChunkOutputStream outputStream = new ChunkOutputStream(containerKey,
-        keyName, xceiverClientManager, xceiverClient, requestId, chunkSize);
-    return new OzoneOutputStream(outputStream);
-  }
-
-  /**
-   * Creates a container key from any number of components by combining all
-   * components with a delimiter.
-   *
-   * @param parts container key components
-   * @return container key
-   */
-  private static String buildContainerKey(String... parts) {
-    return '/' + StringUtils.join('/', parts);
-  }
-
-  private XceiverClientSpi getContainer(String containerName)
-      throws IOException {
-    Pipeline pipeline =
-        storageContainerLocationClient.getContainer(containerName);
-    return xceiverClientManager.acquireClient(pipeline);
+    ChunkGroupOutputStream  groupOutputStream =
+        ChunkGroupOutputStream.getFromKsmKeyInfo(keyInfo, xceiverClientManager,
+        storageContainerLocationClient, chunkSize, requestId);
+    return new OzoneOutputStream(groupOutputStream);
   }
 
   @Override
@@ -529,29 +492,12 @@ public class OzoneClientImpl implements OzoneClient, Closeable {
         .setKeyName(keyName)
         .build();
     KsmKeyInfo keyInfo = keySpaceManagerClient.lookupKey(keyArgs);
-    String containerKey = buildContainerKey(volumeName,
-        bucketName, keyName);
-    String containerName = keyInfo.getContainerName();
-    XceiverClientSpi xceiverClient = getContainer(containerName);
-    boolean success = false;
-    try {
-      LOG.debug("get key accessing {} {}",
-          xceiverClient.getPipeline().getContainerName(), containerKey);
-      KeyData containerKeyData = KeyData.newBuilder().setContainerName(
-          xceiverClient.getPipeline().getContainerName())
-          .setName(containerKey).build();
-      GetKeyResponseProto response = ContainerProtocolCalls
-          .getKey(xceiverClient, containerKeyData, requestId);
-      List<ChunkInfo> chunks = response.getKeyData().getChunksList();
-      success = true;
-      return new OzoneInputStream(new ChunkInputStream(
-          containerKey, xceiverClientManager, xceiverClient,
-          chunks, requestId));
-    } finally {
-      if (!success) {
-        xceiverClientManager.releaseClient(xceiverClient);
-      }
-    }
+    LengthInputStream lengthInputStream =
+        ChunkGroupInputStream.getFromKsmKeyInfo(
+        keyInfo, xceiverClientManager, storageContainerLocationClient,
+        requestId);
+    return new OzoneInputStream(
+        (ChunkGroupInputStream)lengthInputStream.getWrappedStream());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 1822a2a..8c7ac7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -73,6 +73,10 @@ public final class OzoneConfigKeys {
   public static final String OZONE_KEY_CACHE = "ozone.key.cache.size";
   public static final int OZONE_KEY_CACHE_DEFAULT = 1024;
 
+  public static final String OZONE_SCM_BLOCK_SIZE_KEY =
+      "ozone.scm.block.size";
+  public static final long OZONE_SCM_BLOCK_SIZE_DEFAULT = 256 * OzoneConsts.MB;
+
   /**
    * Ozone administrator users delimited by comma.
    * If not set, only the user who launches an ozone service will be the

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneKey.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneKey.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneKey.java
index a99ba0e..b6b62d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneKey.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneKey.java
@@ -19,6 +19,9 @@
 package org.apache.hadoop.ozone;
 
 import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ksm.helpers.KsmKeyLocationInfo;
+
+import java.util.List;
 
 /**
  * A class that encapsulates OzoneKey.
@@ -38,19 +41,15 @@ public class OzoneKey {
    */
   private final String keyName;
   /**
-   * Name of the Container the Key resides in.
-   */
-  private final String containerName;
-  /**
-   * Name of the block id SCM assigned for the key.
-   */
-  private final String blockID;
-  /**
    * Size of the data.
    */
   private final long dataSize;
 
   /**
+   * All the locations of this key, in an ordered list.
+   */
+  private final List<KsmKeyLocationInfo> keyLocations;
+  /**
    * Constructs OzoneKey from KsmKeyInfo.
    *
    * @param ksmKeyInfo
@@ -59,9 +58,8 @@ public class OzoneKey {
     this.volumeName = ksmKeyInfo.getVolumeName();
     this.bucketName = ksmKeyInfo.getBucketName();
     this.keyName = ksmKeyInfo.getKeyName();
-    this.containerName = ksmKeyInfo.getContainerName();
-    this.blockID = ksmKeyInfo.getBlockID();
     this.dataSize = ksmKeyInfo.getDataSize();
+    this.keyLocations = ksmKeyInfo.getKeyLocationList();
   }
 
   /**
@@ -92,29 +90,20 @@ public class OzoneKey {
   }
 
   /**
-   * Returns Container Name associated with the Key.
-   *
-   * @return containerName
-   */
-  public String getContainerName() {
-    return containerName;
-  }
-
-  /**
-   * Returns BlockID associated with the Key.
+   * Returns the size of the data.
    *
-   * @return blockID
+   * @return dataSize
    */
-  public String getBlockID() {
-    return blockID;
+  public long getDataSize() {
+    return dataSize;
   }
 
   /**
-   * Returns the size of the data.
+   * Returns the list of the key locations.
    *
-   * @return dataSize
+   * @return key locations
    */
-  public long getDataSize() {
-    return dataSize;
+  public List<KsmKeyLocationInfo> getKeyLocations() {
+    return keyLocations;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/io/OzoneInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/io/OzoneInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/io/OzoneInputStream.java
index 0813868..65d5d1b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/io/OzoneInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/io/OzoneInputStream.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.ozone.io;
 
+import org.apache.hadoop.ozone.web.storage.ChunkGroupInputStream;
 import org.apache.hadoop.scm.storage.ChunkInputStream;
 
 import java.io.IOException;
@@ -28,14 +29,14 @@ import java.io.InputStream;
  */
 public class OzoneInputStream extends InputStream {
 
-  private final ChunkInputStream inputStream;
+  private final ChunkGroupInputStream inputStream;
 
   /**
    * Constructs OzoneInputStream with ChunkInputStream.
    *
    * @param inputStream
    */
-  public OzoneInputStream(ChunkInputStream inputStream) {
+  public OzoneInputStream(ChunkGroupInputStream inputStream) {
     this.inputStream = inputStream;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/io/OzoneOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/io/OzoneOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/io/OzoneOutputStream.java
index f473292..2421c4a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/io/OzoneOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/io/OzoneOutputStream.java
@@ -17,25 +17,25 @@
 
 package org.apache.hadoop.ozone.io;
 
-import org.apache.hadoop.scm.storage.ChunkOutputStream;
+import org.apache.hadoop.ozone.web.storage.ChunkGroupOutputStream;
 
 import java.io.IOException;
 import java.io.OutputStream;
 
 /**
  * OzoneOutputStream is used to write data into Ozone.
- * It uses SCM's {@link ChunkOutputStream} for writing the data.
+ * It uses SCM's {@link ChunkGroupOutputStream} for writing the data.
  */
 public class OzoneOutputStream extends OutputStream {
 
-  private final ChunkOutputStream outputStream;
+  private final ChunkGroupOutputStream outputStream;
 
   /**
-   * Constructs OzoneOutputStream with ChunkOutputStream.
+   * Constructs OzoneOutputStream with ChunkGroupOutputStream.
    *
    * @param outputStream
    */
-  public OzoneOutputStream(ChunkOutputStream outputStream) {
+  public OzoneOutputStream(ChunkGroupOutputStream outputStream) {
     this.outputStream = outputStream;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
index d3b7e48..48e1049 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.ksm;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
 import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ksm.helpers.KsmKeyLocationInfo;
+import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
 import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
 import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
@@ -31,8 +33,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_KEY;
+
 /**
  * Implementation of keyManager.
  */
@@ -45,11 +51,14 @@ public class KeyManagerImpl implements KeyManager {
    */
   private final ScmBlockLocationProtocol scmBlockClient;
   private final MetadataManager metadataManager;
+  private final long scmBlockSize;
 
   public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
-      MetadataManager metadataManager) {
+      MetadataManager metadataManager, OzoneConfiguration conf) {
     this.scmBlockClient = scmBlockClient;
     this.metadataManager = metadataManager;
+    this.scmBlockSize = conf.getLong(OZONE_SCM_BLOCK_SIZE_KEY,
+        OZONE_SCM_BLOCK_SIZE_DEFAULT);
   }
 
   @Override
@@ -92,17 +101,37 @@ public class KeyManagerImpl implements KeyManager {
       // with a actual SCM block.
       // TODO : Review this decision later. We can get away with only a
       // metadata entry in case of 0 length key.
-      AllocatedBlock allocatedBlock =
-          scmBlockClient.allocateBlock(Math.max(args.getDataSize(), 1));
+      long targetSize = args.getDataSize();
+      List<KsmKeyLocationInfo> subKeyInfos = new ArrayList<>();
+      int idx = 0;
+      long offset = 0;
+
+      // in case targetSize == 0, subKeyInfos will be an empty list
+      while (targetSize > 0) {
+        long allocateSize = Math.min(targetSize, scmBlockSize);
+        AllocatedBlock allocatedBlock =
+            scmBlockClient.allocateBlock(allocateSize);
+        KsmKeyLocationInfo subKeyInfo = new KsmKeyLocationInfo.Builder()
+            .setContainerName(allocatedBlock.getPipeline().getContainerName())
+            .setBlockID(allocatedBlock.getKey())
+            .setShouldCreateContainer(allocatedBlock.getCreateContainer())
+            .setIndex(idx)
+            .setLength(allocateSize)
+            .setOffset(offset)
+            .build();
+        idx += 1;
+        offset += allocateSize;
+        targetSize -= allocateSize;
+        subKeyInfos.add(subKeyInfo);
+      }
+
       long currentTime = Time.now();
       KsmKeyInfo keyBlock = new KsmKeyInfo.Builder()
           .setVolumeName(args.getVolumeName())
           .setBucketName(args.getBucketName())
           .setKeyName(args.getKeyName())
           .setDataSize(args.getDataSize())
-          .setBlockID(allocatedBlock.getKey())
-          .setContainerName(allocatedBlock.getPipeline().getContainerName())
-          .setShouldCreateContainer(allocatedBlock.getCreateContainer())
+          .setKsmKeyLocationInfos(subKeyInfos)
           .setCreationTime(currentTime)
           .setModificationTime(currentTime)
           .build();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
index 435b243..94c0975 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
@@ -98,7 +98,8 @@ public class KeySpaceManager implements KeySpaceManagerProtocol {
     volumeManager = new VolumeManagerImpl(metadataManager, conf);
     bucketManager = new BucketManagerImpl(metadataManager);
     metrics = KSMMetrics.create();
-    keyManager = new KeyManagerImpl(getScmBlockClient(conf), metadataManager);
+    keyManager = new KeyManagerImpl(
+        getScmBlockClient(conf), metadataManager, conf);
     httpServer = new KeySpaceManagerHttpServer(conf);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
index 30a2d22..9e4053e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
@@ -400,11 +400,13 @@ public class SQLCLI  extends Configured implements Tool {
       break;
     case KEY:
       KeyInfo keyInfo = KeyInfo.parseFrom(value);
+      // TODO : the two fields container name and block id are no longer used,
+      // need to revisit this later.
       String insertKeyInfo =
           String.format(INSERT_KEY_INFO, keyInfo.getVolumeName(),
               keyInfo.getBucketName(), keyInfo.getKeyName(),
-              keyInfo.getDataSize(), keyInfo.getBlockKey(),
-              keyInfo.getContainerName());
+              keyInfo.getDataSize(), "EMPTY",
+              "EMPTY");
       executeSQL(conn, insertKeyInfo);
       break;
     default:

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/handlers/KeyHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/handlers/KeyHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/handlers/KeyHandler.java
index 0655a0f..66aeb62 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/handlers/KeyHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/handlers/KeyHandler.java
@@ -166,6 +166,7 @@ public class KeyHandler implements Keys {
         String contentLenString = getContentLength(headers, args);
         String newLen = contentLenString.replaceAll("\"", "");
         int contentLen = Integer.parseInt(newLen);
+        args.setSize(contentLen);
 
         MessageDigest md5 = MessageDigest.getInstance("MD5");
         int bytesRead = 0;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkGroupInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkGroupInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkGroupInputStream.java
new file mode 100644
index 0000000..12df012
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkGroupInputStream.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.web.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
+import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ksm.helpers.KsmKeyLocationInfo;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.XceiverClientSpi;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.scm.storage.ChunkInputStream;
+import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Maintaining a list of ChunkInputStream. Read based on offset.
+ */
+public class ChunkGroupInputStream extends InputStream {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ChunkGroupInputStream.class);
+
+  private static final int EOF = -1;
+
+  private final ArrayList<ChunkInputStreamEntry> streamEntries;
+  private int currentStreamIndex;
+
+  public ChunkGroupInputStream() {
+    streamEntries = new ArrayList<>();
+    currentStreamIndex = 0;
+  }
+
+  @VisibleForTesting
+  public synchronized int getCurrentStreamIndex() {
+    return currentStreamIndex;
+  }
+
+  @VisibleForTesting
+  public long getRemainingOfIndex(int index) {
+    return streamEntries.get(index).getRemaining();
+  }
+
+  /**
+   * Append another stream to the end of the list.
+   *
+   * @param stream the stream instance.
+   * @param length the max number of bytes that should be read from this
+   *               stream.
+   */
+  public synchronized void addStream(InputStream stream, long length) {
+    streamEntries.add(new ChunkInputStreamEntry(stream, length));
+  }
+
+
+  @Override
+  public synchronized int read() throws IOException {
+    if (streamEntries.size() <= currentStreamIndex) {
+      throw new IndexOutOfBoundsException();
+    }
+    ChunkInputStreamEntry entry = streamEntries.get(currentStreamIndex);
+    int data = entry.read();
+    return data;
+  }
+
+  @Override
+  public synchronized int read(byte[] b, int off, int len) throws IOException {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return 0;
+    }
+    int totalReadLen = 0;
+    while (len > 0) {
+      if (streamEntries.size() <= currentStreamIndex) {
+        return totalReadLen == 0 ? EOF : totalReadLen;
+      }
+      ChunkInputStreamEntry current = streamEntries.get(currentStreamIndex);
+      int readLen = Math.min(len, (int)current.getRemaining());
+      int actualLen = current.read(b, off, readLen);
+      // this means the underlying stream has nothing at all, return
+      if (actualLen == EOF) {
+        return totalReadLen > 0? totalReadLen : EOF;
+      }
+      totalReadLen += actualLen;
+      // this means there is no more data to read beyond this point, return
+      if (actualLen != readLen) {
+        return totalReadLen;
+      }
+      off += readLen;
+      len -= readLen;
+      if (current.getRemaining() <= 0) {
+        currentStreamIndex += 1;
+      }
+    }
+    return totalReadLen;
+  }
+
+  private static class ChunkInputStreamEntry extends InputStream {
+
+    private final InputStream inputStream;
+    private final long length;
+    private long currentPosition;
+
+
+    ChunkInputStreamEntry(InputStream chunkInputStream, long length) {
+      this.inputStream = chunkInputStream;
+      this.length = length;
+      this.currentPosition = 0;
+    }
+
+    synchronized long getRemaining() {
+      return length - currentPosition;
+    }
+
+    @Override
+    public synchronized int read(byte[] b, int off, int len)
+        throws IOException {
+      int readLen = inputStream.read(b, off, len);
+      currentPosition += readLen;
+      return readLen;
+    }
+
+    @Override
+    public synchronized int read() throws IOException {
+      int data = inputStream.read();
+      currentPosition += 1;
+      return data;
+    }
+
+    @Override
+    public synchronized void close() throws IOException {
+      inputStream.close();
+    }
+  }
+
+  public static LengthInputStream getFromKsmKeyInfo(KsmKeyInfo keyInfo,
+      XceiverClientManager xceiverClientManager,
+      StorageContainerLocationProtocolClientSideTranslatorPB
+          storageContainerLocationClient, String requestId)
+      throws IOException {
+    int index = 0;
+    long length = 0;
+    String containerKey;
+    ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream();
+    for (KsmKeyLocationInfo ksmKeyLocationInfo : keyInfo.getKeyLocationList()) {
+      // check index as sanity check
+      Preconditions.checkArgument(index++ == ksmKeyLocationInfo.getIndex());
+      String containerName = ksmKeyLocationInfo.getContainerName();
+      Pipeline pipeline =
+          storageContainerLocationClient.getContainer(containerName);
+      XceiverClientSpi xceiverClient =
+          xceiverClientManager.acquireClient(pipeline);
+      boolean success = false;
+      containerKey = ksmKeyLocationInfo.getBlockID();
+      try {
+        LOG.debug("get key accessing {} {}",
+            xceiverClient.getPipeline().getContainerName(), containerKey);
+        ContainerProtos.KeyData containerKeyData = OzoneContainerTranslation
+            .containerKeyDataForRead(
+                xceiverClient.getPipeline().getContainerName(), containerKey);
+        ContainerProtos.GetKeyResponseProto response = ContainerProtocolCalls
+            .getKey(xceiverClient, containerKeyData, requestId);
+        List<ContainerProtos.ChunkInfo> chunks =
+            response.getKeyData().getChunksList();
+        for (ContainerProtos.ChunkInfo chunk : chunks) {
+          length += chunk.getLen();
+        }
+        success = true;
+        ChunkInputStream inputStream = new ChunkInputStream(
+            containerKey, xceiverClientManager, xceiverClient,
+            chunks, requestId);
+        groupInputStream.addStream(inputStream,
+            ksmKeyLocationInfo.getLength());
+      } finally {
+        if (!success) {
+          xceiverClientManager.releaseClient(xceiverClient);
+        }
+      }
+    }
+    return new LengthInputStream(groupInputStream, length);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkGroupOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkGroupOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkGroupOutputStream.java
new file mode 100644
index 0000000..f479f0a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkGroupOutputStream.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.web.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ksm.helpers.KsmKeyLocationInfo;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.XceiverClientSpi;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.scm.storage.ChunkOutputStream;
+import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+
+/**
+ * Maintaining a list of ChunkOutputStream. Write based on offset.
+ *
+ * Note that this may write to multiple containers in one write call. In case
+ * that first container succeeded but later ones failed, the succeeded writes
+ * are not rolled back.
+ *
+ * TODO : currently does not support multi-threaded access.
+ */
+public class ChunkGroupOutputStream extends OutputStream {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ChunkGroupOutputStream.class);
+
+  // array list's get(index) is O(1)
+  private final ArrayList<ChunkOutputStreamEntry> streamEntries;
+  private int currentStreamIndex;
+  private long totalSize;
+  private long byteOffset;
+
+  public ChunkGroupOutputStream() {
+    this.streamEntries = new ArrayList<>();
+    this.currentStreamIndex = 0;
+    this.totalSize = 0;
+    this.byteOffset = 0;
+  }
+
+  @VisibleForTesting
+  public long getByteOffset() {
+    return byteOffset;
+  }
+
+  /**
+   * Append another stream to the end of the list. Note that the streams are not
+   * actually created to this point, only enough meta data about the stream is
+   * stored. When something is to be actually written to the stream, the stream
+   * will be created (if not already).
+   *
+   * @param containerKey the key to store in the container
+   * @param key the ozone key
+   * @param xceiverClientManager xceiver manager instance
+   * @param xceiverClient xceiver client instance
+   * @param requestID the request id
+   * @param chunkSize the chunk size for this key chunks
+   * @param length the max number of bytes to write to this stream
+   */
+  public synchronized void addStream(String containerKey, String key,
+      XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient,
+      String requestID, int chunkSize, long length) {
+    streamEntries.add(new ChunkOutputStreamEntry(containerKey, key,
+        xceiverClientManager, xceiverClient, requestID, chunkSize, length));
+    totalSize += length;
+  }
+
+  @VisibleForTesting
+  public synchronized void addStream(OutputStream outputStream, long length) {
+    streamEntries.add(new ChunkOutputStreamEntry(outputStream, length));
+    totalSize += length;
+  }
+
+  @Override
+  public synchronized void write(int b) throws IOException {
+    if (streamEntries.size() <= currentStreamIndex) {
+      throw new IndexOutOfBoundsException();
+    }
+    ChunkOutputStreamEntry entry = streamEntries.get(currentStreamIndex);
+    entry.write(b);
+    if (entry.getRemaining() <= 0) {
+      currentStreamIndex += 1;
+    }
+    byteOffset += 1;
+  }
+
+  /**
+   * Try to write the bytes sequence b[off:off+len) to streams.
+   *
+   * NOTE: Throws exception if the data could not fit into the remaining space.
+   * In which case nothing will be written.
+   * TODO:May need to revisit this behaviour.
+   *
+   * @param b byte data
+   * @param off starting offset
+   * @param len length to write
+   * @throws IOException
+   */
+  @Override
+  public synchronized void write(byte[] b, int off, int len)
+      throws IOException {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if ((off < 0) || (off > b.length) || (len < 0) ||
+        ((off + len) > b.length) || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return;
+    }
+    if (streamEntries.size() <= currentStreamIndex) {
+      throw new IOException("Write out of stream range! stream index:" +
+          currentStreamIndex);
+    }
+    if (totalSize - byteOffset < len) {
+      throw new IOException("Can not write " + len + " bytes with only " +
+          (totalSize - byteOffset) + " byte space");
+    }
+    while (len > 0) {
+      // in theory, this condition should never be violated due to the check
+      // above; still do a sanity check.
+      Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
+      ChunkOutputStreamEntry current = streamEntries.get(currentStreamIndex);
+      int writeLen = Math.min(len, (int)current.getRemaining());
+      current.write(b, off, writeLen);
+      if (current.getRemaining() <= 0) {
+        currentStreamIndex += 1;
+      }
+      len -= writeLen;
+      off += writeLen;
+      byteOffset += writeLen;
+    }
+  }
+
+  @Override
+  public synchronized void flush() throws IOException {
+    for (int i = 0; i <= currentStreamIndex; i++) {
+      streamEntries.get(i).flush();
+    }
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    for (ChunkOutputStreamEntry entry : streamEntries) {
+      entry.close();
+    }
+  }
+
+  private static class ChunkOutputStreamEntry extends OutputStream {
+    private OutputStream outputStream;
+    private final String containerKey;
+    private final String key;
+    private final XceiverClientManager xceiverClientManager;
+    private final XceiverClientSpi xceiverClient;
+    private final String requestId;
+    private final int chunkSize;
+    // total number of bytes that should be written to this stream
+    private final long length;
+    // the current position of this stream 0 <= currentPosition <= length
+    private long currentPosition;
+
+    ChunkOutputStreamEntry(String containerKey, String key,
+        XceiverClientManager xceiverClientManager,
+        XceiverClientSpi xceiverClient, String requestId, int chunkSize,
+        long length) {
+      this.outputStream = null;
+      this.containerKey = containerKey;
+      this.key = key;
+      this.xceiverClientManager = xceiverClientManager;
+      this.xceiverClient = xceiverClient;
+      this.requestId = requestId;
+      this.chunkSize = chunkSize;
+
+      this.length = length;
+      this.currentPosition = 0;
+    }
+
+    /**
+     * For testing purposes, takes an already created stream instance.
+     * @param  outputStream an existing writable output stream
+     * @param  length the length of data to write to the stream
+     */
+    ChunkOutputStreamEntry(OutputStream outputStream, long length) {
+      this.outputStream = outputStream;
+      this.containerKey = null;
+      this.key = null;
+      this.xceiverClientManager = null;
+      this.xceiverClient = null;
+      this.requestId = null;
+      this.chunkSize = -1;
+
+      this.length = length;
+      this.currentPosition = 0;
+    }
+
+    long getLength() {
+      return length;
+    }
+
+    long getRemaining() {
+      return length - currentPosition;
+    }
+
+    private synchronized void checkStream() {
+      if (this.outputStream == null) {
+        this.outputStream = new ChunkOutputStream(containerKey,
+            key, xceiverClientManager, xceiverClient,
+            requestId, chunkSize);
+      }
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      checkStream();
+      outputStream.write(b);
+      this.currentPosition += 1;
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      checkStream();
+      outputStream.write(b, off, len);
+      this.currentPosition += len;
+    }
+
+    @Override
+    public void flush() throws IOException {
+      if (this.outputStream != null) {
+        this.outputStream.flush();
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (this.outputStream != null) {
+        this.outputStream.close();
+      }
+    }
+  }
+
+  public static ChunkGroupOutputStream getFromKsmKeyInfo(
+      KsmKeyInfo keyInfo, XceiverClientManager xceiverClientManager,
+      StorageContainerLocationProtocolClientSideTranslatorPB
+          storageContainerLocationClient,
+      int chunkSize, String requestId) throws IOException {
+    // TODO: the following createContainer and key writes may fail, in which
+    // case we should revert the above allocateKey to KSM.
+    // check index as sanity check
+    int index = 0;
+    String containerKey;
+    ChunkGroupOutputStream groupOutputStream = new ChunkGroupOutputStream();
+    for (KsmKeyLocationInfo subKeyInfo : keyInfo.getKeyLocationList()) {
+      containerKey = subKeyInfo.getBlockID();
+
+      Preconditions.checkArgument(index++ == subKeyInfo.getIndex());
+      String containerName = subKeyInfo.getContainerName();
+      Pipeline pipeline =
+          storageContainerLocationClient.getContainer(containerName);
+      XceiverClientSpi xceiverClient =
+          xceiverClientManager.acquireClient(pipeline);
+      // create container if needed
+      // TODO : should be subKeyInfo.getShouldCreateContainer(), but for now
+      // always true.
+      boolean shouldCreate = true;
+      if (shouldCreate) {
+        try {
+          ContainerProtocolCalls.createContainer(xceiverClient, requestId);
+        } catch (StorageContainerException sce) {
+          LOG.warn("Create container failed with {}", containerName, sce);
+        }
+      }
+
+      groupOutputStream.addStream(containerKey, keyInfo.getKeyName(),
+          xceiverClientManager, xceiverClient, requestId, chunkSize,
+          subKeyInfo.getLength());
+    }
+    return groupOutputStream;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
index 1bf5b11..51affd3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
@@ -19,12 +19,6 @@
 package org.apache.hadoop.ozone.web.storage;
 
 import com.google.common.base.Strings;
-import org.apache.hadoop.hdfs.ozone.protocol.proto
-    .ContainerProtos.ChunkInfo;
-import org.apache.hadoop.hdfs.ozone.protocol.proto
-    .ContainerProtos.GetKeyResponseProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto
-    .ContainerProtos.KeyData;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset
     .LengthInputStream;
 import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
@@ -37,12 +31,12 @@ import org.apache.hadoop.ksm.protocolPB
 import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.OzoneConsts.Versioning;
+import org.apache.hadoop.ozone.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocolPB.KSMPBHelper;
 import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.web.request.OzoneQuota;
-import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.scm.ScmConfigKeys;
 import org.apache.hadoop.scm.XceiverClientManager;
 import org.apache.hadoop.scm.protocolPB
@@ -61,21 +55,12 @@ import org.apache.hadoop.ozone.web.response.ListBuckets;
 import org.apache.hadoop.ozone.web.response.BucketInfo;
 import org.apache.hadoop.ozone.web.response.KeyInfo;
 import org.apache.hadoop.ozone.web.response.ListKeys;
-import org.apache.hadoop.scm.XceiverClientSpi;
-import org.apache.hadoop.scm.storage.ChunkInputStream;
-import org.apache.hadoop.scm.storage.ChunkOutputStream;
-import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.TimeZone;
-import java.util.Locale;
 import java.util.List;
 
 /**
@@ -104,9 +89,9 @@ public final class DistributedStorageHandler implements StorageHandler {
    */
   public DistributedStorageHandler(OzoneConfiguration conf,
       StorageContainerLocationProtocolClientSideTranslatorPB
-                                       storageContainerLocation,
+          storageContainerLocation,
       KeySpaceManagerProtocolClientSideTranslatorPB
-                                       keySpaceManagerClient) {
+          keySpaceManagerClient) {
     this.keySpaceManagerClient = keySpaceManagerClient;
     this.storageContainerLocationClient = storageContainerLocation;
     this.xceiverClientManager = new XceiverClientManager(conf);
@@ -119,8 +104,8 @@ public final class DistributedStorageHandler implements StorageHandler {
         KSMConfigKeys.OZONE_KSM_GROUP_RIGHTS_DEFAULT);
     if(chunkSize > ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE) {
       LOG.warn("The chunk size ({}) is not allowed to be more than"
-          + " the maximum size ({}),"
-          + " resetting to the maximum size.",
+              + " the maximum size ({}),"
+              + " resetting to the maximum size.",
           chunkSize, ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE);
       chunkSize = ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE;
     }
@@ -159,7 +144,7 @@ public final class DistributedStorageHandler implements StorageHandler {
   public void setVolumeQuota(VolumeArgs args, boolean remove)
       throws IOException, OzoneException {
     long quota = remove ? OzoneConsts.MAX_QUOTA_IN_BYTES :
-                                   args.getQuota().sizeInBytes();
+        args.getQuota().sizeInBytes();
     keySpaceManagerClient.setQuota(args.getVolumeName(), quota);
   }
 
@@ -397,22 +382,11 @@ public final class DistributedStorageHandler implements StorageHandler {
         .setDataSize(args.getSize())
         .build();
     // contact KSM to allocate a block for key.
-    String containerKey = buildContainerKey(args.getVolumeName(),
-        args.getBucketName(), args.getKeyName());
     KsmKeyInfo keyInfo = keySpaceManagerClient.allocateKey(keyArgs);
-    // TODO the following createContainer and key writes may fail, in which
-    // case we should revert the above allocateKey to KSM.
-    String containerName = keyInfo.getContainerName();
-    XceiverClientSpi xceiverClient = getContainer(containerName);
-    if (keyInfo.getShouldCreateContainer()) {
-      LOG.debug("Need to create container {} for key: {}/{}/{}", containerName,
-          args.getVolumeName(), args.getBucketName(), args.getKeyName());
-      ContainerProtocolCalls.createContainer(
-          xceiverClient, args.getRequestID());
-    }
-    // establish a connection to the container to write the key
-    return new ChunkOutputStream(containerKey, args.getKeyName(),
-        xceiverClientManager, xceiverClient, args.getRequestID(), chunkSize);
+    ChunkGroupOutputStream groupOutputStream =
+        ChunkGroupOutputStream.getFromKsmKeyInfo(keyInfo, xceiverClientManager,
+            storageContainerLocationClient, chunkSize, args.getRequestID());
+    return new OzoneOutputStream(groupOutputStream);
   }
 
   @Override
@@ -431,33 +405,9 @@ public final class DistributedStorageHandler implements StorageHandler {
         .setDataSize(args.getSize())
         .build();
     KsmKeyInfo keyInfo = keySpaceManagerClient.lookupKey(keyArgs);
-    String containerKey = buildContainerKey(args.getVolumeName(),
-        args.getBucketName(), args.getKeyName());
-    String containerName = keyInfo.getContainerName();
-    XceiverClientSpi xceiverClient = getContainer(containerName);
-    boolean success = false;
-    try {
-      LOG.debug("get key accessing {} {}",
-          xceiverClient.getPipeline().getContainerName(), containerKey);
-      KeyData containerKeyData = OzoneContainerTranslation
-          .containerKeyDataForRead(
-              xceiverClient.getPipeline().getContainerName(), containerKey);
-      GetKeyResponseProto response = ContainerProtocolCalls
-          .getKey(xceiverClient, containerKeyData, args.getRequestID());
-      long length = 0;
-      List<ChunkInfo> chunks = response.getKeyData().getChunksList();
-      for (ChunkInfo chunk : chunks) {
-        length += chunk.getLen();
-      }
-      success = true;
-      return new LengthInputStream(new ChunkInputStream(
-          containerKey, xceiverClientManager, xceiverClient,
-          chunks, args.getRequestID()), length);
-    } finally {
-      if (!success) {
-        xceiverClientManager.releaseClient(xceiverClient);
-      }
-    }
+    return ChunkGroupInputStream.getFromKsmKeyInfo(
+        keyInfo, xceiverClientManager, storageContainerLocationClient,
+        args.getRequestID());
   }
 
   @Override
@@ -535,37 +485,6 @@ public final class DistributedStorageHandler implements StorageHandler {
     }
   }
 
-  private XceiverClientSpi getContainer(String containerName)
-      throws IOException {
-    Pipeline pipeline =
-        storageContainerLocationClient.getContainer(containerName);
-    return xceiverClientManager.acquireClient(pipeline);
-  }
-
-  /**
-   * Creates a container key from any number of components by combining all
-   * components with a delimiter.
-   *
-   * @param parts container key components
-   * @return container key
-   */
-  private static String buildContainerKey(String... parts) {
-    return '/' + StringUtils.join('/', parts);
-  }
-
-  /**
-   * Formats a date in the expected string format.
-   *
-   * @param date the date to format
-   * @return formatted string representation of date
-   */
-  private static String dateToString(Date date) {
-    SimpleDateFormat sdf =
-        new SimpleDateFormat(OzoneConsts.OZONE_DATE_FORMAT, Locale.US);
-    sdf.setTimeZone(TimeZone.getTimeZone(OzoneConsts.OZONE_TIME_ZONE));
-    return sdf.format(date);
-  }
-
   /**
    * Closes DistributedStorageHandler.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
index cf14013..be9a48a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
@@ -813,4 +813,12 @@
       Port used by cblock to connect to SCM.
     </description>
   </property>
+
+  <property>
+    <name>ozone.scm.block.size</name>
+    <value>268435456</value>
+    <description>
+      The default size of a scm block in bytes.
+    </description>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestChunkStreams.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestChunkStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestChunkStreams.java
new file mode 100644
index 0000000..2c936df
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestChunkStreams.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.ksm;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.ozone.web.storage.ChunkGroupInputStream;
+import org.apache.hadoop.ozone.web.storage.ChunkGroupOutputStream;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This class tests ChunkGroupInputStream and ChunkGroupOutputStream.
+ */
+public class TestChunkStreams {
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  /**
+   * This test uses ByteArrayOutputStream as the underlying stream to test
+   * the correctness of ChunkGroupOutputStream.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testWriteGroupOutputStream() throws Exception {
+    try (ChunkGroupOutputStream groupOutputStream =
+             new ChunkGroupOutputStream()) {
+      ArrayList<OutputStream> outputStreams = new ArrayList<>();
+
+      // 5 byte streams, each 100 bytes. write 500 bytes means writing to each
+      // of them with 100 bytes.
+      for (int i = 0; i < 5; i++) {
+        ByteArrayOutputStream out = new ByteArrayOutputStream(100);
+        outputStreams.add(out);
+        groupOutputStream.addStream(out, 100);
+      }
+      assertEquals(0, groupOutputStream.getByteOffset());
+
+      String dataString = RandomStringUtils.randomAscii(500);
+      byte[] data = dataString.getBytes();
+      groupOutputStream.write(data, 0, data.length);
+      assertEquals(500, groupOutputStream.getByteOffset());
+
+      String res = "";
+      int offset = 0;
+      for (OutputStream stream : outputStreams) {
+        String subString = stream.toString();
+        res += subString;
+        assertEquals(dataString.substring(offset, offset + 100), subString);
+        offset += 100;
+      }
+      assertEquals(dataString, res);
+    }
+  }
+
+  @Test
+  public void testErrorWriteGroupOutputStream() throws Exception {
+    try (ChunkGroupOutputStream groupOutputStream =
+             new ChunkGroupOutputStream()) {
+      ArrayList<OutputStream> outputStreams = new ArrayList<>();
+
+      // 5 byte streams, each 100 bytes. write 500 bytes means writing to each
+      // of them with 100 bytes. all 5 streams makes up a ChunkGroupOutputStream
+      // with a total of 500 bytes in size
+      for (int i = 0; i < 5; i++) {
+        ByteArrayOutputStream out = new ByteArrayOutputStream(100);
+        outputStreams.add(out);
+        groupOutputStream.addStream(out, 100);
+      }
+      assertEquals(0, groupOutputStream.getByteOffset());
+
+      // the first write of 100 bytes should succeed
+      groupOutputStream.write(RandomStringUtils.randomAscii(100).getBytes());
+      assertEquals(100, groupOutputStream.getByteOffset());
+
+      // the second write of 500 bytes should fail, as there should be only
+      // 400 bytes of space left
+      // TODO : if we decide to take the 400 bytes instead in the future,
+      // or add a more informative error code rather than an exception, we
+      // need to change this part.
+      exception.expect(IOException.class);
+      exception.expectMessage(
+          "Can not write 500 bytes with only 400 byte space");
+      groupOutputStream.write(RandomStringUtils.randomAscii(500).getBytes());
+      assertEquals(100, groupOutputStream.getByteOffset());
+    }
+  }
+
+  @Test
+  public void testReadGroupInputStream() throws Exception {
+    try (ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream()) {
+      ArrayList<InputStream> inputStreams = new ArrayList<>();
+
+      String dataString = RandomStringUtils.randomAscii(500);
+      byte[] buf = dataString.getBytes();
+      int offset = 0;
+      for (int i = 0; i < 5; i++) {
+        ByteArrayInputStream in = new ByteArrayInputStream(buf, offset, 100);
+        inputStreams.add(in);
+        offset += 100;
+        groupInputStream.addStream(in, 100);
+      }
+
+      byte[] resBuf = new byte[500];
+      int len = groupInputStream.read(resBuf, 0, 500);
+
+      assertEquals(500, len);
+      assertEquals(dataString, new String(resBuf));
+    }
+  }
+
+  @Test
+  public void testErrorReadGroupInputStream() throws Exception {
+    try (ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream()) {
+      ArrayList<InputStream> inputStreams = new ArrayList<>();
+
+      String dataString = RandomStringUtils.randomAscii(500);
+      byte[] buf = dataString.getBytes();
+      int offset = 0;
+      for (int i = 0; i < 5; i++) {
+        ByteArrayInputStream in = new ByteArrayInputStream(buf, offset, 100);
+        inputStreams.add(in);
+        offset += 100;
+        groupInputStream.addStream(in, 100);
+      }
+
+      byte[] resBuf = new byte[600];
+      // read 340 bytes first
+      int len = groupInputStream.read(resBuf, 0, 340);
+      assertEquals(3, groupInputStream.getCurrentStreamIndex());
+      assertEquals(60, groupInputStream.getRemainingOfIndex(3));
+      assertEquals(340, len);
+      assertEquals(dataString.substring(0, 340),
+          new String(resBuf).substring(0, 340));
+
+      // request the following 260 bytes, but only 160 are left
+      len = groupInputStream.read(resBuf, 340, 260);
+      assertEquals(5, groupInputStream.getCurrentStreamIndex());
+      assertEquals(0, groupInputStream.getRemainingOfIndex(4));
+      assertEquals(160, len);
+      assertEquals(dataString, new String(resBuf).substring(0, 500));
+
+      // further read should get EOF
+      len = groupInputStream.read(resBuf, 0, 1);
+      // reached EOF, further read should get -1
+      assertEquals(-1, len);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestMultipleContainerReadWrite.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestMultipleContainerReadWrite.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestMultipleContainerReadWrite.java
new file mode 100644
index 0000000..c310055
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestMultipleContainerReadWrite.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.ksm;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.web.handlers.BucketArgs;
+import org.apache.hadoop.ozone.web.handlers.KeyArgs;
+import org.apache.hadoop.ozone.web.handlers.UserArgs;
+import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
+import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.scm.ScmConfigKeys;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.LinkedList;
+
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test key write/read where a key can span multiple containers.
+ */
+public class TestMultipleContainerReadWrite {
+  private static MiniOzoneCluster cluster = null;
+  private static StorageHandler storageHandler;
+  private static UserArgs userArgs;
+  private static OzoneConfiguration conf;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true and
+   * OZONE_HANDLER_TYPE_KEY = "distributed"
+   *
+   * @throws Exception
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    // set to as small as 100 bytes per block.
+    conf.setLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_KEY, 100);
+    conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, 5);
+    conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
+        OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
+    cluster = new MiniOzoneCluster.Builder(conf)
+        .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
+    storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
+    userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
+        null, null, null, null);
+  }
+
+  /**
+   * Shutdown MiniOzoneCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testWriteRead() throws Exception {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+    String keyName = "key" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
+    bucketArgs.setAddAcls(new LinkedList<>());
+    bucketArgs.setRemoveAcls(new LinkedList<>());
+    bucketArgs.setStorageType(StorageType.DISK);
+    storageHandler.createBucket(bucketArgs);
+
+    String dataString = RandomStringUtils.randomAscii(500);
+    KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
+    keyArgs.setSize(500);
+
+    try (OutputStream outputStream = storageHandler.newKeyWriter(keyArgs)) {
+      outputStream.write(dataString.getBytes());
+    }
+
+    byte[] data = new byte[dataString.length()];
+    try (InputStream inputStream = storageHandler.newKeyReader(keyArgs)) {
+      inputStream.read(data, 0, data.length);
+    }
+    assertEquals(dataString, new String(data));
+    // checking whether container meta data has the chunk file persisted.
+    MetricsRecordBuilder containerMetrics = getMetrics(
+        "StorageContainerMetrics");
+    assertCounter("numWriteChunk", 5L, containerMetrics);
+    assertCounter("numReadChunk", 5L, containerMetrics);
+  }
+
+  @Test
+  public void testErrorWrite() throws Exception {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+    String keyName = "key" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
+    bucketArgs.setAddAcls(new LinkedList<>());
+    bucketArgs.setRemoveAcls(new LinkedList<>());
+    bucketArgs.setStorageType(StorageType.DISK);
+    storageHandler.createBucket(bucketArgs);
+
+    String dataString1 = RandomStringUtils.randomAscii(100);
+    String dataString2 = RandomStringUtils.randomAscii(500);
+    KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
+    keyArgs.setSize(500);
+
+    try (OutputStream outputStream = storageHandler.newKeyWriter(keyArgs)) {
+      // first write will succeed
+      outputStream.write(dataString1.getBytes());
+      // second write
+      exception.expect(IOException.class);
+      exception.expectMessage(
+          "Can not write 500 bytes with only 400 byte space");
+      outputStream.write(dataString2.getBytes());
+    }
+  }
+
+  @Test
+  public void testPartialRead() throws Exception {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+    String keyName = "key" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
+    bucketArgs.setAddAcls(new LinkedList<>());
+    bucketArgs.setRemoveAcls(new LinkedList<>());
+    bucketArgs.setStorageType(StorageType.DISK);
+    storageHandler.createBucket(bucketArgs);
+
+    String dataString = RandomStringUtils.randomAscii(500);
+    KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
+    keyArgs.setSize(500);
+
+    try (OutputStream outputStream = storageHandler.newKeyWriter(keyArgs)) {
+      outputStream.write(dataString.getBytes());
+    }
+
+    byte[] data = new byte[600];
+    try (InputStream inputStream = storageHandler.newKeyReader(keyArgs)) {
+      int readLen = inputStream.read(data, 0, 340);
+      assertEquals(340, readLen);
+      assertEquals(dataString.substring(0, 340),
+          new String(data).substring(0, 340));
+
+      readLen = inputStream.read(data, 340, 260);
+      assertEquals(160, readLen);
+      assertEquals(dataString, new String(data).substring(0, 500));
+
+      readLen = inputStream.read(data, 500, 1);
+      assertEquals(-1, readLen);
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message