accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dlmar...@apache.org
Subject accumulo git commit: ACCUMULO-1755: Applied Keith's changes and wrote a test for it.
Date Tue, 01 Mar 2016 20:16:08 GMT
Repository: accumulo
Updated Branches:
  refs/heads/ACCUMULO-1755 585fab810 -> 123225668


ACCUMULO-1755: Applied Keith's changes and wrote a test for it.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/12322566
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/12322566
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/12322566

Branch: refs/heads/ACCUMULO-1755
Commit: 1232256683f007df926ac998dae8e76d58d6b590
Parents: 585fab8
Author: Dave Marion <dlmarion@apache.org>
Authored: Tue Mar 1 15:15:07 2016 -0500
Committer: Dave Marion <dlmarion@apache.org>
Committed: Tue Mar 1 15:15:07 2016 -0500

----------------------------------------------------------------------
 .../client/impl/TabletServerBatchWriter.java    |  28 ++---
 .../test/functional/BatchWriterFlushIT.java     | 122 ++++++++++++++++++-
 2 files changed, 132 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/12322566/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
index 2b586ce..7fe7061 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
@@ -32,7 +32,8 @@ import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -367,7 +368,7 @@ public class TabletServerBatchWriter {
       checkForFailures();
     } finally {
       // make a best effort to release these resources
-      writer.executor.shutdownNow();
+      writer.binningThreadPool.shutdownNow();
       writer.sendThreadPool.shutdownNow();
       jtimer.cancel();
       span.stop();
@@ -411,10 +412,10 @@ public class TabletServerBatchWriter {
       log.trace(String.format("Total bin time       : %,10.2f secs %6.2f%s", totalBinTime.get()
/ 1000.0,
           100.0 * totalBinTime.get() / (finishTime - startTime), "%"));
       log.trace(String.format("Average bin rate     : %,10.2f mutations/sec", totalBinned.get()
/ (totalBinTime.get() / 1000.0)));
-      log.trace(String.format("tservers per batch   : %,8.2f avg  %,6d min %,6d max", tabletServersBatchSum.get()
/ (double) numBatches.get(),
-          minTabletServersBatch, maxTabletServersBatch));
-      log.trace(String.format("tablets per batch    : %,8.2f avg  %,6d min %,6d max", tabletBatchSum.get()
/ (double) numBatches.get(), minTabletBatch,
-          maxTabletBatch));
+      log.trace(String.format("tservers per batch   : %,8.2f avg  %,6d min %,6d max", (float)
(tabletServersBatchSum.get() / numBatches.get()),
+          minTabletServersBatch.get(), maxTabletServersBatch.get()));
+      log.trace(String.format("tablets per batch    : %,8.2f avg  %,6d min %,6d max", (float)
(tabletBatchSum.get() / numBatches.get()), minTabletBatch.get(),
+          maxTabletBatch.get()));
       log.trace("");
       log.trace("SYSTEM STATISTICS");
       log.trace(String.format("JVM GC Time          : %,10.2f secs", ((finalGCTimes - initialGCTimes)
/ 1000.0)));
@@ -638,8 +639,7 @@ public class TabletServerBatchWriter {
 
     private static final int MUTATION_BATCH_SIZE = 1 << 17;
     private final ExecutorService sendThreadPool;
-    private final LinkedTransferQueue<Runnable> queue = new LinkedTransferQueue<>();
-    private final SimpleThreadPool executor = new SimpleThreadPool(1, "BinMutations", queue);
+    private final SimpleThreadPool binningThreadPool;
     private final Map<String,TabletServerMutations<Mutation>> serversMutations;
     private final Set<String> queued;
     private final Map<String,TabletLocator> locators;
@@ -649,6 +649,8 @@ public class TabletServerBatchWriter {
       queued = new HashSet<String>();
       sendThreadPool = new SimpleThreadPool(numSendThreads, this.getClass().getName());
       locators = new HashMap<String,TabletLocator>();
+      binningThreadPool = new SimpleThreadPool(1, "BinMutations", new SynchronousQueue<Runnable>());
+      binningThreadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
     }
 
     private TabletLocator getLocator(String tableId) {
@@ -713,13 +715,14 @@ public class TabletServerBatchWriter {
     void queueMutations(final MutationSet mutationsToSend) throws InterruptedException {
       if (null == mutationsToSend)
         return;
-      boolean transferred = queue.tryTransfer(new Runnable() {
+      binningThreadPool.execute(new Runnable() {
         final MutationSet m = mutationsToSend;
 
         @Override
         public void run() {
           if (null != m) {
             try {
+              log.trace("{} - binning {} mutations", Thread.currentThread().getName(), m.size());
               addMutations(m);
             } catch (Exception e) {
               updateUnknownErrors("Error processing mutation set", e);
@@ -727,13 +730,6 @@ public class TabletServerBatchWriter {
           }
         }
       });
-      if (!transferred) {
-        try {
-          addMutations(mutationsToSend);
-        } catch (Exception e) {
-          updateUnknownErrors("Error processing mutation set", e);
-        }
-      }
     }
 
     private void addMutations(MutationSet mutationsToSend) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/12322566/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
index 353a6b9..e479f69 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
@@ -16,11 +16,20 @@
  */
 package org.apache.accumulo.test.functional;
 
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map.Entry;
 import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.AccumuloException;
@@ -31,21 +40,27 @@ import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.impl.TabletServerBatchWriter;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.commons.lang3.builder.CompareToBuilder;
 import org.apache.hadoop.io.Text;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.Assert;
 import org.junit.Test;
 
 import com.google.common.collect.Iterators;
-import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 
 public class BatchWriterFlushIT extends AccumuloClusterHarness {
 
   private static final int NUM_TO_FLUSH = 100000;
+  private static final int NUM_THREADS = 3;
 
   @Override
   protected int defaultTimeoutSeconds() {
@@ -62,7 +77,6 @@ public class BatchWriterFlushIT extends AccumuloClusterHarness {
     c.tableOperations().create(bwlt);
     runFlushTest(bwft);
     runLatencyTest(bwlt);
-
   }
 
   private void runLatencyTest(String tableName) throws Exception {
@@ -165,6 +179,110 @@ public class BatchWriterFlushIT extends AccumuloClusterHarness {
     }
   }
 
+  @Test
+  public void runMultiThreadedBinningTest() throws Exception {
+    Connector c = getConnector();
+    String[] tableNames = getUniqueNames(1);
+    String tableName = tableNames[0];
+    c.tableOperations().create(tableName);
+    for (int x = 0; x < NUM_THREADS; x++) {
+      c.tableOperations().addSplits(tableName, new TreeSet<Text>(Collections.singleton(new
Text(Integer.toString(x * NUM_TO_FLUSH)))));
+    }
+    c.instanceOperations().waitForBalance();
+
+    //Logger.getLogger(TabletServerBatchWriter.class).setLevel(Level.TRACE);
+    final List<Set<Mutation>> allMuts = new LinkedList<>();
+    final ConcurrentSkipListSet<Mutation> data = new ConcurrentSkipListSet<>(new
Comparator<Mutation>() {
+      @Override
+      public int compare(Mutation o1, Mutation o2) {
+        CompareToBuilder compare = new CompareToBuilder();
+        compare.append(o1.getRow().toString(), o2.getRow().toString());
+        compare.append(o1.getUpdates().size(), o2.getUpdates().size());
+        if (o1.getUpdates().size() == o2.getUpdates().size()) {
+          for (int x = 0; x < o1.getUpdates().size(); x++) {
+            compare.append(o1.getUpdates().get(x).toString(), o2.getUpdates().get(x).toString());
+          }
+        }
+        return compare.toComparison();
+      }
+    });
+    SimpleThreadPool createThreads = new SimpleThreadPool(NUM_THREADS, "CreateThreads");
+    createThreads.allowCoreThreadTimeOut(false);
+    createThreads.prestartAllCoreThreads();
+    for (int i = 0; i < NUM_THREADS; i++) {
+      final int thread = i;
+      createThreads.execute(new Runnable() {
+        @Override
+        public void run() {
+          for (int j = 0; j < NUM_TO_FLUSH; j++) {
+            int row = thread * NUM_TO_FLUSH + j;
+
+            Mutation m = new Mutation(new Text(String.format("%10d", row)));
+            m.put(new Text("cf" + thread), new Text("cq"), new Value(("" + row).getBytes()));
+            data.add(m);
+          }
+        }
+      });
+    }
+    createThreads.shutdown();
+    createThreads.awaitTermination(3, TimeUnit.MINUTES);
+    Assert.assertEquals(NUM_THREADS * NUM_TO_FLUSH, data.size());
+    List<Mutation> shuffled = new LinkedList<>(data);
+    Collections.shuffle(shuffled);
+    for (int n = 0; n < (NUM_THREADS * NUM_TO_FLUSH); n += NUM_TO_FLUSH) {
+      System.out.println("subList: " + (n) + " to " + (n + NUM_TO_FLUSH));
+      Set<Mutation> muts = new HashSet<>(shuffled.subList(n, n + NUM_TO_FLUSH));
+      allMuts.add(muts);
+    }
+
+    SimpleThreadPool threads = new SimpleThreadPool(NUM_THREADS, "ClientThreads");
+    threads.allowCoreThreadTimeOut(false);
+    threads.prestartAllCoreThreads();
+
+    BatchWriterConfig cfg = new BatchWriterConfig();
+    cfg.setMaxLatency(10, TimeUnit.SECONDS);
+    cfg.setMaxMemory(1 * 1024 * 1024);
+    cfg.setMaxWriteThreads(NUM_THREADS);
+    final BatchWriter bw = getConnector().createBatchWriter(tableName, cfg);
+
+    for (int k = 0; k < NUM_THREADS; k++) {
+      final int idx = k;
+      threads.execute(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            bw.addMutations(allMuts.get(idx));
+            bw.flush();
+          } catch (MutationsRejectedException e) {
+            Assert.fail("Error adding mutations to batch writer");
+          }
+        }
+      });
+    }
+    threads.shutdown();
+    threads.awaitTermination(3, TimeUnit.MINUTES);
+    bw.close();
+    Scanner scanner = getConnector().createScanner(tableName, Authorizations.EMPTY);
+    for (Entry<Key,Value> e : scanner) {
+      Mutation m = new Mutation(e.getKey().getRow());
+      m.put(e.getKey().getColumnFamily(), e.getKey().getColumnQualifier(), e.getValue());
+      boolean found = false;
+      for (int l = 0; l < NUM_THREADS; l++) {
+        if (allMuts.get(l).contains(m)) {
+          found = true;
+          allMuts.get(l).remove(m);
+          break;
+        }
+      }
+      Assert.assertTrue("Mutation not found: " + m.toString(), found);
+    }
+
+    for (int m = 0; m < NUM_THREADS; m++) {
+      Assert.assertEquals(0, allMuts.get(m).size());
+    }
+
+  }
+
   private void verifyEntry(int row, Entry<Key,Value> entry) throws Exception {
     if (!entry.getKey().getRow().toString().equals(String.format("r_%10d", row))) {
       throw new Exception("Unexpected key returned, expected " + row + " got " + entry.getKey());


Mime
View raw message