accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dlmar...@apache.org
Subject [2/3] accumulo git commit: Merge branch '1.6' into 1.7
Date Wed, 02 Mar 2016 21:03:09 GMT
Merge branch '1.6' into 1.7


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d1a9c524
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d1a9c524
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d1a9c524

Branch: refs/heads/master
Commit: d1a9c524c69b9ea650d9579ed7f3272fa8d2a8e5
Parents: 85b5582 f446b90
Author: Dave Marion <dlmarion@apache.org>
Authored: Wed Mar 2 15:38:20 2016 -0500
Committer: Dave Marion <dlmarion@apache.org>
Committed: Wed Mar 2 15:38:20 2016 -0500

----------------------------------------------------------------------
 .../client/impl/TabletServerBatchWriter.java    | 126 +++++++++++++------
 .../accumulo/core/util/SimpleThreadPool.java    |   6 +
 .../test/functional/BatchWriterFlushIT.java     |  88 ++++++++++++-
 3 files changed, 180 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/d1a9c524/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
index f14116a,491bcc1..8922ac5
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
@@@ -136,27 -140,22 +139,27 @@@ public class TabletServerBatchWriter 
    private long initialCompileTimes;
    private double initialSystemLoad;
  
-   private int tabletServersBatchSum = 0;
-   private int tabletBatchSum = 0;
-   private int numBatches = 0;
-   private int maxTabletBatch = Integer.MIN_VALUE;
-   private int minTabletBatch = Integer.MAX_VALUE;
-   private int minTabletServersBatch = Integer.MAX_VALUE;
-   private int maxTabletServersBatch = Integer.MIN_VALUE;
+   private AtomicInteger tabletServersBatchSum = new AtomicInteger(0);
+   private AtomicInteger tabletBatchSum = new AtomicInteger(0);
+   private AtomicInteger numBatches = new AtomicInteger(0);
+   private AtomicInteger maxTabletBatch = new AtomicInteger(Integer.MIN_VALUE);
+   private AtomicInteger minTabletBatch = new AtomicInteger(Integer.MAX_VALUE);
+   private AtomicInteger minTabletServersBatch = new AtomicInteger(Integer.MAX_VALUE);
+   private AtomicInteger maxTabletServersBatch = new AtomicInteger(Integer.MIN_VALUE);
  
 +  // error handling
 +  private final Violations violations = new Violations();
 +  private final Map<KeyExtent,Set<SecurityErrorCode>> authorizationFailures
= new HashMap<KeyExtent,Set<SecurityErrorCode>>();
 +  private final HashSet<String> serverSideErrors = new HashSet<String>();
 +  private final FailedMutations failedMutations = new FailedMutations();
 +  private int unknownErrors = 0;
 +  private boolean somethingFailed = false;
    private Throwable lastUnknownError = null;
  
 -  private Map<String,TimeoutTracker> timeoutTrackers;
 -
    private static class TimeoutTracker {
  
 -    String server;
 -    long timeOut;
 +    final String server;
 +    final long timeOut;
      long activityTime;
      Long firstErrorTime = null;
  
@@@ -441,23 -460,17 +467,23 @@@
        numTablets += tsm.getMutations().size();
      }
  
-     tabletBatchSum += numTablets;
+     tabletBatchSum.addAndGet(numTablets);
  
-     minTabletBatch = Math.min(minTabletBatch, numTablets);
-     maxTabletBatch = Math.max(maxTabletBatch, numTablets);
+     computeMin(minTabletBatch, numTablets);
+     computeMax(maxTabletBatch, numTablets);
  
-     numBatches++;
+     numBatches.incrementAndGet();
    }
  
 -  private void waitRTE() {
 +  private interface WaitCondition {
 +    boolean shouldWait();
 +  }
 +
 +  private void waitRTE(WaitCondition condition) {
      try {
 -      wait();
 +      while (condition.shouldWait()) {
 +        wait();
 +      }
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
@@@ -639,12 -653,14 +666,14 @@@
        queued = new HashSet<String>();
        sendThreadPool = new SimpleThreadPool(numSendThreads, this.getClass().getName());
        locators = new HashMap<String,TabletLocator>();
+       binningThreadPool = new SimpleThreadPool(1, "BinMutations", new SynchronousQueue<Runnable>());
+       binningThreadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
      }
  
-     private TabletLocator getLocator(String tableId) {
+     private synchronized TabletLocator getLocator(String tableId) {
        TabletLocator ret = locators.get(tableId);
        if (ret == null) {
 -        ret = TabletLocator.getLocator(instance, new Text(tableId));
 +        ret = TabletLocator.getLocator(context, new Text(tableId));
          ret = new TimeoutTabletLocator(ret, timeout);
          locators.put(tableId, ret);
        }
@@@ -701,7 -715,27 +730,26 @@@
  
      }
  
-     void addMutations(MutationSet mutationsToSend) {
+     void queueMutations(final MutationSet mutationsToSend) throws InterruptedException {
+       if (null == mutationsToSend)
+         return;
+       binningThreadPool.execute(new Runnable() {
+ 
+         @Override
+         public void run() {
+           if (null != mutationsToSend) {
+             try {
 -              if (log.isTraceEnabled())
 -                log.trace(Thread.currentThread().getName() + " - binning " + mutationsToSend.size()
+ " mutations");
++              log.trace("{} - binning {} mutations", Thread.currentThread().getName(), mutationsToSend.size());
+               addMutations(mutationsToSend);
+             } catch (Exception e) {
+               updateUnknownErrors("Error processing mutation set", e);
+             }
+           }
+         }
+       });
+     }
+ 
+     private void addMutations(MutationSet mutationsToSend) {
        Map<String,TabletServerMutations<Mutation>> binnedMutations = new HashMap<String,TabletServerMutations<Mutation>>();
        Span span = Trace.start("binMutations");
        try {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d1a9c524/test/src/test/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
index 6378480,e2277a3..63bee16
--- a/test/src/test/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
@@@ -16,11 -16,18 +16,18 @@@
   */
  package org.apache.accumulo.test.functional;
  
 -import static com.google.common.base.Charsets.UTF_8;
 +import static java.nio.charset.StandardCharsets.UTF_8;
  
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashSet;
  import java.util.Iterator;
+ import java.util.LinkedList;
+ import java.util.List;
  import java.util.Map.Entry;
  import java.util.Random;
+ import java.util.Set;
+ import java.util.TreeSet;
  import java.util.concurrent.TimeUnit;
  
  import org.apache.accumulo.core.client.AccumuloException;
@@@ -39,10 -47,9 +47,11 @@@ import org.apache.accumulo.core.util.Si
  import org.apache.accumulo.core.util.UtilWaitThread;
  import org.apache.accumulo.harness.AccumuloClusterIT;
  import org.apache.hadoop.io.Text;
+ import org.junit.Assert;
  import org.junit.Test;
  
 +import com.google.common.collect.Iterators;
 +
  public class BatchWriterFlushIT extends AccumuloClusterIT {
  
    private static final int NUM_TO_FLUSH = 100000;


Mime
View raw message