accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From md...@apache.org
Subject [15/19] git commit: Merge branch '1.4.5-SNAPSHOT' into 1.5.2-SNAPSHOT
Date Fri, 28 Mar 2014 21:26:19 GMT
Merge branch '1.4.5-SNAPSHOT' into 1.5.2-SNAPSHOT

Conflicts:
	src/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java
	src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java
	src/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java
	src/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0d2cd1c0
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0d2cd1c0
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0d2cd1c0

Branch: refs/heads/1.5.2-SNAPSHOT
Commit: 0d2cd1c06cef923aa4026d9bee3df1966ee50d9c
Parents: 2cca3ee f0759dc
Author: Mike Drob <mdrob@cloudera.com>
Authored: Fri Mar 28 17:23:48 2014 -0400
Committer: Mike Drob <mdrob@cloudera.com>
Committed: Fri Mar 28 17:23:48 2014 -0400

----------------------------------------------------------------------
 .../simple/mapreduce/TeraSortIngest.java        |  4 +-
 .../accumulo/server/util/CountRowKeys.java      |  3 +-
 .../server/util/reflection/CounterUtils.java    | 43 ++++++++++++++++++++
 .../test/continuous/ContinuousMoru.java         |  3 +-
 .../test/continuous/ContinuousVerify.java       | 29 +++----------
 .../accumulo/test/functional/RunTests.java      |  3 +-
 6 files changed, 56 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d2cd1c0/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java
