accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dlmar...@apache.org
Subject accumulo git commit: ACCUMULO-1755: Removed threading from test when pre-creating mutations, removed unused imports
Date Tue, 01 Mar 2016 21:06:12 GMT
Repository: accumulo
Updated Branches:
  refs/heads/ACCUMULO-1755 123225668 -> 03c256fd4


ACCUMULO-1755: Removed threading from test when pre-creating mutations, removed unused imports


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/03c256fd
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/03c256fd
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/03c256fd

Branch: refs/heads/ACCUMULO-1755
Commit: 03c256fd44dd28e83ac05a32b30e73e5462d316e
Parents: 1232256
Author: Dave Marion <dlmarion@apache.org>
Authored: Tue Mar 1 16:05:12 2016 -0500
Committer: Dave Marion <dlmarion@apache.org>
Committed: Tue Mar 1 16:05:12 2016 -0500

----------------------------------------------------------------------
 .../client/impl/TabletServerBatchWriter.java    |  7 ++-
 .../test/functional/BatchWriterFlushIT.java     | 53 ++++----------------
 2 files changed, 14 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/03c256fd/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
index 7fe7061..89561da 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
@@ -716,14 +716,13 @@ public class TabletServerBatchWriter {
       if (null == mutationsToSend)
         return;
       binningThreadPool.execute(new Runnable() {
-        final MutationSet m = mutationsToSend;
 
         @Override
         public void run() {
-          if (null != m) {
+          if (null != mutationsToSend) {
             try {
-              log.trace("{} - binning {} mutations", Thread.currentThread().getName(), m.size());
-              addMutations(m);
+              log.trace("{} - binning {} mutations", Thread.currentThread().getName(), mutationsToSend.size());
+              addMutations(mutationsToSend);
             } catch (Exception e) {
               updateUnknownErrors("Error processing mutation set", e);
             }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/03c256fd/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
index e479f69..17254bf 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
@@ -19,8 +19,8 @@ package org.apache.accumulo.test.functional;
 import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
+import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -29,7 +29,6 @@ import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.AccumuloException;
@@ -40,7 +39,6 @@ import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.impl.TabletServerBatchWriter;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
@@ -48,10 +46,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
-import org.apache.commons.lang3.builder.CompareToBuilder;
 import org.apache.hadoop.io.Text;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -190,48 +185,22 @@ public class BatchWriterFlushIT extends AccumuloClusterHarness {
     }
     c.instanceOperations().waitForBalance();
 
-    //Logger.getLogger(TabletServerBatchWriter.class).setLevel(Level.TRACE);
+    // Logger.getLogger(TabletServerBatchWriter.class).setLevel(Level.TRACE);
     final List<Set<Mutation>> allMuts = new LinkedList<>();
-    final ConcurrentSkipListSet<Mutation> data = new ConcurrentSkipListSet<>(new
Comparator<Mutation>() {
-      @Override
-      public int compare(Mutation o1, Mutation o2) {
-        CompareToBuilder compare = new CompareToBuilder();
-        compare.append(o1.getRow().toString(), o2.getRow().toString());
-        compare.append(o1.getUpdates().size(), o2.getUpdates().size());
-        if (o1.getUpdates().size() == o2.getUpdates().size()) {
-          for (int x = 0; x < o1.getUpdates().size(); x++) {
-            compare.append(o1.getUpdates().get(x).toString(), o2.getUpdates().get(x).toString());
-          }
-        }
-        return compare.toComparison();
-      }
-    });
-    SimpleThreadPool createThreads = new SimpleThreadPool(NUM_THREADS, "CreateThreads");
-    createThreads.allowCoreThreadTimeOut(false);
-    createThreads.prestartAllCoreThreads();
+    List<Mutation> data = new ArrayList<>();
     for (int i = 0; i < NUM_THREADS; i++) {
       final int thread = i;
-      createThreads.execute(new Runnable() {
-        @Override
-        public void run() {
-          for (int j = 0; j < NUM_TO_FLUSH; j++) {
-            int row = thread * NUM_TO_FLUSH + j;
-
-            Mutation m = new Mutation(new Text(String.format("%10d", row)));
-            m.put(new Text("cf" + thread), new Text("cq"), new Value(("" + row).getBytes()));
-            data.add(m);
-          }
-        }
-      });
+      for (int j = 0; j < NUM_TO_FLUSH; j++) {
+        int row = thread * NUM_TO_FLUSH + j;
+        Mutation m = new Mutation(new Text(String.format("%10d", row)));
+        m.put(new Text("cf" + thread), new Text("cq"), new Value(("" + row).getBytes()));
+        data.add(m);
+      }
     }
-    createThreads.shutdown();
-    createThreads.awaitTermination(3, TimeUnit.MINUTES);
     Assert.assertEquals(NUM_THREADS * NUM_TO_FLUSH, data.size());
-    List<Mutation> shuffled = new LinkedList<>(data);
-    Collections.shuffle(shuffled);
+    Collections.shuffle(data);
     for (int n = 0; n < (NUM_THREADS * NUM_TO_FLUSH); n += NUM_TO_FLUSH) {
-      System.out.println("subList: " + (n) + " to " + (n + NUM_TO_FLUSH));
-      Set<Mutation> muts = new HashSet<>(shuffled.subList(n, n + NUM_TO_FLUSH));
+      Set<Mutation> muts = new HashSet<>(data.subList(n, n + NUM_TO_FLUSH));
       allMuts.add(muts);
     }
 


Mime
View raw message