hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject hadoop git commit: HDFS-11865. Ozone: Do not initialize Ratis cluster during datanode startup.
Date Sun, 28 May 2017 07:25:35 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 e641bee7b -> c1d714d93


HDFS-11865. Ozone: Do not initialize Ratis cluster during datanode startup.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c1d714d9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c1d714d9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c1d714d9

Branch: refs/heads/HDFS-7240
Commit: c1d714d93338dbc172114bf176cc821850cb5a65
Parents: e641bee
Author: Tsz-Wo Nicholas Sze <szetszwo@hortonworks.com>
Authored: Sun May 28 15:19:32 2017 +0800
Committer: Tsz-Wo Nicholas Sze <szetszwo@hortonworks.com>
Committed: Sun May 28 15:19:32 2017 +0800

----------------------------------------------------------------------
 .../apache/hadoop/scm/XceiverClientRatis.java   | 28 +-----
 .../main/java/org/apache/ratis/RatisHelper.java | 93 ++++++++++++++++++++
 .../java/org/apache/ratis/package-info.java     | 22 +++++
 .../com/google/protobuf/package-info.java       | 22 +++++
 .../apache/hadoop/ozone/OzoneConfigKeys.java    |  7 +-
 .../server/ratis/XceiverServerRatis.java        | 40 ++++-----
 .../transport/server/ratis/package-info.java    | 23 +++++
 .../apache/hadoop/ozone/MiniOzoneCluster.java   | 29 +++---
 .../apache/hadoop/ozone/RatisTestHelper.java    | 11 +--
 .../ozone/container/ContainerTestHelper.java    | 26 +++++-
 .../ozoneimpl/TestOzoneContainerRatis.java      | 34 +++++--
 .../transport/server/TestContainerServer.java   | 29 ++++--
 hadoop-project/pom.xml                          |  2 +-
 13 files changed, 272 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1d714d9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java
index 738a588..a0ad24e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java
@@ -22,13 +22,10 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.ratis.RatisHelper;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
-import org.apache.ratis.client.ClientFactory;
 import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil;
