accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dlmar...@apache.org
Subject accumulo git commit: ACCUMULO-1755: Modified TSBW so that all client threads will not block on binMutations
Date Fri, 26 Feb 2016 16:31:34 GMT
Repository: accumulo
Updated Branches:
  refs/heads/ACCUMULO-1755 [created] 9bd48cdbd


ACCUMULO-1755: Modified TSBW so that all client threads will not block on binMutations

Before this change, all client threads that were adding mutations to a BatchWriter would block
whenever the mutations needed to be written to the tablet servers. This change gives each client
thread its own MutationWriter object using a ThreadLocal. With this change, only one client
thread should block while adding mutations to the sendThreadPool object, which allows the other
client threads to push mutations onto a new MutationSet.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9bd48cdb
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9bd48cdb
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9bd48cdb

Branch: refs/heads/ACCUMULO-1755
Commit: 9bd48cdbd25a7baf19bbda007a8e36f8ec81cf07
Parents: c4d6eee
Author: Dave Marion <dlmarion@apache.org>
Authored: Fri Feb 26 11:26:53 2016 -0500
Committer: Dave Marion <dlmarion@apache.org>
Committed: Fri Feb 26 11:26:53 2016 -0500

----------------------------------------------------------------------
 .../client/impl/TabletServerBatchWriter.java    | 45 +++++++++++---------
 1 file changed, 26 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/9bd48cdb/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
index bc90d00..3e35e13 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
@@ -33,6 +33,7 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.core.client.AccumuloException;
@@ -115,7 +116,8 @@ public class TabletServerBatchWriter {
   private MutationSet mutations;
 
   // background writer
-  private final MutationWriter writer;
+  private final ThreadLocal<MutationWriter> writer = new ThreadLocal<>();
+  private final ExecutorService sendThreadPool;
 
   // latency timers
   private final Timer jtimer = new Timer("BatchWriterLatencyTimer", true);
@@ -136,8 +138,8 @@ public class TabletServerBatchWriter {
   private double initialSystemLoad;
 
   private int tabletServersBatchSum = 0;
-  private int tabletBatchSum = 0;
-  private int numBatches = 0;
+  private final AtomicInteger tabletBatchSum = new AtomicInteger(0);
+  private final AtomicInteger numBatches = new AtomicInteger(0);
   private int maxTabletBatch = Integer.MIN_VALUE;
   private int minTabletBatch = Integer.MAX_VALUE;
   private int minTabletServersBatch = Integer.MAX_VALUE;
@@ -198,8 +200,7 @@ public class TabletServerBatchWriter {
     this.mutations = new MutationSet();
     this.lastProcessingStartTime = System.currentTimeMillis();
     this.durability = config.getDurability();
-
-    this.writer = new MutationWriter(config.getMaxWriteThreads());
+    sendThreadPool = new SimpleThreadPool(config.getMaxWriteThreads(), MutationWriter.class.getName());
 
     if (this.maxLatency != Long.MAX_VALUE) {
       jtimer.schedule(new TimerTask() {
@@ -218,12 +219,19 @@ public class TabletServerBatchWriter {
     }
   }
 
-  private synchronized void startProcessing() {
-    if (mutations.getMemoryUsed() == 0)
-      return;
-    lastProcessingStartTime = System.currentTimeMillis();
-    writer.addMutations(mutations);
-    mutations = new MutationSet();
+  private void startProcessing() {
+    MutationSet prev = null;
+    synchronized (this) {
+      if (mutations.getMemoryUsed() == 0)
+        return;
+      if (null == writer.get()) {
+        writer.set(new MutationWriter());
+      }
+      lastProcessingStartTime = System.currentTimeMillis();
+      prev = mutations;
+      mutations = new MutationSet();
+    }
+    writer.get().addMutations(prev);
   }
 
   private synchronized void decrementMemUsed(long amount) {
@@ -360,7 +368,7 @@ public class TabletServerBatchWriter {
       checkForFailures();
     } finally {
       // make a best effort to release these resources
-      writer.sendThreadPool.shutdownNow();
+      sendThreadPool.shutdownNow();
       jtimer.cancel();
       span.stop();
     }
@@ -403,9 +411,10 @@ public class TabletServerBatchWriter {
       log.trace(String.format("Total bin time       : %,10.2f secs %6.2f%s", totalBinTime.get()
/ 1000.0,
           100.0 * totalBinTime.get() / (finishTime - startTime), "%"));
       log.trace(String.format("Average bin rate     : %,10.2f mutations/sec", totalBinned.get()
/ (totalBinTime.get() / 1000.0)));
-      log.trace(String.format("tservers per batch   : %,8.2f avg  %,6d min %,6d max", tabletServersBatchSum
/ (double) numBatches, minTabletServersBatch,
+      log.trace(String.format("tservers per batch   : %,8.2f avg  %,6d min %,6d max", tabletServersBatchSum
/ (double) numBatches.get(), minTabletServersBatch,
           maxTabletServersBatch));
-      log.trace(String.format("tablets per batch    : %,8.2f avg  %,6d min %,6d max", tabletBatchSum
/ (double) numBatches, minTabletBatch, maxTabletBatch));
+      log.trace(String.format("tablets per batch    : %,8.2f avg  %,6d min %,6d max", tabletBatchSum.get()
/ (double) numBatches.get(), minTabletBatch,
+          maxTabletBatch));
       log.trace("");
       log.trace("SYSTEM STATISTICS");
       log.trace(String.format("JVM GC Time          : %,10.2f secs", ((finalGCTimes - initialGCTimes)
/ 1000.0)));
@@ -440,12 +449,12 @@ public class TabletServerBatchWriter {
       numTablets += tsm.getMutations().size();
     }
 
-    tabletBatchSum += numTablets;
+    tabletBatchSum.getAndAdd(numTablets);
 
     minTabletBatch = Math.min(minTabletBatch, numTablets);
     maxTabletBatch = Math.max(maxTabletBatch, numTablets);
 
-    numBatches++;
+    numBatches.getAndIncrement();
   }
 
   private interface WaitCondition {
@@ -628,15 +637,13 @@ public class TabletServerBatchWriter {
   private class MutationWriter {
 
     private static final int MUTATION_BATCH_SIZE = 1 << 17;
-    private final ExecutorService sendThreadPool;
     private final Map<String,TabletServerMutations<Mutation>> serversMutations;
     private final Set<String> queued;
     private final Map<String,TabletLocator> locators;
 
-    public MutationWriter(int numSendThreads) {
+    public MutationWriter() {
       serversMutations = new HashMap<String,TabletServerMutations<Mutation>>();
       queued = new HashSet<String>();
-      sendThreadPool = new SimpleThreadPool(numSendThreads, this.getClass().getName());
       locators = new HashMap<String,TabletLocator>();
     }
 


Mime
View raw message