hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aengin...@apache.org
Subject hadoop git commit: HDFS-13116. Ozone: Refactor Pipeline to have transport and container specific information. Contributed by Mukul Kumar Singh.
Date Sat, 10 Feb 2018 01:38:45 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 3c9a9a117 -> ea5c79285


HDFS-13116. Ozone: Refactor Pipeline to have transport and container specific information. Contributed by Mukul Kumar Singh.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ea5c7928
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ea5c7928
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ea5c7928

Branch: refs/heads/HDFS-7240
Commit: ea5c79285d6724ddcd0181e341759c3bdbd9e55c
Parents: 3c9a9a1
Author: Anu Engineer <aengineer@apache.org>
Authored: Fri Feb 9 17:17:11 2018 -0800
Committer: Anu Engineer <aengineer@apache.org>
Committed: Fri Feb 9 17:17:11 2018 -0800

----------------------------------------------------------------------
 .../scm/container/common/helpers/Pipeline.java  | 131 +++----------
 .../common/helpers/PipelineChannel.java         | 121 ++++++++++++
 .../src/main/proto/Ozone.proto                  |  20 +-
 .../org/apache/hadoop/ozone/scm/cli/SQLCLI.java |   6 +-
 .../ozone/scm/pipelines/PipelineManager.java    | 127 +++++++++++-
 .../ozone/scm/pipelines/PipelineSelector.java   |  23 ++-
 .../scm/pipelines/ratis/RatisManagerImpl.java   | 191 ++++---------------
 .../standalone/StandaloneManagerImpl.java       |  60 +++---
 .../hadoop/cblock/TestCBlockReadWrite.java      |   9 +-
 .../hadoop/ozone/TestMiniOzoneCluster.java      |  12 +-
 .../ozone/container/ContainerTestHelper.java    |  38 ++--
 .../common/impl/TestContainerPersistence.java   |  12 +-
 .../ozone/scm/block/TestDeletedBlockLog.java    |  12 +-
 .../ozone/scm/node/TestContainerPlacement.java  |  27 ---
 14 files changed, 418 insertions(+), 371 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea5c7928/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/Pipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/Pipeline.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/Pipeline.java
index 9aadec2..9d47646 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/Pipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/Pipeline.java
@@ -27,17 +27,15 @@ import com.fasterxml.jackson.databind.ObjectWriter;
 import com.fasterxml.jackson.databind.ser.FilterProvider;
 import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
 import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
 
 /**
  * A pipeline represents the group of machines over which a container lives.
@@ -48,7 +46,7 @@ public class Pipeline {
 
   static {
     ObjectMapper mapper = new ObjectMapper();
-    String[] ignorableFieldNames = {"data", "leaderID", "datanodes"};
+    String[] ignorableFieldNames = {"data"};
     FilterProvider filters = new SimpleFilterProvider()
         .addFilter(PIPELINE_INFO, SimpleBeanPropertyFilter
             .serializeAllExcept(ignorableFieldNames));
@@ -60,25 +58,22 @@ public class Pipeline {
   }
 
   private String containerName;
-  private String leaderID;
-  private Map<String, DatanodeID> datanodes;
-  private OzoneProtos.LifeCycleState lifeCycleState;
-  private OzoneProtos.ReplicationType type;
-  private OzoneProtos.ReplicationFactor factor;
-  private String pipelineName;
+  private PipelineChannel pipelineChannel;
   /**
    * Allows you to maintain private data on pipelines. This is not serialized
    * via protobuf, just allows us to maintain some private data.
    */
+  @JsonIgnore
   private byte[] data;
   /**
    * Constructs a new pipeline data structure.
    *
-   * @param leaderID - First machine in this pipeline.
+   * @param containerName - Container
+   * @param pipelineChannel - transport information for this container
    */
