hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xkro...@apache.org
Subject [hadoop] branch trunk updated: HDFS-13783. Add an option to the Balancer to make it run as a long-running service. Contributed by Chen Zhang.
Date Tue, 30 Jul 2019 22:46:02 GMT
This is an automated email from the ASF dual-hosted git repository.

xkrogen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1f26cc8  HDFS-13783. Add an option to the Balancer to make it run as a long-running service. Contributed by Chen Zhang.
1f26cc8 is described below

commit 1f26cc8705b5af12eefedda019e7ab5c261d9bfb
Author: Erik Krogen <xkrogen@apache.org>
AuthorDate: Tue Jul 30 15:42:55 2019 -0700

    HDFS-13783. Add an option to the Balancer to make it run as a long-running service. Contributed by Chen Zhang.
---
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |   8 +-
 .../hadoop/hdfs/server/balancer/Balancer.java      |  80 ++++++++-
 .../hdfs/server/balancer/BalancerParameters.java   |  13 ++
 .../src/main/resources/hdfs-default.xml            |  17 ++
 .../hdfs/server/balancer/TestBalancerService.java  | 189 +++++++++++++++++++++
 5 files changed, 302 insertions(+), 5 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index fb83baf..b839e51 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.ReservedSpaceCalcul
 import org.apache.hadoop.hdfs.web.URLConnectionFactory;
 import org.apache.hadoop.http.HttpConfig;
 
+import java.util.concurrent.TimeUnit;
+
 /** 
  * This class contains constants for configuration keys and default values
  * used in hdfs.
@@ -623,7 +625,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT = 60*1000; // One minute
   public static final String  DFS_BALANCER_MAX_ITERATION_TIME_KEY = "dfs.balancer.max-iteration-time";
   public static final long    DFS_BALANCER_MAX_ITERATION_TIME_DEFAULT = 20 * 60 * 1000L; // 20 mins
-
+  public static final String  DFS_BALANCER_SERVICE_INTERVAL_KEY = "dfs.balancer.service.interval";
+  public static final long    DFS_BALANCER_SERVICE_INTERVAL_DEFAULT = TimeUnit.MINUTES.toMillis(5); //5 mins
+  public static final String  DFS_BALANCER_SERVICE_RETRIES_ON_EXCEPTION = "dfs.balancer.service.retries.on.exception";
+  public static final int     DFS_BALANCER_SERVICE_RETRIES_ON_EXCEPTION_DEFAULT = 5;
 
   public static final String  DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";
   public static final long    DFS_MOVER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;
@@ -1639,5 +1644,4 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   @Deprecated
   public static final long    DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =
       HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT;
-
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index fe187cb..684b2d9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -36,6 +36,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -196,7 +197,14 @@ public class Balancer {
       + "\n\t[-runDuringUpgrade]"
       + "\tWhether to run the balancer during an ongoing HDFS upgrade."
       + "This is usually not desired since it will not affect used space "
-      + "on over-utilized machines.";
+      + "on over-utilized machines."
+      + "\n\t[-asService]\tRun as a long running service.";
+
+  @VisibleForTesting
+  private static volatile boolean serviceRunning = false;
+
+  private static volatile int exceptionsSinceLastBalance = 0;
+  private static volatile int failedTimesSinceLastSuccessfulBalance = 0;
 
   private final Dispatcher dispatcher;
   private final NameNodeConnector nnc;
@@ -256,6 +264,14 @@ public class Balancer {
     return v;
   }
 
+  static int getExceptionsSinceLastBalance() {
+    return exceptionsSinceLastBalance;
+  }
+
+  static int getFailedTimesSinceLastSuccessfulBalance() {
+    return failedTimesSinceLastSuccessfulBalance;
+  }
+
   /**
    * Construct a balancer.
    * Initialize balancer. It sets the value of the threshold, and 
@@ -672,8 +688,9 @@ public class Balancer {
    * for each namenode,
    * execute a {@link Balancer} to work through all datanodes once.  
    */
