hbase-commits mailing list archives

From binli...@apache.org
Subject hbase git commit: HBASE-19290 Reduce zk request when doing split log
Date Wed, 29 Nov 2017 10:44:18 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-2 0c4c39553 -> 64ddce303


HBASE-19290 Reduce zk request when doing split log


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/64ddce30
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/64ddce30
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/64ddce30

Branch: refs/heads/branch-2
Commit: 64ddce303ee352a3ab1eb9cedf3fad097be7569c
Parents: 0c4c395
Author: binlijin <binlijin@gmail.com>
Authored: Wed Nov 29 18:43:41 2017 +0800
Committer: binlijin <binlijin@gmail.com>
Committed: Wed Nov 29 18:43:41 2017 +0800

----------------------------------------------------------------------
 .../ZkSplitLogWorkerCoordination.java           | 78 +++++++++++++-------
 1 file changed, 51 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/64ddce30/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
index e64907c..0540a8f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
@@ -19,6 +19,7 @@
 
 package org.apache.hadoop.hbase.coordination;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.LongAdder;
@@ -199,27 +200,28 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements
    * try to grab a 'lock' on the task zk node to own and execute the task.
    * <p>
    * @param path zk node for the task
+   * @return true if the task is grabbed successfully, otherwise false
    */
-  private void grabTask(String path) {
+  private boolean grabTask(String path) {
     Stat stat = new Stat();
     byte[] data;
     synchronized (grabTaskLock) {
       currentTask = path;
       workerInGrabTask = true;
       if (Thread.interrupted()) {
-        return;
+        return false;
       }
     }
     try {
       try {
         if ((data = ZKUtil.getDataNoWatch(watcher, path, stat)) == null) {
           SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.increment();
-          return;
+          return false;
         }
       } catch (KeeperException e) {
         LOG.warn("Failed to get data for znode " + path, e);
         SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment();
-        return;
+        return false;
       }
       SplitLogTask slt;
       try {
@@ -227,18 +229,18 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements
       } catch (DeserializationException e) {
         LOG.warn("Failed parse data for znode " + path, e);
         SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment();
-        return;
+        return false;
       }
       if (!slt.isUnassigned()) {
         SplitLogCounters.tot_wkr_failed_to_grab_task_owned.increment();
-        return;
+        return false;
       }
 
       currentVersion =
           attemptToOwnTask(true, watcher, server.getServerName(), path, stat.getVersion());
       if (currentVersion < 0) {
         SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.increment();
-        return;
+        return false;
       }
 
       if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
@@ -249,7 +251,7 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements
 
         endTask(new SplitLogTask.Done(server.getServerName()),
           SplitLogCounters.tot_wkr_task_acquired_rescan, splitTaskDetails);
-        return;
+        return false;
       }
 
       LOG.info("worker " + server.getServerName() + " acquired task " + path);
@@ -266,6 +268,7 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements
         LOG.warn("Interrupted while yielding for other region servers", e);
         Thread.currentThread().interrupt();
       }
+      return true;
     } finally {
       synchronized (grabTaskLock) {
         workerInGrabTask = false;
@@ -316,12 +319,13 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements
   }
 
   /**
-   * This function calculates how many splitters it could create based on expected average tasks per
-   * RS and the hard limit upper bound(maxConcurrentTasks) set by configuration. <br>
+   * This function calculates how many splitters this RS should create based on expected average
+   * tasks per RS and the hard limit upper bound(maxConcurrentTasks) set by configuration. <br>
    * At any given time, a RS allows spawn MIN(Expected Tasks/RS, Hard Upper Bound)
-   * @param numTasks current total number of available tasks
+   * @param numTasks total number of split tasks available
+   * @return number of tasks this RS can grab
    */
-  private int calculateAvailableSplitters(int numTasks) {
+  private int getNumExpectedTasksPerRS(int numTasks) {
     // at least one RS(itself) available
     int availableRSs = 1;
     try {
@@ -332,12 +336,17 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements
       // do nothing
       LOG.debug("getAvailableRegionServers got ZooKeeper exception", e);
     }
-
     int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
-    expectedTasksPerRS = Math.max(1, expectedTasksPerRS); // at least be one
-    // calculate how many more splitters we could spawn
-    return Math.min(expectedTasksPerRS, maxConcurrentTasks)
-        - this.tasksInProgress.get();
+    return Math.max(1, expectedTasksPerRS); // at least be one
+  }
+
+  /**
+   * @param expectedTasksPerRS Average number of tasks to be handled by each RS
+   * @return true if more splitters are available, otherwise false.
+   */
+  private boolean areSplittersAvailable(int expectedTasksPerRS) {
+    return (Math.min(expectedTasksPerRS, maxConcurrentTasks)
+        - this.tasksInProgress.get()) > 0;
   }
 
   /**
@@ -406,8 +415,11 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements
             + " ... worker thread exiting.");
         return;
       }
+      // shuffle the paths to prevent different split log workers from starting from the same
+      // log file after the meta log (if any)
+      Collections.shuffle(paths);
       // pick meta wal firstly
-      int offset = (int) (Math.random() * paths.size());
+      int offset = 0;
       for (int i = 0; i < paths.size(); i++) {
         if (AbstractFSWALProvider.isMetaFile(paths.get(i))) {
           offset = i;
@@ -415,21 +427,33 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements
         }
       }
       int numTasks = paths.size();
+      int expectedTasksPerRS = getNumExpectedTasksPerRS(numTasks);
+      boolean taskGrabbed = false;
       for (int i = 0; i < numTasks; i++) {
-        int idx = (i + offset) % paths.size();
-        // don't call ZKSplitLog.getNodeName() because that will lead to
-        // double encoding of the path name
-        if (this.calculateAvailableSplitters(numTasks) > 0) {
-          grabTask(ZNodePaths.joinZNode(watcher.znodePaths.splitLogZNode, paths.get(idx)));
-        } else {
-          LOG.debug("Current region server " + server.getServerName() + " has "
-              + this.tasksInProgress.get() + " tasks in progress and can't take more.");
-          break;
+        while (!shouldStop) {
+          if (this.areSplittersAvailable(expectedTasksPerRS)) {
+            LOG.debug("Current region server " + server.getServerName()
+                + " is ready to take more tasks, will get task list and try grab tasks again.");
+            int idx = (i + offset) % paths.size();
+            // don't call ZKSplitLog.getNodeName() because that will lead to
+            // double encoding of the path name
+            taskGrabbed |= grabTask(ZNodePaths.joinZNode(
+                watcher.znodePaths.splitLogZNode, paths.get(idx)));
+            break;
+          } else {
+            LOG.debug("Current region server " + server.getServerName() + " has "
+                + this.tasksInProgress.get() + " tasks in progress and can't take more.");
+            Thread.sleep(100);
+          }
         }
         if (shouldStop) {
           return;
         }
       }
+      if (!taskGrabbed && !shouldStop) {
+        // do not grab any tasks, sleep a little bit to reduce zk request.
+        Thread.sleep(1000);
+      }
       SplitLogCounters.tot_wkr_task_grabing.increment();
       synchronized (taskReadySeq) {
         while (seq_start == taskReadySeq.get()) {

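The heart of this change is splitting the old calculateAvailableSplitters() into getNumExpectedTasksPerRS() (how many tasks this RS should aim for) and areSplittersAvailable() (whether it is still below that cap), so the worker only asks ZooKeeper for work while it can actually take some. The following is a minimal standalone sketch of that throttling math, not code from the commit; the class name, constructor, and main() driver are hypothetical and for illustration only.

import java.util.concurrent.atomic.AtomicInteger;

public class SplitTaskThrottleSketch {
  private final int maxConcurrentTasks;                        // hard upper bound from configuration
  private final AtomicInteger tasksInProgress = new AtomicInteger(0);

  public SplitTaskThrottleSketch(int maxConcurrentTasks) {
    this.maxConcurrentTasks = maxConcurrentTasks;
  }

  // Expected tasks per RS: ceiling of numTasks / availableRSs, never less than 1.
  int getNumExpectedTasksPerRS(int numTasks, int availableRSs) {
    int expected = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
    return Math.max(1, expected);
  }

  // True while this RS is still below MIN(expected tasks per RS, hard limit).
  boolean areSplittersAvailable(int expectedTasksPerRS) {
    return Math.min(expectedTasksPerRS, maxConcurrentTasks) - tasksInProgress.get() > 0;
  }

  public static void main(String[] args) {
    SplitTaskThrottleSketch sketch = new SplitTaskThrottleSketch(3);
    // 10 tasks spread over 4 region servers -> ceil(10 / 4) = 3 expected per RS.
    int expected = sketch.getNumExpectedTasksPerRS(10, 4);
    System.out.println("expected tasks per RS = " + expected);                   // 3
    System.out.println("can grab? " + sketch.areSplittersAvailable(expected));   // true, 0 in progress
    sketch.tasksInProgress.set(3);
    System.out.println("can grab? " + sketch.areSplittersAvailable(expected));   // false, at the cap
  }
}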

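The other half of the patch changes how a worker walks the task list: it shuffles the znode paths so workers do not all start from the same WAL, still puts the meta WAL first, and sleeps briefly when it cannot take more tasks or grabbed nothing, instead of issuing another round of ZooKeeper reads. A minimal sketch of that ordering logic follows; isMetaFile() here is a toy stand-in for AbstractFSWALProvider.isMetaFile(), and the class name is hypothetical.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SplitTaskOrderingSketch {

  // Toy predicate; the real check is AbstractFSWALProvider.isMetaFile().
  static boolean isMetaFile(String path) {
    return path.contains(".meta");
  }

  // Returns the order in which a worker would attempt to grab the tasks.
  static List<String> orderTasks(List<String> paths) {
    List<String> shuffled = new ArrayList<>(paths);
    Collections.shuffle(shuffled);            // spread workers across different WAL files
    int offset = 0;                           // but always start from the meta WAL if present
    for (int i = 0; i < shuffled.size(); i++) {
      if (isMetaFile(shuffled.get(i))) {
        offset = i;
        break;
      }
    }
    List<String> ordered = new ArrayList<>(shuffled.size());
    for (int i = 0; i < shuffled.size(); i++) {
      ordered.add(shuffled.get((i + offset) % shuffled.size()));
    }
    return ordered;
  }

  public static void main(String[] args) {
    List<String> paths = List.of("wal.1", "wal.2", "wal.3.meta", "wal.4");
    System.out.println(orderTasks(paths));    // the meta WAL is always first
  }
}

The shuffle together with the back-off sleeps is what reduces the ZooKeeper traffic: without them every worker scans the task list in the same order and keeps re-reading znodes it has no capacity to own.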