Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3EA22CC5A for ; Wed, 17 Jul 2013 12:58:26 +0000 (UTC) Received: (qmail 70304 invoked by uid 500); 17 Jul 2013 12:58:24 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 70077 invoked by uid 500); 17 Jul 2013 12:58:20 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 69771 invoked by uid 99); 17 Jul 2013 12:58:15 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 17 Jul 2013 12:58:15 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 4ED438ABC8E; Wed, 17 Jul 2013 12:58:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kturner@apache.org To: commits@accumulo.apache.org Date: Wed, 17 Jul 2013 12:58:22 -0000 Message-Id: <7a1b235893fd4ee2b9033a50207b1787@git.apache.org> In-Reply-To: <686a29cba05b4479ae57caedf5b65102@git.apache.org> References: <686a29cba05b4479ae57caedf5b65102@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [08/10] git commit: added concurrency test for conditional writer added concurrency test for conditional writer Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3a2fca32 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3a2fca32 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3a2fca32 Branch: refs/heads/ACCUMULO-1000 Commit: 3a2fca32aca55c2d023c2b61401f84e664349d9d Parents: b08663e Author: Keith Turner Authored: Tue Jul 16 14:49:30 2013 -0400 Committer: Keith Turner Committed: Tue Jul 16 14:49:30 2013 -0400 ---------------------------------------------------------------------- .../accumulo/test/ConditionalWriterTest.java | 216 +++++++++++++++++++ 1 file changed, 216 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a2fca32/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java index 4bc7117..94f453f 100644 --- a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java +++ b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java @@ -28,6 +28,10 @@ import java.util.Random; import java.util.SortedSet; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; @@ -35,10 +39,14 @@ import org.apache.accumulo.core.client.ConditionalWriter; import org.apache.accumulo.core.client.ConditionalWriter.Result; import org.apache.accumulo.core.client.ConditionalWriter.Status; import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IsolatedScanner; import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.RowIterator; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.ZooKeeperInstance; import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.data.ArrayByteSequence; +import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Condition; import org.apache.accumulo.core.data.ConditionalMutation; import org.apache.accumulo.core.data.Key; @@ -722,6 +730,214 @@ public class ConditionalWriterTest { cw.close(); } + private static class Stats { + + ByteSequence row = null; + int seq; + long sum; + int data[] = new int[10]; + + public Stats(Iterator> iterator) { + while (iterator.hasNext()) { + Entry entry = iterator.next(); + + if (row == null) + row = entry.getKey().getRowData(); + + String cf = entry.getKey().getColumnFamilyData().toString(); + String cq = entry.getKey().getColumnQualifierData().toString(); + + if (cf.equals("data")) { + data[Integer.parseInt(cq)] = Integer.parseInt(entry.getValue().toString()); + } else if (cf.equals("meta")) { + if (cq.equals("sum")) { + sum = Long.parseLong(entry.getValue().toString()); + } else if (cq.equals("seq")) { + seq = Integer.parseInt(entry.getValue().toString()); + } + } + } + + long sum2 = 0; + + for (int datum : data) { + sum2 += datum; + } + + Assert.assertEquals(sum2, sum); + } + + public Stats(ByteSequence row) { + this.row = row; + for (int i = 0; i < data.length; i++) { + this.data[i] = 0; + } + this.seq = -1; + this.sum = 0; + } + + void set(int index, int value) { + sum -= data[index]; + sum += value; + data[index] = value; + } + + ConditionalMutation toMutation() { + Condition cond = new Condition("meta", "seq"); + if (seq >= 0) + cond.setValue(seq + ""); + + ConditionalMutation cm = new ConditionalMutation(row, cond); + + cm.put("meta", "seq", (seq + 1) + ""); + cm.put("meta", "sum", (sum) + ""); + + for (int i = 0; i < data.length; i++) { + cm.put("data", i + "", data[i] + ""); + } + + return cm; + } + + public String toString() { + return row + " " + seq + " " + sum; + } + } + + private static class MutatorTask implements Runnable { + String table; + ArrayList rows; + ConditionalWriter cw; + Connector conn; + AtomicBoolean failed; + + public MutatorTask(String table, Connector conn, ArrayList rows, ConditionalWriter cw, AtomicBoolean failed) { + this.table = table; + this.rows = rows; + this.conn = conn; + this.cw = cw; + this.failed = failed; + } + + @Override + public void run() { + try { + Random rand = new Random(); + + Scanner scanner = new IsolatedScanner(conn.createScanner(table, Authorizations.EMPTY)); + + for (int i = 0; i < 100; i++) { + int numRows = rand.nextInt(10) + 1; + + ArrayList changes = new ArrayList(numRows); + ArrayList mutations = new ArrayList(); + + for (int j = 0; j < numRows; j++) + changes.add(rows.get(rand.nextInt(rows.size()))); + + for (ByteSequence row : changes) { + scanner.setRange(new Range(row.toString())); + Stats stats = new Stats(scanner.iterator()); + stats.set(rand.nextInt(10), Math.abs(rand.nextInt())); + mutations.add(stats.toMutation()); + } + + ArrayList changed = new ArrayList(numRows); + Iterator results = cw.write(mutations.iterator()); + while (results.hasNext()) { + Result result = results.next(); + changed.add(new ArrayByteSequence(result.getMutation().getRow())); + } + + Collections.sort(changes); + Collections.sort(changed); + + Assert.assertEquals(changes, changed); + + } + + } catch (Exception e) { + e.printStackTrace(); + failed.set(true); + } + } + } + + @Test + public void testThreads() throws Exception { + // test multiple threads using a single conditional writer + + String table = "foo9"; + + ZooKeeperInstance zki = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers()); + Connector conn = zki.getConnector("root", new PasswordToken(secret)); + + conn.tableOperations().create(table); + + Random rand = new Random(); + + switch (rand.nextInt(3)) { + case 1: + conn.tableOperations().addSplits(table, nss("4")); + break; + case 2: + conn.tableOperations().addSplits(table, nss("3", "5")); + break; + } + + ConditionalWriter cw = conn.createConditionalWriter(table, Authorizations.EMPTY); + + ArrayList rows = new ArrayList(); + + + for (int i = 0; i < 1000; i++) { + rows.add(new ArrayByteSequence(FastFormat.toZeroPaddedString(Math.abs(rand.nextLong()), 16, 16, new byte[0]))); + } + + ArrayList mutations = new ArrayList(); + + for (ByteSequence row : rows) + mutations.add(new Stats(row).toMutation()); + + ArrayList rows2 = new ArrayList(); + Iterator results = cw.write(mutations.iterator()); + while (results.hasNext()) { + Result result = results.next(); + Assert.assertEquals(Status.ACCEPTED, result.getStatus()); + rows2.add(new ArrayByteSequence(result.getMutation().getRow())); + } + + Collections.sort(rows); + Collections.sort(rows2); + + Assert.assertEquals(rows, rows2); + + AtomicBoolean failed = new AtomicBoolean(false); + + ExecutorService tp = Executors.newFixedThreadPool(20); + for (int i = 0; i < 20; i++) { + tp.submit(new MutatorTask(table, conn, rows, cw, failed)); + } + + tp.shutdown(); + + while (!tp.isTerminated()) { + tp.awaitTermination(1, TimeUnit.MINUTES); + } + + Assert.assertFalse(failed.get()); + + Scanner scanner = conn.createScanner(table, Authorizations.EMPTY); + + RowIterator rowIter = new RowIterator(scanner); + + while (rowIter.hasNext()) { + Iterator> row = rowIter.next(); + Stats stats = new Stats(row); + System.out.println(stats); + } + } + private SortedSet nss(String... splits) { TreeSet ret = new TreeSet(); for (String split : splits)