accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [08/10] git commit: added concurrency test for conditional writer
Date Wed, 17 Jul 2013 12:58:22 GMT
added concurrency test for conditional writer


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3a2fca32
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3a2fca32
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3a2fca32

Branch: refs/heads/ACCUMULO-1000
Commit: 3a2fca32aca55c2d023c2b61401f84e664349d9d
Parents: b08663e
Author: Keith Turner <kturner@apache.org>
Authored: Tue Jul 16 14:49:30 2013 -0400
Committer: Keith Turner <kturner@apache.org>
Committed: Tue Jul 16 14:49:30 2013 -0400

----------------------------------------------------------------------
 .../accumulo/test/ConditionalWriterTest.java    | 216 +++++++++++++++++++
 1 file changed, 216 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a2fca32/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
index 4bc7117..94f453f 100644
--- a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
@@ -28,6 +28,10 @@ import java.util.Random;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
@@ -35,10 +39,14 @@ import org.apache.accumulo.core.client.ConditionalWriter;
 import org.apache.accumulo.core.client.ConditionalWriter.Result;
 import org.apache.accumulo.core.client.ConditionalWriter.Status;
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IsolatedScanner;
 import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.RowIterator;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Condition;
 import org.apache.accumulo.core.data.ConditionalMutation;
 import org.apache.accumulo.core.data.Key;
@@ -722,6 +730,214 @@ public class ConditionalWriterTest {
     cw.close();
   }
 