-  public Pipeline(String leaderID) {
-    this.leaderID = leaderID;
-    datanodes = new TreeMap<>();
+  public Pipeline(String containerName, PipelineChannel pipelineChannel) {
+    this.containerName = containerName;
+    this.pipelineChannel = pipelineChannel;
     data = null;
   }
 
@@ -90,36 +85,13 @@ public class Pipeline {
    */
   public static Pipeline getFromProtoBuf(OzoneProtos.Pipeline pipeline) {
     Preconditions.checkNotNull(pipeline);
-    Pipeline newPipeline = new Pipeline(pipeline.getLeaderID());
-    for (HdfsProtos.DatanodeIDProto dataID : pipeline.getMembersList()) {
-      newPipeline.addMember(DatanodeID.getFromProtoBuf(dataID));
-    }
-
-    newPipeline.setContainerName(pipeline.getContainerName());
-    newPipeline.setLifeCycleState(pipeline.getState());
-    newPipeline.setType(pipeline.getType());
-    newPipeline.setFactor(pipeline.getFactor());
-    if (pipeline.hasPipelineName()) {
-      newPipeline.setPipelineName(pipeline.getPipelineName());
-    }
-    return newPipeline;
+    PipelineChannel pipelineChannel =
+        PipelineChannel.getFromProtoBuf(pipeline.getPipelineChannel());
+    return new Pipeline(pipeline.getContainerName(), pipelineChannel);
   }
 
   public OzoneProtos.ReplicationFactor getFactor() {
-    return factor;
-  }
-
-  public void setFactor(OzoneProtos.ReplicationFactor factor) {
-    this.factor = factor;
-  }
-
-  /**
-   * Adds a member to the pipeline.
-   *
-   * @param dataNodeId - Datanode to be added.
-   */
-  public void addMember(DatanodeID dataNodeId) {
-    datanodes.put(dataNodeId.getDatanodeUuid(), dataNodeId);
+    return pipelineChannel.getFactor();
   }
 
   /**
@@ -129,7 +101,7 @@ public class Pipeline {
    */
   @JsonIgnore
   public DatanodeID getLeader() {
-    return datanodes.get(leaderID);
+    return pipelineChannel.getDatanodes().get(pipelineChannel.getLeaderID());
   }
 
   /**
@@ -138,7 +110,8 @@ public class Pipeline {
    * @return First Machine.
    */
   public String getLeaderHost() {
-    return datanodes.get(leaderID).getHostName();
+    return pipelineChannel.getDatanodes()
+        .get(pipelineChannel.getLeaderID()).getHostName();
   }
 
   /**
@@ -148,7 +121,7 @@ public class Pipeline {
    */
   @JsonIgnore
   public List<DatanodeID> getMachines() {
-    return new ArrayList<>(datanodes.values());
+    return new ArrayList<>(pipelineChannel.getDatanodes().values());
   }
 
   /**
@@ -158,7 +131,7 @@ public class Pipeline {
    */
   public List<String> getDatanodeHosts() {
     List<String> dataHosts = new ArrayList<>();
-    for (DatanodeID id : datanodes.values()) {
+    for (DatanodeID id : pipelineChannel.getDatanodes().values()) {
       dataHosts.add(id.getHostName());
     }
     return dataHosts;
@@ -173,22 +146,8 @@ public class Pipeline {
   public OzoneProtos.Pipeline getProtobufMessage() {
     OzoneProtos.Pipeline.Builder builder =
         OzoneProtos.Pipeline.newBuilder();
-    for (DatanodeID datanode : datanodes.values()) {
-      builder.addMembers(datanode.getProtoBufMessage());
-    }
-    builder.setLeaderID(leaderID);
     builder.setContainerName(this.containerName);
-
-    if (this.getLifeCycleState() != null) {
-      builder.setState(this.getLifeCycleState());
-    }
-    if (this.getType() != null) {
-      builder.setType(this.getType());
-    }
-
-    if (this.getFactor() != null) {
-      builder.setFactor(this.getFactor());
-    }
+    builder.setPipelineChannel(this.pipelineChannel.getProtobufMessage());
     return builder.build();
   }
 
@@ -202,15 +161,6 @@ public class Pipeline {
   }
 
   /**
-   * Sets the container Name.
-   *
-   * @param containerName - Name of the container.
-   */
-  public void setContainerName(String containerName) {
-    this.containerName = containerName;
-  }
-
-  /**
    * Returns private data that is set on this pipeline.
    *
    * @return blob, the user can interpret it any way they like.
@@ -223,6 +173,11 @@ public class Pipeline {
     }
   }
 
+  @VisibleForTesting
+  public PipelineChannel getPipelineChannel() {
+    return pipelineChannel;
+  }
+
   /**
    * Set private data on pipeline.
    *
@@ -240,16 +195,7 @@ public class Pipeline {
    * @return - LifeCycleStates.
    */
   public OzoneProtos.LifeCycleState getLifeCycleState() {
-    return lifeCycleState;
-  }
-
-  /**
-   * Sets the lifecycleState.
-   *
-   * @param lifeCycleStates - Enum
-   */
-  public void setLifeCycleState(OzoneProtos.LifeCycleState lifeCycleStates) {
-    this.lifeCycleState = lifeCycleStates;
+    return pipelineChannel.getLifeCycleState();
   }
 
   /**
@@ -258,16 +204,7 @@ public class Pipeline {
    * @return - Name of the pipeline
    */
   public String getPipelineName() {
-    return pipelineName;
-  }
-
-  /**
-   * Sets the pipeline name.
-   *
-   * @param pipelineName - Sets the name.
-   */
-  public void setPipelineName(String pipelineName) {
-    this.pipelineName = pipelineName;
+    return pipelineChannel.getName();
   }
 
   /**
@@ -276,24 +213,16 @@ public class Pipeline {
    * @return type - Standalone, Ratis, Chained.
    */
   public OzoneProtos.ReplicationType getType() {
-    return type;
-  }
-
-  /**
-   * Sets the type of this pipeline.
-   *
-   * @param type - Standalone, Ratis, Chained.
-   */
-  public void setType(OzoneProtos.ReplicationType type) {
-    this.type = type;
+    return pipelineChannel.getType();
   }
 
   @Override
   public String toString() {
     final StringBuilder b = new StringBuilder(getClass().getSimpleName())
         .append("[");
-    datanodes.keySet().stream()
-        .forEach(id -> b.append(id.endsWith(leaderID) ? "*" + id : id));
+    pipelineChannel.getDatanodes().keySet().stream()
+        .forEach(id -> b.
+            append(id.endsWith(pipelineChannel.getLeaderID()) ? "*" + id : id));
     b.append("] container:").append(containerName);
     b.append(" name:").append(getPipelineName());
     if (getType() != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea5c7928/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/PipelineChannel.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/PipelineChannel.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/PipelineChannel.java
new file mode 100644
index 0000000..0af0534
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/PipelineChannel.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.scm.container.common.helpers;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * PipelineChannel information for a {@link Pipeline}.
+ */
+public class PipelineChannel {
+  @JsonIgnore
+  private String leaderID;
+  @JsonIgnore
+  private Map<String, DatanodeID> datanodes;
+  private LifeCycleState lifeCycleState;
+  private ReplicationType type;
+  private ReplicationFactor factor;
+  private String name;
+
+  public PipelineChannel(String leaderID, LifeCycleState lifeCycleState,
+      ReplicationType replicationType, ReplicationFactor replicationFactor,
+      String name) {
+    this.leaderID = leaderID;
+    this.lifeCycleState = lifeCycleState;
+    this.type = replicationType;
+    this.factor = replicationFactor;
+    this.name = name;
+    datanodes = new TreeMap<>();
+  }
+
+  public String getLeaderID() {
+    return leaderID;
+  }
+
+  public Map<String, DatanodeID> getDatanodes() {
+    return datanodes;
+  }
+
+  public LifeCycleState getLifeCycleState() {
+    return lifeCycleState;
+  }
+
+  public ReplicationType getType() {
+    return type;
+  }
+
+  public ReplicationFactor getFactor() {
+    return factor;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public void addMember(DatanodeID dataNodeId) {
+    datanodes.put(dataNodeId.getDatanodeUuid(), dataNodeId);
+  }
+
+  @JsonIgnore
+  public OzoneProtos.PipelineChannel getProtobufMessage() {
+    OzoneProtos.PipelineChannel.Builder builder =
+        OzoneProtos.PipelineChannel.newBuilder();
+    for (DatanodeID datanode : datanodes.values()) {
+      builder.addMembers(datanode.getProtoBufMessage());
+    }
+    builder.setLeaderID(leaderID);
+
+    if (this.getLifeCycleState() != null) {
+      builder.setState(this.getLifeCycleState());
+    }
+    if (this.getType() != null) {
+      builder.setType(this.getType());
+    }
+
+    if (this.getFactor() != null) {
+      builder.setFactor(this.getFactor());
+    }
+    return builder.build();
+  }
+
+  public static PipelineChannel getFromProtoBuf(
+      OzoneProtos.PipelineChannel transportProtos) {
+    Preconditions.checkNotNull(transportProtos);
+    PipelineChannel pipelineChannel =
+        new PipelineChannel(transportProtos.getLeaderID(),
+            transportProtos.getState(),
+            transportProtos.getType(),
+            transportProtos.getFactor(),
+            transportProtos.getName());
+
+    for (HdfsProtos.DatanodeIDProto dataID : transportProtos.getMembersList()) {
+      pipelineChannel.addMember(DatanodeID.getFromProtoBuf(dataID));
+    }
+    return pipelineChannel;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea5c7928/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto
index 7e0c954..0cc1482 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto
@@ -29,16 +29,20 @@ option java_generate_equals_and_hash = true;
 package hadoop.hdfs.ozone;
 import "hdfs.proto";
 
-
-// A pipeline is composed of one or more datanodes that back a container.
-message Pipeline {
+message PipelineChannel {
     required string leaderID = 1;
     repeated DatanodeIDProto members = 2;
-    required string containerName = 3;
-    optional LifeCycleState state = 4 [default = OPEN];
-    optional ReplicationType type = 5 [default = STAND_ALONE];
-    optional ReplicationFactor factor = 6 [default = ONE];
-    optional string pipelineName = 7;
+    optional LifeCycleState state = 3 [default = OPEN];
+    optional ReplicationType type = 4 [default = STAND_ALONE];
+    optional ReplicationFactor factor = 5 [default = ONE];
+    optional string name = 6;
+}
+
+// A pipeline is composed of PipelineChannel (Ratis/StandAlone) that back a
+// container.
+message Pipeline {
+    required string containerName = 1;
+    required PipelineChannel pipelineChannel = 2;
 }
 
 message KeyValue {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea5c7928/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
index 21c437d..6152c55 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
@@ -532,10 +532,12 @@ public class SQLCLI  extends Configured implements Tool {
       Pipeline pipeline, Set<String> uuidChecked) throws SQLException {
     LOG.info("Insert to sql container db, for container {}", containerName);
     String insertContainerInfo = String.format(
-        INSERT_CONTAINER_INFO, containerName, pipeline.getLeaderID());
+        INSERT_CONTAINER_INFO, containerName,
+        pipeline.getPipelineChannel().getLeaderID());
     executeSQL(conn, insertContainerInfo);
 
-    for (HdfsProtos.DatanodeIDProto dnID : pipeline.getMembersList()) {
+    for (HdfsProtos.DatanodeIDProto dnID :
+        pipeline.getPipelineChannel().getMembersList()) {
       String uuid = dnID.getDatanodeUuid();
       if (!uuidChecked.contains(uuid)) {
         // we may also not use this checked set, but catch exception instead

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea5c7928/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java
index 6293d84..28e5267 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java
@@ -18,16 +18,32 @@ package org.apache.hadoop.ozone.scm.pipelines;
 
 
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
+import org.apache.hadoop.scm.container.common.helpers.PipelineChannel;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Manage Ozone pipelines.
  */
-public interface PipelineManager {
+public abstract class PipelineManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PipelineManager.class);
+  private final List<PipelineChannel> activePipelineChannels;
+  private final AtomicInteger conduitsIndex;
+
+  public PipelineManager() {
+    activePipelineChannels = new LinkedList<>();
+    conduitsIndex = new AtomicInteger(0);
+  }
 
   /**
    * This function is called by the Container Manager while allocating a new
@@ -39,31 +55,122 @@ public interface PipelineManager {
    * @param replicationFactor - Replication Factor
    * @return a Pipeline.
    */
-  Pipeline getPipeline(String containerName,
-      OzoneProtos.ReplicationFactor replicationFactor) throws IOException;
+  public synchronized final Pipeline getPipeline(String containerName,
+      ReplicationFactor replicationFactor, ReplicationType replicationType)
+      throws IOException {
+    /**
+     * In the Ozone world, we have a very simple policy.
+     *
+     * 1. Try to create a pipelineChannel if there are enough free nodes.
+     *
+     * 2. This allows all nodes to be part of a pipelineChannel quickly.
+     *
+     * 3. If there are not enough free nodes, return existing
+     * pipelineChannels in a round-robin fashion.
+     *
+     * TODO: Might have to come up with a better algorithm than this.
+     * Create a new placement policy that returns pipelineChannels in
+     * round-robin fashion.
+     */
+    PipelineChannel pipelineChannel =
+        allocatePipelineChannel(replicationFactor);
+    if (pipelineChannel != null) {
+      LOG.debug("created new pipelineChannel:{} for container:{}",
+          pipelineChannel.getName(), containerName);
+      activePipelineChannels.add(pipelineChannel);
+    } else {
+      pipelineChannel =
+          findOpenPipelineChannel(replicationType, replicationFactor);
+      if (pipelineChannel != null) {
+        LOG.debug("re-used pipelineChannel:{} for container:{}",
+            pipelineChannel.getName(), containerName);
+      }
+    }
+    if (pipelineChannel == null) {
+      LOG.error("Get pipelineChannel call failed. We are not able to find" +
+              "free nodes or operational pipelineChannel.");
+      return null;
+    } else {
+      return new Pipeline(containerName, pipelineChannel);
+    }
+  }
+
+  protected int getReplicationCount(ReplicationFactor factor) {
+    switch (factor) {
+    case ONE:
+      return 1;
+    case THREE:
+      return 3;
+    default:
+      throw new IllegalArgumentException("Unexpected replication count");
+    }
+  }
+
+  public abstract PipelineChannel allocatePipelineChannel(
+      ReplicationFactor replicationFactor) throws IOException;
+
+  /**
+   * Find a PipelineChannel that is operational.
+   *
+   * @return - an open PipelineChannel matching type and factor, or null
+   */
+  private PipelineChannel findOpenPipelineChannel(
+      ReplicationType type, ReplicationFactor factor) {
+    PipelineChannel pipelineChannel = null;
+    final int sentinal = -1;
+    if (activePipelineChannels.size() == 0) {
+      LOG.error("No Operational conduits found. Returning null.");
+      return null;
+    }
+    int startIndex = getNextIndex();
+    int nextIndex = sentinal;
+    for (; startIndex != nextIndex; nextIndex = getNextIndex()) {
+      // Just walk the list in a circular way.
+      PipelineChannel temp =
+          activePipelineChannels
+              .get(nextIndex != sentinal ? nextIndex : startIndex);
+      // if we find an operational pipelineChannel just return that.
+      if ((temp.getLifeCycleState() == LifeCycleState.OPEN) &&
+          (temp.getFactor() == factor) && (temp.getType() == type)) {
+        pipelineChannel = temp;
+        break;
+      }
+    }
+    return pipelineChannel;
+  }
+
+  /**
+   * gets the next index of the PipelineChannel to get.
+   *
+   * @return index in the linked list to get.
+   */
+  private int getNextIndex() {
+    return conduitsIndex.incrementAndGet() % activePipelineChannels.size();
+  }
 
   /**
    * Creates a pipeline from a specified set of Nodes.
    * @param pipelineID - Name of the pipeline
    * @param datanodes - The list of datanodes that make this pipeline.
    */
-  void createPipeline(String pipelineID, List<DatanodeID> datanodes)
-      throws IOException;;
+  public abstract void createPipeline(String pipelineID,
+      List<DatanodeID> datanodes) throws IOException;
 
   /**
    * Close the  pipeline with the given clusterId.
    */
-  void closePipeline(String pipelineID) throws IOException;
+  public abstract void closePipeline(String pipelineID) throws IOException;
 
   /**
    * list members in the pipeline .
    * @return the datanode
    */
-  List<DatanodeID> getMembers(String pipelineID) throws IOException;
+  public abstract List<DatanodeID> getMembers(String pipelineID)
+      throws IOException;
 
   /**
    * Update the datanode list of the pipeline.
    */
-  void updatePipeline(String pipelineID, List<DatanodeID> newDatanodes)
-      throws IOException;
+  public abstract void updatePipeline(String pipelineID,
+      List<DatanodeID> newDatanodes) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea5c7928/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineSelector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineSelector.java
index 15d17ff..7759f9c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineSelector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineSelector.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
 import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy;
 import org.apache.hadoop.ozone.scm.container.placement.algorithms.SCMContainerPlacementRandom;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
@@ -29,6 +31,7 @@ import org.apache.hadoop.ozone.scm.pipelines.ratis.RatisManagerImpl;
 import org.apache.hadoop.ozone.scm.pipelines.standalone.StandaloneManagerImpl;
 import org.apache.hadoop.scm.ScmConfigKeys;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.container.common.helpers.PipelineChannel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,20 +83,19 @@ public class PipelineSelector {
    * The first of the list will be the leader node.
    * @return pipeline corresponding to nodes
    */
-  public static Pipeline newPipelineFromNodes(final List<DatanodeID> nodes) {
+  public static PipelineChannel newPipelineFromNodes(List<DatanodeID> nodes,
+      LifeCycleState state, ReplicationType replicationType,
+      ReplicationFactor replicationFactor, String name) {
     Preconditions.checkNotNull(nodes);
     Preconditions.checkArgument(nodes.size() > 0);
     String leaderId = nodes.get(0).getDatanodeUuid();
-    Pipeline pipeline = new Pipeline(leaderId);
+    PipelineChannel
+        pipelineChannel = new PipelineChannel(leaderId, state, replicationType,
+        replicationFactor, name);
     for (DatanodeID node : nodes) {
-      pipeline.addMember(node);
+      pipelineChannel.addMember(node);
     }
-
-    // A Standalone pipeline is always open, no action from the client
-    // is needed to make it open.
-    pipeline.setType(ReplicationType.STAND_ALONE);
-    pipeline.setLifeCycleState(OzoneProtos.LifeCycleState.OPEN);
-    return pipeline;
+    return pipelineChannel;
   }
 
   /**
@@ -167,7 +169,8 @@ public class PipelineSelector {
     Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
     LOG.debug("Getting replication pipeline for {} : Replication {}",
         containerName, replicationFactor.toString());
-    return manager.getPipeline(containerName, replicationFactor);
+    return manager.
+        getPipeline(containerName, replicationFactor, replicationType);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea5c7928/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java
index fb02172..16659e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java
@@ -19,7 +19,10 @@ package org.apache.hadoop.ozone.scm.pipelines.ratis;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
 import org.apache.hadoop.ozone.scm.container.placement.algorithms
     .ContainerPlacementPolicy;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
@@ -27,6 +30,7 @@ import org.apache.hadoop.ozone.scm.pipelines.PipelineManager;
 import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.scm.XceiverClientRatis;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.container.common.helpers.PipelineChannel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,28 +40,19 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos
-    .LifeCycleState.OPEN;
-
 
 /**
  * Implementation of {@link PipelineManager}.
  *
  * TODO : Introduce a state machine.
  */
-public class RatisManagerImpl implements PipelineManager {
+public class RatisManagerImpl extends PipelineManager {
   private static final Logger LOG =
       LoggerFactory.getLogger(RatisManagerImpl.class);
-  private final NodeManager nodeManager;
-  private final ContainerPlacementPolicy placementPolicy;
-  private final long containerSize;
-  private final Set<DatanodeID> ratisMembers;
-  private final List<Pipeline> activePipelines;
-  private final AtomicInteger pipelineIndex;
   private static final String PREFIX = "Ratis-";
   private final Configuration conf;
+  private final NodeManager nodeManager;
+  private final Set<DatanodeID> ratisMembers;
 
   /**
    * Constructs a Ratis Pipeline Manager.
@@ -66,147 +61,22 @@ public class RatisManagerImpl implements PipelineManager {
    */
   public RatisManagerImpl(NodeManager nodeManager,
       ContainerPlacementPolicy placementPolicy, long size, Configuration conf) {
+    super();
+    this.conf = conf;
     this.nodeManager = nodeManager;
-    this.placementPolicy = placementPolicy;
-    this.containerSize = size;
     ratisMembers = new HashSet<>();
-    activePipelines = new LinkedList<>();
-    pipelineIndex = new AtomicInteger(0);
-    this.conf = conf;
-  }
-
-  /**
-   * This function is called by the Container Manager while allocation a new
-   * container. The client specifies what kind of replication pipeline is needed
-   * and based on the replication type in the request appropriate Interface is
-   * invoked.
-   *
-   * @param containerName Name of the container
-   * @param replicationFactor - Replication Factor
-   * @return a Pipeline.
-   * <p>
-   * TODO: Evaulate if we really need this lock. Right now favoring safety over
-   * speed.
-   */
-  @Override
-  public synchronized Pipeline getPipeline(String containerName,
-      OzoneProtos.ReplicationFactor replicationFactor) throws IOException {
-    /**
-     * In the ratis world, we have a very simple policy.
-     *
-     * 1. Try to create a pipeline if there are enough free nodes.
-     *
-     * 2. This allows all nodes to part of a pipeline quickly.
-     *
-     * 3. if there are not enough free nodes, return pipelines in a
-     * round-robin fashion.
-     *
-     * TODO: Might have to come up with a better algorithm than this.
-     * Create a new placement policy that returns pipelines in round robin
-     * fashion.
-     */
-    Pipeline pipeline = null;
-    List<DatanodeID> newNodes = allocatePipelineNodes(replicationFactor);
-    if (newNodes != null) {
-      Preconditions.checkState(newNodes.size() ==
-          getReplicationCount(replicationFactor), "Replication factor " +
-          "does not match the expected node count.");
-      pipeline =
-          allocateRatisPipeline(newNodes, containerName, replicationFactor);
-      try (XceiverClientRatis client =
-          XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
-        client
-            .createPipeline(pipeline.getPipelineName(), pipeline.getMachines());
-      }
-    } else {
-      Pipeline openPipeline = findOpenPipeline(replicationFactor);
-      if (openPipeline != null) {
-        // if an open pipeline is found use the same machines
-        pipeline = allocateRatisPipeline(openPipeline.getMachines(),
-            containerName, replicationFactor);
-      }
-    }
-    if (pipeline == null) {
-      LOG.error("Get pipeline call failed. We are not able to find free nodes" +
-          " or operational pipeline.");
-    }
-    return pipeline;
-  }
-
-  /**
-   * Find a pipeline that is operational.
-   *
-   * @return - Pipeline or null
-   */
-  Pipeline findOpenPipeline(OzoneProtos.ReplicationFactor factor) {
-    Pipeline pipeline = null;
-    final int sentinal = -1;
-    if (activePipelines.size() == 0) {
-      LOG.error("No Operational pipelines found. Returning null.");
-      return pipeline;
-    }
-    int startIndex = getNextIndex();
-    int nextIndex = sentinal;
-    for (; startIndex != nextIndex; nextIndex = getNextIndex()) {
-      // Just walk the list in a circular way.
-      Pipeline temp =
-          activePipelines.get(nextIndex != sentinal ? nextIndex : startIndex);
-      // if we find an operational pipeline just return that.
-      if ((temp.getLifeCycleState() == OPEN) && (temp.getFactor() == factor)) {
-        pipeline = temp;
-        break;
-      }
-    }
-    return pipeline;
-  }
-
-  /**
-   * Allocate a new Ratis pipeline from the existing nodes.
-   *
-   * @param nodes - list of Nodes.
-   * @param containerName - container Name
-   * @return - Pipeline.
-   */
-  Pipeline allocateRatisPipeline(List<DatanodeID> nodes, String containerName,
-      OzoneProtos.ReplicationFactor factor) {
-    Preconditions.checkNotNull(nodes);
-    Pipeline pipeline = PipelineSelector.newPipelineFromNodes(nodes);
-    if (pipeline != null) {
-      // Start all pipeline names with "Ratis", easy to grep the logs.
-      String pipelineName = PREFIX +
-          UUID.randomUUID().toString().substring(PREFIX.length());
-      pipeline.setType(OzoneProtos.ReplicationType.RATIS);
-      pipeline.setLifeCycleState(OPEN);
-      pipeline.setFactor(factor);
-      pipeline.setPipelineName(pipelineName);
-      pipeline.setContainerName(containerName);
-      LOG.info("Creating new ratis pipeline: {}", pipeline.toString());
-      activePipelines.add(pipeline);
-    }
-    return pipeline;
-  }
-
-  /**
-   * gets the next index of in the pipelines to get.
-   *
-   * @return index in the link list to get.
-   */
-  private int getNextIndex() {
-    return pipelineIndex.incrementAndGet() % activePipelines.size();
   }
 
   /**
-   * Allocates a set of new nodes for the Ratis pipeline.
+   * Allocates a new ratis PipelineChannel from the free nodes.
    *
-   * @param replicationFactor - One or Three
-   * @return List of Datanodes.
+   * @param factor - One or Three
+   * @return PipelineChannel.
    */
-  private List<DatanodeID> allocatePipelineNodes(OzoneProtos.ReplicationFactor
-      replicationFactor) {
+  public PipelineChannel allocatePipelineChannel(ReplicationFactor factor) {
     List<DatanodeID> newNodesList = new LinkedList<>();
-    List<DatanodeID> datanodes =
-        nodeManager.getNodes(OzoneProtos.NodeState.HEALTHY);
-    int count = getReplicationCount(replicationFactor);
+    List<DatanodeID> datanodes = nodeManager.getNodes(NodeState.HEALTHY);
+    int count = getReplicationCount(factor);
     //TODO: Add Raft State to the Nodes, so we can query and skip nodes from
     // data from datanode instead of maintaining a set.
     for (DatanodeID datanode : datanodes) {
@@ -217,25 +87,28 @@ public class RatisManagerImpl implements PipelineManager {
           // once a datanode has been added to a pipeline, exclude it from
           // further allocations
           ratisMembers.addAll(newNodesList);
-          LOG.info("Allocating a new pipeline of size: {}", count);
-          return newNodesList;
+          LOG.info("Allocating a new pipelineChannel of size: {}", count);
+          // Start all channel names with "Ratis", easy to grep the logs.
+          String conduitName = PREFIX +
+              UUID.randomUUID().toString().substring(PREFIX.length());
+          PipelineChannel pipelineChannel =
+              PipelineSelector.newPipelineFromNodes(newNodesList,
+              LifeCycleState.OPEN, ReplicationType.RATIS, factor, conduitName);
+          Pipeline pipeline =
+              new Pipeline("setup", pipelineChannel);
+          try (XceiverClientRatis client =
+              XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
+            client.createPipeline(pipeline.getPipelineName(), newNodesList);
+          } catch (IOException e) {
+            return null;
+          }
+          return pipelineChannel;
         }
       }
     }
     return null;
   }
 
-  private int getReplicationCount(OzoneProtos.ReplicationFactor factor) {
-    switch (factor) {
-    case ONE:
-      return 1;
-    case THREE:
-      return 3;
-    default:
-      throw new IllegalArgumentException("Unexpected replication count");
-    }
-  }
-
   /**
    * Creates a pipeline from a specified set of Nodes.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea5c7928/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java
index 2ec7d7c..a2e6439 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java
@@ -16,30 +16,38 @@
  */
 package org.apache.hadoop.ozone.scm.pipelines.standalone;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
 import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
 import org.apache.hadoop.ozone.scm.pipelines.PipelineManager;
 import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
-import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.container.common.helpers.PipelineChannel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.LinkedList;
 
 /**
  * Standalone Manager Impl to prove that pluggable interface
  * works with current tests.
  */
-public class StandaloneManagerImpl implements PipelineManager {
+public class StandaloneManagerImpl extends PipelineManager {
   private static final Logger LOG =
       LoggerFactory.getLogger(StandaloneManagerImpl.class);
   private final NodeManager nodeManager;
   private final ContainerPlacementPolicy placementPolicy;
   private final long containerSize;
+  private final Set<DatanodeID> standAloneMembers;
 
   /**
    * Constructor for Standalone Node Manager Impl.
@@ -49,34 +57,42 @@ public class StandaloneManagerImpl implements PipelineManager {
    */
   public StandaloneManagerImpl(NodeManager nodeManager,
       ContainerPlacementPolicy placementPolicy, long containerSize) {
+    super();
     this.nodeManager = nodeManager;
     this.placementPolicy = placementPolicy;
     this.containerSize =  containerSize;
+    this.standAloneMembers = new HashSet<>();
   }
 
 
   /**
-   * This function is called by the Container Manager while allocating a new
-   * container. The client specifies what kind of replication pipeline is needed
-   * and based on the replication type in the request appropriate Interface is
-   * invoked.
+   * Allocates a new standalone PipelineChannel from the free nodes.
    *
-   * @param containerName Name of the container
-   * @param replicationFactor - Replication Factor
-   * @return a Pipeline.
+   * @param factor - One
+   * @return PipelineChannel.
    */
-  @Override
-  public Pipeline getPipeline(String containerName, OzoneProtos
-      .ReplicationFactor replicationFactor) throws IOException {
-    List<DatanodeID> datanodes = placementPolicy.chooseDatanodes(
-        replicationFactor.getNumber(), containerSize);
-    Pipeline pipeline = PipelineSelector.newPipelineFromNodes(datanodes);
-    String pipelineName = "SA-" + UUID.randomUUID().toString().substring(3);
-    pipeline.setContainerName(containerName);
-    pipeline.setPipelineName(pipelineName);
-    pipeline.setFactor(replicationFactor);
-    LOG.info("Creating new standalone pipeline: {}", pipeline.toString());
-    return pipeline;
+  public PipelineChannel allocatePipelineChannel(ReplicationFactor factor) {
+    List<DatanodeID> newNodesList = new LinkedList<>();
+    List<DatanodeID> datanodes = nodeManager.getNodes(NodeState.HEALTHY);
+    int count = getReplicationCount(factor);
+    for (DatanodeID datanode : datanodes) {
+      Preconditions.checkNotNull(datanode);
+      if (!standAloneMembers.contains(datanode)) {
+        newNodesList.add(datanode);
+        if (newNodesList.size() == count) {
+          // once a datanode has been added to a pipeline, exclude it from
+          // further allocations
+          standAloneMembers.addAll(newNodesList);
+          LOG.info("Allocating a new pipeline channel of size: {}", count);
+          String channelName =
+              "SA-" + UUID.randomUUID().toString().substring(3);
+          return PipelineSelector.newPipelineFromNodes(newNodesList,
+              LifeCycleState.OPEN, ReplicationType.STAND_ALONE,
+              ReplicationFactor.ONE, channelName);
+        }
+      }
+    }
+    return null;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea5c7928/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockReadWrite.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockReadWrite.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockReadWrite.java
index 70793f4..cdd65c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockReadWrite.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockReadWrite.java
@@ -29,8 +29,12 @@ import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
 import org.apache.hadoop.scm.XceiverClientManager;
 import org.apache.hadoop.scm.XceiverClientSpi;
+import org.apache.hadoop.scm.container.common.helpers.PipelineChannel;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.scm.protocolPB
     .StorageContainerLocationProtocolClientSideTranslatorPB;
@@ -301,7 +305,10 @@ public class TestCBlockReadWrite {
     String data = RandomStringUtils.random(4 * KB);
 
     List<Pipeline> fakeContainerPipelines = new LinkedList<>();
-    Pipeline fakePipeline = new Pipeline("fake");
+    PipelineChannel pipelineChannel = new PipelineChannel("fake",
+        LifeCycleState.OPEN, ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
+        "fake");
+    Pipeline fakePipeline = new Pipeline("fake", pipelineChannel);
     fakePipeline.setData(Longs.toByteArray(1));
     fakeContainerPipelines.add(fakePipeline);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea5c7928/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
index 946b7e0..65633fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
@@ -27,9 +27,11 @@ import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.apache.hadoop.scm.ScmConfigKeys;
 import org.apache.hadoop.scm.XceiverClient;
+import org.apache.hadoop.scm.container.common.helpers.PipelineChannel;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.test.PathUtils;
 import org.apache.hadoop.test.TestGenericTestUtils;
@@ -105,9 +107,13 @@ public class TestMiniOzoneCluster {
       // Create a single member pipe line
       String containerName = OzoneUtils.getRequestID();
       DatanodeID dnId = dn.getDatanodeId();
-      Pipeline pipeline = new Pipeline(dnId.getDatanodeUuid());
-      pipeline.addMember(dnId);
-      pipeline.setContainerName(containerName);
+      final PipelineChannel pipelineChannel =
+          new PipelineChannel(dnId.getDatanodeUuid(),
+              OzoneProtos.LifeCycleState.OPEN,
+              OzoneProtos.ReplicationType.STAND_ALONE,
+              OzoneProtos.ReplicationFactor.ONE, "test");
+      pipelineChannel.addMember(dnId);
+      Pipeline pipeline = new Pipeline(containerName, pipelineChannel);
 
       // Verify client is able to connect to the container
       try (XceiverClient client = new XceiverClient(pipeline, conf)){

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea5c7928/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index e76f986..13d751c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -32,7 +32,11 @@ import org.apache.hadoop.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.KeyData;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.KeyValue;
+import org.apache.hadoop.scm.container.common.helpers.PipelineChannel;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
@@ -119,16 +123,15 @@ public final class ContainerTestHelper {
     final Iterator<DatanodeID> i = ids.iterator();
     Preconditions.checkArgument(i.hasNext());
     final DatanodeID leader = i.next();
-    final Pipeline pipeline = new Pipeline(leader.getDatanodeUuid());
-    pipeline.setContainerName(containerName);
-    pipeline.addMember(leader);
-    pipeline.setFactor(OzoneProtos.ReplicationFactor.ONE);
-    pipeline.setType(OzoneProtos.ReplicationType.STAND_ALONE);
-
+    String pipelineName = "TEST-" + UUID.randomUUID().toString().substring(3);
+    final PipelineChannel pipelineChannel =
+        new PipelineChannel(leader.getDatanodeUuid(), LifeCycleState.OPEN,
+            ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineName);
+    pipelineChannel.addMember(leader);
     for(; i.hasNext();) {
-      pipeline.addMember(i.next());
+      pipelineChannel.addMember(i.next());
     }
-    return pipeline;
+    return new Pipeline(containerName, pipelineChannel);
   }
 
   /**
@@ -193,8 +196,9 @@ public final class ContainerTestHelper {
         ContainerProtos.WriteChunkRequestProto
             .newBuilder();
 
-    pipeline.setContainerName(containerName);
-    writeRequest.setPipeline(pipeline.getProtobufMessage());
+    Pipeline newPipeline =
+        new Pipeline(containerName, pipeline.getPipelineChannel());
+    writeRequest.setPipeline(newPipeline.getProtobufMessage());
     writeRequest.setKeyName(keyName);
 
     byte[] data = getData(datalen);
@@ -209,7 +213,7 @@ public final class ContainerTestHelper {
     request.setCmdType(ContainerProtos.Type.WriteChunk);
     request.setWriteChunk(writeRequest);
     request.setTraceID(UUID.randomUUID().toString());
-    request.setDatanodeID(pipeline.getLeader().getDatanodeUuid().toString());
+    request.setDatanodeID(newPipeline.getLeader().getDatanodeUuid());
 
     return request.build();
   }
@@ -228,7 +232,8 @@ public final class ContainerTestHelper {
       throws Exception {
     ContainerProtos.PutSmallFileRequestProto.Builder smallFileRequest =
         ContainerProtos.PutSmallFileRequestProto.newBuilder();
-    pipeline.setContainerName(containerName);
+    Pipeline newPipeline =
+        new Pipeline(containerName, pipeline.getPipelineChannel());
     byte[] data = getData(dataLen);
     ChunkInfo info = getChunk(keyName, 0, 0, dataLen);
     setDataChecksum(info, data);
@@ -237,7 +242,7 @@ public final class ContainerTestHelper {
     ContainerProtos.PutKeyRequestProto.Builder putRequest =
         ContainerProtos.PutKeyRequestProto.newBuilder();
 
-    putRequest.setPipeline(pipeline.getProtobufMessage());
+    putRequest.setPipeline(newPipeline.getProtobufMessage());
     KeyData keyData = new KeyData(containerName, keyName);
 
     List<ContainerProtos.ChunkInfo> newList = new LinkedList<>();
@@ -254,7 +259,7 @@ public final class ContainerTestHelper {
     request.setCmdType(ContainerProtos.Type.PutSmallFile);
     request.setPutSmallFile(smallFileRequest);
     request.setTraceID(UUID.randomUUID().toString());
-    request.setDatanodeID(pipeline.getLeader().getDatanodeUuid());
+    request.setDatanodeID(newPipeline.getLeader().getDatanodeUuid());
     return request.build();
   }
 
@@ -390,8 +395,7 @@ public final class ContainerTestHelper {
     containerData.setName(containerName);
     String[] keys = metaData.keySet().toArray(new String[]{});
     for(int i=0; i<keys.length; i++) {
-      OzoneProtos.KeyValue.Builder kvBuilder =
-          OzoneProtos.KeyValue.newBuilder();
+      KeyValue.Builder kvBuilder = KeyValue.newBuilder();
       kvBuilder.setKey(keys[i]);
       kvBuilder.setValue(metaData.get(keys[i]));
       containerData.addMetadata(i, kvBuilder.build());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea5c7928/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
index 6902890..72cfa03 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
@@ -371,18 +371,19 @@ public class TestContainerPersistence {
       Pipeline pipeline) throws IOException,
       NoSuchAlgorithmException {
     final int datalen = 1024;
-    pipeline.setContainerName(containerName);
+    Pipeline newPipeline =
+        new Pipeline(containerName, pipeline.getPipelineChannel());
     ContainerData cData = new ContainerData(containerName, conf);
     cData.addMetadata("VOLUME", "shire");
     cData.addMetadata("owner", "bilbo");
     if(!containerManager.getContainerMap()
         .containsKey(containerName)) {
-      containerManager.createContainer(pipeline, cData);
+      containerManager.createContainer(newPipeline, cData);
     }
     ChunkInfo info = getChunk(keyName, 0, 0, datalen);
     byte[] data = getData(datalen);
     setDataChecksum(info, data);
-    chunkManager.writeChunk(pipeline, keyName, info, data, COMBINED);
+    chunkManager.writeChunk(newPipeline, keyName, info, data, COMBINED);
     return info;
 
   }
@@ -420,7 +421,6 @@ public class TestContainerPersistence {
     Pipeline pipeline = createSingleNodePipeline(containerName);
     Map<String, ChunkInfo> fileHashMap = new HashMap<>();
 
-    pipeline.setContainerName(containerName);
     ContainerData cData = new ContainerData(containerName, conf);
     cData.addMetadata("VOLUME", "shire");
     cData.addMetadata("owner)", "bilbo");
@@ -484,7 +484,6 @@ public class TestContainerPersistence {
     String keyName = OzoneUtils.getRequestID();
     Pipeline pipeline = createSingleNodePipeline(containerName);
 
-    pipeline.setContainerName(containerName);
     ContainerData cData = new ContainerData(containerName, conf);
     cData.addMetadata("VOLUME", "shire");
     cData.addMetadata("owner)", "bilbo");
@@ -519,7 +518,6 @@ public class TestContainerPersistence {
     String keyName = OzoneUtils.getRequestID();
     Pipeline pipeline = createSingleNodePipeline(containerName);
 
-    pipeline.setContainerName(containerName);
     ContainerData cData = new ContainerData(containerName, conf);
     cData.addMetadata("VOLUME", "shire");
     cData.addMetadata("owner)", "bilbo");
@@ -562,7 +560,6 @@ public class TestContainerPersistence {
     String keyName = OzoneUtils.getRequestID();
     Pipeline pipeline = createSingleNodePipeline(containerName);
 
-    pipeline.setContainerName(containerName);
     ContainerData cData = new ContainerData(containerName, conf);
     cData.addMetadata("VOLUME", "shire");
     cData.addMetadata("owner)", "bilbo");
@@ -601,7 +598,6 @@ public class TestContainerPersistence {
     String keyName = OzoneUtils.getRequestID();
     Pipeline pipeline = createSingleNodePipeline(containerName);
 
-    pipeline.setContainerName(containerName);
     ContainerData cData = new ContainerData(containerName, conf);
     cData.addMetadata("VOLUME", "shire");
     cData.addMetadata("owner)", "bilbo");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea5c7928/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java
index c4006e5..f0bf4c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java
@@ -21,10 +21,14 @@ import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import org.apache.hadoop.ozone.scm.container.ContainerMapping;
 import org.apache.hadoop.ozone.scm.container.Mapping;
+import org.apache.hadoop.scm.container.common.helpers.PipelineChannel;
 import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -328,12 +332,14 @@ public class TestDeletedBlockLog {
 
   private void mockContainerInfo(Mapping mappingService, String containerName,
       DatanodeID dnID) throws IOException {
-    Pipeline pipeline = new Pipeline("fake");
-    pipeline.addMember(dnID);
+    PipelineChannel pipelineChannel =
+        new PipelineChannel("fake", LifeCycleState.OPEN,
+            ReplicationType.STAND_ALONE, ReplicationFactor.ONE, "fake");
+    pipelineChannel.addMember(dnID);
+    Pipeline pipeline = new Pipeline(containerName, pipelineChannel);
 
     ContainerInfo.Builder builder = new ContainerInfo.Builder();
     builder.setPipeline(pipeline);
-    builder.setContainerName(containerName);
 
     ContainerInfo conatinerInfo = builder.build();
     Mockito.doReturn(conatinerInfo).when(mappingService)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea5c7928/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java
index bbc24d9..98a96dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java
@@ -54,7 +54,6 @@ import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState
     .HEALTHY;
 import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
 import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB;
-import static org.hamcrest.core.StringStartsWith.startsWith;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -164,32 +163,6 @@ public class TestContainerPlacement {
           .getPipeline();
       assertEquals(xceiverClientManager.getFactor().getNumber(),
           pipeline1.getMachines().size());
-
-      final long newUsed = 7L * OzoneConsts.GB;
-      final long newRemaining = capacity - newUsed;
-
-      for (DatanodeID datanodeID : datanodes) {
-        SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder();
-        SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
-        srb.setStorageUuid(UUID.randomUUID().toString());
-        srb.setCapacity(capacity).setScmUsed(newUsed).
-            setRemaining(newRemaining).build();
-        nodeManager.sendHeartbeat(datanodeID,
-            nrb.addStorageReport(srb).build(), reportState);
-      }
-
-      GenericTestUtils.waitFor(() -> nodeManager.getStats().getRemaining()
-              .get() == nodeCount * newRemaining,
-          100, 4 * 1000);
-
-      thrown.expect(IOException.class);
-      thrown.expectMessage(
-          startsWith("Unable to find enough nodes that meet "
-              + "the space requirement"));
-      String container2 = UUID.randomUUID().toString();
-      containerManager.allocateContainer(xceiverClientManager.getType(),
-          xceiverClientManager.getFactor(), container2,
-          "OZONE");
     } finally {
       IOUtils.closeQuietly(containerManager);
       IOUtils.closeQuietly(nodeManager);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message