accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [1/5] accumulo git commit: ACCUMULO-1177 use a thread pool for tablet assignment, but limit recovery to a single thread
Date Wed, 26 Nov 2014 18:28:18 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master 09a761b82 -> 9be9576d9


ACCUMULO-1177 use a thread pool for tablet assignment, but limit recovery to a single thread


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/11e65076
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/11e65076
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/11e65076

Branch: refs/heads/master
Commit: 11e650767bfbac61441160ac0ed5f9ce557c32fb
Parents: bedbca5
Author: Eric C. Newton <eric.newton@gmail.com>
Authored: Mon Nov 10 16:01:42 2014 -0500
Committer: Eric C. Newton <eric.newton@gmail.com>
Committed: Mon Nov 10 16:01:57 2014 -0500

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java |  1 +
 .../apache/accumulo/tserver/TabletServer.java   | 53 +++++++------
 .../tserver/TabletServerResourceManager.java    |  5 +-
 .../apache/accumulo/tserver/tablet/Tablet.java  |  1 -
 .../accumulo/test/AssignmentThreadsIT.java      | 81 ++++++++++++++++++++
 5 files changed, 115 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/11e65076/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index f59b654..e784bd2 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -295,6 +295,7 @@ public enum Property {
   TSERV_REPLICATION_DEFAULT_HANDLER("tserver.replication.default.replayer", "org.apache.accumulo.tserver.replication.BatchWriterReplicationReplayer",
       PropertyType.CLASSNAME, "Default AccumuloReplicationReplayer implementation"),
  TSERV_REPLICATION_BW_REPLAYER_MEMORY("tserver.replication.batchwriter.replayer.memory", "50M", PropertyType.MEMORY, "Memory to provide to batchwriter to replay mutations for replication"),
+  TSERV_ASSIGNMENT_MAXCONCURRENT("tserver.assignement.concurrent.max", "2", PropertyType.COUNT, "The number of threads available to load tablets."),
 
   // properties that are specific to logger server behavior
   LOGGER_PREFIX("logger.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the write-ahead logger servers"),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/11e65076/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 1e81947..e96ff67 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -45,6 +45,7 @@ import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -238,7 +239,6 @@ import org.apache.thrift.TServiceClient;
 import org.apache.thrift.server.TServer;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
-
 import com.google.common.net.HostAndPort;
 
 public class TabletServer implements Runnable {
@@ -357,6 +357,7 @@ public class TabletServer implements Runnable {
   private final RowLocks rowLocks = new RowLocks();
 
   private final AtomicLong totalQueuedMutationSize = new AtomicLong(0);
+  private final Semaphore recoverySemaphore = new Semaphore(1, true);
 
   private class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface {
 
@@ -2898,31 +2899,37 @@ public class TabletServer implements Runnable {
   }
 
   public void recover(VolumeManager fs, KeyExtent extent, TableConfiguration tconf, List<LogEntry> logEntries, Set<String> tabletFiles, MutationReceiver mutationReceiver)
-      throws IOException {
-    List<Path> recoveryLogs = new ArrayList<Path>();
-    List<LogEntry> sorted = new ArrayList<LogEntry>(logEntries);
-    Collections.sort(sorted, new Comparator<LogEntry>() {
-      @Override
-      public int compare(LogEntry e1, LogEntry e2) {
-        return (int) (e1.timestamp - e2.timestamp);
-      }
-    });
-    for (LogEntry entry : sorted) {
-      Path recovery = null;
-      for (String log : entry.logSet) {
-        Path finished = RecoveryPath.getRecoveryPath(fs, fs.getFullPath(FileType.WAL, log));
-        finished = SortedLogState.getFinishedMarkerPath(finished);
-        TabletServer.log.info("Looking for " + finished);
-        if (fs.exists(finished)) {
-          recovery = finished.getParent();
-          break;
+      throws IOException, InterruptedException {
+    recoverySemaphore.acquire();
+    try {
+      log.info("Starting Write-Ahead Log recovery for " + extent);
+      List<Path> recoveryLogs = new ArrayList<Path>();
+      List<LogEntry> sorted = new ArrayList<LogEntry>(logEntries);
+      Collections.sort(sorted, new Comparator<LogEntry>() {
+        @Override
+        public int compare(LogEntry e1, LogEntry e2) {
+          return (int) (e1.timestamp - e2.timestamp);
         }
+      });
+      for (LogEntry entry : sorted) {
+        Path recovery = null;
+        for (String log : entry.logSet) {
+          Path finished = RecoveryPath.getRecoveryPath(fs, fs.getFullPath(FileType.WAL, log));
+          finished = SortedLogState.getFinishedMarkerPath(finished);
+          TabletServer.log.info("Looking for " + finished);
+          if (fs.exists(finished)) {
+            recovery = finished.getParent();
+            break;
+          }
+        }
+        if (recovery == null)
+          throw new IOException("Unable to find recovery files for extent " + extent + " logEntry: " + entry);
+        recoveryLogs.add(recovery);
       }
-      if (recovery == null)
-        throw new IOException("Unable to find recovery files for extent " + extent + " logEntry: " + entry);
-      recoveryLogs.add(recovery);
+      logger.recover(fs, extent, tconf, recoveryLogs, tabletFiles, mutationReceiver);
+    } finally {
+      recoverySemaphore.release();
     }
-    logger.recover(fs, extent, tconf, recoveryLogs, tabletFiles, mutationReceiver);
   }
 
   public int createLogId(KeyExtent tablet) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/11e65076/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index ba86522..6ac6354 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -196,9 +196,10 @@ public class TabletServerResourceManager {
     // not sure if concurrent assignments can run safely... even if they could there is probably no benefit at startup because
     // individual tablet servers are already running assignments concurrently... having each individual tablet server run
     // concurrent assignments would put more load on the metadata table at startup
-    assignmentPool = createEs(1, "tablet assignment");
+    assignmentPool = createEs(Property.TSERV_ASSIGNMENT_MAXCONCURRENT, "tablet assignment");
 
-    assignMetaDataPool = createEs(0, 1, 60, "metadata tablet assignment");
+    int max = acuConf.getCount(Property.TSERV_ASSIGNMENT_MAXCONCURRENT);
+    assignMetaDataPool = createEs(0, max, 60, "metadata tablet assignment");
 
     activeAssignments = new ConcurrentHashMap<KeyExtent,RunnableStartedAt>();
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/11e65076/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 9490903..6e4b459 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -585,7 +585,6 @@ public class Tablet implements TabletCommitter {
     configObserver.propertiesChanged();
 
     if (!logEntries.isEmpty()) {
-      log.info("Starting Write-Ahead Log recovery for " + this.extent);
       // count[0] = entries used on tablet
       // count[1] = track max time from walog entries wihtout timestamps
       final long[] count = new long[2];

http://git-wip-us.apache.org/repos/asf/accumulo/blob/11e65076/test/src/test/java/org/apache/accumulo/test/AssignmentThreadsIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/AssignmentThreadsIT.java b/test/src/test/java/org/apache/accumulo/test/AssignmentThreadsIT.java
new file mode 100644
index 0000000..01d53a3
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/AssignmentThreadsIT.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.accumulo.test;
+
+import java.util.Random;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class AssignmentThreadsIT extends ConfigurableMacIT {
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setNumTservers(1);
+    cfg.setProperty(Property.TSERV_ASSIGNMENT_MAXCONCURRENT, "1");
+  }
+  
+  @Test(timeout = 5 * 60 * 1000)
+  public void test() throws Exception {
+    byte[] HEXCHARS = { 0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x61, 0x62, 0x63, 0x64, 0x65, 0x66 };
+    // make a table with a lot of splits
+    String tableName = getUniqueNames(1)[0];
+    Connector c = getConnector();
+    c.tableOperations().create(tableName);
+    SortedSet<Text> splits = new TreeSet<Text>();
+    Random random = new Random();
+    byte[] split = new byte[8];
+    byte[] hex = new byte[split.length * 2];
+    for (int i = 0; i < 4000; i++) {
+      random.nextBytes(split);
+      int count = 0;
+      for (byte x : split) {
+        hex[count++] = HEXCHARS[(x >> 4)&0xf];
+        hex[count++] = HEXCHARS[x&0xf];
+      }
+      splits.add(new Text(hex));
+    }
+    c.tableOperations().addSplits(tableName, splits);
+    c.tableOperations().offline(tableName, true);
+    // time how long it takes to load
+    long now = System.currentTimeMillis();
+    c.tableOperations().online(tableName, true);
+    long diff = System.currentTimeMillis() - now;
+    log.debug("Loaded " + splits.size() + " tablets in " + diff + " ms");
+    c.instanceOperations().setProperty(Property.TSERV_ASSIGNMENT_MAXCONCURRENT.getKey(), "20");
+    now = System.currentTimeMillis();
+    c.tableOperations().offline(tableName, true);
+    // wait >10 seconds for thread pool to update
+    UtilWaitThread.sleep(Math.max(0, now + 11 * 1000 - System.currentTimeMillis()));
+    now = System.currentTimeMillis();
+    c.tableOperations().online(tableName, true);
+    long diff2 = System.currentTimeMillis() - now;
+    log.debug("Loaded " + splits.size() + " tablets in " + diff2 + " ms");
+    assertTrue(diff2 < diff);
+  }
+  
+}


Mime
View raw message