accumulo-commits mailing list archives

From ctubb...@apache.org
Subject [03/14] accumulo git commit: ACCUMULO-3920 Convert more tests from mock
Date Thu, 30 Jul 2015 21:51:37 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloOutputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloOutputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloOutputFormatIT.java
new file mode 100644
index 0000000..bb48761
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloOutputFormatIT.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.mapred;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
+public class AccumuloOutputFormatIT extends ConfigurableMacBase {
+
+  @Override
+  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setProperty(Property.TSERV_SESSION_MAXIDLE, "1");
+    cfg.setNumTservers(1);
+  }
+
+  // Prevent regression of ACCUMULO-3709.
+  @Test
+  public void testMapred() throws Exception {
+    Connector connector = getConnector();
+    // create a table and put some data in it
+    connector.tableOperations().create(testName.getMethodName());
+
+    JobConf job = new JobConf();
+    BatchWriterConfig batchConfig = new BatchWriterConfig();
+    // disable time-based flushes (a max latency of 0 means no maximum)
+    batchConfig.setMaxLatency(0, TimeUnit.MILLISECONDS);
+    // use a single thread to ensure our update session times out
+    batchConfig.setMaxWriteThreads(1);
+    // set max memory high enough that these writes never trigger a flush
+    batchConfig.setMaxMemory(Long.MAX_VALUE);
+    AccumuloOutputFormat outputFormat = new AccumuloOutputFormat();
+    AccumuloOutputFormat.setBatchWriterOptions(job, batchConfig);
+    AccumuloOutputFormat.setZooKeeperInstance(job, cluster.getClientConfig());
+    AccumuloOutputFormat.setConnectorInfo(job, "root", new PasswordToken(ROOT_PASSWORD));
+    RecordWriter<Text,Mutation> writer = outputFormat.getRecordWriter(null, job, "Test", null);
+
+    try {
+      for (int i = 0; i < 3; i++) {
+        Mutation m = new Mutation(new Text(String.format("%08d", i)));
+        for (int j = 0; j < 3; j++) {
+          m.put(new Text("cf1"), new Text("cq" + j), new Value((i + "_" + j).getBytes(UTF_8)));
+        }
+        writer.write(new Text(testName.getMethodName()), m);
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      // the failure should surface from close(), not from write()
+    }
+
+    connector.securityOperations().revokeTablePermission("root", testName.getMethodName(), TablePermission.WRITE);
+
+    try {
+      writer.close(null);
+      fail("Did not throw exception");
+    } catch (IOException ex) {
+      log.info(ex.getMessage(), ex);
+      assertTrue(ex.getCause() instanceof MutationsRejectedException);
+    }
+  }
+
+  private static AssertionError e1 = null;
+
+  private static class MRTester extends Configured implements Tool {
+    private static class TestMapper implements Mapper<Key,Value,Text,Mutation> {
+      Key key = null;
+      int count = 0;
+      OutputCollector<Text,Mutation> finalOutput;
+
+      @Override
+      public void map(Key k, Value v, OutputCollector<Text,Mutation> output, Reporter reporter) throws IOException {
+        finalOutput = output;
+        try {
+          if (key != null)
+            assertEquals(key.getRow().toString(), new String(v.get()));
+          assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
+          assertEquals(new String(v.get()), String.format("%09x", count));
+        } catch (AssertionError e) {
+          e1 = e;
+        }
+        key = new Key(k);
+        count++;
+      }
+
+      @Override
+      public void configure(JobConf job) {}
+
+      @Override
+      public void close() throws IOException {
+        Mutation m = new Mutation("total");
+        m.put("", "", Integer.toString(count));
+        finalOutput.collect(new Text(), m);
+      }
+
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 6) {
+        throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <user> <pass> <inputtable> <outputtable> <instanceName> <zooKeepers>");
+      }
+
+      String user = args[0];
+      String pass = args[1];
+      String table1 = args[2];
+      String table2 = args[3];
+      String instanceName = args[4];
+      String zooKeepers = args[5];
+
+      JobConf job = new JobConf(getConf());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormat(AccumuloInputFormat.class);
+
+      ClientConfiguration clientConfig = new ClientConfiguration().withInstance(instanceName).withZkHosts(zooKeepers);
+
+      AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
+      AccumuloInputFormat.setInputTableName(job, table1);
+      AccumuloInputFormat.setZooKeeperInstance(job, clientConfig);
+
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormat(AccumuloOutputFormat.class);
+      job.setOutputKeyClass(Text.class);
+      job.setOutputValueClass(Mutation.class);
+
+      AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
+      AccumuloOutputFormat.setCreateTables(job, false);
+      AccumuloOutputFormat.setDefaultTableName(job, table2);
+      AccumuloOutputFormat.setZooKeeperInstance(job, clientConfig);
+
+      job.setNumReduceTasks(0);
+
+      return JobClient.runJob(job).isSuccessful() ? 0 : 1;
+    }
+
+    public static void main(String[] args) throws Exception {
+      Configuration conf = new Configuration();
+      conf.set("mapreduce.cluster.local.dir", new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+    }
+  }
+
+  @Test
+  public void testMR() throws Exception {
+    Connector c = getConnector();
+    String instanceName = getCluster().getInstanceName();
+    String table1 = instanceName + "_t1";
+    String table2 = instanceName + "_t2";
+    c.tableOperations().create(table1);
+    c.tableOperations().create(table2);
+    BatchWriter bw = c.createBatchWriter(table1, new BatchWriterConfig());
+    for (int i = 0; i < 100; i++) {
+      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+      bw.addMutation(m);
+    }
+    bw.close();
+
+    MRTester.main(new String[] {"root", ROOT_PASSWORD, table1, table2, instanceName, getCluster().getZooKeepers()});
+    assertNull(e1);
+
+    Scanner scanner = c.createScanner(table2, new Authorizations());
+    Iterator<Entry<Key,Value>> iter = scanner.iterator();
+    assertTrue(iter.hasNext());
+    Entry<Key,Value> entry = iter.next();
+    assertEquals(100, Integer.parseInt(new String(entry.getValue().get())));
+    assertFalse(iter.hasNext());
+  }
+
+}
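
The ACCUMULO-3709 regression test above relies on AccumuloOutputFormat's RecordWriter wrapping the underlying MutationsRejectedException in the IOException it throws from close(). A minimal sketch of unwrapping that cause, assuming only the mapred RecordWriter contract the test exercises (the class and method names here are illustrative):

import java.io.IOException;

import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.hadoop.mapred.RecordWriter;

public class CloseUnwrapSketch {
  /** Re-throw the MutationsRejectedException hidden inside close()'s IOException, if present. */
  static void closeAndUnwrap(RecordWriter<?,?> writer) throws IOException, MutationsRejectedException {
    try {
      writer.close(null); // the test above likewise passes a null Reporter
    } catch (IOException ex) {
      if (ex.getCause() instanceof MutationsRejectedException)
        throw (MutationsRejectedException) ex.getCause();
      throw ex;
    }
  }
}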

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloRowInputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloRowInputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloRowInputFormatIT.java
new file mode 100644
index 0000000..d266d3d
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloRowInputFormatIT.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapred.AccumuloRowInputFormat;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyValue;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.PeekingIterator;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class AccumuloRowInputFormatIT extends AccumuloClusterHarness {
+
+  private static final String ROW1 = "row1";
+  private static final String ROW2 = "row2";
+  private static final String ROW3 = "row3";
+  private static final String COLF1 = "colf1";
+  private static List<Entry<Key,Value>> row1;
+  private static List<Entry<Key,Value>> row2;
+  private static List<Entry<Key,Value>> row3;
+  private static AssertionError e1 = null;
+  private static AssertionError e2 = null;
+
+  @BeforeClass
+  public static void prepareRows() {
+    row1 = new ArrayList<Entry<Key,Value>>();
+    row1.add(new KeyValue(new Key(ROW1, COLF1, "colq1"), "v1".getBytes()));
+    row1.add(new KeyValue(new Key(ROW1, COLF1, "colq2"), "v2".getBytes()));
+    row1.add(new KeyValue(new Key(ROW1, "colf2", "colq3"), "v3".getBytes()));
+    row2 = new ArrayList<Entry<Key,Value>>();
+    row2.add(new KeyValue(new Key(ROW2, COLF1, "colq4"), "v4".getBytes()));
+    row3 = new ArrayList<Entry<Key,Value>>();
+    row3.add(new KeyValue(new Key(ROW3, COLF1, "colq5"), "v5".getBytes()));
+  }
+
+  private static void checkLists(final List<Entry<Key,Value>> first, final Iterator<Entry<Key,Value>> second) {
+    int entryIndex = 0;
+    while (second.hasNext()) {
+      final Entry<Key,Value> entry = second.next();
+      assertEquals("Keys should be equal", first.get(entryIndex).getKey(), entry.getKey());
+      assertEquals("Values should be equal", first.get(entryIndex).getValue(), entry.getValue());
+      entryIndex++;
+    }
+  }
+
+  private static void insertList(final BatchWriter writer, final List<Entry<Key,Value>> list) throws MutationsRejectedException {
+    for (Entry<Key,Value> e : list) {
+      final Key key = e.getKey();
+      final Mutation mutation = new Mutation(key.getRow());
+      ColumnVisibility colVisibility = new ColumnVisibility(key.getColumnVisibility());
+      mutation.put(key.getColumnFamily(), key.getColumnQualifier(), colVisibility, key.getTimestamp(), e.getValue());
+      writer.addMutation(mutation);
+    }
+  }
+
+  private static class MRTester extends Configured implements Tool {
+    public static class TestMapper implements Mapper<Text,PeekingIterator<Entry<Key,Value>>,Key,Value> {
+      int count = 0;
+
+      @Override
+      public void map(Text k, PeekingIterator<Entry<Key,Value>> v, OutputCollector<Key,Value> output, Reporter reporter) throws IOException {
+        try {
+          switch (count) {
+            case 0:
+              assertEquals("Current key should be " + ROW1, new Text(ROW1), k);
+              checkLists(row1, v);
+              break;
+            case 1:
+              assertEquals("Current key should be " + ROW2, new Text(ROW2), k);
+              checkLists(row2, v);
+              break;
+            case 2:
+              assertEquals("Current key should be " + ROW3, new Text(ROW3), k);
+              checkLists(row3, v);
+              break;
+            default:
+              assertTrue(false);
+          }
+        } catch (AssertionError e) {
+          e1 = e;
+        }
+        count++;
+      }
+
+      @Override
+      public void configure(JobConf job) {}
+
+      @Override
+      public void close() throws IOException {
+        try {
+          assertEquals(3, count);
+        } catch (AssertionError e) {
+          e2 = e;
+        }
+      }
+
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 1) {
+        throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <table>");
+      }
+
+      String user = getAdminPrincipal();
+      AuthenticationToken pass = getAdminToken();
+      String table = args[0];
+
+      JobConf job = new JobConf(getConf());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormat(AccumuloRowInputFormat.class);
+
+      AccumuloInputFormat.setConnectorInfo(job, user, pass);
+      AccumuloInputFormat.setInputTableName(job, table);
+      AccumuloRowInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
+
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormat(NullOutputFormat.class);
+
+      job.setNumReduceTasks(0);
+
+      return JobClient.runJob(job).isSuccessful() ? 0 : 1;
+    }
+
+    public static void main(String[] args) throws Exception {
+      Configuration conf = new Configuration();
+      conf.set("mapreduce.cluster.local.dir", new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+    }
+  }
+
+  @Test
+  public void test() throws Exception {
+    final Connector conn = getConnector();
+    String tableName = getUniqueNames(1)[0];
+    conn.tableOperations().create(tableName);
+    BatchWriter writer = null;
+    try {
+      writer = conn.createBatchWriter(tableName, new BatchWriterConfig());
+      insertList(writer, row1);
+      insertList(writer, row2);
+      insertList(writer, row3);
+    } finally {
+      if (writer != null) {
+        writer.close();
+      }
+    }
+    MRTester.main(new String[] {tableName});
+    assertNull(e1);
+    assertNull(e2);
+  }
+}
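
AccumuloRowInputFormat, exercised above, hands the mapred Mapper one row at a time: the key is the row id and the value is a PeekingIterator over that row's cells in sorted order, which is exactly what checkLists() consumes. A small sketch of that per-row view under the same types (names are illustrative):

import java.util.Map.Entry;

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.hadoop.io.Text;

public class RowViewSketch {
  /** Count the cells delivered for one row; every cell's Key carries the same row id. */
  static int cellsInRow(Text row, PeekingIterator<Entry<Key,Value>> cells) {
    int count = 0;
    while (cells.hasNext()) {
      Key k = cells.next().getKey();
      if (!k.getRow().equals(row))
        throw new IllegalStateException("cell outside row " + row);
      count++;
    }
    return count; // row1 above would yield 3; row2 and row3 yield 1 each
  }
}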

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/mapred/TokenFileIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/mapred/TokenFileIT.java b/test/src/main/java/org/apache/accumulo/test/mapred/TokenFileIT.java
new file mode 100644
index 0000000..e30a033
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/mapred/TokenFileIT.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.impl.Credentials;
+import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TokenFileIT extends AccumuloClusterHarness {
+  private static AssertionError e1 = null;
+
+  private static class MRTokenFileTester extends Configured implements Tool {
+    private static class TestMapper implements Mapper<Key,Value,Text,Mutation> {
+      Key key = null;
+      int count = 0;
+      OutputCollector<Text,Mutation> finalOutput;
+
+      @Override
+      public void map(Key k, Value v, OutputCollector<Text,Mutation> output, Reporter reporter) throws IOException {
+        finalOutput = output;
+        try {
+          if (key != null)
+            assertEquals(key.getRow().toString(), new String(v.get()));
+          assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
+          assertEquals(new String(v.get()), String.format("%09x", count));
+        } catch (AssertionError e) {
+          e1 = e;
+        }
+        key = new Key(k);
+        count++;
+      }
+
+      @Override
+      public void configure(JobConf job) {}
+
+      @Override
+      public void close() throws IOException {
+        Mutation m = new Mutation("total");
+        m.put("", "", Integer.toString(count));
+        finalOutput.collect(new Text(), m);
+      }
+
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 3) {
+        throw new IllegalArgumentException("Usage : " + MRTokenFileTester.class.getName() + " <token file> <inputtable> <outputtable>");
+      }
+
+      String user = getAdminPrincipal();
+      String tokenFile = args[0];
+      String table1 = args[1];
+      String table2 = args[2];
+
+      JobConf job = new JobConf(getConf());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormat(AccumuloInputFormat.class);
+
+      AccumuloInputFormat.setConnectorInfo(job, user, tokenFile);
+      AccumuloInputFormat.setInputTableName(job, table1);
+      AccumuloInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
+
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormat(AccumuloOutputFormat.class);
+      job.setOutputKeyClass(Text.class);
+      job.setOutputValueClass(Mutation.class);
+
+      AccumuloOutputFormat.setConnectorInfo(job, user, tokenFile);
+      AccumuloOutputFormat.setCreateTables(job, false);
+      AccumuloOutputFormat.setDefaultTableName(job, table2);
+      AccumuloOutputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
+
+      job.setNumReduceTasks(0);
+
+      return JobClient.runJob(job).isSuccessful() ? 0 : 1;
+    }
+
+    public static void main(String[] args) throws Exception {
+      Configuration conf = CachedConfiguration.getInstance();
+      conf.set("hadoop.tmp.dir", new File(args[0]).getParent());
+      conf.set("mapreduce.cluster.local.dir", new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTokenFileTester(), args));
+    }
+  }
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+
+  @Test
+  public void testMR() throws Exception {
+    String[] tableNames = getUniqueNames(2);
+    String table1 = tableNames[0];
+    String table2 = tableNames[1];
+    Connector c = getConnector();
+    c.tableOperations().create(table1);
+    c.tableOperations().create(table2);
+    BatchWriter bw = c.createBatchWriter(table1, new BatchWriterConfig());
+    for (int i = 0; i < 100; i++) {
+      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+      bw.addMutation(m);
+    }
+    bw.close();
+
+    File tf = folder.newFile("root_test.pw");
+    PrintStream out = new PrintStream(tf);
+    String outString = new Credentials(getAdminPrincipal(), getAdminToken()).serialize();
+    out.println(outString);
+    out.close();
+
+    MRTokenFileTester.main(new String[] {tf.getAbsolutePath(), table1, table2});
+    assertNull(e1);
+
+    Scanner scanner = c.createScanner(table2, new Authorizations());
+    Iterator<Entry<Key,Value>> iter = scanner.iterator();
+    assertTrue(iter.hasNext());
+    Entry<Key,Value> entry = iter.next();
+    assertEquals(100, Integer.parseInt(new String(entry.getValue().get())));
+    assertFalse(iter.hasNext());
+  }
+}
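
The token file TokenFileIT passes to setConnectorInfo(job, user, tokenFile) is just the serialized Credentials written as a single line, as testMR() shows (note that Credentials is an internal class, as the test's own impl import indicates). A sketch of producing such a file under those assumptions; the directory handling and helper name are illustrative:

import java.io.File;
import java.io.PrintStream;

import org.apache.accumulo.core.client.impl.Credentials;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;

public class TokenFileSketch {
  /** Write serialized credentials to a one-line token file, mirroring the test. */
  static File writeTokenFile(File dir, String principal, AuthenticationToken token) throws Exception {
    File tf = new File(dir, principal + "_test.pw");
    PrintStream out = new PrintStream(tf);
    try {
      out.println(new Credentials(principal, token).serialize());
    } finally {
      out.close();
    }
    return tf;
  }
}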

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java
new file mode 100644
index 0000000..8f53378
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+public class AccumuloFileOutputFormatIT extends AccumuloClusterHarness {
+
+  private String PREFIX;
+  private String BAD_TABLE;
+  private String TEST_TABLE;
+  private String EMPTY_TABLE;
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 4 * 60;
+  }
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+
+  @Before
+  public void setup() throws Exception {
+    PREFIX = testName.getMethodName() + "_";
+    BAD_TABLE = PREFIX + "_mapreduce_bad_table";
+    TEST_TABLE = PREFIX + "_mapreduce_test_table";
+    EMPTY_TABLE = PREFIX + "_mapreduce_empty_table";
+
+    Connector c = getConnector();
+    c.tableOperations().create(EMPTY_TABLE);
+    c.tableOperations().create(TEST_TABLE);
+    c.tableOperations().create(BAD_TABLE);
+    BatchWriter bw = c.createBatchWriter(TEST_TABLE, new BatchWriterConfig());
+    Mutation m = new Mutation("Key");
+    m.put("", "", "");
+    bw.addMutation(m);
+    bw.close();
+    bw = c.createBatchWriter(BAD_TABLE, new BatchWriterConfig());
+    m = new Mutation("r1");
+    m.put("cf1", "cq1", "A&B");
+    m.put("cf1", "cq1", "A&B");
+    m.put("cf1", "cq2", "A&");
+    bw.addMutation(m);
+    bw.close();
+  }
+
+  @Test
+  public void testEmptyWrite() throws Exception {
+    handleWriteTests(false);
+  }
+
+  @Test
+  public void testRealWrite() throws Exception {
+    handleWriteTests(true);
+  }
+
+  private static class MRTester extends Configured implements Tool {
+    private static class BadKeyMapper extends Mapper<Key,Value,Key,Value> {
+      int index = 0;
+
+      @Override
+      protected void map(Key key, Value value, Context context) throws IOException, InterruptedException {
+        String table = context.getConfiguration().get("MRTester_tableName");
+        assertNotNull(table);
+        try {
+          try {
+            context.write(key, value);
+            if (index == 2)
+              assertTrue(false);
+          } catch (Exception e) {
+            assertEquals(2, index);
+          }
+        } catch (AssertionError e) {
+          assertionErrors.put(table + "_map", e);
+        }
+        index++;
+      }
+
+      @Override
+      protected void cleanup(Context context) throws IOException, InterruptedException {
+        String table = context.getConfiguration().get("MRTester_tableName");
+        assertNotNull(table);
+        try {
+          assertEquals(2, index);
+        } catch (AssertionError e) {
+          assertionErrors.put(table + "_cleanup", e);
+        }
+      }
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 2) {
+        throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <table> <outputfile>");
+      }
+
+      String table = args[0];
+      assertionErrors.put(table + "_map", new AssertionError("Dummy_map"));
+      assertionErrors.put(table + "_cleanup", new AssertionError("Dummy_cleanup"));
+
+      Job job = Job.getInstance(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormatClass(AccumuloInputFormat.class);
+
+      AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
+      AccumuloInputFormat.setInputTableName(job, table);
+      AccumuloInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
+      AccumuloFileOutputFormat.setOutputPath(job, new Path(args[1]));
+
+      job.setMapperClass(table.endsWith("_mapreduce_bad_table") ? BadKeyMapper.class : Mapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormatClass(AccumuloFileOutputFormat.class);
+      job.getConfiguration().set("MRTester_tableName", table);
+
+      job.setNumReduceTasks(0);
+
+      job.waitForCompletion(true);
+
+      return job.isSuccessful() ? 0 : 1;
+    }
+
+    public static void main(String[] args) throws Exception {
+      Configuration conf = new Configuration();
+      conf.set("mapreduce.cluster.local.dir", new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+    }
+  }
+
+  private void handleWriteTests(boolean content) throws Exception {
+    File f = folder.newFile(testName.getMethodName());
+    assertTrue(f.delete());
+    MRTester.main(new String[] {content ? TEST_TABLE : EMPTY_TABLE, f.getAbsolutePath()});
+
+    assertTrue(f.exists());
+    File[] files = f.listFiles(new FileFilter() {
+      @Override
+      public boolean accept(File file) {
+        return file.getName().startsWith("part-m-");
+      }
+    });
+    assertNotNull(files);
+    if (content) {
+      assertEquals(1, files.length);
+      assertTrue(files[0].exists());
+    } else {
+      assertEquals(0, files.length);
+    }
+  }
+
+  // track errors in the map reduce job; jobs insert a dummy error for the map and cleanup tasks (to ensure test correctness),
+  // so error tests should check to see if there is at least one error (could be more depending on the test) rather than zero
+  private static Multimap<String,AssertionError> assertionErrors = ArrayListMultimap.create();
+
+  @Test
+  public void writeBadVisibility() throws Exception {
+    File f = folder.newFile(testName.getMethodName());
+    assertTrue(f.delete());
+    MRTester.main(new String[] {BAD_TABLE, f.getAbsolutePath()});
+    assertTrue(f.exists());
+    assertEquals(1, assertionErrors.get(BAD_TABLE + "_map").size());
+    assertEquals(1, assertionErrors.get(BAD_TABLE + "_cleanup").size());
+  }
+
+}
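
The assertionErrors bookkeeping above (repeated in AccumuloInputFormatIT below) follows the dummy-entry convention described in the comment: run() seeds one placeholder AssertionError per phase, so a post-job count of exactly one means the phase recorded no real failure. A sketch of that accounting, with an illustrative helper name:

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;

public class DummyErrorSketch {
  /** Real failures recorded for a phase, net of the dummy seeded in run(). */
  static int realErrors(Multimap<String,AssertionError> errors, String phaseKey) {
    return errors.get(phaseKey).size() - 1;
  }

  public static void main(String[] args) {
    Multimap<String,AssertionError> errors = ArrayListMultimap.create();
    errors.put("t1_map", new AssertionError("Dummy_map")); // seeded by run()
    System.out.println(realErrors(errors, "t1_map")); // 0: the phase passed
    errors.put("t1_map", new AssertionError("a real failure"));
    System.out.println(realErrors(errors, "t1_map")); // 1: one genuine error
  }
}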

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloInputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloInputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloInputFormatIT.java
new file mode 100644
index 0000000..1ca4f92
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloInputFormatIT.java
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.mapreduce;
+
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static java.lang.System.currentTimeMillis;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
+import org.apache.accumulo.core.client.mapreduce.impl.BatchInputSplit;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+public class AccumuloInputFormatIT extends AccumuloClusterHarness {
+
+  AccumuloInputFormat inputFormat;
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 4 * 60;
+  }
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setNumTservers(1);
+  }
+
+  @Before
+  public void before() {
+    inputFormat = new AccumuloInputFormat();
+  }
+
+  /**
+   * Tests several different paths through the getSplits() method by setting different properties and verifying the results.
+   */
+  @Test
+  public void testGetSplits() throws Exception {
+    Connector conn = getConnector();
+    String table = getUniqueNames(1)[0];
+    conn.tableOperations().create(table);
+    insertData(table, currentTimeMillis());
+
+    ClientConfiguration clientConf = cluster.getClientConfig();
+    AccumuloConfiguration clusterClientConf = new ConfigurationCopy(new DefaultConfiguration());
+
+    // Pass SSL and CredentialProvider options into the ClientConfiguration given to AccumuloInputFormat
+    boolean sslEnabled = Boolean.valueOf(clusterClientConf.get(Property.INSTANCE_RPC_SSL_ENABLED));
+    if (sslEnabled) {
+      ClientProperty[] sslProperties = new ClientProperty[] {ClientProperty.INSTANCE_RPC_SSL_ENABLED, ClientProperty.INSTANCE_RPC_SSL_CLIENT_AUTH,
+          ClientProperty.RPC_SSL_KEYSTORE_PATH, ClientProperty.RPC_SSL_KEYSTORE_TYPE, ClientProperty.RPC_SSL_KEYSTORE_PASSWORD,
+          ClientProperty.RPC_SSL_TRUSTSTORE_PATH, ClientProperty.RPC_SSL_TRUSTSTORE_TYPE, ClientProperty.RPC_SSL_TRUSTSTORE_PASSWORD,
+          ClientProperty.RPC_USE_JSSE, ClientProperty.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS};
+
+      for (ClientProperty prop : sslProperties) {
+        // The default property is returned if it's not in the ClientConfiguration so we don't have to check if the value is actually defined
+        clientConf.setProperty(prop, clusterClientConf.get(prop.getKey()));
+      }
+    }
+
+    Job job = Job.getInstance();
+    AccumuloInputFormat.setInputTableName(job, table);
+    AccumuloInputFormat.setZooKeeperInstance(job, clientConf);
+    AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
+
+    // split table
+    TreeSet<Text> splitsToAdd = new TreeSet<Text>();
+    for (int i = 0; i < 10000; i += 1000)
+      splitsToAdd.add(new Text(String.format("%09d", i)));
+    conn.tableOperations().addSplits(table, splitsToAdd);
+    sleepUninterruptibly(500, TimeUnit.MILLISECONDS); // wait for splits to be propagated
+
+    // get splits without setting any range
+    Collection<Text> actualSplits = conn.tableOperations().listSplits(table);
+    List<InputSplit> splits = inputFormat.getSplits(job);
+    assertEquals(actualSplits.size() + 1, splits.size()); // No ranges set on the job so it'll start with -inf
+
+    // set ranges and get splits
+    List<Range> ranges = new ArrayList<Range>();
+    for (Text text : actualSplits)
+      ranges.add(new Range(text));
+    AccumuloInputFormat.setRanges(job, ranges);
+    splits = inputFormat.getSplits(job);
+    assertEquals(actualSplits.size(), splits.size());
+
+    // offline mode
+    AccumuloInputFormat.setOfflineTableScan(job, true);
+    try {
+      inputFormat.getSplits(job);
+      fail("An exception should have been thrown");
+    } catch (IOException e) {}
+
+    conn.tableOperations().offline(table, true);
+    splits = inputFormat.getSplits(job);
+    assertEquals(actualSplits.size(), splits.size());
+
+    // auto adjust ranges
+    ranges = new ArrayList<Range>();
+    for (int i = 0; i < 5; i++)
+      // overlapping ranges
+      ranges.add(new Range(String.format("%09d", i), String.format("%09d", i + 2)));
+    AccumuloInputFormat.setRanges(job, ranges);
+    splits = inputFormat.getSplits(job);
+    assertEquals(2, splits.size());
+
+    AccumuloInputFormat.setAutoAdjustRanges(job, false);
+    splits = inputFormat.getSplits(job);
+    assertEquals(ranges.size(), splits.size());
+
+    // BatchScan not available for offline scans
+    AccumuloInputFormat.setBatchScan(job, true);
+    // Reset auto-adjust ranges too
+    AccumuloInputFormat.setAutoAdjustRanges(job, true);
+
+    AccumuloInputFormat.setOfflineTableScan(job, true);
+    try {
+      inputFormat.getSplits(job);
+      fail("An exception should have been thrown");
+    } catch (IllegalArgumentException e) {}
+
+    conn.tableOperations().online(table, true);
+    AccumuloInputFormat.setOfflineTableScan(job, false);
+
+    // test for resumption of success
+    splits = inputFormat.getSplits(job);
+    assertEquals(2, splits.size());
+
+    // BatchScan not available with isolated iterators
+    AccumuloInputFormat.setScanIsolation(job, true);
+    try {
+      inputFormat.getSplits(job);
+      fail("An exception should have been thrown");
+    } catch (IllegalArgumentException e) {}
+    AccumuloInputFormat.setScanIsolation(job, false);
+
+    // test for resumption of success
+    splits = inputFormat.getSplits(job);
+    assertEquals(2, splits.size());
+
+    // BatchScan not available with local iterators
+    AccumuloInputFormat.setLocalIterators(job, true);
+    try {
+      inputFormat.getSplits(job);
+      fail("An exception should have been thrown");
+    } catch (IllegalArgumentException e) {}
+    AccumuloInputFormat.setLocalIterators(job, false);
+
+    // Check we are getting back the correct type of split
+    conn.tableOperations().online(table);
+    splits = inputFormat.getSplits(job);
+    for (InputSplit split : splits)
+      Assert.assertTrue(split instanceof BatchInputSplit);
+
+    // We should divide along the tablet lines similar to when using `setAutoAdjustRanges(job, true)`
+    assertEquals(2, splits.size());
+  }
+
+  private void insertData(String tableName, long ts) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    BatchWriter bw = getConnector().createBatchWriter(tableName, null);
+
+    for (int i = 0; i < 10000; i++) {
+      String row = String.format("%09d", i);
+
+      Mutation m = new Mutation(new Text(row));
+      m.put(new Text("cf1"), new Text("cq1"), ts, new Value(("" + i).getBytes()));
+      bw.addMutation(m);
+    }
+    bw.close();
+  }
+
+  // track errors in the map reduce job; jobs insert a dummy error for the map and cleanup tasks (to ensure test correctness),
+  // so error tests should check to see if there is at least one error (could be more depending on the test) rather than zero
+  private static Multimap<String,AssertionError> assertionErrors = ArrayListMultimap.create();
+
+  private static class MRTester extends Configured implements Tool {
+    private static class TestMapper extends Mapper<Key,Value,Key,Value> {
+      Key key = null;
+      int count = 0;
+
+      @Override
+      protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
+        String table = context.getConfiguration().get("MRTester_tableName");
+        assertNotNull(table);
+        try {
+          if (key != null)
+            assertEquals(key.getRow().toString(), new String(v.get()));
+          assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
+          assertEquals(new String(v.get()), String.format("%09x", count));
+        } catch (AssertionError e) {
+          assertionErrors.put(table + "_map", e);
+        }
+        key = new Key(k);
+        count++;
+      }
+
+      @Override
+      protected void cleanup(Context context) throws IOException, InterruptedException {
+        String table = context.getConfiguration().get("MRTester_tableName");
+        assertNotNull(table);
+        try {
+          assertEquals(100, count);
+        } catch (AssertionError e) {
+          assertionErrors.put(table + "_cleanup", e);
+        }
+      }
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 2 && args.length != 3) {
+        throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <table> <inputFormatClass> [<batchScan>]");
+      }
+
+      String table = args[0];
+      String inputFormatClassName = args[1];
+      boolean batchScan = false;
+      if (args.length == 3)
+        batchScan = Boolean.parseBoolean(args[2]);
+
+      assertionErrors.put(table + "_map", new AssertionError("Dummy_map"));
+      assertionErrors.put(table + "_cleanup", new AssertionError("Dummy_cleanup"));
+
+      @SuppressWarnings("unchecked")
+      Class<? extends InputFormat<?,?>> inputFormatClass = (Class<? extends InputFormat<?,?>>) Class.forName(inputFormatClassName);
+
+      Job job = Job.getInstance(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+      job.setJarByClass(this.getClass());
+      job.getConfiguration().set("MRTester_tableName", table);
+
+      job.setInputFormatClass(inputFormatClass);
+
+      AccumuloInputFormat.setZooKeeperInstance(job, cluster.getClientConfig());
+      AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
+      AccumuloInputFormat.setInputTableName(job, table);
+      AccumuloInputFormat.setBatchScan(job, batchScan);
+
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormatClass(NullOutputFormat.class);
+
+      job.setNumReduceTasks(0);
+
+      job.waitForCompletion(true);
+
+      return job.isSuccessful() ? 0 : 1;
+    }
+
+    public static int main(String[] args) throws Exception {
+      Configuration conf = new Configuration();
+      conf.set("mapreduce.cluster.local.dir", new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      return ToolRunner.run(conf, new MRTester(), args);
+    }
+  }
+
+  @Test
+  public void testMap() throws Exception {
+    final String TEST_TABLE_1 = getUniqueNames(1)[0];
+
+    Connector c = getConnector();
+    c.tableOperations().create(TEST_TABLE_1);
+    BatchWriter bw = c.createBatchWriter(TEST_TABLE_1, new BatchWriterConfig());
+    for (int i = 0; i < 100; i++) {
+      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+      bw.addMutation(m);
+    }
+    bw.close();
+
+    Assert.assertEquals(0, MRTester.main(new String[] {TEST_TABLE_1, AccumuloInputFormat.class.getName()}));
+    assertEquals(1, assertionErrors.get(TEST_TABLE_1 + "_map").size());
+    assertEquals(1, assertionErrors.get(TEST_TABLE_1 + "_cleanup").size());
+  }
+
+  @Test
+  public void testMapWithBatchScanner() throws Exception {
+    final String TEST_TABLE_2 = getUniqueNames(1)[0];
+
+    Connector c = getConnector();
+    c.tableOperations().create(TEST_TABLE_2);
+    BatchWriter bw = c.createBatchWriter(TEST_TABLE_2, new BatchWriterConfig());
+    for (int i = 0; i < 100; i++) {
+      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+      bw.addMutation(m);
+    }
+    bw.close();
+
+    Assert.assertEquals(0, MRTester.main(new String[] {TEST_TABLE_2, AccumuloInputFormat.class.getName(), "True"}));
+    assertEquals(1, assertionErrors.get(TEST_TABLE_2 + "_map").size());
+    assertEquals(1, assertionErrors.get(TEST_TABLE_2 + "_cleanup").size());
+  }
+
+  @Test
+  public void testCorrectRangeInputSplits() throws Exception {
+    Job job = Job.getInstance();
+
+    String table = getUniqueNames(1)[0];
+    Authorizations auths = new Authorizations("foo");
+    Collection<Pair<Text,Text>> fetchColumns = Collections.singleton(new Pair<Text,Text>(new Text("foo"), new Text("bar")));
+    boolean isolated = true, localIters = true;
+    Level level = Level.WARN;
+
+    Connector connector = getConnector();
+    connector.tableOperations().create(table);
+
+    AccumuloInputFormat.setZooKeeperInstance(job, cluster.getClientConfig());
+    AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
+
+    AccumuloInputFormat.setInputTableName(job, table);
+    AccumuloInputFormat.setScanAuthorizations(job, auths);
+    AccumuloInputFormat.setScanIsolation(job, isolated);
+    AccumuloInputFormat.setLocalIterators(job, localIters);
+    AccumuloInputFormat.fetchColumns(job, fetchColumns);
+    AccumuloInputFormat.setLogLevel(job, level);
+
+    AccumuloInputFormat aif = new AccumuloInputFormat();
+
+    List<InputSplit> splits = aif.getSplits(job);
+
+    Assert.assertEquals(1, splits.size());
+
+    InputSplit split = splits.get(0);
+
+    Assert.assertEquals(RangeInputSplit.class, split.getClass());
+
+    RangeInputSplit risplit = (RangeInputSplit) split;
+
+    Assert.assertEquals(getAdminPrincipal(), risplit.getPrincipal());
+    Assert.assertEquals(table, risplit.getTableName());
+    Assert.assertEquals(getAdminToken(), risplit.getToken());
+    Assert.assertEquals(auths, risplit.getAuths());
+    Assert.assertEquals(getConnector().getInstance().getInstanceName(), risplit.getInstanceName());
+    Assert.assertEquals(isolated, risplit.isIsolatedScan());
+    Assert.assertEquals(localIters, risplit.usesLocalIterators());
+    Assert.assertEquals(fetchColumns, risplit.getFetchedColumns());
+    Assert.assertEquals(level, risplit.getLogLevel());
+  }
+
+  @Test
+  public void testPartialInputSplitDelegationToConfiguration() throws Exception {
+    String table = getUniqueNames(1)[0];
+    Connector c = getConnector();
+    c.tableOperations().create(table);
+    BatchWriter bw = c.createBatchWriter(table, new BatchWriterConfig());
+    for (int i = 0; i < 100; i++) {
+      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+      bw.addMutation(m);
+    }
+    bw.close();
+
+    Assert.assertEquals(0, MRTester.main(new String[] {table, EmptySplitsAccumuloInputFormat.class.getName()}));
+    assertEquals(1, assertionErrors.get(table + "_map").size());
+    assertEquals(1, assertionErrors.get(table + "_cleanup").size());
+  }
+
+  @Test
+  public void testPartialFailedInputSplitDelegationToConfiguration() throws Exception {
+    String table = getUniqueNames(1)[0];
+    Connector c = getConnector();
+    c.tableOperations().create(table);
+    BatchWriter bw = c.createBatchWriter(table, new BatchWriterConfig());
+    for (int i = 0; i < 100; i++) {
+      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+      bw.addMutation(m);
+    }
+    bw.close();
+
+    Assert.assertEquals(1, MRTester.main(new String[] {table, BadPasswordSplitsAccumuloInputFormat.class.getName()}));
+    assertEquals(1, assertionErrors.get(table + "_map").size());
+    // We should fail when the RecordReader fails to get the next key/value pair, because the record reader is set up with a ClientContext, rather than a
+    // Connector, so it doesn't fail fast on bad credentials
+    assertEquals(2, assertionErrors.get(table + "_cleanup").size());
+  }
+
+  /**
+   * AccumuloInputFormat which replaces the token on each RangeInputSplit with a bad password
+   */
+  public static class BadPasswordSplitsAccumuloInputFormat extends AccumuloInputFormat {
+
+    @Override
+    public List<InputSplit> getSplits(JobContext context) throws IOException {
+      List<InputSplit> splits = super.getSplits(context);
+
+      for (InputSplit split : splits) {
+        org.apache.accumulo.core.client.mapreduce.RangeInputSplit rangeSplit = (org.apache.accumulo.core.client.mapreduce.RangeInputSplit) split;
+        rangeSplit.setToken(new PasswordToken("anythingelse"));
+      }
+
+      return splits;
+    }
+  }
+
+  /**
+   * AccumuloInputFormat which returns an "empty" RangeInputSplit
+   */
+  public static class EmptySplitsAccumuloInputFormat extends AccumuloInputFormat {
+
+    @Override
+    public List<InputSplit> getSplits(JobContext context) throws IOException {
+      List<InputSplit> oldSplits = super.getSplits(context);
+      List<InputSplit> newSplits = new ArrayList<InputSplit>(oldSplits.size());
+
+      // Copy only the necessary information
+      for (InputSplit oldSplit : oldSplits) {
+        org.apache.accumulo.core.client.mapreduce.RangeInputSplit newSplit = new org.apache.accumulo.core.client.mapreduce.RangeInputSplit(
+            (org.apache.accumulo.core.client.mapreduce.RangeInputSplit) oldSplit);
+        newSplits.add(newSplit);
+      }
+
+      return newSplits;
+    }
+  }
+}
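
testGetSplits() above expects five overlapping ranges to yield two splits when auto-adjust is on: the ranges are first merged into one contiguous range, which then crosses a single tablet boundary (the table is split every 1000 rows starting at 000000000). A sketch of the merge step using Range.mergeOverlapping, a public utility on Range (the surrounding class is illustrative):

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.accumulo.core.data.Range;

public class RangeMergeSketch {
  public static void main(String[] args) {
    List<Range> ranges = new ArrayList<Range>();
    for (int i = 0; i < 5; i++)
      ranges.add(new Range(String.format("%09d", i), String.format("%09d", i + 2)));
    Collection<Range> merged = Range.mergeOverlapping(ranges);
    // the five chained ranges collapse into ["000000000","000000006"], which
    // spans two tablets (one ends at the 000000000 split point), hence the
    // two splits the test asserts
    System.out.println(merged.size()); // prints 1
  }
}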

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloMultiTableInputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloMultiTableInputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloMultiTableInputFormatIT.java
new file mode 100644
index 0000000..1694f6b
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloMultiTableInputFormatIT.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.mapreduce.AccumuloMultiTableInputFormat;
+import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
+import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
+public class AccumuloMultiTableInputFormatIT extends AccumuloClusterHarness {
+
+  private static AssertionError e1 = null;
+  private static AssertionError e2 = null;
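+  // Assertion failures thrown inside the mapper would only fail the Hadoop task,
+  // so they are captured in these static fields (the job runs in-process via the
+  // local job runner) and re-checked on the test thread after the job finishes.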
+
+  private static class MRTester extends Configured implements Tool {
+
+    private static class TestMapper extends Mapper<Key,Value,Key,Value> {
+      Key key = null;
+      int count = 0;
+
+      @Override
+      protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
+        try {
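+          // Each split reads from a single table; its name prefixes the rows
+          // and values this mapper instance expects to see.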
+          String tableName = ((RangeInputSplit) context.getInputSplit()).getTableName();
+          if (key != null)
+            assertEquals(key.getRow().toString(), new String(v.get()));
+          assertEquals(new Text(String.format("%s_%09x", tableName, count + 1)), k.getRow());
+          assertEquals(String.format("%s_%09x", tableName, count), new String(v.get()));
+        } catch (AssertionError e) {
+          e1 = e;
+        }
+        key = new Key(k);
+        count++;
+      }
+
+      @Override
+      protected void cleanup(Context context) throws IOException, InterruptedException {
+        try {
+          assertEquals(100, count);
+        } catch (AssertionError e) {
+          e2 = e;
+        }
+      }
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 2) {
+        throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <table1> <table2>");
+      }
+
+      String user = getAdminPrincipal();
+      AuthenticationToken pass = getAdminToken();
+      String table1 = args[0];
+      String table2 = args[1];
+
+      Job job = Job.getInstance(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormatClass(AccumuloMultiTableInputFormat.class);
+
+      AccumuloMultiTableInputFormat.setConnectorInfo(job, user, pass);
+
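+      // One InputTableConfig per input table; the defaults scan each table in
+      // full, which is all this test needs.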
+      InputTableConfig tableConfig1 = new InputTableConfig();
+      InputTableConfig tableConfig2 = new InputTableConfig();
+
+      Map<String,InputTableConfig> configMap = new HashMap<String,InputTableConfig>();
+      configMap.put(table1, tableConfig1);
+      configMap.put(table2, tableConfig2);
+
+      AccumuloMultiTableInputFormat.setInputTableConfigs(job, configMap);
+      AccumuloMultiTableInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
+
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormatClass(NullOutputFormat.class);
+
+      job.setNumReduceTasks(0);
+
+      job.waitForCompletion(true);
+
+      return job.isSuccessful() ? 0 : 1;
+    }
+
+    public static void main(String[] args) throws Exception {
+      Configuration conf = new Configuration();
+      conf.set("mapreduce.cluster.local.dir", new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+    }
+  }
+
+  /**
+   * Writes incrementing counts to two tables, prefixing each row and value with the
+   * table name, so the mapper can verify both ordering and table attribution.
+   */
+  @Test
+  public void testMap() throws Exception {
+    String[] tableNames = getUniqueNames(2);
+    String table1 = tableNames[0];
+    String table2 = tableNames[1];
+    Connector c = getConnector();
+    c.tableOperations().create(table1);
+    c.tableOperations().create(table2);
+    BatchWriter bw = c.createBatchWriter(table1, new BatchWriterConfig());
+    BatchWriter bw2 = c.createBatchWriter(table2, new BatchWriterConfig());
+    for (int i = 0; i < 100; i++) {
+      Mutation t1m = new Mutation(new Text(String.format("%s_%09x", table1, i + 1)));
+      t1m.put(new Text(), new Text(), new Value(String.format("%s_%09x", table1, i).getBytes()));
+      bw.addMutation(t1m);
+      Mutation t2m = new Mutation(new Text(String.format("%s_%09x", table2, i + 1)));
+      t2m.put(new Text(), new Text(), new Value(String.format("%s_%09x", table2, i).getBytes()));
+      bw2.addMutation(t2m);
+    }
+    bw.close();
+    bw2.close();
+
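+    // Run the MR job in-process, then surface any assertion captured in the mapper.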
+    MRTester.main(new String[] {table1, table2});
+    assertNull(e1);
+    assertNull(e2);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloOutputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloOutputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloOutputFormatIT.java
new file mode 100644
index 0000000..c9307ae
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloOutputFormatIT.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
+public class AccumuloOutputFormatIT extends AccumuloClusterHarness {
+  private static AssertionError e1 = null;
+
+  private static class MRTester extends Configured implements Tool {
+    private static class TestMapper extends Mapper<Key,Value,Text,Mutation> {
+      Key key = null;
+      int count = 0;
+
+      @Override
+      protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
+        try {
+          if (key != null)
+            assertEquals(key.getRow().toString(), new String(v.get()));
+          assertEquals(new Text(String.format("%09x", count + 1)), k.getRow());
+          assertEquals(String.format("%09x", count), new String(v.get()));
+        } catch (AssertionError e) {
+          e1 = e;
+        }
+        key = new Key(k);
+        count++;
+      }
+
+      @Override
+      protected void cleanup(Context context) throws IOException, InterruptedException {
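+        // Emit one summary entry (row "total" = number of entries seen) through
+        // the AccumuloOutputFormat; the test verifies it by scanning table2.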
+        Mutation m = new Mutation("total");
+        m.put("", "", Integer.toString(count));
+        context.write(new Text(), m);
+      }
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 2) {
+        throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <inputtable> <outputtable>");
+      }
+
+      String user = getAdminPrincipal();
+      AuthenticationToken pass = getAdminToken();
+      String table1 = args[0];
+      String table2 = args[1];
+
+      Job job = Job.getInstance(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormatClass(AccumuloInputFormat.class);
+
+      AccumuloInputFormat.setConnectorInfo(job, user, pass);
+      AccumuloInputFormat.setInputTableName(job, table1);
+      AccumuloInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
+
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Text.class);
+      job.setMapOutputValueClass(Mutation.class);
+      job.setOutputFormatClass(AccumuloOutputFormat.class);
+      job.setOutputKeyClass(Text.class);
+      job.setOutputValueClass(Mutation.class);
+
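+      // table2 already exists, so table auto-creation stays disabled; mutations
+      // written without an explicit table name go to the default table.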
+      AccumuloOutputFormat.setConnectorInfo(job, user, pass);
+      AccumuloOutputFormat.setCreateTables(job, false);
+      AccumuloOutputFormat.setDefaultTableName(job, table2);
+      AccumuloOutputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
+
+      job.setNumReduceTasks(0);
+
+      job.waitForCompletion(true);
+
+      return job.isSuccessful() ? 0 : 1;
+    }
+
+    public static void main(String[] args) throws Exception {
+      Configuration conf = new Configuration();
+      conf.set("mapreduce.cluster.local.dir", new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+    }
+  }
+
+  @Test
+  public void testMR() throws Exception {
+    String[] tableNames = getUniqueNames(2);
+    String table1 = tableNames[0];
+    String table2 = tableNames[1];
+    Connector c = getConnector();
+    c.tableOperations().create(table1);
+    c.tableOperations().create(table2);
+    BatchWriter bw = c.createBatchWriter(table1, new BatchWriterConfig());
+    for (int i = 0; i < 100; i++) {
+      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+      bw.addMutation(m);
+    }
+    bw.close();
+
+    MRTester.main(new String[] {table1, table2});
+    assertNull(e1);
+
+    Scanner scanner = c.createScanner(table2, new Authorizations());
+    Iterator<Entry<Key,Value>> iter = scanner.iterator();
+    assertTrue(iter.hasNext());
+    Entry<Key,Value> entry = iter.next();
+    assertEquals(100, Integer.parseInt(new String(entry.getValue().get())));
+    assertFalse(iter.hasNext());
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloRowInputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloRowInputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloRowInputFormatIT.java
new file mode 100644
index 0000000..617a8c2
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloRowInputFormatIT.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloRowInputFormat;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyValue;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.PeekingIterator;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class AccumuloRowInputFormatIT extends AccumuloClusterHarness {
+
+  private static final String ROW1 = "row1";
+  private static final String ROW2 = "row2";
+  private static final String ROW3 = "row3";
+  private static final String COLF1 = "colf1";
+  private static List<Entry<Key,Value>> row1;
+  private static List<Entry<Key,Value>> row2;
+  private static List<Entry<Key,Value>> row3;
+  private static AssertionError e1 = null;
+  private static AssertionError e2 = null;
+
+  @BeforeClass
+  public static void prepareRows() {
+    row1 = new ArrayList<Entry<Key,Value>>();
+    row1.add(new KeyValue(new Key(ROW1, COLF1, "colq1"), "v1".getBytes()));
+    row1.add(new KeyValue(new Key(ROW1, COLF1, "colq2"), "v2".getBytes()));
+    row1.add(new KeyValue(new Key(ROW1, "colf2", "colq3"), "v3".getBytes()));
+    row2 = new ArrayList<Entry<Key,Value>>();
+    row2.add(new KeyValue(new Key(ROW2, COLF1, "colq4"), "v4".getBytes()));
+    row3 = new ArrayList<Entry<Key,Value>>();
+    row3.add(new KeyValue(new Key(ROW3, COLF1, "colq5"), "v5".getBytes()));
+  }
+
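+  /**
+   * Verifies that the entries streamed for a row match the expected list, in order.
+   */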
+  private static void checkLists(final List<Entry<Key,Value>> first, final Iterator<Entry<Key,Value>> second) {
+    int entryIndex = 0;
+    while (second.hasNext()) {
+      final Entry<Key,Value> entry = second.next();
+      assertEquals("Keys should be equal", first.get(entryIndex).getKey(), entry.getKey());
+      assertEquals("Values should be equal", first.get(entryIndex).getValue(), entry.getValue());
+      entryIndex++;
+    }
+    assertEquals("Iterator did not produce all expected entries", first.size(), entryIndex);
+  }
+
+  private static void insertList(final BatchWriter writer, final List<Entry<Key,Value>> list) throws MutationsRejectedException {
+    for (Entry<Key,Value> e : list) {
+      final Key key = e.getKey();
+      final Mutation mutation = new Mutation(key.getRow());
+      ColumnVisibility colVisibility = new ColumnVisibility(key.getColumnVisibility());
+      mutation.put(key.getColumnFamily(), key.getColumnQualifier(), colVisibility, key.getTimestamp(), e.getValue());
+      writer.addMutation(mutation);
+    }
+  }
+
+  private static class MRTester extends Configured implements Tool {
+    private static class TestMapper extends Mapper<Text,PeekingIterator<Entry<Key,Value>>,Key,Value> {
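+      // AccumuloRowInputFormat presents one row per map() call: the key is the
+      // row id and the value iterates that row's entries in sorted order.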
+      int count = 0;
+
+      @Override
+      protected void map(Text k, PeekingIterator<Entry<Key,Value>> v, Context context) throws IOException, InterruptedException {
+        try {
+          switch (count) {
+            case 0:
+              assertEquals("Current key should be " + ROW1, new Text(ROW1), k);
+              checkLists(row1, v);
+              break;
+            case 1:
+              assertEquals("Current key should be " + ROW2, new Text(ROW2), k);
+              checkLists(row2, v);
+              break;
+            case 2:
+              assertEquals("Current key should be " + ROW3, new Text(ROW3), k);
+              checkLists(row3, v);
+              break;
+            default:
+              fail("map() should be called exactly three times");
+          }
+        } catch (AssertionError e) {
+          e1 = e;
+        }
+        count++;
+      }
+
+      @Override
+      protected void cleanup(Context context) throws IOException, InterruptedException {
+        try {
+          assertEquals(3, count);
+        } catch (AssertionError e) {
+          e2 = e;
+        }
+      }
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 1) {
+        throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <table>");
+      }
+
+      String user = getAdminPrincipal();
+      AuthenticationToken pass = getAdminToken();
+      String table = args[0];
+
+      Job job = Job.getInstance(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormatClass(AccumuloRowInputFormat.class);
+
+      AccumuloRowInputFormat.setConnectorInfo(job, user, pass);
+      AccumuloRowInputFormat.setInputTableName(job, table);
+      AccumuloRowInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
+
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormatClass(NullOutputFormat.class);
+
+      job.setNumReduceTasks(0);
+
+      job.waitForCompletion(true);
+
+      return job.isSuccessful() ? 0 : 1;
+    }
+
+    public static void main(String[] args) throws Exception {
+      Configuration conf = new Configuration();
+      conf.set("mapreduce.cluster.local.dir", new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+    }
+  }
+
+  @Test
+  public void test() throws Exception {
+    final Connector conn = getConnector();
+    String tableName = getUniqueNames(1)[0];
+    conn.tableOperations().create(tableName);
+    BatchWriter writer = null;
+    try {
+      writer = conn.createBatchWriter(tableName, new BatchWriterConfig());
+      insertList(writer, row1);
+      insertList(writer, row2);
+      insertList(writer, row3);
+    } finally {
+      if (writer != null) {
+        writer.close();
+      }
+    }
+    MRTester.main(new String[] {tableName});
+    assertNull(e1);
+    assertNull(e2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java b/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java
new file mode 100644
index 0000000..f5deebb
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.impl.Credentials;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TokenFileIT extends AccumuloClusterHarness {
+  private static AssertionError e1 = null;
+
+  private static class MRTokenFileTester extends Configured implements Tool {
+    private static class TestMapper extends Mapper<Key,Value,Text,Mutation> {
+      Key key = null;
+      int count = 0;
+
+      @Override
+      protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
+        try {
+          if (key != null)
+            assertEquals(key.getRow().toString(), new String(v.get()));
+          assertEquals(new Text(String.format("%09x", count + 1)), k.getRow());
+          assertEquals(String.format("%09x", count), new String(v.get()));
+        } catch (AssertionError e) {
+          e1 = e;
+        }
+        key = new Key(k);
+        count++;
+      }
+
+      @Override
+      protected void cleanup(Context context) throws IOException, InterruptedException {
+        Mutation m = new Mutation("total");
+        m.put("", "", Integer.toString(count));
+        context.write(new Text(), m);
+      }
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 3) {
+        throw new IllegalArgumentException("Usage : " + MRTokenFileTester.class.getName() + " <token file> <inputtable> <outputtable>");
+      }
+
+      String user = getAdminPrincipal();
+      String tokenFile = args[0];
+      String table1 = args[1];
+      String table2 = args[2];
+
+      Job job = Job.getInstance(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormatClass(AccumuloInputFormat.class);
+
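+      // This setConnectorInfo overload takes the path of a token file instead of
+      // an in-memory AuthenticationToken; the file is made available to tasks
+      // (via the distributed cache) and the token is deserialized from it.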
+      AccumuloInputFormat.setConnectorInfo(job, user, tokenFile);
+      AccumuloInputFormat.setInputTableName(job, table1);
+      AccumuloInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
+
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Text.class);
+      job.setMapOutputValueClass(Mutation.class);
+      job.setOutputFormatClass(AccumuloOutputFormat.class);
+      job.setOutputKeyClass(Text.class);
+      job.setOutputValueClass(Mutation.class);
+
+      AccumuloOutputFormat.setConnectorInfo(job, user, tokenFile);
+      AccumuloOutputFormat.setCreateTables(job, false);
+      AccumuloOutputFormat.setDefaultTableName(job, table2);
+      AccumuloOutputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
+
+      job.setNumReduceTasks(0);
+
+      job.waitForCompletion(true);
+
+      return job.isSuccessful() ? 0 : 1;
+    }
+
+    public static void main(String[] args) throws Exception {
+      Configuration conf = CachedConfiguration.getInstance();
+      conf.set("hadoop.tmp.dir", new File(args[0]).getParent());
+      conf.set("mapreduce.cluster.local.dir", new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTokenFileTester(), args));
+    }
+  }
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+
+  @Test
+  public void testMR() throws Exception {
+    String[] tableNames = getUniqueNames(2);
+    String table1 = tableNames[0];
+    String table2 = tableNames[1];
+    Connector c = getConnector();
+    c.tableOperations().create(table1);
+    c.tableOperations().create(table2);
+    BatchWriter bw = c.createBatchWriter(table1, new BatchWriterConfig());
+    for (int i = 0; i < 100; i++) {
+      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+      bw.addMutation(m);
+    }
+    bw.close();
+
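+    // Serialize the admin credentials into a file the job reads its token from,
+    // exercising file-based rather than in-memory authentication.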
+    File tf = folder.newFile("root_test.pw");
+    PrintStream out = new PrintStream(tf);
+    String outString = new Credentials(getAdminPrincipal(), getAdminToken()).serialize();
+    out.println(outString);
+    out.close();
+
+    MRTokenFileTester.main(new String[] {tf.getAbsolutePath(), table1, table2});
+    assertNull(e1);
+
+    Scanner scanner = c.createScanner(table2, new Authorizations());
+    Iterator<Entry<Key,Value>> iter = scanner.iterator();
+    assertTrue(iter.hasNext());
+    Entry<Key,Value> entry = iter.next();
+    assertEquals(100, Integer.parseInt(new String(entry.getValue().get())));
+    assertFalse(iter.hasNext());
+  }
+}

