incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject [04/11] Blur MR projects restructured.
Date Thu, 01 May 2014 20:49:03 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java b/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
new file mode 100644
index 0000000..811dba5
--- /dev/null
+++ b/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
@@ -0,0 +1,427 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.util.Collection;
+import java.util.TreeSet;
+
+import org.apache.blur.server.TableContext;
+import org.apache.blur.store.buffer.BufferStore;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TestMapperReducerCleanup.TrackingTextInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.lucene.index.DirectoryReader;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class BlurOutputFormatTest {
+
+  private static Configuration conf = new Configuration();
+  private static FileSystem localFs;
+  private static MiniMRCluster mr;
+  private static Path TEST_ROOT_DIR;
+  private static JobConf jobConf;
+  private Path outDir = new Path(TEST_ROOT_DIR + "/out");
+  private Path inDir = new Path(TEST_ROOT_DIR + "/in");
+
+  @BeforeClass
+  public static void setupTest() throws Exception {
+    System.setProperty("test.build.data", "./target/BlurOutputFormatTest/data");
+    TEST_ROOT_DIR = new Path(System.getProperty("test.build.data", "target/tmp/BlurOutputFormatTest_tmp"));
+    System.setProperty("hadoop.log.dir", "./target/BlurOutputFormatTest/hadoop_log");
+    try {
+      localFs = FileSystem.getLocal(conf);
+    } catch (IOException io) {
+      throw new RuntimeException("problem getting local fs", io);
+    }
+    mr = new MiniMRCluster(1, "file:///", 1);
+    jobConf = mr.createJobConf();
+    BufferStore.initNewBuffer(128, 128 * 128);
+  }
+
+  @AfterClass
+  public static void teardown() {
+    if (mr != null) {
+      mr.shutdown();
+    }
+    rm(new File("build"));
+  }
+
+  private static void rm(File file) {
+    if (!file.exists()) {
+      return;
+    }
+    if (file.isDirectory()) {
+      for (File f : file.listFiles()) {
+        rm(f);
+      }
+    }
+    file.delete();
+  }
+
+  @Before
+  public void setup() {
+    TableContext.clear();
+  }
+
+  @Test
+  public void testBlurOutputFormat() throws IOException, InterruptedException, ClassNotFoundException {
+    localFs.delete(inDir, true);
+    localFs.delete(outDir, true);
+    writeRecordsFile("in/part1", 1, 1, 1, 1, "cf1");
+    writeRecordsFile("in/part2", 1, 1, 2, 1, "cf1");
+
+    Job job = new Job(jobConf, "blur index");
+    job.setJarByClass(BlurOutputFormatTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(1);
+    tableDescriptor.setTableUri(tableUri);
+    tableDescriptor.setName("test");
+
+    createShardDirectories(outDir, 1);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+
+    assertTrue(job.waitForCompletion(true));
+    Counters ctrs = job.getCounters();
+    System.out.println("Counters: " + ctrs);
+
+    Path path = new Path(tableUri, BlurUtil.getShardName(0));
+    Collection<Path> commitedTasks = getCommitedTasks(path);
+    assertEquals(1, commitedTasks.size());
+    DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, commitedTasks.iterator().next()));
+    assertEquals(2, reader.numDocs());
+    reader.close();
+  }
+
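+  // Collects the directories under the given shard path whose names end in
+  // ".commit"; the tests open each one as a committed Lucene index.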
+  private Collection<Path> getCommitedTasks(Path path) throws IOException {
+    Collection<Path> result = new TreeSet<Path>();
+    FileSystem fileSystem = path.getFileSystem(jobConf);
+    FileStatus[] listStatus = fileSystem.listStatus(path);
+    for (FileStatus fileStatus : listStatus) {
+      Path p = fileStatus.getPath();
+      if (fileStatus.isDir() && p.getName().endsWith(".commit")) {
+        result.add(p);
+      }
+    }
+    return result;
+  }
+
+  @Test
+  public void testBlurOutputFormatOverFlowTest() throws IOException, InterruptedException, ClassNotFoundException {
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+
+    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
+    writeRecordsFile("in/part2", 1, 50, 2000, 100, "cf1"); // 100 * 50 = 5,000
+
+    Job job = new Job(jobConf, "blur index");
+    job.setJarByClass(BlurOutputFormatTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(1);
+    tableDescriptor.setTableUri(tableUri);
+    tableDescriptor.setName("test");
+
+    createShardDirectories(outDir, 1);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    BlurOutputFormat.setIndexLocally(job, true);
+    BlurOutputFormat.setOptimizeInFlight(job, false);
+
+    assertTrue(job.waitForCompletion(true));
+    Counters ctrs = job.getCounters();
+    System.out.println("Counters: " + ctrs);
+
+    Path path = new Path(tableUri, BlurUtil.getShardName(0));
+    Collection<Path> commitedTasks = getCommitedTasks(path);
+    assertEquals(1, commitedTasks.size());
+
+    DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, commitedTasks.iterator().next()));
+    assertEquals(80000, reader.numDocs());
+    reader.close();
+  }
+
+  @Test
+  public void testBlurOutputFormatOverFlowMultipleReducersTest() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+
+    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
+    writeRecordsFile("in/part2", 1, 50, 2000, 100, "cf1"); // 100 * 50 = 5,000
+
+    Job job = new Job(jobConf, "blur index");
+    job.setJarByClass(BlurOutputFormatTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(2);
+    tableDescriptor.setTableUri(tableUri);
+    tableDescriptor.setName("test");
+
+    createShardDirectories(outDir, 2);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    BlurOutputFormat.setIndexLocally(job, false);
+
+    assertTrue(job.waitForCompletion(true));
+    Counters ctrs = job.getCounters();
+    System.out.println("Counters: " + ctrs);
+
+    long total = 0;
+    for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
+      Path path = new Path(tableUri, BlurUtil.getShardName(i));
+      Collection<Path> commitedTasks = getCommitedTasks(path);
+      assertEquals(1, commitedTasks.size());
+
+      DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, commitedTasks.iterator().next()));
+      total += reader.numDocs();
+      reader.close();
+    }
+    assertEquals(80000, total);
+
+  }
+
+  @Test
+  public void testBlurOutputFormatOverFlowMultipleReducersWithReduceMultiplierTest() throws IOException,
+      InterruptedException, ClassNotFoundException {
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+
+    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
+    writeRecordsFile("in/part2", 1, 50, 2000, 100, "cf1"); // 100 * 50 = 5,000
+
+    Job job = new Job(jobConf, "blur index");
+    job.setJarByClass(BlurOutputFormatTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(7);
+    tableDescriptor.setTableUri(tableUri);
+    tableDescriptor.setName("test");
+
+    createShardDirectories(outDir, 7);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    int multiple = 2;
+    BlurOutputFormat.setReducerMultiplier(job, multiple);
+
+    assertTrue(job.waitForCompletion(true));
+    Counters ctrs = job.getCounters();
+    System.out.println("Counters: " + ctrs);
+
+    long total = 0;
+    for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
+      Path path = new Path(tableUri, BlurUtil.getShardName(i));
+      Collection<Path> commitedTasks = getCommitedTasks(path);
+      assertTrue(multiple >= commitedTasks.size());
+      for (Path p : commitedTasks) {
+        DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, p));
+        total += reader.numDocs();
+        reader.close();
+      }
+    }
+    assertEquals(80000, total);
+
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testBlurOutputFormatValidateReducerCount() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+    writeRecordsFile("in/part1", 1, 1, 1, 1, "cf1");
+    writeRecordsFile("in/part2", 1, 1, 2, 1, "cf1");
+
+    Job job = new Job(jobConf, "blur index");
+    job.setJarByClass(BlurOutputFormatTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(1);
+    tableDescriptor.setTableUri(tableUri);
+    tableDescriptor.setName("test");
+
+    createShardDirectories(outDir, 1);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    BlurOutputFormat.setReducerMultiplier(job, 2);
+    job.setNumReduceTasks(4);
+    job.submit();
+
+  }
+
+  // @TODO this test fails sometimes due to issues in the MR MiniCluster
+  // @Test
+  public void testBlurOutputFormatCleanupDuringJobKillTest() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+
+    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
+    writeRecordsFile("in/part2", 1, 5000, 2000, 100, "cf1"); // 100 * 5000 =
+                                                             // 500,000
+
+    Job job = new Job(jobConf, "blur index");
+    job.setJarByClass(BlurOutputFormatTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(2);
+    tableDescriptor.setTableUri(tableUri);
+    tableDescriptor.setName("test");
+
+    createShardDirectories(outDir, 2);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    BlurOutputFormat.setIndexLocally(job, false);
+
+    job.submit();
+    boolean killCalled = false;
+    while (!job.isComplete()) {
+      Thread.sleep(1000);
+      System.out.printf("Killed [" + killCalled + "] Map [%f] Reduce [%f]%n", job.mapProgress() * 100,
+          job.reduceProgress() * 100);
+      if (job.reduceProgress() > 0.7 && !killCalled) {
+        job.killJob();
+        killCalled = true;
+      }
+    }
+
+    assertFalse(job.isSuccessful());
+
+    for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
+      Path path = new Path(tableUri, BlurUtil.getShardName(i));
+      FileSystem fileSystem = path.getFileSystem(job.getConfiguration());
+      FileStatus[] listStatus = fileSystem.listStatus(path);
+      assertEquals(toString(listStatus), 0, listStatus.length);
+    }
+  }
+
+  private String toString(FileStatus[] listStatus) {
+    if (listStatus == null || listStatus.length == 0) {
+      return "";
+    }
+    StringBuilder builder = new StringBuilder();
+    for (FileStatus fileStatus : listStatus) {
+      if (builder.length() > 0) {
+        builder.append(',');
+      }
+      builder.append(fileStatus.getPath());
+    }
+    return builder.toString();
+  }
+
+  public static String readFile(String name) throws IOException {
+    DataInputStream f = localFs.open(new Path(TEST_ROOT_DIR + "/" + name));
+    BufferedReader b = new BufferedReader(new InputStreamReader(f));
+    StringBuilder result = new StringBuilder();
+    String line = b.readLine();
+    while (line != null) {
+      result.append(line);
+      result.append('\n');
+      line = b.readLine();
+    }
+    b.close();
+    return result.toString();
+  }
+
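+  // Writes numberOfRows * numberOfRecords CSV lines of the form
+  // "rowId,recordId,family,valuetoindex", offset by the starting ids.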
+  private Path writeRecordsFile(String name, int startingRowId, int numberOfRows, int startRecordId,
+      int numberOfRecords, String family) throws IOException {
+    // "1,1,cf1,val1"
+    Path file = new Path(TEST_ROOT_DIR + "/" + name);
+    localFs.delete(file, false);
+    DataOutputStream f = localFs.create(file);
+    PrintWriter writer = new PrintWriter(f);
+    for (int row = 0; row < numberOfRows; row++) {
+      for (int record = 0; record < numberOfRecords; record++) {
+        writer.println(getRecord(row + startingRowId, record + startRecordId, family));
+      }
+    }
+    writer.close();
+    return file;
+  }
+
+  private void createShardDirectories(Path outDir, int shardCount) throws IOException {
+    localFs.mkdirs(outDir);
+    for (int i = 0; i < shardCount; i++) {
+      localFs.mkdirs(new Path(outDir, BlurUtil.getShardName(i)));
+    }
+  }
+
+  private String getRecord(int rowId, int recordId, String family) {
+    return rowId + "," + recordId + "," + family + ",valuetoindex";
+  }
+}
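
The test above drives BlurOutputFormat end to end on a MiniMRCluster: CsvBlurMapper
turns CSV lines into BlurMutate objects, the reducers build one Lucene index per
shard, and every committed task attempt appears as a ".commit" directory under the
shard path. A minimal sketch of the same job wiring, using only the APIs already
imported above (paths and the table name are placeholders):

    static void runBlurIndexJob(JobConf jobConf) throws Exception {
      Job job = new Job(jobConf, "blur index");
      job.setJarByClass(BlurOutputFormatTest.class);
      job.setMapperClass(CsvBlurMapper.class);          // parses "rowId,recordId,family,value" lines
      job.setInputFormatClass(TextInputFormat.class);
      FileInputFormat.addInputPath(job, new Path("file:///tmp/in"));
      CsvBlurMapper.addColumns(job, "cf1", "col");      // declare family "cf1" with column "col"

      TableDescriptor tableDescriptor = new TableDescriptor();
      tableDescriptor.setShardCount(1);
      tableDescriptor.setTableUri("file:///tmp/out");
      tableDescriptor.setName("test");
      BlurOutputFormat.setupJob(job, tableDescriptor);  // binds the job output to the table layout

      if (!job.waitForCompletion(true)) {
        throw new IOException("blur index job failed");
      }
    }

The overflow tests additionally exercise setIndexLocally, setOptimizeInFlight, and
setReducerMultiplier; with a multiplier of N, up to N ".commit" directories may
appear per shard, which is exactly what the multiplier test asserts.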

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java b/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
new file mode 100644
index 0000000..ec3239e
--- /dev/null
+++ b/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.blur.mapreduce.lib.CsvBlurDriver.ControllerPool;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Test;
+
+public class CsvBlurDriverTest {
+
+  protected String tableUri = "file:///tmp/tmppath";
+  protected int shardCount = 13;
+
+  @Test
+  public void testCsvBlurDriverTestFail1() throws Exception {
+    Configuration configuration = new Configuration();
+    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
+      @Override
+      public Iface getClient(String controllerConnectionStr) {
+        return null;
+      }
+    };
+    assertNull(CsvBlurDriver.setupJob(configuration, controllerPool, new String[] {}));
+  }
+
+  @Test
+  public void testCsvBlurDriverTest() throws Exception {
+    Configuration configurationSetup = new Configuration();
+    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
+      @Override
+      public Iface getClient(String controllerConnectionStr) {
+        return getMockIface();
+      }
+    };
+    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, "-c", "host:40010", "-d", "family1", "col1",
+        "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i", "file:///tmp/test2");
+    assertNotNull(job);
+    Configuration configuration = job.getConfiguration();
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
+    assertEquals("table1", tableDescriptor.getName());
+    Collection<String> inputs = configuration.getStringCollection("mapred.input.dir");
+    assertEquals(2, inputs.size());
+    Map<String, List<String>> familyAndColumnNameMap = CsvBlurMapper.getFamilyAndColumnNameMap(configuration);
+    assertEquals(2, familyAndColumnNameMap.size());
+  }
+
+  @Test
+  public void testCsvBlurDriverTest2() throws Exception {
+    Configuration configurationSetup = new Configuration();
+    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
+      @Override
+      public Iface getClient(String controllerConnectionStr) {
+        return getMockIface();
+      }
+    };
+    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, "-c", "host:40010", "-d", "family1", "col1",
+        "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i", "file:///tmp/test2",
+        "-S", "-C", "1000000", "2000000");
+    assertNotNull(job);
+    Configuration configuration = job.getConfiguration();
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
+    assertEquals("table1", tableDescriptor.getName());
+    Collection<String> inputs = configuration.getStringCollection("mapred.input.dir");
+    assertEquals(2, inputs.size());
+    Map<String, List<String>> familyAndColumnNameMap = CsvBlurMapper.getFamilyAndColumnNameMap(configuration);
+    assertEquals(2, familyAndColumnNameMap.size());
+  }
+
+  @Test
+  public void testCsvBlurDriverTest3() throws Exception {
+    Configuration configurationSetup = new Configuration();
+    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
+      @Override
+      public Iface getClient(String controllerConnectionStr) {
+        return getMockIface();
+      }
+    };
+    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, "-c", "host:40010", "-d", "family1", "col1",
+        "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i", "file:///tmp/test2",
+        "-S", "-C", "1000000", "2000000", "-p", "SNAPPY");
+    assertNotNull(job);
+    Configuration configuration = job.getConfiguration();
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
+    assertEquals("table1", tableDescriptor.getName());
+    Collection<String> inputs = configuration.getStringCollection("mapred.input.dir");
+    assertEquals(2, inputs.size());
+    Map<String, List<String>> familyAndColumnNameMap = CsvBlurMapper.getFamilyAndColumnNameMap(configuration);
+    assertEquals(2, familyAndColumnNameMap.size());
+    assertEquals("true", configuration.get(CsvBlurDriver.MAPRED_COMPRESS_MAP_OUTPUT));
+    assertEquals(SnappyCodec.class.getName(), configuration.get(CsvBlurDriver.MAPRED_MAP_OUTPUT_COMPRESSION_CODEC));
+  }
+
+  protected Iface getMockIface() {
+    InvocationHandler handler = new InvocationHandler() {
+
+      @Override
+      public Object invoke(Object o, Method method, Object[] args) throws Throwable {
+        if (method.getName().equals("describe")) {
+          TableDescriptor tableDescriptor = new TableDescriptor();
+          tableDescriptor.setName((String) args[0]);
+          tableDescriptor.setTableUri(tableUri);
+          tableDescriptor.setShardCount(shardCount);
+          return tableDescriptor;
+        }
+        throw new RuntimeException("not implemented.");
+      }
+    };
+    return (Iface) Proxy.newProxyInstance(Iface.class.getClassLoader(), new Class[] { Iface.class }, handler);
+  }
+
+}
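
CsvBlurDriver.setupJob takes the same argument vector as the command-line driver
and returns null when the arguments do not parse, as the failure test asserts. Read
together, the tests exercise: -c (controller connection string), -d (a family name
followed by its columns, repeatable), -t (table name, resolved through
Iface.describe as the mock shows), -i (input path, repeatable), and -p SNAPPY
(map-output compression with SnappyCodec, confirmed by the final assertions). The
-S and -C arguments are passed as well, though their semantics are not visible in
this diff. A sketch of the equivalent invocation, with placeholder host and paths:

    // controllerPool is the mock ControllerPool from the tests above.
    String[] args = {
        "-c", "host:40010",                  // controller connection string
        "-d", "family1", "col1", "col2",     // one -d per column family
        "-d", "family2", "col3", "col4",
        "-t", "table1",                      // table name, looked up via describe()
        "-i", "file:///tmp/test1",           // input paths, repeatable
        "-i", "file:///tmp/test2",
        "-p", "SNAPPY"                       // compress map output with SnappyCodec
    };
    Job job = CsvBlurDriver.setupJob(new Configuration(), controllerPool, args);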

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java b/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
new file mode 100644
index 0000000..47aa8e5
--- /dev/null
+++ b/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
@@ -0,0 +1,108 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CsvBlurMapperTest {
+
+  private MapDriver<Writable, Text, Text, BlurMutate> _mapDriver;
+  private CsvBlurMapper _mapper;
+
+  @Before
+  public void setUp() throws IOException {
+    _mapper = new CsvBlurMapper();
+    _mapDriver = MapDriver.newMapDriver(_mapper);
+  }
+
+  @Test
+  public void testMapperWithFamilyInData() {
+    Configuration configuration = _mapDriver.getConfiguration();
+    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
+    _mapDriver.withInput(new LongWritable(), new Text("rowid1,record1,cf1,value1,value2"));
+    _mapDriver.withOutput(new Text("rowid1"), new BlurMutate(MUTATE_TYPE.REPLACE, "rowid1", "record1", "cf1")
+        .addColumn("col1", "value1").addColumn("col2", "value2"));
+    _mapDriver.runTest();
+  }
+
+  @Test
+  public void testMapperFamilyPerPath() {
+    Configuration configuration = _mapDriver.getConfiguration();
+    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
+    CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
+    _mapper.setFamilyFromPath("cf1");
+
+    _mapDriver.withInput(new LongWritable(), new Text("rowid1,record1,value1,value2"));
+    _mapDriver.withOutput(new Text("rowid1"), new BlurMutate(MUTATE_TYPE.REPLACE, "rowid1", "record1", "cf1")
+        .addColumn("col1", "value1").addColumn("col2", "value2"));
+    _mapDriver.runTest();
+  }
+  
+  @Test
+  public void testMapperAutoGenerateRecordId() {
+    Configuration configuration = _mapDriver.getConfiguration();
+    CsvBlurMapper.setAutoGenerateRecordIdAsHashOfData(configuration, true);
+    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
+    CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
+    _mapper.setFamilyFromPath("cf1");
+
+    _mapDriver.withInput(new LongWritable(), new Text("rowid1,value1,value2"));
+    _mapDriver.withOutput(new Text("rowid1"), new BlurMutate(MUTATE_TYPE.REPLACE, "rowid1", "-25nqln3n2vb4cayex9y9tpxx3", "cf1")
+        .addColumn("col1", "value1").addColumn("col2", "value2"));
+    _mapDriver.runTest();
+  }
+  
+  @Test
+  public void testMapperAutoGenerateRowId() {
+    Configuration configuration = _mapDriver.getConfiguration();
+    CsvBlurMapper.setAutoGenerateRowIdAsHashOfData(configuration, true);
+    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
+    CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
+    _mapper.setFamilyFromPath("cf1");
+
+    _mapDriver.withInput(new LongWritable(), new Text("record1,value1,value2"));
+    _mapDriver.withOutput(new Text("-50b4uzohynr7j7s9pve7ytz66"), new BlurMutate(MUTATE_TYPE.REPLACE, "-50b4uzohynr7j7s9pve7ytz66", "record1", "cf1")
+        .addColumn("col1", "value1").addColumn("col2", "value2"));
+    _mapDriver.runTest();
+  }
+  
+  @Test
+  public void testMapperAutoGenerateRowIdAndRecordId() {
+    Configuration configuration = _mapDriver.getConfiguration();
+    CsvBlurMapper.setAutoGenerateRecordIdAsHashOfData(configuration, true);
+    CsvBlurMapper.setAutoGenerateRowIdAsHashOfData(configuration, true);
+    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
+    CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
+    _mapper.setFamilyFromPath("cf1");
+
+    _mapDriver.withInput(new LongWritable(), new Text("value1,value2"));
+    _mapDriver.withOutput(new Text("5q0tme15ph3h5pns8sv3u5wy2"), new BlurMutate(MUTATE_TYPE.REPLACE, "5q0tme15ph3h5pns8sv3u5wy2", "5q0tme15ph3h5pns8sv3u5wy2", "cf1")
+        .addColumn("col1", "value1").addColumn("col2", "value2"));
+    _mapDriver.runTest();
+  }
+
+}
\ No newline at end of file
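
The mapper tests above document the accepted input shapes. The spec passed to
CsvBlurMapper.setColumns uses ':' between a family and its columns, ',' between
columns, and '|' between families; each additional knob then removes one leading
column from the expected CSV line. A sketch of the options exercised above:

    Configuration conf = new Configuration();
    CsvBlurMapper.setColumns(conf, "cf1:col1,col2|cf2:col1,col2,col3");
    // Take the family from the input file's path instead of a CSV column:
    CsvBlurMapper.addFamilyPath(conf, "cf1", new Path("/"));
    // Generate the record id (and optionally the row id) as a hash of the data:
    CsvBlurMapper.setAutoGenerateRecordIdAsHashOfData(conf, true);
    CsvBlurMapper.setAutoGenerateRowIdAsHashOfData(conf, true);

With everything in the data a line reads "rowid1,record1,cf1,value1,value2"; with
the family taken from the path, "rowid1,record1,value1,value2"; with the record id
auto-generated, "rowid1,value1,value2"; and with both ids hashed, just
"value1,value2".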

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred-hadoop2/pom.xml
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/pom.xml b/blur-mapred-hadoop2/pom.xml
new file mode 100644
index 0000000..e541417
--- /dev/null
+++ b/blur-mapred-hadoop2/pom.xml
@@ -0,0 +1,160 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.apache.blur</groupId>
+		<artifactId>blur</artifactId>
+		<version>0.2.2-incubating-SNAPSHOT</version>
+		<relativePath>../pom.xml</relativePath>
+	</parent>
+	<groupId>org.apache.blur</groupId>
+	<artifactId>blur-mapred-hadoop2</artifactId>
+	<version>${projectVersion}</version>
+	<packaging>jar</packaging>
+	<name>Blur Map Reduce Hadoop2</name>
+	<description>The Blur Map Reduce Hadoop2 module contains the test suite for Hadoop2.</description>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-mapred-common</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-core</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-util</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>log4j</groupId>
+			<artifactId>log4j</artifactId>
+			<version>${log4j.version}</version>
+			<scope>provided</scope>
+			<exclusions>
+				<exclusion>
+					<groupId>javax.mail</groupId>
+					<artifactId>mail</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>javax.jms</groupId>
+					<artifactId>jms</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jdmk</groupId>
+					<artifactId>jmxtools</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jmx</groupId>
+					<artifactId>jmxri</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+	</dependencies>
+
+	<repositories>
+		<repository>
+			<id>libdir</id>
+			<url>file://${basedir}/../lib</url>
+		</repository>
+	</repositories>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<configuration>
+					<argLine>-XX:+UseConcMarkSweepGC -Xmx1g -Xms1g</argLine>
+					<forkCount>2</forkCount>
+					<forkMode>always</forkMode>
+					<reuseForks>false</reuseForks>
+					<systemPropertyVariables>
+						<blur.tmp.dir>${project.build.directory}/target/tmp</blur.tmp.dir>
+					</systemPropertyVariables>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<configuration>
+					<source>1.6</source>
+					<target>1.6</target>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+	
+	
+	<profiles>
+		<profile>
+			<id>hadoop-2.2</id>
+			<activation>
+				<property>
+					<name>hadoop2</name>
+				</property>
+			</activation>
+			<properties>
+				<projectVersion>${project.parent.version}-hadoop2</projectVersion>
+			</properties>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.mrunit</groupId>
+					<artifactId>mrunit</artifactId>
+					<version>${mrunit.version}</version>
+					<classifier>hadoop1</classifier>
+					<scope>test</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-client</artifactId>
+					<version>${hadoop.version}</version>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-minicluster</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+	</profiles>
+</project>
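
A note on this module's build wiring: the hadoop-2.2 profile activates whenever the
hadoop2 property is set on the command line (e.g. mvn test -Dhadoop2). It sets
projectVersion to ${project.parent.version}-hadoop2, which the module uses as its
own version, and adds hadoop-client plus the hadoop-minicluster test dependency at
${hadoop.version}. Note that mrunit is declared with the hadoop1 classifier even
inside this hadoop2 profile.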

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java b/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
new file mode 100644
index 0000000..c14e86e
--- /dev/null
+++ b/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
@@ -0,0 +1,229 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+
+import org.apache.blur.MiniCluster;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.store.buffer.BufferStore;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.thrift.generated.TableStats;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.blur.utils.GCWatcher;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class BlurOutputFormatMiniClusterTest {
+
+  private static Configuration conf = new Configuration();
+  private static FileSystem fileSystem;
+  private static MiniMRCluster mr;
+  private static Path TEST_ROOT_DIR;
+  private static JobConf jobConf;
+  private static MiniCluster miniCluster;
+  private Path inDir = new Path(TEST_ROOT_DIR + "/in");
+  private static final File TMPDIR = new File(System.getProperty("blur.tmp.dir",
+      "./target/tmp_BlurOutputFormatMiniClusterTest"));
+
+  @BeforeClass
+  public static void setupTest() throws Exception {
+    GCWatcher.init(0.60);
+    LocalFileSystem localFS = FileSystem.getLocal(new Configuration());
+    File testDirectory = new File(TMPDIR, "blur-cluster-test").getAbsoluteFile();
+    testDirectory.mkdirs();
+
+    Path directory = new Path(testDirectory.getPath());
+    FsPermission dirPermissions = localFS.getFileStatus(directory).getPermission();
+    FsAction userAction = dirPermissions.getUserAction();
+    FsAction groupAction = dirPermissions.getGroupAction();
+    FsAction otherAction = dirPermissions.getOtherAction();
+
+    StringBuilder builder = new StringBuilder();
+    builder.append(userAction.ordinal());
+    builder.append(groupAction.ordinal());
+    builder.append(otherAction.ordinal());
+    String dirPermissionNum = builder.toString();
+    System.setProperty("dfs.datanode.data.dir.perm", dirPermissionNum);
+    testDirectory.delete();
+    miniCluster = new MiniCluster();
+    miniCluster.startBlurCluster(new File(testDirectory, "cluster").getAbsolutePath(), 2, 3, true);
+
+    // System.setProperty("test.build.data",
+    // "./target/BlurOutputFormatTest/data");
+    // TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
+    // "target/tmp/BlurOutputFormatTest_tmp"));
+    TEST_ROOT_DIR = new Path(miniCluster.getFileSystemUri().toString() + "/blur_test");
+    System.setProperty("hadoop.log.dir", "./target/BlurOutputFormatTest/hadoop_log");
+    try {
+      fileSystem = TEST_ROOT_DIR.getFileSystem(conf);
+    } catch (IOException io) {
+      throw new RuntimeException("problem getting local fs", io);
+    }
+    mr = new MiniMRCluster(1, miniCluster.getFileSystemUri().toString(), 1);
+    jobConf = mr.createJobConf();
+    BufferStore.initNewBuffer(128, 128 * 128);
+  }
+
+  @AfterClass
+  public static void teardown() {
+    if (mr != null) {
+      mr.shutdown();
+    }
+    miniCluster.shutdownBlurCluster();
+    rm(new File("build"));
+  }
+
+  private static void rm(File file) {
+    if (!file.exists()) {
+      return;
+    }
+    if (file.isDirectory()) {
+      for (File f : file.listFiles()) {
+        rm(f);
+      }
+    }
+    file.delete();
+  }
+
+  @Before
+  public void setup() {
+    TableContext.clear();
+  }
+
+  @Test
+  public void testBlurOutputFormat() throws IOException, InterruptedException, ClassNotFoundException, BlurException,
+      TException {
+    fileSystem.delete(inDir, true);
+    String tableName = "testBlurOutputFormat";
+    writeRecordsFile("in/part1", 1, 1, 1, 1, "cf1");
+    writeRecordsFile("in/part2", 1, 1, 2, 1, "cf1");
+
+    Job job = new Job(jobConf, "blur index");
+    job.setJarByClass(BlurOutputFormatMiniClusterTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    String tableUri = new Path(TEST_ROOT_DIR + "/blur/" + tableName).toString();
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(1);
+    tableDescriptor.setTableUri(tableUri);
+    tableDescriptor.setName(tableName);
+
+    Iface client = getClient();
+    client.createTable(tableDescriptor);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    Path tablePath = new Path(tableUri);
+    Path shardPath = new Path(tablePath, BlurUtil.getShardName(0));
+    FileStatus[] listStatus = fileSystem.listStatus(shardPath);
+    assertEquals(3, listStatus.length);
+    System.out.println("======" + listStatus.length);
+    for (FileStatus fileStatus : listStatus) {
+      System.out.println(fileStatus.getPath());
+    }
+
+    assertTrue(job.waitForCompletion(true));
+    Counters ctrs = job.getCounters();
+    System.out.println("Counters: " + ctrs);
+
+    while (true) {
+      TableStats tableStats = client.tableStats(tableName);
+      System.out.println(tableStats);
+      if (tableStats.getRowCount() > 0) {
+        break;
+      }
+      Thread.sleep(5000);
+    }
+
+    assertTrue(fileSystem.exists(tablePath));
+    assertFalse(fileSystem.isFile(tablePath));
+
+    FileStatus[] listStatusAfter = fileSystem.listStatus(shardPath);
+
+    assertEquals(11, listStatusAfter.length);
+
+  }
+
+  private Iface getClient() {
+    return BlurClient.getClient(miniCluster.getControllerConnectionStr());
+  }
+
+  public static String readFile(String name) throws IOException {
+    DataInputStream f = fileSystem.open(new Path(TEST_ROOT_DIR + "/" + name));
+    BufferedReader b = new BufferedReader(new InputStreamReader(f));
+    StringBuilder result = new StringBuilder();
+    String line = b.readLine();
+    while (line != null) {
+      result.append(line);
+      result.append('\n');
+      line = b.readLine();
+    }
+    b.close();
+    return result.toString();
+  }
+
+  private Path writeRecordsFile(String name, int startingRowId, int numberOfRows, int startRecordId,
+      int numberOfRecords, String family) throws IOException {
+    // "1,1,cf1,val1"
+    Path file = new Path(TEST_ROOT_DIR + "/" + name);
+    fileSystem.delete(file, false);
+    DataOutputStream f = fileSystem.create(file);
+    PrintWriter writer = new PrintWriter(f);
+    for (int row = 0; row < numberOfRows; row++) {
+      for (int record = 0; record < numberOfRecords; record++) {
+        writer.println(getRecord(row + startingRowId, record + startRecordId, family));
+      }
+    }
+    writer.close();
+    return file;
+  }
+
+  private String getRecord(int rowId, int recordId, String family) {
+    return rowId + "," + recordId + "," + family + ",valuetoindex";
+  }
+}
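
Unlike the local-filesystem tests earlier in this commit, this one runs against a
full Blur MiniCluster: the table is created through a Thrift client before the job
runs, the shard directory is listed before and after (3 entries before, 11 after),
and the test polls tableStats until the indexed rows become visible to the
controller. The polling pattern, pulled out as a sketch:

    static void waitForRows(Iface client, String table) throws Exception {
      // Block until the MR-built index is picked up and rows become queryable.
      while (client.tableStats(table).getRowCount() == 0) {
        Thread.sleep(5000);
      }
    }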

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java b/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
new file mode 100644
index 0000000..d8f4f53
--- /dev/null
+++ b/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
@@ -0,0 +1,431 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.util.Collection;
+import java.util.TreeSet;
+
+import org.apache.blur.server.TableContext;
+import org.apache.blur.store.buffer.BufferStore;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.MiniMRClientClusterFactory;
+import org.apache.hadoop.mapred.MiniMRYarnClusterAdapter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TestMapperReducerCleanup.TrackingTextInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.lucene.index.DirectoryReader;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class BlurOutputFormatTest {
+
+  private static Configuration conf = new Configuration();
+  private static FileSystem localFs;
+  private static Path TEST_ROOT_DIR;
+  private static MiniMRYarnClusterAdapter mr;
+  private Path outDir = new Path(TEST_ROOT_DIR + "/out");
+  private Path inDir = new Path(TEST_ROOT_DIR + "/in");
+
+  @BeforeClass
+  public static void setupTest() throws Exception {
+    System.setProperty("test.build.data", "./target/BlurOutputFormatTest/data");
+    TEST_ROOT_DIR = new Path(System.getProperty("test.build.data", "target/tmp/BlurOutputFormatTest_tmp"));
+    System.setProperty("hadoop.log.dir", "./target/BlurOutputFormatTest/hadoop_log");
+    try {
+      localFs = FileSystem.getLocal(conf);
+    } catch (IOException io) {
+      throw new RuntimeException("problem getting local fs", io);
+    }
+
+    FileSystem.setDefaultUri(conf, new URI("file:///"));
+    mr = (MiniMRYarnClusterAdapter) MiniMRClientClusterFactory.create(BlurOutputFormatTest.class, 1, conf);
+    mr.start();
+    conf = mr.getConfig();
+
+    BufferStore.initNewBuffer(128, 128 * 128);
+  }
+
+  @AfterClass
+  public static void teardown() throws IOException {
+    if (mr != null) {
+      mr.stop();
+    }
+    rm(new File("build"));
+  }
+
+  private static void rm(File file) {
+    if (!file.exists()) {
+      return;
+    }
+    if (file.isDirectory()) {
+      for (File f : file.listFiles()) {
+        rm(f);
+      }
+    }
+    file.delete();
+  }
+
+  @Before
+  public void setup() {
+    TableContext.clear();
+  }
+
+  @Test
+  public void testBlurOutputFormat() throws IOException, InterruptedException, ClassNotFoundException {
+    localFs.delete(inDir, true);
+    localFs.delete(outDir, true);
+    writeRecordsFile("in/part1", 1, 1, 1, 1, "cf1");
+    writeRecordsFile("in/part2", 1, 1, 2, 1, "cf1");
+
+    Job job = Job.getInstance(conf, "blur index");
+    job.setJarByClass(BlurOutputFormatTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(1);
+    tableDescriptor.setTableUri(tableUri);
+    tableDescriptor.setName("test");
+
+    createShardDirectories(outDir, 1);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+
+    assertTrue(job.waitForCompletion(true));
+    Counters ctrs = job.getCounters();
+    System.out.println("Counters: " + ctrs);
+
+    Path path = new Path(tableUri, BlurUtil.getShardName(0));
+    Collection<Path> commitedTasks = getCommitedTasks(path);
+    assertEquals(1, commitedTasks.size());
+    DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, commitedTasks.iterator().next()));
+    assertEquals(2, reader.numDocs());
+    reader.close();
+  }
+
+  private Collection<Path> getCommitedTasks(Path path) throws IOException {
+    Collection<Path> result = new TreeSet<Path>();
+    FileSystem fileSystem = path.getFileSystem(conf);
+    FileStatus[] listStatus = fileSystem.listStatus(path);
+    for (FileStatus fileStatus : listStatus) {
+      Path p = fileStatus.getPath();
+      if (!fileStatus.isFile() && p.getName().endsWith(".commit")) {
+        result.add(p);
+      }
+    }
+    return result;
+  }
+
+//  @Test
+  public void testBlurOutputFormatOverFlowTest() throws IOException, InterruptedException, ClassNotFoundException {
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+
+    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
+    writeRecordsFile("in/part2", 1, 50, 2000, 100, "cf1"); // 100 * 50 = 5,000
+
+    Job job = Job.getInstance(conf, "blur index");
+    job.setJarByClass(BlurOutputFormatTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(1);
+    tableDescriptor.setTableUri(tableUri);
+    tableDescriptor.setName("test");
+
+    createShardDirectories(outDir, 1);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    BlurOutputFormat.setIndexLocally(job, true);
+    BlurOutputFormat.setOptimizeInFlight(job, false);
+
+    assertTrue(job.waitForCompletion(true));
+    Counters ctrs = job.getCounters();
+    System.out.println("Counters: " + ctrs);
+
+    Path path = new Path(tableUri, BlurUtil.getShardName(0));
+    Collection<Path> commitedTasks = getCommitedTasks(path);
+    assertEquals(1, commitedTasks.size());
+
+    DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, commitedTasks.iterator().next()));
+    assertEquals(80000, reader.numDocs());
+    reader.close();
+  }
+
+//  @Test
+  public void testBlurOutputFormatOverFlowMultipleReducersTest() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+
+    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
+    writeRecordsFile("in/part2", 1, 50, 2000, 100, "cf1"); // 100 * 50 = 5,000
+
+    Job job = Job.getInstance(conf, "blur index");
+    job.setJarByClass(BlurOutputFormatTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(2);
+    tableDescriptor.setTableUri(tableUri);
+    tableDescriptor.setName("test");
+
+    createShardDirectories(outDir, 2);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    BlurOutputFormat.setIndexLocally(job, false);
+
+    assertTrue(job.waitForCompletion(true));
+    Counters ctrs = job.getCounters();
+    System.out.println("Counters: " + ctrs);
+
+    long total = 0;
+    for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
+      Path path = new Path(tableUri, BlurUtil.getShardName(i));
+      Collection<Path> commitedTasks = getCommitedTasks(path);
+      assertEquals(1, commitedTasks.size());
+
+      DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, commitedTasks.iterator().next()));
+      total += reader.numDocs();
+      reader.close();
+    }
+    assertEquals(80000, total);
+
+  }
+
+//  @Test
+  public void testBlurOutputFormatOverFlowMultipleReducersWithReduceMultiplierTest() throws IOException,
+      InterruptedException, ClassNotFoundException {
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+
+    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
+    writeRecordsFile("in/part2", 1, 50, 2000, 100, "cf1"); // 100 * 50 = 5,000
+
+    Job job = Job.getInstance(conf, "blur index");
+    job.setJarByClass(BlurOutputFormatTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(7);
+    tableDescriptor.setTableUri(tableUri);
+    tableDescriptor.setName("test");
+
+    createShardDirectories(outDir, 7);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    int multiple = 2;
+    BlurOutputFormat.setReducerMultiplier(job, multiple);
+
+    assertTrue(job.waitForCompletion(true));
+    Counters ctrs = job.getCounters();
+    System.out.println("Counters: " + ctrs);
+
+    long total = 0;
+    for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
+      Path path = new Path(tableUri, BlurUtil.getShardName(i));
+      Collection<Path> commitedTasks = getCommitedTasks(path);
+      assertTrue(multiple >= commitedTasks.size());
+      for (Path p : commitedTasks) {
+        DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, p));
+        total += reader.numDocs();
+        reader.close();
+      }
+    }
+    assertEquals(80000, total);
+
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testBlurOutputFormatValidateReducerCount() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+    writeRecordsFile("in/part1", 1, 1, 1, 1, "cf1");
+    writeRecordsFile("in/part2", 1, 1, 2, 1, "cf1");
+
+    Job job = Job.getInstance(conf, "blur index");
+    job.setJarByClass(BlurOutputFormatTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(1);
+    tableDescriptor.setTableUri(tableUri);
+    tableDescriptor.setName("test");
+
+    createShardDirectories(outDir, 1);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    BlurOutputFormat.setReducerMultiplier(job, 2);
+    job.setNumReduceTasks(4);
+    job.submit();
+
+  }
+
+  // @TODO this test fails sometimes due to issues in the MR MiniCluster
+  // @Test
+  public void testBlurOutputFormatCleanupDuringJobKillTest() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+
+    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
+    writeRecordsFile("in/part2", 1, 5000, 2000, 100, "cf1"); // 100 * 5000 =
+                                                             // 500,000
+
+    Job job = Job.getInstance(conf, "blur index");
+    job.setJarByClass(BlurOutputFormatTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(2);
+    tableDescriptor.setTableUri(tableUri);
+    tableDescriptor.setName("test");
+
+    createShardDirectories(outDir, 2);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    BlurOutputFormat.setIndexLocally(job, false);
+
+    job.submit();
+    boolean killCalled = false;
+    while (!job.isComplete()) {
+      Thread.sleep(1000);
+      System.out.printf("Killed [%b] Map [%f] Reduce [%f]%n", killCalled, job.mapProgress() * 100,
+          job.reduceProgress() * 100);
+      if (job.reduceProgress() > 0.7 && !killCalled) {
+        job.killJob();
+        killCalled = true;
+      }
+    }
+
+    assertFalse(job.isSuccessful());
+
+    for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
+      Path path = new Path(tableUri, BlurUtil.getShardName(i));
+      FileSystem fileSystem = path.getFileSystem(job.getConfiguration());
+      FileStatus[] listStatus = fileSystem.listStatus(path);
+      assertEquals(toString(listStatus), 0, listStatus.length);
+    }
+  }
+
+  private String toString(FileStatus[] listStatus) {
+    if (listStatus == null || listStatus.length == 0) {
+      return "";
+    }
+    StringBuilder builder = new StringBuilder();
+    for (FileStatus fileStatus : listStatus) {
+      if (builder.length() > 0) {
+        builder.append(',');
+      }
+      builder.append(fileStatus.getPath());
+    }
+    return builder.toString();
+  }
+
+  public static String readFile(String name) throws IOException {
+    DataInputStream f = localFs.open(new Path(TEST_ROOT_DIR + "/" + name));
+    BufferedReader b = new BufferedReader(new InputStreamReader(f));
+    StringBuilder result = new StringBuilder();
+    String line = b.readLine();
+    while (line != null) {
+      result.append(line);
+      result.append('\n');
+      line = b.readLine();
+    }
+    b.close();
+    return result.toString();
+  }
+
+  private Path writeRecordsFile(String name, int startingRowId, int numberOfRows, int startRecordId,
+      int numberOfRecords, String family) throws IOException {
+    // e.g. "1,1,cf1,valuetoindex"
+    Path file = new Path(TEST_ROOT_DIR + "/" + name);
+    localFs.delete(file, false);
+    DataOutputStream f = localFs.create(file);
+    PrintWriter writer = new PrintWriter(f);
+    for (int row = 0; row < numberOfRows; row++) {
+      for (int record = 0; record < numberOfRecords; record++) {
+        writer.println(getRecord(row + startingRowId, record + startRecordId, family));
+      }
+    }
+    writer.close();
+    return file;
+  }
+
+  private void createShardDirectories(Path outDir, int shardCount) throws IOException {
+    localFs.mkdirs(outDir);
+    for (int i = 0; i < shardCount; i++) {
+      localFs.mkdirs(new Path(outDir, BlurUtil.getShardName(i)));
+    }
+  }
+
+  private String getRecord(int rowId, int recordId, String family) {
+    return rowId + "," + recordId + "," + family + ",valuetoindex";
+  }
+}
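
A minimal sketch of the reducer-count contract that testBlurOutputFormatValidateReducerCount exercises. The class and method names below are hypothetical, and the contract (reduce tasks must equal shardCount * reducerMultiplier, enforced at submit time) is inferred from the test's expected IllegalArgumentException rather than from BlurOutputFormat documentation:

    import org.apache.blur.mapreduce.lib.BlurOutputFormat;
    import org.apache.blur.thrift.generated.TableDescriptor;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class ReducerCountSketch {
      // Hypothetical helper mirroring the test setup: 1 shard * multiplier 2
      // = 2 reduce tasks. Forcing job.setNumReduceTasks(4) after this setup
      // is the mismatch the test expects to surface as an
      // IllegalArgumentException when the job is submitted.
      public static Job configure(Configuration conf, String tableUri) throws Exception {
        Job job = Job.getInstance(conf, "blur index");
        TableDescriptor tableDescriptor = new TableDescriptor();
        tableDescriptor.setShardCount(1);
        tableDescriptor.setTableUri(tableUri);
        tableDescriptor.setName("test");
        BlurOutputFormat.setupJob(job, tableDescriptor);
        BlurOutputFormat.setReducerMultiplier(job, 2);
        return job;
      }
    }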

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java b/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
new file mode 100644
index 0000000..ec3239e
--- /dev/null
+++ b/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.blur.mapreduce.lib.CsvBlurDriver.ControllerPool;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Test;
+
+public class CsvBlurDriverTest {
+
+  protected String tableUri = "file:///tmp/tmppath";
+  protected int shardCount = 13;
+
+  @Test
+  public void testCsvBlurDriverTestFail1() throws Exception {
+    Configuration configuration = new Configuration();
+    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
+      @Override
+      public Iface getClient(String controllerConnectionStr) {
+        return null;
+      }
+    };
+    assertNull(CsvBlurDriver.setupJob(configuration, controllerPool, new String[] {}));
+  }
+
+  @Test
+  public void testCsvBlurDriverTest() throws Exception {
+    Configuration configurationSetup = new Configuration();
+    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
+      @Override
+      public Iface getClient(String controllerConnectionStr) {
+        return getMockIface();
+      }
+    };
+    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, "-c", "host:40010", "-d", "family1", "col1",
+        "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i", "file:///tmp/test2");
+    assertNotNull(job);
+    Configuration configuration = job.getConfiguration();
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
+    assertEquals("table1", tableDescriptor.getName());
+    Collection<String> inputs = configuration.getStringCollection("mapred.input.dir");
+    assertEquals(2, inputs.size());
+    Map<String, List<String>> familyAndColumnNameMap = CsvBlurMapper.getFamilyAndColumnNameMap(configuration);
+    assertEquals(2, familyAndColumnNameMap.size());
+  }
+
+  @Test
+  public void testCsvBlurDriverTest2() throws Exception {
+    Configuration configurationSetup = new Configuration();
+    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
+      @Override
+      public Iface getClient(String controllerConnectionStr) {
+        return getMockIface();
+      }
+    };
+    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, "-c", "host:40010", "-d", "family1", "col1",
+        "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i", "file:///tmp/test2",
+        "-S", "-C", "1000000", "2000000");
+    assertNotNull(job);
+    Configuration configuration = job.getConfiguration();
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
+    assertEquals("table1", tableDescriptor.getName());
+    Collection<String> inputs = configuration.getStringCollection("mapred.input.dir");
+    assertEquals(2, inputs.size());
+    Map<String, List<String>> familyAndColumnNameMap = CsvBlurMapper.getFamilyAndColumnNameMap(configuration);
+    assertEquals(2, familyAndColumnNameMap.size());
+  }
+
+  @Test
+  public void testCsvBlurDriverTest3() throws Exception {
+    Configuration configurationSetup = new Configuration();
+    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
+      @Override
+      public Iface getClient(String controllerConnectionStr) {
+        return getMockIface();
+      }
+    };
+    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, "-c", "host:40010", "-d", "family1", "col1",
+        "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i", "file:///tmp/test2",
+        "-S", "-C", "1000000", "2000000", "-p", "SNAPPY");
+    assertNotNull(job);
+    Configuration configuration = job.getConfiguration();
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
+    assertEquals("table1", tableDescriptor.getName());
+    Collection<String> inputs = configuration.getStringCollection("mapred.input.dir");
+    assertEquals(2, inputs.size());
+    Map<String, List<String>> familyAndColumnNameMap = CsvBlurMapper.getFamilyAndColumnNameMap(configuration);
+    assertEquals(2, familyAndColumnNameMap.size());
+    assertEquals("true", configuration.get(CsvBlurDriver.MAPRED_COMPRESS_MAP_OUTPUT));
+    assertEquals(SnappyCodec.class.getName(), configuration.get(CsvBlurDriver.MAPRED_MAP_OUTPUT_COMPRESSION_CODEC));
+  }
+
+  protected Iface getMockIface() {
+    InvocationHandler handler = new InvocationHandler() {
+
+      @Override
+      public Object invoke(Object o, Method method, Object[] args) throws Throwable {
+        if (method.getName().equals("describe")) {
+          TableDescriptor tableDescriptor = new TableDescriptor();
+          tableDescriptor.setName((String) args[0]);
+          tableDescriptor.setTableUri(tableUri);
+          tableDescriptor.setShardCount(shardCount);
+          return tableDescriptor;
+        }
+        throw new RuntimeException("not implemented.");
+      }
+    };
+    return (Iface) Proxy.newProxyInstance(Iface.class.getClassLoader(), new Class[] { Iface.class }, handler);
+  }
+
+}
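
As a usage note, the argument shape these tests feed to CsvBlurDriver.setupJob, collected into one hedged sketch. The flag glosses in the comments are inferred from the assertions above (table name, input count, family map, Snappy map-output compression); -S and -C are passed exactly as the tests pass them, since their semantics are not asserted here:

    import org.apache.blur.mapreduce.lib.CsvBlurDriver;
    import org.apache.blur.mapreduce.lib.CsvBlurDriver.ControllerPool;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class DriverArgsSketch {
      // Inferred glosses: -c controller connection string, -d family
      // followed by its columns (repeatable), -t table name, -i input path
      // (repeatable), -p map-output compression codec.
      public static Job index(Configuration conf, ControllerPool pool) throws Exception {
        return CsvBlurDriver.setupJob(conf, pool,
            "-c", "host:40010",
            "-d", "family1", "col1", "col2",
            "-d", "family2", "col3", "col4",
            "-t", "table1",
            "-i", "file:///tmp/test1", "-i", "file:///tmp/test2",
            "-S", "-C", "1000000", "2000000",
            "-p", "SNAPPY");
      }
    }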

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java b/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
new file mode 100644
index 0000000..47aa8e5
--- /dev/null
+++ b/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
@@ -0,0 +1,108 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CsvBlurMapperTest {
+
+  private MapDriver<Writable, Text, Text, BlurMutate> _mapDriver;
+  private CsvBlurMapper _mapper;
+
+  @Before
+  public void setUp() throws IOException {
+    _mapper = new CsvBlurMapper();
+    _mapDriver = MapDriver.newMapDriver(_mapper);
+  }
+
+  @Test
+  public void testMapperWithFamilyInData() {
+    Configuration configuration = _mapDriver.getConfiguration();
+    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
+    _mapDriver.withInput(new LongWritable(), new Text("rowid1,record1,cf1,value1,value2"));
+    _mapDriver.withOutput(new Text("rowid1"), new BlurMutate(MUTATE_TYPE.REPLACE, "rowid1", "record1", "cf1")
+        .addColumn("col1", "value1").addColumn("col2", "value2"));
+    _mapDriver.runTest();
+  }
+
+  @Test
+  public void testMapperFamilyPerPath() {
+    Configuration configuration = _mapDriver.getConfiguration();
+    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
+    CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
+    _mapper.setFamilyFromPath("cf1");
+
+    _mapDriver.withInput(new LongWritable(), new Text("rowid1,record1,value1,value2"));
+    _mapDriver.withOutput(new Text("rowid1"), new BlurMutate(MUTATE_TYPE.REPLACE, "rowid1", "record1", "cf1")
+        .addColumn("col1", "value1").addColumn("col2", "value2"));
+    _mapDriver.runTest();
+  }
+  
+  @Test
+  public void testMapperAutoGenerateRecordId() {
+    Configuration configuration = _mapDriver.getConfiguration();
+    CsvBlurMapper.setAutoGenerateRecordIdAsHashOfData(configuration, true);
+    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
+    CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
+    _mapper.setFamilyFromPath("cf1");
+
+    _mapDriver.withInput(new LongWritable(), new Text("rowid1,value1,value2"));
+    _mapDriver.withOutput(new Text("rowid1"), new BlurMutate(MUTATE_TYPE.REPLACE, "rowid1", "-25nqln3n2vb4cayex9y9tpxx3", "cf1")
+        .addColumn("col1", "value1").addColumn("col2", "value2"));
+    _mapDriver.runTest();
+  }
+  
+  @Test
+  public void testMapperAutoGenerateRowId() {
+    Configuration configuration = _mapDriver.getConfiguration();
+    CsvBlurMapper.setAutoGenerateRowIdAsHashOfData(configuration, true);
+    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
+    CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
+    _mapper.setFamilyFromPath("cf1");
+
+    _mapDriver.withInput(new LongWritable(), new Text("record1,value1,value2"));
+    _mapDriver.withOutput(new Text("-50b4uzohynr7j7s9pve7ytz66"), new BlurMutate(MUTATE_TYPE.REPLACE, "-50b4uzohynr7j7s9pve7ytz66", "record1", "cf1")
+        .addColumn("col1", "value1").addColumn("col2", "value2"));
+    _mapDriver.runTest();
+  }
+  
+  @Test
+  public void testMapperAutoGenerateRowIdAndRecordId() {
+    Configuration configuration = _mapDriver.getConfiguration();
+    CsvBlurMapper.setAutoGenerateRecordIdAsHashOfData(configuration, true);
+    CsvBlurMapper.setAutoGenerateRowIdAsHashOfData(configuration, true);
+    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
+    CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
+    _mapper.setFamilyFromPath("cf1");
+
+    _mapDriver.withInput(new LongWritable(), new Text("value1,value2"));
+    _mapDriver.withOutput(new Text("5q0tme15ph3h5pns8sv3u5wy2"), new BlurMutate(MUTATE_TYPE.REPLACE, "5q0tme15ph3h5pns8sv3u5wy2", "5q0tme15ph3h5pns8sv3u5wy2", "cf1")
+        .addColumn("col1", "value1").addColumn("col2", "value2"));
+    _mapDriver.runTest();
+  }
+
+}
\ No newline at end of file
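
The MRUnit pattern above extends mechanically to any family/column spec; a self-contained sketch, where family "cf3" and the column and value names are hypothetical:

    package org.apache.blur.mapreduce.lib;

    import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mrunit.mapreduce.MapDriver;
    import org.junit.Test;

    public class CsvBlurMapperSketchTest {

      // One CSV line "rowid,recordid,family,values..." maps to one REPLACE
      // mutate keyed by row id, with values bound to columns in spec order.
      @Test
      public void testSketch() throws Exception {
        MapDriver<Writable, Text, Text, BlurMutate> driver = MapDriver.newMapDriver(new CsvBlurMapper());
        CsvBlurMapper.setColumns(driver.getConfiguration(), "cf3:colA,colB");
        driver.withInput(new LongWritable(), new Text("row1,rec1,cf3,a,b"));
        driver.withOutput(new Text("row1"), new BlurMutate(MUTATE_TYPE.REPLACE, "row1", "rec1", "cf3")
            .addColumn("colA", "a").addColumn("colB", "b"));
        driver.runTest();
      }
    }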

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/Test.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/Test.java b/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/Test.java
new file mode 100644
index 0000000..5999f80
--- /dev/null
+++ b/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/Test.java
@@ -0,0 +1,38 @@
+package org.apache.blur.mapreduce.lib;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.MiniHadoopClusterManager;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
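+/**
+ * Manual utility, not a JUnit test: spins up a mini DFS/MR cluster via
+ * MiniHadoopClusterManager for local experimentation.
+ */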
+public class Test {
+
+  public static void main(String[] args) throws FileNotFoundException, IOException, URISyntaxException {
+    MiniHadoopClusterManager manager = new MiniHadoopClusterManager();
+    String[] sargs = new String[]{"-D" + MiniDFSCluster.HDFS_MINIDFS_BASEDIR + "=./dfs-mini-tmp"};
+    manager.run(sargs);
+    manager.start();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred/pom.xml
----------------------------------------------------------------------
diff --git a/blur-mapred/pom.xml b/blur-mapred/pom.xml
deleted file mode 100644
index 3afeaf3..0000000
--- a/blur-mapred/pom.xml
+++ /dev/null
@@ -1,203 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" ?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-	<parent>
-		<groupId>org.apache.blur</groupId>
-		<artifactId>blur</artifactId>
-		<version>0.2.2-incubating-SNAPSHOT</version>
-		<relativePath>../pom.xml</relativePath>
-	</parent>
-	<groupId>org.apache.blur</groupId>
-	<artifactId>blur-mapred</artifactId>
-	<version>${projectVersion}</version>
-	<packaging>jar</packaging>
-	<name>Blur Map Reduce</name>
-	<description>The Blur Map Reduce module contains the BlurOutputFormat as well as a CSVLoader
-		program.</description>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.zookeeper</groupId>
-			<artifactId>zookeeper</artifactId>
-			<version>${zookeeper.version}</version>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.blur</groupId>
-			<artifactId>blur-core</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.blur</groupId>
-			<artifactId>blur-core</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.blur</groupId>
-			<artifactId>blur-store</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.blur</groupId>
-			<artifactId>blur-util</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.blur</groupId>
-			<artifactId>blur-util</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>log4j</groupId>
-			<artifactId>log4j</artifactId>
-			<version>${log4j.version}</version>
-			<scope>provided</scope>
-			<exclusions>
-				<exclusion>
-					<groupId>javax.mail</groupId>
-					<artifactId>mail</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>javax.jms</groupId>
-					<artifactId>jms</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>com.sun.jdmk</groupId>
-					<artifactId>jmxtools</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>com.sun.jmx</groupId>
-					<artifactId>jmxri</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-	</dependencies>
-
-	<repositories>
-		<repository>
-			<id>libdir</id>
-			<url>file://${basedir}/../lib</url>
-		</repository>
-	</repositories>
-
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-surefire-plugin</artifactId>
-				<configuration>
-					<argLine>-XX:+UseConcMarkSweepGC -Xmx1g -Xms1g</argLine>
-					<forkCount>2</forkCount>
-					<forkMode>always</forkMode>
-					<reuseForks>false</reuseForks>
-					<systemPropertyVariables>
-						<blur.tmp.dir>${project.build.directory}/target/tmp</blur.tmp.dir>
-					</systemPropertyVariables>
-				</configuration>
-			</plugin>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-compiler-plugin</artifactId>
-				<configuration>
-					<source>1.6</source>
-					<target>1.6</target>
-				</configuration>
-			</plugin>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-jar-plugin</artifactId>
-				<executions>
-					<execution>
-						<goals>
-							<goal>test-jar</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
-	
-	
-	<profiles>
-		<profile>
-			<id>hadoop-1x</id>
-			<activation>
-				<property>
-					<name>hadoop1</name>
-				</property>
-			</activation>
-			<properties>
-				<projectVersion>${project.parent.version}-hadoop1</projectVersion>
-			</properties>
-			<dependencies>
-				<dependency>
-					<groupId>org.apache.hadoop</groupId>
-					<artifactId>hadoop-test</artifactId>
-					<version>${hadoop.version}</version>
-					<scope>test</scope>
-				</dependency>
-				<dependency>
-				      <groupId>org.apache.mrunit</groupId>
-				      <artifactId>mrunit</artifactId>
-				      <version>${mrunit.version}</version>
-				      <classifier>hadoop1</classifier>
-					  <scope>test</scope>
-                </dependency>
-			</dependencies>
-		</profile>
-		<profile>
-			<id>hadoop-2.2</id>
-			<activation>
-				<property>
-					<name>hadoop2</name>
-				</property>
-			</activation>
-			<properties>
-				<projectVersion>${project.parent.version}-hadoop2</projectVersion>
-			</properties>
-			<dependencies>
-				<dependency>
-				      <groupId>org.apache.mrunit</groupId>
-				      <artifactId>mrunit</artifactId>
-				      <version>${mrunit.version}</version>
-				      <classifier>hadoop1</classifier>
-					  <scope>test</scope>
-                </dependency>
-				<dependency>
-					<groupId>org.apache.hadoop</groupId>
-					<artifactId>hadoop-client</artifactId>
-					<version>${hadoop.version}</version>
-				</dependency>
-				<dependency>
-					<groupId>org.apache.hadoop</groupId>
-					<artifactId>hadoop-minicluster</artifactId>
-					<version>${hadoop.version}</version>
-					<scope>test</scope>
-				</dependency>
-			</dependencies>
-		</profile>
-	</profiles>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred/src/main/java/org/apache/blur/mapred/AbstractOutputCommitter.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapred/AbstractOutputCommitter.java b/blur-mapred/src/main/java/org/apache/blur/mapred/AbstractOutputCommitter.java
deleted file mode 100644
index 8294738..0000000
--- a/blur-mapred/src/main/java/org/apache/blur/mapred/AbstractOutputCommitter.java
+++ /dev/null
@@ -1,148 +0,0 @@
-package org.apache.blur.mapred;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.mapreduce.lib.BlurOutputFormat;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.blur.utils.BlurConstants;
-import org.apache.blur.utils.BlurUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.OutputCommitter;
-import org.apache.hadoop.mapred.TaskAttemptID;
-
-public abstract class AbstractOutputCommitter extends OutputCommitter {
-
-  private final static Log LOG = LogFactory.getLog(AbstractOutputCommitter.class);
-
-  @Override
-  public void setupJob(JobContext jobContext) throws IOException {
-
-  }
-
-  @Override
-  public void commitJob(JobContext jobContext) throws IOException {
-    // Look through all the shards for attempts that need to be cleaned up,
-    // find the attempts that are finished, and rename those attempt
-    // directories to commits.
-    LOG.info("Committing Job [{0}]", jobContext.getJobID());
-    Configuration configuration = jobContext.getConfiguration();
-    Path tableOutput = BlurOutputFormat.getOutputPath(configuration);
-    makeSureNoEmptyShards(configuration, tableOutput);
-    FileSystem fileSystem = tableOutput.getFileSystem(configuration);
-    for (FileStatus fileStatus : fileSystem.listStatus(tableOutput)) {
-      if (isShard(fileStatus)) {
-        commitOrAbortJob(jobContext, fileStatus.getPath(), true);
-      }
-    }
-
-  }
-
-  private void makeSureNoEmptyShards(Configuration configuration, Path tableOutput) throws IOException {
-    FileSystem fileSystem = tableOutput.getFileSystem(configuration);
-    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
-    int shardCount = tableDescriptor.getShardCount();
-    for (int i = 0; i < shardCount; i++) {
-      String shardName = BlurUtil.getShardName(i);
-      fileSystem.mkdirs(new Path(tableOutput, shardName));
-    }
-  }
-
-  private void commitOrAbortJob(JobContext jobContext, Path shardPath, boolean commit) throws IOException {
-    FileSystem fileSystem = shardPath.getFileSystem(jobContext.getConfiguration());
-    FileStatus[] listStatus = fileSystem.listStatus(shardPath, new PathFilter() {
-      @Override
-      public boolean accept(Path path) {
-        if (path.getName().endsWith(".task_complete")) {
-          return true;
-        }
-        return false;
-      }
-    });
-    for (FileStatus fileStatus : listStatus) {
-      Path path = fileStatus.getPath();
-      String name = path.getName();
-      boolean taskComplete = name.endsWith(".task_complete");
-      if (fileStatus.isDir()) {
-        String taskAttemptName = getTaskAttemptName(name);
-        if (taskAttemptName == null) {
-          LOG.info("Dir name [{0}] not task attempt", name);
-          continue;
-        }
-        TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptName);
-        if (taskAttemptID.getJobID().equals(jobContext.getJobID())) {
-          if (commit) {
-            if (taskComplete) {
-              fileSystem.rename(path, new Path(shardPath, taskAttemptName + ".commit"));
-              LOG.info("Committing [{0}] in path [{1}]", taskAttemptID, path);
-            } else {
-              fileSystem.delete(path, true);
-              LOG.info("Deleteing tmp dir [{0}] in path [{1}]", taskAttemptID, path);
-            }
-          } else {
-            fileSystem.delete(path, true);
-            LOG.info("Deleteing aborted job dir [{0}] in path [{1}]", taskAttemptID, path);
-          }
-        }
-      }
-    }
-  }
-
-  private String getTaskAttemptName(String name) {
-    int lastIndexOf = name.lastIndexOf('.');
-    if (lastIndexOf < 0) {
-      return null;
-    }
-    return name.substring(0, lastIndexOf);
-  }
-
-  private boolean isShard(FileStatus fileStatus) {
-    return isShard(fileStatus.getPath());
-  }
-
-  private boolean isShard(Path path) {
-    return path.getName().startsWith(BlurConstants.SHARD_PREFIX);
-  }
-
-  @Override
-  public void abortJob(JobContext jobContext, int status) throws IOException {
-    LOG.info("Abort Job [{0}]", jobContext.getJobID());
-    Configuration configuration = jobContext.getConfiguration();
-    Path tableOutput = BlurOutputFormat.getOutputPath(configuration);
-    makeSureNoEmptyShards(configuration, tableOutput);
-    FileSystem fileSystem = tableOutput.getFileSystem(configuration);
-    for (FileStatus fileStatus : fileSystem.listStatus(tableOutput)) {
-      if (isShard(fileStatus)) {
-        commitOrAbortJob(jobContext, fileStatus.getPath(), false);
-      }
-    }
-  }
-
-  @Override
-  public void cleanupJob(JobContext context) throws IOException {
-
-  }
-
-}

