accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dlmar...@apache.org
Subject accumulo git commit: ACCUMULO-1755: Reverted changes, simply add a queue for sending mutations to the MutationWriter
Date Fri, 26 Feb 2016 18:31:59 GMT
Repository: accumulo
Updated Branches:
  refs/heads/ACCUMULO-1755 9bd48cdbd -> be3843c26


ACCUMULO-1755: Reverted changes, simply add a queue for sending mutations to the MutationWriter


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/be3843c2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/be3843c2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/be3843c2

Branch: refs/heads/ACCUMULO-1755
Commit: be3843c267b6b3f91fdd270aeb142a6d2cb9d9b3
Parents: 9bd48cd
Author: Dave Marion <dlmarion@apache.org>
Authored: Fri Feb 26 13:30:38 2016 -0500
Committer: Dave Marion <dlmarion@apache.org>
Committed: Fri Feb 26 13:30:38 2016 -0500

----------------------------------------------------------------------
 .../client/impl/TabletServerBatchWriter.java    | 58 +++++++++++---------
 1 file changed, 31 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/be3843c2/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
index 3e35e13..b494d8f 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
@@ -33,7 +33,6 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.core.client.AccumuloException;
@@ -116,8 +115,7 @@ public class TabletServerBatchWriter {
   private MutationSet mutations;
 
   // background writer
-  private final ThreadLocal<MutationWriter> writer = new ThreadLocal<>();
-  private final ExecutorService sendThreadPool;
+  private final MutationWriter writer;
 
   // latency timers
   private final Timer jtimer = new Timer("BatchWriterLatencyTimer", true);
@@ -138,8 +136,8 @@ public class TabletServerBatchWriter {
   private double initialSystemLoad;
 
   private int tabletServersBatchSum = 0;
-  private final AtomicInteger tabletBatchSum = new AtomicInteger(0);
-  private final AtomicInteger numBatches = new AtomicInteger(0);
+  private int tabletBatchSum = 0;
+  private int numBatches = 0;
   private int maxTabletBatch = Integer.MIN_VALUE;
   private int minTabletBatch = Integer.MAX_VALUE;
   private int minTabletServersBatch = Integer.MAX_VALUE;
@@ -200,7 +198,8 @@ public class TabletServerBatchWriter {
     this.mutations = new MutationSet();
     this.lastProcessingStartTime = System.currentTimeMillis();
     this.durability = config.getDurability();
-    sendThreadPool = new SimpleThreadPool(config.getMaxWriteThreads(), MutationWriter.class.getName());
+
+    this.writer = new MutationWriter(config.getMaxWriteThreads());
 
     if (this.maxLatency != Long.MAX_VALUE) {
       jtimer.schedule(new TimerTask() {
@@ -219,19 +218,12 @@ public class TabletServerBatchWriter {
     }
   }
 
-  private void startProcessing() {
-    MutationSet prev = null;
-    synchronized (this) {
-      if (mutations.getMemoryUsed() == 0)
-        return;
-      if (null == writer.get()) {
-        writer.set(new MutationWriter());
-      }
-      lastProcessingStartTime = System.currentTimeMillis();
-      prev = mutations;
-      mutations = new MutationSet();
-    }
-    writer.get().addMutations(prev);
+  private synchronized void startProcessing() {
+    if (mutations.getMemoryUsed() == 0)
+      return;
+    lastProcessingStartTime = System.currentTimeMillis();
+    writer.queueMutations(mutations);
+    mutations = new MutationSet();
   }
 
   private synchronized void decrementMemUsed(long amount) {
@@ -368,7 +360,8 @@ public class TabletServerBatchWriter {
       checkForFailures();
     } finally {
       // make a best effort to release these resources
-      sendThreadPool.shutdownNow();
+      writer.queueThreadPool.shutdownNow();
+      writer.sendThreadPool.shutdownNow();
       jtimer.cancel();
       span.stop();
     }
@@ -411,10 +404,9 @@ public class TabletServerBatchWriter {
       log.trace(String.format("Total bin time       : %,10.2f secs %6.2f%s", totalBinTime.get()
/ 1000.0,
           100.0 * totalBinTime.get() / (finishTime - startTime), "%"));
       log.trace(String.format("Average bin rate     : %,10.2f mutations/sec", totalBinned.get()
/ (totalBinTime.get() / 1000.0)));
-      log.trace(String.format("tservers per batch   : %,8.2f avg  %,6d min %,6d max", tabletServersBatchSum
/ (double) numBatches.get(), minTabletServersBatch,
+      log.trace(String.format("tservers per batch   : %,8.2f avg  %,6d min %,6d max", tabletServersBatchSum
/ (double) numBatches, minTabletServersBatch,
           maxTabletServersBatch));
-      log.trace(String.format("tablets per batch    : %,8.2f avg  %,6d min %,6d max", tabletBatchSum.get()
/ (double) numBatches.get(), minTabletBatch,
-          maxTabletBatch));
+      log.trace(String.format("tablets per batch    : %,8.2f avg  %,6d min %,6d max", tabletBatchSum
/ (double) numBatches, minTabletBatch, maxTabletBatch));
       log.trace("");
       log.trace("SYSTEM STATISTICS");
       log.trace(String.format("JVM GC Time          : %,10.2f secs", ((finalGCTimes - initialGCTimes)
/ 1000.0)));
@@ -449,12 +441,12 @@ public class TabletServerBatchWriter {
       numTablets += tsm.getMutations().size();
     }
 
-    tabletBatchSum.getAndAdd(numTablets);
+    tabletBatchSum += numTablets;
 
     minTabletBatch = Math.min(minTabletBatch, numTablets);
     maxTabletBatch = Math.max(maxTabletBatch, numTablets);
 
-    numBatches.getAndIncrement();
+    numBatches++;
   }
 
   private interface WaitCondition {
@@ -637,13 +629,16 @@ public class TabletServerBatchWriter {
   private class MutationWriter {
 
     private static final int MUTATION_BATCH_SIZE = 1 << 17;
+    private final ExecutorService sendThreadPool;
+    private final ExecutorService queueThreadPool = new SimpleThreadPool(1, "QueueMutations");
     private final Map<String,TabletServerMutations<Mutation>> serversMutations;
     private final Set<String> queued;
     private final Map<String,TabletLocator> locators;
 
-    public MutationWriter() {
+    public MutationWriter(int numSendThreads) {
       serversMutations = new HashMap<String,TabletServerMutations<Mutation>>();
       queued = new HashSet<String>();
+      sendThreadPool = new SimpleThreadPool(numSendThreads, this.getClass().getName());
       locators = new HashMap<String,TabletLocator>();
     }
 
@@ -706,7 +701,16 @@ public class TabletServerBatchWriter {
 
     }
 
-    void addMutations(MutationSet mutationsToSend) {
+    void queueMutations(final MutationSet mutationsToSend) {
+      queueThreadPool.submit(new Runnable() {
+        public void run() {
+          addMutations(mutationsToSend);
+        }
+      });
+
+    }
+
+    private void addMutations(MutationSet mutationsToSend) {
       Map<String,TabletServerMutations<Mutation>> binnedMutations = new HashMap<String,TabletServerMutations<Mutation>>();
       Span span = Trace.start("binMutations");
       try {


Mime
View raw message