accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [accumulo] branch 1.9 updated: Fix multiple concurrency bugs in Master.gatherTableInformation() (#546)
Date Tue, 10 Jul 2018 22:05:02 GMT
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1.9
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1.9 by this push:
     new 42a3534  Fix multiple concurrency bugs in Master.gatherTableInformation() (#546)
42a3534 is described below

commit 42a3534ebcd1cd27df863f10487708d4648ab03a
Author: Keith Turner <keith@deenlo.com>
AuthorDate: Tue Jul 10 18:04:59 2018 -0400

    Fix multiple concurrency bugs in Master.gatherTableInformation() (#546)
    
    Master.gatherTableInformation() had the following problems :
    
     * If Property.MASTER_STATUS_THREAD_POOL_SIZE set > 1, then multiple threads
       would put into a TreeMap
     * Create a thread pool and never called shutdown now
     * Returns a reference to a treemap that threads in thread pool may still
       be adding to.
    
    This patch also attempts to address the issues brought up in #402 by switching
    to a cached thread pool.  This will allow the thread pool to expand so that
    unresponsive tservers do not prevent gathering status from responsive tservers.
---
 .../org/apache/accumulo/core/conf/Property.java    |  5 +--
 .../java/org/apache/accumulo/master/Master.java    | 36 ++++++++++++++++------
 2 files changed, 29 insertions(+), 12 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 1eed867..6a2024e 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -340,8 +340,9 @@ public enum Property {
   MASTER_REPLICATION_COORDINATOR_THREADCHECK("master.replication.coordinator.threadcheck.time",
       "5s", PropertyType.TIMEDURATION,
       "The time between adjustments of the coordinator thread pool"),
-  MASTER_STATUS_THREAD_POOL_SIZE("master.status.threadpool.size", "1", PropertyType.COUNT,
-      "The number of threads to use when fetching the tablet server status for balancing."),
+  MASTER_STATUS_THREAD_POOL_SIZE("master.status.threadpool.size", "0", PropertyType.COUNT,
+      "The number of threads to use when fetching the tablet server status for balancing.
 Zero "
+          + "indicates an unlimited number of threads will be used."),
   MASTER_METADATA_SUSPENDABLE("master.metadata.suspendable", "false", PropertyType.BOOLEAN,
       "Allow tablets for the " + MetadataTable.NAME
           + " table to be suspended via table.suspend.duration."),
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 9414b98..2f124e3 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -32,6 +32,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -163,6 +164,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSortedMap;
 import com.google.common.collect.Iterables;
 
 /**
@@ -1065,7 +1067,7 @@ public class Master extends AccumuloServerContext
     private long updateStatus()
         throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
       Set<TServerInstance> currentServers = tserverSet.getCurrentServers();
-      tserverStatus = Collections.synchronizedSortedMap(gatherTableInformation(currentServers));
+      tserverStatus = gatherTableInformation(currentServers);
       checkForHeldServer(tserverStatus);
 
       if (!badServers.isEmpty()) {
@@ -1146,12 +1148,20 @@ public class Master extends AccumuloServerContext
 
   private SortedMap<TServerInstance,TabletServerStatus> gatherTableInformation(
       Set<TServerInstance> currentServers) {
+    final long rpcTimeout = getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT);
+    int threads = getConfiguration().getCount(Property.MASTER_STATUS_THREAD_POOL_SIZE);
+    ExecutorService tp = threads == 0 ? Executors.newCachedThreadPool()
+        : Executors.newFixedThreadPool(threads);
     long start = System.currentTimeMillis();
-    int threads = Math.max(getConfiguration().getCount(Property.MASTER_STATUS_THREAD_POOL_SIZE),
1);
-    ExecutorService tp = Executors.newFixedThreadPool(threads);
-    final SortedMap<TServerInstance,TabletServerStatus> result = new TreeMap<>();
+    final SortedMap<TServerInstance,TabletServerStatus> result = new ConcurrentSkipListMap<>();
     for (TServerInstance serverInstance : currentServers) {
       final TServerInstance server = serverInstance;
+      if (threads == 0) {
+        // Since an unbounded thread pool is being used, rate limit how fast task are added
to the
+        // executor. This prevents the threads from growing large unless there are lots of
+        // unresponsive tservers.
+        sleepUninterruptibly(Math.max(1, rpcTimeout / 120_000), TimeUnit.MILLISECONDS);
+      }
       tp.submit(new Runnable() {
         @Override
         public void run() {
@@ -1191,18 +1201,24 @@ public class Master extends AccumuloServerContext
     }
     tp.shutdown();
     try {
-      tp.awaitTermination(getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT)
* 2,
-          TimeUnit.MILLISECONDS);
+      tp.awaitTermination(Math.max(10000, rpcTimeout / 3), TimeUnit.MILLISECONDS);
     } catch (InterruptedException e) {
       log.debug("Interrupted while fetching status");
     }
+
+    tp.shutdownNow();
+
+    // Threads may still modify map after shutdownNow is called, so create an immutable snapshot.
+    SortedMap<TServerInstance,TabletServerStatus> info = ImmutableSortedMap.copyOf(result);
+
     synchronized (badServers) {
       badServers.keySet().retainAll(currentServers);
-      badServers.keySet().removeAll(result.keySet());
+      badServers.keySet().removeAll(info.keySet());
     }
-    log.debug(String.format("Finished gathering information from %d servers in %.2f seconds",
-        result.size(), (System.currentTimeMillis() - start) / 1000.));
-    return result;
+    log.debug(String.format("Finished gathering information from %d of %d servers in %.2f
seconds",
+        info.size(), currentServers.size(), (System.currentTimeMillis() - start) / 1000.));
+
+    return info;
   }
 
   public void run() throws IOException, InterruptedException, KeeperException {


Mime
View raw message