-  static int run(Collection<URI> namenodes, final BalancerParameters p,
-      Configuration conf) throws IOException, InterruptedException {
+  static private int doBalance(Collection<URI> namenodes,
+      final BalancerParameters p, Configuration conf)
+      throws IOException, InterruptedException {
     final long sleeptime =
         conf.getTimeDuration(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
             DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT,
@@ -731,6 +748,60 @@ public class Balancer {
     return ExitStatus.SUCCESS.getExitCode();
   }
 
+  static int run(Collection<URI> namenodes, final BalancerParameters p,
+      Configuration conf) throws IOException, InterruptedException {
+    if (!p.getRunAsService()) {
+      return doBalance(namenodes, p, conf);
+    }
+    if (!serviceRunning) {
+      serviceRunning = true;
+    } else {
+      LOG.warn("Balancer already running as a long-service!");
+      return ExitStatus.ALREADY_RUNNING.getExitCode();
+    }
+
+    long scheduleInterval = conf.getTimeDuration(
+          DFSConfigKeys.DFS_BALANCER_SERVICE_INTERVAL_KEY,
+          DFSConfigKeys.DFS_BALANCER_SERVICE_INTERVAL_DEFAULT,
+          TimeUnit.MILLISECONDS);
+    int retryOnException =
+          conf.getInt(DFSConfigKeys.DFS_BALANCER_SERVICE_RETRIES_ON_EXCEPTION,
+              DFSConfigKeys.DFS_BALANCER_SERVICE_RETRIES_ON_EXCEPTION_DEFAULT);
+
+    while (serviceRunning) {
+      try {
+        int retCode = doBalance(namenodes, p, conf);
+        if (retCode < 0) {
+          LOG.info("Balance failed, error code: " + retCode);
+          failedTimesSinceLastSuccessfulBalance++;
+        } else {
+          LOG.info("Balance succeed!");
+          failedTimesSinceLastSuccessfulBalance = 0;
+        }
+        exceptionsSinceLastBalance = 0;
+      } catch (Exception e) {
+        if (++exceptionsSinceLastBalance > retryOnException) {
+          // The caller will process and log the exception
+          throw e;
+        }
+        LOG.warn(
+            "Encounter exception while do balance work. Already tried {} times",
+            exceptionsSinceLastBalance, e);
+      }
+
+      // sleep for next round, will retry for next round when it's interrupted
+      LOG.info("Finished one round, will wait for {} for next round",
+          time2Str(scheduleInterval));
+      Thread.sleep(scheduleInterval);
+    }
+    // normal stop
+    return 0;
+  }
+
+  static void stop() {
+    serviceRunning = false;
+  }
+
   private static void checkKeytabAndInit(Configuration conf)
       throws IOException {
     if (conf.getBoolean(DFSConfigKeys.DFS_BALANCER_KEYTAB_ENABLED_KEY,
@@ -867,6 +938,9 @@ public class Balancer {
                   + "upgrade. Most users will not want to run the balancer "
                   + "during an upgrade since it will not affect used space "
                   + "on over-utilized machines.");
+            } else if ("-asService".equalsIgnoreCase(args[i])) {
+              b.setRunAsService(true);
+              LOG.info("Balancer will run as a long running service");
             } else {
               throw new IllegalArgumentException("args = "
                   + Arrays.toString(args));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java
index 5d5e9b1..cdca39f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java
@@ -45,6 +45,8 @@ final class BalancerParameters {
    */
   private final boolean runDuringUpgrade;
 
+  private final boolean runAsService;
+
   static final BalancerParameters DEFAULT = new BalancerParameters();
 
   private BalancerParameters() {
@@ -60,6 +62,7 @@ final class BalancerParameters {
     this.sourceNodes = builder.sourceNodes;
     this.blockpools = builder.blockpools;
     this.runDuringUpgrade = builder.runDuringUpgrade;
+    this.runAsService = builder.runAsService;
   }
 
   BalancingPolicy getBalancingPolicy() {
@@ -94,6 +97,10 @@ final class BalancerParameters {
     return this.runDuringUpgrade;
   }
 
+  boolean getRunAsService() {
+    return this.runAsService;
+  }
+
   @Override
   public String toString() {
     return String.format("%s.%s [%s," + " threshold = %s,"
@@ -117,6 +124,7 @@ final class BalancerParameters {
     private Set<String> sourceNodes = Collections.<String> emptySet();
     private Set<String> blockpools = Collections.<String> emptySet();
     private boolean runDuringUpgrade = false;
+    private boolean runAsService = false;
 
     Builder() {
     }
@@ -161,6 +169,11 @@ final class BalancerParameters {
       return this;
     }
 
+    Builder setRunAsService(boolean asService) {
+      this.runAsService = asService;
+      return this;
+    }
+
     BalancerParameters build() {
       return new BalancerParameters(this);
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 890d034..7271503 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3655,6 +3655,23 @@
 </property>
 
 <property>
+  <name>dfs.balancer.service.interval</name>
+  <value>5m</value>
+  <description>
+    The schedule interval of balancer when running as a long service.
+  </description>
+</property>
+
+<property>
+  <name>dfs.balancer.service.retries.on.exception</name>
+  <value>5</value>
+  <description>
+    When the balancer is executed as a long-running service, it will retry upon encountering an exception. This
+    configuration determines how many times it will retry before considering the exception to be fatal and quitting.
+  </description>
+</property>
+
+<property>
   <name>dfs.block.misreplication.processing.limit</name>
   <value>10000</value>
   <description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerService.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerService.java
new file mode 100644
index 0000000..43cc751
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerService.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.balancer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.NameNodeProxies;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Tool;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Test balancer run as a service.
+ */
+public class TestBalancerService {
+  private MiniDFSCluster cluster;
+  private ClientProtocol client;
+  private long totalUsedSpace;
+
+  // array of racks for original nodes in cluster
+  private static final String[] TEST_RACKS =
+      {TestBalancer.RACK0, TestBalancer.RACK1};
+  // array of capacities for original nodes in cluster
+  private static final long[] TEST_CAPACITIES =
+      {TestBalancer.CAPACITY, TestBalancer.CAPACITY};
+  private static final double USED = 0.3;
+
+  static {
+    TestBalancer.initTestSetup();
+  }
+
+  private void setupCluster(Configuration conf) throws Exception {
+    MiniDFSNNTopology.NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1");
+    nn1Conf.setIpcPort(HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
+    Configuration copiedConf = new Configuration(conf);
+    cluster = new MiniDFSCluster.Builder(copiedConf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .numDataNodes(TEST_CAPACITIES.length).racks(TEST_RACKS)
+        .simulatedCapacities(TEST_CAPACITIES).build();
+    HATestUtil.setFailoverConfigurations(cluster, conf);
+    cluster.waitActive();
+    cluster.transitionToActive(0);
+    client = NameNodeProxies
+        .createProxy(conf, FileSystem.getDefaultUri(conf), ClientProtocol.class)
+        .getProxy();
+
+    int numOfDatanodes = TEST_CAPACITIES.length;
+    long totalCapacity = TestBalancer.sum(TEST_CAPACITIES);
+    // fill up the cluster to be 30% full
+    totalUsedSpace = (long) (totalCapacity * USED);
+    TestBalancer.createFile(cluster, TestBalancer.filePath,
+        totalUsedSpace / numOfDatanodes, (short) numOfDatanodes, 0);
+  }
+
+  private long addOneDataNode(Configuration conf) throws Exception {
+    // start up an empty node with the same capacity and on the same rack
+    cluster.startDataNodes(conf, 1, true, null,
+        new String[] {TestBalancer.RACK2},
+        new long[] {TestBalancer.CAPACITY});
+    long totalCapacity = cluster.getDataNodes().size() * TestBalancer.CAPACITY;
+    TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client,
+        cluster);
+    return totalCapacity;
+  }
+
+  private Thread newBalancerService(Configuration conf, String[] args) {
+    return new Thread(new Runnable() {
+      @Override
+      public void run() {
+        Tool cli = new Balancer.Cli();
+        cli.setConf(conf);
+        try {
+          cli.run(args);
+        } catch (Exception e) {
+          fail("balancer failed for " + e);
+        }
+      }
+    });
+  }
+
+  /**
+   * The normal test case. Start with an imbalanced cluster, then balancer
+   * should balance succeed but not exit, then make the cluster imbalanced and
+   * wait for balancer to balance it again
+   */
+  @Test(timeout = 60000)
+  public void testBalancerServiceBalanceTwice() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setTimeDuration(DFSConfigKeys.DFS_BALANCER_SERVICE_INTERVAL_KEY, 5,
+        TimeUnit.SECONDS);
+    TestBalancer.initConf(conf);
+    try {
+      setupCluster(conf);
+      long totalCapacity = addOneDataNode(conf); // make cluster imbalanced
+
+      Thread balancerThread =
+          newBalancerService(conf, new String[] {"-asService"});
+      balancerThread.start();
+
+      TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
+          cluster, BalancerParameters.DEFAULT);
+      cluster.triggerHeartbeats();
+      cluster.triggerBlockReports();
+
+      // add another empty datanode, wait for cluster become balance again
+      totalCapacity = addOneDataNode(conf);
+      TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
+          cluster, BalancerParameters.DEFAULT);
+
+      Balancer.stop();
+      balancerThread.join();
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  @Test(timeout = 60000)
+  public void testBalancerServiceOnError() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    // retry for every 5 seconds
+    conf.setTimeDuration(DFSConfigKeys.DFS_BALANCER_SERVICE_INTERVAL_KEY, 5,
+        TimeUnit.SECONDS);
+    conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
+    TestBalancer.initConf(conf);
+    try {
+      setupCluster(conf);
+
+      Thread balancerThread =
+          newBalancerService(conf, new String[] {"-asService"});
+      balancerThread.start();
+
+      // cluster is out of service for 10+ secs, the balancer service will retry
+      // for 2+ times
+      cluster.shutdownNameNode(0);
+      GenericTestUtils.waitFor(
+          () -> Balancer.getExceptionsSinceLastBalance() > 0, 1000, 10000);
+      assertTrue(Balancer.getExceptionsSinceLastBalance() > 0);
+      cluster.restartNameNode(0);
+      cluster.transitionToActive(0);
+      cluster.waitActive();
+
+      long totalCapacity = addOneDataNode(conf);
+      TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
+          cluster, BalancerParameters.DEFAULT);
+
+      Balancer.stop();
+      balancerThread.join();
+
+      // reset to 0 once the balancer finished without exception
+      assertEquals(Balancer.getExceptionsSinceLastBalance(), 0);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message