accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dlmar...@apache.org
Subject accumulo git commit: ACCUMULO-1755: Updated from comments in review
Date Fri, 26 Feb 2016 23:55:35 GMT
Repository: accumulo
Updated Branches:
  refs/heads/ACCUMULO-1755 06a5f2d13 -> e30c77ca3


ACCUMULO-1755: Updated from comments in review


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e30c77ca
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e30c77ca
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e30c77ca

Branch: refs/heads/ACCUMULO-1755
Commit: e30c77ca3c7b7038dc4bb2512a41ea4dc3d65862
Parents: 06a5f2d
Author: Dave Marion <dlmarion@hotmail.com>
Authored: Fri Feb 26 18:55:07 2016 -0500
Committer: Dave Marion <dlmarion@hotmail.com>
Committed: Fri Feb 26 18:55:07 2016 -0500

----------------------------------------------------------------------
 .../client/impl/TabletServerBatchWriter.java    | 32 ++++++++++++++------
 1 file changed, 22 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/e30c77ca/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
index e1242b6..dead19f 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -225,7 +226,7 @@ public class TabletServerBatchWriter {
     lastProcessingStartTime = System.currentTimeMillis();
     try {
       writer.queueMutations(mutations);
-    } catch (RejectedExecutionException e) {
+    } catch (InterruptedException e) {
       log.warn("Mutations rejected from binning thread, retrying...");
       failedMutations.add(mutations);
     }
@@ -366,7 +367,7 @@ public class TabletServerBatchWriter {
       checkForFailures();
     } finally {
       // make a best effort to release these resources
-      writer.queueThreadPool.shutdownNow();
+      writer.binTimer.cancel();
       writer.sendThreadPool.shutdownNow();
       jtimer.cancel();
       span.stop();
@@ -636,7 +637,8 @@ public class TabletServerBatchWriter {
 
     private static final int MUTATION_BATCH_SIZE = 1 << 17;
     private final ExecutorService sendThreadPool;
-    private final ExecutorService queueThreadPool = new SimpleThreadPool(1, "QueueMutationsForBinning");
+    private final LinkedBlockingQueue<MutationSet> queue = new LinkedBlockingQueue<>(10);
+    private final Timer binTimer = new Timer("Timer-BinningQueuedMutations", true);
     private final Map<String,TabletServerMutations<Mutation>> serversMutations;
     private final Set<String> queued;
     private final Map<String,TabletLocator> locators;
@@ -646,6 +648,21 @@ public class TabletServerBatchWriter {
       queued = new HashSet<String>();
       sendThreadPool = new SimpleThreadPool(numSendThreads, this.getClass().getName());
       locators = new HashMap<String,TabletLocator>();
+      binTimer.schedule(new TimerTask() {
+        @Override
+        public void run() {
+          MutationSet m = queue.poll();
+          while (null != m) {
+            try {
+              addMutations(m);
+            } catch (Exception e) {
+              updateUnknownErrors("Error processing mutation set", e);
+            }
+            m = queue.poll();
+          }
+        }
+      }, 0, 500);
+
     }
 
     private TabletLocator getLocator(String tableId) {
@@ -707,13 +724,8 @@ public class TabletServerBatchWriter {
 
     }
 
-    void queueMutations(final MutationSet mutationsToSend) {
-      queueThreadPool.submit(new Runnable() {
-        public void run() {
-          addMutations(mutationsToSend);
-        }
-      });
-
+    void queueMutations(final MutationSet mutationsToSend) throws InterruptedException {
+      queue.put(mutationsToSend);
     }
 
     private void addMutations(MutationSet mutationsToSend) {


Mime
View raw message