----------------------------------------------------------------------
diff --cc examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java
index f06aeec,0000000..f131e6c
mode 100644,000000..100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java
@@@ -1,404 -1,0 +1,404 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements. See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership. The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License. You may obtain a copy of the License at
 + * 
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * 
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.accumulo.examples.simple.mapreduce;
 +
 +import java.io.DataInput;
 +import java.io.DataOutput;
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.List;
 +import java.util.Random;
 +
 +import org.apache.accumulo.core.cli.ClientOnRequiredTable;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.util.CachedConfiguration;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.conf.Configured;
 +import org.apache.hadoop.io.LongWritable;
 +import org.apache.hadoop.io.NullWritable;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.io.Writable;
 +import org.apache.hadoop.io.WritableUtils;
 +import org.apache.hadoop.mapreduce.InputFormat;
 +import org.apache.hadoop.mapreduce.InputSplit;
 +import org.apache.hadoop.mapreduce.Job;
 +import org.apache.hadoop.mapreduce.JobContext;
 +import org.apache.hadoop.mapreduce.Mapper;
 +import org.apache.hadoop.mapreduce.RecordReader;
 +import org.apache.hadoop.mapreduce.TaskAttemptContext;
 +import org.apache.hadoop.util.Tool;
 +import org.apache.hadoop.util.ToolRunner;
 +
 +import com.beust.jcommander.Parameter;
 +
 +/**
 + * Generate the *almost* official terasort input data set. (See below) The user specifies the number of rows and the output directory and this class runs a
 + * map/reduce program to generate the data. The format of the data is:
 + * <ul>
 + * <li>(10 bytes key) (10 bytes rowid) (78 bytes filler) \r \n
 + * <li>The keys are random characters from the set ' ' .. '~'.
 + * <li>The rowid is the right justified row id as an int.
 + * <li>The filler consists of 7 runs of 10 characters and 1 run of 8 characters from 'A' to 'Z'.
 + * </ul>
 + * 
 + * This TeraSort is slightly modified to allow for variable length key sizes and value sizes. The row length isn't variable. To generate a terabyte of data in
 + * the same way TeraSort does use 10000000000 rows and 10/10 byte key length and 78/78 byte value length. Along with the 10 byte row id and \r\n this gives you
 + * 100 byte row * 10000000000 rows = 1tb. Min/Max ranges for key and value parameters are inclusive/inclusive respectively.
 + * 
 + * 
 + */
 +public class TeraSortIngest extends Configured implements Tool {
 +  /**
 +   * An input format that assigns ranges of longs to each mapper.
 +   */
 +  static class RangeInputFormat extends InputFormat<LongWritable,NullWritable> {
 +    /**
 +     * An input split consisting of a range of numbers.
 +     */
 +    static class RangeInputSplit extends InputSplit implements Writable {
 +      long firstRow;
 +      long rowCount;
 +      
 +      public RangeInputSplit() {}
 +      
 +      public RangeInputSplit(long offset, long length) {
 +        firstRow = offset;
 +        rowCount = length;
 +      }
 +      
 +      @Override
 +      public long getLength() throws IOException {
 +        return 0;
 +      }
 +      
 +      @Override
 +      public String[] getLocations() throws IOException {
 +        return new String[] {};
 +      }
 +      
 +      @Override
 +      public void readFields(DataInput in) throws IOException {
 +        firstRow = WritableUtils.readVLong(in);
 +        rowCount = WritableUtils.readVLong(in);
 +      }
 +      
 +      @Override
 +      public void write(DataOutput out) throws IOException {
 +        WritableUtils.writeVLong(out, firstRow);
 +        WritableUtils.writeVLong(out, rowCount);
 +      }
 +    }
 +    
 +    /**
 +     * A record reader that will generate a range of numbers.
 +     */
 +    static class RangeRecordReader extends RecordReader<LongWritable,NullWritable> {
 +      long startRow;
 +      long finishedRows;
 +      long totalRows;
 +      
 +      LongWritable currentKey;
 +      
 +      public RangeRecordReader(RangeInputSplit split) {
 +        startRow = split.firstRow;
 +        finishedRows = 0;
 +        totalRows = split.rowCount;
 +      }
 +      
 +      @Override
 +      public void close() throws IOException {}
 +      
 +      @Override
 +      public float getProgress() throws IOException {
 +        return finishedRows / (float) totalRows;
 +      }
 +      
 +      @Override
 +      public LongWritable getCurrentKey() throws IOException, InterruptedException {
 +        return new LongWritable(startRow + finishedRows);
 +      }
 +      
 +      @Override
 +      public NullWritable getCurrentValue() throws IOException, InterruptedException {
 +        return NullWritable.get();
 +      }
 +      
 +      @Override
 +      public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {}
 +      
 +      @Override
 +      public boolean nextKeyValue() throws IOException, InterruptedException {
 +        if (finishedRows < totalRows) {
 +          ++finishedRows;
 +          return true;
 +        }
 +        return false;
 +      }
 +    }
 +    
 +    @Override
 +    public RecordReader<LongWritable,NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
 +      // reporter.setStatus("Creating record reader");
 +      return new RangeRecordReader((RangeInputSplit) split);
 +    }
 +    
 +    /**
 +     * Create the desired number of splits, dividing the number of rows between the mappers.
 +     */
 +    @Override
 +    public List<InputSplit> getSplits(JobContext job) {
-       long totalRows = job.getConfiguration().getLong(NUMROWS, 0);
-       int numSplits = job.getConfiguration().getInt(NUMSPLITS, 1);
++      long totalRows = InputFormatBase.getConfiguration(job).getLong(NUMROWS, 0);
++      int numSplits = InputFormatBase.getConfiguration(job).getInt(NUMSPLITS, 1);
 +      long rowsPerSplit = totalRows / numSplits;
 +      System.out.println("Generating " + totalRows + " using " + numSplits + " maps with step of " + rowsPerSplit);
 +      ArrayList<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
 +      long currentRow = 0;
 +      for (int split = 0; split < numSplits - 1; ++split) {
 +        splits.add(new RangeInputSplit(currentRow, rowsPerSplit));
 +        currentRow += rowsPerSplit;
 +      }
 +      splits.add(new RangeInputSplit(currentRow, totalRows - currentRow));
 +      System.out.println("Done Generating.");
 +      return splits;
 +    }
 +    
 +  }
 +  
 +  private static String NUMSPLITS = "terasort.overridesplits";
 +  private static String NUMROWS = "terasort.numrows";
 +  
 +  static class RandomGenerator {
 +    private long seed = 0;
 +    private static final long mask32 = (1l << 32) - 1;
 +    /**
 +     * The number of iterations separating the precomputed seeds.
 +     */
 +    private static final int seedSkip = 128 * 1024 * 1024;
 +    /**
 +     * The precomputed seed values after every seedSkip iterations. There should be enough values so that 2**32 iterations are covered.
 +     */
 +    private static final long[] seeds = new long[] {0L, 4160749568L, 4026531840L, 3892314112L, 3758096384L, 3623878656L, 3489660928L, 3355443200L, 3221225472L,
 +        3087007744L, 2952790016L, 2818572288L, 2684354560L, 2550136832L, 2415919104L, 2281701376L, 2147483648L, 2013265920L, 1879048192L, 1744830464L,
 +        1610612736L, 1476395008L, 1342177280L, 1207959552L, 1073741824L, 939524096L, 805306368L, 671088640L, 536870912L, 402653184L, 268435456L, 134217728L,};
 +    
 +    /**
 +     * Start the random number generator on the given iteration.
 +     * 
 +     * @param initalIteration
 +     *          the iteration number to start on
 +     */
 +    RandomGenerator(long initalIteration) {
 +      int baseIndex = (int) ((initalIteration & mask32) / seedSkip);
 +      seed = seeds[baseIndex];
 +      for (int i = 0; i < initalIteration % seedSkip; ++i) {
 +        next();
 +      }
 +    }
 +    
 +    RandomGenerator() {
 +      this(0);
 +    }
 +    
 +    long next() {
 +      seed = (seed * 3141592621l + 663896637) & mask32;
 +      return seed;
 +    }
 +  }
 +  
 +  /**
 +   * The Mapper class that, given a row number, generates the appropriate output line.
 +   */
 +  public static class SortGenMapper extends Mapper<LongWritable,NullWritable,Text,Mutation> {
 +    private Text table = null;
 +    private int minkeylength = 0;
 +    private int maxkeylength = 0;
 +    private int minvaluelength = 0;
 +    private int maxvaluelength = 0;
 +    
 +    private Text key = new Text();
 +    private Text value = new Text();
 +    private RandomGenerator rand;
 +    private byte[] keyBytes; // = new byte[12];
 +    private byte[] spaces = "          ".getBytes();
 +    private byte[][] filler = new byte[26][];
 +    {
 +      for (int i = 0; i < 26; ++i) {
 +        filler[i] = new byte[10];
 +        for (int j = 0; j < 10; ++j) {
 +          filler[i][j] = (byte) ('A' + i);
 +        }
 +      }
 +    }
 +    
 +    /**
 +     * Add a random key to the text
 +     */
 +    private Random random = new Random();
 +    
 +    private void addKey() {
 +      int range = random.nextInt(maxkeylength - minkeylength + 1);
 +      int keylen = range + minkeylength;
 +      int keyceil = keylen + (4 - (keylen % 4));
 +      keyBytes = new byte[keyceil];
 +      
 +      long temp = 0;
 +      for (int i = 0; i < keyceil / 4; i++) {
 +        temp = rand.next() / 52;
 +        keyBytes[3 + 4 * i] = (byte) (' ' + (temp % 95));
 +        temp /= 95;
 +        keyBytes[2 + 4 * i] = (byte) (' ' + (temp % 95));
 +        temp /= 95;
 +        keyBytes[1 + 4 * i] = (byte) (' ' + (temp % 95));
 +        temp /= 95;
 +        keyBytes[4 * i] = (byte) (' ' + (temp % 95));
 +      }
 +      key.set(keyBytes, 0, keylen);
 +    }
 +    
 +    /**
 +     * Add the rowid to the row.
 +     * 
 +     * @param rowId
 +     */
 +    private Text getRowIdString(long rowId) {
 +      Text paddedRowIdString = new Text();
 +      byte[] rowid = Integer.toString((int) rowId).getBytes();
 +      int padSpace = 10 - rowid.length;
 +      if (padSpace > 0) {
 +        paddedRowIdString.append(spaces, 0, 10 - rowid.length);
 +      }
 +      paddedRowIdString.append(rowid, 0, Math.min(rowid.length, 10));
 +      return paddedRowIdString;
 +    }
 +    
 +    /**
 +     * Add the required filler bytes. Each row consists of 7 blocks of 10 characters and 1 block of 8 characters.
 +     * 
 +     * @param rowId
 +     *          the current row number
 +     */
 +    private void addFiller(long rowId) {
 +      int base = (int) ((rowId * 8) % 26);
 +      
 +      // Get Random var
 +      Random random = new Random(rand.seed);
 +      
 +      int range = random.nextInt(maxvaluelength - minvaluelength + 1);
 +      int valuelen = range + minvaluelength;
 +      
 +      while (valuelen > 10) {
 +        value.append(filler[(base + valuelen) % 26], 0, 10);
 +        valuelen -= 10;
 +      }
 +      
 +      if (valuelen > 0)
 +        value.append(filler[(base + valuelen) % 26], 0, valuelen);
 +    }
 +    
 +    @Override
 +    public void map(LongWritable row, NullWritable ignored, Context context) throws IOException, InterruptedException {
 +      context.setStatus("Entering");
 +      long rowId = row.get();
 +      if (rand == null) {
 +        // we use 3 random numbers per row
 +        rand = new RandomGenerator(rowId * 3);
 +      }
 +      addKey();
 +      value.clear();
 +      // addRowId(rowId);
 +      addFiller(rowId);
 +      
 +      // New
 +      Mutation m = new Mutation(key);
 +      m.put(new Text("c"), // column family
 +          getRowIdString(rowId), // column qual
 +          new Value(value.toString().getBytes())); // data
 +      
 +      context.setStatus("About to add to accumulo");
 +      context.write(table, m);
 +      context.setStatus("Added to accumulo " + key.toString());
 +    }
 +    
 +    @Override
 +    public void setup(Context job) {
 +      minkeylength = job.getConfiguration().getInt("cloudgen.minkeylength", 0);
 +      maxkeylength = job.getConfiguration().getInt("cloudgen.maxkeylength", 0);
 +      minvaluelength = job.getConfiguration().getInt("cloudgen.minvaluelength", 0);
 +      maxvaluelength = job.getConfiguration().getInt("cloudgen.maxvaluelength", 0);
 +      table = new Text(job.getConfiguration().get("cloudgen.tablename"));
 +    }
 +  }
 +  
 +  public static void main(String[] args) throws Exception {
 +    int res = ToolRunner.run(CachedConfiguration.getInstance(), new TeraSortIngest(), args);
 +    System.exit(res);
 +  }
 +  
 +  static class Opts extends ClientOnRequiredTable {
 +    @Parameter(names = "--count", description = "number of rows to ingest", required = true)
 +    long numRows;
 +    @Parameter(names = {"-nk", "--minKeySize"}, description = "miniumum key size", required = true)
 +    int minKeyLength;
 +    @Parameter(names = {"-xk", "--maxKeySize"}, description = "maximum key size", required = true)
 +    int maxKeyLength;
 +    @Parameter(names = {"-nv", "--minValueSize"}, description = "minimum key size", required = true)
 +    int minValueLength;
 +    @Parameter(names = {"-xv", "--maxValueSize"}, description = "maximum key size", required = true)
 +    int maxValueLength;
 +    @Parameter(names = "--splits", description = "number of splits to create in the table")
 +    int splits = 0;
 +  }
 +  
 +  @Override
 +  public int run(String[] args) throws Exception {
 +    Job job = new Job(getConf(), "TeraSortCloud");
 +    job.setJarByClass(this.getClass());
 +    Opts opts = new Opts();
 +    opts.parseArgs(TeraSortIngest.class.getName(), args);
 +    
 +    job.setInputFormatClass(RangeInputFormat.class);
 +    job.setMapperClass(SortGenMapper.class);
 +    job.setMapOutputKeyClass(Text.class);
 +    job.setMapOutputValueClass(Mutation.class);
 +    
 +    job.setNumReduceTasks(0);
 +    
 +    job.setOutputFormatClass(AccumuloOutputFormat.class);
 +    opts.setAccumuloConfigs(job);
 +    BatchWriterConfig bwConfig = new BatchWriterConfig().setMaxMemory(10L * 1000 * 1000);
 +    AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig);
 +    
 +    Configuration conf = job.getConfiguration();
 +    conf.setLong(NUMROWS, opts.numRows);
 +    conf.setInt("cloudgen.minkeylength", opts.minKeyLength);
 +    conf.setInt("cloudgen.maxkeylength", opts.maxKeyLength);
 +    conf.setInt("cloudgen.minvaluelength", opts.minValueLength);
 +    conf.setInt("cloudgen.maxvaluelength", opts.maxValueLength);
 +    conf.set("cloudgen.tablename", opts.tableName);
 +    
 +    if (args.length > 10)
 +      conf.setInt(NUMSPLITS, opts.splits);
 +    
 +    job.waitForCompletion(true);
 +    return job.isSuccessful() ? 0 : 1;
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d2cd1c0/server/src/main/java/org/apache/accumulo/server/util/CountRowKeys.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/util/CountRowKeys.java
index 5676394,0000000..88b2dfb
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/util/CountRowKeys.java
+++ b/server/src/main/java/org/apache/accumulo/server/util/CountRowKeys.java
@@@ -1,86 -1,0 +1,87 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.util;
 +
 +import java.io.IOException;
 +
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.util.CachedConfiguration;
 +import org.apache.accumulo.server.ServerConstants;
