hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject [47/50] [abbrv] hadoop git commit: HDFS-9891. Ozone: Add container transport client. Contributed by Anu Engineer.
Date Fri, 11 Mar 2016 22:23:05 GMT
HDFS-9891. Ozone: Add container transport client. Contributed by Anu Engineer.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/979dfe4c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/979dfe4c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/979dfe4c

Branch: refs/heads/HDFS-7240
Commit: 979dfe4c2ef5775dfbe8dcbcb07d3138a7257707
Parents: b31a5d6
Author: Chris Nauroth <cnauroth@apache.org>
Authored: Tue Mar 8 10:29:27 2016 -0800
Committer: Arpit Agarwal <arp@apache.org>
Committed: Fri Mar 11 12:57:09 2016 -0800

----------------------------------------------------------------------
 .../apache/hadoop/hdfs/protocol/DatanodeID.java |  53 ++++++++
 .../src/main/proto/hdfs.proto                   |   1 +
 .../hadoop-hdfs/CHANGES-HDFS-7240.txt           |   2 +
 .../ozone/container/helpers/Pipeline.java       | 132 +++++++++++++++++++
 .../transport/client/XceiverClient.java         | 122 +++++++++++++++++
 .../transport/client/XceiverClientHandler.java  | 112 ++++++++++++++++
 .../client/XceiverClientInitializer.java        |  68 ++++++++++
 .../ozone/container/ContainerTestHelper.java    | 103 +++++++++++++++
 .../transport/server/TestContainerServer.java   |  71 ++++++++--
 9 files changed, 650 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/979dfe4c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
