incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [3/5] git commit: Renaming the blur-mapred-common project back to blur-mapred and removing old hadoop1 and hadoop2 projects because they are no longer needed.
Date Sun, 12 Apr 2015 16:16:42 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fa41356/blur-mapred-common/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred-common/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java b/blur-mapred-common/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
deleted file mode 100644
index 22d2b56..0000000
--- a/blur-mapred-common/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
+++ /dev/null
@@ -1,238 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-
-import org.apache.blur.MiniCluster;
-import org.apache.blur.server.TableContext;
-import org.apache.blur.store.buffer.BufferStore;
-import org.apache.blur.thirdparty.thrift_0_9_0.TException;
-import org.apache.blur.thrift.BlurClient;
-import org.apache.blur.thrift.generated.Blur.Iface;
-import org.apache.blur.thrift.generated.BlurException;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.blur.thrift.generated.TableStats;
-import org.apache.blur.utils.GCWatcher;
-import org.apache.blur.utils.ShardUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class BlurOutputFormatMiniClusterTest {
-
-  private static Configuration conf = new Configuration();
-  private static FileSystem fileSystem;
-  private static Path TEST_ROOT_DIR;
-  private static MiniCluster miniCluster;
-  private Path inDir = new Path(TEST_ROOT_DIR + "/in");
-  private static final File TMPDIR = new File(System.getProperty("blur.tmp.dir",
-      "./target/tmp_BlurOutputFormatMiniClusterTest"));
-
-  @BeforeClass
-  public static void setupTest() throws Exception {
-    GCWatcher.init(0.60);
-    BlurOutputFormatTest.setupJavaHome();
-    LocalFileSystem localFS = FileSystem.getLocal(new Configuration());
-    File testDirectory = new File(TMPDIR, "blur-cluster-test").getAbsoluteFile();
-    testDirectory.mkdirs();
-
-    Path directory = new Path(testDirectory.getPath());
-    FsPermission dirPermissions = localFS.getFileStatus(directory).getPermission();
-    FsAction userAction = dirPermissions.getUserAction();
-    FsAction groupAction = dirPermissions.getGroupAction();
-    FsAction otherAction = dirPermissions.getOtherAction();
-
-    StringBuilder builder = new StringBuilder();
-    builder.append(userAction.ordinal());
-    builder.append(groupAction.ordinal());
-    builder.append(otherAction.ordinal());
-    String dirPermissionNum = builder.toString();
-    System.setProperty("dfs.datanode.data.dir.perm", dirPermissionNum);
-    testDirectory.delete();
-    miniCluster = new MiniCluster();
-    miniCluster.startBlurCluster(new File(testDirectory, "cluster").getAbsolutePath(), 2, 3, true, false);
-
-    // System.setProperty("test.build.data",
-    // "./target/BlurOutputFormatTest/data");
-    // TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
-    // "target/tmp/BlurOutputFormatTest_tmp"));
-    TEST_ROOT_DIR = new Path(miniCluster.getFileSystemUri().toString() + "/blur_test");
-    System.setProperty("hadoop.log.dir", "./target/BlurOutputFormatTest/hadoop_log");
-    try {
-      fileSystem = TEST_ROOT_DIR.getFileSystem(conf);
-    } catch (IOException io) {
-      throw new RuntimeException("problem getting local fs", io);
-    }
-
-    FileSystem.setDefaultUri(conf, miniCluster.getFileSystemUri());
-
-    miniCluster.startMrMiniCluster();
-    conf = miniCluster.getMRConfiguration();
-
-    BufferStore.initNewBuffer(128, 128 * 128);
-  }
-
-  @AfterClass
-  public static void teardown() throws IOException {
-    if (miniCluster != null) {
-      miniCluster.stopMrMiniCluster();
-    }
-    miniCluster.shutdownBlurCluster();
-    rm(new File("build"));
-  }
-
-  private static void rm(File file) {
-    if (!file.exists()) {
-      return;
-    }
-    if (file.isDirectory()) {
-      for (File f : file.listFiles()) {
-        rm(f);
-      }
-    }
-    file.delete();
-  }
-
-  @Before
-  public void setup() {
-    TableContext.clear();
-  }
-
-  @Test
-  public void testBlurOutputFormat() throws IOException, InterruptedException, ClassNotFoundException, BlurException,
-      TException {
-    fileSystem.delete(inDir, true);
-    String tableName = "testBlurOutputFormat";
-    writeRecordsFile("in/part1", 1, 1, 1, 1, "cf1");
-    writeRecordsFile("in/part2", 1, 1, 2, 1, "cf1");
-
-    Job job = Job.getInstance(conf, "blur index");
-    job.setJarByClass(BlurOutputFormatMiniClusterTest.class);
-    job.setMapperClass(CsvBlurMapper.class);
-    job.setInputFormatClass(TextInputFormat.class);
-
-    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-    String tableUri = new Path(TEST_ROOT_DIR + "/blur/" + tableName).makeQualified(fileSystem.getUri(),
-        fileSystem.getWorkingDirectory()).toString();
-    CsvBlurMapper.addColumns(job, "cf1", "col");
-
-    TableDescriptor tableDescriptor = new TableDescriptor();
-    tableDescriptor.setShardCount(1);
-    tableDescriptor.setTableUri(tableUri);
-    tableDescriptor.setName(tableName);
-
-    Iface client = getClient();
-    client.createTable(tableDescriptor);
-
-    BlurOutputFormat.setupJob(job, tableDescriptor);
-    Path output = new Path(TEST_ROOT_DIR + "/out");
-    BlurOutputFormat.setOutputPath(job, output);
-
-    Path tablePath = new Path(tableUri);
-    Path shardPath = new Path(tablePath, ShardUtil.getShardName(0));
-    FileStatus[] listStatus = fileSystem.listStatus(shardPath);
-    assertEquals(3, listStatus.length);
-    System.out.println("======" + listStatus.length);
-    for (FileStatus fileStatus : listStatus) {
-      System.out.println(fileStatus.getPath());
-    }
-
-    assertTrue(job.waitForCompletion(true));
-    Counters ctrs = job.getCounters();
-    System.out.println("Counters: " + ctrs);
-
-    client.loadData(tableName, output.toString());
-
-    while (true) {
-      TableStats tableStats = client.tableStats(tableName);
-      System.out.println(tableStats);
-      if (tableStats.getRowCount() > 0) {
-        break;
-      }
-      Thread.sleep(100);
-    }
-
-    assertTrue(fileSystem.exists(tablePath));
-    assertFalse(fileSystem.isFile(tablePath));
-
-    FileStatus[] listStatusAfter = fileSystem.listStatus(shardPath);
-
-    assertEquals(11, listStatusAfter.length);
-
-  }
-
-  private Iface getClient() {
-    return BlurClient.getClient(miniCluster.getControllerConnectionStr());
-  }
-
-  public static String readFile(String name) throws IOException {
-    DataInputStream f = fileSystem.open(new Path(TEST_ROOT_DIR + "/" + name));
-    BufferedReader b = new BufferedReader(new InputStreamReader(f));
-    StringBuilder result = new StringBuilder();
-    String line = b.readLine();
-    while (line != null) {
-      result.append(line);
-      result.append('\n');
-      line = b.readLine();
-    }
-    b.close();
-    return result.toString();
-  }
-
-  private Path writeRecordsFile(String name, int starintgRowId, int numberOfRows, int startRecordId,
-      int numberOfRecords, String family) throws IOException {
-    // "1,1,cf1,val1"
-    Path file = new Path(TEST_ROOT_DIR + "/" + name);
-    fileSystem.delete(file, false);
-    DataOutputStream f = fileSystem.create(file);
-    PrintWriter writer = new PrintWriter(f);
-    for (int row = 0; row < numberOfRows; row++) {
-      for (int record = 0; record < numberOfRecords; record++) {
-        writer.println(getRecord(row + starintgRowId, record + startRecordId, family));
-      }
-    }
-    writer.close();
-    return file;
-  }
-
-  private String getRecord(int rowId, int recordId, String family) {
-    return rowId + "," + recordId + "," + family + ",valuetoindex";
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fa41356/blur-mapred-common/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred-common/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java b/blur-mapred-common/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
deleted file mode 100644
index c3ab723..0000000
--- a/blur-mapred-common/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
+++ /dev/null
@@ -1,465 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-import java.net.URI;
-import java.util.Collection;
-import java.util.TreeSet;
-
-import org.apache.blur.MiniCluster;
-import org.apache.blur.server.TableContext;
-import org.apache.blur.store.buffer.BufferStore;
-import org.apache.blur.store.hdfs.HdfsDirectory;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.blur.utils.ShardUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.lucene.index.DirectoryReader;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class BlurOutputFormatTest {
-
-  private static Configuration conf = new Configuration();
-  private static FileSystem localFs;
-  private static Path TEST_ROOT_DIR;
-  private static MiniCluster miniCluster;
-  private Path outDir = new Path(TEST_ROOT_DIR + "/out");
-  private Path inDir = new Path(TEST_ROOT_DIR + "/in");
-
-  @BeforeClass
-  public static void setupTest() throws Exception {
-    setupJavaHome();
-    File file = new File("./target/tmp/BlurOutputFormatTest_tmp");
-    String pathStr = file.getAbsoluteFile().toURI().toString();
-    System.setProperty("test.build.data", pathStr + "/data");
-    System.setProperty("hadoop.log.dir", pathStr + "/hadoop_log");
-    try {
-      localFs = FileSystem.getLocal(conf);
-    } catch (IOException io) {
-      throw new RuntimeException("problem getting local fs", io);
-    }
-    TEST_ROOT_DIR = new Path(System.getProperty("test.build.data", pathStr));
-
-    URI uri = new URI("file:///");
-    FileSystem.setDefaultUri(conf, uri);
-    
-    miniCluster = new MiniCluster();
-    miniCluster.startMrMiniCluster(uri.toString());
-    conf = miniCluster.getMRConfiguration();
-
-    BufferStore.initNewBuffer(128, 128 * 128);
-  }
-
-  public static void setupJavaHome() {
-    String str = System.getenv("JAVA_HOME");
-    if (str == null) {
-      String property = System.getProperty("java.home");
-      if (property != null) {
-        throw new RuntimeException("JAVA_HOME not set should probably be [" + property + "].");
-      }
-      throw new RuntimeException("JAVA_HOME not set.");
-    }
-  }
-
-  @AfterClass
-  public static void teardown() throws IOException {
-    if (miniCluster != null) {
-      miniCluster.stopMrMiniCluster();
-    }
-    rm(new File("build"));
-  }
-
-  private static void rm(File file) {
-    if (!file.exists()) {
-      return;
-    }
-    if (file.isDirectory()) {
-      for (File f : file.listFiles()) {
-        rm(f);
-      }
-    }
-    file.delete();
-  }
-
-  @Before
-  public void setup() {
-    TableContext.clear();
-  }
-
-  @Test
-  public void testBlurOutputFormat() throws IOException, InterruptedException, ClassNotFoundException {
-    localFs.delete(inDir, true);
-    localFs.delete(outDir, true);
-    writeRecordsFile("in/part1", 1, 1, 1, 1, "cf1");
-    writeRecordsFile("in/part2", 1, 1, 2, 1, "cf1");
-
-    Job job = Job.getInstance(conf, "blur index");
-    job.setJarByClass(BlurOutputFormatTest.class);
-    job.setMapperClass(CsvBlurMapper.class);
-    job.setInputFormatClass(TextInputFormat.class);
-
-    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-    CsvBlurMapper.addColumns(job, "cf1", "col");
-
-    TableDescriptor tableDescriptor = new TableDescriptor();
-    tableDescriptor.setShardCount(1);
-    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/table/test").toString());
-    tableDescriptor.setName("test");
-
-    createShardDirectories(outDir, 1);
-
-    BlurOutputFormat.setupJob(job, tableDescriptor);
-    Path output = new Path(TEST_ROOT_DIR + "/out");
-    BlurOutputFormat.setOutputPath(job, output);
-
-    assertTrue(job.waitForCompletion(true));
-    Counters ctrs = job.getCounters();
-    System.out.println("Counters: " + ctrs);
-
-    Path path = new Path(output, ShardUtil.getShardName(0));
-    dump(path, conf);
-    Collection<Path> commitedTasks = getCommitedTasks(path);
-    assertEquals(1, commitedTasks.size());
-    DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, commitedTasks.iterator().next()));
-    assertEquals(2, reader.numDocs());
-    reader.close();
-  }
-
-  private void dump(Path path, Configuration conf) throws IOException {
-    FileSystem fileSystem = path.getFileSystem(conf);
-    System.out.println(path);
-    if (!fileSystem.isFile(path)) {
-      FileStatus[] listStatus = fileSystem.listStatus(path);
-      for (FileStatus fileStatus : listStatus) {
-        dump(fileStatus.getPath(), conf);
-      }
-    }
-  }
-
-  private Collection<Path> getCommitedTasks(Path path) throws IOException {
-    Collection<Path> result = new TreeSet<Path>();
-    FileSystem fileSystem = path.getFileSystem(conf);
-    FileStatus[] listStatus = fileSystem.listStatus(path);
-    for (FileStatus fileStatus : listStatus) {
-      Path p = fileStatus.getPath();
-      if (fileStatus.isDir() && p.getName().endsWith(".commit")) {
-        result.add(p);
-      }
-    }
-    return result;
-  }
-
-  @Test
-  public void testBlurOutputFormatOverFlowTest() throws IOException, InterruptedException, ClassNotFoundException {
-    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
-    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
-
-    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
-    writeRecordsFile("in/part2", 1, 50, 2000, 100, "cf1"); // 100 * 50 = 5,000
-
-    Job job = Job.getInstance(conf, "blur index");
-    job.setJarByClass(BlurOutputFormatTest.class);
-    job.setMapperClass(CsvBlurMapper.class);
-    job.setInputFormatClass(TextInputFormat.class);
-
-    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-    CsvBlurMapper.addColumns(job, "cf1", "col");
-
-    TableDescriptor tableDescriptor = new TableDescriptor();
-    tableDescriptor.setShardCount(1);
-    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/table/test").toString());
-    tableDescriptor.setName("test");
-
-    createShardDirectories(outDir, 1);
-
-    BlurOutputFormat.setupJob(job, tableDescriptor);
-    Path output = new Path(TEST_ROOT_DIR + "/out");
-    BlurOutputFormat.setOutputPath(job, output);
-    BlurOutputFormat.setIndexLocally(job, true);
-    BlurOutputFormat.setOptimizeInFlight(job, false);
-
-    assertTrue(job.waitForCompletion(true));
-    Counters ctrs = job.getCounters();
-    System.out.println("Counters: " + ctrs);
-
-    Path path = new Path(output, ShardUtil.getShardName(0));
-    Collection<Path> commitedTasks = getCommitedTasks(path);
-    assertEquals(1, commitedTasks.size());
-
-    DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, commitedTasks.iterator().next()));
-    assertEquals(80000, reader.numDocs());
-    reader.close();
-  }
-
-  @Test
-  public void testBlurOutputFormatOverFlowMultipleReducersTest() throws IOException, InterruptedException,
-      ClassNotFoundException {
-    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
-    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
-
-    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
-    writeRecordsFile("in/part2", 1, 50, 2000, 100, "cf1"); // 100 * 50 = 5,000
-
-    Job job = Job.getInstance(conf, "blur index");
-    job.setJarByClass(BlurOutputFormatTest.class);
-    job.setMapperClass(CsvBlurMapper.class);
-    job.setInputFormatClass(TextInputFormat.class);
-
-    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-    CsvBlurMapper.addColumns(job, "cf1", "col");
-
-    TableDescriptor tableDescriptor = new TableDescriptor();
-    tableDescriptor.setShardCount(2);
-    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/table/test").toString());
-    tableDescriptor.setName("test");
-
-    createShardDirectories(outDir, 2);
-
-    BlurOutputFormat.setupJob(job, tableDescriptor);
-    Path output = new Path(TEST_ROOT_DIR + "/out");
-    BlurOutputFormat.setOutputPath(job, output);
-    BlurOutputFormat.setIndexLocally(job, false);
-    BlurOutputFormat.setDocumentBufferStrategy(job, DocumentBufferStrategyHeapSize.class);
-    BlurOutputFormat.setMaxDocumentBufferHeapSize(job, 128 * 1024);
-
-    assertTrue(job.waitForCompletion(true));
-    Counters ctrs = job.getCounters();
-    System.out.println("Counters: " + ctrs);
-
-    long total = 0;
-    for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
-      Path path = new Path(output, ShardUtil.getShardName(i));
-      Collection<Path> commitedTasks = getCommitedTasks(path);
-      assertEquals(1, commitedTasks.size());
-
-      DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, commitedTasks.iterator().next()));
-      total += reader.numDocs();
-      reader.close();
-    }
-    assertEquals(80000, total);
-
-  }
-
-  @Test
-  public void testBlurOutputFormatOverFlowMultipleReducersWithReduceMultiplierTest() throws IOException,
-      InterruptedException, ClassNotFoundException {
-    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
-    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
-
-    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
-    writeRecordsFile("in/part2", 1, 50, 2000, 100, "cf1"); // 100 * 50 = 5,000
-
-    Job job = Job.getInstance(conf, "blur index");
-    job.setJarByClass(BlurOutputFormatTest.class);
-    job.setMapperClass(CsvBlurMapper.class);
-    job.setInputFormatClass(TextInputFormat.class);
-
-    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-    CsvBlurMapper.addColumns(job, "cf1", "col");
-
-    TableDescriptor tableDescriptor = new TableDescriptor();
-    tableDescriptor.setShardCount(7);
-    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/table/test").toString());
-    tableDescriptor.setName("test");
-
-    createShardDirectories(outDir, 7);
-
-    BlurOutputFormat.setupJob(job, tableDescriptor);
-    Path output = new Path(TEST_ROOT_DIR + "/out");
-    BlurOutputFormat.setOutputPath(job, output);
-    int multiple = 2;
-    BlurOutputFormat.setReducerMultiplier(job, multiple);
-
-    assertTrue(job.waitForCompletion(true));
-    Counters ctrs = job.getCounters();
-    System.out.println("Counters: " + ctrs);
-
-    long total = 0;
-    for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
-      Path path = new Path(output, ShardUtil.getShardName(i));
-      Collection<Path> commitedTasks = getCommitedTasks(path);
-      assertTrue(multiple >= commitedTasks.size());
-      for (Path p : commitedTasks) {
-        DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, p));
-        total += reader.numDocs();
-        reader.close();
-      }
-    }
-    assertEquals(80000, total);
-
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testBlurOutputFormatValidateReducerCount() throws IOException, InterruptedException,
-      ClassNotFoundException {
-    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
-    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
-    writeRecordsFile("in/part1", 1, 1, 1, 1, "cf1");
-    writeRecordsFile("in/part2", 1, 1, 2, 1, "cf1");
-
-    Job job = Job.getInstance(conf, "blur index");
-    job.setJarByClass(BlurOutputFormatTest.class);
-    job.setMapperClass(CsvBlurMapper.class);
-    job.setInputFormatClass(TextInputFormat.class);
-
-    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-    CsvBlurMapper.addColumns(job, "cf1", "col");
-
-    TableDescriptor tableDescriptor = new TableDescriptor();
-    tableDescriptor.setShardCount(1);
-    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/table/test").toString());
-    tableDescriptor.setName("test");
-
-    createShardDirectories(outDir, 1);
-
-    BlurOutputFormat.setupJob(job, tableDescriptor);
-    Path output = new Path(TEST_ROOT_DIR + "/out");
-    BlurOutputFormat.setOutputPath(job, output);
-    BlurOutputFormat.setReducerMultiplier(job, 2);
-    job.setNumReduceTasks(4);
-    job.submit();
-
-  }
-
-  // @TODO this test to fail sometimes due to issues in the MR MiniCluster
-  // @Test
-  public void testBlurOutputFormatCleanupDuringJobKillTest() throws IOException, InterruptedException,
-      ClassNotFoundException {
-    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
-    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
-
-    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
-    writeRecordsFile("in/part2", 1, 5000, 2000, 100, "cf1"); // 100 * 5000 =
-                                                             // 500,000
-
-    Job job = Job.getInstance(conf, "blur index");
-    job.setJarByClass(BlurOutputFormatTest.class);
-    job.setMapperClass(CsvBlurMapper.class);
-    job.setInputFormatClass(TextInputFormat.class);
-
-    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-    CsvBlurMapper.addColumns(job, "cf1", "col");
-
-    TableDescriptor tableDescriptor = new TableDescriptor();
-    tableDescriptor.setShardCount(2);
-    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/table/test").toString());
-    tableDescriptor.setName("test");
-
-    createShardDirectories(outDir, 2);
-
-    BlurOutputFormat.setupJob(job, tableDescriptor);
-    Path output = new Path(TEST_ROOT_DIR + "/out");
-    BlurOutputFormat.setOutputPath(job, output);
-    BlurOutputFormat.setIndexLocally(job, false);
-
-    job.submit();
-    boolean killCalled = false;
-    while (!job.isComplete()) {
-      Thread.sleep(1000);
-      System.out.printf("Killed [" + killCalled + "] Map [%f] Reduce [%f]%n", job.mapProgress() * 100,
-          job.reduceProgress() * 100);
-      if (job.reduceProgress() > 0.7 && !killCalled) {
-        job.killJob();
-        killCalled = true;
-      }
-    }
-
-    assertFalse(job.isSuccessful());
-
-    for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
-      Path path = new Path(output, ShardUtil.getShardName(i));
-      FileSystem fileSystem = path.getFileSystem(job.getConfiguration());
-      FileStatus[] listStatus = fileSystem.listStatus(path);
-      assertEquals(toString(listStatus), 0, listStatus.length);
-    }
-  }
-
-  private String toString(FileStatus[] listStatus) {
-    if (listStatus == null || listStatus.length == 0) {
-      return "";
-    }
-    String s = "";
-    for (FileStatus fileStatus : listStatus) {
-      if (s.length() > 0) {
-        s += ",";
-      }
-      s += fileStatus.getPath();
-    }
-    return s;
-  }
-
-  public static String readFile(String name) throws IOException {
-    DataInputStream f = localFs.open(new Path(TEST_ROOT_DIR + "/" + name));
-    BufferedReader b = new BufferedReader(new InputStreamReader(f));
-    StringBuilder result = new StringBuilder();
-    String line = b.readLine();
-    while (line != null) {
-      result.append(line);
-      result.append('\n');
-      line = b.readLine();
-    }
-    b.close();
-    return result.toString();
-  }
-
-  private Path writeRecordsFile(String name, int starintgRowId, int numberOfRows, int startRecordId,
-      int numberOfRecords, String family) throws IOException {
-    // "1,1,cf1,val1"
-    Path file = new Path(TEST_ROOT_DIR + "/" + name);
-    localFs.delete(file, false);
-    DataOutputStream f = localFs.create(file);
-    PrintWriter writer = new PrintWriter(f);
-    for (int row = 0; row < numberOfRows; row++) {
-      for (int record = 0; record < numberOfRecords; record++) {
-        writer.println(getRecord(row + starintgRowId, record + startRecordId, family));
-      }
-    }
-    writer.close();
-    return file;
-  }
-
-  private void createShardDirectories(Path outDir, int shardCount) throws IOException {
-    localFs.mkdirs(outDir);
-    for (int i = 0; i < shardCount; i++) {
-      localFs.mkdirs(new Path(outDir, ShardUtil.getShardName(i)));
-    }
-  }
-
-  private String getRecord(int rowId, int recordId, String family) {
-    return rowId + "," + recordId + "," + family + ",valuetoindex";
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fa41356/blur-mapred-common/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred-common/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java b/blur-mapred-common/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
deleted file mode 100644
index 340d2b3..0000000
--- a/blur-mapred-common/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-import java.io.IOException;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.blur.mapreduce.lib.CsvBlurDriver.ControllerPool;
-import org.apache.blur.thrift.generated.Blur.Iface;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.SnappyCodec;
-import org.apache.hadoop.mapreduce.Job;
-import org.junit.Before;
-import org.junit.Test;
-
-public class CsvBlurDriverTest {
-
-  protected String tableUri = "file:///tmp/tmppath";
-  protected int shardCount = 13;
-
-  @Before
-  public void setup() throws IOException {
-    Configuration configuration = new Configuration();
-    Path path1 = new Path("file:///tmp/test1");
-    Path path2 = new Path("file:///tmp/test2");
-    FileSystem fileSystem = path1.getFileSystem(configuration);
-    fileSystem.mkdirs(path1);
-    fileSystem.mkdirs(path2);
-  }
-
-  @Test
-  public void testCsvBlurDriverTestFail1() throws Exception {
-    Configuration configuration = new Configuration();
-    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
-      @Override
-      public Iface getClient(String controllerConnectionStr) {
-        return null;
-      }
-    };
-    AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
-    assertNull(CsvBlurDriver.setupJob(configuration, controllerPool, ref, new String[] {}));
-  }
-
-  @Test
-  public void testCsvBlurDriverTest() throws Exception {
-    Configuration configurationSetup = new Configuration();
-    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
-      @Override
-      public Iface getClient(String controllerConnectionStr) {
-        return getMockIface();
-      }
-    };
-    AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
-    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, ref, "-c", "host:40010", "-d", "family1",
-        "col1", "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i",
-        "file:///tmp/test2");
-    assertNotNull(job);
-    Configuration configuration = job.getConfiguration();
-    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
-    assertEquals(tableDescriptor.getName(), "table1");
-    Collection<String> inputs = configuration.getStringCollection("mapred.input.dir");
-    assertEquals(2, inputs.size());
-    Map<String, List<String>> familyAndColumnNameMap = CsvBlurMapper.getFamilyAndColumnNameMap(configuration);
-    assertEquals(2, familyAndColumnNameMap.size());
-  }
-
-  @Test
-  public void testCsvBlurDriverTest2() throws Exception {
-    Configuration configurationSetup = new Configuration();
-    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
-      @Override
-      public Iface getClient(String controllerConnectionStr) {
-        return getMockIface();
-      }
-    };
-    AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
-    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, ref, "-c", "host:40010", "-d", "family1",
-        "col1", "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i",
-        "file:///tmp/test2", "-S", "-C", "1000000", "2000000");
-    assertNotNull(job);
-    Configuration configuration = job.getConfiguration();
-    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
-    assertEquals(tableDescriptor.getName(), "table1");
-    Collection<String> inputs = configuration.getStringCollection("mapred.input.dir");
-    assertEquals(2, inputs.size());
-    Map<String, List<String>> familyAndColumnNameMap = CsvBlurMapper.getFamilyAndColumnNameMap(configuration);
-    assertEquals(2, familyAndColumnNameMap.size());
-  }
-
-  @Test
-  public void testCsvBlurDriverTest3() throws Exception {
-    Configuration configurationSetup = new Configuration();
-    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
-      @Override
-      public Iface getClient(String controllerConnectionStr) {
-        return getMockIface();
-      }
-    };
-    AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
-    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, ref, "-c", "host:40010", "-d", "family1",
-        "col1", "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i",
-        "file:///tmp/test2", "-S", "-C", "1000000", "2000000", "-p", "SNAPPY");
-    assertNotNull(job);
-    Configuration configuration = job.getConfiguration();
-    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
-    assertEquals(tableDescriptor.getName(), "table1");
-    Collection<String> inputs = configuration.getStringCollection("mapred.input.dir");
-    assertEquals(2, inputs.size());
-    Map<String, List<String>> familyAndColumnNameMap = CsvBlurMapper.getFamilyAndColumnNameMap(configuration);
-    assertEquals(2, familyAndColumnNameMap.size());
-    assertEquals("true", configuration.get(CsvBlurDriver.MAPRED_COMPRESS_MAP_OUTPUT));
-    assertEquals(SnappyCodec.class.getName(), configuration.get(CsvBlurDriver.MAPRED_MAP_OUTPUT_COMPRESSION_CODEC));
-  }
-
-  @Test
-  public void multiplierParamShouldIncreaseReduceTasks() throws Exception {
-    Configuration configurationSetup = new Configuration();
-    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
-      @Override
-      public Iface getClient(String controllerConnectionStr) {
-        return getMockIface();
-      }
-    };
-    int multiplierParam = 10;
-    AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
-    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, ref, "-c", "host:40010", "-d", "family1",
-        "col1", "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i",
-        "file:///tmp/test2", "-S", "-C", "1000000", "2000000", "-p", "SNAPPY", "-r", Integer.toString(multiplierParam));
-    assertNotNull(job);
-
-    assertEquals(multiplierParam * shardCount, job.getNumReduceTasks());
-  }
-
-  protected Iface getMockIface() {
-    InvocationHandler handler = new InvocationHandler() {
-
-      @Override
-      public Object invoke(Object o, Method method, Object[] args) throws Throwable {
-        if (method.getName().equals("describe")) {
-          TableDescriptor tableDescriptor = new TableDescriptor();
-          tableDescriptor.setName((String) args[0]);
-          tableDescriptor.setTableUri(tableUri);
-          tableDescriptor.setShardCount(shardCount);
-          return tableDescriptor;
-        }
-        throw new RuntimeException("not implemented.");
-      }
-    };
-    return (Iface) Proxy.newProxyInstance(Iface.class.getClassLoader(), new Class[] { Iface.class }, handler);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fa41356/blur-mapred-common/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred-common/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java b/blur-mapred-common/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
deleted file mode 100644
index 47aa8e5..0000000
--- a/blur-mapred-common/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mrunit.mapreduce.MapDriver;
-import org.junit.Before;
-import org.junit.Test;
-
-public class CsvBlurMapperTest {
-
-  private MapDriver<Writable, Text, Text, BlurMutate> _mapDriver;
-  private CsvBlurMapper _mapper;
-
-  @Before
-  public void setUp() throws IOException {
-    _mapper = new CsvBlurMapper();
-    _mapDriver = MapDriver.newMapDriver(_mapper);
-  }
-
-  @Test
-  public void testMapperWithFamilyInData() {
-    Configuration configuration = _mapDriver.getConfiguration();
-    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
-    _mapDriver.withInput(new LongWritable(), new Text("rowid1,record1,cf1,value1,value2"));
-    _mapDriver.withOutput(new Text("rowid1"), new BlurMutate(MUTATE_TYPE.REPLACE, "rowid1", "record1", "cf1")
-        .addColumn("col1", "value1").addColumn("col2", "value2"));
-    _mapDriver.runTest();
-  }
-
-  @Test
-  public void testMapperFamilyPerPath() {
-    Configuration configuration = _mapDriver.getConfiguration();
-    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
-    CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
-    _mapper.setFamilyFromPath("cf1");
-
-    _mapDriver.withInput(new LongWritable(), new Text("rowid1,record1,value1,value2"));
-    _mapDriver.withOutput(new Text("rowid1"), new BlurMutate(MUTATE_TYPE.REPLACE, "rowid1", "record1", "cf1")
-        .addColumn("col1", "value1").addColumn("col2", "value2"));
-    _mapDriver.runTest();
-  }
-  
-  @Test
-  public void testMapperAutoGenerateRecordId() {
-    Configuration configuration = _mapDriver.getConfiguration();
-    CsvBlurMapper.setAutoGenerateRecordIdAsHashOfData(configuration, true);
-    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
-    CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
-    _mapper.setFamilyFromPath("cf1");
-
-    _mapDriver.withInput(new LongWritable(), new Text("rowid1,value1,value2"));
-    _mapDriver.withOutput(new Text("rowid1"), new BlurMutate(MUTATE_TYPE.REPLACE, "rowid1", "-25nqln3n2vb4cayex9y9tpxx3", "cf1")
-        .addColumn("col1", "value1").addColumn("col2", "value2"));
-    _mapDriver.runTest();
-  }
-  
-  @Test
-  public void testMapperAutoGenerateRowId() {
-    Configuration configuration = _mapDriver.getConfiguration();
-    CsvBlurMapper.setAutoGenerateRowIdAsHashOfData(configuration, true);
-    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
-    CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
-    _mapper.setFamilyFromPath("cf1");
-
-    _mapDriver.withInput(new LongWritable(), new Text("record1,value1,value2"));
-    _mapDriver.withOutput(new Text("-50b4uzohynr7j7s9pve7ytz66"), new BlurMutate(MUTATE_TYPE.REPLACE, "-50b4uzohynr7j7s9pve7ytz66", "record1", "cf1")
-        .addColumn("col1", "value1").addColumn("col2", "value2"));
-    _mapDriver.runTest();
-  }
-  
-  @Test
-  public void testMapperAutoGenerateRowIdAndRecordId() {
-    Configuration configuration = _mapDriver.getConfiguration();
-    CsvBlurMapper.setAutoGenerateRecordIdAsHashOfData(configuration, true);
-    CsvBlurMapper.setAutoGenerateRowIdAsHashOfData(configuration, true);
-    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
-    CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
-    _mapper.setFamilyFromPath("cf1");
-
-    _mapDriver.withInput(new LongWritable(), new Text("value1,value2"));
-    _mapDriver.withOutput(new Text("5q0tme15ph3h5pns8sv3u5wy2"), new BlurMutate(MUTATE_TYPE.REPLACE, "5q0tme15ph3h5pns8sv3u5wy2", "5q0tme15ph3h5pns8sv3u5wy2", "cf1")
-        .addColumn("col1", "value1").addColumn("col2", "value2"));
-    _mapDriver.runTest();
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fa41356/blur-mapred/pom.xml
----------------------------------------------------------------------
diff --git a/blur-mapred/pom.xml b/blur-mapred/pom.xml
new file mode 100644
index 0000000..c3262a7
--- /dev/null
+++ b/blur-mapred/pom.xml
@@ -0,0 +1,244 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.apache.blur</groupId>
+		<artifactId>blur</artifactId>
+		<version>0.2.4-incubating-SNAPSHOT</version>
+		<relativePath>../pom.xml</relativePath>
+	</parent>
+	<groupId>org.apache.blur</groupId>
+	<artifactId>blur-mapred</artifactId>
+	<version>${projectVersion}</version>
+	<packaging>jar</packaging>
+	<name>Blur Map Reduce Common</name>
+	<description>The Blur Map Reduce code module contains MR drivers and input/output formats.</description>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.zookeeper</groupId>
+			<artifactId>zookeeper</artifactId>
+			<version>${zookeeper.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-core</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-store</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-util</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-util</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>log4j</groupId>
+			<artifactId>log4j</artifactId>
+			<version>${log4j.version}</version>
+			<scope>provided</scope>
+			<exclusions>
+				<exclusion>
+					<groupId>javax.mail</groupId>
+					<artifactId>mail</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>javax.jms</groupId>
+					<artifactId>jms</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jdmk</groupId>
+					<artifactId>jmxtools</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jmx</groupId>
+					<artifactId>jmxri</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+	</dependencies>
+
+	<repositories>
+		<repository>
+			<id>libdir</id>
+			<url>file://${basedir}/../lib</url>
+		</repository>
+	</repositories>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-help-plugin</artifactId>
+				<version>2.2</version>
+				<executions>
+					<execution>
+						<phase>generate-resources</phase>
+						<goals>
+							<goal>effective-pom</goal>
+						</goals>
+						<configuration>
+							<output>${project.build.directory}/effective-pom.xml</output>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-install-plugin</artifactId>
+				<version>2.3.1</version>
+				<executions>
+					<execution>
+						<phase>install</phase>
+						<goals>
+							<goal>install-file</goal>
+						</goals>
+						<configuration>
+							<file>${project.build.directory}/${artifactId}-${project.version}.jar</file>
+							<pomFile>${project.build.directory}/effective-pom.xml</pomFile>
+							<!-- sources></sources -->
+							<!-- javadoc></javadoc -->
+							<groupId>${project.groupId}</groupId>
+							<artifactId>${project.artifactId}</artifactId>
+							<version>${project.version}</version>
+							<packaging>jar</packaging>
+							<!--classifier></classifier -->
+							<generatePom>true</generatePom>
+							<createChecksum>true</createChecksum>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+	
+	
+	<profiles>
+		<profile>
+			<id>hadoop1</id>
+			<activation>
+				<property>
+					<name>hadoop1</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-test</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+				</dependency>
+				<dependency>
+				      <groupId>org.apache.mrunit</groupId>
+				      <artifactId>mrunit</artifactId>
+				      <version>${mrunit.version}</version>
+				      <classifier>hadoop1</classifier>
+					  <scope>test</scope>
+                </dependency>
+			</dependencies>
+		</profile>
+		<profile>
+			<id>hadoop2-mr1</id>
+			<activation>
+				<property>
+					<name>hadoop2-mr1</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-test</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+				</dependency>
+                <dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-minicluster</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+				</dependency>
+				<dependency>
+				      <groupId>org.apache.mrunit</groupId>
+				      <artifactId>mrunit</artifactId>
+				      <version>${mrunit.version}</version>
+				      <classifier>hadoop2</classifier>
+					  <scope>test</scope>
+                </dependency>
+			</dependencies>
+		</profile>
+		<profile>
+			<id>hadoop2</id>
+			<activation>
+				<property>
+					<name>hadoop2</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+				      <groupId>org.apache.mrunit</groupId>
+				      <artifactId>mrunit</artifactId>
+				      <version>${mrunit.version}</version>
+				      <classifier>hadoop2</classifier>
+					  <scope>test</scope>
+                </dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-client</artifactId>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-minicluster</artifactId>
+					<scope>test</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+	</profiles>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fa41356/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BaseBlurMapper.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BaseBlurMapper.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BaseBlurMapper.java
new file mode 100644
index 0000000..037edec
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BaseBlurMapper.java
@@ -0,0 +1,49 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * Base mapper class for Blur map reduce classes.
+ * 
+ * @param <KEY>
+ * @param <VALUE>
+ */
+public abstract class BaseBlurMapper<KEY, VALUE> extends Mapper<KEY, VALUE, Text, BlurMutate> {
+  protected BlurMutate _mutate;
+  protected Text _key;
+  protected Counter _recordCounter;
+  protected Counter _columnCounter;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    _mutate = new BlurMutate();
+    _mutate.setRecord(new BlurRecord());
+    _key = new Text();
+    _recordCounter = context.getCounter(BlurCounters.RECORD_COUNT);
+    _columnCounter = context.getCounter(BlurCounters.COLUMN_COUNT);
+  }
+
+  @Override
+  protected abstract void map(KEY key, VALUE value, Context context) throws IOException, InterruptedException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fa41356/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurColumn.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurColumn.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurColumn.java
new file mode 100644
index 0000000..d32a3bd
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurColumn.java
@@ -0,0 +1,109 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+public class BlurColumn implements Writable {
+
+  private String name;
+  private String value;
+
+  public BlurColumn() {
+  }
+
+  public BlurColumn(String name, String value) {
+    this.name = name;
+    this.value = value;
+  }
+
+  public boolean hasNull() {
+    if (name == null || value == null) {
+      return true;
+    }
+    return false;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    name = IOUtil.readString(in);
+    value = IOUtil.readString(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    IOUtil.writeString(out, name);
+    IOUtil.writeString(out, value);
+  }
+
+  public String getValue() {
+    return value;
+  }
+
+  public void setValue(String value) {
+    this.value = value;
+  }
+
+  @Override
+  public String toString() {
+    return "{name=" + name + ", value=" + value + "}";
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((name == null) ? 0 : name.hashCode());
+    result = prime * result + ((value == null) ? 0 : value.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    BlurColumn other = (BlurColumn) obj;
+    if (name == null) {
+      if (other.name != null)
+        return false;
+    } else if (!name.equals(other.name))
+      return false;
+    if (value == null) {
+      if (other.value != null)
+        return false;
+    } else if (!value.equals(other.value))
+      return false;
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fa41356/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java
new file mode 100644
index 0000000..0691dce
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java
@@ -0,0 +1,26 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * The enum class used for all the internal counters during map reduce jobs.
+ */
+public enum BlurCounters {
+  RECORD_COUNT, LUCENE_FIELD_COUNT, ROW_COUNT, RECORD_RATE, COPY_RATE, ROW_RATE, RECORD_DUPLICATE_COUNT, ROW_OVERFLOW_COUNT, ROW_DELETE_COUNT, COLUMN_COUNT
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fa41356/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMapReduceUtil.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMapReduceUtil.java
new file mode 100644
index 0000000..41769d0
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMapReduceUtil.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.blur.mapreduce.lib;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import lucene.security.DocumentVisibility;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This utility code was taken from HBase to locate classes and the jars files
+ * to add to the MapReduce Job.
+ */
+public class BlurMapReduceUtil {
+
+  private final static Log LOG = LogFactory.getLog(BlurMapReduceUtil.class);
+
+  /**
+   * Add the Blur dependency jars as well as jars for any of the configured job
+   * classes to the job configuration, so that JobClient will ship them to the
+   * cluster and add them to the DistributedCache.
+   */
+  public static void addDependencyJars(Job job) throws IOException {
+    try {
+      addDependencyJars(job.getConfiguration(), org.apache.zookeeper.ZooKeeper.class, job.getMapOutputKeyClass(),
+          job.getMapOutputValueClass(), job.getInputFormatClass(), job.getOutputKeyClass(), job.getOutputValueClass(),
+          job.getOutputFormatClass(), job.getPartitionerClass(), job.getCombinerClass(), DocumentVisibility.class);
+      addAllJarsInBlurLib(job.getConfiguration());
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Adds all the jars in the same path as the blur jar files.
+   * 
+   * @param conf
+   * @throws IOException
+   */
+  public static void addAllJarsInBlurLib(Configuration conf) throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Set<String> jars = new HashSet<String>();
+    jars.addAll(conf.getStringCollection("tmpjars"));
+
+    String property = System.getProperty("java.class.path");
+    String[] files = property.split("\\:");
+
+    String blurLibPath = getPath("blur-", files);
+    if (blurLibPath == null) {
+      return;
+    }
+    List<String> pathes = getPathes(blurLibPath, files);
+    for (String pathStr : pathes) {
+      Path path = new Path(pathStr);
+      if (!localFs.exists(path)) {
+        LOG.warn("Could not validate jar file " + path);
+        continue;
+      }
+      jars.add(path.makeQualified(localFs.getUri(), localFs.getWorkingDirectory()).toString());
+    }
+    if (jars.isEmpty()) {
+      return;
+    }
+    conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[0])));
+  }
+
+  private static List<String> getPathes(String path, String[] files) {
+    List<String> pathes = new ArrayList<String>();
+    for (String file : files) {
+      if (file.startsWith(path)) {
+        pathes.add(file);
+      }
+    }
+    return pathes;
+  }
+
+  private static String getPath(String startsWith, String[] files) {
+    for (String file : files) {
+      int lastIndexOf = file.lastIndexOf('/');
+      String fileName = file.substring(lastIndexOf + 1);
+      if (fileName.startsWith(startsWith)) {
+        return file.substring(0, lastIndexOf);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Add the jars containing the given classes to the job's configuration such
+   * that JobClient will ship them to the cluster and add them to the
+   * DistributedCache.
+   */
+  public static void addDependencyJars(Configuration conf, Class<?>... classes) throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Set<String> jars = new HashSet<String>();
+    // Add jars that are already in the tmpjars variable
+    jars.addAll(conf.getStringCollection("tmpjars"));
+
+    // Add jars containing the specified classes
+    for (Class<?> clazz : classes) {
+      if (clazz == null) {
+        continue;
+      }
+
+      String pathStr = findOrCreateJar(clazz);
+      if (pathStr == null) {
+        LOG.warn("Could not find jar for class " + clazz + " in order to ship it to the cluster.");
+        continue;
+      }
+      Path path = new Path(pathStr);
+      if (!localFs.exists(path)) {
+        LOG.warn("Could not validate jar file " + path + " for class " + clazz);
+        continue;
+      }
+      jars.add(path.makeQualified(localFs.getUri(), localFs.getWorkingDirectory()).toString());
+    }
+    if (jars.isEmpty()) {
+      return;
+    }
+
+    conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[0])));
+  }
+
+  /**
+   * If org.apache.hadoop.util.JarFinder is available (0.23+ hadoop), finds the
+   * Jar for a class or creates it if it doesn't exist. If the class is in a
+   * directory in the classpath, it creates a Jar on the fly with the contents
+   * of the directory and returns the path to that Jar. If a Jar is created, it
+   * is created in the system temporary directory.
+   * 
+   * Otherwise, returns an existing jar that contains a class of the same name.
+   * 
+   * @param my_class
+   *          the class to find.
+   * @return a jar file that contains the class, or null.
+   * @throws IOException
+   */
+  private static String findOrCreateJar(Class<?> my_class) throws IOException {
+    try {
+      Class<?> jarFinder = Class.forName("org.apache.hadoop.util.JarFinder");
+      // hadoop-0.23 has a JarFinder class that will create the jar
+      // if it doesn't exist. Note that this is needed to run the mapreduce
+      // unit tests post-0.23, because mapreduce v2 requires the relevant jars
+      // to be in the mr cluster to do output, split, etc. At unit test time,
+      // the hbase jars do not exist, so we need to create some. Note that we
+      // can safely fall back to findContainingJars for pre-0.23 mapreduce.
+      Method m = jarFinder.getMethod("getJar", Class.class);
+      return (String) m.invoke(null, my_class);
+    } catch (InvocationTargetException ite) {
+      // function was properly called, but threw it's own exception
+      throw new IOException(ite.getCause());
+    } catch (Exception e) {
+      // ignore all other exceptions. related to reflection failure
+    }
+
+    LOG.debug("New JarFinder: org.apache.hadoop.util.JarFinder.getJar " + "not available.  Using old findContainingJar");
+    return findContainingJar(my_class);
+  }
+
+  /**
+   * Find a jar that contains a class of the same name, if any. It will return a
+   * jar file, even if that is not the first thing on the class path that has a
+   * class with the same name.
+   * 
+   * This is shamelessly copied from JobConf
+   * 
+   * @param my_class
+   *          the class to find.
+   * @return a jar file that contains the class, or null.
+   * @throws IOException
+   */
+  private static String findContainingJar(Class<?> my_class) {
+    ClassLoader loader = my_class.getClassLoader();
+    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
+    try {
+      for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
+        URL url = itr.nextElement();
+        if ("jar".equals(url.getProtocol())) {
+          String toReturn = url.getPath();
+          if (toReturn.startsWith("file:")) {
+            toReturn = toReturn.substring("file:".length());
+          }
+          // URLDecoder is a misnamed class, since it actually decodes
+          // x-www-form-urlencoded MIME type rather than actual
+          // URL encoding (which the file path has). Therefore it would
+          // decode +s to ' 's which is incorrect (spaces are actually
+          // either unencoded or encoded as "%20"). Replace +s first, so
+          // that they are kept sacred during the decoding process.
+          toReturn = toReturn.replaceAll("\\+", "%2B");
+          toReturn = URLDecoder.decode(toReturn, "UTF-8");
+          return toReturn.replaceAll("!.*$", "");
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fa41356/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMutate.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMutate.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMutate.java
new file mode 100644
index 0000000..36d7f4f
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMutate.java
@@ -0,0 +1,178 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * {@link BlurMutate} carries the {@link Record}s bound for the {@link Row} for
+ * indexing. If this mutate represents a delete of the {@link Row} the recordId
+ * of the {@link BlurRecord} is ignored.
+ */
+public class BlurMutate implements Writable {
+
+  /**
+   * The {@link MUTATE_TYPE} controls the mutating of the {@link Row}. DELETE
+   * indicates that the {@link Row} is to be deleted. REPLACE indicates that the
+   * group of mutates are to replace the existing {@link Row}.
+   * 
+   * If both a DELETE and a REPLACE exist for a single {@link Row} in the
+   * {@link BlurOutputFormat} then the {@link Row} will be replaced not just
+   * deleted.
+   */
+  public enum MUTATE_TYPE {
+    /* ADD(0), UPDATE(1), */DELETE(2), REPLACE(3);
+    private int _value;
+
+    private MUTATE_TYPE(int value) {
+      _value = value;
+    }
+
+    public int getValue() {
+      return _value;
+    }
+
+    public MUTATE_TYPE find(int value) {
+      switch (value) {
+      // @TODO Updates through MR is going to be disabled
+      // case 0:
+      // return ADD;
+      // case 1:
+      // return UPDATE;
+      case 2:
+        return DELETE;
+      case 3:
+        return REPLACE;
+      default:
+        throw new RuntimeException("Value [" + value + "] not found.");
+      }
+    }
+  }
+
+  private MUTATE_TYPE _mutateType = MUTATE_TYPE.REPLACE;
+  private BlurRecord _record = new BlurRecord();
+
+  public BlurMutate() {
+
+  }
+
+  public BlurMutate(MUTATE_TYPE type, BlurRecord record) {
+    _mutateType = type;
+    _record = record;
+  }
+
+  public BlurMutate(MUTATE_TYPE type, String rowId) {
+    _mutateType = type;
+    _record.setRowId(rowId);
+  }
+
+  public BlurMutate(MUTATE_TYPE type, String rowId, String recordId) {
+    _mutateType = type;
+    _record.setRowId(rowId);
+    _record.setRecordId(recordId);
+  }
+
+  public BlurMutate(MUTATE_TYPE type, String rowId, String recordId, String family) {
+    _mutateType = type;
+    _record.setRowId(rowId);
+    _record.setRecordId(recordId);
+    _record.setFamily(family);
+  }
+
+  public BlurMutate addColumn(BlurColumn column) {
+    _record.addColumn(column);
+    return this;
+  }
+
+  public BlurMutate addColumn(String name, String value) {
+    return addColumn(new BlurColumn(name, value));
+  }
+
+  public BlurRecord getRecord() {
+    return _record;
+  }
+
+  public void setRecord(BlurRecord record) {
+    _record = record;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    IOUtil.writeVInt(out, _mutateType.getValue());
+    _record.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    _mutateType.find(IOUtil.readVInt(in));
+    _record.readFields(in);
+  }
+
+  public MUTATE_TYPE getMutateType() {
+    return _mutateType;
+  }
+
+  public BlurMutate setMutateType(MUTATE_TYPE mutateType) {
+    _mutateType = mutateType;
+    return this;
+  }
+
+  @Override
+  public String toString() {
+    return "BlurMutate [mutateType=" + _mutateType + ", record=" + _record + "]";
+  }
+
+  public BlurMutate setFamily(String family) {
+    _record.setFamily(family);
+    return this;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((_mutateType == null) ? 0 : _mutateType.hashCode());
+    result = prime * result + ((_record == null) ? 0 : _record.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    BlurMutate other = (BlurMutate) obj;
+    if (_mutateType != other._mutateType)
+      return false;
+    if (_record == null) {
+      if (other._record != null)
+        return false;
+    } else if (!_record.equals(other._record))
+      return false;
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fa41356/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
new file mode 100644
index 0000000..4cb811b
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
@@ -0,0 +1,216 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.ShardUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * {@link OutputCommitter} for Blur table output. A task attempt writes its
+ * index into an "<attemptId>.tmp" directory under its shard; a successful
+ * commitTask renames it to "<attemptId>.task_complete", and commitJob renames
+ * completed attempts to "<attemptId>.commit" (abortJob deletes them).
+ */
+public class BlurOutputCommitter extends OutputCommitter {
+
+  private static final Log LOG = LogFactory.getLog(BlurOutputCommitter.class);
+
+  @Override
+  public void setupJob(JobContext jobContext) throws IOException {
+    LOG.info("Running setup job.");
+  }
+
+  @Override
+  public void commitJob(JobContext jobContext) throws IOException {
+    // look through all the shards for attempts that need to be cleaned up.
+    // also find all the attempts that are finished
+    // then rename all the attempts jobs to commits
+    LOG.info("Commiting Job [{0}]", jobContext.getJobID());
+    Configuration configuration = jobContext.getConfiguration();
+    Path tableOutput = BlurOutputFormat.getOutputPath(configuration);
+    LOG.info("TableOutput path [{0}]", tableOutput);
+    makeSureNoEmptyShards(configuration, tableOutput);
+    FileSystem fileSystem = tableOutput.getFileSystem(configuration);
+    for (FileStatus fileStatus : fileSystem.listStatus(tableOutput)) {
+      LOG.info("Checking file status [{0}] with path [{1}]", fileStatus, fileStatus.getPath());
+      if (isShard(fileStatus)) {
+        commitOrAbortJob(jobContext, fileStatus.getPath(), true);
+      }
+    }
+    LOG.info("Commiting Complete [{0}]", jobContext.getJobID());
+  }
+
+  // Pre-creates a directory for every shard declared in the table descriptor
+  // so that downstream consumers see the full shard count even when some
+  // shards received no output.
+  private void makeSureNoEmptyShards(Configuration configuration, Path tableOutput) throws IOException {
+    FileSystem fileSystem = tableOutput.getFileSystem(configuration);
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
+    int shardCount = tableDescriptor.getShardCount();
+    for (int i = 0; i < shardCount; i++) {
+      String shardName = ShardUtil.getShardName(i);
+      fileSystem.mkdirs(new Path(tableOutput, shardName));
+    }
+  }
+
+  // Renames finished task-attempt output to ".commit" (commit == true) or
+  // deletes it (commit == false) for attempts belonging to this job.
+  private void commitOrAbortJob(JobContext jobContext, Path shardPath, boolean commit) throws IOException {
+    LOG.info("CommitOrAbort [{0}] path [{1}]", commit, shardPath);
+    FileSystem fileSystem = shardPath.getFileSystem(jobContext.getConfiguration());
+    // Only entries whose name ends with ".task_complete" pass this filter,
+    // i.e. attempts that finished commitTask successfully.
+    FileStatus[] listStatus = fileSystem.listStatus(shardPath, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        LOG.info("Checking path [{0}]", path);
+        if (path.getName().endsWith(".task_complete")) {
+          return true;
+        }
+        return false;
+      }
+    });
+    for (FileStatus fileStatus : listStatus) {
+      Path path = fileStatus.getPath();
+      LOG.info("Trying to commitOrAbort [{0}]", path);
+      String name = path.getName();
+      boolean taskComplete = name.endsWith(".task_complete");
+      // NOTE(review): because of the filter above, taskComplete is always
+      // true here, so the "else" branch below that deletes leftover tmp dirs
+      // is unreachable — confirm whether the filter should instead admit all
+      // attempt directories so stale ".tmp" output is cleaned during commit.
+      if (fileStatus.isDir()) {
+        String taskAttemptName = getTaskAttemptName(name);
+        if (taskAttemptName == null) {
+          LOG.info("Dir name [{0}] not task attempt", name);
+          continue;
+        }
+        TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptName);
+        // Only touch attempts that belong to this job; other jobs' output in
+        // the same shard directories is left alone.
+        if (taskAttemptID.getJobID().equals(jobContext.getJobID())) {
+          if (commit) {
+            if (taskComplete) {
+              fileSystem.rename(path, new Path(shardPath, taskAttemptName + ".commit"));
+              LOG.info("Committing [{0}] in path [{1}]", taskAttemptID, path);
+            } else {
+              fileSystem.delete(path, true);
+              LOG.info("Deleting tmp dir [{0}] in path [{1}]", taskAttemptID, path);
+            }
+          } else {
+            fileSystem.delete(path, true);
+            LOG.info("Deleting aborted job dir [{0}] in path [{1}]", taskAttemptID, path);
+          }
+        } else {
+          LOG.warn("TaskAttempt JobID [{0}] does not match JobContext JobId [{1}]", taskAttemptID.getJobID(),
+              jobContext.getJobID());
+        }
+      }
+    }
+  }
+
+  // Strips the trailing ".<suffix>" (e.g. ".task_complete") from a directory
+  // name, returning the bare task attempt name, or null if there is no dot.
+  private String getTaskAttemptName(String name) {
+    int lastIndexOf = name.lastIndexOf('.');
+    if (lastIndexOf < 0) {
+      return null;
+    }
+    return name.substring(0, lastIndexOf);
+  }
+
+  private boolean isShard(FileStatus fileStatus) {
+    return isShard(fileStatus.getPath());
+  }
+
+  // A shard directory is recognized purely by its name prefix.
+  private boolean isShard(Path path) {
+    return path.getName().startsWith(BlurConstants.SHARD_PREFIX);
+  }
+
+  @Override
+  public void abortJob(JobContext jobContext, State state) throws IOException {
+    LOG.info("Abort Job [{0}]", jobContext.getJobID());
+    Configuration configuration = jobContext.getConfiguration();
+    Path tableOutput = BlurOutputFormat.getOutputPath(configuration);
+    makeSureNoEmptyShards(configuration, tableOutput);
+    FileSystem fileSystem = tableOutput.getFileSystem(configuration);
+    for (FileStatus fileStatus : fileSystem.listStatus(tableOutput)) {
+      if (isShard(fileStatus)) {
+        commitOrAbortJob(jobContext, fileStatus.getPath(), false);
+      }
+    }
+  }
+
+  @Override
+  public void cleanupJob(JobContext context) throws IOException {
+    LOG.info("Running job cleanup.");
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
+    int numReduceTasks = context.getNumReduceTasks();
+    TaskAttemptID taskAttemptID = context.getTaskAttemptID();
+    // Map tasks only need to commit when the job has no reducers (map-only
+    // jobs write index output directly); otherwise only reducers commit.
+    return taskAttemptID.isMap() && numReduceTasks != 0 ? false : true;
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext context) throws IOException {
+    LOG.info("Running task setup.");
+  }
+
+  // Value holder for the per-task paths and configuration computed in
+  // setup(TaskAttemptContext) and shared by commitTask/abortTask.
+  private static class Conf {
+    Path _newIndex;
+    Configuration _configuration;
+    TaskAttemptID _taskAttemptID;
+    Path _indexPath;
+    TableDescriptor _tableDescriptor;
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext context) throws IOException {
+    LOG.info("Running commit task.");
+    Conf conf = setup(context);
+    FileSystem fileSystem = conf._newIndex.getFileSystem(conf._configuration);
+    // The tmp output must exist and be a directory; otherwise the task never
+    // produced an index and committing would hide the failure.
+    if (fileSystem.exists(conf._newIndex) && !fileSystem.isFile(conf._newIndex)) {
+      Path dst = new Path(conf._indexPath, conf._taskAttemptID.toString() + ".task_complete");
+      LOG.info("Committing [{0}] to [{1}]", conf._newIndex, dst);
+      fileSystem.rename(conf._newIndex, dst);
+    } else {
+      throw new IOException("Path [" + conf._newIndex + "] does not exist, can not commit.");
+    }
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext context) throws IOException {
+    LOG.info("Running abort task.");
+    Conf conf = setup(context);
+    FileSystem fileSystem = conf._newIndex.getFileSystem(conf._configuration);
+    LOG.info("abortTask - Deleting [{0}]", conf._newIndex);
+    fileSystem.delete(conf._newIndex, true);
+  }
+
+  private Conf setup(TaskAttemptContext context) throws IOException {
+    LOG.info("Setting up committer with task attempt [{0}]", context.getTaskAttemptID().toString());
+    Conf conf = new Conf();
+    conf._configuration = context.getConfiguration();
+    conf._tableDescriptor = BlurOutputFormat.getTableDescriptor(conf._configuration);
+    int shardCount = conf._tableDescriptor.getShardCount();
+    int attemptId = context.getTaskAttemptID().getTaskID().getId();
+    // Tasks are mapped onto shards round-robin by task id; presumably the job
+    // is configured with tasks aligned to the shard count — TODO confirm.
+    int shardId = attemptId % shardCount;
+    conf._taskAttemptID = context.getTaskAttemptID();
+    Path tableOutput = BlurOutputFormat.getOutputPath(conf._configuration);
+    String shardName = ShardUtil.getShardName(BlurConstants.SHARD_PREFIX, shardId);
+    conf._indexPath = new Path(tableOutput, shardName);
+    conf._newIndex = new Path(conf._indexPath, conf._taskAttemptID.toString() + ".tmp");
+    return conf;
+  }
+
+}


Mime
View raw message