Date: Fri, 08 Mar 2019 23:03:11 +0000
From: kturner@apache.org
To: "commits@accumulo.apache.org"
Reply-To: dev@accumulo.apache.org
Subject: [accumulo-testing] branch master updated: Made CI bulk import configurable and scriptable (#64)
Message-ID: <155208619166.12461.16643978190521654169@gitbox.apache.org>
X-Git-Host: gitbox.apache.org
X-Git-Repo: accumulo-testing
X-Git-Refname: refs/heads/master
X-Git-Reftype: branch
X-Git-Oldrev: 48d255d7be9c8ba4f10a212751128954f77f7b9b
X-Git-Newrev: daf1b636567ef0f2c86232a76eaed623a2098d7b
X-Git-Rev: daf1b636567ef0f2c86232a76eaed623a2098d7b
X-Git-NotificationType: ref_changed_plus_diff
X-Git-Multimail-Version: 1.5.dev
Auto-Submitted: auto-generated

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git


The following commit(s) were added to refs/heads/master by this push:
     new daf1b63  Made CI bulk import configurable and scriptable (#64)
daf1b63 is described below

commit daf1b636567ef0f2c86232a76eaed623a2098d7b
Author: Keith Turner
AuthorDate: Fri Mar 8 18:03:07 2019 -0500

    Made CI bulk import configurable and scriptable (#64)
---
 README.md                                          |   1 +
 bin/cingest                                        |   6 +-
 conf/accumulo-testing.properties.example           |  11 ++
 docs/bulk-test.md                                  |  38 ++++
 .../org/apache/accumulo/testing/TestProps.java     |   5 +-
 .../accumulo/testing/continuous/BulkIngest.java    | 213 ++++-----------------
 .../accumulo/testing/continuous/ContinuousEnv.java |  12 ++
 .../testing/continuous/ContinuousIngest.java       |  11 +-
 .../testing/continuous/ContinuousInputFormat.java  | 195 +++++++++++++++++++
 9 files changed, 303 insertions(+), 189 deletions(-)

diff --git a/README.md b/README.md
index 463c52c..a2eea4c 100644
--- a/README.md
+++ b/README.md
@@ -131,6 +131,7 @@ the referenced but undefined node. The MapReduce job produces two other counts:
 UNREFERENCED. It is expected that these two counts are non zero. REFERENCED counts nodes that are
 defined and referenced. UNREFERENCED counts nodes that defined and unreferenced, these are the
 latest nodes inserted.
+* `bulk` - Runs a MapReduce job that generates data for bulk import. See [bulk-test.md](docs/bulk-test.md).
 * `moru` - Runs a MapReduce job that stresses Accumulo by reading and writing the continuous ingest
   table. This MapReduce job will write out an entry for every entry in the table (except for ones
   created by the MapReduce job itself). Stop ingest before running this MapReduce job. Do not run more
diff --git a/bin/cingest b/bin/cingest
index a004b3b..58ac51e 100755
--- a/bin/cingest
+++ b/bin/cingest
@@ -73,6 +73,10 @@ case "$1" in
     ci_main="${ci_package}.ContinuousMoru"
     ;;
   bulk)
+    if [ "$#" -ne 2 ]; then
+      echo "Usage : $0 $1 "
+      exit 1
+    fi
     ci_main="${ci_package}.BulkIngest"
     ;;
   *)
@@ -84,7 +88,7 @@ esac
 export CLASSPATH="$TEST_JAR_PATH:$HADOOP_API_JAR:$HADOOP_RUNTIME_JAR:$CLASSPATH"
 
 case "$1" in
-  verify|moru)
+  verify|moru|bulk)
     if [ ! -z $HADOOP_HOME ]; then
       export HADOOP_USE_CLIENT_CLASSLOADER=true
       "$HADOOP_HOME"/bin/yarn jar "$TEST_JAR_PATH" "$ci_main" "${@:2}" "$TEST_PROPS" "$ACCUMULO_CLIENT_PROPS"
diff --git a/conf/accumulo-testing.properties.example b/conf/accumulo-testing.properties.example
index e60e1a7..502bcde 100644
--- a/conf/accumulo-testing.properties.example
+++ b/conf/accumulo-testing.properties.example
@@ -108,3 +108,14 @@ test.ci.verify.scan.offline=false
 test.ci.verify.auths=
 # Location in HDFS to store output. Must not exist.
 test.ci.verify.output.dir=/tmp/ci-verify
+
+# Bulk Ingest
+# -----------
+# The number of map tasks to run.
+test.ci.bulk.map.task=10
+# The number of nodes to generate per map task.
+test.ci.bulk.map.nodes=1000000
+# The number of reducers will be the minimum of this prop and table splits+1. Each reducer will
+# produce a bulk import file.
+test.ci.bulk.reducers.max=1024
+
diff --git a/docs/bulk-test.md b/docs/bulk-test.md
new file mode 100644
index 0000000..3216882
--- /dev/null
+++ b/docs/bulk-test.md
@@ -0,0 +1,38 @@
+# Running a bulk ingest test
+
+Continuous ingest supports bulk ingest in addition to live ingest. A map reduce
+job that generates rfiles using the table's splits can be run. This can be run
+in a loop like the following to continually bulk import data.
+
+```bash
+# create the ci table if necessary
+./bin/cingest createtable
+
+for i in $(seq 1 10); do
+  # run map reduce job to generate data for bulk import
+  ./bin/cingest bulk /tmp/bt/$i
+  # ask accumulo to import generated data
+  echo -e "table ci\nimportdirectory /tmp/bt/$i/files true" | accumulo shell -u root -p secret
+done
+./bin/cingest verify
+```
+
+Another way to use this in a test is to generate a lot of data and then bulk import it all at once, as follows.
+
+```bash
+for i in $(seq 1 10); do
+  ./bin/cingest bulk /tmp/bt/$i
+done
+
+(
+  echo "table ci"
+  for i in $(seq 1 10); do
+    echo "importdirectory /tmp/bt/$i/files true"
+  done
+) | accumulo shell -u root -p secret
+./bin/cingest verify
+```
+
+Bulk ingest could be run concurrently with live ingest into the same table. It
+could also be run while the agitator is running.
+
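For a rough sense of scale: with the defaults added to conf/accumulo-testing.properties.example
above, each `cingest bulk` run generates test.ci.bulk.map.task * test.ci.bulk.map.nodes nodes. The
sketch below only illustrates that arithmetic; the shell variable names are ad hoc and not part of
the commit.

```bash
# Back-of-envelope sizing, assuming the example property defaults and the
# 10-iteration loop from docs/bulk-test.md (all values are illustrative).
map_tasks=10            # test.ci.bulk.map.task
nodes_per_task=1000000  # test.ci.bulk.map.nodes
iterations=10

echo "nodes generated per bulk job: $((map_tasks * nodes_per_task))"
echo "nodes after $iterations bulk imports: $((iterations * map_tasks * nodes_per_task))"
```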
diff --git a/src/main/java/org/apache/accumulo/testing/TestProps.java b/src/main/java/org/apache/accumulo/testing/TestProps.java
index 7d7808d..3f2ca15 100644
--- a/src/main/java/org/apache/accumulo/testing/TestProps.java
+++ b/src/main/java/org/apache/accumulo/testing/TestProps.java
@@ -118,8 +118,9 @@ public class TestProps {
   public static final String CI_VERIFY_OUTPUT_DIR = CI_VERIFY + "output.dir";
 
   /** Bulk **/
-  // Bulk ingest Job instance uuid
-  public static final String CI_BULK_UUID = CI_BULK + "uuid";
+  public static final String CI_BULK_MAP_TASK = CI_BULK + "map.task";
+  public static final String CI_BULK_MAP_NODES = CI_BULK + "map.nodes";
+  public static final String CI_BULK_REDUCERS = CI_BULK + "reducers.max";
 
   public static Properties loadFromFile(String propsFilePath) {
     try {
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/BulkIngest.java b/src/main/java/org/apache/accumulo/testing/continuous/BulkIngest.java
index 4b2e20b..bb3b3af 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/BulkIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/BulkIngest.java
@@ -1,19 +1,27 @@
-package org.apache.accumulo.testing.continuous;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
-import static org.apache.accumulo.testing.TestProps.CI_BULK_UUID;
-import static org.apache.accumulo.testing.continuous.ContinuousIngest.genCol;
-import static org.apache.accumulo.testing.continuous.ContinuousIngest.genLong;
+package org.apache.accumulo.testing.continuous;
 
 import java.io.BufferedOutputStream;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
 import java.io.PrintStream;
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Base64;
 import java.util.Collection;
-import java.util.List;
-import java.util.Random;
 import java.util.UUID;
 
 import org.apache.accumulo.core.client.AccumuloClient;
@@ -22,19 +30,10 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.hadoop.mapreduce.AccumuloFileOutputFormat;
 import org.apache.accumulo.hadoop.mapreduce.partition.KeyRangePartitioner;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.slf4j.Logger;
@@ -45,8 +44,6 @@ import org.slf4j.LoggerFactory;
  * by running ContinuousVerify.
  */
 public class BulkIngest extends Configured implements Tool {
-  public static final int NUM_KEYS = 1_000_000;
-  public static final String BULK_CI_DIR = "ci-bulk";
 
   public static final Logger log = LoggerFactory.getLogger(BulkIngest.class);
 
@@ -58,39 +55,41 @@ public class BulkIngest extends Configured implements Tool {
     job.getConfiguration().set("mapreduce.job.classloader", "true");
 
     FileSystem fs = FileSystem.get(job.getConfiguration());
-    final String JOB_DIR = BULK_CI_DIR + "/" + getCurrentJobNumber(fs);
-    final String RFILE_DIR = JOB_DIR + "/rfiles";
-    log.info("Creating new job at {}", JOB_DIR);
-
     String ingestInstanceId = UUID.randomUUID().toString();
-    job.getConfiguration().set(CI_BULK_UUID, ingestInstanceId);
 
     log.info(String.format("UUID %d %s", System.currentTimeMillis(), ingestInstanceId));
 
-    Path outputDir = new Path(RFILE_DIR);
-
-    job.setInputFormatClass(RandomInputFormat.class);
+    job.setInputFormatClass(ContinuousInputFormat.class);
 
     // map the generated random longs to key values
-    job.setMapperClass(RandomMapper.class);
     job.setMapOutputKeyClass(Key.class);
     job.setMapOutputValueClass(Value.class);
 
-    // output RFiles for the import
-    job.setOutputFormatClass(AccumuloFileOutputFormat.class);
-    AccumuloFileOutputFormat.configure().outputPath(outputDir).store(job);
+    String bulkDir = args[0];
+
+    // remove bulk dir from args
+    args = Arrays.asList(args).subList(1, 3).toArray(new String[2]);
 
     try (ContinuousEnv env = new ContinuousEnv(args)) {
+      fs.mkdirs(new Path(bulkDir));
+
+      // output RFiles for the import
+      job.setOutputFormatClass(AccumuloFileOutputFormat.class);
+      AccumuloFileOutputFormat.configure().outputPath(new Path(bulkDir + "/files")).store(job);
+
+      ContinuousInputFormat.configure(job.getConfiguration(), ingestInstanceId, env);
+
       String tableName = env.getAccumuloTableName();
 
       // create splits file for KeyRangePartitioner
-      String splitsFile = JOB_DIR + "/splits.txt";
+      String splitsFile = bulkDir + "/splits.txt";
       try (AccumuloClient client = env.getAccumuloClient()) {
         // make sure splits file is closed before continuing
         try (PrintStream out = new PrintStream(
             new BufferedOutputStream(fs.create(new Path(splitsFile))))) {
-          Collection<Text> splits = client.tableOperations().listSplits(tableName, 100);
+          Collection<Text> splits = client.tableOperations().listSplits(tableName,
+              env.getBulkReducers() - 1);
           for (Text split : splits) {
             out.println(Base64.getEncoder().encodeToString(split.copyBytes()));
           }
@@ -103,157 +102,13 @@ public class BulkIngest extends Configured implements Tool {
         job.waitForCompletion(true);
         boolean success = job.isSuccessful();
 
-        // bulk import completed files
-        if (success) {
-          log.info("Sort and create job successful. Bulk importing {} to {}", RFILE_DIR, tableName);
-          client.tableOperations().importDirectory(RFILE_DIR).to(tableName).load();
-        } else {
-          log.error("Job failed, not calling bulk import");
-        }
 
         return success ? 0 : 1;
       }
     }
   }
 
-  private int getCurrentJobNumber(FileSystem fs) throws Exception {
-    Path jobPath = new Path(BULK_CI_DIR);
-    FileStatus jobDir = fs.getFileStatus(jobPath);
-    if (jobDir.isDirectory()) {
-      FileStatus[] jobs = fs.listStatus(jobPath);
-      return jobs.length;
-    } else {
-      log.info("{} directory doesn't exist yet, first job running will create it.", BULK_CI_DIR);
-      return 0;
-    }
-  }
-
   public static void main(String[] args) throws Exception {
     int ret = ToolRunner.run(new BulkIngest(), args);
     System.exit(ret);
   }
-
-  /**
-   * Mapper that takes the longs from RandomInputFormat and output Key Value pairs
-   */
-  public static class RandomMapper extends Mapper<LongWritable, LongWritable, Key, Value> {
-
-    private String uuid;
-    private Text currentRow;
-    private Text currentValue;
-    private Text emptyCfCq;
-
-    @Override
-    protected void setup(Context context) {
-      uuid = context.getConfiguration().get(CI_BULK_UUID);
-      currentRow = new Text();
-      currentValue = new Text();
-      emptyCfCq = new Text(genCol(0));
-    }
-
-    @Override
-    protected void map(LongWritable key, LongWritable value, Context context)
-        throws IOException, InterruptedException {
-      currentRow.set(ContinuousIngest.genRow(key.get()));
-
-      // hack since we can't pass null - don't set first val (prevRow), we want it to be null
-      long longVal = value.get();
-      if (longVal != 1L) {
-        currentValue.set(ContinuousIngest.genRow(longVal));
-      }
-
-      Key outputKey = new Key(currentRow, emptyCfCq, emptyCfCq);
-      Value outputValue = ContinuousIngest.createValue(uuid.getBytes(), 0, currentValue.copyBytes(),
-          null);
-
-      context.write(outputKey, outputValue);
-    }
-  }
-
-  /**
-   * Generates a million LongWritable keys. The LongWritable value points to the previous key. The
-   * first key value pair has a value of 1L. This is translated to null in RandomMapper
-   */
-  public static class RandomInputFormat extends InputFormat<LongWritable, LongWritable> {
-
-    public static class RandomSplit extends InputSplit implements Writable {
-      @Override
-      public void write(DataOutput dataOutput) {}
-
-      @Override
-      public void readFields(DataInput dataInput) {}
-
-      @Override
-      public long getLength() {
-        return 0;
-      }
-
-      @Override
-      public String[] getLocations() {
-        return new String[0];
-      }
-    }
-
-    @Override
-    public List<InputSplit> getSplits(JobContext jobContext) {
-      List<InputSplit> splits = new ArrayList<>();
-      for (int i = 0; i < 10; i++) {
-        splits.add(new RandomSplit());
-      }
-      return splits;
-    }
-
-    @Override
-    public RecordReader<LongWritable, LongWritable> createRecordReader(InputSplit inputSplit,
-        TaskAttemptContext taskAttemptContext) {
-      return new RecordReader<LongWritable, LongWritable>() {
-        int number;
-        int currentNumber;
-        LongWritable currentKey;
-        LongWritable prevRow;
-        private Random random;
-
-        @Override
-        public void initialize(InputSplit inputSplit, TaskAttemptContext job) {
-          // number = Integer.parseInt(job.getConfiguration().get(NUM_KEYS));
-          number = NUM_KEYS;
-          currentKey = new LongWritable(1);
-          prevRow = new LongWritable(1);
-          random = new Random();
-          currentNumber = 0;
-        }
-
-        @Override
-        public boolean nextKeyValue() {
-          if (currentNumber < number) {
-            prevRow.set(currentKey.get());
-            currentKey.set(genLong(0, Long.MAX_VALUE, random));
-            currentNumber++;
-            return true;
-          } else {
-            return false;
-          }
-        }
-
-        @Override
-        public LongWritable getCurrentKey() {
-          return currentKey;
-        }
-
-        @Override
-        public LongWritable getCurrentValue() {
-          return prevRow;
-        }
-
-        @Override
-        public float getProgress() {
-          return currentNumber * 1.0f / number;
-        }
-
-        @Override
-        public void close() throws IOException {
-
-        }
-      };
-    }
-  }
 }
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousEnv.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousEnv.java
index 51ae3d5..0912475 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousEnv.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousEnv.java
@@ -59,6 +59,18 @@ public class ContinuousEnv extends TestEnv {
     return Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_MAX_CQ));
   }
 
+  public int getBulkMapTask() {
+    return Integer.parseInt(testProps.getProperty(TestProps.CI_BULK_MAP_TASK));
+  }
+
+  public long getBulkMapNodes() {
+    return Long.parseLong(testProps.getProperty(TestProps.CI_BULK_MAP_NODES));
+  }
+
+  public int getBulkReducers() {
+    return Integer.parseInt(testProps.getProperty(TestProps.CI_BULK_REDUCERS));
+  }
+
   public String getAccumuloTableName() {
     return testProps.getProperty(TestProps.CI_COMMON_ACCUMULO_TABLE);
   }
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
index 0499b09..85ce6c3 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
@@ -33,13 +33,11 @@ import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.core.trace.Trace;
 import org.apache.accumulo.core.trace.TraceSamplers;
 import org.apache.accumulo.core.util.FastFormat;
 import org.apache.accumulo.testing.TestProps;
-import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -260,10 +258,9 @@ public class ContinuousIngest {
       cksum.update(cv.getExpression());
     }
 
-    Mutation m = new Mutation(new Text(rowString));
+    Mutation m = new Mutation(rowString);
 
-    m.put(new Text(cfString), new Text(cqString), cv,
-        createValue(ingestInstanceId, count, prevRow, cksum));
+    m.put(cfString, cqString, cv, createValue(ingestInstanceId, count, prevRow, cksum));
     return m;
   }
 
@@ -283,7 +280,7 @@ public class ContinuousIngest {
     return FastFormat.toZeroPaddedString(rowLong, 16, 16, EMPTY_BYTES);
   }
 
-  public static Value createValue(byte[] ingestInstanceId, long count, byte[] prevRow,
+  public static byte[] createValue(byte[] ingestInstanceId, long count, byte[] prevRow,
       Checksum cksum) {
     int dataLen = ingestInstanceId.length + 16 + (prevRow == null ? 0 : prevRow.length) + 3;
     if (cksum != null)
@@ -312,6 +309,6 @@ public class ContinuousIngest {
 
     // System.out.println("val "+new String(val));
 
-    return new Value(val);
+    return val;
   }
 }
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousInputFormat.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousInputFormat.java
new file mode 100644
index 0000000..218e1c5
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousInputFormat.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.testing.continuous;
+
+import static org.apache.accumulo.testing.continuous.ContinuousIngest.genCol;
+import static org.apache.accumulo.testing.continuous.ContinuousIngest.genLong;
+import static org.apache.accumulo.testing.continuous.ContinuousIngest.genRow;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.testing.TestProps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Generates a continuous ingest linked list per map reduce split. Each linked list is of
+ * configurable length.
+ */
+public class ContinuousInputFormat extends InputFormat<Key, Value> {
+
+  private static final String PROP_UUID = "mrbulk.uuid";
+  private static final String PROP_MAP_TASK = "mrbulk.map.task";
+  private static final String PROP_MAP_NODES = "mrbulk.map.nodes";
+  private static final String PROP_ROW_MIN = "mrbulk.row.min";
+  private static final String PROP_ROW_MAX = "mrbulk.row.max";
+  private static final String PROP_FAM_MAX = "mrbulk.fam.max";
+  private static final String PROP_QUAL_MAX = "mrbulk.qual.max";
+  private static final String PROP_CHECKSUM = "mrbulk.checksum";
+
+  private static class RandomSplit extends InputSplit implements Writable {
+    @Override
+    public void write(DataOutput dataOutput) {}
+
+    @Override
+    public void readFields(DataInput dataInput) {}
+
+    @Override
+    public long getLength() {
+      return 0;
+    }
+
+    @Override
+    public String[] getLocations() {
+      return new String[0];
+    }
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext jobContext) {
+    int numTask = jobContext.getConfiguration().getInt(PROP_MAP_TASK, 1);
+    List<InputSplit> splits = new ArrayList<>();
+    for (int i = 0; i < numTask; i++) {
+      splits.add(new RandomSplit());
+    }
+    return splits;
+  }
+
+  public static void configure(Configuration conf, String uuid, ContinuousEnv env) {
+    conf.set(PROP_UUID, uuid);
+    conf.setInt(PROP_MAP_TASK, env.getBulkMapTask());
+    conf.setLong(PROP_MAP_NODES, env.getBulkMapNodes());
+    conf.setLong(PROP_ROW_MIN, env.getRowMin());
+    conf.setLong(PROP_ROW_MAX, env.getRowMax());
+    conf.setInt(PROP_FAM_MAX, env.getMaxColF());
+    conf.setInt(PROP_QUAL_MAX, env.getMaxColQ());
+    conf.setBoolean(PROP_CHECKSUM,
+        Boolean.parseBoolean(env.getTestProperty(TestProps.CI_INGEST_CHECKSUM)));
+  }
+
+  @Override
+  public RecordReader<Key, Value> createRecordReader(InputSplit inputSplit,
+      TaskAttemptContext taskAttemptContext) {
+    return new RecordReader<Key, Value>() {
+      long numNodes;
+      long nodeCount;
+      private Random random;
+
+      private byte[] uuid;
+
+      long minRow;
+      long maxRow;
+      int maxFam;
+      int maxQual;
+      boolean checksum;
+
+      Key prevKey;
+      Key currKey;
+      Value currValue;
+
+      @Override
+      public void initialize(InputSplit inputSplit, TaskAttemptContext job) {
+        numNodes = job.getConfiguration().getLong(PROP_MAP_NODES, 1000000);
+        uuid = job.getConfiguration().get(PROP_UUID).getBytes(StandardCharsets.UTF_8);
+
+        minRow = job.getConfiguration().getLong(PROP_ROW_MIN, 0);
+        maxRow = job.getConfiguration().getLong(PROP_ROW_MAX, Long.MAX_VALUE);
+        maxFam = job.getConfiguration().getInt(PROP_FAM_MAX, Short.MAX_VALUE);
+        maxQual = job.getConfiguration().getInt(PROP_QUAL_MAX, Short.MAX_VALUE);
+        checksum = job.getConfiguration().getBoolean(PROP_CHECKSUM, false);
+
+        random = new Random();
+        nodeCount = 0;
+      }
+
+      private Key genKey(CRC32 cksum) {
+
+        byte[] row = genRow(genLong(minRow, maxRow, random));
+
+        byte[] fam = genCol(random.nextInt(maxFam));
+        byte[] qual = genCol(random.nextInt(maxQual));
+
+        if (cksum != null) {
+          cksum.update(row);
+          cksum.update(fam);
+          cksum.update(qual);
+          cksum.update(new byte[0]); // TODO col vis
+        }
+
+        return new Key(row, fam, qual);
+      }
+
+      private byte[] createValue(byte[] ingestInstanceId, byte[] prevRow, Checksum cksum) {
+        return ContinuousIngest.createValue(ingestInstanceId, nodeCount, prevRow, cksum);
+      }
+
+      @Override
+      public boolean nextKeyValue() {
+
+        if (nodeCount < numNodes) {
+          CRC32 cksum = checksum ? new CRC32() : null;
+          byte[] prevRow = prevKey != null ? prevKey.getRowData().toArray() : null;
+
+          prevKey = currKey;
+          currKey = genKey(cksum);
+          currValue = new Value(createValue(uuid, prevRow, cksum));
+
+          nodeCount++;
+          return true;
+        } else {
+          return false;
+        }
+      }
+
+      @Override
+      public Key getCurrentKey() {
+        return currKey;
+      }
+
+      @Override
+      public Value getCurrentValue() {
+        return currValue;
+      }
+
+      @Override
+      public float getProgress() {
+        return nodeCount * 1.0f / numNodes;
+      }
+
+      @Override
+      public void close() throws IOException {
+
+      }
+    };
+  }
+}
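Taken together, a scripted end-to-end run of the new bulk path might look like the sketch below. It
assumes conf/accumulo-testing.properties carries the example bulk defaults, and it reuses the table
name, output directories, and shell credentials from docs/bulk-test.md (ci, /tmp/bt, root/secret);
all of those are assumptions to adjust for a real cluster, not requirements of the commit.

```bash
#!/usr/bin/env bash
# Sketch of a scripted bulk ingest run using the pieces added in this commit.
# Paths, loop count, and credentials are assumptions taken from docs/bulk-test.md.

./bin/cingest createtable          # create the ci table if it does not exist

for i in $(seq 1 5); do
  # MapReduce job: writes rfiles under /tmp/bt/$i/files plus a splits.txt in /tmp/bt/$i
  ./bin/cingest bulk /tmp/bt/$i

  # bulk import the generated rfiles into the ci table
  echo -e "table ci\nimportdirectory /tmp/bt/$i/files true" | accumulo shell -u root -p secret
done

# check the linked list for broken references
./bin/cingest verify
```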