hadoop-common-commits mailing list archives

From omal...@apache.org
Subject [39/68] [abbrv] [partial] hadoop git commit: HDFS-13405. Ozone: Rename HDSL to HDDS. Contributed by Ajay Kumar, Elek Marton, Mukul Kumar Singh, Shashikant Banerjee and Anu Engineer.
Date Thu, 26 Apr 2018 21:20:24 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b832f3c/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
new file mode 100644
index 0000000..ac245d5
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.states.datanode;
+
+import com.google.common.base.Strings;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.states.DatanodeState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.hdds.HddsUtils.getSCMAddresses;
+
+/**
+ * InitDatanodeState is the task that runs while the datanode is in the Init state.
+ */
+public class InitDatanodeState implements DatanodeState,
+    Callable<DatanodeStateMachine.DatanodeStates> {
+  static final Logger LOG = LoggerFactory.getLogger(InitDatanodeState.class);
+  private final SCMConnectionManager connectionManager;
+  private final Configuration conf;
+  private final StateContext context;
+  private Future<DatanodeStateMachine.DatanodeStates> result;
+
+  /**
+   *  Create InitDatanodeState Task.
+   *
+   * @param conf - Conf
+   * @param connectionManager - Connection Manager
+   * @param context - Current Context
+   */
+  public InitDatanodeState(Configuration conf,
+                           SCMConnectionManager connectionManager,
+                           StateContext context) {
+    this.conf = conf;
+    this.connectionManager = connectionManager;
+    this.context = context;
+  }
+
+  /**
+   * Computes a result, or throws an exception if unable to do so.
+   *
+   * @return computed result
+   * @throws Exception if unable to compute a result
+   */
+  @Override
+  public DatanodeStateMachine.DatanodeStates call() throws Exception {
+    Collection<InetSocketAddress> addresses = null;
+    try {
+      addresses = getSCMAddresses(conf);
+    } catch (IllegalArgumentException e) {
+      if (!Strings.isNullOrEmpty(e.getMessage())) {
+        LOG.error("Failed to get SCM addresses: {}", e.getMessage());
+      }
+      return DatanodeStateMachine.DatanodeStates.SHUTDOWN;
+    }
+
+    if (addresses == null || addresses.isEmpty()) {
+      LOG.error("Null or empty SCM address list found.");
+      return DatanodeStateMachine.DatanodeStates.SHUTDOWN;
+    } else {
+      for (InetSocketAddress addr : addresses) {
+        connectionManager.addSCMServer(addr);
+      }
+    }
+
+    // If datanode ID is set, persist it to the ID file.
+    persistContainerDatanodeDetails();
+
+    return this.context.getState().getNextState();
+  }
+
+  /**
+   * Persist DatanodeDetails to datanode.id file.
+   */
+  private void persistContainerDatanodeDetails() throws IOException {
+    String dataNodeIDPath = HddsUtils.getDatanodeIdFilePath(conf);
+    File idPath = new File(dataNodeIDPath);
+    DatanodeDetails datanodeDetails = this.context.getParent()
+        .getDatanodeDetails();
+    if (datanodeDetails != null && !idPath.exists()) {
+      ContainerUtils.writeDatanodeDetailsTo(datanodeDetails, idPath);
+      LOG.info("DatanodeDetails is persisted to {}", dataNodeIDPath);
+    }
+  }
+
+  /**
+   * Called before entering this state.
+   */
+  @Override
+  public void onEnter() {
+    LOG.trace("Entering init container state");
+  }
+
+  /**
+   * Called after exiting this state.
+   */
+  @Override
+  public void onExit() {
+    LOG.trace("Exiting init container state");
+  }
+
+  /**
+   * Executes one or more tasks that are needed by this state.
+   *
+   * @param executor - ExecutorService
+   */
+  @Override
+  public void execute(ExecutorService executor) {
+    result = executor.submit(this);
+  }
+
+  /**
+   * Wait for execute to finish.
+   *
+   * @param time     - Time
+   * @param timeUnit - Unit of time.
+   */
+  @Override
+  public DatanodeStateMachine.DatanodeStates await(long time,
+      TimeUnit timeUnit) throws InterruptedException,
+      ExecutionException, TimeoutException {
+    return result.get(time, timeUnit);
+  }
+}
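
For illustration only (not part of this patch): a minimal sketch of how a
DatanodeState such as InitDatanodeState is typically driven. The work is
submitted via execute() and the next state collected via await(); the conf,
connectionManager and context values are assumed to come from the
surrounding DatanodeStateMachine.

    ExecutorService executor = Executors.newCachedThreadPool();
    InitDatanodeState initState =
        new InitDatanodeState(conf, connectionManager, context);
    initState.onEnter();
    initState.execute(executor);
    // await() may throw TimeoutException if the task does not finish.
    DatanodeStateMachine.DatanodeStates next =
        initState.await(30, TimeUnit.SECONDS);
    initState.onExit();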

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b832f3c/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
new file mode 100644
index 0000000..7a8c17b
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.states.datanode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.states.DatanodeState;
+import org.apache.hadoop.ozone.container.common.states.endpoint.HeartbeatEndpointTask;
+import org.apache.hadoop.ozone.container.common.states.endpoint.RegisterEndpointTask;
+import org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Class that implements handshake with SCM.
+ */
+public class RunningDatanodeState implements DatanodeState {
+  static final Logger
+      LOG = LoggerFactory.getLogger(RunningDatanodeState.class);
+  private final SCMConnectionManager connectionManager;
+  private final Configuration conf;
+  private final StateContext context;
+  private CompletionService<EndpointStateMachine.EndPointStates> ecs;
+
+  public RunningDatanodeState(Configuration conf,
+      SCMConnectionManager connectionManager,
+      StateContext context) {
+    this.connectionManager = connectionManager;
+    this.conf = conf;
+    this.context = context;
+  }
+
+  /**
+   * Called before entering this state.
+   */
+  @Override
+  public void onEnter() {
+    LOG.trace("Entering handshake task.");
+  }
+
+  /**
+   * Called after exiting this state.
+   */
+  @Override
+  public void onExit() {
+    LOG.trace("Exiting handshake task.");
+  }
+
+  /**
+   * Executes one or more tasks that are needed by this state.
+   *
+   * @param executor - ExecutorService
+   */
+  @Override
+  public void execute(ExecutorService executor) {
+    ecs = new ExecutorCompletionService<>(executor);
+    for (EndpointStateMachine endpoint : connectionManager.getValues()) {
+      Callable<EndpointStateMachine.EndPointStates> endpointTask
+          = getEndPointTask(endpoint);
+      ecs.submit(endpointTask);
+    }
+  }
+  // TODO: Cache some of these tasks instead of creating them
+  // all the time.
+  private Callable<EndpointStateMachine.EndPointStates>
+      getEndPointTask(EndpointStateMachine endpoint) {
+    switch (endpoint.getState()) {
+    case GETVERSION:
+      return new VersionEndpointTask(endpoint, conf);
+    case REGISTER:
+      return RegisterEndpointTask.newBuilder()
+          .setConfig(conf)
+          .setEndpointStateMachine(endpoint)
+          .setDatanodeDetails(context.getParent().getDatanodeDetails())
+          .build();
+    case HEARTBEAT:
+      return HeartbeatEndpointTask.newBuilder()
+          .setConfig(conf)
+          .setEndpointStateMachine(endpoint)
+          .setDatanodeDetails(context.getParent().getDatanodeDetails())
+          .setContext(context)
+          .build();
+    case SHUTDOWN:
+      break;
+    default:
+      throw new IllegalArgumentException("Illegal Argument.");
+    }
+    return null;
+  }
+
+  /**
+   * Computes the next state the container state machine must move to by
+   * looking at the states of all endpoints.
+   * <p>
+   * If any endpoint has moved to Shutdown, we have either hit an
+   * unrecoverable error or been told to shut down. In either case the
+   * datanode state machine should move to the Shutdown state; otherwise we
+   * remain in the Running state.
+   *
+   * @return next container state.
+   */
+  private DatanodeStateMachine.DatanodeStates
+      computeNextContainerState(
+      List<Future<EndpointStateMachine.EndPointStates>> results) {
+    for (Future<EndpointStateMachine.EndPointStates> state : results) {
+      try {
+        if (state.get() == EndpointStateMachine.EndPointStates.SHUTDOWN) {
+          // if any endpoint tells us to shutdown we move to shutdown state.
+          return DatanodeStateMachine.DatanodeStates.SHUTDOWN;
+        }
+      } catch (InterruptedException | ExecutionException e) {
+        LOG.error("Error in executing end point task.", e);
+      }
+    }
+    return DatanodeStateMachine.DatanodeStates.RUNNING;
+  }
+
+  /**
+   * Wait for execute to finish.
+   *
+   * @param duration - Time
+   * @param timeUnit - Unit of duration.
+   */
+  @Override
+  public DatanodeStateMachine.DatanodeStates
+      await(long duration, TimeUnit timeUnit)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    int count = connectionManager.getValues().size();
+    int returned = 0;
+    long deadline = Time.monotonicNow() + timeUnit.toMillis(duration);
+    long timeLeft = timeUnit.toMillis(duration);
+    List<Future<EndpointStateMachine.EndPointStates>> results =
+        new LinkedList<>();
+
+    while (returned < count && timeLeft > 0) {
+      Future<EndpointStateMachine.EndPointStates> result =
+          ecs.poll(timeLeft, TimeUnit.MILLISECONDS);
+      if (result != null) {
+        results.add(result);
+        returned++;
+      }
+      // Recompute the remaining time against a fixed deadline; subtracting
+      // the total elapsed time on every pass would expire the timeout early.
+      timeLeft = deadline - Time.monotonicNow();
+    }
+    return computeNextContainerState(results);
+  }
+}
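
The await() method above is an instance of a general pattern: fan tasks out
through an ExecutorCompletionService and drain the completions against a
single deadline. A self-contained sketch of that pattern (illustrative, not
part of this patch):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    public final class DeadlineDrain {
      private DeadlineDrain() { }

      // Submits all tasks, then collects whatever completes before the
      // deadline; futures that miss the deadline are simply not returned.
      public static <T> List<Future<T>> drain(ExecutorService pool,
          List<Callable<T>> tasks, long timeoutMs)
          throws InterruptedException {
        CompletionService<T> ecs = new ExecutorCompletionService<>(pool);
        for (Callable<T> task : tasks) {
          ecs.submit(task);
        }
        List<Future<T>> done = new ArrayList<>();
        long deadline = System.nanoTime()
            + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        long left;
        while (done.size() < tasks.size()
            && (left = deadline - System.nanoTime()) > 0) {
          Future<T> f = ecs.poll(left, TimeUnit.NANOSECONDS);
          if (f != null) {
            done.add(f);
          }
        }
        return done;
      }
    }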

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b832f3c/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java
new file mode 100644
index 0000000..6b8d16c
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.states.datanode;
+/**
+ This package contains files that guide the state transitions from
+ Init->Running->Shutdown for the datanode.
+ */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b832f3c/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
new file mode 100644
index 0000000..5dee10f
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.states.endpoint;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import org.apache.hadoop.ozone.container.common.helpers
+    .DeletedContainerBlocksSummary;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .EndpointStateMachine.EndPointStates;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
+import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.ZonedDateTime;
+import java.util.concurrent.Callable;
+
+/**
+ * Heartbeat task that reports to SCM.
+ */
+public class HeartbeatEndpointTask
+    implements Callable<EndpointStateMachine.EndPointStates> {
+  static final Logger LOG =
+      LoggerFactory.getLogger(HeartbeatEndpointTask.class);
+  private final EndpointStateMachine rpcEndpoint;
+  private final Configuration conf;
+  private DatanodeDetailsProto datanodeDetailsProto;
+  private StateContext context;
+
+  /**
+   * Constructs an SCM heartbeat task.
+   *
+   * @param rpcEndpoint - Endpoint state machine.
+   * @param conf - Config.
+   * @param context - State context.
+   */
+  public HeartbeatEndpointTask(EndpointStateMachine rpcEndpoint,
+      Configuration conf, StateContext context) {
+    this.rpcEndpoint = rpcEndpoint;
+    this.conf = conf;
+    this.context = context;
+  }
+
+  /**
+   * Get the datanode details proto.
+   *
+   * @return DatanodeDetailsProto
+   */
+  public DatanodeDetailsProto getDatanodeDetailsProto() {
+    return datanodeDetailsProto;
+  }
+
+  /**
+   * Set the datanode details proto.
+   *
+   * @param datanodeDetailsProto - datanode details.
+   */
+  public void setDatanodeDetailsProto(DatanodeDetailsProto
+      datanodeDetailsProto) {
+    this.datanodeDetailsProto = datanodeDetailsProto;
+  }
+
+  /**
+   * Computes a result, or throws an exception if unable to do so.
+   *
+   * @return computed result
+   * @throws Exception if unable to compute a result
+   */
+  @Override
+  public EndpointStateMachine.EndPointStates call() throws Exception {
+    rpcEndpoint.lock();
+    try {
+      Preconditions.checkState(this.datanodeDetailsProto != null);
+
+      SCMHeartbeatResponseProto response = rpcEndpoint.getEndPoint()
+          .sendHeartbeat(datanodeDetailsProto, this.context.getNodeReport(),
+              this.context.getContainerReportState());
+      processResponse(response, datanodeDetailsProto);
+      rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now());
+      rpcEndpoint.zeroMissedCount();
+    } catch (IOException ex) {
+      rpcEndpoint.logIfNeeded(ex);
+    } finally {
+      rpcEndpoint.unlock();
+    }
+    return rpcEndpoint.getState();
+  }
+
+  /**
+   * Returns a builder class for the HeartbeatEndpointTask.
+   * @return Builder.
+   */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Adds the commands from the SCM heartbeat response to the command
+   * processing queue.
+   *
+   * @param response - SCMHeartbeat response.
+   */
+  private void processResponse(SCMHeartbeatResponseProto response,
+      final DatanodeDetailsProto datanodeDetails) {
+    for (SCMCommandResponseProto commandResponseProto : response
+        .getCommandsList()) {
+      // Verify the response is indeed for this datanode.
+      Preconditions.checkState(commandResponseProto.getDatanodeUUID()
+          .equalsIgnoreCase(datanodeDetails.getUuid()),
+          "Unexpected datanode ID in the response.");
+      switch (commandResponseProto.getCmdType()) {
+      case sendContainerReport:
+        this.context.addCommand(SendContainerCommand.getFromProtobuf(
+            commandResponseProto.getSendReport()));
+        break;
+      case reregisterCommand:
+        if (rpcEndpoint.getState() == EndPointStates.HEARTBEAT) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Received SCM notification to register."
+                + " Interrupt HEARTBEAT and transit to REGISTER state.");
+          }
+          rpcEndpoint.setState(EndPointStates.REGISTER);
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Illegal state {} found, expecting {}.",
+                rpcEndpoint.getState().name(), EndPointStates.HEARTBEAT);
+          }
+        }
+        break;
+      case deleteBlocksCommand:
+        DeleteBlocksCommand db = DeleteBlocksCommand
+            .getFromProtobuf(commandResponseProto.getDeleteBlocksProto());
+        if (!db.blocksTobeDeleted().isEmpty()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(DeletedContainerBlocksSummary
+                .getFrom(db.blocksTobeDeleted())
+                .toString());
+          }
+          this.context.addCommand(db);
+        }
+        break;
+      case closeContainerCommand:
+        CloseContainerCommand closeContainer =
+            CloseContainerCommand.getFromProtobuf(
+                commandResponseProto.getCloseContainerProto());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Received SCM container close request for container {}",
+              closeContainer.getContainerName());
+        }
+        this.context.addCommand(closeContainer);
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown response : "
+            + commandResponseProto.getCmdType().name());
+      }
+    }
+  }
+
+  /**
+   * Builder class for HeartbeatEndpointTask.
+   */
+  public static class Builder {
+    private EndpointStateMachine endPointStateMachine;
+    private Configuration conf;
+    private DatanodeDetails datanodeDetails;
+    private StateContext context;
+
+    /**
+     * Constructs the builder class.
+     */
+    public Builder() {
+    }
+
+    /**
+     * Sets the endpoint state machine.
+     *
+     * @param rpcEndPoint - Endpoint state machine.
+     * @return Builder
+     */
+    public Builder setEndpointStateMachine(EndpointStateMachine rpcEndPoint) {
+      this.endPointStateMachine = rpcEndPoint;
+      return this;
+    }
+
+    /**
+     * Sets the Config.
+     *
+     * @param config - config
+     * @return Builder
+     */
+    public Builder setConfig(Configuration config) {
+      this.conf = config;
+      return this;
+    }
+
+    /**
+     * Sets the datanode details.
+     *
+     * @param dnDetails - Datanode details.
+     * @return Builder
+     */
+    public Builder setDatanodeDetails(DatanodeDetails dnDetails) {
+      this.datanodeDetails = dnDetails;
+      return this;
+    }
+
+    /**
+     * Sets the context.
+     * @param stateContext - State context.
+     * @return this.
+     */
+    public Builder setContext(StateContext stateContext) {
+      this.context = stateContext;
+      return this;
+    }
+
+    public HeartbeatEndpointTask build() {
+      if (endPointStateMachine == null) {
+        LOG.error("No endpoint specified.");
+        throw new IllegalArgumentException("A valid endpoint state machine is" +
+            " needed to construct HeartbeatEndpointTask task");
+      }
+
+      if (conf == null) {
+        LOG.error("No config specified.");
+        throw new IllegalArgumentException("A valid configration is needed to" +
+            " construct HeartbeatEndpointTask task");
+      }
+
+      if (datanodeDetails == null) {
+        LOG.error("No datanode specified.");
+        throw new IllegalArgumentException("A vaild Node ID is needed to " +
+            "construct HeartbeatEndpointTask task");
+      }
+
+      HeartbeatEndpointTask task = new HeartbeatEndpointTask(this
+          .endPointStateMachine, this.conf, this.context);
+      task.setDatanodeDetailsProto(datanodeDetails.getProtoBufMessage());
+      return task;
+    }
+  }
+}
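
For illustration only: assembling and running a heartbeat task with the
builder above. The endpoint, conf, dnDetails and context values are assumed
to come from the running state machine.

    HeartbeatEndpointTask task = HeartbeatEndpointTask.newBuilder()
        .setEndpointStateMachine(endpoint)
        .setConfig(conf)
        .setDatanodeDetails(dnDetails)
        .setContext(context)
        .build();
    // call() sends one heartbeat, queues any SCM commands on the context,
    // and returns the (possibly updated) endpoint state.
    EndpointStateMachine.EndPointStates state = executor.submit(task).get();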

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b832f3c/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
new file mode 100644
index 0000000..6913896
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.states.endpoint;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .EndpointStateMachine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+
+/**
+ * Task that registers the datanode with SCM.
+ */
+public final class RegisterEndpointTask implements
+    Callable<EndpointStateMachine.EndPointStates> {
+  static final Logger LOG = LoggerFactory.getLogger(RegisterEndpointTask.class);
+
+  private final EndpointStateMachine rpcEndPoint;
+  private final Configuration conf;
+  private Future<EndpointStateMachine.EndPointStates> result;
+  private DatanodeDetailsProto datanodeDetailsProto;
+
+  /**
+   * Creates a register endpoint task.
+   *
+   * @param rpcEndPoint - endpoint
+   * @param conf - conf
+   */
+  @VisibleForTesting
+  public RegisterEndpointTask(EndpointStateMachine rpcEndPoint,
+      Configuration conf) {
+    this.rpcEndPoint = rpcEndPoint;
+    this.conf = conf;
+  }
+
+  /**
+   * Get the DatanodeDetailsProto.
+   *
+   * @return DatanodeDetailsProto
+   */
+  public DatanodeDetailsProto getDatanodeDetailsProto() {
+    return datanodeDetailsProto;
+  }
+
+  /**
+   * Set the DatanodeDetailsProto.
+   *
+   * @param datanodeDetailsProto - Datanode details proto.
+   */
+  public void setDatanodeDetailsProto(
+      DatanodeDetailsProto datanodeDetailsProto) {
+    this.datanodeDetailsProto = datanodeDetailsProto;
+  }
+
+  /**
+   * Computes a result, or throws an exception if unable to do so.
+   *
+   * @return computed result
+   * @throws Exception if unable to compute a result
+   */
+  @Override
+  public EndpointStateMachine.EndPointStates call() throws Exception {
+
+    if (getDatanodeDetailsProto() == null) {
+      LOG.error("Container ID proto cannot be null in RegisterEndpoint task, " +
+          "shutting down the endpoint.");
+      return rpcEndPoint.setState(EndpointStateMachine.EndPointStates.SHUTDOWN);
+    }
+
+    rpcEndPoint.lock();
+    try {
+
+      // TODO : Add responses to the command Queue.
+      rpcEndPoint.getEndPoint().register(datanodeDetailsProto,
+          conf.getStrings(ScmConfigKeys.OZONE_SCM_NAMES));
+      EndpointStateMachine.EndPointStates nextState =
+          rpcEndPoint.getState().getNextState();
+      rpcEndPoint.setState(nextState);
+      rpcEndPoint.zeroMissedCount();
+    } catch (IOException ex) {
+      rpcEndPoint.logIfNeeded(ex);
+    } finally {
+      rpcEndPoint.unlock();
+    }
+
+    return rpcEndPoint.getState();
+  }
+
+  /**
+   * Returns a builder class for the RegisterEndpointTask.
+   *
+   * @return Builder.
+   */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Builder class for the RegisterEndpointTask.
+   */
+  public static class Builder {
+    private EndpointStateMachine endPointStateMachine;
+    private Configuration conf;
+    private DatanodeDetails datanodeDetails;
+
+    /**
+     * Constructs the builder class.
+     */
+    public Builder() {
+    }
+
+    /**
+     * Sets the endpoint state machine.
+     *
+     * @param rpcEndPoint - Endpoint state machine.
+     * @return Builder
+     */
+    public Builder setEndpointStateMachine(EndpointStateMachine rpcEndPoint) {
+      this.endPointStateMachine = rpcEndPoint;
+      return this;
+    }
+
+    /**
+     * Sets the Config.
+     *
+     * @param config - config
+     * @return Builder.
+     */
+    public Builder setConfig(Configuration config) {
+      this.conf = config;
+      return this;
+    }
+
+    /**
+     * Sets the datanode details.
+     *
+     * @param dnDetails - Datanode details.
+     * @return Builder
+     */
+    public Builder setDatanodeDetails(DatanodeDetails dnDetails) {
+      this.datanodeDetails = dnDetails;
+      return this;
+    }
+
+    public RegisterEndpointTask build() {
+      if (endPointStateMachine == null) {
+        LOG.error("No endpoint specified.");
+        throw new IllegalArgumentException("A valid endpoint state machine is" +
+            " needed to construct RegisterEndPoint task");
+      }
+
+      if (conf == null) {
+        LOG.error("No config specified.");
+        throw new IllegalArgumentException("A valid configration is needed to" +
+            " construct RegisterEndpoint task");
+      }
+
+      if (datanodeDetails == null) {
+        LOG.error("No datanode specified.");
+        throw new IllegalArgumentException("A vaild Node ID is needed to " +
+            "construct RegisterEndpoint task");
+      }
+
+      RegisterEndpointTask task = new RegisterEndpointTask(this
+          .endPointStateMachine, this.conf);
+      task.setDatanodeDetailsProto(datanodeDetails.getProtoBufMessage());
+      return task;
+    }
+  }
+}
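
For illustration only, a test-style invocation through the
@VisibleForTesting constructor; endpoint, conf and dnDetails are assumed
fixtures.

    RegisterEndpointTask task = new RegisterEndpointTask(endpoint, conf);
    task.setDatanodeDetailsProto(dnDetails.getProtoBufMessage());
    EndpointStateMachine.EndPointStates state = task.call();
    // On success the endpoint advances via getState().getNextState(), e.g.
    // REGISTER to HEARTBEAT; on an IOException the state is unchanged and
    // the missed attempt is recorded through logIfNeeded().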

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b832f3c/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
new file mode 100644
index 0000000..b048ee5
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.states.endpoint;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .EndpointStateMachine;
+import org.apache.hadoop.ozone.protocol.VersionResponse;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+
+/**
+ * Task that gets the version from SCM.
+ */
+public class VersionEndpointTask implements
+    Callable<EndpointStateMachine.EndPointStates> {
+  private final EndpointStateMachine rpcEndPoint;
+  private final Configuration configuration;
+
+  public VersionEndpointTask(EndpointStateMachine rpcEndPoint,
+      Configuration conf) {
+    this.rpcEndPoint = rpcEndPoint;
+    this.configuration = conf;
+  }
+
+  /**
+   * Computes a result, or throws an exception if unable to do so.
+   *
+   * @return computed result
+   * @throws Exception if unable to compute a result
+   */
+  @Override
+  public EndpointStateMachine.EndPointStates call() throws Exception {
+    rpcEndPoint.lock();
+    try {
+      SCMVersionResponseProto versionResponse =
+          rpcEndPoint.getEndPoint().getVersion(null);
+      rpcEndPoint.setVersion(VersionResponse.getFromProtobuf(versionResponse));
+
+      EndpointStateMachine.EndPointStates nextState =
+          rpcEndPoint.getState().getNextState();
+      rpcEndPoint.setState(nextState);
+      rpcEndPoint.zeroMissedCount();
+    } catch (IOException ex) {
+      rpcEndPoint.logIfNeeded(ex);
+    } finally {
+      rpcEndPoint.unlock();
+    }
+    return rpcEndPoint.getState();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b832f3c/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
new file mode 100644
index 0000000..1122598
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.states.endpoint;
+/**
+ This package contains code for RPC endpoint state transitions.
+ */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b832f3c/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java
new file mode 100644
index 0000000..92c953f
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.states;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b832f3c/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
new file mode 100644
index 0000000..50e45b4
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server;
+
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.SocketAddress;
+
+/**
+ * Creates a netty server endpoint that acts as the communication layer for
+ * Ozone containers.
+ */
+public final class XceiverServer implements XceiverServerSpi {
+  private static final Logger
+      LOG = LoggerFactory.getLogger(XceiverServer.class);
+  private int port;
+  private final ContainerDispatcher storageContainer;
+
+  private EventLoopGroup bossGroup;
+  private EventLoopGroup workerGroup;
+  private Channel channel;
+
+  /**
+   * Constructs a netty server class.
+   *
+   * @param datanodeDetails - Datanode details.
+   * @param conf - Configuration.
+   * @param dispatcher - Container dispatcher.
+   */
+  public XceiverServer(DatanodeDetails datanodeDetails, Configuration conf,
+                       ContainerDispatcher dispatcher) {
+    Preconditions.checkNotNull(conf);
+
+    this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
+        OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
+    // Get an available port on the current node and
+    // use that as the container port.
+    if (conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
+        OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT)) {
+      try (ServerSocket socket = new ServerSocket()) {
+        socket.setReuseAddress(true);
+        SocketAddress address = new InetSocketAddress(0);
+        socket.bind(address);
+        this.port = socket.getLocalPort();
+        LOG.info("Found a free port for the server : {}", this.port);
+      } catch (IOException e) {
+        LOG.error("Unable find a random free port for the server, "
+            + "fallback to use default port {}", this.port, e);
+      }
+    }
+    datanodeDetails.setContainerPort(port);
+    this.storageContainer = dispatcher;
+  }
+
+  @Override
+  public int getIPCPort() {
+    return this.port;
+  }
+
+  /**
+   * Returns the Replication type supported by this end-point.
+   *
+   * @return enum -- {Stand_Alone, Ratis, Chained}
+   */
+  @Override
+  public HddsProtos.ReplicationType getServerType() {
+    return HddsProtos.ReplicationType.STAND_ALONE;
+  }
+
+  @Override
+  public void start() throws IOException {
+    bossGroup = new NioEventLoopGroup();
+    workerGroup = new NioEventLoopGroup();
+    channel = new ServerBootstrap()
+        .group(bossGroup, workerGroup)
+        .channel(NioServerSocketChannel.class)
+        .handler(new LoggingHandler(LogLevel.INFO))
+        .childHandler(new XceiverServerInitializer(storageContainer))
+        .bind(port)
+        .syncUninterruptibly()
+        .channel();
+  }
+
+  @Override
+  public void stop() {
+    if (storageContainer != null) {
+      storageContainer.shutdown();
+    }
+    if (bossGroup != null) {
+      bossGroup.shutdownGracefully();
+    }
+    if (workerGroup != null) {
+      workerGroup.shutdownGracefully();
+    }
+    if (channel != null) {
+      channel.close().awaitUninterruptibly();
+    }
+  }
+}
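
The constructor above finds a random free port by binding a ServerSocket to
port 0 and reading back the port the OS assigned. The same technique in
isolation (illustrative sketch):

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;

    public final class FreePort {
      private FreePort() { }

      // Bind to port 0 so the OS picks an ephemeral port, then release it.
      // There is an inherent race: the port can be re-taken before the real
      // server binds, so callers should still handle bind failures.
      public static int find() throws IOException {
        try (ServerSocket socket = new ServerSocket()) {
          socket.setReuseAddress(true);
          socket.bind(new InetSocketAddress(0));
          return socket.getLocalPort();
        }
      }
    }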

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b832f3c/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java
new file mode 100644
index 0000000..5947dde
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Netty server handlers that respond to Network events.
+ */
+public class XceiverServerHandler extends
+    SimpleChannelInboundHandler<ContainerCommandRequestProto> {
+
+  static final Logger LOG = LoggerFactory.getLogger(XceiverServerHandler.class);
+  private final ContainerDispatcher dispatcher;
+
+  /**
+   * Constructor for server handler.
+   * @param dispatcher - Dispatcher interface
+   */
+  public XceiverServerHandler(ContainerDispatcher dispatcher) {
+    this.dispatcher = dispatcher;
+  }
+
+  /**
+   * <strong>Please keep in mind that this method will be renamed to {@code
+   * messageReceived(ChannelHandlerContext, I)} in 5.0.</strong>
+   * <p>
+   * Is called for each message of type {@link ContainerCommandRequestProto}.
+   *
+   * @param ctx the {@link ChannelHandlerContext} which this {@link
+   *            SimpleChannelInboundHandler} belongs to
+   * @param msg the message to handle
+   * @throws Exception is thrown if an error occurred
+   */
+  @Override
+  public void channelRead0(ChannelHandlerContext ctx,
+                           ContainerCommandRequestProto msg) throws
+      Exception {
+    ContainerCommandResponseProto response = this.dispatcher.dispatch(msg);
+    LOG.debug("Writing the reponse back to client.");
+    ctx.writeAndFlush(response);
+
+  }
+
+  /**
+   * Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)}
+   * Sub-classes may override this method to change behavior.
+   *
+   * @param ctx   - Channel Handler Context
+   * @param cause - Exception
+   */
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+      throws Exception {
+    LOG.error("An exception caught in the pipeline : " + cause.toString());
+    super.exceptionCaught(ctx, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b832f3c/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java
new file mode 100644
index 0000000..78ba26b
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server;
+
+import com.google.common.base.Preconditions;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+
+/**
+ * Creates a channel for the XceiverServer.
+ */
+public class XceiverServerInitializer extends ChannelInitializer<SocketChannel>{
+  private final ContainerDispatcher dispatcher;
+  public XceiverServerInitializer(ContainerDispatcher dispatcher) {
+    Preconditions.checkNotNull(dispatcher);
+    this.dispatcher = dispatcher;
+  }
+
+  /**
+   * This method will be called once the Channel is registered. After
+   * the method returns this instance will be removed from the {@link
+   * ChannelPipeline}.
+   *
+   * @param ch the SocketChannel which was registered.
+   * @throws Exception is thrown if an error occurs. In that case the channel
+   * will be closed.
+   */
+  @Override
+  protected void initChannel(SocketChannel ch) throws Exception {
+    ChannelPipeline pipeline = ch.pipeline();
+    pipeline.addLast(new ProtobufVarint32FrameDecoder());
+    pipeline.addLast(new ProtobufDecoder(ContainerCommandRequestProto
+        .getDefaultInstance()));
+    pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
+    pipeline.addLast(new ProtobufEncoder());
+    pipeline.addLast(new XceiverServerHandler(dispatcher));
+  }
+}
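
The pipeline above frames every message as a varint32-length-prefixed
protobuf. A client talking to this server needs the mirror arrangement,
decoding ContainerCommandResponseProto instead; a hedged sketch (the actual
client pipeline lives elsewhere in this patch series):

    ChannelPipeline pipeline = ch.pipeline();
    pipeline.addLast(new ProtobufVarint32FrameDecoder());
    pipeline.addLast(new ProtobufDecoder(
        ContainerCommandResponseProto.getDefaultInstance()));
    pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
    pipeline.addLast(new ProtobufEncoder());
    pipeline.addLast(new SimpleChannelInboundHandler<
        ContainerCommandResponseProto>() {
      @Override
      protected void channelRead0(ChannelHandlerContext ctx,
          ContainerCommandResponseProto response) {
        // Hand the decoded response to the caller (placeholder).
      }
    });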

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b832f3c/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
new file mode 100644
index 0000000..dad9e9f
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+
+import java.io.IOException;
+
+/** A server endpoint that acts as the communication layer for Ozone
+ * containers. */
+public interface XceiverServerSpi {
+  /** Starts the server. */
+  void start() throws IOException;
+
+  /** Stops a running server. */
+  void stop();
+
+  /** Get server IPC port. */
+  int getIPCPort();
+
+  /**
+   * Returns the Replication type supported by this end-point.
+   * @return enum -- {Stand_Alone, Ratis, Chained}
+   */
+  HddsProtos.ReplicationType getServerType();
+
+}
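
For illustration, a minimal no-op implementation of XceiverServerSpi; a
hypothetical test double that only shows the shape of the contract:

    public class NoopXceiverServer implements XceiverServerSpi {
      private final int port;

      public NoopXceiverServer(int port) {
        this.port = port;
      }

      @Override
      public void start() {
        // Nothing to start in the no-op server.
      }

      @Override
      public void stop() {
        // Nothing to stop.
      }

      @Override
      public int getIPCPort() {
        return port;
      }

      @Override
      public HddsProtos.ReplicationType getServerType() {
        return HddsProtos.ReplicationType.STAND_ALONE;
      }
    }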

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b832f3c/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java
new file mode 100644
index 0000000..59c96f1
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server;
+
+/**
+ * This package contains classes for the server of the storage container
+ * protocol.
+ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b832f3c/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
new file mode 100644
index 0000000..1a89e44
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server.ratis;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos
+    .WriteChunkRequestProto;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
+import org.apache.ratis.statemachine.StateMachineStorage;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
+import org.apache.ratis.statemachine.impl.TransactionContextImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/** A {@link org.apache.ratis.statemachine.StateMachine} for containers.
+ *
+ * The stateMachine is responsible for handling different types of container
+ * requests. The container requests can be divided into readonly and write
+ * requests.
+ *
+ * Readonly requests are classified by
+ * {@link org.apache.hadoop.hdds.scm.XceiverClientRatis#isReadOnly}
+ * and are answered directly from {@link #query(Message)}.
+ *
+ * The write requests can be divided into requests with user data
+ * (WriteChunkRequest) and other requests without user data.
+ *
+ * In order to optimize the write throughput, a writeChunk request is
+ * processed in two phases. The request is split in
+ * {@link #startTransaction(RaftClientRequest)}: in the first phase the user
+ * data is written directly into the state machine via
+ * {@link #writeStateMachineData}, and in the second phase the
+ * transaction is committed via {@link #applyTransaction(TransactionContext)}.
+ *
+ * For requests with no stateMachine data, the transaction is committed
+ * directly through {@link #applyTransaction(TransactionContext)}.
+ *
+ * Two orderings are currently enforced in the code:
+ *
+ * 1) A write chunk operation is executed after the create container
+ * operation; otherwise it would fail because the container has not yet
+ * been created. The create container operation therefore also carries
+ * state machine data via {@link #startTransaction(RaftClientRequest)},
+ * which helps synchronize the calls in {@link #writeStateMachineData}.
+ *
+ * 2) A write chunk commit operation is executed after the corresponding
+ * write chunk state machine operation. This ensures that the commit is
+ * synced with the state machine operation.
+ */
+public class ContainerStateMachine extends BaseStateMachine {
+  static final Logger LOG = LoggerFactory.getLogger(
+      ContainerStateMachine.class);
+  private final SimpleStateMachineStorage storage
+      = new SimpleStateMachineStorage();
+  private final ContainerDispatcher dispatcher;
+  private ThreadPoolExecutor writeChunkExecutor;
+  private final ConcurrentHashMap<Long, CompletableFuture<Message>>
+      writeChunkFutureMap;
+  private final ConcurrentHashMap<String, CompletableFuture<Message>>
+      createContainerFutureMap;
+
+  ContainerStateMachine(ContainerDispatcher dispatcher,
+      ThreadPoolExecutor writeChunkExecutor) {
+    this.dispatcher = dispatcher;
+    this.writeChunkExecutor = writeChunkExecutor;
+    this.writeChunkFutureMap = new ConcurrentHashMap<>();
+    this.createContainerFutureMap = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public StateMachineStorage getStateMachineStorage() {
+    return storage;
+  }
+
+  @Override
+  public void initialize(
+      RaftPeerId id, RaftProperties properties, RaftStorage raftStorage)
+      throws IOException {
+    super.initialize(id, properties, raftStorage);
+    storage.init(raftStorage);
+    //  TODO handle snapshots
+
+    // TODO: Add a flag that tells you that initialize has been called.
+    // Check with Ratis if this feature is done in Ratis.
+  }
+
+  @Override
+  public TransactionContext startTransaction(RaftClientRequest request)
+      throws IOException {
+    final ContainerCommandRequestProto proto =
+        getRequestProto(request.getMessage().getContent());
+
+    final SMLogEntryProto log;
+    if (proto.getCmdType() == ContainerProtos.Type.WriteChunk) {
+      final WriteChunkRequestProto write = proto.getWriteChunk();
+      // create the state machine data proto
+      final WriteChunkRequestProto dataWriteChunkProto =
+          WriteChunkRequestProto
+              .newBuilder(write)
+              .setStage(ContainerProtos.Stage.WRITE_DATA)
+              .build();
+      ContainerCommandRequestProto dataContainerCommandProto =
+          ContainerCommandRequestProto
+              .newBuilder(proto)
+              .setWriteChunk(dataWriteChunkProto)
+              .build();
+
+      // create the log entry proto
+      final WriteChunkRequestProto commitWriteChunkProto =
+          WriteChunkRequestProto.newBuilder()
+              .setPipeline(write.getPipeline())
+              .setKeyName(write.getKeyName())
+              .setChunkData(write.getChunkData())
+              // skipping the data field as it is
+              // already set in statemachine data proto
+              .setStage(ContainerProtos.Stage.COMMIT_DATA)
+              .build();
+      ContainerCommandRequestProto commitContainerCommandProto =
+          ContainerCommandRequestProto
+              .newBuilder(proto)
+              .setWriteChunk(commitWriteChunkProto)
+              .build();
+
+      log = SMLogEntryProto.newBuilder()
+          .setData(getShadedByteString(commitContainerCommandProto))
+          .setStateMachineData(getShadedByteString(dataContainerCommandProto))
+          .build();
+    } else if (proto.getCmdType() == ContainerProtos.Type.CreateContainer) {
+      log = SMLogEntryProto.newBuilder()
+          .setData(request.getMessage().getContent())
+          .setStateMachineData(request.getMessage().getContent())
+          .build();
+    } else {
+      log = SMLogEntryProto.newBuilder()
+          .setData(request.getMessage().getContent())
+          .build();
+    }
+    return new TransactionContextImpl(this, request, log);
+  }
+
+  private ByteString getShadedByteString(ContainerCommandRequestProto proto) {
+    return ShadedProtoUtil.asShadedByteString(proto.toByteArray());
+  }
+
+  private ContainerCommandRequestProto getRequestProto(ByteString request)
+      throws InvalidProtocolBufferException {
+    return ContainerCommandRequestProto.parseFrom(
+        ShadedProtoUtil.asByteString(request));
+  }
+
+  private Message runCommand(ContainerCommandRequestProto requestProto) {
+    LOG.trace("dispatch {}", requestProto);
+    ContainerCommandResponseProto response = dispatcher.dispatch(requestProto);
+    LOG.trace("response {}", response);
+    return () -> ShadedProtoUtil.asShadedByteString(response.toByteArray());
+  }
+
+  private CompletableFuture<Message> handleWriteChunk(
+      ContainerCommandRequestProto requestProto, long entryIndex) {
+    final WriteChunkRequestProto write = requestProto.getWriteChunk();
+    String containerName = write.getPipeline().getContainerName();
+    CompletableFuture<Message> future =
+        createContainerFutureMap.get(containerName);
+    CompletableFuture<Message> writeChunkFuture;
+    if (future != null) {
+      writeChunkFuture = future.thenApplyAsync(
+          v -> runCommand(requestProto), writeChunkExecutor);
+    } else {
+      writeChunkFuture = CompletableFuture.supplyAsync(
+          () -> runCommand(requestProto), writeChunkExecutor);
+    }
+    writeChunkFutureMap.put(entryIndex, writeChunkFuture);
+    return writeChunkFuture;
+  }
+
+  private CompletableFuture<Message> handleCreateContainer(
+      ContainerCommandRequestProto requestProto) {
+    String containerName =
+        requestProto.getCreateContainer().getContainerData().getName();
+    createContainerFutureMap
+        .computeIfAbsent(containerName, k -> new CompletableFuture<>());
+    return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
+  }
+
+  @Override
+  public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) {
+    try {
+      final ContainerCommandRequestProto requestProto =
+          getRequestProto(entry.getSmLogEntry().getStateMachineData());
+      ContainerProtos.Type cmdType = requestProto.getCmdType();
+      switch (cmdType) {
+      case CreateContainer:
+        return handleCreateContainer(requestProto);
+      case WriteChunk:
+        return handleWriteChunk(requestProto, entry.getIndex());
+      default:
+        throw new IllegalStateException("Cmd Type:" + cmdType
+            + " should not have state machine data");
+      }
+    } catch (IOException e) {
+      return completeExceptionally(e);
+    }
+  }
+
+  @Override
+  public CompletableFuture<Message> query(Message request) {
+    try {
+      final ContainerCommandRequestProto requestProto =
+          getRequestProto(request.getContent());
+      return CompletableFuture.completedFuture(runCommand(requestProto));
+    } catch (IOException e) {
+      return completeExceptionally(e);
+    }
+  }
+
+  @Override
+  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+    try {
+      ContainerCommandRequestProto requestProto =
+          getRequestProto(trx.getSMLogEntry().getData());
+      ContainerProtos.Type cmdType = requestProto.getCmdType();
+
+      if (cmdType == ContainerProtos.Type.WriteChunk) {
+        WriteChunkRequestProto write = requestProto.getWriteChunk();
+        // the data field has already been removed in startTransaction
+        Preconditions.checkArgument(!write.hasData());
+        CompletableFuture<Message> stateMachineFuture =
+            writeChunkFutureMap.remove(trx.getLogEntry().getIndex());
+        return stateMachineFuture
+            .thenComposeAsync(v ->
+                CompletableFuture.completedFuture(runCommand(requestProto)));
+      } else {
+        Message message = runCommand(requestProto);
+        if (cmdType == ContainerProtos.Type.CreateContainer) {
+          String containerName =
+              requestProto.getCreateContainer().getContainerData().getName();
+          createContainerFutureMap.remove(containerName).complete(message);
+        }
+        return CompletableFuture.completedFuture(message);
+      }
+    } catch (IOException e) {
+      return completeExceptionally(e);
+    }
+  }
+
+  private static <T> CompletableFuture<T> completeExceptionally(Exception e) {
+    final CompletableFuture<T> future = new CompletableFuture<>();
+    future.completeExceptionally(e);
+    return future;
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+}
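
The ordering logic above, a future per log index for the data phase with the
commit gated on that future in applyTransaction, can be hard to see among the
proto plumbing. The following self-contained sketch uses only JDK classes to
show the same two-phase pattern; the class and method names are illustrative
and are not part of the Ozone or Ratis APIs.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    /** Illustrative sketch of the two-phase write in ContainerStateMachine. */
    public class TwoPhaseWriteSketch {
      private final ConcurrentHashMap<Long, CompletableFuture<String>> dataPhase =
          new ConcurrentHashMap<>();
      private final ExecutorService writeExecutor = Executors.newFixedThreadPool(4);

      /** Phase 1: write the chunk data (writeStateMachineData analogue). */
      CompletableFuture<String> writeData(long logIndex, String chunk) {
        CompletableFuture<String> f = CompletableFuture.supplyAsync(
            () -> "wrote " + chunk, writeExecutor);
        dataPhase.put(logIndex, f);   // like writeChunkFutureMap.put(entryIndex, ...)
        return f;
      }

      /** Phase 2: commit, ordered after the data write (applyTransaction analogue). */
      CompletableFuture<String> commit(long logIndex) {
        // Removing and chaining on the future guarantees the commit runs only
        // after the data write, mirroring writeChunkFutureMap.remove(...) above.
        return dataPhase.remove(logIndex).thenApply(v -> v + ", committed");
      }

      public static void main(String[] args) throws Exception {
        TwoPhaseWriteSketch sm = new TwoPhaseWriteSketch();
        sm.writeData(1L, "chunk-1");
        System.out.println(sm.commit(1L).get());  // prints: wrote chunk-1, committed
        sm.writeExecutor.shutdown();
      }
    }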

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b832f3c/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
new file mode 100644
index 0000000..4bd55f1
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server.ratis;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.transport.server
+    .XceiverServerSpi;
+import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.RatisHelper;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.netty.NettyConfigKeys;
+import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.SocketAddress;
+import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Creates a ratis server endpoint that acts as the communication layer for
+ * Ozone containers.
+ */
+public final class XceiverServerRatis implements XceiverServerSpi {
+  static final Logger LOG = LoggerFactory.getLogger(XceiverServerRatis.class);
+  private final int port;
+  private final RaftServer server;
+  private ThreadPoolExecutor writeChunkExecutor;
+
+  private XceiverServerRatis(DatanodeDetails dd, int port, String storageDir,
+      ContainerDispatcher dispatcher, Configuration conf) throws IOException {
+
+    final String rpcType = conf.get(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
+    final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType);
+    final int raftSegmentSize = conf.getInt(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY,
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT);
+    final int raftSegmentPreallocatedSize = conf.getInt(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY,
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT);
+    final int maxChunkSize = OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE;
+    final int numWriteChunkThreads = conf.getInt(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY,
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT);
+
+    Objects.requireNonNull(dd, "dd == null");
+    this.port = port;
+    RaftProperties serverProperties = newRaftProperties(rpc, port,
+        storageDir, maxChunkSize, raftSegmentSize, raftSegmentPreallocatedSize);
+
+    writeChunkExecutor =
+        new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads,
+            100, TimeUnit.SECONDS,
+            new ArrayBlockingQueue<>(1024),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+    ContainerStateMachine stateMachine =
+        new ContainerStateMachine(dispatcher, writeChunkExecutor);
+    this.server = RaftServer.newBuilder()
+        .setServerId(RatisHelper.toRaftPeerId(dd))
+        .setGroup(RatisHelper.emptyRaftGroup())
+        .setProperties(serverProperties)
+        .setStateMachine(stateMachine)
+        .build();
+  }
+
+  private static RaftProperties newRaftProperties(
+      RpcType rpc, int port, String storageDir, int scmChunkSize,
+      int raftSegmentSize, int raftSegmentPreallocatedSize) {
+    final RaftProperties properties = new RaftProperties();
+    RaftServerConfigKeys.Log.Appender.setBatchEnabled(properties, true);
+    RaftServerConfigKeys.Log.Appender.setBufferCapacity(properties,
+        SizeInBytes.valueOf(raftSegmentPreallocatedSize));
+    RaftServerConfigKeys.Log.setWriteBufferSize(properties,
+        SizeInBytes.valueOf(scmChunkSize));
+    RaftServerConfigKeys.Log.setPreallocatedSize(properties,
+        SizeInBytes.valueOf(raftSegmentPreallocatedSize));
+    RaftServerConfigKeys.Log.setSegmentSizeMax(properties,
+        SizeInBytes.valueOf(raftSegmentSize));
+    RaftServerConfigKeys.setStorageDir(properties, new File(storageDir));
+    RaftConfigKeys.Rpc.setType(properties, rpc);
+
+    RaftServerConfigKeys.Log.setMaxCachedSegmentNum(properties, 2);
+    GrpcConfigKeys.setMessageSizeMax(properties,
+        SizeInBytes.valueOf(scmChunkSize + raftSegmentPreallocatedSize));
+    RaftServerConfigKeys.Rpc.setTimeoutMin(properties,
+        TimeDuration.valueOf(800, TimeUnit.MILLISECONDS));
+    RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
+        TimeDuration.valueOf(1000, TimeUnit.MILLISECONDS));
+    if (rpc == SupportedRpcType.GRPC) {
+      GrpcConfigKeys.Server.setPort(properties, port);
+    } else if (rpc == SupportedRpcType.NETTY) {
+      NettyConfigKeys.Server.setPort(properties, port);
+    }
+    return properties;
+  }
+
+  public static XceiverServerRatis newXceiverServerRatis(
+      DatanodeDetails datanodeDetails, Configuration ozoneConf,
+      ContainerDispatcher dispatcher) throws IOException {
+    final String ratisDir = File.separator + "ratis";
+    int localPort = ozoneConf.getInt(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT,
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT_DEFAULT);
+    String storageDir = ozoneConf.get(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR);
+
+    if (Strings.isNullOrEmpty(storageDir)) {
+      storageDir = ozoneConf.get(OzoneConfigKeys
+          .OZONE_METADATA_DIRS);
+      Preconditions.checkNotNull(storageDir, "ozone.metadata.dirs " +
+          "cannot be null. Please check your configs.");
+      storageDir = storageDir.concat(ratisDir);
+      LOG.warn("Storage directory for Ratis is not configured. Mapping Ratis " +
+              "storage under {}. It is a good idea to map this to an SSD disk.",
+          storageDir);
+    }
+
+    // Get an available port on the current node and
+    // use that as the container port.
+    if (ozoneConf.getBoolean(OzoneConfigKeys
+            .DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT_DEFAULT)) {
+      try (ServerSocket socket = new ServerSocket()) {
+        socket.setReuseAddress(true);
+        SocketAddress address = new InetSocketAddress(0);
+        socket.bind(address);
+        localPort = socket.getLocalPort();
+        LOG.info("Found a free port for the server : {}", localPort);
+        // If random local ports are configured, this server is probably
+        // running under MiniOzoneCluster. Ratis locks the storage
+        // directories, so we pass a different local directory for each
+        // local instance by mapping the ratis directories under the datanode ID.
+        storageDir =
+            storageDir.concat(File.separator +
+                datanodeDetails.getUuidString());
+      } catch (IOException e) {
+        LOG.error("Unable find a random free port for the server, "
+            + "fallback to use default port {}", localPort, e);
+      }
+    }
+    datanodeDetails.setRatisPort(localPort);
+    return new XceiverServerRatis(datanodeDetails, localPort, storageDir,
+        dispatcher, ozoneConf);
+  }
+
+  @Override
+  public void start() throws IOException {
+    LOG.info("Starting {} {} at port {}", getClass().getSimpleName(),
+        server.getId(), getIPCPort());
+    writeChunkExecutor.prestartAllCoreThreads();
+    server.start();
+  }
+
+  @Override
+  public void stop() {
+    try {
+      writeChunkExecutor.shutdown();
+      server.close();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public int getIPCPort() {
+    return port;
+  }
+
+  /**
+   * Returns the Replication type supported by this end-point.
+   *
+   * @return enum -- {Stand_Alone, Ratis, Chained}
+   */
+  @Override
+  public HddsProtos.ReplicationType getServerType() {
+    return HddsProtos.ReplicationType.RATIS;
+  }
+}
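
When DFS_CONTAINER_RATIS_IPC_RANDOM_PORT is enabled, the factory method above
uses a standard JDK idiom to pick a free port: bind a ServerSocket to port 0,
let the OS choose an ephemeral port, and read it back. A minimal standalone
version of that idiom (JDK only, nothing Ozone-specific; the class name is
illustrative) looks like this:

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;

    /** Minimal version of the free-port lookup in newXceiverServerRatis. */
    public final class FreePortSketch {
      static int findFreePort() throws IOException {
        try (ServerSocket socket = new ServerSocket()) {
          socket.setReuseAddress(true);           // let the real server rebind quickly
          socket.bind(new InetSocketAddress(0));  // port 0: the OS assigns a free port
          return socket.getLocalPort();
        }
      }

      public static void main(String[] args) throws IOException {
        System.out.println("Free port: " + findFreePort());
      }
    }

Note that the port is released when the socket is closed, so another process
could grab it before the Ratis server binds. That race is acceptable in test
setups such as MiniOzoneCluster, which is where the comment above expects the
random-port flag to be used.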

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b832f3c/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java
new file mode 100644
index 0000000..8debfe0
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.transport.server.ratis;
+
+/**
+ * This package contains classes for the server implementation
+ * using Apache Ratis.
+ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b832f3c/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java
new file mode 100644
index 0000000..6ae45b6
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.utils;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.collections.MapIterator;
+import org.apache.commons.collections.map.LRUMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * ContainerCache is an LRUMap that maintains the DB handles.
+ */
+public final class ContainerCache extends LRUMap {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerCache.class);
+  private final Lock lock = new ReentrantLock();
+  private static ContainerCache cache;
+  private static final float LOAD_FACTOR = 0.75f;
+  /**
+   * Constructs a cache that holds DBHandle references.
+   */
+  private ContainerCache(int maxSize, float loadFactor, boolean
+      scanUntilRemovable) {
+    super(maxSize, loadFactor, scanUntilRemovable);
+  }
+
+  /**
+   * Returns a singleton instance of {@link ContainerCache}
+   * that holds the DB handles.
+   *
+   * @param conf - Configuration.
+   * @return An instance of {@link ContainerCache}.
+   */
+  public static synchronized ContainerCache getInstance(Configuration conf) {
+    if (cache == null) {
+      int cacheSize = conf.getInt(OzoneConfigKeys.OZONE_CONTAINER_CACHE_SIZE,
+          OzoneConfigKeys.OZONE_CONTAINER_CACHE_DEFAULT);
+      cache = new ContainerCache(cacheSize, LOAD_FACTOR, true);
+    }
+    return cache;
+  }
+
+  /**
+   * Closes a db instance.
+   *
+   * @param container - name of the container whose DB is being closed.
+   * @param db - db instance to close.
+   */
+  private void closeDB(String container, MetadataStore db) {
+    if (db != null) {
+      try {
+        db.close();
+      } catch (IOException e) {
+        LOG.error("Error closing DB. Container: " + container, e);
+      }
+    }
+  }
+
+  /**
+   * Closes all the db instances and resets the cache.
+   */
+  public void shutdownCache() {
+    lock.lock();
+    try {
+      // iterate the cache and close each db
+      MapIterator iterator = cache.mapIterator();
+      while (iterator.hasNext()) {
+        iterator.next();
+        MetadataStore db = (MetadataStore) iterator.getValue();
+        closeDB(iterator.getKey().toString(), db);
+      }
+      // reset the cache
+      cache.clear();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  protected boolean removeLRU(LinkEntry entry) {
+    lock.lock();
+    try {
+      MetadataStore db = (MetadataStore) entry.getValue();
+      closeDB(entry.getKey().toString(), db);
+    } finally {
+      lock.unlock();
+    }
+    return true;
+  }
+
+  /**
+   * Returns a DB handle if available; creates the handle otherwise.
+   *
+   * @param containerName - Name of the container.
+   * @return MetadataStore.
+   */
+  public MetadataStore getDB(String containerName, String containerDBPath)
+      throws IOException {
+    Preconditions.checkNotNull(containerName);
+    Preconditions.checkState(!containerName.isEmpty());
+    lock.lock();
+    try {
+      MetadataStore db = (MetadataStore) this.get(containerName);
+
+      if (db == null) {
+        db = MetadataStoreBuilder.newBuilder()
+            .setDbFile(new File(containerDBPath))
+            .setCreateIfMissing(false)
+            .build();
+        this.put(containerName, db);
+      }
+      return db;
+    } catch (Exception e) {
+      LOG.error("Error opening DB. Container:{} ContainerPath:{}",
+          containerName, containerDBPath, e);
+      throw e;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Removes a DB handle from the cache and closes it.
+   *
+   * @param containerName - Name of the container.
+   */
+  public void removeDB(String containerName) {
+    Preconditions.checkNotNull(containerName);
+    Preconditions.checkState(!containerName.isEmpty());
+    lock.lock();
+    try {
+      MetadataStore db = (MetadataStore)this.get(containerName);
+      closeDB(containerName, db);
+      this.remove(containerName);
+    } finally {
+      lock.unlock();
+    }
+  }
+}
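
ContainerCache combines two ideas: an LRU map, and closing the evicted handle
in removeLRU before it is dropped. The same close-on-evict behavior can be
sketched with a plain LinkedHashMap when Commons Collections is not at hand;
the class below is illustrative and not part of the Ozone API.

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.LinkedHashMap;
    import java.util.Map;

    /** LRU cache that closes evicted handles, like ContainerCache.removeLRU. */
    public class ClosingLruCache<K, V extends Closeable>
        extends LinkedHashMap<K, V> {
      private final int maxSize;

      public ClosingLruCache(int maxSize) {
        super(16, 0.75f, true);  // accessOrder=true yields LRU eviction order
        this.maxSize = maxSize;
      }

      @Override
      protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        if (size() > maxSize) {
          try {
            eldest.getValue().close();  // close the DB handle before eviction
          } catch (IOException e) {
            // Mirror ContainerCache.closeDB: report the error and keep going.
            System.err.println("Error closing " + eldest.getKey() + ": " + e);
          }
          return true;
        }
        return false;
      }
    }

Unlike ContainerCache, LinkedHashMap is not thread-safe, which is why the real
class pairs the LRUMap with an explicit ReentrantLock around every access.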

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b832f3c/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/package-info.java
new file mode 100644
index 0000000..08264f0
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.utils;
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org

