hadoop-common-commits mailing list archives

From aengin...@apache.org
Subject hadoop git commit: HDFS-11004. Ozone : move Chunk IO and container protocol calls to hdfs-client. Contributed by Chen Liang.
Date Thu, 13 Oct 2016 23:40:02 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 1fc744c6c -> c70775aff


HDFS-11004. Ozone : move Chunk IO and container protocol calls to hdfs-client. Contributed by Chen Liang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c70775af
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c70775af
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c70775af

Branch: refs/heads/HDFS-7240
Commit: c70775aff6113a3bbaa237923fad3c21a73a7793
Parents: 1fc744c
Author: Anu Engineer <aengineer@apache.org>
Authored: Thu Oct 13 16:34:29 2016 -0700
Committer: Anu Engineer <aengineer@apache.org>
Committed: Thu Oct 13 16:34:29 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/scm/ScmConfigKeys.java    |   3 +
 .../hadoop/scm/storage/ChunkInputStream.java    | 191 ++++++++++++++++
 .../hadoop/scm/storage/ChunkOutputStream.java   | 222 +++++++++++++++++++
 .../scm/storage/ContainerProtocolCalls.java     | 190 ++++++++++++++++
 .../apache/hadoop/scm/storage/package-info.java |  23 ++
 .../ozone/web/storage/ChunkInputStream.java     | 193 ----------------
 .../ozone/web/storage/ChunkOutputStream.java    | 219 ------------------
 .../web/storage/ContainerProtocolCalls.java     | 198 -----------------
 .../web/storage/DistributedStorageHandler.java  |  22 +-
 9 files changed, 641 insertions(+), 620 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c70775af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
index a1b2393..44414ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
@@ -29,4 +29,7 @@ public final class ScmConfigKeys {
   public static final String DFS_CONTAINER_IPC_PORT =
       "dfs.container.ipc";
   public static final int DFS_CONTAINER_IPC_PORT_DEFAULT = 50011;
+
+  // TODO: this is copied from OzoneConsts; it may need to move to a better place
+  public static final int CHUNK_SIZE = 1 * 1024 * 1024; // 1 MB
 }
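
For reference, a minimal sketch (not part of the patch) of how a client
might resolve the container IPC port from the keys in this file, using the
standard Hadoop Configuration API; the wrapper class is illustrative only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.scm.ScmConfigKeys;

public final class ScmConfigSketch {
  // Resolves the container IPC port, falling back to the compiled default.
  static int containerIpcPort(Configuration conf) {
    return conf.getInt(ScmConfigKeys.DFS_CONTAINER_IPC_PORT,
        ScmConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
  }
}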

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c70775af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java
new file mode 100644
index 0000000..1206ecd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.scm.storage;
+
+import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.readChunk;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import com.google.protobuf.ByteString;
+
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.scm.XceiverClient;
+import org.apache.hadoop.scm.XceiverClientManager;
+
+/**
+ * An {@link InputStream} used by the REST service in combination with the
+ * SCMClient to read the value of a key from a sequence
+ * of container chunks.  All bytes of the key value are stored in container
+ * chunks.  Each chunk may contain multiple underlying {@link ByteBuffer}
+ * instances.  This class encapsulates all state management for iterating
+ * through the sequence of chunks and the sequence of buffers within each chunk.
+ */
+public class ChunkInputStream extends InputStream {
+
+  private static final int EOF = -1;
+
+  private final String key;
+  private final String traceID;
+  private XceiverClientManager xceiverClientManager;
+  private XceiverClient xceiverClient;
+  private List<ChunkInfo> chunks;
+  private int chunkOffset;
+  private List<ByteBuffer> buffers;
+  private int bufferOffset;
+
+  /**
+   * Creates a new ChunkInputStream.
+   *
+   * @param key chunk key
+   * @param xceiverClientManager client manager that controls client
+   * @param xceiverClient client to perform container calls
+   * @param chunks list of chunks to read
+   * @param traceID container protocol call traceID
+   */
+  public ChunkInputStream(String key, XceiverClientManager xceiverClientManager,
+      XceiverClient xceiverClient, List<ChunkInfo> chunks, String traceID) {
+    this.key = key;
+    this.traceID = traceID;
+    this.xceiverClientManager = xceiverClientManager;
+    this.xceiverClient = xceiverClient;
+    this.chunks = chunks;
+    this.chunkOffset = 0;
+    this.buffers = null;
+    this.bufferOffset = 0;
+  }
+
+  @Override
+  public synchronized int read()
+      throws IOException {
+    checkOpen();
+    int available = prepareRead(1);
+    // Mask to the unsigned range so bytes >= 0x80 are not mistaken for EOF.
+    return available == EOF ? EOF : buffers.get(bufferOffset).get() & 0xff;
+  }
+
+  @Override
+  public synchronized int read(byte[] b, int off, int len) throws IOException {
+    // According to the JavaDocs for InputStream, it is recommended that
+    // subclasses provide an override of bulk read if possible for performance
+    // reasons.  In addition to performance, we need to do it for correctness
+    // reasons.  The Ozone REST service uses PipedInputStream and
+    // PipedOutputStream to relay HTTP response data between a Jersey thread and
+    // a Netty thread.  It turns out that PipedInputStream/PipedOutputStream
+    // have a subtle dependency (bug?) on the wrapped stream providing separate
+    // implementations of single-byte read and bulk read.  Without this, get key
+    // responses might close the connection before writing all of the bytes
+    // advertised in the Content-Length.
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return 0;
+    }
+    checkOpen();
+    int available = prepareRead(len);
+    if (available == EOF) {
+      return EOF;
+    }
+    buffers.get(bufferOffset).get(b, off, available);
+    return available;
+  }
+
+  @Override
+  public synchronized void close() {
+    if (xceiverClientManager != null && xceiverClient != null) {
+      xceiverClientManager.releaseClient(xceiverClient);
+      xceiverClientManager = null;
+      xceiverClient = null;
+    }
+  }
+
+  /**
+   * Checks if the stream is open.  If not, throws an exception.
+   *
+   * @throws IOException if stream is closed
+   */
+  private synchronized void checkOpen() throws IOException {
+    if (xceiverClient == null) {
+      throw new IOException("ChunkInputStream has been closed.");
+    }
+  }
+
+  /**
+   * Prepares to read by advancing through chunks and buffers as needed until it
+   * finds data to return or encounters EOF.
+   *
+   * @param len desired length of data to read
+   * @return length of data available to read, possibly less than desired length
+   */
+  private synchronized int prepareRead(int len) throws IOException {
+    for (;;) {
+      if (chunks == null || chunks.isEmpty()) {
+        // This must be an empty key.
+        return EOF;
+      } else if (buffers == null) {
+        // The first read triggers fetching the first chunk.
+        readChunkFromContainer(0);
+      } else if (!buffers.isEmpty() &&
+          buffers.get(bufferOffset).hasRemaining()) {
+        // Data is available from the current buffer.
+        ByteBuffer bb = buffers.get(bufferOffset);
+        return len > bb.remaining() ? bb.remaining() : len;
+      } else if (!buffers.isEmpty() &&
+          !buffers.get(bufferOffset).hasRemaining() &&
+          bufferOffset < buffers.size() - 1) {
+        // There are additional buffers available.
+        ++bufferOffset;
+      } else if (chunkOffset < chunks.size() - 1) {
+        // There are additional chunks available.
+        readChunkFromContainer(chunkOffset + 1);
+      } else {
+        // All available input has been consumed.
+        return EOF;
+      }
+    }
+  }
+
+  /**
+   * Attempts to read the chunk at the specified offset in the chunk list.  If
+   * successful, then the data of the read chunk is saved so that its bytes can
+   * be returned from subsequent read calls.
+   *
+   * @param readChunkOffset offset in the chunk list of which chunk to read
+   * @throws IOException if there is an I/O error while performing the call
+   */
+  private synchronized void readChunkFromContainer(int readChunkOffset)
+      throws IOException {
+    final ReadChunkResponseProto readChunkResponse;
+    try {
+      readChunkResponse = readChunk(xceiverClient, chunks.get(readChunkOffset),
+          key, traceID);
+    } catch (IOException e) {
+      throw new IOException("Unexpected OzoneException", e);
+    }
+    chunkOffset = readChunkOffset;
+    ByteString byteString = readChunkResponse.getData();
+    buffers = byteString.asReadOnlyByteBufferList();
+  }
+}
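
A minimal usage sketch for the relocated ChunkInputStream (illustrative,
not part of the patch). It assumes the caller has already fetched the key's
chunk list, e.g. from a getKey() response, and holds an XceiverClient
obtained through an XceiverClientManager; the wrapper class below is
hypothetical. Note that close() releases the client back to the manager, so
the caller must not release it a second time.

import java.io.IOException;
import java.io.InputStream;
import java.util.List;

import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.scm.XceiverClient;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.storage.ChunkInputStream;

public final class ChunkReadSketch {
  // Streams the entire key value and returns the number of bytes read.
  static long readKey(String key, XceiverClientManager manager,
      XceiverClient client, List<ChunkInfo> chunks, String traceID)
      throws IOException {
    long total = 0;
    try (InputStream in =
        new ChunkInputStream(key, manager, client, chunks, traceID)) {
      byte[] buf = new byte[8192];
      int n;
      while ((n = in.read(buf, 0, buf.length)) != -1) {
        total += n; // process buf[0..n) here
      }
    }
    return total;
  }
}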

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c70775af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java
new file mode 100644
index 0000000..0126e58
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.scm.storage;
+
+import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.putKey;
+import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.writeChunk;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import com.google.protobuf.ByteString;
+
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyValue;
+import org.apache.hadoop.scm.ScmConfigKeys;
+import org.apache.hadoop.scm.XceiverClient;
+import org.apache.hadoop.scm.XceiverClientManager;
+
+/**
+ * An {@link OutputStream} used by the REST service in combination with the
+ * SCMClient to write the value of a key to a sequence
+ * of container chunks.  Writes are buffered locally and periodically written to
+ * the container as a new chunk.  In order to preserve the semantics that
+ * replacement of a pre-existing key is atomic, each instance of the stream has
+ * an internal unique identifier.  This unique identifier and a monotonically
+ * increasing chunk index form a composite key that is used as the chunk name.
+ * After all data is written, a putKey call creates or updates the corresponding
+ * container key, and this call includes the full list of chunks that make up
+ * the key data.  The list of chunks is updated all at once.  Therefore, a
+ * concurrent reader can never see an intermediate state in which chunks
+ * from different versions of the key data are interleaved.
+ * This class encapsulates all state management for buffering and writing
+ * through to the container.
+ */
+public class ChunkOutputStream extends OutputStream {
+
+  private final String containerKey;
+  private final String key;
+  private final String traceID;
+  private final KeyData.Builder containerKeyData;
+  private XceiverClientManager xceiverClientManager;
+  private XceiverClient xceiverClient;
+  private ByteBuffer buffer;
+  private final String streamId;
+  private int chunkIndex;
+
+  /**
+   * Creates a new ChunkOutputStream.
+   *
+   * @param containerKey container key
+   * @param key chunk key
+   * @param xceiverClientManager client manager that controls client
+   * @param xceiverClient client to perform container calls
+   * @param traceID container protocol call traceID
+   */
+  public ChunkOutputStream(String containerKey, String key,
+      XceiverClientManager xceiverClientManager, XceiverClient xceiverClient,
+      String traceID) {
+    this.containerKey = containerKey;
+    this.key = key;
+    this.traceID = traceID;
+    KeyValue keyValue = KeyValue.newBuilder()
+        .setKey("TYPE").setValue("KEY").build();
+    this.containerKeyData = KeyData.newBuilder()
+        .setContainerName(xceiverClient.getPipeline().getContainerName())
+        .setName(containerKey)
+        .addMetadata(keyValue);
+    this.xceiverClientManager = xceiverClientManager;
+    this.xceiverClient = xceiverClient;
+    this.buffer = ByteBuffer.allocate(ScmConfigKeys.CHUNK_SIZE);
+    this.streamId = UUID.randomUUID().toString();
+    this.chunkIndex = 0;
+  }
+
+  @Override
+  public synchronized void write(int b) throws IOException {
+    checkOpen();
+    int rollbackPosition = buffer.position();
+    int rollbackLimit = buffer.limit();
+    buffer.put((byte)b);
+    if (buffer.position() == ScmConfigKeys.CHUNK_SIZE) {
+      flushBufferToChunk(rollbackPosition, rollbackLimit);
+    }
+  }
+
+  @Override
+  public synchronized void write(byte[] b, int off, int len)
+      throws IOException {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if ((off < 0) || (off > b.length) || (len < 0) ||
+        ((off + len) > b.length) || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return;
+    }
+    checkOpen();
+    while (len > 0) {
+      int writeLen = Math.min(
+          ScmConfigKeys.CHUNK_SIZE - buffer.position(), len);
+      int rollbackPosition = buffer.position();
+      int rollbackLimit = buffer.limit();
+      buffer.put(b, off, writeLen);
+      if (buffer.position() == ScmConfigKeys.CHUNK_SIZE) {
+        flushBufferToChunk(rollbackPosition, rollbackLimit);
+      }
+      off += writeLen;
+      len -= writeLen;
+    }
+  }
+
+  @Override
+  public synchronized void flush() throws IOException {
+    checkOpen();
+    if (buffer.position() > 0) {
+      int rollbackPosition = buffer.position();
+      int rollbackLimit = buffer.limit();
+      flushBufferToChunk(rollbackPosition, rollbackLimit);
+    }
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (xceiverClientManager != null && xceiverClient != null &&
+        buffer != null) {
+      try {
+        if (buffer.position() > 0) {
+          writeChunkToContainer();
+        }
+        putKey(xceiverClient, containerKeyData.build(), traceID);
+      } catch (IOException e) {
+        throw new IOException("Unexpected Storage Container Exception", e);
+      } finally {
+        xceiverClientManager.releaseClient(xceiverClient);
+        xceiverClientManager = null;
+        xceiverClient = null;
+        buffer = null;
+      }
+    }
+
+  }
+
+  /**
+   * Checks if the stream is open.  If not, throws an exception.
+   *
+   * @throws IOException if stream is closed
+   */
+  private synchronized void checkOpen() throws IOException {
+    if (xceiverClient == null) {
+      throw new IOException("ChunkOutputStream has been closed.");
+    }
+  }
+
+  /**
+   * Attempts to flush buffered writes by writing a new chunk to the container.
+   * If successful, then clears the buffer to prepare to receive writes for a
+   * new chunk.
+   *
+   * @param rollbackPosition position to restore in buffer if write fails
+   * @param rollbackLimit limit to restore in buffer if write fails
+   * @throws IOException if there is an I/O error while performing the call
+   */
+  private synchronized void flushBufferToChunk(int rollbackPosition,
+      int rollbackLimit) throws IOException {
+    boolean success = false;
+    try {
+      writeChunkToContainer();
+      success = true;
+    } finally {
+      if (success) {
+        buffer.clear();
+      } else {
+        buffer.position(rollbackPosition);
+        buffer.limit(rollbackLimit);
+      }
+    }
+  }
+
+  /**
+   * Writes buffered data as a new chunk to the container and saves chunk
+   * information to be used later in putKey call.
+   *
+   * @throws IOException if there is an I/O error while performing the call
+   */
+  private synchronized void writeChunkToContainer() throws IOException {
+    buffer.flip();
+    ByteString data = ByteString.copyFrom(buffer);
+    ChunkInfo chunk = ChunkInfo
+        .newBuilder()
+        .setChunkName(
+            key + "_stream_" + streamId + "_chunk_" + ++chunkIndex)
+        .setOffset(0)
+        .setLen(data.size())
+        .build();
+    try {
+      writeChunk(xceiverClient, chunk, key, data, traceID);
+    } catch (IOException e) {
+      throw new IOException("Unexpected Storage Container Exception", e);
+    }
+    containerKeyData.addChunks(chunk);
+  }
+}
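
The corresponding write path, as a hedged sketch (again, not part of the
patch; the wrapper class is hypothetical): the stream buffers up to
ScmConfigKeys.CHUNK_SIZE bytes, writes each full buffer as a uniquely named
chunk, and close() flushes the final partial chunk and issues the single
putKey call that publishes the complete chunk list atomically.

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.scm.XceiverClient;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.storage.ChunkOutputStream;

public final class ChunkWriteSketch {
  // Writes a small value under the given key; close() also releases the
  // client back to the manager.
  static void writeKey(String containerKey, String key,
      XceiverClientManager manager, XceiverClient client, String traceID)
      throws IOException {
    byte[] value = "example value".getBytes(StandardCharsets.UTF_8);
    try (OutputStream out = new ChunkOutputStream(
        containerKey, key, manager, client, traceID)) {
      out.write(value, 0, value.length);
    }
  }
}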

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c70775af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java
new file mode 100644
index 0000000..166b741
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.scm.storage;
+
+import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
+import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
+
+import java.io.IOException;
+
+import com.google.protobuf.ByteString;
+
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.PutKeyRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.WriteChunkRequestProto;
+import org.apache.hadoop.scm.XceiverClient;
+
+/**
+ * Implementation of all container protocol calls.
+ */
+public final class ContainerProtocolCalls {
+
+  /**
+   * Calls the container protocol to get a container key.
+   *
+   * @param xceiverClient client to perform call
+   * @param containerKeyData key data to identify container
+   * @param traceID container protocol call traceID
+   * @return container protocol get key response
+   * @throws IOException if there is an I/O error while performing the call
+   */
+  public static GetKeyResponseProto getKey(XceiverClient xceiverClient,
+      KeyData containerKeyData, String traceID) throws IOException {
+    GetKeyRequestProto.Builder readKeyRequest = GetKeyRequestProto
+        .newBuilder()
+        .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
+        .setKeyData(containerKeyData);
+    ContainerCommandRequestProto request = ContainerCommandRequestProto
+        .newBuilder()
+        .setCmdType(Type.GetKey)
+        .setTraceID(traceID)
+        .setGetKey(readKeyRequest)
+        .build();
+    ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
+    validateContainerResponse(response, traceID);
+    return response.getGetKey();
+  }
+
+  /**
+   * Calls the container protocol to put a container key.
+   *
+   * @param xceiverClient client to perform call
+   * @param containerKeyData key data to identify container
+   * @param traceID container protocol call traceID
+   * @throws IOException if there is an I/O error while performing the call
+   */
+  public static void putKey(XceiverClient xceiverClient,
+      KeyData containerKeyData, String traceID) throws IOException {
+    PutKeyRequestProto.Builder createKeyRequest = PutKeyRequestProto
+        .newBuilder()
+        .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
+        .setKeyData(containerKeyData);
+    ContainerCommandRequestProto request = ContainerCommandRequestProto
+        .newBuilder()
+        .setCmdType(Type.PutKey)
+        .setTraceID(traceID)
+        .setPutKey(createKeyRequest)
+        .build();
+    ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
+    validateContainerResponse(response, traceID);
+  }
+
+  /**
+   * Calls the container protocol to read a chunk.
+   *
+   * @param xceiverClient client to perform call
+   * @param chunk information about chunk to read
+   * @param key the key name
+   * @param traceID container protocol call traceID
+   * @return container protocol read chunk response
+   * @throws IOException if there is an I/O error while performing the call
+   */
+  public static ReadChunkResponseProto readChunk(XceiverClient xceiverClient,
+      ChunkInfo chunk, String key, String traceID)
+      throws IOException {
+    ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto
+        .newBuilder()
+        .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
+        .setKeyName(key)
+        .setChunkData(chunk);
+    ContainerCommandRequestProto request = ContainerCommandRequestProto
+        .newBuilder()
+        .setCmdType(Type.ReadChunk)
+        .setTraceID(traceID)
+        .setReadChunk(readChunkRequest)
+        .build();
+    ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
+    validateContainerResponse(response, traceID);
+    return response.getReadChunk();
+  }
+
+  /**
+   * Calls the container protocol to write a chunk.
+   *
+   * @param xceiverClient client to perform call
+   * @param chunk information about chunk to write
+   * @param key the key name
+   * @param data the data of the chunk to write
+   * @param traceID container protocol call traceID
+   * @throws IOException if there is an I/O error while performing the call
+   */
+  public static void writeChunk(XceiverClient xceiverClient, ChunkInfo chunk,
+      String key, ByteString data, String traceID)
+      throws IOException {
+    WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto
+        .newBuilder()
+        .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
+        .setKeyName(key)
+        .setChunkData(chunk)
+        .setData(data);
+    ContainerCommandRequestProto request = ContainerCommandRequestProto
+        .newBuilder()
+        .setCmdType(Type.WriteChunk)
+        .setTraceID(traceID)
+        .setWriteChunk(writeChunkRequest)
+        .build();
+    ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
+    validateContainerResponse(response, traceID);
+  }
+
+  /**
+   * Validates a response from a container protocol call.  Any non-successful
+   * return code is mapped to a corresponding exception and thrown.
+   *
+   * @param response container protocol call response
+   * @param traceID container protocol call traceID
+   * @throws IOException if the container protocol call failed
+   */
+  private static void validateContainerResponse(
+      ContainerCommandResponseProto response, String traceID
+  ) throws IOException {
+    // TODO : throw the right type of exception
+    switch (response.getResult()) {
+    case SUCCESS:
+      break;
+    case MALFORMED_REQUEST:
+      throw new IOException(HTTP_BAD_REQUEST +
+          ": Bad container request: " + traceID);
+    case UNSUPPORTED_REQUEST:
+      throw new IOException(HTTP_INTERNAL_ERROR +
+          ": Unsupported container request: " + traceID);
+    case CONTAINER_INTERNAL_ERROR:
+      throw new IOException(HTTP_INTERNAL_ERROR +
+          ": Container internal error: " + traceID);
+    default:
+      throw new IOException(HTTP_INTERNAL_ERROR +
+          ": Unrecognized container response: " + traceID);
+    }
+  }
+
+  /**
+   * There is no need to instantiate this class.
+   */
+  private ContainerProtocolCalls() {
+  }
+}
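
To show how these static helpers compose (an illustrative sketch; the
wrapper class is hypothetical): each helper builds a
ContainerCommandRequestProto carrying the client's pipeline and the
caller's traceID, sends it over the XceiverClient, and validates the result
code before returning the typed response.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
import org.apache.hadoop.scm.XceiverClient;
import org.apache.hadoop.scm.storage.ContainerProtocolCalls;

public final class ProtocolCallSketch {
  // Looks up a key's metadata and returns its chunk list; any non-SUCCESS
  // result code surfaces as an IOException from validateContainerResponse.
  static List<ChunkInfo> lookupChunks(XceiverClient client,
      KeyData containerKeyData, String traceID) throws IOException {
    GetKeyResponseProto response =
        ContainerProtocolCalls.getKey(client, containerKeyData, traceID);
    return response.getKeyData().getChunksList();
  }
}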

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c70775af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/package-info.java
new file mode 100644
index 0000000..aa89af0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.scm.storage;
+
+/**
+ * This package contains client classes for performing storage container
+ * chunk I/O and container protocol calls.
+ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c70775af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkInputStream.java
deleted file mode 100644
index f639b4a..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkInputStream.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.web.storage;
-
-import static org.apache.hadoop.ozone.web.storage.ContainerProtocolCalls.*;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
-import org.apache.hadoop.scm.XceiverClient;
-import org.apache.hadoop.scm.XceiverClientManager;
-import org.apache.hadoop.ozone.web.exceptions.OzoneException;
-import org.apache.hadoop.ozone.web.handlers.UserArgs;
-
-/**
- * An {@link InputStream} used by the REST service in combination with the
- * {@link DistributedStorageHandler} to read the value of a key from a sequence
- * of container chunks.  All bytes of the key value are stored in container
- * chunks.  Each chunk may contain multiple underlying {@link ByteBuffer}
- * instances.  This class encapsulates all state management for iterating
- * through the sequence of chunks and the sequence of buffers within each chunk.
- */
-class ChunkInputStream extends InputStream {
-
-  private static final int EOF = -1;
-
-  private final String key;
-  private final UserArgs args;
-  private XceiverClientManager xceiverClientManager;
-  private XceiverClient xceiverClient;
-  private List<ChunkInfo> chunks;
-  private int chunkOffset;
-  private List<ByteBuffer> buffers;
-  private int bufferOffset;
-
-  /**
-   * Creates a new ChunkInputStream.
-   *
-   * @param key chunk key
-   * @param xceiverClientManager client manager that controls client
-   * @param xceiverClient client to perform container calls
-   * @param chunks list of chunks to read
-   * @param args container protocol call args
-   */
-  public ChunkInputStream(String key, XceiverClientManager xceiverClientManager,
-      XceiverClient xceiverClient, List<ChunkInfo> chunks, UserArgs args) {
-    this.key = key;
-    this.args = args;
-    this.xceiverClientManager = xceiverClientManager;
-    this.xceiverClient = xceiverClient;
-    this.chunks = chunks;
-    this.chunkOffset = 0;
-    this.buffers = null;
-    this.bufferOffset = 0;
-  }
-
-  @Override
-  public synchronized int read()
-      throws IOException {
-    checkOpen();
-    int available = prepareRead(1);
-    return available == EOF ? EOF : buffers.get(bufferOffset).get();
-  }
-
-  @Override
-  public synchronized int read(byte[] b, int off, int len) throws IOException {
-    // According to the JavaDocs for InputStream, it is recommended that
-    // subclasses provide an override of bulk read if possible for performance
-    // reasons.  In addition to performance, we need to do it for correctness
-    // reasons.  The Ozone REST service uses PipedInputStream and
-    // PipedOutputStream to relay HTTP response data between a Jersey thread and
-    // a Netty thread.  It turns out that PipedInputStream/PipedOutputStream
-    // have a subtle dependency (bug?) on the wrapped stream providing separate
-    // implementations of single-byte read and bulk read.  Without this, get key
-    // responses might close the connection before writing all of the bytes
-    // advertised in the Content-Length.
-    if (b == null) {
-      throw new NullPointerException();
-    }
-    if (off < 0 || len < 0 || len > b.length - off) {
-      throw new IndexOutOfBoundsException();
-    }
-    if (len == 0) {
-      return 0;
-    }
-    checkOpen();
-    int available = prepareRead(len);
-    if (available == EOF) {
-      return EOF;
-    }
-    buffers.get(bufferOffset).get(b, off, available);
-    return available;
-  }
-
-  @Override
-  public synchronized void close() {
-    if (xceiverClientManager != null && xceiverClient != null) {
-      xceiverClientManager.releaseClient(xceiverClient);
-      xceiverClientManager = null;
-      xceiverClient = null;
-    }
-  }
-
-  /**
-   * Checks if the stream is open.  If not, throws an exception.
-   *
-   * @throws IOException if stream is closed
-   */
-  private synchronized void checkOpen() throws IOException {
-    if (xceiverClient == null) {
-      throw new IOException("ChunkInputStream has been closed.");
-    }
-  }
-
-  /**
-   * Prepares to read by advancing through chunks and buffers as needed until it
-   * finds data to return or encounters EOF.
-   *
-   * @param len desired length of data to read
-   * @return length of data available to read, possibly less than desired length
-   */
-  private synchronized int prepareRead(int len) throws IOException {
-    for (;;) {
-      if (chunks == null || chunks.isEmpty()) {
-        // This must be an empty key.
-        return EOF;
-      } else if (buffers == null) {
-        // The first read triggers fetching the first chunk.
-        readChunkFromContainer(0);
-      } else if (!buffers.isEmpty() &&
-          buffers.get(bufferOffset).hasRemaining()) {
-        // Data is available from the current buffer.
-        ByteBuffer bb = buffers.get(bufferOffset);
-        return len > bb.remaining() ? bb.remaining() : len;
-      } else if (!buffers.isEmpty() &&
-          !buffers.get(bufferOffset).hasRemaining() &&
-          bufferOffset < buffers.size() - 1) {
-        // There are additional buffers available.
-        ++bufferOffset;
-      } else if (chunkOffset < chunks.size() - 1) {
-        // There are additional chunks available.
-        readChunkFromContainer(chunkOffset + 1);
-      } else {
-        // All available input has been consumed.
-        return EOF;
-      }
-    }
-  }
-
-  /**
-   * Attempts to read the chunk at the specified offset in the chunk list.  If
-   * successful, then the data of the read chunk is saved so that its bytes can
-   * be returned from subsequent read calls.
-   *
-   * @param readChunkOffset offset in the chunk list of which chunk to read
-   * @throws IOException if there is an I/O error while performing the call
-   */
-  private synchronized void readChunkFromContainer(int readChunkOffset)
-      throws IOException {
-    final ReadChunkResponseProto readChunkResponse;
-    try {
-      readChunkResponse = readChunk(xceiverClient, chunks.get(readChunkOffset),
-          key, args);
-    } catch (OzoneException e) {
-      throw new IOException("Unexpected OzoneException", e);
-    }
-    chunkOffset = readChunkOffset;
-    ByteString byteString = readChunkResponse.getData();
-    buffers = byteString.asReadOnlyByteBufferList();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c70775af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkOutputStream.java
deleted file mode 100644
index 1796a69..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkOutputStream.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.web.storage;
-
-import static org.apache.hadoop.ozone.OzoneConsts.CHUNK_SIZE;
-import static org.apache.hadoop.ozone.web.storage.ContainerProtocolCalls.*;
-import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.UUID;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
-import org.apache.hadoop.scm.XceiverClient;
-import org.apache.hadoop.scm.XceiverClientManager;
-import org.apache.hadoop.ozone.web.exceptions.OzoneException;
-import org.apache.hadoop.ozone.web.handlers.UserArgs;
-import org.apache.hadoop.ozone.web.response.KeyInfo;
-
-/**
- * An {@link OutputStream} used by the REST service in combination with the
- * {@link DistributedStorageHandler} to write the value of a key to a sequence
- * of container chunks.  Writes are buffered locally and periodically written to
- * the container as a new chunk.  In order to preserve the semantics that
- * replacement of a pre-existing key is atomic, each instance of the stream has
- * an internal unique identifier.  This unique identifier and a monotonically
- * increasing chunk index form a composite key that is used as the chunk name.
- * After all data is written, a putKey call creates or updates the corresponding
- * container key, and this call includes the full list of chunks that make up
- * the key data.  The list of chunks is updated all at once.  Therefore, a
- * concurrent reader never can see an intermediate state in which different
- * chunks of data from different versions of the key data are interleaved.
- * This class encapsulates all state management for buffering and writing
- * through to the container.
- */
-class ChunkOutputStream extends OutputStream {
-
-  private final String containerKey;
-  private final KeyInfo key;
-  private final UserArgs args;
-  private final KeyData.Builder containerKeyData;
-  private XceiverClientManager xceiverClientManager;
-  private XceiverClient xceiverClient;
-  private ByteBuffer buffer;
-  private final String streamId;
-  private int chunkIndex;
-
-  /**
-   * Creates a new ChunkOutputStream.
-   *
-   * @param containerKey container key
-   * @param key chunk key
-   * @param xceiverClientManager client manager that controls client
-   * @param xceiverClient client to perform container calls
-   * @param args container protocol call args
-   */
-  public ChunkOutputStream(String containerKey, KeyInfo key,
-      XceiverClientManager xceiverClientManager, XceiverClient xceiverClient,
-      UserArgs args) {
-    this.containerKey = containerKey;
-    this.key = key;
-    this.args = args;
-    this.containerKeyData = fromKeyToContainerKeyDataBuilder(
-        xceiverClient.getPipeline().getContainerName(), containerKey, key);
-    this.xceiverClientManager = xceiverClientManager;
-    this.xceiverClient = xceiverClient;
-    this.buffer = ByteBuffer.allocate(CHUNK_SIZE);
-    this.streamId = UUID.randomUUID().toString();
-    this.chunkIndex = 0;
-  }
-
-  @Override
-  public synchronized void write(int b) throws IOException {
-    checkOpen();
-    int rollbackPosition = buffer.position();
-    int rollbackLimit = buffer.limit();
-    buffer.put((byte)b);
-    if (buffer.position() == CHUNK_SIZE) {
-      flushBufferToChunk(rollbackPosition, rollbackLimit);
-    }
-  }
-
-  @Override
-  public void write(byte[] b, int off, int len) throws IOException {
-    if (b == null) {
-      throw new NullPointerException();
-    }
-    if ((off < 0) || (off > b.length) || (len < 0) ||
-        ((off + len) > b.length) || ((off + len) < 0)) {
-      throw new IndexOutOfBoundsException();
-    }
-    if (len == 0) {
-      return;
-    }
-    checkOpen();
-    while (len > 0) {
-      int writeLen = Math.min(CHUNK_SIZE - buffer.position(), len);
-      int rollbackPosition = buffer.position();
-      int rollbackLimit = buffer.limit();
-      buffer.put(b, off, writeLen);
-      if (buffer.position() == CHUNK_SIZE) {
-        flushBufferToChunk(rollbackPosition, rollbackLimit);
-      }
-      off += writeLen;
-      len -= writeLen;
-    }
-  }
-
-  @Override
-  public synchronized void flush() throws IOException {
-    checkOpen();
-    if (buffer.position() > 0) {
-      int rollbackPosition = buffer.position();
-      int rollbackLimit = buffer.limit();
-      flushBufferToChunk(rollbackPosition, rollbackLimit);
-    }
-  }
-
-  @Override
-  public synchronized void close() throws IOException {
-    if (xceiverClientManager != null && xceiverClient != null &&
-        buffer != null) {
-      try {
-        if (buffer.position() > 0) {
-          writeChunkToContainer();
-        }
-        putKey(xceiverClient, containerKeyData.build(), args);
-      } catch (OzoneException e) {
-        throw new IOException("Unexpected OzoneException", e);
-      } finally {
-        xceiverClientManager.releaseClient(xceiverClient);
-        xceiverClientManager = null;
-        xceiverClient = null;
-        buffer = null;
-      }
-    }
-
-  }
-
-  /**
-   * Checks if the stream is open.  If not, throws an exception.
-   *
-   * @throws IOException if stream is closed
-   */
-  private synchronized void checkOpen() throws IOException {
-    if (xceiverClient == null) {
-      throw new IOException("ChunkOutputStream has been closed.");
-    }
-  }
-
-  /**
-   * Attempts to flush buffered writes by writing a new chunk to the container.
-   * If successful, then clears the buffer to prepare to receive writes for a
-   * new chunk.
-   *
-   * @param rollbackPosition position to restore in buffer if write fails
-   * @param rollbackLimit limit to restore in buffer if write fails
-   * @throws IOException if there is an I/O error while performing the call
-   */
-  private synchronized void flushBufferToChunk(int rollbackPosition,
-      int rollbackLimit) throws IOException {
-    boolean success = false;
-    try {
-      writeChunkToContainer();
-      success = true;
-    } finally {
-      if (success) {
-        buffer.clear();
-      } else {
-        buffer.position(rollbackPosition);
-        buffer.limit(rollbackLimit);
-      }
-    }
-  }
-
-  /**
-   * Writes buffered data as a new chunk to the container and saves chunk
-   * information to be used later in putKey call.
-   *
-   * @throws IOException if there is an I/O error while performing the call
-   */
-  private synchronized void writeChunkToContainer() throws IOException {
-    buffer.flip();
-    ByteString data = ByteString.copyFrom(buffer);
-    ChunkInfo chunk = ChunkInfo
-        .newBuilder()
-        .setChunkName(
-            key.getKeyName() + "_stream_" + streamId + "_chunk_" + ++chunkIndex)
-        .setOffset(0)
-        .setLen(data.size())
-        .build();
-    try {
-      writeChunk(xceiverClient, chunk, key.getKeyName(), data, args);
-    } catch (OzoneException e) {
-      throw new IOException("Unexpected OzoneException", e);
-    }
-    containerKeyData.addChunks(chunk);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c70775af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ContainerProtocolCalls.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ContainerProtocolCalls.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ContainerProtocolCalls.java
deleted file mode 100644
index c683a74..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ContainerProtocolCalls.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.web.storage;
-
-import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
-import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
-
-import java.io.IOException;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyRequestProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyResponseProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.PutKeyRequestProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkRequestProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.WriteChunkRequestProto;
-import org.apache.hadoop.scm.XceiverClient;
-import org.apache.hadoop.ozone.web.exceptions.ErrorTable;
-import org.apache.hadoop.ozone.web.exceptions.OzoneException;
-import org.apache.hadoop.ozone.web.handlers.UserArgs;
-
-/**
- * Implementation of all container protocol calls performed by
- * {@link DistributedStorageHandler}.
- */
-final class ContainerProtocolCalls {
-
-  /**
-   * Calls the container protocol to get a container key.
-   *
-   * @param xceiverClient client to perform call
-   * @param containerKeyData key data to identify container
-   * @param args container protocol call args
-   * @returns container protocol get key response
-   * @throws IOException if there is an I/O error while performing the call
-   * @throws OzoneException if the container protocol call failed
-   */
-  public static GetKeyResponseProto getKey(XceiverClient xceiverClient,
-      KeyData containerKeyData, UserArgs args) throws IOException,
-      OzoneException {
-    GetKeyRequestProto.Builder readKeyRequest = GetKeyRequestProto
-        .newBuilder()
-        .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
-        .setKeyData(containerKeyData);
-    ContainerCommandRequestProto request = ContainerCommandRequestProto
-        .newBuilder()
-        .setCmdType(Type.GetKey)
-        .setTraceID(args.getRequestID())
-        .setGetKey(readKeyRequest)
-        .build();
-    ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
-    validateContainerResponse(response, args);
-    return response.getGetKey();
-  }
-
-  /**
-   * Calls the container protocol to put a container key.
-   *
-   * @param xceiverClient client to perform call
-   * @param containerKeyData key data to identify container
-   * @param args container protocol call args
-   * @throws IOException if there is an I/O error while performing the call
-   * @throws OzoneException if the container protocol call failed
-   */
-  public static void putKey(XceiverClient xceiverClient,
-      KeyData containerKeyData, UserArgs args) throws IOException,
-      OzoneException {
-    PutKeyRequestProto.Builder createKeyRequest = PutKeyRequestProto
-        .newBuilder()
-        .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
-        .setKeyData(containerKeyData);
-    ContainerCommandRequestProto request = ContainerCommandRequestProto
-        .newBuilder()
-        .setCmdType(Type.PutKey)
-        .setTraceID(args.getRequestID())
-        .setPutKey(createKeyRequest)
-        .build();
-    ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
-    validateContainerResponse(response, args);
-  }
-
-  /**
-   * Calls the container protocol to read a chunk.
-   *
-   * @param xceiverClient client to perform call
-   * @param chunk information about chunk to read
-   * @param key the key name
-   * @param args container protocol call args
-   * @returns container protocol read chunk response
-   * @throws IOException if there is an I/O error while performing the call
-   * @throws OzoneException if the container protocol call failed
-   */
-  public static ReadChunkResponseProto readChunk(XceiverClient xceiverClient,
-      ChunkInfo chunk, String key, UserArgs args)
-      throws IOException, OzoneException {
-    ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto
-        .newBuilder()
-        .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
-        .setKeyName(key)
-        .setChunkData(chunk);
-    ContainerCommandRequestProto request = ContainerCommandRequestProto
-        .newBuilder()
-        .setCmdType(Type.ReadChunk)
-        .setTraceID(args.getRequestID())
-        .setReadChunk(readChunkRequest)
-        .build();
-    ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
-    validateContainerResponse(response, args);
-    return response.getReadChunk();
-  }
-
-  /**
-   * Calls the container protocol to write a chunk.
-   *
-   * @param xceiverClient client to perform call
-   * @param chunk information about chunk to write
-   * @param key the key name
-   * @param data the data of the chunk to write
-   * @param args container protocol call args
-   * @throws IOException if there is an I/O error while performing the call
-   * @throws OzoneException if the container protocol call failed
-   */
-  public static void writeChunk(XceiverClient xceiverClient, ChunkInfo chunk,
-      String key, ByteString data, UserArgs args)
-      throws IOException, OzoneException {
-    WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto
-        .newBuilder()
-        .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
-        .setKeyName(key)
-        .setChunkData(chunk)
-        .setData(data);
-    ContainerCommandRequestProto request = ContainerCommandRequestProto
-        .newBuilder()
-        .setCmdType(Type.WriteChunk)
-        .setTraceID(args.getRequestID())
-        .setWriteChunk(writeChunkRequest)
-        .build();
-    ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
-    validateContainerResponse(response, args);
-  }
-
-  /**
-   * Validates a response from a container protocol call.  Any non-successful
-   * return code is mapped to a corresponding exception and thrown.
-   *
-   * @param response container protocol call response
-   * @param args container protocol call args
-   * @throws OzoneException if the container protocol call failed
-   */
-  private static void validateContainerResponse(
-      ContainerCommandResponseProto response, UserArgs args)
-      throws OzoneException {
-    switch (response.getResult()) {
-    case SUCCESS:
-      break;
-    case MALFORMED_REQUEST:
-      throw ErrorTable.newError(new OzoneException(HTTP_BAD_REQUEST,
-          "badRequest", "Bad container request."), args);
-    case UNSUPPORTED_REQUEST:
-      throw ErrorTable.newError(new OzoneException(HTTP_INTERNAL_ERROR,
-          "internalServerError", "Unsupported container request."), args);
-    case CONTAINER_INTERNAL_ERROR:
-      throw ErrorTable.newError(new OzoneException(HTTP_INTERNAL_ERROR,
-          "internalServerError", "Container internal error."), args);
-    default:
-      throw ErrorTable.newError(new OzoneException(HTTP_INTERNAL_ERROR,
-          "internalServerError", "Unrecognized container response."), args);
-    }
-  }
-
-  /**
-   * There is no need to instantiate this class.
-   */
-  private ContainerProtocolCalls() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c70775af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
index 143d058..e8e5830 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.ozone.web.storage;
 
-import static org.apache.hadoop.ozone.web.storage.ContainerProtocolCalls.*;
+import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.*;
 import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*;
 
 import java.io.IOException;
@@ -57,6 +57,8 @@ import org.apache.hadoop.ozone.web.response.ListKeys;
 import org.apache.hadoop.ozone.web.response.ListVolumes;
 import org.apache.hadoop.ozone.web.response.VolumeInfo;
 import org.apache.hadoop.ozone.web.response.VolumeOwner;
+import org.apache.hadoop.scm.storage.ChunkInputStream;
+import org.apache.hadoop.scm.storage.ChunkOutputStream;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -95,7 +97,7 @@ public final class DistributedStorageHandler implements StorageHandler {
       volume.setCreatedBy(args.getAdminName());
       KeyData containerKeyData = fromVolumeToContainerKeyData(
           xceiverClient.getPipeline().getContainerName(), containerKey, volume);
-      putKey(xceiverClient, containerKeyData, args);
+      putKey(xceiverClient, containerKeyData, args.getRequestID());
     } finally {
       xceiverClientManager.releaseClient(xceiverClient);
     }
@@ -140,7 +142,7 @@ public final class DistributedStorageHandler implements StorageHandler {
       KeyData containerKeyData = containerKeyDataForRead(
           xceiverClient.getPipeline().getContainerName(), containerKey);
       GetKeyResponseProto response = getKey(xceiverClient, containerKeyData,
-          args);
+          args.getRequestID());
       return fromContainerKeyValueListToVolume(
           response.getKeyData().getMetadataList());
     } finally {
@@ -163,7 +165,7 @@ public final class DistributedStorageHandler implements StorageHandler {
       bucket.setStorageType(args.getStorageType());
       KeyData containerKeyData = fromBucketToContainerKeyData(
           xceiverClient.getPipeline().getContainerName(), containerKey, bucket);
-      putKey(xceiverClient, containerKeyData, args);
+      putKey(xceiverClient, containerKeyData, args.getRequestID());
     } finally {
       xceiverClientManager.releaseClient(xceiverClient);
     }
@@ -218,7 +220,7 @@ public final class DistributedStorageHandler implements StorageHandler {
       KeyData containerKeyData = containerKeyDataForRead(
           xceiverClient.getPipeline().getContainerName(), containerKey);
       GetKeyResponseProto response = getKey(xceiverClient, containerKeyData,
-          args);
+          args.getRequestID());
       return fromContainerKeyValueListToBucket(
           response.getKeyData().getMetadataList());
     } finally {
@@ -235,8 +237,8 @@ public final class DistributedStorageHandler implements StorageHandler {
     key.setKeyName(args.getKeyName());
     key.setCreatedOn(dateToString(new Date()));
     XceiverClient xceiverClient = acquireXceiverClient(containerKey);
-    return new ChunkOutputStream(containerKey, key, xceiverClientManager,
-        xceiverClient, args);
+    return new ChunkOutputStream(containerKey, key.getKeyName(),
+        xceiverClientManager, xceiverClient, args.getRequestID());
   }
 
   @Override
@@ -256,7 +258,7 @@ public final class DistributedStorageHandler implements StorageHandler {
       KeyData containerKeyData = containerKeyDataForRead(
           xceiverClient.getPipeline().getContainerName(), containerKey);
       GetKeyResponseProto response = getKey(xceiverClient, containerKeyData,
-          args);
+          args.getRequestID());
       long length = 0;
       List<ChunkInfo> chunks = response.getKeyData().getChunksList();
       for (ChunkInfo chunk : chunks) {
@@ -264,8 +266,8 @@ public final class DistributedStorageHandler implements StorageHandler {
       }
       success = true;
       return new LengthInputStream(new ChunkInputStream(
-          containerKey, xceiverClientManager, xceiverClient, chunks, args),
-          length);
+          containerKey, xceiverClientManager, xceiverClient,
+          chunks, args.getRequestID()), length);
     } finally {
       if (!success) {
         xceiverClientManager.releaseClient(xceiverClient);

