hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From x...@apache.org
Subject hadoop git commit: HDFS-11780. Ozone: KSM: Add putKey. Contributed by Chen Liang.
Date Fri, 26 May 2017 05:11:22 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 67da8be74 -> e641bee7b


HDFS-11780. Ozone: KSM: Add putKey. Contributed by Chen Liang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e641bee7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e641bee7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e641bee7

Branch: refs/heads/HDFS-7240
Commit: e641bee7b7770fc30b9f6bbc688c6025b601e5bd
Parents: 67da8be
Author: Xiaoyu Yao <xyao@apache.org>
Authored: Thu May 25 22:06:17 2017 -0700
Committer: Xiaoyu Yao <xyao@apache.org>
Committed: Thu May 25 22:06:17 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/ksm/helpers/KsmKeyArgs.java   |  88 +++++++++++
 .../apache/hadoop/ksm/helpers/KsmKeyInfo.java   | 156 +++++++++++++++++++
 .../ksm/protocol/KeySpaceManagerProtocol.java   |   7 +
 ...ceManagerProtocolClientSideTranslatorPB.java |  38 +++++
 .../org/apache/hadoop/scm/ScmConfigKeys.java    |   2 +-
 .../main/proto/KeySpaceManagerProtocol.proto    |  33 ++++
 .../org/apache/hadoop/ozone/ksm/KSMMetrics.java |  19 +++
 .../org/apache/hadoop/ozone/ksm/KeyManager.java |  45 ++++++
 .../apache/hadoop/ozone/ksm/KeyManagerImpl.java | 109 +++++++++++++
 .../hadoop/ozone/ksm/KeySpaceManager.java       |  50 ++++++
 .../hadoop/ozone/ksm/MetadataManager.java       |   9 ++
 .../hadoop/ozone/ksm/MetadataManagerImpl.java   |   8 +
 .../ozone/ksm/exceptions/KSMException.java      |   1 +
 ...ceManagerProtocolServerSideTranslatorPB.java |  31 ++++
 .../ozone/scm/block/BlockManagerImpl.java       |   4 +-
 .../web/storage/DistributedStorageHandler.java  |  38 ++++-
 .../hadoop/ozone/ksm/TestKeySpaceManager.java   |  37 ++++-
 17 files changed, 665 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e641bee7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyArgs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyArgs.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyArgs.java