index 5fd845d..30e946d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 
 /**
  * This class represents the primary identifier for a Datanode.
@@ -49,6 +50,7 @@ public class DatanodeID implements Comparable<DatanodeID> {
   private int infoSecurePort; // info server port
   private int ipcPort;       // IPC server port
   private String xferAddr;
+  private int containerPort; // container server port.
 
   /**
    * UUID identifying a given datanode. For upgraded Datanodes this is the
@@ -274,4 +276,55 @@ public class DatanodeID implements Comparable<DatanodeID> {
   public int compareTo(DatanodeID that) {
     return getXferAddr().compareTo(that.getXferAddr());
   }
+
+  /**
+   * Returns the container server port stored in this datanode ID.
+   * @return container server port
+   */
+  public int getContainerPort() {
+    return containerPort;
+  }
+
+  /**
+   * Sets the container server port stored in this datanode ID.
+   * @param containerPort - container server port.
+   */
+  public void setContainerPort(int containerPort) {
+    this.containerPort = containerPort;
+  }
+
+  /**
+   * Returns a DataNode ID from the protocol buffers. The arguments follow
+   * the constructor order (ipAddr, hostName, datanodeUuid, ports...).
+   * @param datanodeIDProto - protoBuf Message
+   * @return DataNodeID
+   */
+  public static DatanodeID getFromProtoBuf(HdfsProtos.DatanodeIDProto
+                                               datanodeIDProto) {
+    DatanodeID id = new DatanodeID(datanodeIDProto.getIpAddr(),
+        datanodeIDProto.getHostName(), datanodeIDProto.getDatanodeUuid(),
+        datanodeIDProto.getXferPort(), datanodeIDProto.getInfoPort(),
+        datanodeIDProto.getInfoSecurePort(), datanodeIDProto.getIpcPort());
+    id.setContainerPort(datanodeIDProto.getContainerPort());
+    return id;
+  }
+
+  /**
+   * Returns a DataNodeID protobuf message from a datanode ID.
+   * @return HdfsProtos.DatanodeIDProto
+   */
+  public HdfsProtos.DatanodeIDProto getProtoBufMessage() {
+    HdfsProtos.DatanodeIDProto.Builder builder =
+        HdfsProtos.DatanodeIDProto.newBuilder();
+    // ipAddr takes the bare IP address, not the "ip:port" IPC address.
+    return builder.setDatanodeUuid(this.getDatanodeUuid())
+        .setIpAddr(this.getIpAddr())
+        .setHostName(this.getHostName())
+        .setXferPort(this.getXferPort())
+        .setInfoPort(this.getInfoPort())
+        .setInfoSecurePort(this.getInfoSecurePort())
+        .setIpcPort(this.getIpcPort())
+        .setContainerPort(this.getContainerPort())
+        .build();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/979dfe4c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
index 0db8a3f..f9b875c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
@@ -58,6 +58,7 @@ message DatanodeIDProto {
   required uint32 infoPort = 5;  // datanode http port
   required uint32 ipcPort = 6;   // ipc server port
   optional uint32 infoSecurePort = 7 [default = 0]; // datanode https port
+  optional uint32 containerPort = 8 [default = 0]; // Ozone container protocol
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/979dfe4c/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt
index 30f28d2..9f4fcf5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt
@@ -41,3 +41,5 @@
     (Anu Engineer via cnauroth)
 
     HDFS-9873. Ozone: Add container transport server (Anu Engineer via cnauroth)
+
+    HDFS-9891. Ozone: Add container transport client (Anu Engineer via cnauroth)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/979dfe4c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/Pipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/Pipeline.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/Pipeline.java
new file mode 100644
index 0000000..d1bcc8d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/Pipeline.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.helpers;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * A pipeline represents the group of machines over which a container lives.
+ */
+public class Pipeline {
+  private String containerName;
+  private final String leaderID;
+  private final Map<String, DatanodeID> datanodes;
+
+  /**
+   * Constructs a new pipeline data structure.
+   *
+   * @param leaderID - UUID of the datanode that leads this pipeline.
+   */
+  public Pipeline(String leaderID) {
+    this.leaderID = leaderID;
+    datanodes = new TreeMap<>();
+  }
+
+  /**
+   * Gets pipeline object from protobuf.
+   *
+   * @param pipeline - ProtoBuf definition for the pipeline.
+   * @return Pipeline Object
+   */
+  public static Pipeline getFromProtoBuf(ContainerProtos.Pipeline pipeline) {
+    Preconditions.checkNotNull(pipeline);
+    Pipeline newPipeline = new Pipeline(pipeline.getLeaderID());
+    for (HdfsProtos.DatanodeIDProto dataID : pipeline.getMembersList()) {
+      newPipeline.addMember(DatanodeID.getFromProtoBuf(dataID));
+    }
+    if (pipeline.hasContainerName()) {
+      // Read the name from the message; the new object's own name is
+      // still null here, so copying it from itself would drop the name.
+      newPipeline.containerName = pipeline.getContainerName();
+    }
+    return newPipeline;
+  }
+
+  /**
+   * Adds a member to the pipeline.
+   *
+   * @param dataNodeId - Datanode to be added.
+   */
+  public void addMember(DatanodeID dataNodeId) {
+    datanodes.put(dataNodeId.getDatanodeUuid(), dataNodeId);
+  }
+
+  /**
+   * Returns the member registered under the leader ID.
+   *
+   * @return Leader datanode, or null if it was never added as a member.
+   */
+  public DatanodeID getLeader() {
+    return datanodes.get(leaderID);
+  }
+
+  /**
+   * Returns all machines that make up this pipeline.
+   *
+   * @return New list of machines, ordered by datanode UUID.
+   */
+  public List<DatanodeID> getMachines() {
+    return new ArrayList<>(datanodes.values());
+  }
+
+  /**
+   * Return a Protobuf Pipeline message from pipeline.
+   *
+   * @return Protobuf message
+   */
+  public ContainerProtos.Pipeline getProtobufMessage() {
+    ContainerProtos.Pipeline.Builder builder =
+        ContainerProtos.Pipeline.newBuilder();
+    for (DatanodeID datanode : datanodes.values()) {
+      builder.addMembers(datanode.getProtoBufMessage());
+    }
+    builder.setLeaderID(leaderID);
+    if (this.containerName != null) {
+      builder.setContainerName(this.containerName);
+    }
+    return builder.build();
+  }
+
+  /**
+   * Returns containerName if available.
+   *
+   * @return String.
+   */
+  public String getContainerName() {
+    return containerName;
+  }
+
+  /**
+   * Sets the container Name.
+   *
+   * @param containerName - Name of the container.
+   */
+  public void setContainerName(String containerName) {
+    this.containerName = containerName;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/979dfe4c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClient.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClient.java
new file mode 100644
index 0000000..0c2686d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClient.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.transport.client;
+
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.container.helpers.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A Client for the storageContainer protocol.
+ */
+public class XceiverClient {
+  static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class);
+  private final Pipeline pipeline;
+  private final OzoneConfiguration config;
+  private ChannelFuture channelFuture;
+  private Bootstrap b;
+  private EventLoopGroup group;
+
+  /**
+   * Constructs a client for the Container framework on data nodes.
+   * @param pipeline - Pipeline that defines the machines.
+   * @param config -- Ozone Config
+   */
+  public XceiverClient(Pipeline pipeline, OzoneConfiguration config) {
+    Preconditions.checkNotNull(pipeline);
+    Preconditions.checkNotNull(config);
+    this.pipeline = pipeline;
+    this.config = config;
+  }
+
+  /**
+   * Connects to the leader in the pipeline.
+   */
+  public void connect() throws Exception {
+    if (channelFuture != null
+        && channelFuture.channel() != null
+        && channelFuture.channel().isActive()) {
+      throw new IOException("This client is already connected to a host.");
+    }
+
+    // Validate the leader before allocating any netty resources.
+    DatanodeID leader = this.pipeline.getLeader();
+    Preconditions.checkNotNull(leader, "Pipeline leader is not a member.");
+
+    group = new NioEventLoopGroup();
+    b = new Bootstrap();
+    b.group(group)
+        .channel(NioSocketChannel.class)
+        // NOTE(review): the next handler() call replaces this LoggingHandler.
+        .handler(new LoggingHandler(LogLevel.INFO))
+        .handler(new XceiverClientInitializer(this.pipeline));
+    // Prefer the port in the datanode ID; 0 falls back to configuration.
+    int port = leader.getContainerPort();
+    if (port == 0) {
+      port = config.getInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT,
+          OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT_DEFAULT);
+    }
+    LOG.debug("Connecting to server Port : {}", port);
+    channelFuture = b.connect(leader.getHostName(), port).sync();
+  }
+
+  /**
+   * Closes the channel, then shuts down the event loop group.
+   */
+  public void close() {
+    if (channelFuture != null) {
+      channelFuture.channel().close();
+    }
+    if (group != null) {
+      group.shutdownGracefully();
+    }
+  }
+
+  /**
+   * Sends a given command to server and gets the reply back.
+   * @param request Request
+   * @return Response to the command
+   * @throws IOException if the channel is not connected
+   */
+  public ContainerProtos.ContainerCommandResponseProto sendCommand(
+      ContainerProtos.ContainerCommandRequestProto request)
+      throws IOException {
+    if ((channelFuture == null) || (!channelFuture.channel().isActive())) {
+      throw new IOException("This channel is not connected.");
+    }
+    XceiverClientHandler handler =
+        channelFuture.channel().pipeline().get(XceiverClientHandler.class);
+
+    return handler.sendCommand(request);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/979dfe4c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClientHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClientHandler.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClientHandler.java
new file mode 100644
index 0000000..25624f4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClientHandler.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.transport.client;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.helpers.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Netty client handler that queues responses for blocking callers.
+ */
+public class XceiverClientHandler extends
+    SimpleChannelInboundHandler<ContainerProtos.ContainerCommandResponseProto> {
+
+  static final Logger LOG = LoggerFactory.getLogger(XceiverClientHandler.class);
+  private final BlockingQueue<ContainerProtos.ContainerCommandResponseProto>
+      responses = new LinkedBlockingQueue<>();
+  private final Pipeline pipeline;
+  private volatile Channel channel;
+
+  /**
+   * Constructs a client handler; super(false) disables netty auto-release.
+   */
+  public XceiverClientHandler(Pipeline pipeline) {
+    super(false);
+    this.pipeline = pipeline;
+  }
+
+  /**
+   * Queues each decoded response for a caller waiting in sendCommand.
+   *
+   * @param ctx the context this handler belongs to
+   * @param msg the response to queue
+   */
+  @Override
+  public void channelRead0(ChannelHandlerContext ctx,
+                           ContainerProtos.ContainerCommandResponseProto msg)
+      throws Exception {
+    responses.add(msg);
+  }
+
+  /**
+   * Remembers the registered channel so that sendCommand can write to it.
+   */
+  @Override
+  public void channelRegistered(ChannelHandlerContext ctx) {
+    LOG.debug("channelRegistered: Connected to ctx");
+    channel = ctx.channel();
+  }
+
+  /**
+   * Logs the failure and closes the channel; nothing is queued for waiters.
+   */
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+    LOG.info("Exception in client " + cause.toString());
+    ctx.close();
+  }
+
+  /**
+   * Writes the request and blocks until a response is taken from the queue.
+   * Responses are paired with requests only by arrival order. An interrupt
+   * does not end the wait; the interrupt flag is restored before returning.
+   * NOTE(review): there is no timeout, so this appears to block forever if
+   * the channel fails before a response arrives -- confirm with callers.
+   * @param request - request.
+   * @return -- response
+   */
+  public ContainerProtos.ContainerCommandResponseProto
+      sendCommand(ContainerProtos.ContainerCommandRequestProto request) {
+
+    ContainerProtos.ContainerCommandResponseProto response;
+    channel.writeAndFlush(request);
+    boolean interrupted = false;
+    for (; ; ) {
+      try {
+        response = responses.take();
+        break;
+      } catch (InterruptedException ignore) {
+        interrupted = true;
+      }
+    }
+
+    if (interrupted) {
+      Thread.currentThread().interrupt();
+    }
+    return response;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/979dfe4c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClientInitializer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClientInitializer.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClientInitializer.java
new file mode 100644
index 0000000..6951f28
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClientInitializer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.transport.client;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+import org.apache.hadoop.ozone.container.helpers.Pipeline;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+
+/**
+ * Builds the netty channel pipeline used by the container client.
+ */
+public class XceiverClientInitializer extends
+    ChannelInitializer<SocketChannel> {
+  private final Pipeline pipeline;
+
+  /**
+   * Creates an initializer bound to a container pipeline.
+   * @param pipeline  - Pipeline handed to each client handler.
+   */
+  public XceiverClientInitializer(Pipeline pipeline) {
+    this.pipeline = pipeline;
+  }
+
+  /**
+   * Installs the protobuf codecs and the client handler on a new channel.
+   * Inbound bytes are split into varint32-length frames and parsed as
+   * ContainerCommandResponseProto; outbound messages are serialized and
+   * then prefixed with their varint32 length.
+   * @param ch   Channel which was registered.
+   * @throws Exception is thrown if an error occurs. In that case the
+   *                   Channel will be closed.
+   */
+  @Override
+  protected void initChannel(SocketChannel ch) throws Exception {
+    final ChannelPipeline handlers = ch.pipeline();
+    handlers
+        // Inbound: frame decoder first, then the protobuf parser.
+        .addLast(new ProtobufVarint32FrameDecoder())
+        .addLast(new ProtobufDecoder(ContainerProtos
+            .ContainerCommandResponseProto.getDefaultInstance()))
+        // Outbound handlers run in reverse order: encoder, then prepender.
+        .addLast(new ProtobufVarint32LengthFieldPrepender())
+        .addLast(new ProtobufEncoder())
+        // The application handler goes last.
+        .addLast(new XceiverClientHandler(this.pipeline));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/979dfe4c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
new file mode 100644
index 0000000..0622c82
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container;
+
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.container.helpers.Pipeline;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.UUID;
+
+/**
+ * Helpers for container tests.
+ */
+public class ContainerTestHelper {
+
+  /**
+   * Create a pipeline with single node replica. The node's ports come from
+   * a temporary server socket that is closed before returning.
+   * @return Pipeline with single node in it.
+   * @throws IOException
+   */
+  public static Pipeline createSingleNodePipeline() throws IOException {
+    try (ServerSocket socket = new ServerSocket(0)) {
+      int port = socket.getLocalPort();
+      DatanodeID datanodeID = new DatanodeID(socket.getInetAddress()
+          .getHostAddress(), socket.getInetAddress().getHostName(),
+          UUID.randomUUID().toString(), port, port, port, port);
+      datanodeID.setContainerPort(port);
+      Pipeline pipeline = new Pipeline(datanodeID.getDatanodeUuid());
+      pipeline.addMember(datanodeID);
+      return pipeline;
+    }
+  }
+
+  /**
+   * Returns a create container command for test purposes. The request gets
+   * a random trace ID so that tests can match the response against it.
+   *
+   * @return ContainerCommandRequestProto.
+   */
+  public static ContainerCommandRequestProto getCreateContainerRequest() throws
+      IOException {
+    ContainerProtos.CreateContainerRequestProto.Builder createRequest =
+        ContainerProtos.CreateContainerRequestProto
+            .newBuilder();
+    ContainerProtos.ContainerData.Builder containerData = ContainerProtos
+        .ContainerData.newBuilder();
+    containerData.setName("testContainer");
+    createRequest.setPipeline(
+        ContainerTestHelper.createSingleNodePipeline().getProtobufMessage());
+    createRequest.setContainerData(containerData.build());
+
+    ContainerCommandRequestProto.Builder request =
+        ContainerCommandRequestProto.newBuilder();
+    request.setCmdType(ContainerProtos.Type.CreateContainer);
+    request.setCreateContainer(createRequest);
+    request.setTraceID(UUID.randomUUID().toString());
+    return request.build();
+  }
+
+  /**
+   * Returns a create container response for test purposes. The response
+   * carries the trace ID of the request it answers.
+   *
+   * @return ContainerCommandResponseProto.
+   */
+  public static ContainerCommandResponseProto
+  getCreateContainerResponse(ContainerCommandRequestProto request) throws
+      IOException {
+    ContainerProtos.CreateContainerResponseProto.Builder createResponse =
+        ContainerProtos.CreateContainerResponseProto.newBuilder();
+
+    ContainerCommandResponseProto.Builder response =
+        ContainerCommandResponseProto.newBuilder();
+    response.setCmdType(ContainerProtos.Type.CreateContainer);
+    response.setTraceID(request.getTraceID());
+    response.setCreateContainer(createResponse.build());
+    return response.build();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/979dfe4c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java
index 37820eb..f546a12 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java
@@ -19,9 +19,16 @@
 package org.apache.hadoop.ozone.container.transport.server;
 
 import io.netty.channel.embedded.EmbeddedChannel;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.helpers.Pipeline;
 import org.apache.hadoop.ozone.container.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.transport.client.XceiverClient;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -30,17 +37,53 @@ import java.io.IOException;
 public class TestContainerServer {
 
   @Test
-  public void testPipeline() {
-    EmbeddedChannel channel = new EmbeddedChannel(new XceiverServerHandler(
-        new TestContainerDispatcher()));
-    ContainerCommandRequestProto request = ContainerCommandRequestProto
-        .getDefaultInstance();
-    channel.writeInbound(request);
-    Assert.assertTrue(channel.finish());
-    ContainerCommandResponseProto response = channel.readOutbound();
-    Assert.assertTrue(
-        ContainerCommandResponseProto.getDefaultInstance().equals(response));
-    channel.close();
+  public void testPipeline() throws IOException {
+    // Created outside the try block: if construction fails there is
+    // no channel to close, so the finally clause needs no null check.
+    final EmbeddedChannel channel = new EmbeddedChannel(
+        new XceiverServerHandler(new TestContainerDispatcher()));
+    try {
+      final ContainerCommandRequestProto request =
+          ContainerTestHelper.getCreateContainerRequest();
+      channel.writeInbound(request);
+      Assert.assertTrue(channel.finish());
+      // The test dispatcher echoes the request's trace ID back.
+      final ContainerCommandResponseProto response = channel.readOutbound();
+      Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
+    } finally {
+      channel.close();
+    }
+  }
+
+  @Test
+  public void testClientServer() throws Exception {
+    XceiverServer server = null;
+    XceiverClient client = null;
+
+    try {
+      Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline();
+      OzoneConfiguration conf = new OzoneConfiguration();
+      conf.setInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT,
+          pipeline.getLeader().getContainerPort());
+      // NOTE(review): presumably XceiverServer binds to the port set above.
+      server = new XceiverServer(conf, new TestContainerDispatcher());
+      client = new XceiverClient(pipeline, conf);
+      // Start the server before the client tries to connect to it.
+      server.start();
+      client.connect();
+      // The test dispatcher copies the request's trace ID into its reply.
+      ContainerCommandRequestProto request =
+          ContainerTestHelper.getCreateContainerRequest();
+      ContainerCommandResponseProto response = client.sendCommand(request);
+      Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
+    } finally {
+      if (client != null) {
+        client.close();
+      }
+      if (server != null) {
+        server.stop();
+      }
+    }
   }
 
   private class TestContainerDispatcher implements ContainerDispatcher {
@@ -54,7 +97,7 @@ public class TestContainerServer {
     @Override
     public ContainerCommandResponseProto
     dispatch(ContainerCommandRequestProto msg) throws IOException {
-      return ContainerCommandResponseProto.getDefaultInstance();
+      return ContainerTestHelper.getCreateContainerResponse(msg);
     }
   }
 }


Mime
View raw message