From: mdrob@apache.org
To: commits@accumulo.apache.org
Date: Tue, 04 Mar 2014 14:17:38 -0000
Message-Id: <55fea405620041aa85715fea975727ee@git.apache.org>
In-Reply-To: <3c0ab1eb801d44dfb253ccfbbe808905@git.apache.org>
References: <3c0ab1eb801d44dfb253ccfbbe808905@git.apache.org>
Subject: [12/16] git commit: Merge branch '1.4.5-SNAPSHOT' into 1.5.2-SNAPSHOT

Merge branch '1.4.5-SNAPSHOT' into 1.5.2-SNAPSHOT

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/bd283aec
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/bd283aec
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/bd283aec

Branch: refs/heads/1.5.2-SNAPSHOT
Commit: bd283aec0aa10d4c9deba8b5df097d48a424780b
Parents: bee78fa 759582b
Author: Mike Drob
Authored: Tue Mar 4 09:15:16 2014 -0500
Committer: Mike Drob
Committed: Tue Mar 4 09:15:16 2014 -0500

----------------------------------------------------------------------
 .../test/continuous/ContinuousBatchWalker.java  |  2 +-
 .../test/continuous/ContinuousIngest.java       |  9 ++--
 .../test/continuous/ContinuousQuery.java        |  2 +-
 .../test/continuous/ContinuousScanner.java      |  2 +-
 .../test/continuous/ContinuousUtil.java         | 49 ++++++++++++++++++++
 .../test/continuous/ContinuousWalk.java         |  6 +--
 6 files changed, 59 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd283aec/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java
index d021164,0000000..3304d24
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java
@@@ -1,182 -1,0 +1,182 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements. See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License. You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.continuous;
 +
 +import java.util.ArrayList;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map.Entry;
 +import java.util.Random;
 +import java.util.Set;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.cli.BatchScannerOpts;
 +import org.apache.accumulo.core.cli.ScannerOpts;
 +import org.apache.accumulo.core.client.BatchScanner;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.hadoop.io.Text;
 +
 +import com.beust.jcommander.Parameter;
 +import com.beust.jcommander.validators.PositiveInteger;
 +
 +public class ContinuousBatchWalker {
 +
 +  static class Opts extends ContinuousWalk.Opts {
 +    @Parameter(names="--numToScan", description="Number of rows to scan between sleeps", required=true, validateWith=PositiveInteger.class)
 +    long numToScan = 0;
 +  }
 +
 +  public static void main(String[] args) throws Exception {
 +
 +    Opts opts = new Opts();
 +    ScannerOpts scanOpts = new ScannerOpts();
 +    BatchScannerOpts bsOpts = new BatchScannerOpts();
 +    opts.parseArgs(ContinuousBatchWalker.class.getName(), args, scanOpts, bsOpts);
 +
 +    Random r = new Random();
 +    Authorizations auths = opts.randomAuths.getAuths(r);
 +
 +    Connector conn = opts.getConnector();
-     Scanner scanner = conn.createScanner(opts.getTableName(), auths);
++    Scanner scanner = ContinuousUtil.createScanner(conn, opts.getTableName(), auths);
 +    scanner.setBatchSize(scanOpts.scanBatchSize);
 +
 +    BatchScanner bs = conn.createBatchScanner(opts.getTableName(), auths, bsOpts.scanThreads);
 +    bs.setTimeout(bsOpts.scanTimeout, TimeUnit.MILLISECONDS);
 +
 +    while (true) {
 +      Set<Text> batch = getBatch(scanner, opts.min, opts.max, scanOpts.scanBatchSize, r);
 +      List<Range> ranges = new ArrayList<Range>(batch.size());
 +
 +      for (Text row : batch) {
 +        ranges.add(new Range(row));
 +      }
 +
 +      runBatchScan(scanOpts.scanBatchSize, bs, batch, ranges);
 +
 +      UtilWaitThread.sleep(opts.sleepTime);
 +    }
 +
 +  }
 +
 +  /*
 +   * private static void runSequentialScan(Scanner scanner, List<Range> ranges) { Set<Text> srowsSeen = new HashSet<Text>(); long st1 =
 +   * System.currentTimeMillis(); int scount = 0; for (Range range : ranges) { scanner.setRange(range);
 +   *
 +   * for (Entry<Key,Value> entry : scanner) { srowsSeen.add(entry.getKey().getRow()); scount++; } }
 +   *
 +   *
 +   * long st2 = System.currentTimeMillis(); System.out.println("SRQ "+(st2 - st1)+" "+srowsSeen.size() +" "+scount); }
 +   */
 +
 +  private static void runBatchScan(int batchSize, BatchScanner bs, Set<Text> batch, List<Range> ranges) {
 +    bs.setRanges(ranges);
 +
 +    Set<Text> rowsSeen = new HashSet<Text>();
 +
 +    int count = 0;
 +
 +    long t1 = System.currentTimeMillis();
 +
 +    for (Entry<Key,Value> entry : bs) {
 +      ContinuousWalk.validate(entry.getKey(), entry.getValue());
 +
 +      rowsSeen.add(entry.getKey().getRow());
 +
 +      addRow(batchSize, entry.getValue());
 +
 +      count++;
 +    }
 +
 +    long t2 = System.currentTimeMillis();
 +
 +    if (!rowsSeen.equals(batch)) {
 +      HashSet<Text> copy1 = new HashSet<Text>(rowsSeen);
 +      HashSet<Text> copy2 = new HashSet<Text>(batch);
 +
 +      copy1.removeAll(batch);
 +      copy2.removeAll(rowsSeen);
 +
 +      System.out.printf("DIF %d %d %d%n", t1, copy1.size(), copy2.size());
 +      System.err.printf("DIF %d %d %d%n", t1, copy1.size(), copy2.size());
 +      System.err.println("Extra seen : " + copy1);
 +      System.err.println("Not seen : " + copy2);
 +    } else {
 +      System.out.printf("BRQ %d %d %d %d %d%n", t1, (t2 - t1), rowsSeen.size(), count, (int) (rowsSeen.size() / ((t2 - t1) / 1000.0)));
 +    }
 +
 +  }
 +
 +  private static void addRow(int batchSize, Value v) {
 +    byte[] val = v.get();
 +
 +    int offset = ContinuousWalk.getPrevRowOffset(val);
 +    if (offset > 1) {
 +      Text prevRow = new Text();
 +      prevRow.set(val, offset, 16);
 +      if (rowsToQuery.size() < 3 * batchSize) {
 +        rowsToQuery.add(prevRow);
 +      }
 +    }
 +  }
 +
 +  private static HashSet<Text> rowsToQuery = new HashSet<Text>();
 +
 +  private static Set<Text> getBatch(Scanner scanner, long min, long max, int batchSize, Random r) {
 +
 +    while (rowsToQuery.size() < batchSize) {
 +      byte[] scanStart = ContinuousIngest.genRow(min, max, r);
 +      scanner.setRange(new Range(new Text(scanStart), null));
 +
 +      int count = 0;
 +
 +      long t1 = System.currentTimeMillis();
 +
 +      Iterator<Entry<Key,Value>> iter = scanner.iterator();
 +      while (iter.hasNext() && rowsToQuery.size() < 3 * batchSize) {
 +        Entry<Key,Value> entry = iter.next();
 +        ContinuousWalk.validate(entry.getKey(), entry.getValue());
 +        addRow(batchSize, entry.getValue());
 +        count++;
 +      }
 +
 +      long t2 = System.currentTimeMillis();
 +
 +      System.out.println("FSB " + t1 + " " + (t2 - t1) + " " + count);
 +
 +      UtilWaitThread.sleep(100);
 +    }
 +
 +    HashSet<Text> ret = new HashSet<Text>();
 +
 +    Iterator<Text> iter = rowsToQuery.iterator();
 +
 +    for (int i = 0; i < batchSize; i++) {
 +      ret.add(iter.next());
 +      iter.remove();
 +    }
 +
 +    return ret;
 +  }
 +
 +}


http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd283aec/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java
index 23cf15d,0000000..e3f0485
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java
@@@ -1,312 -1,0 +1,311 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements. See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License. You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.continuous;
 +
 +import java.io.BufferedReader;
 +import java.io.IOException;
 +import java.io.InputStreamReader;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.Random;
 +import java.util.UUID;
 +import java.util.zip.CRC32;
 +import java.util.zip.Checksum;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.cli.BatchWriterOpts;
 +import org.apache.accumulo.core.cli.ClientOnDefaultTable;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.MutationsRejectedException;
- import org.apache.accumulo.core.client.TableExistsException;
++import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.security.ColumnVisibility;
 +import org.apache.accumulo.core.util.FastFormat;
 +import org.apache.accumulo.trace.instrument.CountSampler;
 +import org.apache.accumulo.trace.instrument.Trace;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.FileAppender;
 +import org.apache.log4j.Level;
 +import org.apache.log4j.Logger;
 +import org.apache.log4j.PatternLayout;
 +
 +import com.beust.jcommander.IStringConverter;
 +import com.beust.jcommander.Parameter;
 +
 +
 +public class ContinuousIngest {
 +
 +  static public class BaseOpts extends ClientOnDefaultTable {
 +    public class DebugConverter implements IStringConverter<String> {
 +      @Override
 +      public String convert(String debugLog) {
 +        Logger logger = Logger.getLogger(Constants.CORE_PACKAGE_NAME);
 +        logger.setLevel(Level.TRACE);
 +        logger.setAdditivity(false);
 +        try {
 +          logger.addAppender(new FileAppender(new PatternLayout("%d{dd HH:mm:ss,SSS} [%-8c{2}] %-5p: %m%n"), debugLog, true));
 +        } catch (IOException ex) {
 +          throw new RuntimeException(ex);
 +        }
 +        return debugLog;
 +      }
 +    }
 +
 +    @Parameter(names="--min", description="lowest random row number to use")
 +    long min = 0;
 +
 +    @Parameter(names="--max", description="maximum random row number to use")
 +    long max = Long.MAX_VALUE;
 +
 +    @Parameter(names="--debugLog", description="file to write debugging output", converter=DebugConverter.class)
 +    String debugLog = null;
 +
 +    BaseOpts() { super("ci"); }
 +  }
 +
 +  public static class ShortConverter implements IStringConverter<Short> {
 +    @Override
 +    public Short convert(String value) {
 +      return Short.valueOf(value);
 +    }
 +  }
 +
 +  static public class Opts extends BaseOpts {
 +    @Parameter(names="--num", description="the number of entries to ingest")
 +    long num = Long.MAX_VALUE;
 +
 +    @Parameter(names="--maxColF", description="maximum column family value to use", converter=ShortConverter.class)
 +    short maxColF = Short.MAX_VALUE;
 +
 +    @Parameter(names="--maxColQ", description="maximum column qualifier value to use", converter=ShortConverter.class)
 +    short maxColQ = Short.MAX_VALUE;
 +
 +    @Parameter(names="--addCheckSum", description="turn on checksums")
 +    boolean checksum = false;
 +
 +    @Parameter(names="--visibilities", description="read the visibilities to ingest with from a file")
 +    String visFile = null;
 +  }
 +
 +  private static final byte[] EMPTY_BYTES = new byte[0];
 +
 +  private static List<ColumnVisibility> visibilities;
 +
 +  private static void initVisibilities(Opts opts) throws Exception {
 +    if (opts.visFile == null) {
 +      visibilities = Collections.singletonList(new ColumnVisibility());
 +      return;
 +    }
 +
 +    visibilities = new ArrayList<ColumnVisibility>();
 +
 +    FileSystem fs = FileSystem.get(new Configuration());
 +    BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(opts.visFile)), Constants.UTF8));
 +
 +    String line;
 +
 +    while ((line = in.readLine()) != null) {
 +      visibilities.add(new ColumnVisibility(line));
 +    }
 +
 +    in.close();
 +  }
 +
 +  private static ColumnVisibility getVisibility(Random rand) {
 +    return visibilities.get(rand.nextInt(visibilities.size()));
 +  }
 +
 +  public static void main(String[] args) throws Exception {
 +
 +    Opts opts = new Opts();
 +    BatchWriterOpts bwOpts = new BatchWriterOpts();
 +    opts.parseArgs(ContinuousIngest.class.getName(), args, bwOpts);
 +
 +    initVisibilities(opts);
 +
 +    if (opts.min < 0 || opts.max < 0 || opts.max <= opts.min) {
 +      throw new IllegalArgumentException("bad min and max");
 +    }
 +    Connector conn = opts.getConnector();
 +
-     if (!conn.tableOperations().exists(opts.getTableName()))
-       try {
-         conn.tableOperations().create(opts.getTableName());
-       } catch (TableExistsException tee) {}
++    if (!conn.tableOperations().exists(opts.getTableName())) {
++      throw new TableNotFoundException(null, opts.getTableName(), "Consult the README and create the table before starting ingest.");
++    }
 +
 +    BatchWriter bw = conn.createBatchWriter(opts.getTableName(), bwOpts.getBatchWriterConfig());
 +    bw = Trace.wrapAll(bw, new CountSampler(1024));
 +
 +    Random r = new Random();
 +
 +    byte[] ingestInstanceId = UUID.randomUUID().toString().getBytes(Constants.UTF8);
 +
 +    System.out.printf("UUID %d %s%n", System.currentTimeMillis(), new String(ingestInstanceId, Constants.UTF8));
 +
 +    long count = 0;
 +    final int flushInterval = 1000000;
 +    final int maxDepth = 25;
 +
 +    // always want to point back to flushed data. This way the previous item should
 +    // always exist in accumulo when verifying data. To do this make insert N point
 +    // back to the row from insert (N - flushInterval). The array below is used to keep
 +    // track of this.
 +    long prevRows[] = new long[flushInterval];
 +    long firstRows[] = new long[flushInterval];
 +    int firstColFams[] = new int[flushInterval];
 +    int firstColQuals[] = new int[flushInterval];
 +
 +    long lastFlushTime = System.currentTimeMillis();
 +
 +    out: while (true) {
 +      // generate first set of nodes
 +      ColumnVisibility cv = getVisibility(r);
 +
 +      for (int index = 0; index < flushInterval; index++) {
 +        long rowLong = genLong(opts.min, opts.max, r);
 +        prevRows[index] = rowLong;
 +        firstRows[index] = rowLong;
 +
 +        int cf = r.nextInt(opts.maxColF);
 +        int cq = r.nextInt(opts.maxColQ);
 +
 +        firstColFams[index] = cf;
 +        firstColQuals[index] = cq;
 +
 +        Mutation m = genMutation(rowLong, cf, cq, cv, ingestInstanceId, count, null, r, opts.checksum);
 +        count++;
 +        bw.addMutation(m);
 +      }
 +
 +      lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
 +      if (count >= opts.num)
 +        break out;
 +
 +      // generate subsequent sets of nodes that link to previous set of nodes
 +      for (int depth = 1; depth < maxDepth; depth++) {
 +        for (int index = 0; index < flushInterval; index++) {
 +          long rowLong = genLong(opts.min, opts.max, r);
 +          byte[] prevRow = genRow(prevRows[index]);
 +          prevRows[index] = rowLong;
 +          Mutation m = genMutation(rowLong, r.nextInt(opts.maxColF), r.nextInt(opts.maxColQ), cv, ingestInstanceId, count, prevRow, r, opts.checksum);
 +          count++;
 +          bw.addMutation(m);
 +        }
 +
 +        lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
 +        if (count >= opts.num)
 +          break out;
 +      }
 +
 +      // create one big linked list, this makes all of the first inserts
 +      // point to something
 +      for (int index = 0; index < flushInterval - 1; index++) {
 +        Mutation m = genMutation(firstRows[index], firstColFams[index], firstColQuals[index], cv, ingestInstanceId, count, genRow(prevRows[index + 1]), r,
 +            opts.checksum);
 +        count++;
 +        bw.addMutation(m);
 +      }
 +      lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
 +      if (count >= opts.num)
 +        break out;
 +    }
 +
 +    bw.close();
 +    opts.stopTracing();
 +  }
 +
 +  private static long flush(BatchWriter bw, long count, final int flushInterval, long lastFlushTime) throws MutationsRejectedException {
 +    long t1 = System.currentTimeMillis();
 +    bw.flush();
 +    long t2 = System.currentTimeMillis();
 +    System.out.printf("FLUSH %d %d %d %d %d%n", t2, (t2 - lastFlushTime), (t2 - t1), count, flushInterval);
 +    lastFlushTime = t2;
 +    return lastFlushTime;
 +  }
 +
 +  public static Mutation genMutation(long rowLong, int cfInt, int cqInt, ColumnVisibility cv, byte[] ingestInstanceId, long count, byte[] prevRow, Random r,
 +      boolean checksum) {
 +    // Adler32 is supposed to be faster, but according to wikipedia is not good for small data.... so used CRC32 instead
 +    CRC32 cksum = null;
 +
 +    byte[] rowString = genRow(rowLong);
 +
 +    byte[] cfString = FastFormat.toZeroPaddedString(cfInt, 4, 16, EMPTY_BYTES);
 +    byte[] cqString = FastFormat.toZeroPaddedString(cqInt, 4, 16, EMPTY_BYTES);
 +
 +    if (checksum) {
 +      cksum = new CRC32();
 +      cksum.update(rowString);
 +      cksum.update(cfString);
 +      cksum.update(cqString);
 +      cksum.update(cv.getExpression());
 +    }
 +
 +    Mutation m = new Mutation(new Text(rowString));
 +
 +    m.put(new Text(cfString), new Text(cqString), cv, createValue(ingestInstanceId, count, prevRow, cksum));
 +    return m;
 +  }
 +
 +  public static final long genLong(long min, long max, Random r) {
 +    return ((r.nextLong() & 0x7fffffffffffffffl) % (max - min)) + min;
 +  }
 +
 +  static final byte[] genRow(long min, long max, Random r) {
 +    return genRow(genLong(min, max, r));
 +  }
 +
 +  static final byte[] genRow(long rowLong) {
 +    return FastFormat.toZeroPaddedString(rowLong, 16, 16, EMPTY_BYTES);
 +  }
 +
 +  private static Value createValue(byte[] ingestInstanceId, long count, byte[] prevRow, Checksum cksum) {
 +    int dataLen = ingestInstanceId.length + 16 + (prevRow == null ? 0 : prevRow.length) + 3;
 +    if (cksum != null)
 +      dataLen += 8;
 +    byte val[] = new byte[dataLen];
 +    System.arraycopy(ingestInstanceId, 0, val, 0, ingestInstanceId.length);
 +    int index = ingestInstanceId.length;
 +    val[index++] = ':';
 +    int added = FastFormat.toZeroPaddedString(val, index, count, 16, 16, EMPTY_BYTES);
 +    if (added != 16)
 +      throw new RuntimeException(" " + added);
 +    index += 16;
 +    val[index++] = ':';
 +    if (prevRow != null) {
 +      System.arraycopy(prevRow, 0, val, index, prevRow.length);
 +      index += prevRow.length;
 +    }
 +
 +    val[index++] = ':';
 +
 +    if (cksum != null) {
 +      cksum.update(val, 0, index);
 +      cksum.getValue();
 +      FastFormat.toZeroPaddedString(val, index, cksum.getValue(), 8, 16, EMPTY_BYTES);
 +    }
 +
 +    // System.out.println("val "+new String(val));
 +
 +    return new Value(val);
 +  }
 +}


http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd283aec/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousQuery.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousQuery.java
index 117c136,0000000..4bbc85f
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousQuery.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousQuery.java
@@@ -1,71 -1,0 +1,71 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements. See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License. You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.continuous;
 +
 +import java.util.Map.Entry;
 +import java.util.Random;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.cli.ScannerOpts;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.test.continuous.ContinuousIngest.BaseOpts;
 +import org.apache.hadoop.io.Text;
 +
 +import com.beust.jcommander.Parameter;
 +
 +public class ContinuousQuery {
 +
 +  public static class Opts extends BaseOpts {
 +    @Parameter(names="--sleep", description="the time to wait between queries", converter=TimeConverter.class)
 +    long sleepTime = 100;
 +  }
 +
 +  public static void main(String[] args) throws Exception {
 +    Opts opts = new Opts();
 +    ScannerOpts scanOpts = new ScannerOpts();
 +    opts.parseArgs(ContinuousQuery.class.getName(), args, scanOpts);
 +
 +    Connector conn = opts.getConnector();
-     Scanner scanner = conn.createScanner(opts.getTableName(), opts.auths);
++    Scanner scanner = ContinuousUtil.createScanner(conn, opts.getTableName(), opts.auths);
 +    scanner.setBatchSize(scanOpts.scanBatchSize);
 +
 +    Random r = new Random();
 +
 +    while (true) {
 +      byte[] row = ContinuousIngest.genRow(opts.min, opts.max, r);
 +
 +      int count = 0;
 +
 +      long t1 = System.currentTimeMillis();
 +      scanner.setRange(new Range(new Text(row)));
 +      for (Entry<Key,Value> entry : scanner) {
 +        ContinuousWalk.validate(entry.getKey(), entry.getValue());
 +        count++;
 +      }
 +      long t2 = System.currentTimeMillis();
 +
 +      System.out.printf("SRQ %d %s %d %d%n", t1, new String(row, Constants.UTF8), (t2 - t1), count);
 +
 +      if (opts.sleepTime > 0)
 +        Thread.sleep(opts.sleepTime);
 +    }
 +  }
 +}


http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd283aec/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java
index c331bab,0000000..fcc8fec
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java
@@@ -1,104 -1,0 +1,104 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements. See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License. You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.continuous;
 +
 +import java.util.Iterator;
 +import java.util.Map.Entry;
 +import java.util.Random;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.cli.ScannerOpts;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.hadoop.io.Text;
 +
 +import com.beust.jcommander.Parameter;
 +import com.beust.jcommander.validators.PositiveInteger;
 +
 +public class ContinuousScanner {
 +
 +  static class Opts extends ContinuousWalk.Opts {
 +    @Parameter(names="--numToScan", description="Number of rows to scan between sleeps", required=true, validateWith=PositiveInteger.class)
 +    long numToScan = 0;
 +  }
 +
 +  public static void main(String[] args) throws Exception {
 +    Opts opts = new Opts();
 +    ScannerOpts scanOpts = new ScannerOpts();
 +    opts.parseArgs(ContinuousScanner.class.getName(), args, scanOpts);
 +
 +    Random r = new Random();
 +
 +    long distance = 1000000000000l;
 +
 +    Connector conn = opts.getConnector();
 +    Authorizations auths = opts.randomAuths.getAuths(r);
-     Scanner scanner = conn.createScanner(opts.getTableName(), auths);
++    Scanner scanner = ContinuousUtil.createScanner(conn, opts.getTableName(), auths);
 +    scanner.setBatchSize(scanOpts.scanBatchSize);
 +
 +    double delta = Math.min(.05, .05 / (opts.numToScan / 1000.0));
 +
 +    while (true) {
 +      long startRow = ContinuousIngest.genLong(opts.min, opts.max - distance, r);
 +      byte[] scanStart = ContinuousIngest.genRow(startRow);
 +      byte[] scanStop = ContinuousIngest.genRow(startRow + distance);
 +
 +      scanner.setRange(new Range(new Text(scanStart), new Text(scanStop)));
 +
 +      int count = 0;
 +      Iterator<Entry<Key,Value>> iter = scanner.iterator();
 +
 +      long t1 = System.currentTimeMillis();
 +
 +      while (iter.hasNext()) {
 +        Entry<Key,Value> entry = iter.next();
 +        ContinuousWalk.validate(entry.getKey(), entry.getValue());
 +        count++;
 +      }
 +
 +      long t2 = System.currentTimeMillis();
 +
 +      // System.out.println("P1 " +count +" "+((1-delta) * numToScan)+" "+((1+delta) * numToScan)+" "+numToScan);
 +
 +      if (count < (1 - delta) * opts.numToScan || count > (1 + delta) * opts.numToScan) {
 +        if (count == 0) {
 +          distance = distance * 10;
 +          if (distance < 0)
 +            distance = 1000000000000l;
 +        } else {
 +          double ratio = (double) opts.numToScan / count;
 +          // move ratio closer to 1 to make change slower
 +          ratio = ratio - (ratio - 1.0) * (2.0 / 3.0);
 +          distance = (long) (ratio * distance);
 +        }
 +
 +        // System.out.println("P2 "+delta +" "+numToScan+" "+distance+" "+((double)numToScan/count ));
 +      }
 +
 +      System.out.printf("SCN %d %s %d %d%n", t1, new String(scanStart, Constants.UTF8), (t2 - t1), count);
 +
 +      if (opts.sleepTime > 0)
 +        UtilWaitThread.sleep(opts.sleepTime);
 +    }
 +
 +  }
 +}


http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd283aec/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousUtil.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousUtil.java
index 0000000,0000000..a8b2930
new file mode 100644
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousUtil.java
@@@ -1,0 -1,0 +1,49 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.accumulo.test.continuous;
++
++import org.apache.accumulo.core.client.Connector;
++import org.apache.accumulo.core.client.Scanner;
++import org.apache.accumulo.core.client.TableNotFoundException;
++import org.apache.accumulo.core.security.Authorizations;
++
++/**
++ * Useful utility methods common to the Continuous test suite.
++ */
++final class ContinuousUtil {
++  private ContinuousUtil() {}
++
++  /**
++   * Attempt to create a table scanner, or fail if the table does not exist.
++   *
++   * @param connector
++   *          A populated connector object
++   * @param table
++   *          The table name to scan over
++   * @param auths
++   *          The authorizations to use for the scanner
++   * @return a scanner for the requested table
++   * @throws TableNotFoundException
++   *           If the table does not exist
++   */
++  static Scanner createScanner(Connector connector, String table, Authorizations auths) throws TableNotFoundException {
++    if (!connector.tableOperations().exists(table)) {
++      throw new TableNotFoundException(null, table, "Consult the README and create the table before starting test processes.");
++    }
++    return connector.createScanner(table, auths);
++  }
++}


http://git-wip-us.apache.org/repos/asf/accumulo/blob/bd283aec/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java
index 4032dfa,0000000..34a5e9b
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java
@@@ -1,237 -1,0 +1,237 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements. See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License. You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.continuous;
 +
 +import java.io.BufferedReader;
 +import java.io.IOException;
 +import java.io.InputStreamReader;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.Map.Entry;
 +import java.util.Random;
 +import java.util.zip.CRC32;
 +
- import org.apache.accumulo.trace.instrument.Span;
- import org.apache.accumulo.trace.instrument.Trace;
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.security.Authorizations;
++import org.apache.accumulo.trace.instrument.Span;
++import org.apache.accumulo.trace.instrument.Trace;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +
 +import com.beust.jcommander.IStringConverter;
 +import com.beust.jcommander.Parameter;
 +
 +
 +public class ContinuousWalk {
 +
 +  static public class Opts extends ContinuousQuery.Opts {
 +    class RandomAuthsConverter implements IStringConverter<RandomAuths> {
 +      @Override
 +      public RandomAuths convert(String value) {
 +        try {
 +          return new RandomAuths(value);
 +        } catch (IOException e) {
 +          throw new RuntimeException(e);
 +        }
 +      }
 +    }
 +    @Parameter(names="--authsFile", description="read the authorizations to use from a file")
 +    RandomAuths randomAuths = new RandomAuths();
 +  }
 +
 +  static class BadChecksumException extends RuntimeException {
 +    private static final long serialVersionUID = 1L;
 +
 +    public BadChecksumException(String msg) {
 +      super(msg);
 +    }
 +
 +  }
 +
 +  static class RandomAuths {
 +    private List<Authorizations> auths;
 +
 +    RandomAuths() {
 +      auths = Collections.singletonList(Constants.NO_AUTHS);
 +    }
 +
 +    RandomAuths(String file) throws IOException {
 +      if (file == null) {
 +        auths = Collections.singletonList(Constants.NO_AUTHS);
 +        return;
 +      }
 +
 +      auths = new ArrayList<Authorizations>();
 +
 +      FileSystem fs = FileSystem.get(new Configuration());
 +      BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(file)), Constants.UTF8));
 +      try {
 +        String line;
 +        while ((line = in.readLine()) != null) {
 +          auths.add(new Authorizations(line.split(",")));
 +        }
 +      } finally {
 +        in.close();
 +      }
 +    }
 +
 +    Authorizations getAuths(Random r) {
 +      return auths.get(r.nextInt(auths.size()));
 +    }
 +  }
 +
 +  public static void main(String[] args) throws Exception {
 +    Opts opts = new Opts();
 +    opts.parseArgs(ContinuousWalk.class.getName(), args);
 +
 +    Connector conn = opts.getConnector();
 +
 +    Random r = new Random();
 +
 +    ArrayList<Value> values = new ArrayList<Value>();
 +
 +    while (true) {
-       Scanner scanner = conn.createScanner(opts.getTableName(), opts.randomAuths.getAuths(r));
++      Scanner scanner = ContinuousUtil.createScanner(conn, opts.getTableName(), opts.randomAuths.getAuths(r));
 +      String row = findAStartRow(opts.min, opts.max, scanner, r);
 +
 +      while (row != null) {
 +
 +        values.clear();
 +
 +        long t1 = System.currentTimeMillis();
 +        Span span = Trace.on("walk");
 +        try {
 +          scanner.setRange(new Range(new Text(row)));
 +          for (Entry<Key,Value> entry : scanner) {
 +            validate(entry.getKey(), entry.getValue());
 +            values.add(entry.getValue());
 +          }
 +        } finally {
 +          span.stop();
 +        }
 +        long t2 = System.currentTimeMillis();
 +
 +        System.out.printf("SRQ %d %s %d %d%n", t1, row, (t2 - t1), values.size());
 +
 +        if (values.size() > 0) {
 +          row = getPrevRow(values.get(r.nextInt(values.size())));
 +        } else {
 +          System.out.printf("MIS %d %s%n", t1, row);
 +          System.err.printf("MIS %d %s%n", t1, row);
 +          row = null;
 +        }
 +
 +        if (opts.sleepTime > 0)
 +          Thread.sleep(opts.sleepTime);
 +      }
 +
 +      if (opts.sleepTime > 0)
 +        Thread.sleep(opts.sleepTime);
 +    }
 +  }
 +
 +  private static String findAStartRow(long min, long max, Scanner scanner, Random r) {
 +
 +    byte[] scanStart = ContinuousIngest.genRow(min, max, r);
 +    scanner.setRange(new Range(new Text(scanStart), null));
 +    scanner.setBatchSize(100);
 +
 +    int count = 0;
 +    String pr = null;
 +
 +    long t1 = System.currentTimeMillis();
 +
 +    for (Entry<Key,Value> entry : scanner) {
 +      validate(entry.getKey(), entry.getValue());
 +      pr = getPrevRow(entry.getValue());
 +      count++;
 +      if (pr != null)
 +        break;
 +    }
 +
 +    long t2 = System.currentTimeMillis();
 +
 +    System.out.printf("FSR %d %s %d %d%n", t1, new String(scanStart, Constants.UTF8), (t2 - t1), count);
 +
 +    return pr;
 +  }
 +
 +  static int getPrevRowOffset(byte val[]) {
 +    if (val.length == 0)
 +      throw new IllegalArgumentException();
 +    if (val[53] != ':')
 +      throw new IllegalArgumentException(new String(val, Constants.UTF8));
 +
 +    // prev row starts at 54
 +    if (val[54] != ':') {
 +      if (val[54 + 16] != ':')
 +        throw new IllegalArgumentException(new String(val, Constants.UTF8));
 +      return 54;
 +    }
 +
 +    return -1;
 +  }
 +
 +  static String getPrevRow(Value value) {
 +
 +    byte[] val = value.get();
 +    int offset = getPrevRowOffset(val);
 +    if (offset > 0) {
 +      return new String(val, offset, 16, Constants.UTF8);
 +    }
 +
 +    return null;
 +  }
 +
 +  static int getChecksumOffset(byte val[]) {
 +    if (val[val.length - 1] != ':') {
 +      if (val[val.length - 9] != ':')
 +        throw new IllegalArgumentException(new String(val, Constants.UTF8));
 +      return val.length - 8;
 +    }
 +
 +    return -1;
 +  }
 +
 +  static void validate(Key key, Value value) throws BadChecksumException {
 +    int ckOff = getChecksumOffset(value.get());
 +    if (ckOff < 0)
 +      return;
 +
 +    long storedCksum = Long.parseLong(new String(value.get(), ckOff, 8, Constants.UTF8), 16);
 +
 +    CRC32 cksum = new CRC32();
 +
 +    cksum.update(key.getRowData().toArray());
 +    cksum.update(key.getColumnFamilyData().toArray());
 +    cksum.update(key.getColumnQualifierData().toArray());
 +    cksum.update(key.getColumnVisibilityData().toArray());
 +    cksum.update(value.get(), 0, ckOff);
 +
 +    if (cksum.getValue() != storedCksum) {
 +      throw new BadChecksumException("Checksum invalid " + key + " " + value);
 +    }
 +  }
 +}
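
----------------------------------------------------------------------
A note on the ingest structure built above. The comment in
ContinuousIngest.main ("always want to point back to flushed data")
describes each new node storing the row of the node written one flush
interval earlier, and the final pass re-writing the first batch so it
points at rows from the last flushed batch. The toy model below replays
only that bookkeeping (a flushInterval of 4 instead of 1,000,000, random
longs standing in for rows); it is a sketch for illustration, not code
from this commit:

import java.util.Random;

public class LinkedListModel {
  public static void main(String[] args) {
    final int flushInterval = 4, maxDepth = 3;
    Random r = new Random(7);
    long[] prevRows = new long[flushInterval];
    long[] firstRows = new long[flushInterval];

    // First batch: nodes with no prev pointer yet.
    for (int i = 0; i < flushInterval; i++) {
      prevRows[i] = r.nextLong();
      firstRows[i] = prevRows[i];
      System.out.printf("node %d -> (none)%n", prevRows[i]);
    }
    // Subsequent batches: each node links to the flushed node one interval back.
    for (int depth = 1; depth < maxDepth; depth++) {
      for (int i = 0; i < flushInterval; i++) {
        long row = r.nextLong();
        System.out.printf("node %d -> %d%n", row, prevRows[i]);
        prevRows[i] = row;
      }
    }
    // Final pass: re-write first-batch nodes to point at the tail of the
    // neighboring chain, stitching the independent chains into one list.
    for (int i = 0; i < flushInterval - 1; i++) {
      System.out.printf("node %d -> %d%n", firstRows[i], prevRows[i + 1]);
    }
  }
}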
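
----------------------------------------------------------------------
On the row generator: genLong() masks with 0x7fffffffffffffffL to clear
the sign bit, so the modulus always yields a value in [min, max). The
fold is slightly biased toward smaller rows unless (max - min) divides
2^63 evenly, which is harmless for this test. A quick self-contained
range check (my own sketch, not from the commit):

import java.util.Random;

public class GenLongDemo {
  // Same arithmetic as ContinuousIngest.genLong.
  static long genLong(long min, long max, Random r) {
    return ((r.nextLong() & 0x7fffffffffffffffL) % (max - min)) + min;
  }

  public static void main(String[] args) {
    Random r = new Random(42);
    long min = 1000, max = 2000;
    for (int i = 0; i < 1000000; i++) {
      long v = genLong(min, max, r);
      if (v < min || v >= max)
        throw new AssertionError("out of range: " + v);
    }
    System.out.println("all generated rows fell in [" + min + ", " + max + ")");
  }
}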
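
----------------------------------------------------------------------
On the value layout that getPrevRowOffset() and getChecksumOffset()
parse: createValue() writes <ingestInstanceId>:<count>:<prevRow>:<cksum>,
where the instance id is a 36-character UUID string, count and prevRow
are 16 zero-padded hex digits, and the checksum (when enabled) is 8 hex
digits. That is why fixed offsets work: the colon after the UUID lands at
byte 36, the one after the count at byte 53, and the prev row, when
present, occupies bytes 54-69. A standalone sketch with made-up field
values (not code from this commit) demonstrating the same checks:

import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class ValueLayoutDemo {
  public static void main(String[] args) {
    String uuid = UUID.randomUUID().toString(); // 36 chars, so ':' lands at offset 36
    String count = "00000000000000a7";          // 16 hex digits, so ':' lands at offset 53
    String prevRow = "0f3c9a1b2d4e5f60";        // 16 hex digits, so ':' lands at offset 70
    byte[] val = (uuid + ":" + count + ":" + prevRow + ":").getBytes(StandardCharsets.UTF_8);

    // The same probes getPrevRowOffset() performs on a real value.
    if (val[53] != ':')
      throw new IllegalStateException("unexpected layout");
    int offset = (val[54] != ':') ? 54 : -1; // byte 54 is ':' only when prevRow is absent
    System.out.println(offset > 0 ? new String(val, offset, 16, StandardCharsets.UTF_8) : "no prev row");
  }
}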
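
----------------------------------------------------------------------
Finally, the checksum round trip: at write time genMutation() seeds a
CRC32 with the row, column family, column qualifier, and visibility
expression, createValue() finishes it over the value prefix, and the
result is stored as 8 zero-padded hex digits; ContinuousWalk.validate()
recomputes the same CRC32 and compares. A minimal sketch of that
sequence, using hypothetical field bytes rather than a real entry:

import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class ChecksumDemo {
  public static void main(String[] args) {
    byte[] row = "0000000000003039".getBytes(StandardCharsets.UTF_8);
    byte[] cf = "0001".getBytes(StandardCharsets.UTF_8);
    byte[] cq = "0002".getBytes(StandardCharsets.UTF_8);
    byte[] cv = new byte[0]; // empty visibility expression
    byte[] valuePrefix = "id:0000000000000001::".getBytes(StandardCharsets.UTF_8);

    // Write side: CRC32 over key parts, then over the value up to the checksum field.
    CRC32 cksum = new CRC32();
    cksum.update(row);
    cksum.update(cf);
    cksum.update(cq);
    cksum.update(cv);
    cksum.update(valuePrefix);
    String stored = String.format("%08x", cksum.getValue());

    // Read side: validate() recomputes the identical CRC32 and parses the
    // stored 8 hex digits with Long.parseLong(..., 16) before comparing.
    System.out.println("stored checksum field: " + stored);
  }
}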