This is an automated email from the ASF dual-hosted git repository.
bharat pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 917ac9f HDDS-972. Add support for configuring multiple OMs. Contributed by Hanisha Koneru.
917ac9f is described below
commit 917ac9f108fa980a90720ba87e5e5b35d39aa358
Author: Bharat Viswanadham <bharat@apache.org>
AuthorDate: Tue Feb 12 22:07:27 2019 -0800
HDDS-972. Add support for configuring multiple OMs. Contributed by Hanisha Koneru.
---
.../java/org/apache/hadoop/ozone/OzoneConsts.java | 3 +
.../common/src/main/resources/ozone-default.xml | 106 +++++---
.../main/java/org/apache/hadoop/ozone/OmUtils.java | 65 +++++
.../ozone/OzoneIllegalArgumentException.java | 40 +++
.../org/apache/hadoop/ozone/om/OMConfigKeys.java | 17 +-
.../org/apache/hadoop/ozone/MiniOzoneCluster.java | 23 ++
.../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 30 ++-
.../hadoop/ozone/MiniOzoneHAClusterImpl.java | 261 +++++++++++++++++++
.../hadoop/ozone/TestOzoneConfigurationFields.java | 2 +
.../apache/hadoop/ozone/om/TestOzoneManager.java | 2 +-
.../ozone/om/TestOzoneManagerConfiguration.java | 290 +++++++++++++++++++++
.../apache/hadoop/ozone/om/TestOzoneManagerHA.java | 156 +++++++++++
.../org/apache/hadoop/ozone/om/OMNodeDetails.java | 107 ++++++++
.../org/apache/hadoop/ozone/om/OzoneManager.java | 285 +++++++++++++++++---
.../ozone/om/ratis/OzoneManagerRatisClient.java | 35 +--
.../ozone/om/ratis/OzoneManagerRatisServer.java | 139 ++++++----
.../om/ratis/TestOzoneManagerRatisServer.java | 23 +-
17 files changed, 1427 insertions(+), 157 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index ca319f6..f44acfd 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -268,4 +268,7 @@ public final class OzoneConsts {
Metadata.Key.of(OZONE_BLOCK_TOKEN, ASCII_STRING_MARSHALLER);
public static final Metadata.Key<String> USER_METADATA_KEY =
Metadata.Key.of(OZONE_USER, ASCII_STRING_MARSHALLER);
+
+ // Default OMServiceID for OM Ratis servers to use as RaftGroupId
+ public static final String OM_SERVICE_ID_DEFAULT = "om-service-value";
}
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 9c655e5..819062c 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -455,6 +455,48 @@
</description>
</property>
<property>
+ <name>ozone.om.service.ids</name>
+ <value></value>
+ <tag>OM, HA</tag>
+ <description>
+ Comma-separated list of OM service IDs.
+
+ If not set, the default value of "om-service-value" is assigned as the
+ OM service ID.
+ </description>
+ </property>
+ <property>
+ <name>ozone.om.nodes.EXAMPLEOMSERVICEID</name>
+ <value></value>
+ <tag>OM, HA</tag>
+ <description>
+ Comma-separated list of OM node IDs for a given OM service ID (e.g.
+ EXAMPLEOMSERVICEID). The OM service ID should be the value (one of the
+ values if there are multiple) set for the parameter ozone.om.service.ids.
+
+ Unique identifiers for each OM node, delimited by commas. These are
+ used by OzoneManagers in an HA setup to determine all the OzoneManagers
+ belonging to the same OM service in the cluster. For example, if you
+ set "omService1" as the OM service ID in ozone.om.service.ids, and want to
+ use "om1", "om2" and "om3" as the individual IDs of the OzoneManagers,
+ you would configure the property ozone.om.nodes.omService1 with the value
+ "om1,om2,om3".
+ </description>
+ </property>
+ <property>
+ <name>ozone.om.node.id</name>
+ <value></value>
+ <tag>OM, HA</tag>
+ <description>
+ The ID of this OM node. If the OM node ID is not configured, it
+ is determined automatically by matching the local node's address
+ with the configured addresses.
+
+ If the node ID cannot be determined from the configuration, it is set
+ to the OmId from the OM version file.
+ </description>
+ </property>
+ <property>
<name>ozone.om.address</name>
<value/>
<tag>OM, REQUIRED</tag>
@@ -1453,15 +1495,6 @@
</property>
<property>
- <name>ozone.om.ratis.random.port</name>
- <value>false</value>
- <tag>OZONE, OM, RATIS, DEBUG</tag>
- <description>Allocates a random free port for OM's Ratis server. This is
- used only while running unit tests.
- </description>
- </property>
-
- <property>
<name>ozone.om.ratis.rpc.type</name>
<value>GRPC</value>
<tag>OZONE, OM, RATIS, MANAGEMENT</tag>
@@ -1543,28 +1576,39 @@
<description>The timeout duration for OM Ratis client request.
</description>
</property>
- <property>
- <name>ozone.om.ratis.client.request.max.retries</name>
- <value>180</value>
- <tag>OZONE, OM, RATIS, MANAGEMENT</tag>
- <description>Number of retries for OM client request.</description>
- </property>
- <property>
- <name>ozone.om.ratis.client.request.retry.interval</name>
- <value>100ms</value>
- <tag>OZONE, OM, RATIS, MANAGEMENT</tag>
- <description>Interval between successive retries for a OM client request.
- </description>
- </property>
-
- <property>
- <name>ozone.om.leader.election.minimum.timeout.duration</name>
- <value>1s</value>
- <tag>OZONE, OM, RATIS, MANAGEMENT</tag>
- <description>The minimum timeout duration for OM ratis leader election.
- Default is 1s.
- </description>
- </property>
+ <property>
+ <name>ozone.om.ratis.client.request.max.retries</name>
+ <value>180</value>
+ <tag>OZONE, OM, RATIS, MANAGEMENT</tag>
+ <description>Number of retries for OM client request.</description>
+ </property>
+ <property>
+ <name>ozone.om.ratis.client.request.retry.interval</name>
+ <value>100ms</value>
+ <tag>OZONE, OM, RATIS, MANAGEMENT</tag>
+ <description>Interval between successive retries for an OM client request.
+ </description>
+ </property>
+
+ <property>
+ <name>ozone.om.leader.election.minimum.timeout.duration</name>
+ <value>1s</value>
+ <tag>OZONE, OM, RATIS, MANAGEMENT</tag>
+ <description>The minimum timeout duration for OM ratis leader election.
+ Default is 1s.
+ </description>
+ </property>
+
+ <property>
+ <name>ozone.om.ratis.server.failure.timeout.duration</name>
+ <value>120s</value>
+ <tag>OZONE, OM, RATIS, MANAGEMENT</tag>
+ <description>The timeout duration for Ratis server failure detection.
+ Once the threshold is reached, the Ratis state machine is informed
+ about the failure in the Ratis ring.
+ </description>
+ </property>
+
<property>
<name>ozone.acl.authorizer.class</name>
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index ef0e88c..82b9f6a 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -17,12 +17,15 @@
package org.apache.hadoop.ozone;
+import com.google.common.base.Joiner;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Optional;
import org.apache.commons.lang3.RandomStringUtils;
@@ -38,7 +41,9 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_BIND_HOST_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_BIND_PORT_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODES_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_PORT_DEFAULT;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -191,4 +196,64 @@ public final class OmUtils {
"This could possibly indicate a faulty JRE");
}
}
+
+ /**
+ * Return key + "." + suffix, or key unchanged if suffix is null or empty.
+ */
+ private static String addSuffix(String key, String suffix) {
+ if (suffix == null || suffix.isEmpty()) {
+ return key;
+ }
+ assert !suffix.startsWith(".") :
+ "suffix '" + suffix + "' should not already have '.' prepended.";
+ return key + "." + suffix;
+ }
+
+ /**
+ * Concatenate list of suffix strings '.' separated.
+ */
+ private static String concatSuffixes(String... suffixes) {
+ if (suffixes == null) {
+ return null;
+ }
+ return Joiner.on(".").skipNulls().join(suffixes);
+ }
+
+ /**
+ * Return configuration key of format key.suffix1.suffix2...suffixN.
+ */
+ public static String addKeySuffixes(String key, String... suffixes) {
+ String keySuffix = concatSuffixes(suffixes);
+ return addSuffix(key, keySuffix);
+ }
+
+ /**
+ * Check whether the given socket address refers to a local address.
+ * Return true if it does, false otherwise.
+ */
+ public static boolean isAddressLocal(InetSocketAddress addr) {
+ return NetUtils.isLocalAddress(addr.getAddress());
+ }
+
+ /**
+ * Get a collection of all omNodeIds for the given omServiceId.
+ */
+ public static Collection<String> getOMNodeIds(Configuration conf,
+ String omServiceId) {
+ String key = addSuffix(OZONE_OM_NODES_KEY, omServiceId);
+ return conf.getTrimmedStringCollection(key);
+ }
+
+ /**
+ * @return <code>coll</code> if it is non-null and non-empty. Otherwise,
+ * returns a list with a single null value.
+ */
+ public static Collection<String> emptyAsSingletonNull(Collection<String>
+ coll) {
+ if (coll == null || coll.isEmpty()) {
+ return Collections.singletonList(null);
+ } else {
+ return coll;
+ }
+ }
}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OzoneIllegalArgumentException.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OzoneIllegalArgumentException.java
new file mode 100644
index 0000000..e732dc2
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OzoneIllegalArgumentException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Indicates that a method has been passed an illegal or invalid argument.
+ * This exception is thrown instead of IllegalArgumentException to
+ * differentiate exceptions thrown by Ozone from those thrown by the JDK.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class OzoneIllegalArgumentException extends IllegalArgumentException {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Constructs exception with the specified detail message.
+ * @param message detailed message.
+ */
+ public OzoneIllegalArgumentException(final String message) {
+ super(message);
+ }
+}
\ No newline at end of file
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 867a3e1..ba6211c 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -40,6 +40,13 @@ public final class OMConfigKeys {
"ozone.om.handler.count.key";
public static final int OZONE_OM_HANDLER_COUNT_DEFAULT = 20;
+ public static final String OZONE_OM_SERVICE_IDS_KEY =
+ "ozone.om.service.ids";
+ public static final String OZONE_OM_NODES_KEY =
+ "ozone.om.nodes";
+ public static final String OZONE_OM_NODE_ID_KEY =
+ "ozone.om.node.id";
+
public static final String OZONE_OM_ADDRESS_KEY =
"ozone.om.address";
public static final String OZONE_OM_BIND_HOST_DEFAULT =
@@ -101,11 +108,6 @@ public final class OMConfigKeys {
= "ozone.om.ratis.port";
public static final int OZONE_OM_RATIS_PORT_DEFAULT
= 9872;
- // When set to true, allocate a random free port for ozone ratis server
- public static final String OZONE_OM_RATIS_RANDOM_PORT_KEY =
- "ozone.om.ratis.random.port";
- public static final boolean OZONE_OM_RATIS_RANDOM_PORT_KEY_DEFAULT
- = false;
public static final String OZONE_OM_RATIS_RPC_TYPE_KEY
= "ozone.om.ratis.rpc.type";
public static final String OZONE_OM_RATIS_RPC_TYPE_DEFAULT
@@ -175,6 +177,11 @@ public final class OMConfigKeys {
public static final TimeDuration
OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT =
TimeDuration.valueOf(1, TimeUnit.SECONDS);
+ public static final String OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_KEY
+ = "ozone.om.ratis.server.failure.timeout.duration";
+ public static final TimeDuration
+ OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT
+ = TimeDuration.valueOf(120, TimeUnit.SECONDS);
public static final String OZONE_OM_KERBEROS_KEYTAB_FILE_KEY = "ozone.om."
+ "kerberos.keytab.file";
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index b348b50..7b65c46 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -52,6 +52,17 @@ public interface MiniOzoneCluster {
}
/**
+ * Returns the Builder to construct MiniOzoneHACluster.
+ *
+ * @param conf OzoneConfiguration
+ *
+ * @return MiniOzoneCluster builder
+ */
+ static Builder newHABuilder(OzoneConfiguration conf) {
+ return new MiniOzoneHAClusterImpl.Builder(conf);
+ }
+
+ /**
* Returns the configuration object associated with the MiniOzoneCluster.
*
* @return Configuration
@@ -223,6 +234,8 @@ public interface MiniOzoneCluster {
protected final String path;
protected String clusterId;
+ protected String omServiceId;
+ protected int numOfOMs;
protected Optional<Boolean> enableTrace = Optional.of(false);
protected Optional<Integer> hbInterval = Optional.empty();
@@ -416,6 +429,16 @@ public interface MiniOzoneCluster {
return this;
}
+ public Builder setNumOfOzoneManagers(int numOMs) {
+ this.numOfOMs = numOMs;
+ return this;
+ }
+
+ public Builder setOMServiceId(String serviceId) {
+ this.omServiceId = serviceId;
+ return this;
+ }
+
/**
* Constructs and returns MiniOzoneCluster.
*
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index ab322a2..cc48116 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -83,14 +83,14 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys
* StorageContainerManager and multiple DataNodes.
*/
@InterfaceAudience.Private
-public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
+public class MiniOzoneClusterImpl implements MiniOzoneCluster {
private static final Logger LOG =
LoggerFactory.getLogger(MiniOzoneClusterImpl.class);
private final OzoneConfiguration conf;
private StorageContainerManager scm;
- private final OzoneManager ozoneManager;
+ private OzoneManager ozoneManager;
private final List<HddsDatanodeService> hddsDatanodes;
// Timeout for the cluster to be ready
@@ -101,7 +101,7 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
*
* @throws IOException if there is an I/O error
*/
- private MiniOzoneClusterImpl(OzoneConfiguration conf,
+ MiniOzoneClusterImpl(OzoneConfiguration conf,
OzoneManager ozoneManager,
StorageContainerManager scm,
List<HddsDatanodeService> hddsDatanodes) {
@@ -111,6 +111,20 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
this.hddsDatanodes = hddsDatanodes;
}
+ /**
+ * Creates a new MiniOzoneCluster without the OzoneManager. This is used by
+ * {@link MiniOzoneHAClusterImpl} for starting multiple OzoneManagers.
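+ * @param conf configuration of the cluster
+ * @param scm the StorageContainerManager of the cluster
+ * @param hddsDatanodes the datanodes of the cluster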
+ */
+ MiniOzoneClusterImpl(OzoneConfiguration conf, StorageContainerManager scm,
+ List<HddsDatanodeService> hddsDatanodes) {
+ this.conf = conf;
+ this.scm = scm;
+ this.hddsDatanodes = hddsDatanodes;
+ }
+
public OzoneConfiguration getConf() {
return conf;
}
@@ -394,11 +408,11 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
}
/**
- * Initializes the configureation required for starting MiniOzoneCluster.
+ * Initializes the configuration required for starting MiniOzoneCluster.
*
* @throws IOException
*/
- private void initializeConfiguration() throws IOException {
+ void initializeConfiguration() throws IOException {
conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, ozoneEnabled);
Path metaDir = Paths.get(path, "ozone-meta");
Files.createDirectories(metaDir);
@@ -434,7 +448,7 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
*
* @throws IOException
*/
- private StorageContainerManager createSCM()
+ StorageContainerManager createSCM()
throws IOException, AuthenticationException {
configureSCM();
SCMStorageConfig scmStore = new SCMStorageConfig(conf);
@@ -455,7 +469,7 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
scmStore.initialize();
}
- private void initializeOmStorage(OMStorage omStorage) throws IOException{
+ void initializeOmStorage(OMStorage omStorage) throws IOException{
if (omStorage.getState() == StorageState.INITIALIZED) {
return;
}
@@ -487,7 +501,7 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
*
* @throws IOException
*/
- private List<HddsDatanodeService> createHddsDatanodes(
+ List<HddsDatanodeService> createHddsDatanodes(
StorageContainerManager scm) throws IOException {
configureHddsDatanodes();
String scmAddress = scm.getDatanodeRpcAddress().getHostString() +
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
new file mode 100644
index 0000000..a1ef1f6
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMStorage;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+
+/**
+ * MiniOzoneHAClusterImpl creates a complete in-process Ozone cluster
+ * with OM HA suitable for running tests. The cluster consists of a set of
+ * OzoneManagers, StorageContainerManager and multiple DataNodes.
+ */
+public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(MiniOzoneHAClusterImpl.class);
+
+ private List<OzoneManager> ozoneManagers;
+
+ private static final Random RANDOM = new Random();
+ private static final int RATIS_LEADER_ELECTION_TIMEOUT = 1000; // 1 seconds
+
+ public static final int NODE_FAILURE_TIMEOUT = 2000; // 2 seconds
+
+ /**
+ * Creates a new MiniOzoneCluster with OM HA.
+ *
+ * @param omList the OzoneManagers that make up the OM HA service
+ */
+
+ private MiniOzoneHAClusterImpl(
+ OzoneConfiguration conf,
+ List<OzoneManager> omList,
+ StorageContainerManager scm,
+ List<HddsDatanodeService> hddsDatanodes) {
+ super(conf, scm, hddsDatanodes);
+ this.ozoneManagers = omList;
+ }
+
+ /**
+ * Returns the first OzoneManager from the list.
+ * @return the first OzoneManager in the list
+ */
+ @Override
+ public OzoneManager getOzoneManager() {
+ return this.ozoneManagers.get(0);
+ }
+
+ public OzoneManager getOzoneManager(int index) {
+ return this.ozoneManagers.get(index);
+ }
+
+ @Override
+ public void restartOzoneManager() throws IOException {
+ for (OzoneManager ozoneManager : ozoneManagers) {
+ ozoneManager.stop();
+ ozoneManager.restart();
+ }
+ }
+
+ @Override
+ public void stop() {
+ for (OzoneManager ozoneManager : ozoneManagers) {
+ if (ozoneManager != null) {
+ LOG.info("Stopping the OzoneManager " + ozoneManager.getOMNodId());
+ ozoneManager.stop();
+ ozoneManager.join();
+ }
+ }
+ super.stop();
+ }
+
+ public void stopOzoneManager(int index) {
+ ozoneManagers.get(index).stop();
+ }
+
+ /**
+ * Builder for configuring the MiniOzoneCluster with OM HA to run.
+ */
+ public static class Builder extends MiniOzoneClusterImpl.Builder {
+
+ private final String nodeIdBaseStr = "omNode-";
+
+ /**
+ * Creates a new Builder.
+ *
+ * @param conf configuration
+ */
+ public Builder(OzoneConfiguration conf) {
+ super(conf);
+ }
+
+ @Override
+ public MiniOzoneCluster build() throws IOException {
+ DefaultMetricsSystem.setMiniClusterMode(true);
+ initializeConfiguration();
+ StorageContainerManager scm;
+ List<OzoneManager> omList;
+ try {
+ scm = createSCM();
+ scm.start();
+ omList = createOMService();
+ } catch (AuthenticationException ex) {
+ throw new IOException("Unable to build MiniOzoneCluster. ", ex);
+ }
+
+ final List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(scm);
+ MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(conf, omList,
+ scm, hddsDatanodes);
+ if (startDataNodes) {
+ cluster.startHddsDatanodes();
+ }
+ return cluster;
+ }
+
+ /**
+ * Initialize OM configurations.
+ * @throws IOException
+ */
+ @Override
+ void initializeConfiguration() throws IOException {
+ super.initializeConfiguration();
+ conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true);
+ conf.setInt(OMConfigKeys.OZONE_OM_HANDLER_COUNT_KEY, numOfOmHandlers);
+ conf.setTimeDuration(
+ OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
+ RATIS_LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(
+ OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_KEY,
+ NODE_FAILURE_TIMEOUT, TimeUnit.MILLISECONDS);
+ conf.setInt(OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY,
+ 10);
+ }
+
+ /**
+ * Start OM service with multiple OMs.
+ * @return list of OzoneManagers
+ * @throws IOException
+ * @throws AuthenticationException
+ */
+ private List<OzoneManager> createOMService() throws IOException,
+ AuthenticationException {
+
+ List<OzoneManager> omList = new ArrayList<>(numOfOMs);
+
+ int retryCount = 0;
+ int basePort = 10000;
+
+ while (true) {
+ try {
+ basePort = 10000 + RANDOM.nextInt(1000) * 4;
+ initHAConfig(basePort);
+
+ for (int i = 1; i<= numOfOMs; i++) {
+ // Set nodeId
+ conf.set(OMConfigKeys.OZONE_OM_NODE_ID_KEY, nodeIdBaseStr + i);
+
+ // Set metadata/DB dir base path
+ String metaDirPath = path + "/" + nodeIdBaseStr + i;
+ conf.set(OZONE_METADATA_DIRS, metaDirPath);
+ OMStorage omStore = new OMStorage(conf);
+ initializeOmStorage(omStore);
+
+ // Set HTTP address to the rpc port + 2
+ int httpPort = basePort + (6*i) - 4;
+ conf.set(OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY,
+ "127.0.0.1:" + httpPort);
+
+ OzoneManager om = OzoneManager.createOm(null, conf);
+ om.setCertClient(certClient);
+ omList.add(om);
+
+ om.start();
+ LOG.info("Started OzoneManager RPC server at " +
+ om.getOmRpcServerAddr());
+ }
+
+ // Set default OM address to point to the first OM. Clients would
+ // try connecting to this address by default
+ conf.set(OMConfigKeys.OZONE_OM_ADDRESS_KEY,
+ NetUtils.getHostPortString(omList.get(0).getOmRpcServerAddr()));
+
+ break;
+ } catch (BindException e) {
+ for (OzoneManager om : omList) {
+ om.stop();
+ om.join();
+ LOG.info("Stopping OzoneManager server at " +
+ om.getOmRpcServerAddr());
+ }
+ omList.clear();
+ ++retryCount;
+ LOG.info("MiniOzoneHACluster port conflicts, retried " +
+ retryCount + " times");
+ }
+ }
+ return omList;
+ }
+
+ /**
+ * Initialize HA related configurations.
+ */
+ private void initHAConfig(int basePort) throws IOException {
+ // Set configurations required for starting OM HA service
+ conf.set(OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY, omServiceId);
+ String omNodesKey = OmUtils.addKeySuffixes(
+ OMConfigKeys.OZONE_OM_NODES_KEY, omServiceId);
+ StringBuilder omNodesKeyValue = new StringBuilder();
+
+ int port = basePort;
+
+ for (int i = 1; i <= numOfOMs; i++, port+=6) {
+ String omNodeId = nodeIdBaseStr + i;
+ omNodesKeyValue.append(",").append(omNodeId);
+ String omAddrKey = OmUtils.addKeySuffixes(
+ OMConfigKeys.OZONE_OM_ADDRESS_KEY, omServiceId, omNodeId);
+ String omRatisPortKey = OmUtils.addKeySuffixes(
+ OMConfigKeys.OZONE_OM_RATIS_PORT_KEY, omServiceId, omNodeId);
+
+ conf.set(omAddrKey, "127.0.0.1:" + port);
+ // Reserve port + 2 for the OM's HTTP server; Ratis uses port + 4
+ conf.setInt(omRatisPortKey, port + 4);
+ }
+
+ conf.set(omNodesKey, omNodesKeyValue.substring(1));
+ }
+ }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index 0839292..30f0749 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@ -38,6 +38,7 @@ public class TestOzoneConfigurationFields extends TestConfigurationFieldsBase {
errorIfMissingConfigProps = true;
errorIfMissingXmlProps = true;
xmlPropsToSkipCompare.add("hadoop.tags.custom");
+ xmlPropsToSkipCompare.add("ozone.om.nodes.EXAMPLEOMSERVICEID");
addPropertiesNotInXml();
}
@@ -45,5 +46,6 @@ public class TestOzoneConfigurationFields extends TestConfigurationFieldsBase {
configurationPropsToSkipCompare.add(HddsConfigKeys.HDDS_KEY_ALGORITHM);
configurationPropsToSkipCompare.add(HddsConfigKeys.HDDS_SECURITY_PROVIDER);
configurationPropsToSkipCompare.add(HddsConfigKeys.HDDS_GRPC_TLS_TEST_CERT);
+ configurationPropsToSkipCompare.add(OMConfigKeys.OZONE_OM_NODES_KEY);
}
}
\ No newline at end of file
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java
index 140f91c..eb4421a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java
@@ -1381,7 +1381,7 @@ public class TestOzoneManager {
* set to true.
*/
@Test
- public void testRatsiServerOnOmInitialization() throws IOException {
+ public void testRatisServerOnOMInitialization() throws IOException {
// OM Ratis server should not be started when OZONE_OM_RATIS_ENABLE_KEY
// is not set to true
Assert.assertNull("OM Ratis server started though OM Ratis is disabled.",
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerConfiguration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerConfiguration.java
new file mode 100644
index 0000000..4c422e6
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerConfiguration.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneIllegalArgumentException;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.util.LifeCycle;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests OM related configurations.
+ */
+public class TestOzoneManagerConfiguration {
+
+ private OzoneConfiguration conf;
+ private MiniOzoneCluster cluster;
+ private String omId;
+ private String clusterId;
+ private String scmId;
+ private OzoneManager om;
+ private OzoneManagerRatisServer omRatisServer;
+
+ private static final long LEADER_ELECTION_TIMEOUT = 500L;
+
+ @Before
+ public void init() throws IOException {
+ conf = new OzoneConfiguration();
+ omId = UUID.randomUUID().toString();
+ clusterId = UUID.randomUUID().toString();
+ scmId = UUID.randomUUID().toString();
+ final String path = GenericTestUtils.getTempPath(omId);
+ Path metaDirPath = Paths.get(path, "om-meta");
+ conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true);
+ conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, metaDirPath.toString());
+ conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
+ conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true);
+ conf.setTimeDuration(
+ OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
+ LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+
+ OMStorage omStore = new OMStorage(conf);
+ omStore.setClusterId("testClusterId");
+ omStore.setScmId("testScmId");
+ // writes the version file properties
+ omStore.initialize();
+ }
+
+ @After
+ public void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ private void startCluster() throws Exception {
+ cluster = MiniOzoneCluster.newBuilder(conf)
+ .setClusterId(clusterId)
+ .setScmId(scmId)
+ .setOmId(omId)
+ .build();
+ cluster.waitForClusterToBeReady();
+ }
+
+ /**
+ * Test a single node OM service (default setting for MiniOzoneCluster).
+ * @throws Exception
+ */
+ @Test
+ public void testSingleNodeOMservice() throws Exception {
+ // Default settings of MiniOzoneCluster start a single node OM service.
+ startCluster();
+ om = cluster.getOzoneManager();
+ omRatisServer = om.getOmRatisServer();
+
+ Assert.assertEquals(LifeCycle.State.RUNNING, om.getOmRatisServerState());
+ // OM's Ratis server should have only 1 peer (itself) in its RaftGroup
+ Collection<RaftPeer> peers = omRatisServer.getRaftGroup().getPeers();
+ Assert.assertEquals(1, peers.size());
+
+ // The RaftPeer id should match the configured omId
+ RaftPeer raftPeer = peers.iterator().next();
+ Assert.assertEquals(omId, raftPeer.getId().toString());
+ }
+
+ /**
+ * Test configuring an OM service with three OM nodes.
+ * @throws Exception
+ */
+ @Test
+ public void testThreeNodeOMservice() throws Exception {
+ // Set the configuration for 3 node OM service. Set one node's rpc
+ // address to a local address (0.0.0.0). OM will parse all configurations
+ // and find the nodeId representing the local node.
+
+ final String omServiceId = "om-service-test1";
+ final String omNode1Id = "omNode1";
+ final String omNode2Id = "omNode2";
+ final String omNode3Id = "omNode3";
+
+ String omNodesKeyValue = omNode1Id + "," + omNode2Id + "," + omNode3Id;
+ String omNodesKey = OmUtils.addKeySuffixes(
+ OMConfigKeys.OZONE_OM_NODES_KEY, omServiceId);
+
+ String omNode1RpcAddrKey = getOMAddrKeyWithSuffix(omServiceId, omNode1Id);
+ String omNode2RpcAddrKey = getOMAddrKeyWithSuffix(omServiceId, omNode2Id);
+ String omNode3RpcAddrKey = getOMAddrKeyWithSuffix(omServiceId, omNode3Id);
+
+ String omNode3RatisPortKey = OmUtils.addKeySuffixes(
+ OMConfigKeys.OZONE_OM_RATIS_PORT_KEY, omServiceId, omNode3Id);
+
+ conf.set(OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY, omServiceId);
+ conf.set(omNodesKey, omNodesKeyValue);
+
+ // Set node2 to a local address and the other two nodes to dummy addresses
+ conf.set(omNode1RpcAddrKey, "123.0.0.123:9862");
+ conf.set(omNode2RpcAddrKey, "0.0.0.0:9862");
+ conf.set(omNode3RpcAddrKey, "124.0.0.124:9862");
+
+ conf.setInt(omNode3RatisPortKey, 9898);
+
+ startCluster();
+ om = cluster.getOzoneManager();
+ omRatisServer = om.getOmRatisServer();
+
+ Assert.assertEquals(LifeCycle.State.RUNNING, om.getOmRatisServerState());
+
+ // OM's Ratis server should have 3 peers in its RaftGroup
+ Collection<RaftPeer> peers = omRatisServer.getRaftGroup().getPeers();
+ Assert.assertEquals(3, peers.size());
+
+ // Ratis server RaftPeerId should match with omNode2 ID as node2 is the
+ // local node
+ Assert.assertEquals(omNode2Id, omRatisServer.getRaftPeerId().toString());
+
+ // Verify peer details
+ for (RaftPeer peer : peers) {
+ String expectedPeerAddress = null;
+ switch (peer.getId().toString()) {
+ case omNode1Id :
+ // Ratis port is not set for node1. So it should take the default port
+ expectedPeerAddress = "123.0.0.123:" +
+ OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT;
+ break;
+ case omNode2Id :
+ expectedPeerAddress = "0.0.0.0:"+
+ OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT;
+ break;
+ case omNode3Id :
+ // Ratis port is explicitly set to 9898 for node3 (see above).
+ expectedPeerAddress = "124.0.0.124:9898";
+ break;
+ default : Assert.fail("Unrecognized RaftPeerId: " + peer.getId());
+ }
+ Assert.assertEquals(expectedPeerAddress, peer.getAddress());
+ }
+ }
+
+ /**
+ * Test a wrong configuration for OM HA. A configuration with none of the
+ * OM addresses matching the local address should throw an error.
+ * @throws Exception
+ */
+ @Test
+ public void testWrongConfiguration() throws Exception {
+ String omServiceId = "om-service-test1";
+
+ String omNode1Id = "omNode1";
+ String omNode2Id = "omNode2";
+ String omNode3Id = "omNode3";
+ String omNodesKeyValue = omNode1Id + "," + omNode2Id + "," + omNode3Id;
+ String omNodesKey = OmUtils.addKeySuffixes(
+ OMConfigKeys.OZONE_OM_NODES_KEY, omServiceId);
+
+ String omNode1RpcAddrKey = getOMAddrKeyWithSuffix(omServiceId, omNode1Id);
+ String omNode2RpcAddrKey = getOMAddrKeyWithSuffix(omServiceId, omNode2Id);
+ String omNode3RpcAddrKey = getOMAddrKeyWithSuffix(omServiceId, omNode3Id);
+
+ conf.set(OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY, omServiceId);
+ conf.set(omNodesKey, omNodesKeyValue);
+
+ // Set all three nodes to dummy addresses; none of them is local.
+ conf.set(omNode1RpcAddrKey, "123.0.0.123:9862");
+ conf.set(omNode2RpcAddrKey, "125.0.0.2:9862");
+ conf.set(omNode3RpcAddrKey, "124.0.0.124:9862");
+
+ try {
+ startCluster();
+ Assert.fail("Wrong Configuration. OM initialization should have failed.");
+ } catch (OzoneIllegalArgumentException e) {
+ GenericTestUtils.assertExceptionContains("Configuration has no " +
+ OMConfigKeys.OZONE_OM_ADDRESS_KEY + " address that matches local " +
+ "node's address.", e);
+ }
+ }
+
+ /**
+ * Test multiple OM service configuration.
+ */
+ @Test
+ public void testMultipleOMServiceIds() throws Exception {
+ // Set up OZONE_OM_SERVICE_IDS_KEY with 2 service Ids.
+ String om1ServiceId = "om-service-test1";
+ String om2ServiceId = "om-service-test2";
+ String omServices = om1ServiceId + "," + om2ServiceId;
+ conf.set(OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY, omServices);
+
+ String omNode1Id = "omNode1";
+ String omNode2Id = "omNode2";
+ String omNode3Id = "omNode3";
+ String omNodesKeyValue = omNode1Id + "," + omNode2Id + "," + omNode3Id;
+
+ // Set the node Ids for the 2 services. The nodeIds need to be
+ // distinct within one service. The ids can overlap between
+ // different services.
+ String om1NodesKey = OmUtils.addKeySuffixes(
+ OMConfigKeys.OZONE_OM_NODES_KEY, om1ServiceId);
+ String om2NodesKey = OmUtils.addKeySuffixes(
+ OMConfigKeys.OZONE_OM_NODES_KEY, om2ServiceId);
+ conf.set(om1NodesKey, omNodesKeyValue);
+ conf.set(om2NodesKey, omNodesKeyValue);
+
+ // Set the RPC addresses for all 6 OMs (3 for each service). Only one
+ // node out of these must have a local address.
+ conf.set(getOMAddrKeyWithSuffix(om1ServiceId, omNode1Id),
+ "122.0.0.123:9862");
+ conf.set(getOMAddrKeyWithSuffix(om1ServiceId, omNode2Id),
+ "123.0.0.124:9862");
+ conf.set(getOMAddrKeyWithSuffix(om1ServiceId, omNode3Id),
+ "124.0.0.125:9862");
+ conf.set(getOMAddrKeyWithSuffix(om2ServiceId, omNode1Id),
+ "125.0.0.126:9862");
+ conf.set(getOMAddrKeyWithSuffix(om2ServiceId, omNode2Id),
+ "0.0.0.0:9862");
+ conf.set(getOMAddrKeyWithSuffix(om2ServiceId, omNode3Id),
+ "126.0.0.127:9862");
+
+ startCluster();
+ om = cluster.getOzoneManager();
+ omRatisServer = om.getOmRatisServer();
+
+ Assert.assertEquals(LifeCycle.State.RUNNING, om.getOmRatisServerState());
+
+ // OM's Ratis server should have 3 peers in its RaftGroup
+ Collection<RaftPeer> peers = omRatisServer.getRaftGroup().getPeers();
+ Assert.assertEquals(3, peers.size());
+
+ // Verify that the serviceId and nodeId match the node with the local
+ // address - om-service-test2 and omNode2
+ Assert.assertEquals(om2ServiceId, om.getOMServiceId());
+ Assert.assertEquals(omNode2Id, omRatisServer.getRaftPeerId().toString());
+ }
+
+ private String getOMAddrKeyWithSuffix(String serviceId, String nodeId) {
+ return OmUtils.addKeySuffixes(OMConfigKeys.OZONE_OM_ADDRESS_KEY,
+ serviceId, nodeId);
+ }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
new file mode 100644
index 0000000..be932a2
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
+import org.apache.hadoop.ozone.*;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.web.handlers.UserArgs;
+import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
+import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
+import org.apache.hadoop.ozone.web.response.VolumeInfo;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.junit.*;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.util.*;
+
+import static org.apache.hadoop.ozone.MiniOzoneHAClusterImpl
+ .NODE_FAILURE_TIMEOUT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
+
+/**
+ * Test Ozone Manager operation with multiple OMs in a single OM service.
+ */
+public class TestOzoneManagerHA {
+
+ private MiniOzoneHAClusterImpl cluster = null;
+ private StorageHandler storageHandler;
+ private UserArgs userArgs;
+ private OzoneConfiguration conf;
+ private String clusterId;
+ private String scmId;
+ private int numOfOMs = 3;
+
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
+
+ @Rule
+ public Timeout timeout = new Timeout(60_000);
+
+ /**
+ * Create a MiniOzoneHAClusterImpl with {@code numOfOMs} OMs for testing.
+ * <p>
+ * All OMs belong to the OM service "om-service-test1".
+ *
+ * @throws Exception
+ */
+ @Before
+ public void init() throws Exception {
+ conf = new OzoneConfiguration();
+ clusterId = UUID.randomUUID().toString();
+ scmId = UUID.randomUUID().toString();
+ conf.setBoolean(OZONE_ACL_ENABLED, true);
+ conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
+
+ cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
+ .setClusterId(clusterId)
+ .setScmId(scmId)
+ .setOMServiceId("om-service-test1")
+ .setNumOfOzoneManagers(numOfOMs)
+ .build();
+ cluster.waitForClusterToBeReady();
+ storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
+ userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
+ null, null, null, null);
+ }
+
+ /**
+ * Shutdown MiniOzoneHACluster.
+ */
+ @After
+ public void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * Test a client request when all OM nodes are running. The request should
+ * succeed.
+ * @throws Exception
+ */
+ @Test
+ public void testAllOMNodesRunning() throws Exception {
+ testCreateVolume(true);
+ }
+
+ /**
+ * Test client request succeeds even if one OM is down.
+ */
+ @Test
+ public void testOneOMNodeDown() throws Exception {
+ cluster.stopOzoneManager(1);
+ Thread.sleep(NODE_FAILURE_TIMEOUT * 2);
+
+ testCreateVolume(true);
+ }
+
+ /**
+ * Test client request fails when 2 OMs are down.
+ */
+ @Test
+ public void testTwoOMNodesDown() throws Exception {
+ cluster.stopOzoneManager(1);
+ cluster.stopOzoneManager(2);
+ Thread.sleep(NODE_FAILURE_TIMEOUT * 2);
+
+ testCreateVolume(false);
+ }
+
+ /**
+ * Create a volume; verify it if checkSuccess is true, else expect failure.
+ */
+ private void testCreateVolume(boolean checkSuccess) throws Exception {
+ String userName = "user" + RandomStringUtils.randomNumeric(5);
+ String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+ String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+
+ VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+ createVolumeArgs.setUserName(userName);
+ createVolumeArgs.setAdminName(adminName);
+
+ storageHandler.createVolume(createVolumeArgs);
+
+ VolumeArgs getVolumeArgs = new VolumeArgs(volumeName, userArgs);
+ VolumeInfo retVolumeinfo = storageHandler.getVolumeInfo(getVolumeArgs);
+
+ if (checkSuccess) {
+ Assert.assertEquals(volumeName, retVolumeinfo.getVolumeName());
+ Assert.assertEquals(userName, retVolumeinfo.getOwner().getName());
+ } else {
+ // Verify that the request failed
+ Assert.assertTrue(retVolumeinfo.getVolumeName().isEmpty());
+ }
+ }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMNodeDetails.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMNodeDetails.java
new file mode 100644
index 0000000..caa7674
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMNodeDetails.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.net.NetUtils;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+/**
+ * Immutable details (service ID, node ID, RPC address, Ratis port) of an OM.
+ */
+public final class OMNodeDetails {
+ private final String omServiceId;
+ private final String omNodeId;
+ private final InetSocketAddress rpcAddress;
+ private final int rpcPort;
+ private final int ratisPort;
+
+ /**
+ * Constructs OMNodeDetails object.
+ */
+ private OMNodeDetails(String serviceId, String nodeId,
+ InetSocketAddress rpcAddr, int rpcPort, int ratisPort) {
+ this.omServiceId = serviceId;
+ this.omNodeId = nodeId;
+ this.rpcAddress = rpcAddr;
+ this.rpcPort = rpcPort;
+ this.ratisPort = ratisPort;
+ }
+
+ /**
+ * Builder class for OMNodeDetails.
+ */
+ public static class Builder {
+ private String omServiceId;
+ private String omNodeId;
+ private InetSocketAddress rpcAddress;
+ private int rpcPort;
+ private int ratisPort;
+
+ public Builder setRpcAddress(InetSocketAddress rpcAddr) {
+ this.rpcAddress = rpcAddr;
+ this.rpcPort = rpcAddress.getPort();
+ return this;
+ }
+
+ public Builder setRatisPort(int port) {
+ this.ratisPort = port;
+ return this;
+ }
+
+ public Builder setOMServiceId(String serviceId) {
+ this.omServiceId = serviceId;
+ return this;
+ }
+
+ public Builder setOMNodeId(String nodeId) {
+ this.omNodeId = nodeId;
+ return this;
+ }
+
+ public OMNodeDetails build() {
+ return new OMNodeDetails(omServiceId, omNodeId, rpcAddress, rpcPort,
+ ratisPort);
+ }
+ }
+
+ public String getOMServiceId() {
+ return omServiceId;
+ }
+
+ public String getOMNodeId() {
+ return omNodeId;
+ }
+
+ public InetSocketAddress getRpcAddress() {
+ return rpcAddress;
+ }
+
+ public InetAddress getAddress() {
+ return rpcAddress.getAddress();
+ }
+
+ public int getRatisPort() {
+ return ratisPort;
+ }
+
+ public String getRpcAddressString() {
+ return NetUtils.getHostPortString(rpcAddress);
+ }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 13bfd98..2243fbd 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -25,8 +25,10 @@ import com.google.common.base.Preconditions;
import com.google.protobuf.BlockingService;
import java.security.KeyPair;
+import java.util.Collection;
import java.util.Objects;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -50,6 +52,7 @@ import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ozone.OzoneIllegalArgumentException;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.apache.hadoop.ozone.security.OzoneSecurityException;
@@ -144,7 +147,6 @@ import static org.apache.hadoop.hdds.HddsUtils.isHddsEnabled;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithFixedSleep;
-import static org.apache.hadoop.ozone.OmUtils.getOmAddress;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED_DEFAULT;
@@ -161,6 +163,12 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys
.OZONE_OM_METRICS_SAVE_INTERVAL;
import static org.apache.hadoop.ozone.om.OMConfigKeys
.OZONE_OM_METRICS_SAVE_INTERVAL_DEFAULT;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODE_ID_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys
+ .OZONE_OM_RATIS_PORT_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_PORT_KEY;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_AUTH_METHOD;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
import static org.apache.hadoop.ozone.protocol.proto
@@ -200,6 +208,9 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private RPC.Server omRpcServer;
private InetSocketAddress omRpcAddress;
private String omId;
+ private OMNodeDetails omNodeDetails;
+ private List<OMNodeDetails> peerNodes;
+ private boolean isRatisEnabled;
private OzoneManagerRatisServer omRatisServer;
private OzoneManagerRatisClient omRatisClient;
private final OMMetadataManager metadataManager;
@@ -229,7 +240,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private final S3SecretManager s3SecretManager;
private volatile boolean isOmRpcServerRunning = false;
- private OzoneManager(OzoneConfiguration conf) throws IOException {
+ private OzoneManager(OzoneConfiguration conf) throws IOException,
+ AuthenticationException {
super(OzoneVersionInfo.OZONE_VERSION_INFO);
Preconditions.checkNotNull(conf);
configuration = conf;
@@ -240,6 +252,14 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
ResultCodes.OM_NOT_INITIALIZED);
}
+ // Load HA related configurations
+ loadOMHAConfigs(configuration);
+
+ // Authenticate KSM if security is enabled
+ if (securityEnabled) {
+ loginOMUser(configuration);
+ }
+
if (!testSecureOmFlag || !isOzoneSecurityEnabled()) {
scmContainerClient = getScmContainerClient(configuration);
// verifies that the SCM info in the OM Version file is correct.
@@ -256,12 +276,15 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
scmBlockClient = null;
}
-
RPC.setProtocolEngine(configuration, OzoneManagerProtocolPB.class,
ProtobufRpcEngine.class);
+ startRatisServer();
+ startRatisClient();
+
+ InetSocketAddress omNodeRpcAddr = omNodeDetails.getRpcAddress();
+ omRpcAddressTxt = new Text(omNodeDetails.getRpcAddressString());
- omRpcAddressTxt = new Text(OmUtils.getOmRpcAddress(configuration));
secConfig = new SecurityConfig(configuration);
if (secConfig.isBlockTokenEnabled()) {
blockTokenMgr = createBlockTokenSecretManager(configuration);
@@ -269,7 +292,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
if(secConfig.isSecurityEnabled()){
delegationTokenMgr = createDelegationTokenSecretManager(configuration);
}
- InetSocketAddress omNodeRpcAddr = getOmAddress(configuration);
+
omRpcServer = getRpcServer(conf);
omRpcAddress = updateRPCListenAddress(configuration,
OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
@@ -297,7 +320,157 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
accessAuthorizer = null;
}
omMetaDir = OmUtils.getOmDbDir(configuration);
+ }
+
+ /**
+ * Inspects and loads OM node configurations.
+ *
+ * If {@link OMConfigKeys#OZONE_OM_SERVICE_IDS_KEY} is configured with
+ * multiple ids and/or {@link OMConfigKeys#OZONE_OM_NODE_ID_KEY} is not
+ * set, this method determines the omServiceId and omNodeId by matching
+ * the node's address with the configured addresses and collects the peer
+ * nodes of the matching OM service. If no OM address is configured at
+ * all, it falls back to the default OM address.
+ *
+ * @param conf configuration holding the OM service and node settings
+ * @throws OzoneIllegalArgumentException if zero or several addresses match
+ */
+ private void loadOMHAConfigs(Configuration conf) {
+ InetSocketAddress localRpcAddress = null;
+ String localOMServiceId = null;
+ String localOMNodeId = null;
+ int localRatisPort = 0;
+ Collection<String> omServiceIds = conf.getTrimmedStringCollection(
+ OZONE_OM_SERVICE_IDS_KEY);
+
+ String knownOMNodeId = conf.get(OZONE_OM_NODE_ID_KEY);
+ int found = 0;
+ boolean isOMAddressSet = false;
+
+ for (String serviceId : OmUtils.emptyAsSingletonNull(omServiceIds)) {
+ Collection<String> omNodeIds = OmUtils.getOMNodeIds(conf, serviceId);
+
+ List<OMNodeDetails> peerNodesList = new ArrayList<>();
+ boolean isPeer = false;
+ for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
+ if (knownOMNodeId != null && !knownOMNodeId.equals(nodeId)) {
+ isPeer = true;
+ } else {
+ isPeer = false;
+ }
+ String rpcAddrKey = OmUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
+ serviceId, nodeId);
+ String rpcAddrStr = conf.get(rpcAddrKey);
+ if (rpcAddrStr == null) {
+ continue;
+ }
+
+ // If OM address is set for any node id, we will not fallback to the
+ // default
+ isOMAddressSet = true;
+
+ String ratisPortKey = OmUtils.addKeySuffixes(OZONE_OM_RATIS_PORT_KEY,
+ serviceId, nodeId);
+ int ratisPort = conf.getInt(ratisPortKey, OZONE_OM_RATIS_PORT_DEFAULT);
+ InetSocketAddress addr;
+ try {
+ addr = NetUtils.createSocketAddr(rpcAddrStr);
+ } catch (Exception e) {
+ LOG.warn("Exception in creating socket address " + rpcAddrStr, e);
+ continue;
+ }
+ if (!addr.isUnresolved()) {
+ if (!isPeer && OmUtils.isAddressLocal(addr)) {
+ localRpcAddress = addr;
+ localOMServiceId = serviceId;
+ localOMNodeId = nodeId;
+ localRatisPort = ratisPort;
+ found++;
+ } else {
+ // This OMNode belongs to same OM service as the current OMNode.
+ // Add it to peerNodes list.
+ OMNodeDetails peerNodeInfo = new OMNodeDetails.Builder()
+ .setOMServiceId(serviceId)
+ .setOMNodeId(nodeId)
+ .setRpcAddress(addr)
+ .setRatisPort(ratisPort)
+ .build();
+ peerNodesList.add(peerNodeInfo);
+ }
+ }
+ }
+ if (found == 1) {
+ LOG.debug("Found one matching OM address with service ID: {} and node" +
+ " ID: {}", localOMServiceId, localOMNodeId);
+
+ setOMNodeDetails(localOMServiceId, localOMNodeId, localRpcAddress,
+ localRatisPort);
+ this.peerNodes = peerNodesList;
+
+ LOG.info("Found matching OM address with OMServiceId: {}, " +
+ "OMNodeId: {}, RPC Address: {} and Ratis port: {}",
+ localOMServiceId, localOMNodeId,
+ NetUtils.getHostPortString(localRpcAddress), localRatisPort);
+ return;
+ } else if (found > 1) {
+ String msg = "Configuration has multiple " + OZONE_OM_ADDRESS_KEY +
+ " addresses that match local node's address. Please configure the" +
+ " system with " + OZONE_OM_SERVICE_IDS_KEY + " and " +
+ OZONE_OM_ADDRESS_KEY;
+ throw new OzoneIllegalArgumentException(msg);
+ }
+ }
+
+ if (!isOMAddressSet) {
+ // No OM address is set. Fallback to default
+ InetSocketAddress omAddress = OmUtils.getOmAddress(conf);
+ int ratisPort = conf.getInt(OZONE_OM_RATIS_PORT_KEY,
+ OZONE_OM_RATIS_PORT_DEFAULT);
+
+ LOG.info("Configuration does not have {} set. Falling back to the " +
+ "default OM address {}", OZONE_OM_ADDRESS_KEY, omAddress);
+
+ setOMNodeDetails(null, null, omAddress, ratisPort);
+
+ } else {
+ String msg = "Configuration has no " + OZONE_OM_ADDRESS_KEY + " " +
+ "address that matches local node's address. Please configure the " +
+ "system with " + OZONE_OM_ADDRESS_KEY;
+ LOG.error(msg);
+ throw new OzoneIllegalArgumentException(msg);
+ }
+ }
+
+ /**
+ * Builds and sets this node's OMNodeDetails, defaulting missing IDs.
+ */
+ private void setOMNodeDetails(String serviceId, String nodeId,
+ InetSocketAddress rpcAddress, int ratisPort) {
+
+ if (serviceId == null) {
+ // If no serviceId is set, take the default serviceID om-service
+ serviceId = OzoneConsts.OM_SERVICE_ID_DEFAULT;
+ LOG.info("OM Service ID is not set. Setting it to the default ID: {}",
+ serviceId);
+ }
+ if (nodeId == null) {
+ // If no nodeId is set, take the omId from omStorage as the nodeID
+ nodeId = omId;
+ LOG.info("OM Node ID is not set. Setting it to the OmStorage's " +
+ "OmID: {}", nodeId);
+ }
+
+ this.omNodeDetails = new OMNodeDetails.Builder()
+ .setOMServiceId(serviceId)
+ .setOMNodeId(nodeId)
+ .setRpcAddress(rpcAddress)
+ .setRatisPort(ratisPort)
+ .build();
+
+ // Set this node's OZONE_OM_ADDRESS_KEY to the discovered address.
+ configuration.set(OZONE_OM_ADDRESS_KEY,
+ NetUtils.getHostPortString(rpcAddress));
}
/**
@@ -479,7 +652,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
* @param conf
* @throws IOException, AuthenticationException
*/
- private static void loginOMUser(OzoneConfiguration conf)
+ private void loginOMUser(OzoneConfiguration conf)
throws IOException, AuthenticationException {
if (SecurityUtil.getAuthenticationMethod(conf).equals(
@@ -491,7 +664,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
UserGroupInformation.setConfiguration(conf);
- InetSocketAddress socAddr = getOmAddress(conf);
+ InetSocketAddress socAddr = OmUtils.getOmAddress(conf);
SecurityUtil.login(conf, OZONE_OM_KERBEROS_KEYTAB_FILE_KEY,
OZONE_OM_KERBEROS_PRINCIPAL_KEY, socAddr.getHostName());
} else {
@@ -660,10 +833,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
}
securityEnabled = OzoneSecurityUtil.isSecurityEnabled(conf);
- // Authenticate KSM if security is enabled
- if (securityEnabled) {
- loginOMUser(conf);
- }
+
switch (startOpt) {
case INIT:
if (printBanner) {
@@ -793,6 +963,16 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
}
@VisibleForTesting
+ public OzoneManagerRatisServer getOmRatisServer() {
+ return omRatisServer; // null when OM Ratis is disabled
+ }
+
+ @VisibleForTesting
+ public InetSocketAddress getOmRpcServerAddr() {
+ return omRpcAddress; // assigned via updateRPCListenAddress()
+ }
+
+ @VisibleForTesting
public LifeCycle.State getOmRatisServerState() {
if (omRatisServer == null) {
return null;
@@ -866,7 +1046,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
LOG.info(buildRpcServerStartMessage("OzoneManager RPC server",
omRpcAddress));
-
DefaultMetricsSystem.initialize("OzoneManager");
metadataManager.start(configuration);
@@ -894,6 +1073,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
omRpcServer = getRpcServer(configuration);
omRpcServer.start();
isOmRpcServerRunning = true;
+
+ startRatisServer();
+ startRatisClient();
+
try {
httpServer = new OzoneManagerHttpServer(configuration, this);
httpServer.start();
@@ -919,40 +1102,65 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
return omRpcServer;
}
- InetSocketAddress omNodeRpcAddr = getOmAddress(configuration);
+ InetSocketAddress omNodeRpcAddr = OmUtils.getOmAddress(configuration);
+
+ final int handlerCount = conf.getInt(OZONE_OM_HANDLER_COUNT_KEY,
+ OZONE_OM_HANDLER_COUNT_DEFAULT);
+ RPC.setProtocolEngine(configuration, OzoneManagerProtocolPB.class,
+ ProtobufRpcEngine.class);
+
+ BlockingService omService = newReflectiveBlockingService(
+ new OzoneManagerProtocolServerSideTranslatorPB(this, omRatisClient,
+ isRatisEnabled));
+ return startRpcServer(configuration, omNodeRpcAddr,
+ OzoneManagerProtocolPB.class, omService,
+ handlerCount);
+ }
+
+ /**
+ * Creates (once) and starts the OM Ratis server if Ratis is enabled.
+ */
+ private void startRatisServer() throws IOException {
// This is a temporary check. Once fully implemented, all OM state change
- // should go through Ratis - either standalone (for non-HA) or replicated
+ // should go through Ratis - be it standalone (for non-HA) or replicated
// (for HA).
- boolean omRatisEnabled = configuration.getBoolean(
+ isRatisEnabled = configuration.getBoolean(
OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
- if (omRatisEnabled) {
- omRatisServer = OzoneManagerRatisServer.newOMRatisServer(this, omId,
- omNodeRpcAddr.getAddress(), configuration);
+ if (isRatisEnabled) {
+ if (omRatisServer == null) {
+ omRatisServer = OzoneManagerRatisServer.newOMRatisServer(
+ configuration, this, omNodeDetails, peerNodes);
+ }
omRatisServer.start();
LOG.info("OzoneManager Ratis server started at port {}",
omRatisServer.getServerPort());
+ } else {
+ omRatisServer = null;
+ }
+ }
- omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient(
- omId, omRatisServer.getRaftGroup(), configuration);
+ /**
+ * Connects the OM Ratis client if Ratis is enabled, creating it on first
+ * use for the Raft group of {@code omRatisServer}. When Ratis is disabled,
+ * {@code omRatisClient} is set to null.
+ * Must be called after startRatisServer(), which decides
+ * {@code isRatisEnabled} and creates the server this client attaches to.
+ * @throws IOException propagated from creating or connecting the client
+ * @throws IllegalStateException if Ratis is enabled but no Ratis server
+ * exists
+ */
+ private void startRatisClient() throws IOException {
+ // Reuse the decision made by startRatisServer() instead of re-reading the
+ // configuration, so the client can never disagree with the server.
+ if (isRatisEnabled && omRatisServer == null) {
+ throw new IllegalStateException(
+ "OM Ratis server must be started before the OM Ratis client.");
+ }
+ if (isRatisEnabled) {
+ if (omRatisClient == null) {
+ omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient(
+ omNodeDetails.getOMNodeId(), omRatisServer.getRaftGroup(),
+ configuration);
+ }
omRatisClient.connect();
} else {
- omRatisServer = null;
omRatisClient = null;
}
-
- final int handlerCount = conf.getInt(OZONE_OM_HANDLER_COUNT_KEY,
- OZONE_OM_HANDLER_COUNT_DEFAULT);
- RPC.setProtocolEngine(configuration, OzoneManagerProtocolPB.class,
- ProtobufRpcEngine.class);
-
- BlockingService omService = newReflectiveBlockingService(
- new OzoneManagerProtocolServerSideTranslatorPB(this, omRatisClient,
- omRatisEnabled));
- return startRpcServer(configuration, omNodeRpcAddr,
- OzoneManagerProtocolPB.class, omService,
- handlerCount);
}
/**
@@ -970,6 +1178,9 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
if (omRatisServer != null) {
omRatisServer.stop();
}
+ if (omRatisClient != null) {
+ omRatisClient.close();
+ }
isOmRpcServerRunning = false;
keyManager.stop();
stopSecretManager();
@@ -2188,4 +2399,12 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
public static void setTestSecureOmFlag(boolean testSecureOmFlag) {
OzoneManager.testSecureOmFlag = testSecureOmFlag;
}
+
+ /**
+ * Returns the ID of this OM node within its OM service.
+ */
+ public String getOMNodeId() {
+ return omNodeDetails.getOMNodeId();
+ }
+
+ /**
+ * Misspelled alias of {@link #getOMNodeId()}, kept so that existing
+ * callers continue to compile.
+ * @deprecated use {@link #getOMNodeId()} instead.
+ */
+ @Deprecated
+ public String getOMNodId() {
+ return getOMNodeId();
+ }
+
+ /**
+ * Returns the ID of the OM service this node belongs to.
+ */
+ public String getOMServiceId() {
+ return this.omNodeDetails.getOMServiceId();
+ }
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
index c18437c..9e1cafc 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
@@ -19,12 +19,10 @@ package org.apache.hadoop.ozone.om.ratis;
import java.io.Closeable;
import java.io.IOException;
-import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OmUtils;
@@ -57,7 +55,7 @@ public final class OzoneManagerRatisClient implements Closeable {
private final RaftGroup raftGroup;
private final String omID;
private final RpcType rpcType;
- private final AtomicReference<RaftClient> client = new AtomicReference<>();
+ private RaftClient raftClient;
private final RetryPolicy retryPolicy;
private final Configuration conf;
@@ -101,32 +99,21 @@ public final class OzoneManagerRatisClient implements Closeable {
// maxOutstandingRequests so as to set the upper bound on max no of async
// requests to be handled by raft client
- if (!client.compareAndSet(null, OMRatisHelper.newRaftClient(
- rpcType, omID, raftGroup, retryPolicy, conf))) {
- throw new IllegalStateException("Client is already connected.");
- }
+ raftClient = OMRatisHelper.newRaftClient(rpcType, omID, raftGroup,
+ retryPolicy, conf);
}
@Override
public void close() {
- final RaftClient c = client.getAndSet(null);
- if (c != null) {
- closeRaftClient(c);
+ if (raftClient != null) {
+ try {
+ raftClient.close();
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
}
}
- private void closeRaftClient(RaftClient raftClient) {
- try {
- raftClient.close();
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- }
-
- private RaftClient getClient() {
- return Objects.requireNonNull(client.get(), "client is null");
- }
-
/**
* Sends a given request to server and gets the reply back.
* @param request Request
@@ -188,7 +175,7 @@ public final class OzoneManagerRatisClient implements Closeable {
boolean isReadOnlyRequest = OmUtils.isReadOnly(request);
ByteString byteString = OMRatisHelper.convertRequestToByteString(request);
LOG.debug("sendOMRequestAsync {} {}", isReadOnlyRequest, request);
- return isReadOnlyRequest ? getClient().sendReadOnlyAsync(() -> byteString) :
- getClient().sendAsync(() -> byteString);
+ return isReadOnlyRequest ? raftClient.sendReadOnlyAsync(() -> byteString) :
+ raftClient.sendAsync(() -> byteString);
}
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
index d49d5e6..be4bf59 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
@@ -22,19 +22,17 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import java.io.File;
import java.io.IOException;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.SocketAddress;
+import java.util.ArrayList;
import java.util.Collections;
-import java.util.Objects;
+import java.util.List;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.scm.HddsServerUtil;
import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMNodeDetails;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.RaftClientConfigKeys;
@@ -50,6 +48,7 @@ import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
@@ -71,28 +70,37 @@ public final class OzoneManagerRatisServer {
private final RaftPeerId raftPeerId;
private final OzoneManagerProtocol ozoneManager;
- private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
-
- private static long nextCallId() {
- return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
- }
-
- private OzoneManagerRatisServer(OzoneManagerProtocol om, String omId,
- InetAddress addr, int port, Configuration conf) throws IOException {
- Objects.requireNonNull(omId, "omId is null");
+ /**
+ * Creates an OM Ratis server that is a member of the given Raft group.
+ * @param conf configuration
+ * @param om the OM instance starting the ratis server
+ * @param raftGroupIdStr raft group id string
+ * @param localRaftPeerId raft peer id of this Ratis server
+ * @param addr address of the ratis server
+ * @param raftPeers peer nodes in the raft ring
+ * @throws IOException if the underlying Raft server cannot be built
+ */
+ private OzoneManagerRatisServer(Configuration conf, OzoneManagerProtocol om,
+ String raftGroupIdStr, RaftPeerId localRaftPeerId,
+ InetSocketAddress addr, List<RaftPeer> raftPeers)
+ throws IOException {
this.ozoneManager = om;
- this.port = port;
- this.omRatisAddress = new InetSocketAddress(addr.getHostAddress(), port);
+ this.omRatisAddress = addr;
+ this.port = addr.getPort();
RaftProperties serverProperties = newRaftProperties(conf);
- // TODO: When implementing replicated OM ratis servers, RaftGroupID
- // should be the same across all the OMs. Add all the OM servers as Raft
- // Peers.
- this.raftGroupId = RaftGroupId.randomId();
- this.raftPeerId = RaftPeerId.getRaftPeerId(omId);
+ this.raftPeerId = localRaftPeerId;
+ // NOTE(review): some Ratis versions require a RaftGroupId to be exactly
+ // 16 bytes (a UUID); confirm the service id's UTF-8 bytes are accepted.
+ this.raftGroupId = RaftGroupId.valueOf(
+ ByteString.copyFromUtf8(raftGroupIdStr));
+ this.raftGroup = RaftGroup.valueOf(raftGroupId, raftPeers);
+
+ // Join peer addresses with ", " without assuming the list is non-empty
+ // (substring(2) on an empty builder would throw).
+ StringBuilder raftPeersStr = new StringBuilder();
+ String separator = "";
+ for (RaftPeer peer : raftPeers) {
+ raftPeersStr.append(separator).append(peer.getAddress());
+ separator = ", ";
+ }
+ LOG.info("Instantiating OM Ratis server with GroupID: {} and " +
+ "Raft Peers: {}", raftGroupIdStr, raftPeersStr);
- RaftPeer raftPeer = new RaftPeer(raftPeerId, omRatisAddress);
- this.raftGroup = RaftGroup.valueOf(raftGroupId, raftPeer);
this.server = RaftServer.newBuilder()
.setServerId(this.raftPeerId)
.setGroup(this.raftGroup)
@@ -101,31 +109,42 @@ public final class OzoneManagerRatisServer {
.build();
}
+ /**
+ * Creates an OzoneManagerRatisServer whose Raft group contains this OM and
+ * the given peer OMs. The OM service ID is used as the Raft group ID and
+ * each OM node ID as its Raft peer ID. Each member's Raft address is built
+ * from its node address and Ratis port.
+ * @param ozoneConf configuration
+ * @param om the OM instance starting the Ratis server
+ * @param omNodeDetails details of the local OM node
+ * @param peerNodes the other OM nodes of the same OM service; presumably
+ * excludes the local node, which is added separately (confirm
+ * with callers, otherwise it would appear twice in the group)
+ * @return a new Ratis server; it still has to be started via start()
+ * @throws IOException if the Ratis server cannot be created
+ */
public static OzoneManagerRatisServer newOMRatisServer(
- OzoneManagerProtocol om, String omId, InetAddress omAddress,
- Configuration ozoneConf) throws IOException {
- int localPort = ozoneConf.getInt(
- OMConfigKeys.OZONE_OM_RATIS_PORT_KEY,
- OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT);
-
- // Get an available port on current node and
- // use that as the container port
- if (ozoneConf.getBoolean(
- OMConfigKeys.OZONE_OM_RATIS_RANDOM_PORT_KEY,
- OMConfigKeys.OZONE_OM_RATIS_RANDOM_PORT_KEY_DEFAULT)) {
- try (ServerSocket socket = new ServerSocket()) {
- socket.setReuseAddress(true);
- SocketAddress address = new InetSocketAddress(0);
- socket.bind(address);
- localPort = socket.getLocalPort();
- LOG.info("Found a free port for the OM Ratis server : {}", localPort);
- } catch (IOException e) {
- LOG.error("Unable find a random free port for the server, "
- + "fallback to use default port {}", localPort, e);
- }
+ Configuration ozoneConf, OzoneManagerProtocol om,
+ OMNodeDetails omNodeDetails, List<OMNodeDetails> peerNodes)
+ throws IOException {
+
+ // RaftGroupId is the omServiceId
+ String omServiceId = omNodeDetails.getOMServiceId();
+
+ String omNodeId = omNodeDetails.getOMNodeId();
+ RaftPeerId localRaftPeerId = RaftPeerId.getRaftPeerId(omNodeId);
+
+ InetSocketAddress ratisAddr = new InetSocketAddress(
+ omNodeDetails.getAddress(), omNodeDetails.getRatisPort());
+
+ RaftPeer localRaftPeer = new RaftPeer(localRaftPeerId, ratisAddr);
+
+ List<RaftPeer> raftPeers = new ArrayList<>();
+ // Add this Ratis server to the Ratis ring
+ raftPeers.add(localRaftPeer);
+
+ for (OMNodeDetails peerInfo : peerNodes) {
+ String peerNodeId = peerInfo.getOMNodeId();
+ InetSocketAddress peerRatisAddr = new InetSocketAddress(
+ peerInfo.getAddress(), peerInfo.getRatisPort());
+ RaftPeerId raftPeerId = RaftPeerId.valueOf(peerNodeId);
+ RaftPeer raftPeer = new RaftPeer(raftPeerId, peerRatisAddr);
+
+ // Add other OM nodes belonging to the same OM service to the Ratis ring
+ raftPeers.add(raftPeer);
}
- return new OzoneManagerRatisServer(om, omId, omAddress, localPort,
- ozoneConf);
+
+ return new OzoneManagerRatisServer(ozoneConf, om, omServiceId,
+ localRaftPeerId, ratisAddr, raftPeers);
}
public RaftGroup getRaftGroup() {
@@ -139,6 +158,10 @@ public final class OzoneManagerRatisServer {
return new OzoneManagerStateMachine(ozoneManager);
}
+ /**
+ * Start the Ratis server.
+ * @throws IOException
+ */
public void start() throws IOException {
LOG.info("Starting {} {} at port {}", getClass().getSimpleName(),
server.getId(), port);
@@ -266,11 +289,6 @@ public final class OzoneManagerRatisServer {
// TODO: set max write buffer size
- /**
- * TODO: set following ratis leader election related configs when
- * replicated ratis server is implemented.
- * 1. node failure timeout
- */
// Set the ratis leader election timeout
TimeUnit leaderElectionMinTimeoutUnit =
OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT
@@ -288,6 +306,20 @@ public final class OzoneManagerRatisServer {
RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
TimeDuration.valueOf(leaderElectionMaxTimeout, TimeUnit.MILLISECONDS));
+ TimeUnit nodeFailureTimeoutUnit =
+ OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT
+ .getUnit();
+ long nodeFailureTimeoutDuration = conf.getTimeDuration(
+ OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_KEY,
+ OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT
+ .getDuration(), nodeFailureTimeoutUnit);
+ final TimeDuration nodeFailureTimeout = TimeDuration.valueOf(
+ nodeFailureTimeoutDuration, nodeFailureTimeoutUnit);
+ RaftServerConfigKeys.setLeaderElectionTimeout(properties,
+ nodeFailureTimeout);
+ RaftServerConfigKeys.Rpc.setSlownessTimeout(properties,
+ nodeFailureTimeout);
+
/**
* TODO: when ratis snapshots are implemented, set snapshot threshold and
* queue size.
@@ -305,6 +337,11 @@ public final class OzoneManagerRatisServer {
return server.getLifeCycleState();
}
+ /**
+ * Returns the Raft peer ID of this OM Ratis server.
+ */
+ @VisibleForTesting
+ public RaftPeerId getRaftPeerId() {
+ return raftPeerId;
+ }
+
/**
* Get the local directory where ratis logs will be stored.
*/
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java
index 1f285db..ffa6680 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java
@@ -19,14 +19,19 @@
package org.apache.hadoop.ozone.om.ratis;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMNodeDetails;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -64,8 +69,20 @@ public class TestOzoneManagerRatisServer {
conf.setTimeDuration(
OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS);
- omRatisServer = OzoneManagerRatisServer.newOMRatisServer(null, omID,
- InetAddress.getLocalHost(), conf);
+ int ratisPort = conf.getInt(
+ OMConfigKeys.OZONE_OM_RATIS_PORT_KEY,
+ OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT);
+ InetSocketAddress rpcAddress = new InetSocketAddress(
+ InetAddress.getLocalHost(), 0);
+ OMNodeDetails omNodeDetails = new OMNodeDetails.Builder()
+ .setRpcAddress(rpcAddress)
+ .setRatisPort(ratisPort)
+ .setOMNodeId(omID)
+ .setOMServiceId(OzoneConsts.OM_SERVICE_ID_DEFAULT)
+ .build();
+ // Starts a single node Ratis server
+ omRatisServer = OzoneManagerRatisServer.newOMRatisServer(conf, null,
+ omNodeDetails, Collections.emptyList());
omRatisServer.start();
omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient(omID,
omRatisServer.getRaftGroup(), conf);
@@ -94,8 +111,6 @@ public class TestOzoneManagerRatisServer {
/**
* Submit any request to OM Ratis server and check that the dummy response
* message is received.
- * TODO: Once state machine is implemented, submitting a request to Ratis
- * server should result in a valid response.
*/
@Test
public void testSubmitRatisRequest() throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
|