+  private static class Stats {
+    
+    ByteSequence row = null;
+    int seq;
+    long sum;
+    int data[] = new int[10];
+    
+    public Stats(Iterator<Entry<Key,Value>> iterator) {
+      while (iterator.hasNext()) {
+        Entry<Key,Value> entry = iterator.next();
+        
+        if (row == null)
+          row = entry.getKey().getRowData();
+
+        String cf = entry.getKey().getColumnFamilyData().toString();
+        String cq = entry.getKey().getColumnQualifierData().toString();
+        
+        if (cf.equals("data")) {
+          data[Integer.parseInt(cq)] = Integer.parseInt(entry.getValue().toString());
+        } else if (cf.equals("meta")) {
+          if (cq.equals("sum")) {
+            sum = Long.parseLong(entry.getValue().toString());
+          } else if (cq.equals("seq")) {
+            seq = Integer.parseInt(entry.getValue().toString());
+          }
+        }
+      }
+      
+      long sum2 = 0;
+      
+      for (int datum : data) {
+        sum2 += datum;
+      }
+      
+      Assert.assertEquals(sum2, sum);
+    }
+    
+    public Stats(ByteSequence row) {
+      this.row = row;
+      for (int i = 0; i < data.length; i++) {
+        this.data[i] = 0;
+      }
+      this.seq = -1;
+      this.sum = 0;
+    }
+
+    void set(int index, int value) {
+      sum -= data[index];
+      sum += value;
+      data[index] = value;
+    }
+    
+    ConditionalMutation toMutation() {
+      Condition cond = new Condition("meta", "seq");
+      if (seq >= 0)
+        cond.setValue(seq + "");
+      
+      ConditionalMutation cm = new ConditionalMutation(row, cond);
+      
+      cm.put("meta", "seq", (seq + 1) + "");
+      cm.put("meta", "sum", (sum) + "");
+      
+      for (int i = 0; i < data.length; i++) {
+        cm.put("data", i + "", data[i] + "");
+      }
+      
+      return cm;
+    }
+
+    public String toString() {
+      return row + " " + seq + " " + sum;
+    }
+  }
+
+  private static class MutatorTask implements Runnable {
+    String table;
+    ArrayList<ByteSequence> rows;
+    ConditionalWriter cw;
+    Connector conn;
+    AtomicBoolean failed;
+    
+    public MutatorTask(String table, Connector conn, ArrayList<ByteSequence> rows,
ConditionalWriter cw, AtomicBoolean failed) {
+      this.table = table;
+      this.rows = rows;
+      this.conn = conn;
+      this.cw = cw;
+      this.failed = failed;
+    }
+
+    @Override
+    public void run() {
+      try {
+        Random rand = new Random();
+        
+        Scanner scanner = new IsolatedScanner(conn.createScanner(table, Authorizations.EMPTY));
+        
+        for (int i = 0; i < 100; i++) {
+          int numRows = rand.nextInt(10) + 1;
+          
+          ArrayList<ByteSequence> changes = new ArrayList<ByteSequence>(numRows);
+          ArrayList<ConditionalMutation> mutations = new ArrayList<ConditionalMutation>();
+          
+          for (int j = 0; j < numRows; j++)
+            changes.add(rows.get(rand.nextInt(rows.size())));
+          
+          for (ByteSequence row : changes) {
+            scanner.setRange(new Range(row.toString()));
+            Stats stats = new Stats(scanner.iterator());
+            stats.set(rand.nextInt(10), Math.abs(rand.nextInt()));
+            mutations.add(stats.toMutation());
+          }
+          
+          ArrayList<ByteSequence> changed = new ArrayList<ByteSequence>(numRows);
+          Iterator<Result> results = cw.write(mutations.iterator());
+          while (results.hasNext()) {
+            Result result = results.next();
+            changed.add(new ArrayByteSequence(result.getMutation().getRow()));
+          }
+          
+          Collections.sort(changes);
+          Collections.sort(changed);
+          
+          Assert.assertEquals(changes, changed);
+
+        }
+      
+      } catch (Exception e) {
+        e.printStackTrace();
+        failed.set(true);
+      }
+    }
+  }
+
+  @Test
+  public void testThreads() throws Exception {
+    // test multiple threads using a single conditional writer
+    
+    String table = "foo9";
+    
+    ZooKeeperInstance zki = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
+    Connector conn = zki.getConnector("root", new PasswordToken(secret));
+    
+    conn.tableOperations().create(table);
+    
+    Random rand = new Random();
+    
+    switch (rand.nextInt(3)) {
+      case 1:
+        conn.tableOperations().addSplits(table, nss("4"));
+        break;
+      case 2:
+        conn.tableOperations().addSplits(table, nss("3", "5"));
+        break;
+    }
+    
+    ConditionalWriter cw = conn.createConditionalWriter(table, Authorizations.EMPTY);
+    
+    ArrayList<ByteSequence> rows = new ArrayList<ByteSequence>();
+
+    
+    for (int i = 0; i < 1000; i++) {
+      rows.add(new ArrayByteSequence(FastFormat.toZeroPaddedString(Math.abs(rand.nextLong()),
16, 16, new byte[0])));
+    }
+    
+    ArrayList<ConditionalMutation> mutations = new ArrayList<ConditionalMutation>();
+    
+    for (ByteSequence row : rows)
+      mutations.add(new Stats(row).toMutation());
+    
+    ArrayList<ByteSequence> rows2 = new ArrayList<ByteSequence>();
+    Iterator<Result> results = cw.write(mutations.iterator());
+    while (results.hasNext()) {
+      Result result = results.next();
+      Assert.assertEquals(Status.ACCEPTED, result.getStatus());
+      rows2.add(new ArrayByteSequence(result.getMutation().getRow()));
+    }
+    
+    Collections.sort(rows);
+    Collections.sort(rows2);
+    
+    Assert.assertEquals(rows, rows2);
+    
+    AtomicBoolean failed = new AtomicBoolean(false);
+
+    ExecutorService tp = Executors.newFixedThreadPool(20);
+    for (int i = 0; i < 20; i++) {
+      tp.submit(new MutatorTask(table, conn, rows, cw, failed));
+    }
+
+    tp.shutdown();
+
+    while (!tp.isTerminated()) {
+      tp.awaitTermination(1, TimeUnit.MINUTES);
+    }
+
+    Assert.assertFalse(failed.get());
+
+    Scanner scanner = conn.createScanner(table, Authorizations.EMPTY);
+    
+    RowIterator rowIter = new RowIterator(scanner);
+
+    while (rowIter.hasNext()) {
+      Iterator<Entry<Key,Value>> row = rowIter.next();
+      Stats stats = new Stats(row);
+      System.out.println(stats);
+    }
+  }
+
   private SortedSet<Text> nss(String... splits) {
     TreeSet<Text> ret = new TreeSet<Text>();
     for (String split : splits)


Mime
View raw message