new file mode 100644
index 0000000..a034ed3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyArgs.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ksm.helpers;
+
+/**
+ * Args for key. Client use this to specify key's attributes on  key creation
+ * (putKey()).
+ */
+public final class KsmKeyArgs {
+  private final String volumeName;
+  private final String bucketName;
+  private final String keyName;
+
+  private final long dataSize;
+
+  private KsmKeyArgs(String volumeName, String bucketName, String keyName,
+      long dataSize) {
+    this.volumeName = volumeName;
+    this.bucketName = bucketName;
+    this.keyName = keyName;
+    this.dataSize = dataSize;
+  }
+
+  public String getVolumeName() {
+    return volumeName;
+  }
+
+  public String getBucketName() {
+    return bucketName;
+  }
+
+  public String getKeyName() {
+    return keyName;
+  }
+
+  public long getDataSize() {
+    return dataSize;
+  }
+
+  /**
+   * Builder class of KsmKeyArgs.
+   */
+  public static class Builder {
+    private String volumeName;
+    private String bucketName;
+    private String keyName;
+    private long dataSize;
+
+    public Builder setVolumeName(String volume) {
+      this.volumeName = volume;
+      return this;
+    }
+
+    public Builder setBucketName(String bucket) {
+      this.bucketName = bucket;
+      return this;
+    }
+
+    public Builder setKeyName(String key) {
+      this.keyName = key;
+      return this;
+    }
+
+    public Builder setDataSize(long size) {
+      this.dataSize = size;
+      return this;
+    }
+
+    public KsmKeyArgs build() {
+      return new KsmKeyArgs(volumeName, bucketName, keyName, dataSize);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e641bee7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyInfo.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyInfo.java
new file mode 100644
index 0000000..76d22be
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyInfo.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ksm.helpers;
+
+
+import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
+
+/**
+ * Args for key block. The block instance for the key requested in putKey.
+ * This is returned from KSM to client, and client use class to talk to
+ * datanode. Also, this is the metadata written to ksm.db on server side.
+ */
+public final class KsmKeyInfo {
+  private final String volumeName;
+  private final String bucketName;
+  // name of key client specified
+  private final String keyName;
+  private final String containerName;
+  // name of the block id SCM assigned for the key
+  private final String blockID;
+  private final long dataSize;
+  private final boolean shouldCreateContainer;
+
+  private KsmKeyInfo(String volumeName, String bucketName, String keyName,
+      long dataSize, String blockID, String containerName,
+      boolean shouldCreateContainer) {
+    this.volumeName = volumeName;
+    this.bucketName = bucketName;
+    this.keyName = keyName;
+    this.containerName = containerName;
+    this.blockID = blockID;
+    this.dataSize = dataSize;
+    this.shouldCreateContainer = shouldCreateContainer;
+  }
+
+  public String getVolumeName() {
+    return volumeName;
+  }
+
+  public String getBucketName() {
+    return bucketName;
+  }
+
+  public String getKeyName() {
+    return keyName;
+  }
+
+  public String getBlockID() {
+    return blockID;
+  }
+
+  public String getContainerName() {
+    return containerName;
+  }
+
+  public long getDataSize() {
+    return dataSize;
+  }
+
+  public boolean getShouldCreateContainer() {
+    return shouldCreateContainer;
+  }
+
+  /**
+   * Builder of KsmKeyInfo.
+   */
+  public static class Builder {
+    private String volumeName;
+    private String bucketName;
+    private String keyName;
+    private String containerName;
+    private String blockID;
+    private long dataSize;
+    private boolean shouldCreateContainer;
+
+    public Builder setVolumeName(String volume) {
+      this.volumeName = volume;
+      return this;
+    }
+
+    public Builder setBucketName(String bucket) {
+      this.bucketName = bucket;
+      return this;
+    }
+
+    public Builder setKeyName(String key) {
+      this.keyName = key;
+      return this;
+    }
+
+    public Builder setBlockID(String block) {
+      this.blockID = block;
+      return this;
+    }
+
+    public Builder setContainerName(String container) {
+      this.containerName = container;
+      return this;
+    }
+
+    public Builder setDataSize(long size) {
+      this.dataSize = size;
+      return this;
+    }
+
+    public Builder setShouldCreateContainer(boolean create) {
+      this.shouldCreateContainer = create;
+      return this;
+    }
+
+    public KsmKeyInfo build() {
+      return new KsmKeyInfo(
+          volumeName, bucketName, keyName, dataSize, blockID, containerName,
+          shouldCreateContainer);
+    }
+  }
+
+  public KeyInfo getProtobuf() {
+    return KeyInfo.newBuilder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setDataSize(dataSize)
+        .setBlockKey(blockID)
+        .setContainerName(containerName)
+        .setShouldCreateContainer(shouldCreateContainer)
+        .build();
+  }
+
+  public static KsmKeyInfo getFromProtobuf(KeyInfo keyInfo) {
+    return new KsmKeyInfo(
+        keyInfo.getVolumeName(),
+        keyInfo.getBucketName(),
+        keyInfo.getKeyName(),
+        keyInfo.getDataSize(),
+        keyInfo.getBlockKey(),
+        keyInfo.getContainerName(),
+        keyInfo.getShouldCreateContainer());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e641bee7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeySpaceManagerProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeySpaceManagerProtocol.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeySpaceManagerProtocol.java
index 8d52c01..4a759da 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeySpaceManagerProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeySpaceManagerProtocol.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.ksm.protocol;
 
 import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
+import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
+import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
 import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
 import java.io.IOException;
 import java.util.List;
@@ -113,4 +115,9 @@ public interface KeySpaceManagerProtocol {
   KsmBucketInfo getBucketInfo(String volumeName, String bucketName)
       throws IOException;
 
+  /**
+   * Allocate a block to a container, the block is returned to the client.
+   */
+  KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException;
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e641bee7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
index d5bbd37..5fb2844 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
+import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
+import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
 import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
 import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol;
 import org.apache.hadoop.ozone.protocol.proto
@@ -40,6 +42,12 @@ import org.apache.hadoop.ozone.protocol.proto
 import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.CreateVolumeResponse;
 import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.CreateKeyRequest;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.CreateKeyResponse;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.KeyArgs;
+import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.SetVolumePropertyRequest;
 import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.SetVolumePropertyResponse;
@@ -314,6 +322,36 @@ public final class KeySpaceManagerProtocolClientSideTranslatorPB
 
 
   /**
+   * Allocate a block for a key, then use the returned meta info to talk to data
+   * node to actually write the key.
+   * @param args the args for the key to be allocated
+   * @return a handler to the key, returned client
+   * @throws IOException
+   */
+  @Override
+  public KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException {
+    CreateKeyRequest.Builder req = CreateKeyRequest.newBuilder();
+    KeyArgs keyArgs = KeyArgs.newBuilder()
+        .setVolumeName(args.getVolumeName())
+        .setBucketName(args.getBucketName())
+        .setKeyName(args.getKeyName())
+        .setDataSize(args.getDataSize()).build();
+    req.setKeyArgs(keyArgs);
+
+    final CreateKeyResponse resp;
+    try {
+      resp = rpcProxy.createKey(NULL_RPC_CONTROLLER, req.build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+    if (resp.getStatus() != Status.OK) {
+      throw new IOException("Get key block failed, error:" +
+          resp.getStatus());
+    }
+    return KsmKeyInfo.getFromProtobuf(resp.getKeyInfo());
+  }
+
+  /**
    * Return the proxy object underlying this protocol translator.
    *
    * @return the proxy object underlying this protocol translator.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e641bee7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
index d76565b..9553171 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
@@ -165,7 +165,7 @@ public final class ScmConfigKeys {
 
   public static final String OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE =
       "ozone.scm.container.provision_batch_size";
-  public static final int OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT = 10;
+  public static final int OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT = 1;
 
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e641bee7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
index a54ad40..a5d09e2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
@@ -210,6 +210,33 @@ message InfoBucketResponse {
 
 }
 
+
+message KeyArgs {
+    required string volumeName = 1;
+    required string bucketName = 2;
+    required string keyName = 3;
+    required uint64 dataSize = 4;
+}
+
+message KeyInfo {
+    required string volumeName = 1;
+    required string bucketName = 2;
+    required string keyName = 3;
+    required uint64 dataSize = 4;
+    required string blockKey = 5;
+    required string containerName = 6;
+    required bool shouldCreateContainer = 7;
+}
+
+message CreateKeyRequest {
+    required KeyArgs keyArgs = 1;
+}
+
+message CreateKeyResponse {
+    required Status status = 1;
+    optional KeyInfo keyInfo = 2;
+}
+
 /**
  The KSM service that takes care of Ozone namespace.
 */
@@ -261,4 +288,10 @@ service KeySpaceManagerService {
     */
     rpc infoBucket(InfoBucketRequest)
         returns(InfoBucketResponse);
+
+    /**
+        Get key block.
+    */
+    rpc createKey(CreateKeyRequest)
+        returns(CreateKeyResponse);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e641bee7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
index 7979b7f..37cbb64 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
@@ -33,6 +33,7 @@ public class KSMMetrics {
   private @Metric MutableCounterLong numVolumeInfos;
   private @Metric MutableCounterLong numBucketCreates;
   private @Metric MutableCounterLong numBucketInfos;
+  private @Metric MutableCounterLong numKeyBlockAllocate;
 
   // Failure Metrics
   private @Metric MutableCounterLong numVolumeCreateFails;
@@ -40,6 +41,7 @@ public class KSMMetrics {
   private @Metric MutableCounterLong numVolumeInfoFails;
   private @Metric MutableCounterLong numBucketCreateFails;
   private @Metric MutableCounterLong numBucketInfoFails;
+  private @Metric MutableCounterLong numKeyBlockAllocateFails;
 
   public KSMMetrics() {
   }
@@ -91,6 +93,14 @@ public class KSMMetrics {
     numBucketInfoFails.incr();
   }
 
+  public void incNumKeyBlockAllocates() {
+    numKeyBlockAllocate.incr();
+  }
+
+  public void incNumKeyBlockAllocateFails() {
+    numKeyBlockAllocateFails.incr();
+  }
+
   @VisibleForTesting
   public long getNumVolumeCreates() {
     return numVolumeCreates.value();
@@ -141,4 +151,13 @@ public class KSMMetrics {
     return numBucketInfoFails.value();
   }
 
+  @VisibleForTesting
+  public long getNumKeyBlockAllocates() {
+    return numKeyBlockAllocate.value();
+  }
+
+  @VisibleForTesting
+  public long getNumKeyBlockAllocateFailes() {
+    return numKeyBlockAllocateFails.value();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e641bee7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java
new file mode 100644
index 0000000..466de27
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.ksm;
+
+import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
+import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
+
+import java.io.IOException;
+
+/**
+ * Handles key level commands.
+ */
+public interface KeyManager {
+  /**
+   * Given the args of a key to put, return a pipeline for the key. Writes
+   * the key to pipeline mapping to meta data.
+   *
+   * Note that this call only allocate a block for key, and adds the
+   * corresponding entry to metadata. The block will be returned to client side
+   * handler DistributedStorageHandler. Which will make another call to
+   * datanode to create container (if needed) and writes the key.
+   *
+   * In case that the container creation or key write failed on
+   * DistributedStorageHandler, this key's metadata will still stay in KSM.
+   *
+   * @param args the args of the key provided by client.
+   * @return a KsmKeyInfo instance client uses to talk to container.
+   * @throws Exception
+   */
+  KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e641bee7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
new file mode 100644
index 0000000..ee06745
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.ksm;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
+import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
+import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
+import org.iq80.leveldb.DBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Implementation of keyManager.
+ */
+public class KeyManagerImpl implements KeyManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(KeyManagerImpl.class);
+
+  /**
+   * A SCM block client, used to talk to SCM to allocate block during putKey.
+   */
+  private final ScmBlockLocationProtocol scmBlockClient;
+  private final MetadataManager metadataManager;
+
+  public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
+      MetadataManager metadataManager) {
+    this.scmBlockClient = scmBlockClient;
+    this.metadataManager = metadataManager;
+  }
+
+  @Override
+  public KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException {
+    Preconditions.checkNotNull(args);
+    metadataManager.writeLock().lock();
+    String volumeName = args.getVolumeName();
+    String bucketName = args.getBucketName();
+    String keyName = args.getKeyName();
+    try {
+      byte[] volumeKey = metadataManager.getVolumeKey(volumeName);
+      byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
+      byte[] keyKey = metadataManager.getDBKeyForKey(
+          volumeName, bucketName, keyName);
+
+      //Check if the volume exists
+      if(metadataManager.get(volumeKey) == null) {
+        LOG.error("volume not found: {}", volumeName);
+        throw new KSMException("Volume not found",
+            KSMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
+      }
+      //Check if bucket already exists
+      if(metadataManager.get(bucketKey) == null) {
+        LOG.error("bucket not found: {}/{} ", volumeName, bucketName);
+        throw new KSMException("Bucket not found",
+            KSMException.ResultCodes.FAILED_BUCKET_NOT_FOUND);
+      }
+      // TODO throw exception if key exists, may change to support key
+      // overwrite in the future
+      //Check if key already exists.
+      if(metadataManager.get(keyKey) != null) {
+        LOG.error("key already exist: {}/{}/{} ", volumeName, bucketName,
+            keyName);
+        throw new KSMException("Key already exist",
+            KSMException.ResultCodes.FAILED_KEY_ALREADY_EXISTS);
+      }
+
+      AllocatedBlock allocatedBlock =
+          scmBlockClient.allocateBlock(args.getDataSize());
+      KsmKeyInfo keyBlock = new KsmKeyInfo.Builder()
+          .setVolumeName(args.getVolumeName())
+          .setBucketName(args.getBucketName())
+          .setKeyName(args.getKeyName())
+          .setDataSize(args.getDataSize())
+          .setBlockID(allocatedBlock.getKey())
+          .setContainerName(allocatedBlock.getPipeline().getContainerName())
+          .setShouldCreateContainer(allocatedBlock.getCreateContainer())
+          .build();
+      metadataManager.put(keyKey, keyBlock.getProtobuf().toByteArray());
+      LOG.debug("Key {} allocated in volume {} bucket {}",
+          keyName, volumeName, bucketName);
+      return keyBlock;
+    } catch (DBException ex) {
+      LOG.error("Key allocation failed for volume:{} bucket:{} key:{}",
+          volumeName, bucketName, keyName, ex);
+      throw new KSMException(ex.getMessage(),
+          KSMException.ResultCodes.FAILED_INTERNAL_ERROR);
+    } finally {
+      metadataManager.writeLock().unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e641bee7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
index ba3d2c3..6682301 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
@@ -20,17 +20,25 @@ package org.apache.hadoop.ozone.ksm;
 import com.google.protobuf.BlockingService;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
+import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
+import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
 import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
 import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol;
 import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.OzoneClientUtils;
 import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.protocolPB
     .KeySpaceManagerProtocolServerSideTranslatorPB;
 import org.apache.hadoop.ozone.scm.StorageContainerManager;
+import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,6 +71,7 @@ public class KeySpaceManager implements KeySpaceManagerProtocol {
   private final MetadataManager metadataManager;
   private final VolumeManager volumeManager;
   private final BucketManager bucketManager;
+  private final KeyManager keyManager;
   private final KSMMetrics metrics;
 
   public KeySpaceManager(OzoneConfiguration conf) throws IOException {
@@ -85,6 +94,31 @@ public class KeySpaceManager implements KeySpaceManagerProtocol {
     volumeManager = new VolumeManagerImpl(metadataManager, conf);
     bucketManager = new BucketManagerImpl(metadataManager);
     metrics = KSMMetrics.create();
+    keyManager = new KeyManagerImpl(getScmBlockClient(conf), metadataManager);
+  }
+
+  /**
+   * Create a scm block client, used by putKey() and getKey().
+   *
+   * @param conf
+   * @return
+   * @throws IOException
+   */
+  private ScmBlockLocationProtocol getScmBlockClient(OzoneConfiguration conf)
+      throws IOException {
+    RPC.setProtocolEngine(conf, ScmBlockLocationProtocolPB.class,
+        ProtobufRpcEngine.class);
+    long scmVersion =
+        RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class);
+    InetSocketAddress scmBlockAddress =
+        OzoneClientUtils.getScmAddressForBlockClients(conf);
+    ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient =
+        new ScmBlockLocationProtocolClientSideTranslatorPB(
+            RPC.getProxy(ScmBlockLocationProtocolPB.class, scmVersion,
+                scmBlockAddress, UserGroupInformation.getCurrentUser(), conf,
+                NetUtils.getDefaultSocketFactory(conf),
+                Client.getRpcTimeout(conf)));
+    return scmBlockLocationClient;
   }
 
   /**
@@ -373,4 +407,20 @@ public class KeySpaceManager implements KeySpaceManagerProtocol {
     }
   }
 
+  /**
+   * Allocate a key block.
+   * @param args - attributes of the key.
+   * @return
+   * @throws IOException
+   */
+  @Override
+  public KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException {
+    try {
+      metrics.incNumKeyBlockAllocates();
+      return keyManager.allocateKey(args);
+    } catch (Exception ex) {
+      metrics.incNumKeyBlockAllocateFails();
+      throw ex;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e641bee7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java
index 407d46a..78d0193 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java
@@ -95,4 +95,13 @@ public interface MetadataManager {
    * @param bucket - Bucket name
    */
   byte[] getBucketKey(String volume, String bucket);
+
+  /**
+   * Given a volume, bucket and a key, return the corresponding DB key.
+   * @param volume - volume name
+   * @param bucket - bucket name
+   * @param key - key name
+   * @return bytes of DB key.
+   */
+  byte[] getDBKeyForKey(String volume, String bucket, String key);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e641bee7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java
index 0a91bc0..fdd035a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java
@@ -105,6 +105,14 @@ public class MetadataManagerImpl implements  MetadataManager {
     return DFSUtil.string2Bytes(bucketKeyString);
   }
 
+  @Override
+  public byte[] getDBKeyForKey(String volume, String bucket, String key) {
+    String keyKeyString = OzoneConsts.KSM_VOLUME_PREFIX + volume
+        + OzoneConsts.KSM_BUCKET_PREFIX + bucket + OzoneConsts.KSM_KEY_PREFIX
+        + key;
+    return DFSUtil.string2Bytes(keyKeyString);
+  }
+
   /**
    * Returns the read lock used on Metadata DB.
    * @return readLock

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e641bee7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java
index 63c42e8..0deb7d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java
@@ -102,6 +102,7 @@ public class KSMException extends IOException {
     FAILED_USER_NOT_FOUND,
     FAILED_BUCKET_ALREADY_EXISTS,
     FAILED_BUCKET_NOT_FOUND,
+    FAILED_KEY_ALREADY_EXISTS,
     FAILED_INTERNAL_ERROR
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e641bee7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java
index 33c4af4..634245c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.protocolPB;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
+import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
+import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
 import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
 import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol;
 import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
@@ -36,6 +38,12 @@ import org.apache.hadoop.ozone.protocol.proto
 import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.CreateVolumeResponse;
 import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.CreateKeyRequest;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.CreateKeyResponse;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.KeyArgs;
+import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.SetVolumePropertyRequest;
 import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.SetVolumePropertyResponse;
@@ -211,4 +219,27 @@ public class KeySpaceManagerProtocolServerSideTranslatorPB implements
     }
     return resp.build();
   }
+
+  @Override
+  public CreateKeyResponse createKey(
+      RpcController controller, CreateKeyRequest request
+  ) throws ServiceException {
+    CreateKeyResponse.Builder resp =
+        CreateKeyResponse.newBuilder();
+    try {
+      KeyArgs keyArgs = request.getKeyArgs();
+      KsmKeyArgs ksmKeyArgs = new KsmKeyArgs.Builder()
+          .setVolumeName(keyArgs.getVolumeName())
+          .setBucketName(keyArgs.getBucketName())
+          .setKeyName(keyArgs.getKeyName())
+          .setDataSize(keyArgs.getDataSize())
+          .build();
+      KsmKeyInfo keyInfo = impl.allocateKey(ksmKeyArgs);
+      resp.setKeyInfo(keyInfo.getProtobuf());
+      resp.setStatus(Status.OK);
+    } catch (IOException e) {
+      resp.setStatus(exceptionToResponseStatus(e));
+    }
+    return resp.build();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e641bee7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
index 3e36593..80027db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
@@ -202,7 +202,7 @@ public class BlockManagerImpl implements BlockManager {
    */
   @Override
   public AllocatedBlock allocateBlock(final long size) throws IOException {
-    boolean createContainer = false;
+    boolean createContainer;
     Pipeline pipeline;
     if (size < 0 || size > containerSize) {
       throw new SCMException("Unsupported block size",
@@ -223,11 +223,13 @@ public class BlockManagerImpl implements BlockManager {
           throw new SCMException("Unable to allocate container for the block",
               FAILED_TO_ALLOCATE_CONTAINER);
         }
+          createContainer = true;
       } else {
         candidates = openContainers.entrySet().parallelStream()
             .filter(e -> (e.getValue() + size < containerSize))
             .map(e -> e.getKey())
             .collect(Collectors.toList());
+        createContainer = false;
       }
 
       if (candidates.size() == 0) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e641bee7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
index a7d1fdc..5b89b16 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset
     .LengthInputStream;
 import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
+import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
+import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
 import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
 import org.apache.hadoop.ksm.protocolPB
     .KeySpaceManagerProtocolClientSideTranslatorPB;
@@ -53,6 +55,7 @@ import org.apache.hadoop.ozone.web.response.*;
 import org.apache.hadoop.scm.XceiverClientSpi;
 import org.apache.hadoop.scm.storage.ChunkInputStream;
 import org.apache.hadoop.scm.storage.ChunkOutputStream;
+import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -279,15 +282,29 @@ public final class DistributedStorageHandler implements StorageHandler
{
   @Override
   public OutputStream newKeyWriter(KeyArgs args) throws IOException,
       OzoneException {
+    KsmKeyArgs keyArgs = new KsmKeyArgs.Builder()
+        .setVolumeName(args.getVolumeName())
+        .setBucketName(args.getBucketName())
+        .setKeyName(args.getKeyName())
+        .setDataSize(args.getSize())
+        .build();
+    // contact KSM to allocate a block for key.
     String containerKey = buildContainerKey(args.getVolumeName(),
         args.getBucketName(), args.getKeyName());
-    KeyInfo key = new KeyInfo();
-    key.setKeyName(args.getKeyName());
-    key.setCreatedOn(dateToString(new Date()));
-    XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey);
-    return new ChunkOutputStream(containerKey, key.getKeyName(),
-        xceiverClientManager, xceiverClient, args.getRequestID(),
-        chunkSize);
+    KsmKeyInfo keyInfo = keySpaceManagerClient.allocateKey(keyArgs);
+    // TODO the following createContainer and key writes may fail, in which
+    // case we should revert the above allocateKey to KSM.
+    String containerName = keyInfo.getContainerName();
+    XceiverClientSpi xceiverClient = getContainer(containerName);
+    if (keyInfo.getShouldCreateContainer()) {
+      LOG.debug("Need to create container {} for key: {}/{}/{}", containerName,
+          args.getVolumeName(), args.getBucketName(), args.getKeyName());
+      ContainerProtocolCalls.createContainer(
+          xceiverClient, args.getRequestID());
+    }
+    // establish a connection to the container to write the key
+    return new ChunkOutputStream(containerKey, args.getKeyName(),
+        xceiverClientManager, xceiverClient, args.getRequestID(), chunkSize);
   }
 
   @Override
@@ -334,6 +351,13 @@ public final class DistributedStorageHandler implements StorageHandler
{
     throw new UnsupportedOperationException("listKeys not implemented");
   }
 
+  private XceiverClientSpi getContainer(String containerName)
+      throws IOException {
+    Pipeline pipeline =
+        storageContainerLocationClient.getContainer(containerName);
+    return xceiverClientManager.acquireClient(pipeline);
+  }
+
   /**
    * Acquires an {@link XceiverClientSpi} connected to a {@link Pipeline}
    * of nodes capable of serving container protocol operations.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e641bee7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
index db96ca7..df702ad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.ksm;
 
 
 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -25,12 +26,14 @@ import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.web.exceptions.OzoneException;
 import org.apache.hadoop.ozone.web.handlers.BucketArgs;
+import org.apache.hadoop.ozone.web.handlers.KeyArgs;
 import org.apache.hadoop.ozone.web.handlers.UserArgs;
 import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
 import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
 import org.apache.hadoop.ozone.web.request.OzoneQuota;
 import org.apache.hadoop.ozone.web.response.BucketInfo;
 import org.apache.hadoop.ozone.web.response.VolumeInfo;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -38,6 +41,8 @@ import org.junit.Test;
 
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.LinkedList;
 import java.util.Random;
 
 /**
@@ -65,7 +70,8 @@ public class TestKeySpaceManager {
     cluster = new MiniOzoneCluster.Builder(conf)
         .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
     storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
-    userArgs = new UserArgs(null, null, null, null, null, null);
+    userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
+        null, null, null, null);
     ksmMetrics = cluster.getKeySpaceManager().getMetrics();
   }
 
@@ -190,4 +196,33 @@ public class TestKeySpaceManager {
     Assert.assertEquals(0, ksmMetrics.getNumBucketCreateFails());
     Assert.assertEquals(0, ksmMetrics.getNumBucketInfoFails());
   }
+
+  @Test
+  public void testGetKeyWriter() throws IOException, OzoneException {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+    String keyName = "key" + RandomStringUtils.randomNumeric(5);
+    Assert.assertEquals(0, ksmMetrics.getNumKeyBlockAllocates());
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
+    bucketArgs.setAddAcls(new LinkedList<>());
+    bucketArgs.setRemoveAcls(new LinkedList<>());
+    bucketArgs.setStorageType(StorageType.DISK);
+    storageHandler.createBucket(bucketArgs);
+
+    String dataString = RandomStringUtils.randomAscii(100);
+    KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
+    keyArgs.setSize(4096);
+    try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
+      stream.write(dataString.getBytes());
+    }
+    Assert.assertEquals(1, ksmMetrics.getNumKeyBlockAllocates());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message