accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dlmar...@apache.org
Subject accumulo git commit: ACCUMULO-1755: Updated based on comments from review
Date Mon, 29 Feb 2016 14:32:02 GMT
Repository: accumulo
Updated Branches:
  refs/heads/ACCUMULO-1755 e30c77ca3 -> bcbe7417c


ACCUMULO-1755: Updated based on comments from review


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/bcbe7417
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/bcbe7417
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/bcbe7417

Branch: refs/heads/ACCUMULO-1755
Commit: bcbe7417c98773c914a206e378f9b3265caeff6b
Parents: e30c77c
Author: Dave Marion <dlmarion@apache.org>
Authored: Mon Feb 29 09:31:07 2016 -0500
Committer: Dave Marion <dlmarion@apache.org>
Committed: Mon Feb 29 09:31:07 2016 -0500

----------------------------------------------------------------------
 .../client/impl/TabletServerBatchWriter.java    | 87 +++++++++++---------
 1 file changed, 48 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/bcbe7417/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
index dead19f..3c848fe 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
@@ -32,9 +32,10 @@ import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.core.client.AccumuloException;
@@ -137,13 +138,13 @@ public class TabletServerBatchWriter {
   private long initialCompileTimes;
   private double initialSystemLoad;
 
-  private int tabletServersBatchSum = 0;
-  private int tabletBatchSum = 0;
-  private int numBatches = 0;
-  private int maxTabletBatch = Integer.MIN_VALUE;
-  private int minTabletBatch = Integer.MAX_VALUE;
-  private int minTabletServersBatch = Integer.MAX_VALUE;
-  private int maxTabletServersBatch = Integer.MIN_VALUE;
+  private AtomicInteger tabletServersBatchSum = new AtomicInteger(0);
+  private AtomicInteger tabletBatchSum = new AtomicInteger(0);
+  private AtomicInteger numBatches = new AtomicInteger(0);
+  private AtomicInteger maxTabletBatch = new AtomicInteger(Integer.MIN_VALUE);
+  private AtomicInteger minTabletBatch = new AtomicInteger(Integer.MAX_VALUE);
+  private AtomicInteger minTabletServersBatch = new AtomicInteger(Integer.MAX_VALUE);
+  private AtomicInteger maxTabletServersBatch = new AtomicInteger(Integer.MIN_VALUE);
 
   // error handling
   private final Violations violations = new Violations();
@@ -367,7 +368,7 @@ public class TabletServerBatchWriter {
       checkForFailures();
     } finally {
       // make a best effort to release these resources
-      writer.binTimer.cancel();
+      writer.executor.shutdownNow();
       writer.sendThreadPool.shutdownNow();
       jtimer.cancel();
       span.stop();
@@ -411,9 +412,10 @@ public class TabletServerBatchWriter {
       log.trace(String.format("Total bin time       : %,10.2f secs %6.2f%s", totalBinTime.get()
/ 1000.0,
           100.0 * totalBinTime.get() / (finishTime - startTime), "%"));
       log.trace(String.format("Average bin rate     : %,10.2f mutations/sec", totalBinned.get()
/ (totalBinTime.get() / 1000.0)));
-      log.trace(String.format("tservers per batch   : %,8.2f avg  %,6d min %,6d max", tabletServersBatchSum
/ (double) numBatches, minTabletServersBatch,
-          maxTabletServersBatch));
-      log.trace(String.format("tablets per batch    : %,8.2f avg  %,6d min %,6d max", tabletBatchSum
/ (double) numBatches, minTabletBatch, maxTabletBatch));
+      log.trace(String.format("tservers per batch   : %,8.2f avg  %,6d min %,6d max", tabletServersBatchSum.get()
/ (double) numBatches.get(),
+          minTabletServersBatch, maxTabletServersBatch));
+      log.trace(String.format("tablets per batch    : %,8.2f avg  %,6d min %,6d max", tabletBatchSum.get()
/ (double) numBatches.get(), minTabletBatch,
+          maxTabletBatch));
       log.trace("");
       log.trace("SYSTEM STATISTICS");
       log.trace(String.format("JVM GC Time          : %,10.2f secs", ((finalGCTimes - initialGCTimes)
/ 1000.0)));
@@ -435,11 +437,11 @@ public class TabletServerBatchWriter {
     updateBatchStats(binnedMutations);
   }
 
-  private synchronized void updateBatchStats(Map<String,TabletServerMutations<Mutation>>
binnedMutations) {
-    tabletServersBatchSum += binnedMutations.size();
+  private void updateBatchStats(Map<String,TabletServerMutations<Mutation>> binnedMutations)
{
+    tabletServersBatchSum.addAndGet(binnedMutations.size());
 
-    minTabletServersBatch = Math.min(minTabletServersBatch, binnedMutations.size());
-    maxTabletServersBatch = Math.max(maxTabletServersBatch, binnedMutations.size());
+    minTabletServersBatch.set(Math.min(minTabletServersBatch.get(), binnedMutations.size()));
+    maxTabletServersBatch.set(Math.max(maxTabletServersBatch.get(), binnedMutations.size()));
 
     int numTablets = 0;
 
@@ -448,12 +450,12 @@ public class TabletServerBatchWriter {
       numTablets += tsm.getMutations().size();
     }
 
-    tabletBatchSum += numTablets;
+    tabletBatchSum.addAndGet(numTablets);
 
-    minTabletBatch = Math.min(minTabletBatch, numTablets);
-    maxTabletBatch = Math.max(maxTabletBatch, numTablets);
+    minTabletBatch.set(Math.min(minTabletBatch.get(), numTablets));
+    maxTabletBatch.set(Math.max(maxTabletBatch.get(), numTablets));
 
-    numBatches++;
+    numBatches.incrementAndGet();
   }
 
   private interface WaitCondition {
@@ -637,8 +639,8 @@ public class TabletServerBatchWriter {
 
     private static final int MUTATION_BATCH_SIZE = 1 << 17;
     private final ExecutorService sendThreadPool;
-    private final LinkedBlockingQueue<MutationSet> queue = new LinkedBlockingQueue<>(10);
-    private final Timer binTimer = new Timer("Timer-BinningQueuedMutations", true);
+    private final LinkedTransferQueue<Runnable> queue = new LinkedTransferQueue<>();
+    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 10L, TimeUnit.SECONDS,
queue);
     private final Map<String,TabletServerMutations<Mutation>> serversMutations;
     private final Set<String> queued;
     private final Map<String,TabletLocator> locators;
@@ -648,21 +650,6 @@ public class TabletServerBatchWriter {
       queued = new HashSet<String>();
       sendThreadPool = new SimpleThreadPool(numSendThreads, this.getClass().getName());
       locators = new HashMap<String,TabletLocator>();
-      binTimer.schedule(new TimerTask() {
-        @Override
-        public void run() {
-          MutationSet m = queue.poll();
-          while (null != m) {
-            try {
-              addMutations(m);
-            } catch (Exception e) {
-              updateUnknownErrors("Error processing mutation set", e);
-            }
-            m = queue.poll();
-          }
-        }
-      }, 0, 500);
-
     }
 
     private TabletLocator getLocator(String tableId) {
@@ -725,7 +712,29 @@ public class TabletServerBatchWriter {
     }
 
     void queueMutations(final MutationSet mutationsToSend) throws InterruptedException {
-      queue.put(mutationsToSend);
+      if (null == mutationsToSend)
+        return;
+      boolean transferred = queue.tryTransfer(new Runnable() {
+        final MutationSet m = mutationsToSend;
+
+        @Override
+        public void run() {
+          if (null != m) {
+            try {
+              addMutations(m);
+            } catch (Exception e) {
+              updateUnknownErrors("Error processing mutation set", e);
+            }
+          }
+        }
+      });
+      if (!transferred) {
+        try {
+          addMutations(mutationsToSend);
+        } catch (Exception e) {
+          updateUnknownErrors("Error processing mutation set", e);
+        }
+      }
     }
 
     private void addMutations(MutationSet mutationsToSend) {


Mime
View raw message