@@ -36,10 +33,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
 
 /**
  * An abstract implementation of {@link XceiverClientSpi} using Ratis.
@@ -67,24 +62,6 @@ public final  class XceiverClientRatis implements XceiverClientSpi {
     this.rpcType = rpcType;
   }
 
-  static RaftClient newRaftClient(Pipeline pipeline, RpcType rpcType) {
-    final List<RaftPeer> peers = pipeline.getMachines().stream()
-        .map(dn -> dn.getXferAddr())
-        .map(addr -> new RaftPeer(new RaftPeerId(addr), addr))
-        .collect(Collectors.toList());
-
-    final RaftProperties properties = new RaftProperties();
-    final ClientFactory factory = ClientFactory.cast(rpcType.newFactory(
-        properties, null));
-
-    return RaftClient.newBuilder()
-        .setClientRpc(factory.newRaftClientRpc())
-        .setServers(peers)
-        .setLeaderId(new RaftPeerId(pipeline.getLeader().getXferAddr()))
-        .setProperties(properties)
-        .build();
-  }
-
   @Override
   public Pipeline getPipeline() {
     return pipeline;
@@ -92,7 +69,8 @@ public final  class XceiverClientRatis implements XceiverClientSpi {
 
   @Override
   public void connect() throws Exception {
-    if (!client.compareAndSet(null, newRaftClient(pipeline, rpcType))) {
+    if (!client.compareAndSet(null,
+        RatisHelper.newRaftClient(rpcType, getPipeline()))) {
       throw new IllegalStateException("Client is already connected.");
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1d714d9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java
new file mode 100644
index 0000000..bedd9a8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ratis;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.ratis.client.ClientFactory;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.rpc.RpcType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Ratis helper methods.
+ */
+public interface RatisHelper {
+  Logger LOG = LoggerFactory.getLogger(RatisHelper.class);
+
+  static String toRaftPeerIdString(DatanodeID id) {
+    return id.getIpAddr() + ":" + id.getContainerPort();
+  }
+
+  static RaftPeerId toRaftPeerId(DatanodeID id) {
+    return RaftPeerId.valueOf(toRaftPeerIdString(id));
+  }
+
+  static RaftPeer toRaftPeer(String id) {
+    return new RaftPeer(RaftPeerId.valueOf(id), id);
+  }
+
+  static RaftPeer toRaftPeer(DatanodeID id) {
+    return toRaftPeer(toRaftPeerIdString(id));
+  }
+
+  static List<RaftPeer> toRaftPeers(Pipeline pipeline) {
+    return pipeline.getMachines().stream()
+        .map(RatisHelper::toRaftPeer)
+        .collect(Collectors.toList());
+  }
+
+  static RaftPeer[] toRaftPeerArray(Pipeline pipeline) {
+    return toRaftPeers(pipeline).toArray(RaftPeer.EMPTY_PEERS);
+  }
+
+  static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline) {
+    return newRaftClient(rpcType, toRaftPeerId(pipeline.getLeader()),
+        toRaftPeers(pipeline));
+  }
+
+  static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader) {
+    return newRaftClient(rpcType, leader.getId(),
+        new ArrayList<>(Arrays.asList(leader)));
+  }
+
+  static RaftClient newRaftClient(
+      RpcType rpcType, RaftPeerId leader, List<RaftPeer> peers) {
+    LOG.trace("newRaftClient: {}, leader={}, peers={}", rpcType, leader, peers);
+    final RaftProperties properties = new RaftProperties();
+    final ClientFactory factory = ClientFactory.cast(rpcType.newFactory(
+        properties, null));
+
+    return RaftClient.newBuilder()
+        .setClientRpc(factory.newRaftClientRpc())
+        .setServers(peers)
+        .setLeaderId(leader)
+        .setProperties(properties)
+        .build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1d714d9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/package-info.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/package-info.java
new file mode 100644
index 0000000..c13c20c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+/**
+ * This package contains classes related to Apache Ratis.
+ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1d714d9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/shaded/com/google/protobuf/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/shaded/com/google/protobuf/package-info.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/shaded/com/google/protobuf/package-info.java
new file mode 100644
index 0000000..032dd96
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/shaded/com/google/protobuf/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.shaded.com.google.protobuf;
+
+/**
+ * This package contains classes related to the shaded protobuf in Apache Ratis.
+ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1d714d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index feca620..7489054 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -83,10 +83,9 @@ public final class OzoneConfigKeys {
       = ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY;
   public static final String DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT
       = ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT;
-  public static final String DFS_CONTAINER_RATIS_CONF =
-      "dfs.container.ratis.conf";
-  public static final String DFS_CONTAINER_RATIS_DATANODE_ADDRESS =
-      "dfs.container.ratis.datanode.address";
+  /** A unique ID to identify a Ratis server. */
+  public static final String DFS_CONTAINER_RATIS_SERVER_ID =
+      "dfs.container.ratis.server.id";
   public static final String DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR =
       "dfs.container.ratis.datanode.storage.dir";
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1d714d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 4c82ac2..69f3801 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -18,9 +18,7 @@
 
 package org.apache.hadoop.ozone.container.common.transport.server.ratis;
 
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
@@ -28,23 +26,25 @@ import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.netty.NettyConfigKeys;
-import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.stream.Collectors;
+import java.util.Collections;
+import java.util.Objects;
 
 /**
  * Creates a ratis server endpoint that acts as the communication layer for
  * Ozone containers.
  */
 public final class XceiverServerRatis implements XceiverServerSpi {
+  static final Logger LOG = LoggerFactory.getLogger(XceiverServerRatis.class);
+
   static RaftProperties newRaftProperties(
       RpcType rpc, int port, String storageDir) {
     final RaftProperties properties = new RaftProperties();
@@ -62,37 +62,31 @@ public final class XceiverServerRatis implements XceiverServerSpi {
       Configuration ozoneConf, ContainerDispatcher dispatcher)
       throws IOException {
     final String id = ozoneConf.get(
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_ADDRESS);
-    final Collection<String> servers = ozoneConf.getStringCollection(
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF);
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_SERVER_ID);
+    final int port = ozoneConf.getInt(
+        OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
+        OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
     final String storageDir = ozoneConf.get(
         OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR);
     final String rpcType = ozoneConf.get(
         OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
     final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType);
-    return new XceiverServerRatis(id, servers, storageDir, dispatcher, rpc);
+    return new XceiverServerRatis(id, port, storageDir, dispatcher, rpc);
   }
 
   private final int port;
   private final RaftServer server;
 
   private XceiverServerRatis(
-      String id, Collection<String> servers, String storageDir,
+      String id, int port, String storageDir,
       ContainerDispatcher dispatcher, RpcType rpcType) throws IOException {
-    Preconditions.checkArgument(servers.contains(id),
-        "%s is not one of %s specified in %s",
-        id, servers, OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF);
-
-    final List<RaftPeer> peers = servers.stream()
-        .map(addr -> new RaftPeer(new RaftPeerId(addr), addr))
-        .collect(Collectors.toList());
-
-    this.port = NetUtils.createSocketAddr(id).getPort();
+    Objects.requireNonNull(id, "id == null");
+    this.port = port;
 
     this.server = RaftServer.newBuilder()
-        .setServerId(new RaftPeerId(id))
-        .setPeers(peers)
+        .setServerId(RaftPeerId.valueOf(id))
+        .setPeers(Collections.emptyList())
         .setProperties(newRaftProperties(rpcType, port, storageDir))
         .setStateMachine(new ContainerStateMachine(dispatcher))
         .build();
@@ -100,6 +94,8 @@ public final class XceiverServerRatis implements XceiverServerSpi {
 
   @Override
   public void start() throws IOException {
+    LOG.info("Starting {} {} at port {}", getClass().getSimpleName(),
+        server.getId(), getIPCPort());
     server.start();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1d714d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java
new file mode 100644
index 0000000..8debfe0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.transport.server.ratis;
+
+/**
+ * This package contains classes for the server implementation
+ * using Apache Ratis
+ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1d714d9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 173b911..5cfcaff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
 import org.apache.hadoop.ozone.ksm.KeySpaceManager;
 import org.apache.hadoop.scm.ScmConfigKeys;
@@ -102,12 +103,11 @@ public final class MiniOzoneCluster extends MiniDFSCluster
     if (!useRatis) {
       return;
     }
-    final String[] ids = dnConf.getStrings(
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF);
-    // TODO: use the i-th raft server as the i-th datanode address
-    //       this only work for one Raft cluster
-    setConf(i, dnConf, OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_ADDRESS,
-        ids[i]);
+    final String address = ContainerTestHelper.createLocalAddress();
+    setConf(i, dnConf, OzoneConfigKeys.DFS_CONTAINER_RATIS_SERVER_ID,
+        address);
+    setConf(i, dnConf, OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
+        String.valueOf(NetUtils.createSocketAddr(address).getPort()));
     setConf(i, dnConf, OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR,
         getInstanceStorageDir(i, -1).getCanonicalPath());
   }
@@ -206,16 +206,13 @@ public final class MiniOzoneCluster extends MiniDFSCluster
    */
   public void waitOzoneReady() throws TimeoutException, InterruptedException {
     GenericTestUtils.waitFor(() -> {
-      if (scm.getNodeCount(SCMNodeManager.NODESTATE.HEALTHY)
-          >= numDataNodes) {
-        return true;
-      }
-      LOG.info("Waiting for cluster to be ready. Got {} of {} DN Heartbeats.",
-          scm.getNodeCount(SCMNodeManager.NODESTATE.HEALTHY),
-          numDataNodes);
-
-      return false;
-    }, 1000, 5 * 60 * 1000); //wait for 5 mins.
+      final int healthy = scm.getNodeCount(SCMNodeManager.NODESTATE.HEALTHY);
+      final boolean isReady = healthy >= numDataNodes;
+      LOG.info("{}. Got {} of {} DN Heartbeats.",
+            isReady? "Cluster is ready" : "Waiting for cluster to be ready",
+            healthy, numDataNodes);
+      return isReady;
+    }, 1000, 60 * 1000); //wait for 1 min.
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1d714d9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
index d56fad2..89664eb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
@@ -26,7 +26,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.stream.Collectors;
 
 /**
  * Helpers for Ratis tests.
@@ -34,23 +33,17 @@ import java.util.stream.Collectors;
 public interface RatisTestHelper {
   Logger LOG = LoggerFactory.getLogger(RatisTestHelper.class);
 
-  static void initRatisConf(
-      RpcType rpc, Pipeline pipeline, Configuration conf) {
+  static void initRatisConf(RpcType rpc, Configuration conf) {
     conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, true);
     conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, rpc.name());
     LOG.info(OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY
         + " = " + rpc.name());
-    final String s = pipeline.getMachines().stream()
-            .map(dn -> dn.getXferAddr())
-            .collect(Collectors.joining(","));
-    conf.setStrings(OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF, s);
-    LOG.info(OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF + " = " + s);
   }
 
   static XceiverClientRatis newXceiverClientRatis(
       RpcType rpcType, Pipeline pipeline, OzoneConfiguration conf)
       throws IOException {
-    initRatisConf(rpcType, pipeline, conf);
+    initRatisConf(rpcType, conf);
     return XceiverClientRatis.newXceiverClientRatis(pipeline, conf);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1d714d9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index a1abfeb..6db7621 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -80,6 +80,11 @@ public final class ContainerTestHelper {
     return createPipeline(containerName, 1);
   }
 
+  public static String createLocalAddress() throws IOException {
+    try(ServerSocket s = new ServerSocket(0)) {
+      return "127.0.0.1:" + s.getLocalPort();
+    }
+  }
   public static DatanodeID createDatanodeID() throws IOException {
     ServerSocket socket = new ServerSocket(0);
     int port = socket.getLocalPort();
@@ -100,13 +105,26 @@ public final class ContainerTestHelper {
   public static Pipeline createPipeline(String containerName, int numNodes)
       throws IOException {
     Preconditions.checkArgument(numNodes >= 1);
-    final DatanodeID leader = createDatanodeID();
-    Pipeline pipeline = new Pipeline(leader.getDatanodeUuid());
+    final List<DatanodeID> ids = new ArrayList<>(numNodes);
+    for(int i = 0; i < numNodes; i++) {
+      ids.add(createDatanodeID());
+    }
+    return createPipeline(containerName, ids);
+  }
+
+  public static Pipeline createPipeline(
+      String containerName, Iterable<DatanodeID> ids)
+      throws IOException {
+    Objects.requireNonNull(ids, "ids == null");
+    final Iterator<DatanodeID> i = ids.iterator();
+    Preconditions.checkArgument(i.hasNext());
+    final DatanodeID leader = i.next();
+    final Pipeline pipeline = new Pipeline(leader.getDatanodeUuid());
     pipeline.setContainerName(containerName);
     pipeline.addMember(leader);
 
-    for(int i = 1; i < numNodes; i++) {
-      pipeline.addMember(createDatanodeID());
+    for(; i.hasNext();) {
+      pipeline.addMember(i.next());
     }
     return pipeline;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1d714d9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerRatis.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerRatis.java
index 2662909..3adb881 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerRatis.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerRatis.java
@@ -18,23 +18,31 @@
 
 package org.apache.hadoop.ozone.container.ozoneimpl;
 
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.RatisTestHelper;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.ratis.RatisHelper;
+import org.apache.hadoop.scm.XceiverClientRatis;
 import org.apache.hadoop.scm.XceiverClientSpi;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.util.CheckedBiConsumer;
+import org.apache.ratis.util.CollectionUtils;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+
 /**
  * Tests ozone containers with Apache Ratis.
  */
@@ -78,19 +86,31 @@ public class TestOzoneContainerRatis {
       throws Exception {
     LOG.info(testName + "(rpc=" + rpc + ", numNodes=" + numNodes);
 
+    // create Ozone clusters
     final OzoneConfiguration conf = newOzoneConfiguration();
-    final String containerName = OzoneUtils.getRequestID();
-    final Pipeline pipeline = ContainerTestHelper.createPipeline(
-        containerName, numNodes);
-    final XceiverClientSpi client = RatisTestHelper.newXceiverClientRatis(
-        rpc, pipeline, conf);
-
+    RatisTestHelper.initRatisConf(rpc, conf);
     final MiniOzoneCluster cluster = new MiniOzoneCluster.Builder(conf)
         .setHandlerType(OzoneConsts.OZONE_HANDLER_LOCAL)
-        .numDataNodes(pipeline.getMachines().size())
+        .numDataNodes(numNodes)
         .build();
     cluster.waitOzoneReady();
 
+    final String containerName = OzoneUtils.getRequestID();
+    final List<DataNode> datanodes = cluster.getDataNodes();
+    final Pipeline pipeline = ContainerTestHelper.createPipeline(containerName,
+        CollectionUtils.as(datanodes, DataNode::getDatanodeId));
+
+    LOG.info("pipeline=" + pipeline);
+    // Create Ratis cluster
+    final RaftPeer[] peers = RatisHelper.toRaftPeerArray(pipeline);
+    for(RaftPeer p : peers) {
+      final RaftClient client = RatisHelper.newRaftClient(rpc, p);
+      client.reinitialize(peers, p.getId());
+    }
+
+    LOG.info("reinitialize done");
+    final XceiverClientSpi client = XceiverClientRatis.newXceiverClientRatis(
+        pipeline, conf);
     try {
       test.accept(containerName, client);
     } finally {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1d714d9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java
index 5fc6a7c..ad64cae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java
@@ -40,7 +40,11 @@ import org.apache.hadoop.scm.XceiverClientRatis;
 import org.apache.hadoop.scm.XceiverClientSpi;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.ratis.RatisHelper;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.util.CheckedBiConsumer;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -92,7 +96,8 @@ public class TestContainerServer {
         (pipeline, conf) -> conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
             pipeline.getLeader().getContainerPort()),
         XceiverClient::new,
-        (dn, conf) -> new XceiverServer(conf, new TestContainerDispatcher()));
+        (dn, conf) -> new XceiverServer(conf, new TestContainerDispatcher()),
+        (dn, p) -> {});
   }
 
   @FunctionalInterface
@@ -115,7 +120,8 @@ public class TestContainerServer {
   static XceiverServerRatis newXceiverServerRatis(
       DatanodeID dn, OzoneConfiguration conf) throws IOException {
     final String id = dn.getXferAddr();
-    conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_ADDRESS, id);
+    conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_SERVER_ID, id);
+    conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, dn.getContainerPort());
     final String dir = TEST_DIR + id.replace(':', '_');
     conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
 
@@ -123,13 +129,22 @@ public class TestContainerServer {
     return XceiverServerRatis.newXceiverServerRatis(conf, dispatcher);
   }
 
+  static void initXceiverServerRatis(
+      RpcType rpc, DatanodeID id, Pipeline pipeline) throws IOException {
+    final RaftPeer p = RatisHelper.toRaftPeer(id);
+    final RaftPeer[] peers = RatisHelper.toRaftPeerArray(pipeline);
+    final RaftClient client = RatisHelper.newRaftClient(rpc, p);
+    client.reinitialize(peers, p.getId());
+  }
+
+
   static void runTestClientServerRatis(RpcType rpc, int numNodes)
       throws Exception {
     runTestClientServer(numNodes,
-        (pipeline, conf) -> RatisTestHelper.initRatisConf(
-            rpc, pipeline, conf),
+        (pipeline, conf) -> RatisTestHelper.initRatisConf(rpc, conf),
         XceiverClientRatis::newXceiverClientRatis,
-        TestContainerServer::newXceiverServerRatis);
+        TestContainerServer::newXceiverServerRatis,
+        (dn, p) -> initXceiverServerRatis(rpc, dn, p));
   }
 
   static void runTestClientServer(
@@ -138,7 +153,8 @@ public class TestContainerServer {
       CheckedBiFunction<Pipeline, OzoneConfiguration, XceiverClientSpi,
           IOException> createClient,
       CheckedBiFunction<DatanodeID, OzoneConfiguration, XceiverServerSpi,
-          IOException> createServer)
+          IOException> createServer,
+      CheckedBiConsumer<DatanodeID, Pipeline, IOException> initServer)
       throws Exception {
     final List<XceiverServerSpi> servers = new ArrayList<>();
     XceiverClientSpi client = null;
@@ -153,6 +169,7 @@ public class TestContainerServer {
         final XceiverServerSpi s = createServer.apply(dn, conf);
         servers.add(s);
         s.start();
+        initServer.accept(dn, pipeline);
       }
 
       client = createClient.apply(pipeline, conf);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1d714d9/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index a5e7720..7d8460a 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -99,7 +99,7 @@
     <ldap-api.version>1.0.0-M33</ldap-api.version>
 
     <!-- Apache Ratis version -->
-    <ratis.version>0.1-SNAPSHOT</ratis.version>
+    <ratis.version>0.1.1-alpha-SNAPSHOT</ratis.version>
 
     <!-- define the Java language version used by the compiler -->
     <javac.version>1.8</javac.version>


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message