++import org.apache.accumulo.server.util.reflection.CounterUtils;
 +import org.apache.hadoop.conf.Configured;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.NullWritable;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.mapreduce.Job;
 +import org.apache.hadoop.mapreduce.Mapper;
 +import org.apache.hadoop.mapreduce.Reducer;
 +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 +import org.apache.hadoop.util.Tool;
 +import org.apache.hadoop.util.ToolRunner;
 +
 +public class CountRowKeys extends Configured implements Tool {
 +  private static class MyMapper extends Mapper<Key,Value,Text,NullWritable> {
 +    Text k = new Text();
 +    
 +    public void map(Key key, Value value, Context context) throws IOException, InterruptedException {
 +      context.write(key.getRow(k), NullWritable.get());
 +    }
 +  }
 +  
 +  private static class MyReducer extends Reducer<Text,NullWritable,Text,Text> {
 +    public enum Count {
 +      uniqueRows
 +    }
 +    
 +    public void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException {
-       context.getCounter(Count.uniqueRows).increment(1);
++      CounterUtils.increment(context.getCounter(Count.uniqueRows));
 +    }
 +  }
 +  
 +  @Override
 +  public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
 +    if (args.length != 2) {
 +      System.out.println("Usage: CountRowKeys tableName outputPath");
 +      return 1;
 +    }
 +    
 +    Job job = new Job(getConf(), this.getClass().getName());
 +    job.setJarByClass(this.getClass());
 +    
 +    job.setInputFormatClass(SequenceFileInputFormat.class);
 +    SequenceFileInputFormat.addInputPath(job, new Path(ServerConstants.getTablesDir() + "/" + args[0] + "/*/*/data"));
 +    
 +    job.setMapperClass(MyMapper.class);
 +    job.setMapOutputKeyClass(Text.class);
 +    job.setMapOutputValueClass(NullWritable.class);
 +    
 +    job.setReducerClass(MyReducer.class);
 +    
 +    TextOutputFormat.setOutputPath(job, new Path(args[1]));
 +    
 +    job.waitForCompletion(true);
 +    return job.isSuccessful() ? 0 : 1;
 +  }
 +  
 +  public static void main(String[] args) throws Exception {
 +    int res = ToolRunner.run(CachedConfiguration.getInstance(), new CountRowKeys(), args);
 +    if (res != 0)
 +      System.exit(res);
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d2cd1c0/server/src/main/java/org/apache/accumulo/server/util/reflection/CounterUtils.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/util/reflection/CounterUtils.java
index 0000000,0000000..dbd5f60
new file mode 100644
--- /dev/null
+++ b/server/src/main/java/org/apache/accumulo/server/util/reflection/CounterUtils.java
@@@ -1,0 -1,0 +1,43 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.accumulo.server.util.reflection;
++
++import java.lang.reflect.Method;
++
++import org.apache.hadoop.mapreduce.Counter;
++
++/**
++ * Utility class for incrementing counters in a compatible way between hadoop 1 and 2
++ */
++public class CounterUtils {
++  static private Method INCREMENT;
++  static {
++    try {
++      INCREMENT = Counter.class.getMethod("increment", Long.TYPE);
++    } catch (Exception ex) {
++      throw new RuntimeException(ex);
++    }
++  }
++
++  public static void increment(Counter counter) {
++    try {
++      INCREMENT.invoke(counter, 1L);
++    } catch (Exception ex) {
++      throw new RuntimeException(ex);
++    }
++  }
++}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d2cd1c0/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java
index bbe7fa3,0000000..a35ca66
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java
@@@ -1,180 -1,0 +1,181 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.continuous;
 +
 +import java.io.IOException;
 +import java.util.Random;
 +import java.util.Set;
 +import java.util.UUID;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.cli.BatchWriterOpts;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 +import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.security.ColumnVisibility;
 +import org.apache.accumulo.core.util.CachedConfiguration;
++import org.apache.accumulo.server.util.reflection.CounterUtils;
 +import org.apache.accumulo.test.continuous.ContinuousIngest.BaseOpts;
 +import org.apache.accumulo.test.continuous.ContinuousIngest.ShortConverter;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.conf.Configured;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.io.WritableComparator;
 +import org.apache.hadoop.mapreduce.Job;
 +import org.apache.hadoop.mapreduce.Mapper;
 +import org.apache.hadoop.util.Tool;
 +import org.apache.hadoop.util.ToolRunner;
 +
 +import com.beust.jcommander.Parameter;
 +import com.beust.jcommander.validators.PositiveInteger;
 +
 +/**
 + * A map only job that reads a table created by continuous ingest and creates a doubly linked list. This map reduce job tests the ability of a map only job to
 + * read and write to accumulo at the same time. This map reduce job mutates the table in such a way that it should not create any undefined nodes.
 + * 
 + */
 +public class ContinuousMoru extends Configured implements Tool {
 +  private static final String PREFIX = ContinuousMoru.class.getSimpleName() + ".";
 +  private static final String MAX_CQ = PREFIX + "MAX_CQ";
 +  private static final String MAX_CF = PREFIX + "MAX_CF";
 +  private static final String MAX = PREFIX + "MAX";
 +  private static final String MIN = PREFIX + "MIN";
 +  private static final String CI_ID = PREFIX + "CI_ID";
 +  
 +  static enum Counts {
 +    SELF_READ;
 +  }
 +  
 +  public static class CMapper extends Mapper<Key,Value,Text,Mutation> {
 +    
 +    private short max_cf;
 +    private short max_cq;
 +    private Random random;
 +    private String ingestInstanceId;
 +    private byte[] iiId;
 +    private long count;
 +    
 +    private static final ColumnVisibility EMPTY_VIS = new ColumnVisibility();
 +    
 +    @Override
 +    public void setup(Context context) throws IOException, InterruptedException {
 +      int max_cf = context.getConfiguration().getInt(MAX_CF, -1);
 +      int max_cq = context.getConfiguration().getInt(MAX_CQ, -1);
 +      
 +      if (max_cf > Short.MAX_VALUE || max_cq > Short.MAX_VALUE)
 +        throw new IllegalArgumentException();
 +      
 +      this.max_cf = (short) max_cf;
 +      this.max_cq = (short) max_cq;
 +      
 +      random = new Random();
 +      ingestInstanceId = context.getConfiguration().get(CI_ID);
 +      iiId = ingestInstanceId.getBytes(Constants.UTF8);
 +      
 +      count = 0;
 +    }
 +    
 +    @Override
 +    public void map(Key key, Value data, Context context) throws IOException, InterruptedException {
 +      
 +      ContinuousWalk.validate(key, data);
 +      
 +      if (WritableComparator.compareBytes(iiId, 0, iiId.length, data.get(), 0, iiId.length) != 0) {
 +        // only rewrite data not written by this M/R job
 +        byte[] val = data.get();
 +        
 +        int offset = ContinuousWalk.getPrevRowOffset(val);
 +        if (offset > 0) {
 +          long rowLong = Long.parseLong(new String(val, offset, 16, Constants.UTF8), 16);
 +          Mutation m = ContinuousIngest.genMutation(rowLong, random.nextInt(max_cf), random.nextInt(max_cq), EMPTY_VIS, iiId, count++, key.getRowData()
 +              .toArray(), random, true);
 +          context.write(null, m);
 +        }
 +        
 +      } else {
-         ContinuousVerify.increment(context.getCounter(Counts.SELF_READ));
++        CounterUtils.increment(context.getCounter(Counts.SELF_READ));
 +      }
 +    }
 +  }
 +  
 +  static class Opts extends BaseOpts {
 +    @Parameter(names = "--maxColF", description = "maximum column family value to use", converter=ShortConverter.class)
 +    short maxColF = Short.MAX_VALUE;
 +    
 +    @Parameter(names = "--maxColQ", description = "maximum column qualifier value to use", converter=ShortConverter.class)
 +    short maxColQ = Short.MAX_VALUE;
 +    
 +    @Parameter(names = "--maxMappers", description = "the maximum number of mappers to use", required = true, validateWith = PositiveInteger.class)
 +    int maxMaps = 0;
 +  }
 +  
 +  @Override
 +  public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException, AccumuloSecurityException {
 +    Opts opts = new Opts();
 +    BatchWriterOpts bwOpts = new BatchWriterOpts();
 +    opts.parseArgs(ContinuousMoru.class.getName(), args, bwOpts);
 +    
 +    Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
 +    job.setJarByClass(this.getClass());
 +    
 +    job.setInputFormatClass(AccumuloInputFormat.class);
 +    opts.setAccumuloConfigs(job);
 +    
 +    // set up ranges
 +    try {
 +      Set<Range> ranges = opts.getConnector().tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps);
 +      AccumuloInputFormat.setRanges(job, ranges);
 +      AccumuloInputFormat.setAutoAdjustRanges(job, false);
 +    } catch (Exception e) {
 +      throw new IOException(e);
 +    }
 +    
 +    job.setMapperClass(CMapper.class);
 +    
 +    job.setNumReduceTasks(0);
 +    
 +    job.setOutputFormatClass(AccumuloOutputFormat.class);
 +    AccumuloOutputFormat.setBatchWriterOptions(job, bwOpts.getBatchWriterConfig());
 +    
 +    Configuration conf = job.getConfiguration();
 +    conf.setLong(MIN, opts.min);
 +    conf.setLong(MAX, opts.max);
 +    conf.setInt(MAX_CF, opts.maxColF);
 +    conf.setInt(MAX_CQ, opts.maxColQ);
 +    conf.set(CI_ID, UUID.randomUUID().toString());
 +    
 +    job.waitForCompletion(true);
 +    opts.stopTracing();
 +    return job.isSuccessful() ? 0 : 1;
 +  }
 +  
 +  /**
 +   * 
 +   * @param args
 +   *          instanceName zookeepers username password table columns outputpath
 +   * @throws Exception
 +   */
 +  public static void main(String[] args) throws Exception {
 +    int res = ToolRunner.run(CachedConfiguration.getInstance(), new ContinuousMoru(), args);
 +    if (res != 0)
 +      System.exit(res);
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d2cd1c0/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
index 07e0c92,0000000..8095b50
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java
@@@ -1,243 -1,0 +1,224 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.continuous;
 +
 +import java.io.IOException;
- import java.lang.reflect.Method;
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Random;
 +import java.util.Set;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.cli.ClientOnDefaultTable;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.util.CachedConfiguration;
++import org.apache.accumulo.server.util.reflection.CounterUtils;
 +import org.apache.accumulo.test.continuous.ContinuousWalk.BadChecksumException;
 +import org.apache.hadoop.conf.Configured;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.LongWritable;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.io.VLongWritable;
 +import org.apache.hadoop.mapred.Counters.Counter;
 +import org.apache.hadoop.mapreduce.Job;
 +import org.apache.hadoop.mapreduce.Mapper;
 +import org.apache.hadoop.mapreduce.Reducer;
 +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 +import org.apache.hadoop.util.Tool;
 +import org.apache.hadoop.util.ToolRunner;
 +
 +import com.beust.jcommander.Parameter;
 +import com.beust.jcommander.validators.PositiveInteger;
 +
 +/**
 + * A map reduce job that verifies a table created by continuous ingest. It verifies that all referenced nodes are defined.
 + */
 +
 +public class ContinuousVerify extends Configured implements Tool {
- 
-   // work around hadoop-1/hadoop-2 runtime incompatibility
-   static private Method INCREMENT;
-   static {
-     try {
-       INCREMENT = Counter.class.getMethod("increment", Long.TYPE);
-     } catch (Exception ex) {
-       throw new RuntimeException(ex);
-     }
-   }
- 
-   static void increment(Object obj) {
-     try {
-       INCREMENT.invoke(obj, 1L);
-     } catch (Exception ex) {
-       throw new RuntimeException(ex);
-     }
-   }
- 
 +  public static final VLongWritable DEF = new VLongWritable(-1);
 +
 +  public static class CMapper extends Mapper<Key,Value,LongWritable,VLongWritable> {
 +
 +    private LongWritable row = new LongWritable();
 +    private LongWritable ref = new LongWritable();
 +    private VLongWritable vrow = new VLongWritable();
 +
 +    private long corrupt = 0;
 +
 +    @Override
 +    public void map(Key key, Value data, Context context) throws IOException, InterruptedException {
 +      long r = Long.parseLong(key.getRow().toString(), 16);
 +      if (r < 0)
 +        throw new IllegalArgumentException();
 +
 +      try {
 +        ContinuousWalk.validate(key, data);
 +      } catch (BadChecksumException bce) {
-         increment(context.getCounter(Counts.CORRUPT));
++        CounterUtils.increment(context.getCounter(Counts.CORRUPT));
 +        if (corrupt < 1000) {
 +          System.out.println("ERROR Bad checksum : " + key);
 +        } else if (corrupt == 1000) {
 +          System.out.println("Too many bad checksums, not printing anymore!");
 +        }
 +        corrupt++;
 +        return;
 +      }
 +
 +      row.set(r);
 +
 +      context.write(row, DEF);
 +      byte[] val = data.get();
 +
 +      int offset = ContinuousWalk.getPrevRowOffset(val);
 +      if (offset > 0) {
 +        ref.set(Long.parseLong(new String(val, offset, 16, Constants.UTF8), 16));
 +        vrow.set(r);
 +        context.write(ref, vrow);
 +      }
 +    }
 +  }
 +
 +  public static enum Counts {
 +    UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT
 +  }
 +
 +  public static class CReducer extends Reducer<LongWritable,VLongWritable,Text,Text> {
 +    private ArrayList<Long> refs = new ArrayList<Long>();
 +
 +    @Override
 +    public void reduce(LongWritable key, Iterable<VLongWritable> values, Context context) throws IOException, InterruptedException {
 +
 +      int defCount = 0;
 +
 +      refs.clear();
 +      for (VLongWritable type : values) {
 +        if (type.get() == -1) {
 +          defCount++;
 +        } else {
 +          refs.add(type.get());
 +        }
 +      }
 +
 +      if (defCount == 0 && refs.size() > 0) {
 +        StringBuilder sb = new StringBuilder();
 +        String comma = "";
 +        for (Long ref : refs) {
 +          sb.append(comma);
 +          comma = ",";
 +          sb.append(new String(ContinuousIngest.genRow(ref), Constants.UTF8));
 +        }
 +
 +        context.write(new Text(ContinuousIngest.genRow(key.get())), new Text(sb.toString()));
-         increment(context.getCounter(Counts.UNDEFINED));
++        CounterUtils.increment(context.getCounter(Counts.UNDEFINED));
 +
 +      } else if (defCount > 0 && refs.size() == 0) {
-         increment(context.getCounter(Counts.UNREFERENCED));
++        CounterUtils.increment(context.getCounter(Counts.UNREFERENCED));
 +      } else {
-         increment(context.getCounter(Counts.REFERENCED));
++        CounterUtils.increment(context.getCounter(Counts.REFERENCED));
 +      }
 +
 +    }
 +  }
 +
 +  static class Opts extends ClientOnDefaultTable {
 +    @Parameter(names = "--output", description = "location in HDFS to store the results; must not exist", required = true)
 +    String outputDir = "/tmp/continuousVerify";
 +
 +    @Parameter(names = "--maxMappers", description = "the maximum number of mappers to use", required = true, validateWith = PositiveInteger.class)
 +    int maxMaps = 0;
 +
 +    @Parameter(names = "--reducers", description = "the number of reducers to use", required = true, validateWith = PositiveInteger.class)
 +    int reducers = 0;
 +
 +    @Parameter(names = "--offline", description = "perform the verification directly on the files while the table is offline")
 +    boolean scanOffline = false;
 +
 +    public Opts() {
 +      super("ci");
 +    }
 +  }
 +
 +  @Override
 +  public int run(String[] args) throws Exception {
 +    Opts opts = new Opts();
 +    opts.parseArgs(this.getClass().getName(), args);
 +
 +    Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
 +    job.setJarByClass(this.getClass());
 +
 +    job.setInputFormatClass(AccumuloInputFormat.class);
 +    opts.setAccumuloConfigs(job);
 +
 +    Set<Range> ranges = null;
 +    String clone = opts.getTableName();
 +    Connector conn = null;
 +
 +    if (opts.scanOffline) {
 +      Random random = new Random();
 +      clone = opts.getTableName() + "_" + String.format("%016x", (random.nextLong() & 0x7fffffffffffffffl));
 +      conn = opts.getConnector();
 +      conn.tableOperations().clone(opts.getTableName(), clone, true, new HashMap<String,String>(), new HashSet<String>());
 +      ranges = conn.tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps);
 +      conn.tableOperations().offline(clone);
 +      AccumuloInputFormat.setInputTableName(job, clone);
 +      AccumuloInputFormat.setOfflineTableScan(job, true);
 +    } else {
 +      ranges = opts.getConnector().tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps);
 +    }
 +
 +    AccumuloInputFormat.setRanges(job, ranges);
 +    AccumuloInputFormat.setAutoAdjustRanges(job, false);
 +
 +    job.setMapperClass(CMapper.class);
 +    job.setMapOutputKeyClass(LongWritable.class);
 +    job.setMapOutputValueClass(VLongWritable.class);
 +
 +    job.setReducerClass(CReducer.class);
 +    job.setNumReduceTasks(opts.reducers);
 +
 +    job.setOutputFormatClass(TextOutputFormat.class);
 +
 +    job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", opts.scanOffline);
 +
 +    TextOutputFormat.setOutputPath(job, new Path(opts.outputDir));
 +
 +    job.waitForCompletion(true);
 +
 +    if (opts.scanOffline) {
 +      conn.tableOperations().delete(clone);
 +    }
 +    opts.stopTracing();
 +    return job.isSuccessful() ? 0 : 1;
 +  }
 +
 +  /**
 +   * 
 +   * @param args
 +   *          instanceName zookeepers username password table columns outputpath
 +   * @throws Exception
 +   */
 +  public static void main(String[] args) throws Exception {
 +    int res = ToolRunner.run(CachedConfiguration.getInstance(), new ContinuousVerify(), args);
 +    if (res != 0)
 +      System.exit(res);
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d2cd1c0/test/src/main/java/org/apache/accumulo/test/functional/RunTests.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/RunTests.java
index f6ebe87,0000000..2b775c5
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/RunTests.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/RunTests.java
@@@ -1,216 -1,0 +1,217 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import java.io.BufferedReader;
 +import java.io.File;
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.io.InputStreamReader;
 +import java.util.Arrays;
 +import java.util.List;
 +import java.util.Map;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.cli.Help;
++import org.apache.accumulo.server.util.reflection.CounterUtils;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.conf.Configured;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.LongWritable;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.mapreduce.Job;
 +import org.apache.hadoop.mapreduce.Mapper;
 +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 +import org.apache.hadoop.util.Tool;
 +import org.apache.hadoop.util.ToolRunner;
 +import org.apache.log4j.Logger;
 +
 +import com.beust.jcommander.Parameter;
 +
 +/**
 + * Runs the functional tests via map-reduce.
 + * 
 + * First, be sure everything is compiled.
 + * 
 + * Second, get a list of the tests you want to run:
 + * 
 + * <pre>
 + *  $ python test/system/auto/run.py -l > tests
 + * </pre>
 + * 
 + * Put the list of tests into HDFS:
 + * 
 + * <pre>
 + *  $ hadoop fs -put tests /user/hadoop/tests
 + * </pre>
 + * 
 + * Run the map-reduce job:
 + * 
 + * <pre>
 + *  $ ./bin/accumulo accumulo.test.functional.RunTests --tests /user/hadoop/tests --output /user/hadoop/results
 + * </pre>
 + * 
 + * Note that you will need to have some configuration in conf/accumulo-site.xml (to locate zookeeper). The map-reduce jobs will not use your local accumulo
 + * instance.
 + * 
 + */
 +public class RunTests extends Configured implements Tool {
 +  
 +  static final public String JOB_NAME = "Functional Test Runner";
 +  private static final Logger log = Logger.getLogger(RunTests.class);
 +  
 +  private Job job = null;
 +
 +  private static final int DEFAULT_TIMEOUT_FACTOR = 1;
 +
 +  static class Opts extends Help {
 +    @Parameter(names="--tests", description="newline separated list of tests to run", required=true)
 +    String testFile;
 +    @Parameter(names="--output", description="destination for the results of tests in HDFS", required=true)
 +    String outputPath;
 +    @Parameter(names="--timeoutFactor", description="Optional scaling factor for timeout for both mapred.task.timeout and -f flag on run.py", required=false)
 +    Integer intTimeoutFactor = DEFAULT_TIMEOUT_FACTOR;
 +  }
 +  
 +  static final String TIMEOUT_FACTOR = RunTests.class.getName() + ".timeoutFactor";
 +
 +  static public class TestMapper extends Mapper<LongWritable,Text,Text,Text> {
 +    
 +    private static final String REDUCER_RESULT_START = "::::: ";
 +    private static final int RRS_LEN = REDUCER_RESULT_START.length();
 +    private Text result = new Text();
 +    String mapperTimeoutFactor = null;
 +
 +    private static enum Outcome {
 +      SUCCESS, FAILURE, ERROR, UNEXPECTED_SUCCESS, EXPECTED_FAILURE
 +    }
 +    private static final Map<Character, Outcome> OUTCOME_COUNTERS;
 +    static {
 +      OUTCOME_COUNTERS = new java.util.HashMap<Character, Outcome>();
 +      OUTCOME_COUNTERS.put('S', Outcome.SUCCESS);
 +      OUTCOME_COUNTERS.put('F', Outcome.FAILURE);
 +      OUTCOME_COUNTERS.put('E', Outcome.ERROR);
 +      OUTCOME_COUNTERS.put('T', Outcome.UNEXPECTED_SUCCESS);
 +      OUTCOME_COUNTERS.put('G', Outcome.EXPECTED_FAILURE);
 +    }
 +
 +    @Override
 +    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
 +      List<String> cmd = Arrays.asList("/usr/bin/python", "test/system/auto/run.py", "-m", "-f", mapperTimeoutFactor, "-t", value.toString());
 +      log.info("Running test " + cmd);
 +      ProcessBuilder pb = new ProcessBuilder(cmd);
 +      pb.directory(new File(context.getConfiguration().get("accumulo.home")));
 +      pb.redirectErrorStream(true);
 +      Process p = pb.start();
 +      p.getOutputStream().close();
 +      InputStream out = p.getInputStream();
 +      InputStreamReader outr = new InputStreamReader(out, Constants.UTF8);
 +      BufferedReader br = new BufferedReader(outr);
 +      String line;
 +      try {
 +        while ((line = br.readLine()) != null) {
 +          log.info("More: " + line);
 +          if (line.startsWith(REDUCER_RESULT_START)) {
 +            String resultLine = line.substring(RRS_LEN);
 +            if (resultLine.length() > 0) {
 +              Outcome outcome = OUTCOME_COUNTERS.get(resultLine.charAt(0));
 +              if (outcome != null) {
-                 context.getCounter(outcome).increment(1);
++                CounterUtils.increment(context.getCounter(outcome));
 +              }
 +            }
 +            String taskAttemptId = context.getTaskAttemptID().toString();
 +            result.set(taskAttemptId + " " + resultLine);
 +            context.write(value, result);
 +          }
 +        }
 +      } catch (Exception ex) {
 +        log.error(ex);
 +        context.progress();
 +      }
 +
 +      p.waitFor();
 +    }
 +    
 +    @Override
 +    protected void setup(Mapper<LongWritable,Text,Text,Text>.Context context) throws IOException, InterruptedException {
 +      mapperTimeoutFactor = Integer.toString(context.getConfiguration().getInt(TIMEOUT_FACTOR, DEFAULT_TIMEOUT_FACTOR));
 +    }
 +  }
 +  
 +  @Override
 +  public int run(String[] args) throws Exception {
 +    job = new Job(getConf(), JOB_NAME);
 +    job.setJarByClass(this.getClass());
 +    Opts opts = new Opts();
 +    opts.parseArgs(RunTests.class.getName(), args);
 +    
 +    // this is like 1-2 tests per mapper
 +    Configuration conf = job.getConfiguration();
 +    conf.setInt("mapred.max.split.size", 40);
 +    conf.set("accumulo.home", System.getenv("ACCUMULO_HOME"));
 +
 +    // Taking third argument as scaling factor to setting mapred.task.timeout
 +    // and TIMEOUT_FACTOR
 +    conf.setInt("mapred.task.timeout", opts.intTimeoutFactor * 8 * 60 * 1000);
 +    conf.setInt(TIMEOUT_FACTOR, opts.intTimeoutFactor);
 +    conf.setBoolean("mapred.map.tasks.speculative.execution", false);
 +    
 +    // set input
 +    job.setInputFormatClass(TextInputFormat.class);
 +    TextInputFormat.setInputPaths(job, new Path(opts.testFile));
 +    
 +    // set output
 +    job.setOutputFormatClass(TextOutputFormat.class);
 +    FileSystem fs = FileSystem.get(conf);
 +    Path destination = new Path(opts.outputPath);
 +    if (fs.exists(destination)) {
 +      log.info("Deleting existing output directory " + opts.outputPath);
 +      fs.delete(destination, true);
 +    }
 +    TextOutputFormat.setOutputPath(job, destination);
 +    
 +    // configure default reducer: put the results into one file
 +    job.setNumReduceTasks(1);
 +    
 +    // set mapper
 +    job.setMapperClass(TestMapper.class);
 +    job.setOutputKeyClass(Text.class);
 +    job.setOutputValueClass(Text.class);
 +    
 +    // don't do anything with the results (yet) a summary would be nice
 +    job.setNumReduceTasks(0);
 +    
 +    // submit the job
 +    log.info("Starting tests");
 +    return 0;
 +  }
 +  
 +  /**
 +   * @param args
 +   * @throws Exception
 +   */
 +  public static void main(String[] args) throws Exception {
 +    RunTests tests = new RunTests();
 +    ToolRunner.run(new Configuration(), tests, args);
 +    tests.job.waitForCompletion(true);
 +    if (!tests.job.isSuccessful())
 +      System.exit(1);
 +  }
 +  
 +}


Mime
View raw message