incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [2/5] git commit: Refactoring all integration tests to run as a test suite in their own project. This is an effort to get the tests to run faster.
Date Mon, 08 Jun 2015 12:41:50 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d8756092/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
deleted file mode 100644
index d3bd4e7..0000000
--- a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
+++ /dev/null
@@ -1,235 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-
-import org.apache.blur.MiniCluster;
-import org.apache.blur.server.TableContext;
-import org.apache.blur.store.buffer.BufferStore;
-import org.apache.blur.thirdparty.thrift_0_9_0.TException;
-import org.apache.blur.thrift.BlurClient;
-import org.apache.blur.thrift.generated.Blur.Iface;
-import org.apache.blur.thrift.generated.BlurException;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.blur.thrift.generated.TableStats;
-import org.apache.blur.utils.GCWatcher;
-import org.apache.blur.utils.JavaHome;
-import org.apache.blur.utils.ShardUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class BlurOutputFormatMiniClusterTest {
-
-  private static Configuration conf = new Configuration();
-  private static FileSystem fileSystem;
-  private static Path TEST_ROOT_DIR;
-  private static MiniCluster miniCluster;
-  private Path inDir = new Path(TEST_ROOT_DIR + "/in");
-  private static final File TMPDIR = new File(System.getProperty("blur.tmp.dir",
-      "./target/tmp_BlurOutputFormatMiniClusterTest"));
-
-  @BeforeClass
-  public static void setupTest() throws Exception {
-    GCWatcher.init(0.60);
-    JavaHome.checkJavaHome();
-    LocalFileSystem localFS = FileSystem.getLocal(new Configuration());
-    File testDirectory = new File(TMPDIR, "blur-cluster-test").getAbsoluteFile();
-    testDirectory.mkdirs();
-
-    Path directory = new Path(testDirectory.getPath());
-    FsPermission dirPermissions = localFS.getFileStatus(directory).getPermission();
-    FsAction userAction = dirPermissions.getUserAction();
-    FsAction groupAction = dirPermissions.getGroupAction();
-    FsAction otherAction = dirPermissions.getOtherAction();
-
-    StringBuilder builder = new StringBuilder();
-    builder.append(userAction.ordinal());
-    builder.append(groupAction.ordinal());
-    builder.append(otherAction.ordinal());
-    String dirPermissionNum = builder.toString();
-    System.setProperty("dfs.datanode.data.dir.perm", dirPermissionNum);
-    testDirectory.delete();
-    miniCluster = new MiniCluster();
-    miniCluster.startBlurCluster(new File(testDirectory, "cluster").getAbsolutePath(), 2, 3, true, false);
-
-    TEST_ROOT_DIR = new Path(miniCluster.getFileSystemUri().toString() + "/blur_test");
-    System.setProperty("hadoop.log.dir", "./target/BlurOutputFormatTest/hadoop_log");
-    try {
-      fileSystem = TEST_ROOT_DIR.getFileSystem(conf);
-    } catch (IOException io) {
-      throw new RuntimeException("problem getting local fs", io);
-    }
-
-    FileSystem.setDefaultUri(conf, miniCluster.getFileSystemUri());
-
-    miniCluster.startMrMiniCluster();
-    conf = miniCluster.getMRConfiguration();
-
-    BufferStore.initNewBuffer(128, 128 * 128);
-  }
-
-  @AfterClass
-  public static void teardown() throws IOException {
-    if (miniCluster != null) {
-      miniCluster.stopMrMiniCluster();
-    }
-    miniCluster.shutdownBlurCluster();
-    rm(new File("build"));
-  }
-
-  private static void rm(File file) {
-    if (!file.exists()) {
-      return;
-    }
-    if (file.isDirectory()) {
-      for (File f : file.listFiles()) {
-        rm(f);
-      }
-    }
-    file.delete();
-  }
-
-  @Before
-  public void setup() {
-    TableContext.clear();
-  }
-
-  @Test
-  public void testBlurOutputFormat() throws IOException, InterruptedException, ClassNotFoundException, BlurException,
-      TException {
-    fileSystem.delete(inDir, true);
-    String tableName = "testBlurOutputFormat";
-    writeRecordsFile("in/part1", 1, 1, 1, 1, "cf1");
-    writeRecordsFile("in/part2", 1, 1, 2, 1, "cf1");
-
-    Job job = Job.getInstance(conf, "blur index");
-    job.setJarByClass(BlurOutputFormatMiniClusterTest.class);
-    job.setMapperClass(CsvBlurMapper.class);
-    job.setInputFormatClass(TextInputFormat.class);
-
-    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-    String tableUri = new Path(TEST_ROOT_DIR + "/blur/" + tableName).makeQualified(fileSystem.getUri(),
-        fileSystem.getWorkingDirectory()).toString();
-    CsvBlurMapper.addColumns(job, "cf1", "col");
-
-    TableDescriptor tableDescriptor = new TableDescriptor();
-    tableDescriptor.setShardCount(1);
-    tableDescriptor.setTableUri(tableUri);
-    tableDescriptor.setName(tableName);
-
-    Iface client = getClient();
-    client.createTable(tableDescriptor);
-
-    BlurOutputFormat.setupJob(job, tableDescriptor);
-    Path output = new Path(TEST_ROOT_DIR + "/out");
-    BlurOutputFormat.setOutputPath(job, output);
-
-    Path tablePath = new Path(tableUri);
-    Path shardPath = new Path(tablePath, ShardUtil.getShardName(0));
-    FileStatus[] listStatus = fileSystem.listStatus(shardPath);
-    assertEquals(3, listStatus.length);
-    System.out.println("======" + listStatus.length);
-    for (FileStatus fileStatus : listStatus) {
-      System.out.println(fileStatus.getPath());
-    }
-
-    assertTrue(job.waitForCompletion(true));
-    Counters ctrs = job.getCounters();
-    System.out.println("Counters: " + ctrs);
-
-    client.loadData(tableName, output.toString());
-
-    while (true) {
-      TableStats tableStats = client.tableStats(tableName);
-      System.out.println(tableStats);
-      if (tableStats.getRowCount() > 0) {
-        break;
-      }
-      Thread.sleep(100);
-    }
-
-    assertTrue(fileSystem.exists(tablePath));
-    assertFalse(fileSystem.isFile(tablePath));
-
-    FileStatus[] listStatusAfter = fileSystem.listStatus(shardPath);
-
-    assertEquals(11, listStatusAfter.length);
-
-  }
-
-  private Iface getClient() {
-    return BlurClient.getClient(miniCluster.getControllerConnectionStr());
-  }
-
-  public static String readFile(String name) throws IOException {
-    DataInputStream f = fileSystem.open(new Path(TEST_ROOT_DIR + "/" + name));
-    BufferedReader b = new BufferedReader(new InputStreamReader(f));
-    StringBuilder result = new StringBuilder();
-    String line = b.readLine();
-    while (line != null) {
-      result.append(line);
-      result.append('\n');
-      line = b.readLine();
-    }
-    b.close();
-    return result.toString();
-  }
-
-  private Path writeRecordsFile(String name, int starintgRowId, int numberOfRows, int startRecordId,
-      int numberOfRecords, String family) throws IOException {
-    // "1,1,cf1,val1"
-    Path file = new Path(TEST_ROOT_DIR + "/" + name);
-    fileSystem.delete(file, false);
-    DataOutputStream f = fileSystem.create(file);
-    PrintWriter writer = new PrintWriter(f);
-    for (int row = 0; row < numberOfRows; row++) {
-      for (int record = 0; record < numberOfRecords; record++) {
-        writer.println(getRecord(row + starintgRowId, record + startRecordId, family));
-      }
-    }
-    writer.close();
-    return file;
-  }
-
-  private String getRecord(int rowId, int recordId, String family) {
-    return rowId + "," + recordId + "," + family + ",valuetoindex";
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d8756092/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTestIT.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTestIT.java b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTestIT.java
new file mode 100644
index 0000000..cccb8c0
--- /dev/null
+++ b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTestIT.java
@@ -0,0 +1,230 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+
+import org.apache.blur.MiniCluster;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.SuiteCluster;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.thrift.generated.TableStats;
+import org.apache.blur.utils.ShardUtil;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class BlurOutputFormatMiniClusterTestIT {
+
+  private static FileSystem fileSystem;
+  private static Path TEST_ROOT_DIR;
+  private static MiniCluster miniCluster;
+  private Path inDir = new Path(TEST_ROOT_DIR + "/in");
+
+  // @BeforeClass
+  // public static void setupTest() throws Exception {
+  // GCWatcher.init(0.60);
+  // JavaHome.checkJavaHome();
+  // LocalFileSystem localFS = FileSystem.getLocal(new Configuration());
+  // File testDirectory = new File(TMPDIR,
+  // "blur-cluster-test").getAbsoluteFile();
+  // testDirectory.mkdirs();
+  //
+  // Path directory = new Path(testDirectory.getPath());
+  // FsPermission dirPermissions =
+  // localFS.getFileStatus(directory).getPermission();
+  // FsAction userAction = dirPermissions.getUserAction();
+  // FsAction groupAction = dirPermissions.getGroupAction();
+  // FsAction otherAction = dirPermissions.getOtherAction();
+  //
+  // StringBuilder builder = new StringBuilder();
+  // builder.append(userAction.ordinal());
+  // builder.append(groupAction.ordinal());
+  // builder.append(otherAction.ordinal());
+  // String dirPermissionNum = builder.toString();
+  // System.setProperty("dfs.datanode.data.dir.perm", dirPermissionNum);
+  // testDirectory.delete();
+  // miniCluster = new MiniCluster();
+  // miniCluster.startBlurCluster(new File(testDirectory,
+  // "cluster").getAbsolutePath(), 2, 3, true, false);
+  //
+  // TEST_ROOT_DIR = new Path(miniCluster.getFileSystemUri().toString() +
+  // "/blur_test");
+  // System.setProperty("hadoop.log.dir",
+  // "./target/BlurOutputFormatTest/hadoop_log");
+  // try {
+  // fileSystem = TEST_ROOT_DIR.getFileSystem(conf);
+  // } catch (IOException io) {
+  // throw new RuntimeException("problem getting local fs", io);
+  // }
+  //
+  // FileSystem.setDefaultUri(conf, miniCluster.getFileSystemUri());
+  //
+  // miniCluster.startMrMiniCluster();
+  // conf = miniCluster.getMRConfiguration();
+  //
+  // BufferStore.initNewBuffer(128, 128 * 128);
+  // }
+  //
+  // @AfterClass
+  // public static void teardown() throws IOException {
+  // if (miniCluster != null) {
+  // miniCluster.shutdownMrMiniCluster();
+  // }
+  // miniCluster.shutdownBlurCluster();
+  // rm(new File("build"));
+  // }
+
+  @BeforeClass
+  public static void startup() throws IOException, BlurException, TException {
+    SuiteCluster.setupMiniCluster(BlurOutputFormatMiniClusterTestIT.class);
+    miniCluster = SuiteCluster.getMiniCluster();
+    fileSystem = miniCluster.getFileSystem();
+  }
+
+  @AfterClass
+  public static void shutdown() throws IOException {
+    SuiteCluster.shutdownMiniCluster(BlurOutputFormatMiniClusterTestIT.class);
+  }
+
+  @Before
+  public void setup() {
+    TableContext.clear();
+  }
+
+  @Test
+  public void testBlurOutputFormat() throws IOException, InterruptedException, ClassNotFoundException, BlurException,
+      TException {
+    fileSystem.delete(inDir, true);
+    String tableName = "testBlurOutputFormat";
+    writeRecordsFile("in/part1", 1, 1, 1, 1, "cf1");
+    writeRecordsFile("in/part2", 1, 1, 2, 1, "cf1");
+
+    Job job = Job.getInstance(miniCluster.getMRConfiguration(), "blur index");
+    job.setJarByClass(BlurOutputFormatMiniClusterTestIT.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    String tableUri = new Path(TEST_ROOT_DIR + "/blur/" + tableName).makeQualified(fileSystem.getUri(),
+        fileSystem.getWorkingDirectory()).toString();
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(1);
+    tableDescriptor.setTableUri(tableUri);
+    tableDescriptor.setName(tableName);
+
+    Iface client = getClient();
+    client.createTable(tableDescriptor);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    Path output = new Path(TEST_ROOT_DIR + "/out");
+    BlurOutputFormat.setOutputPath(job, output);
+
+    Path tablePath = new Path(tableUri);
+    Path shardPath = new Path(tablePath, ShardUtil.getShardName(0));
+    FileStatus[] listStatus = fileSystem.listStatus(shardPath);
+    assertEquals(3, listStatus.length);
+    System.out.println("======" + listStatus.length);
+    for (FileStatus fileStatus : listStatus) {
+      System.out.println(fileStatus.getPath());
+    }
+
+    assertTrue(job.waitForCompletion(true));
+    Counters ctrs = job.getCounters();
+    System.out.println("Counters: " + ctrs);
+
+    client.loadData(tableName, output.toString());
+
+    while (true) {
+      TableStats tableStats = client.tableStats(tableName);
+      System.out.println(tableStats);
+      if (tableStats.getRowCount() > 0) {
+        break;
+      }
+      Thread.sleep(100);
+    }
+
+    assertTrue(fileSystem.exists(tablePath));
+    assertFalse(fileSystem.isFile(tablePath));
+
+    FileStatus[] listStatusAfter = fileSystem.listStatus(shardPath);
+
+    assertEquals(11, listStatusAfter.length);
+
+  }
+
+  private Iface getClient() {
+    return BlurClient.getClient(miniCluster.getControllerConnectionStr());
+  }
+
+  public static String readFile(String name) throws IOException {
+    DataInputStream f = fileSystem.open(new Path(TEST_ROOT_DIR + "/" + name));
+    BufferedReader b = new BufferedReader(new InputStreamReader(f));
+    StringBuilder result = new StringBuilder();
+    String line = b.readLine();
+    while (line != null) {
+      result.append(line);
+      result.append('\n');
+      line = b.readLine();
+    }
+    b.close();
+    return result.toString();
+  }
+
+  private Path writeRecordsFile(String name, int starintgRowId, int numberOfRows, int startRecordId,
+      int numberOfRecords, String family) throws IOException {
+    // "1,1,cf1,val1"
+    Path file = new Path(TEST_ROOT_DIR + "/" + name);
+    fileSystem.delete(file, false);
+    DataOutputStream f = fileSystem.create(file);
+    PrintWriter writer = new PrintWriter(f);
+    for (int row = 0; row < numberOfRows; row++) {
+      for (int record = 0; record < numberOfRecords; record++) {
+        writer.println(getRecord(row + starintgRowId, record + startRecordId, family));
+      }
+    }
+    writer.close();
+    return file;
+  }
+
+  private String getRecord(int rowId, int recordId, String family) {
+    return rowId + "," + recordId + "," + family + ",valuetoindex";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d8756092/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
deleted file mode 100644
index f4e7074..0000000
--- a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
+++ /dev/null
@@ -1,478 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-import java.util.Collection;
-import java.util.TreeSet;
-
-import org.apache.blur.MiniCluster;
-import org.apache.blur.server.TableContext;
-import org.apache.blur.store.buffer.BufferStore;
-import org.apache.blur.store.hdfs.HdfsDirectory;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.blur.utils.JavaHome;
-import org.apache.blur.utils.ShardUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.lucene.index.DirectoryReader;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class BlurOutputFormatTest {
-
-  private static Configuration _conf = new Configuration();
-  private static FileSystem _fileSystem;
-  private static MiniCluster _miniCluster;
-
-  private static Path _root;
-
-  @BeforeClass
-  public static void setupTest() throws Exception {
-    JavaHome.checkJavaHome();
-    File file = new File("./target/tmp/BlurOutputFormatTest_tmp");
-    String pathStr = file.getAbsoluteFile().toURI().toString();
-    String hdfsPath = pathStr + "/hdfs";
-    System.setProperty("test.build.data", hdfsPath);
-    System.setProperty("hadoop.log.dir", pathStr + "/hadoop_log");
-
-    _miniCluster = new MiniCluster();
-    _miniCluster.startDfs(hdfsPath);
-    _fileSystem = _miniCluster.getFileSystem();
-    _root = new Path(_fileSystem.getUri() + "/testroot");
-    _miniCluster.startMrMiniCluster();
-    _conf = _miniCluster.getMRConfiguration();
-
-    BufferStore.initNewBuffer(128, 128 * 128);
-  }
-
-  @AfterClass
-  public static void teardown() throws IOException {
-    if (_miniCluster != null) {
-      _miniCluster.stopMrMiniCluster();
-      _miniCluster.shutdownDfs();
-    }
-    rm(new File("build"));
-  }
-
-  private static void rm(File file) {
-    if (!file.exists()) {
-      return;
-    }
-    if (file.isDirectory()) {
-      for (File f : file.listFiles()) {
-        rm(f);
-      }
-    }
-    file.delete();
-  }
-
-  @Before
-  public void setup() {
-    TableContext.clear();
-  }
-
-  @Test
-  public void testBlurOutputFormat() throws IOException, InterruptedException, ClassNotFoundException {
-    Path input = getInDir();
-    Path output = getOutDir();
-    _fileSystem.delete(input, true);
-    _fileSystem.delete(output, true);
-    writeRecordsFile(new Path(input, "part1"), 1, 1, 1, 1, "cf1");
-    writeRecordsFile(new Path(input, "part2"), 1, 1, 2, 1, "cf1");
-
-    Job job = Job.getInstance(_conf, "blur index");
-    job.setJarByClass(BlurOutputFormatTest.class);
-    job.setMapperClass(CsvBlurMapper.class);
-    job.setInputFormatClass(TextInputFormat.class);
-
-    FileInputFormat.addInputPath(job, input);
-    CsvBlurMapper.addColumns(job, "cf1", "col");
-
-    Path tablePath = new Path(new Path(_root, "table"), "test");
-
-    TableDescriptor tableDescriptor = new TableDescriptor();
-    tableDescriptor.setShardCount(1);
-    tableDescriptor.setTableUri(tablePath.toString());
-    tableDescriptor.setName("test");
-
-    createShardDirectories(tablePath, 1);
-
-    BlurOutputFormat.setupJob(job, tableDescriptor);
-    BlurOutputFormat.setOutputPath(job, output);
-
-    assertTrue(job.waitForCompletion(true));
-    Counters ctrs = job.getCounters();
-    System.out.println("Counters: " + ctrs);
-
-    Path path = new Path(output, ShardUtil.getShardName(0));
-    dump(path, _conf);
-    Collection<Path> commitedTasks = getCommitedTasks(path);
-    assertEquals(1, commitedTasks.size());
-    DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(_conf, commitedTasks.iterator().next()));
-    assertEquals(2, reader.numDocs());
-    reader.close();
-  }
-
-  private Path getOutDir() {
-    return new Path(_root, "out");
-  }
-
-  private Path getInDir() {
-    return new Path(_root, "in");
-  }
-
-  private void dump(Path path, Configuration conf) throws IOException {
-    FileSystem fileSystem = path.getFileSystem(conf);
-    System.out.println(path);
-    if (!fileSystem.isFile(path)) {
-      FileStatus[] listStatus = fileSystem.listStatus(path);
-      for (FileStatus fileStatus : listStatus) {
-        dump(fileStatus.getPath(), conf);
-      }
-    }
-  }
-
-  private Collection<Path> getCommitedTasks(Path path) throws IOException {
-    Collection<Path> result = new TreeSet<Path>();
-    FileSystem fileSystem = path.getFileSystem(_conf);
-    FileStatus[] listStatus = fileSystem.listStatus(path);
-    for (FileStatus fileStatus : listStatus) {
-      Path p = fileStatus.getPath();
-      if (fileStatus.isDir() && p.getName().endsWith(".commit")) {
-        result.add(p);
-      }
-    }
-    return result;
-  }
-
-  @Test
-  public void testBlurOutputFormatOverFlowTest() throws IOException, InterruptedException, ClassNotFoundException {
-    Path input = getInDir();
-    Path output = getOutDir();
-    _fileSystem.delete(input, true);
-    _fileSystem.delete(output, true);
-    // 1500 * 50 = 75,000
-    writeRecordsFile(new Path(input, "part1"), 1, 50, 1, 1500, "cf1");
-    // 100 * 50 = 5,000
-    writeRecordsFile(new Path(input, "part2"), 1, 50, 2000, 100, "cf1");
-
-    Job job = Job.getInstance(_conf, "blur index");
-    job.setJarByClass(BlurOutputFormatTest.class);
-    job.setMapperClass(CsvBlurMapper.class);
-    job.setInputFormatClass(TextInputFormat.class);
-
-    FileInputFormat.addInputPath(job, input);
-    CsvBlurMapper.addColumns(job, "cf1", "col");
-
-    Path tablePath = new Path(new Path(_root, "table"), "test");
-
-    TableDescriptor tableDescriptor = new TableDescriptor();
-    tableDescriptor.setShardCount(1);
-    tableDescriptor.setTableUri(tablePath.toString());
-    tableDescriptor.setName("test");
-
-    createShardDirectories(tablePath, 1);
-
-    BlurOutputFormat.setupJob(job, tableDescriptor);
-    BlurOutputFormat.setOutputPath(job, output);
-    BlurOutputFormat.setIndexLocally(job, true);
-    BlurOutputFormat.setOptimizeInFlight(job, false);
-
-    assertTrue(job.waitForCompletion(true));
-    Counters ctrs = job.getCounters();
-    System.out.println("Counters: " + ctrs);
-
-    Path path = new Path(output, ShardUtil.getShardName(0));
-    Collection<Path> commitedTasks = getCommitedTasks(path);
-    assertEquals(1, commitedTasks.size());
-
-    DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(_conf, commitedTasks.iterator().next()));
-    assertEquals(80000, reader.numDocs());
-    reader.close();
-  }
-
-  @Test
-  public void testBlurOutputFormatOverFlowMultipleReducersTest() throws IOException, InterruptedException,
-      ClassNotFoundException {
-    Path input = getInDir();
-    Path output = getOutDir();
-    _fileSystem.delete(input, true);
-    _fileSystem.delete(output, true);
-    // 1500 * 50 = 75,000
-    writeRecordsFile(new Path(input, "part1"), 1, 50, 1, 1500, "cf1");
-    // 100 * 50 = 5,000
-    writeRecordsFile(new Path(input, "part2"), 1, 50, 2000, 100, "cf1");
-
-    Job job = Job.getInstance(_conf, "blur index");
-    job.setJarByClass(BlurOutputFormatTest.class);
-    job.setMapperClass(CsvBlurMapper.class);
-    job.setInputFormatClass(TextInputFormat.class);
-
-    FileInputFormat.addInputPath(job, input);
-    CsvBlurMapper.addColumns(job, "cf1", "col");
-
-    Path tablePath = new Path(new Path(_root, "table"), "test");
-
-    TableDescriptor tableDescriptor = new TableDescriptor();
-    tableDescriptor.setShardCount(2);
-    tableDescriptor.setTableUri(tablePath.toString());
-    tableDescriptor.setName("test");
-
-    createShardDirectories(output, 2);
-
-    BlurOutputFormat.setupJob(job, tableDescriptor);
-    BlurOutputFormat.setOutputPath(job, output);
-    BlurOutputFormat.setIndexLocally(job, false);
-    BlurOutputFormat.setDocumentBufferStrategy(job, DocumentBufferStrategyHeapSize.class);
-    BlurOutputFormat.setMaxDocumentBufferHeapSize(job, 128 * 1024);
-
-    assertTrue(job.waitForCompletion(true));
-    Counters ctrs = job.getCounters();
-    System.out.println("Counters: " + ctrs);
-
-    long total = 0;
-    for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
-      Path path = new Path(output, ShardUtil.getShardName(i));
-      Collection<Path> commitedTasks = getCommitedTasks(path);
-      assertEquals(1, commitedTasks.size());
-
-      DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(_conf, commitedTasks.iterator().next()));
-      total += reader.numDocs();
-      reader.close();
-    }
-    assertEquals(80000, total);
-
-  }
-
-  @Test
-  public void testBlurOutputFormatOverFlowMultipleReducersWithReduceMultiplierTest() throws IOException,
-      InterruptedException, ClassNotFoundException {
-    Path input = getInDir();
-    Path output = getOutDir();
-    _fileSystem.delete(input, true);
-    _fileSystem.delete(output, true);
-
-    // 1500 * 50 = 75,000
-    writeRecordsFile(new Path(input, "part1"), 1, 50, 1, 1500, "cf1");
-    // 100 * 50 = 5,000
-    writeRecordsFile(new Path(input, "part2"), 1, 50, 2000, 100, "cf1");
-
-    Job job = Job.getInstance(_conf, "blur index");
-    job.setJarByClass(BlurOutputFormatTest.class);
-    job.setMapperClass(CsvBlurMapper.class);
-    job.setInputFormatClass(TextInputFormat.class);
-
-    FileInputFormat.addInputPath(job, input);
-    CsvBlurMapper.addColumns(job, "cf1", "col");
-
-    Path tablePath = new Path(new Path(_root, "table"), "test");
-
-    TableDescriptor tableDescriptor = new TableDescriptor();
-    tableDescriptor.setShardCount(7);
-    tableDescriptor.setTableUri(tablePath.toString());
-    tableDescriptor.setName("test");
-
-    createShardDirectories(output, 7);
-
-    BlurOutputFormat.setupJob(job, tableDescriptor);
-    BlurOutputFormat.setOutputPath(job, output);
-    int multiple = 2;
-    BlurOutputFormat.setReducerMultiplier(job, multiple);
-
-    assertTrue(job.waitForCompletion(true));
-    Counters ctrs = job.getCounters();
-    System.out.println("Counters: " + ctrs);
-
-    long total = 0;
-    for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
-      Path path = new Path(output, ShardUtil.getShardName(i));
-      Collection<Path> commitedTasks = getCommitedTasks(path);
-      assertTrue(multiple >= commitedTasks.size());
-      for (Path p : commitedTasks) {
-        DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(_conf, p));
-        total += reader.numDocs();
-        reader.close();
-      }
-    }
-    assertEquals(80000, total);
-
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testBlurOutputFormatValidateReducerCount() throws IOException, InterruptedException,
-      ClassNotFoundException {
-
-    Path input = getInDir();
-    Path output = getOutDir();
-    _fileSystem.delete(input, true);
-    _fileSystem.delete(output, true);
-    writeRecordsFile(new Path(input, "part1"), 1, 1, 1, 1, "cf1");
-    writeRecordsFile(new Path(input, "part2"), 1, 1, 2, 1, "cf1");
-
-    Job job = Job.getInstance(_conf, "blur index");
-    job.setJarByClass(BlurOutputFormatTest.class);
-    job.setMapperClass(CsvBlurMapper.class);
-    job.setInputFormatClass(TextInputFormat.class);
-
-    FileInputFormat.addInputPath(job, input);
-    CsvBlurMapper.addColumns(job, "cf1", "col");
-
-    Path tablePath = new Path(new Path(_root, "table"), "test");
-
-    TableDescriptor tableDescriptor = new TableDescriptor();
-    tableDescriptor.setShardCount(1);
-    tableDescriptor.setTableUri(tablePath.toString());
-    tableDescriptor.setName("test");
-
-    createShardDirectories(getOutDir(), 1);
-
-    BlurOutputFormat.setupJob(job, tableDescriptor);
-    BlurOutputFormat.setOutputPath(job, output);
-    BlurOutputFormat.setReducerMultiplier(job, 2);
-    job.setNumReduceTasks(4);
-    job.submit();
-
-  }
-
-  // @TODO this test tends to fail sometimes due to issues in the MR MiniCluster
-  // @Test
-  public void testBlurOutputFormatCleanupDuringJobKillTest() throws IOException, InterruptedException,
-      ClassNotFoundException {
-    Path input = getInDir();
-    Path output = getOutDir();
-    _fileSystem.delete(input, true);
-    _fileSystem.delete(output, true);
-    // 1500 * 50 = 75,000
-    writeRecordsFile(new Path(input, "part1"), 1, 50, 1, 1500, "cf1");
-    // 100 * 5000 = 500,000
-    writeRecordsFile(new Path(input, "part2"), 1, 5000, 2000, 100, "cf1");
-
-    Job job = Job.getInstance(_conf, "blur index");
-    job.setJarByClass(BlurOutputFormatTest.class);
-    job.setMapperClass(CsvBlurMapper.class);
-    job.setInputFormatClass(TextInputFormat.class);
-
-    FileInputFormat.addInputPath(job, input);
-    CsvBlurMapper.addColumns(job, "cf1", "col");
-
-    Path tablePath = new Path(new Path(_root, "table"), "test");
-
-    TableDescriptor tableDescriptor = new TableDescriptor();
-    tableDescriptor.setShardCount(2);
-    tableDescriptor.setTableUri(tablePath.toString());
-    tableDescriptor.setName("test");
-
-    createShardDirectories(getOutDir(), 2);
-
-    BlurOutputFormat.setupJob(job, tableDescriptor);
-    BlurOutputFormat.setOutputPath(job, output);
-    BlurOutputFormat.setIndexLocally(job, false);
-
-    job.submit();
-    boolean killCalled = false;
-    while (!job.isComplete()) {
-      Thread.sleep(1000);
-      System.out.printf("Killed [" + killCalled + "] Map [%f] Reduce [%f]%n", job.mapProgress() * 100,
-          job.reduceProgress() * 100);
-      if (job.reduceProgress() > 0.7 && !killCalled) {
-        job.killJob();
-        killCalled = true;
-      }
-    }
-
-    assertFalse(job.isSuccessful());
-
-    for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
-      Path path = new Path(output, ShardUtil.getShardName(i));
-      FileSystem fileSystem = path.getFileSystem(job.getConfiguration());
-      FileStatus[] listStatus = fileSystem.listStatus(path);
-      assertEquals(toString(listStatus), 0, listStatus.length);
-    }
-  }
-
-  private String toString(FileStatus[] listStatus) {
-    if (listStatus == null || listStatus.length == 0) {
-      return "";
-    }
-    String s = "";
-    for (FileStatus fileStatus : listStatus) {
-      if (s.length() > 0) {
-        s += ",";
-      }
-      s += fileStatus.getPath();
-    }
-    return s;
-  }
-
-  public static String readFile(Path file) throws IOException {
-    DataInputStream f = _fileSystem.open(file);
-    BufferedReader b = new BufferedReader(new InputStreamReader(f));
-    StringBuilder result = new StringBuilder();
-    String line = b.readLine();
-    while (line != null) {
-      result.append(line);
-      result.append('\n');
-      line = b.readLine();
-    }
-    b.close();
-    return result.toString();
-  }
-
-  private Path writeRecordsFile(Path file, int starintgRowId, int numberOfRows, int startRecordId, int numberOfRecords,
-      String family) throws IOException {
-    _fileSystem.delete(file, false);
-    DataOutputStream f = _fileSystem.create(file);
-    PrintWriter writer = new PrintWriter(f);
-    for (int row = 0; row < numberOfRows; row++) {
-      for (int record = 0; record < numberOfRecords; record++) {
-        writer.println(getRecord(row + starintgRowId, record + startRecordId, family));
-      }
-    }
-    writer.close();
-    return file;
-  }
-
-  private void createShardDirectories(Path outDir, int shardCount) throws IOException {
-    _fileSystem.mkdirs(outDir);
-    for (int i = 0; i < shardCount; i++) {
-      _fileSystem.mkdirs(new Path(outDir, ShardUtil.getShardName(i)));
-    }
-  }
-
-  private String getRecord(int rowId, int recordId, String family) {
-    return rowId + "," + recordId + "," + family + ",valuetoindex";
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d8756092/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTestIT.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTestIT.java b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTestIT.java
new file mode 100644
index 0000000..25310ad
--- /dev/null
+++ b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTestIT.java
@@ -0,0 +1,484 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.util.Collection;
+import java.util.TreeSet;
+
+import org.apache.blur.MiniCluster;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.SuiteCluster;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.ShardUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.lucene.index.DirectoryReader;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class BlurOutputFormatTestIT {
+
+  // private static Configuration _conf = new Configuration();
+  // private static FileSystem _fileSystem;
+  // private static MiniCluster _miniCluster;
+  // private static Path _root;
+
+  // @BeforeClass
+  // public static void setupTest() throws Exception {
+  // JavaHome.checkJavaHome();
+  // File file = new File("./target/tmp/BlurOutputFormatTest_tmp");
+  // String pathStr = file.getAbsoluteFile().toURI().toString();
+  // String hdfsPath = pathStr + "/hdfs";
+  // System.setProperty("test.build.data", hdfsPath);
+  // System.setProperty("hadoop.log.dir", pathStr + "/hadoop_log");
+  //
+  // _miniCluster = new MiniCluster();
+  // _miniCluster.startDfs(hdfsPath);
+  // _fileSystem = _miniCluster.getFileSystem();
+  // _root = new Path(_fileSystem.getUri() + "/testroot");
+  // _miniCluster.startMrMiniCluster();
+  // _conf = _miniCluster.getMRConfiguration();
+  //
+  // BufferStore.initNewBuffer(128, 128 * 128);
+  // }
+  //
+  // @AfterClass
+  // public static void teardown() throws IOException {
+  // if (_miniCluster != null) {
+  // _miniCluster.shutdownMrMiniCluster();
+  // _miniCluster.shutdownDfs();
+  // }
+  // rm(new File("build"));
+  // }
+
+  private static MiniCluster _miniCluster;
+  private static FileSystem _fileSystem;
+  private static Configuration _conf;
+  private static Path _root;
+
+  @BeforeClass
+  public static void startup() throws IOException, BlurException, TException {
+    SuiteCluster.setupMiniCluster(BlurOutputFormatTestIT.class);
+    _miniCluster = SuiteCluster.getMiniCluster();
+    _fileSystem = _miniCluster.getFileSystem();
+    _conf = _miniCluster.getMRConfiguration();
+    _root = new Path(_fileSystem.getUri() + "/testroot");
+  }
+
+  @AfterClass
+  public static void shutdown() throws IOException {
+    SuiteCluster.shutdownMiniCluster(BlurOutputFormatTestIT.class);
+  }
+
+  @Before
+  public void setup() {
+    TableContext.clear();
+  }
+
+  @Test
+  public void testBlurOutputFormat() throws IOException, InterruptedException, ClassNotFoundException {
+    Path input = getInDir();
+    Path output = getOutDir();
+    _fileSystem.delete(input, true);
+    _fileSystem.delete(output, true);
+    writeRecordsFile(new Path(input, "part1"), 1, 1, 1, 1, "cf1");
+    writeRecordsFile(new Path(input, "part2"), 1, 1, 2, 1, "cf1");
+
+    Job job = Job.getInstance(_conf, "blur index");
+    job.setJarByClass(BlurOutputFormatTestIT.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, input);
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    Path tablePath = new Path(new Path(_root, "table"), "test");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(1);
+    tableDescriptor.setTableUri(tablePath.toString());
+    tableDescriptor.setName("test");
+
+    createShardDirectories(tablePath, 1);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    BlurOutputFormat.setOutputPath(job, output);
+
+    assertTrue(job.waitForCompletion(true));
+    Counters ctrs = job.getCounters();
+    System.out.println("Counters: " + ctrs);
+
+    Path path = new Path(output, ShardUtil.getShardName(0));
+    dump(path, _conf);
+    Collection<Path> commitedTasks = getCommitedTasks(path);
+    assertEquals(1, commitedTasks.size());
+    DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(_conf, commitedTasks.iterator().next()));
+    assertEquals(2, reader.numDocs());
+    reader.close();
+  }
+
+  private Path getOutDir() {
+    return new Path(_root, "out");
+  }
+
+  private Path getInDir() {
+    return new Path(_root, "in");
+  }
+
+  private void dump(Path path, Configuration conf) throws IOException {
+    FileSystem fileSystem = path.getFileSystem(conf);
+    System.out.println(path);
+    if (!fileSystem.isFile(path)) {
+      FileStatus[] listStatus = fileSystem.listStatus(path);
+      for (FileStatus fileStatus : listStatus) {
+        dump(fileStatus.getPath(), conf);
+      }
+    }
+  }
+
+  private Collection<Path> getCommitedTasks(Path path) throws IOException {
+    Collection<Path> result = new TreeSet<Path>();
+    FileSystem fileSystem = path.getFileSystem(_conf);
+    FileStatus[] listStatus = fileSystem.listStatus(path);
+    for (FileStatus fileStatus : listStatus) {
+      Path p = fileStatus.getPath();
+      if (fileStatus.isDir() && p.getName().endsWith(".commit")) {
+        result.add(p);
+      }
+    }
+    return result;
+  }
+
+  @Test
+  public void testBlurOutputFormatOverFlowTest() throws IOException, InterruptedException, ClassNotFoundException {
+    Path input = getInDir();
+    Path output = getOutDir();
+    _fileSystem.delete(input, true);
+    _fileSystem.delete(output, true);
+    // 1500 * 50 = 75,000
+    writeRecordsFile(new Path(input, "part1"), 1, 50, 1, 1500, "cf1");
+    // 100 * 50 = 5,000
+    writeRecordsFile(new Path(input, "part2"), 1, 50, 2000, 100, "cf1");
+
+    Job job = Job.getInstance(_conf, "blur index");
+    job.setJarByClass(BlurOutputFormatTestIT.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, input);
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    Path tablePath = new Path(new Path(_root, "table"), "test");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(1);
+    tableDescriptor.setTableUri(tablePath.toString());
+    tableDescriptor.setName("test");
+
+    createShardDirectories(tablePath, 1);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    BlurOutputFormat.setOutputPath(job, output);
+    BlurOutputFormat.setIndexLocally(job, true);
+    BlurOutputFormat.setOptimizeInFlight(job, false);
+
+    assertTrue(job.waitForCompletion(true));
+    Counters ctrs = job.getCounters();
+    System.out.println("Counters: " + ctrs);
+
+    Path path = new Path(output, ShardUtil.getShardName(0));
+    Collection<Path> commitedTasks = getCommitedTasks(path);
+    assertEquals(1, commitedTasks.size());
+
+    DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(_conf, commitedTasks.iterator().next()));
+    assertEquals(80000, reader.numDocs());
+    reader.close();
+  }
+
+  @Test
+  public void testBlurOutputFormatOverFlowMultipleReducersTest() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    Path input = getInDir();
+    Path output = getOutDir();
+    _fileSystem.delete(input, true);
+    _fileSystem.delete(output, true);
+    // 1500 * 50 = 75,000
+    writeRecordsFile(new Path(input, "part1"), 1, 50, 1, 1500, "cf1");
+    // 100 * 50 = 5,000
+    writeRecordsFile(new Path(input, "part2"), 1, 50, 2000, 100, "cf1");
+
+    Job job = Job.getInstance(_conf, "blur index");
+    job.setJarByClass(BlurOutputFormatTestIT.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, input);
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    Path tablePath = new Path(new Path(_root, "table"), "test");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(2);
+    tableDescriptor.setTableUri(tablePath.toString());
+    tableDescriptor.setName("test");
+
+    createShardDirectories(output, 2);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    BlurOutputFormat.setOutputPath(job, output);
+    BlurOutputFormat.setIndexLocally(job, false);
+    BlurOutputFormat.setDocumentBufferStrategy(job, DocumentBufferStrategyHeapSize.class);
+    BlurOutputFormat.setMaxDocumentBufferHeapSize(job, 128 * 1024);
+
+    assertTrue(job.waitForCompletion(true));
+    Counters ctrs = job.getCounters();
+    System.out.println("Counters: " + ctrs);
+
+    long total = 0;
+    for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
+      Path path = new Path(output, ShardUtil.getShardName(i));
+      Collection<Path> commitedTasks = getCommitedTasks(path);
+      assertEquals(1, commitedTasks.size());
+
+      DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(_conf, commitedTasks.iterator().next()));
+      total += reader.numDocs();
+      reader.close();
+    }
+    assertEquals(80000, total);
+
+  }
+
+  @Test
+  public void testBlurOutputFormatOverFlowMultipleReducersWithReduceMultiplierTest() throws IOException,
+      InterruptedException, ClassNotFoundException {
+    Path input = getInDir();
+    Path output = getOutDir();
+    _fileSystem.delete(input, true);
+    _fileSystem.delete(output, true);
+
+    // 1500 * 50 = 75,000
+    writeRecordsFile(new Path(input, "part1"), 1, 50, 1, 1500, "cf1");
+    // 100 * 50 = 5,000
+    writeRecordsFile(new Path(input, "part2"), 1, 50, 2000, 100, "cf1");
+
+    Job job = Job.getInstance(_conf, "blur index");
+    job.setJarByClass(BlurOutputFormatTestIT.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, input);
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    Path tablePath = new Path(new Path(_root, "table"), "test");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(7);
+    tableDescriptor.setTableUri(tablePath.toString());
+    tableDescriptor.setName("test");
+
+    createShardDirectories(output, 7);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    BlurOutputFormat.setOutputPath(job, output);
+    int multiple = 2;
+    BlurOutputFormat.setReducerMultiplier(job, multiple);
+
+    assertTrue(job.waitForCompletion(true));
+    Counters ctrs = job.getCounters();
+    System.out.println("Counters: " + ctrs);
+
+    long total = 0;
+    for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
+      Path path = new Path(output, ShardUtil.getShardName(i));
+      Collection<Path> commitedTasks = getCommitedTasks(path);
+      assertTrue(multiple >= commitedTasks.size());
+      for (Path p : commitedTasks) {
+        DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(_conf, p));
+        total += reader.numDocs();
+        reader.close();
+      }
+    }
+    assertEquals(80000, total);
+
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testBlurOutputFormatValidateReducerCount() throws IOException, InterruptedException,
+      ClassNotFoundException {
+
+    Path input = getInDir();
+    Path output = getOutDir();
+    _fileSystem.delete(input, true);
+    _fileSystem.delete(output, true);
+    writeRecordsFile(new Path(input, "part1"), 1, 1, 1, 1, "cf1");
+    writeRecordsFile(new Path(input, "part2"), 1, 1, 2, 1, "cf1");
+
+    Job job = Job.getInstance(_conf, "blur index");
+    job.setJarByClass(BlurOutputFormatTestIT.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, input);
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    Path tablePath = new Path(new Path(_root, "table"), "test");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(1);
+    tableDescriptor.setTableUri(tablePath.toString());
+    tableDescriptor.setName("test");
+
+    createShardDirectories(getOutDir(), 1);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    BlurOutputFormat.setOutputPath(job, output);
+    BlurOutputFormat.setReducerMultiplier(job, 2);
+    job.setNumReduceTasks(4);
+    job.submit();
+
+  }
+
+  // @TODO this test tends to fail sometimes due to issues in the MR MiniCluster
+  // @Test
+  public void testBlurOutputFormatCleanupDuringJobKillTest() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    Path input = getInDir();
+    Path output = getOutDir();
+    _fileSystem.delete(input, true);
+    _fileSystem.delete(output, true);
+    // 1500 * 50 = 75,000
+    writeRecordsFile(new Path(input, "part1"), 1, 50, 1, 1500, "cf1");
+    // 100 * 5000 = 500,000
+    writeRecordsFile(new Path(input, "part2"), 1, 5000, 2000, 100, "cf1");
+
+    Job job = Job.getInstance(_conf, "blur index");
+    job.setJarByClass(BlurOutputFormatTestIT.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, input);
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    Path tablePath = new Path(new Path(_root, "table"), "test");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(2);
+    tableDescriptor.setTableUri(tablePath.toString());
+    tableDescriptor.setName("test");
+
+    createShardDirectories(getOutDir(), 2);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    BlurOutputFormat.setOutputPath(job, output);
+    BlurOutputFormat.setIndexLocally(job, false);
+
+    job.submit();
+    boolean killCalled = false;
+    while (!job.isComplete()) {
+      Thread.sleep(1000);
+      System.out.printf("Killed [" + killCalled + "] Map [%f] Reduce [%f]%n", job.mapProgress() * 100,
+          job.reduceProgress() * 100);
+      if (job.reduceProgress() > 0.7 && !killCalled) {
+        job.killJob();
+        killCalled = true;
+      }
+    }
+
+    assertFalse(job.isSuccessful());
+
+    for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
+      Path path = new Path(output, ShardUtil.getShardName(i));
+      FileSystem fileSystem = path.getFileSystem(job.getConfiguration());
+      FileStatus[] listStatus = fileSystem.listStatus(path);
+      assertEquals(toString(listStatus), 0, listStatus.length);
+    }
+  }
+
+  private String toString(FileStatus[] listStatus) {
+    if (listStatus == null || listStatus.length == 0) {
+      return "";
+    }
+    String s = "";
+    for (FileStatus fileStatus : listStatus) {
+      if (s.length() > 0) {
+        s += ",";
+      }
+      s += fileStatus.getPath();
+    }
+    return s;
+  }
+
+  public static String readFile(Path file) throws IOException {
+    DataInputStream f = _fileSystem.open(file);
+    BufferedReader b = new BufferedReader(new InputStreamReader(f));
+    StringBuilder result = new StringBuilder();
+    String line = b.readLine();
+    while (line != null) {
+      result.append(line);
+      result.append('\n');
+      line = b.readLine();
+    }
+    b.close();
+    return result.toString();
+  }
+
+  private Path writeRecordsFile(Path file, int starintgRowId, int numberOfRows, int startRecordId, int numberOfRecords,
+      String family) throws IOException {
+    _fileSystem.delete(file, false);
+    DataOutputStream f = _fileSystem.create(file);
+    PrintWriter writer = new PrintWriter(f);
+    for (int row = 0; row < numberOfRows; row++) {
+      for (int record = 0; record < numberOfRecords; record++) {
+        writer.println(getRecord(row + starintgRowId, record + startRecordId, family));
+      }
+    }
+    writer.close();
+    return file;
+  }
+
+  private void createShardDirectories(Path outDir, int shardCount) throws IOException {
+    _fileSystem.mkdirs(outDir);
+    for (int i = 0; i < shardCount; i++) {
+      _fileSystem.mkdirs(new Path(outDir, ShardUtil.getShardName(i)));
+    }
+  }
+
+  private String getRecord(int rowId, int recordId, String family) {
+    return rowId + "," + recordId + "," + family + ",valuetoindex";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d8756092/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/update/DriverTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/update/DriverTest.java b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/update/DriverTest.java
deleted file mode 100644
index 8c8de6e..0000000
--- a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/update/DriverTest.java
+++ /dev/null
@@ -1,357 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.update;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.blur.MiniCluster;
-import org.apache.blur.mapreduce.lib.BlurRecord;
-import org.apache.blur.store.buffer.BufferStore;
-import org.apache.blur.thirdparty.thrift_0_9_0.TException;
-import org.apache.blur.thrift.BlurClient;
-import org.apache.blur.thrift.generated.Blur.Iface;
-import org.apache.blur.thrift.generated.BlurException;
-import org.apache.blur.thrift.generated.Column;
-import org.apache.blur.thrift.generated.ColumnDefinition;
-import org.apache.blur.thrift.generated.FetchResult;
-import org.apache.blur.thrift.generated.Record;
-import org.apache.blur.thrift.generated.RecordMutation;
-import org.apache.blur.thrift.generated.RecordMutationType;
-import org.apache.blur.thrift.generated.Row;
-import org.apache.blur.thrift.generated.RowMutation;
-import org.apache.blur.thrift.generated.RowMutationType;
-import org.apache.blur.thrift.generated.Selector;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.blur.thrift.generated.TableStats;
-import org.apache.blur.utils.BlurConstants;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.Text;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class DriverTest {
-
-  private static Configuration conf = new Configuration();
-  private static MiniCluster miniCluster;
-
-  @BeforeClass
-  public static void setupTest() throws Exception {
-    setupJavaHome();
-    File file = new File("./target/tmp/BlurInputFormatTest_tmp");
-    String pathStr = file.getAbsoluteFile().toURI().toString();
-    System.setProperty("test.build.data", pathStr + "/data");
-    System.setProperty("hadoop.log.dir", pathStr + "/hadoop_log");
-    miniCluster = new MiniCluster();
-    miniCluster.startBlurCluster(pathStr + "/blur", 2, 2);
-    miniCluster.startMrMiniCluster();
-    conf = miniCluster.getMRConfiguration();
-
-    BufferStore.initNewBuffer(128, 128 * 128);
-  }
-
-  public static void setupJavaHome() {
-    String str = System.getenv("JAVA_HOME");
-    if (str == null) {
-      String property = System.getProperty("java.home");
-      if (property != null) {
-        throw new RuntimeException("JAVA_HOME not set should probably be [" + property + "].");
-      }
-      throw new RuntimeException("JAVA_HOME not set.");
-    }
-  }
-
-  @AfterClass
-  public static void teardown() throws IOException {
-    if (miniCluster != null) {
-      miniCluster.stopMrMiniCluster();
-    }
-    rm(new File("build"));
-  }
-
-  private static void rm(File file) {
-    if (!file.exists()) {
-      return;
-    }
-    if (file.isDirectory()) {
-      for (File f : file.listFiles()) {
-        rm(f);
-      }
-    }
-    file.delete();
-  }
-
-  @Test
-  public void testDriverAddSingleRowWithSingleRecord() throws Exception {
-    FileSystem fileSystem = miniCluster.getFileSystem();
-    Path root = new Path(fileSystem.getUri() + "/");
-
-    String tableName = "testDriverAddSingleRowWithSingleRecord";
-    creatTable(tableName, new Path(root, "tables"), true);
-
-    Driver driver = new Driver();
-    driver.setConf(conf);
-
-    String mrIncWorkingPathStr = new Path(root, "working").toString();
-    generateData(mrIncWorkingPathStr);
-    String outputPathStr = new Path(root, "output").toString();
-    String blurZkConnection = miniCluster.getZkConnectionString();
-
-    assertEquals(0, driver.run(new String[] { tableName, mrIncWorkingPathStr, outputPathStr, blurZkConnection, "1" }));
-
-    Iface client = getClient();
-    client.loadData(tableName, outputPathStr);
-
-    waitUntilAllImportsAreCompleted(client, tableName);
-
-    TableStats tableStats = client.tableStats(tableName);
-    assertEquals(1, tableStats.getRowCount());
-    assertEquals(1, tableStats.getRecordCount());
-  }
-
-  @Test
-  public void testDriverAddSingleRecordToExistingRow() throws Exception {
-    FileSystem fileSystem = miniCluster.getFileSystem();
-    Path root = new Path(fileSystem.getUri() + "/");
-
-    String tableName = "testDriverAddSingleRecordToExistingRow";
-    Iface client = getClient();
-    creatTable(tableName, new Path(root, "tables"), true);
-    addRow(client, tableName, "row1", "record1", "value1");
-
-    Driver driver = new Driver();
-    driver.setConf(conf);
-
-    String mrIncWorkingPathStr = new Path(root, "working").toString();
-    generateData(mrIncWorkingPathStr);
-    String outputPathStr = new Path(root, "output").toString();
-    String blurZkConnection = miniCluster.getZkConnectionString();
-
-    assertEquals(0, driver.run(new String[] { tableName, mrIncWorkingPathStr, outputPathStr, blurZkConnection, "1" }));
-
-    client.loadData(tableName, outputPathStr);
-
-    waitUntilAllImportsAreCompleted(client, tableName);
-
-    TableStats tableStats = client.tableStats(tableName);
-    assertEquals(1, tableStats.getRowCount());
-    assertEquals(2, tableStats.getRecordCount());
-  }
-
-  @Test
-  public void testDriverUpdateRecordToExistingRow() throws Exception {
-    FileSystem fileSystem = miniCluster.getFileSystem();
-    Path root = new Path(fileSystem.getUri() + "/");
-
-    String tableName = "testDriverUpdateRecordToExistingRow";
-    Iface client = getClient();
-    creatTable(tableName, new Path(root, "tables"), true);
-    String rowId = "row1";
-    String recordId = "record1";
-    addRow(client, tableName, rowId, recordId, "value1");
-
-    Driver driver = new Driver();
-    driver.setConf(conf);
-
-    String mrIncWorkingPathStr = new Path(root, "working").toString();
-    generateData(mrIncWorkingPathStr, rowId, recordId, "value2");
-    String outputPathStr = new Path(root, "output").toString();
-    String blurZkConnection = miniCluster.getZkConnectionString();
-
-    assertEquals(0, driver.run(new String[] { tableName, mrIncWorkingPathStr, outputPathStr, blurZkConnection, "1" }));
-    {
-      Selector selector = new Selector();
-      selector.setRowId(rowId);
-      FetchResult fetchRow = client.fetchRow(tableName, selector);
-      Row row = fetchRow.getRowResult().getRow();
-      assertEquals(rowId, row.getId());
-      List<Record> records = row.getRecords();
-      assertEquals(1, records.size());
-      Record record = records.get(0);
-      assertEquals(recordId, record.getRecordId());
-      List<Column> columns = record.getColumns();
-      assertEquals(1, columns.size());
-      Column column = columns.get(0);
-      assertEquals("col0", column.getName());
-      assertEquals("value1", column.getValue());
-    }
-
-    client.loadData(tableName, outputPathStr);
-
-    waitUntilAllImportsAreCompleted(client, tableName);
-
-    TableStats tableStats = client.tableStats(tableName);
-    assertEquals(1, tableStats.getRowCount());
-    assertEquals(1, tableStats.getRecordCount());
-
-    {
-      Selector selector = new Selector();
-      selector.setRowId(rowId);
-      FetchResult fetchRow = client.fetchRow(tableName, selector);
-      Row row = fetchRow.getRowResult().getRow();
-      assertEquals(rowId, row.getId());
-      List<Record> records = row.getRecords();
-      assertEquals(1, records.size());
-      Record record = records.get(0);
-      assertEquals(recordId, record.getRecordId());
-      List<Column> columns = record.getColumns();
-      assertEquals(1, columns.size());
-      Column column = columns.get(0);
-      assertEquals("col0", column.getName());
-      assertEquals("value2", column.getValue());
-    }
-  }
-
-  @Test
-  public void testBulkTableUpdateCommandUpdateRecordToExistingRow() throws Exception {
-    FileSystem fileSystem = miniCluster.getFileSystem();
-    Path root = new Path(fileSystem.getUri() + "/");
-
-    String tableName = "testBulkTableUpdateCommandUpdateRecordToExistingRow";
-    Iface client = getClient();
-    Path mrIncWorkingPath = new Path(new Path(root, "working"), tableName);
-    creatTable(tableName, new Path(root, "tables"), true, mrIncWorkingPath.toString());
-    String rowId = "row1";
-    String recordId = "record1";
-    addRow(client, tableName, rowId, recordId, "value1");
-
-    generateData(mrIncWorkingPath.toString(), rowId, recordId, "value2");
-
-    {
-      Selector selector = new Selector();
-      selector.setRowId(rowId);
-      FetchResult fetchRow = client.fetchRow(tableName, selector);
-      Row row = fetchRow.getRowResult().getRow();
-      assertEquals(rowId, row.getId());
-      List<Record> records = row.getRecords();
-      assertEquals(1, records.size());
-      Record record = records.get(0);
-      assertEquals(recordId, record.getRecordId());
-      List<Column> columns = record.getColumns();
-      assertEquals(1, columns.size());
-      Column column = columns.get(0);
-      assertEquals("col0", column.getName());
-      assertEquals("value1", column.getValue());
-    }
-
-    BulkTableUpdateCommand bulkTableUpdateCommand = new BulkTableUpdateCommand();
-    bulkTableUpdateCommand.setAutoLoad(true);
-    bulkTableUpdateCommand.setTable(tableName);
-    bulkTableUpdateCommand.setWaitForDataBeVisible(true);
-    bulkTableUpdateCommand.addExtraConfig(conf);
-    assertEquals(0, (int) bulkTableUpdateCommand.run(getClient()));
-
-    TableStats tableStats = client.tableStats(tableName);
-    assertEquals(1, tableStats.getRowCount());
-    assertEquals(1, tableStats.getRecordCount());
-
-    {
-      Selector selector = new Selector();
-      selector.setRowId(rowId);
-      FetchResult fetchRow = client.fetchRow(tableName, selector);
-      Row row = fetchRow.getRowResult().getRow();
-      assertEquals(rowId, row.getId());
-      List<Record> records = row.getRecords();
-      assertEquals(1, records.size());
-      Record record = records.get(0);
-      assertEquals(recordId, record.getRecordId());
-      List<Column> columns = record.getColumns();
-      assertEquals(1, columns.size());
-      Column column = columns.get(0);
-      assertEquals("col0", column.getName());
-      assertEquals("value2", column.getValue());
-    }
-  }
-
-  private void addRow(Iface client, String tableName, String rowId, String recordId, String value)
-      throws BlurException, TException {
-    List<RecordMutation> recordMutations = new ArrayList<RecordMutation>();
-    List<Column> columns = new ArrayList<Column>();
-    columns.add(new Column("col0", value));
-    Record record = new Record(recordId, "fam0", columns);
-    recordMutations.add(new RecordMutation(RecordMutationType.REPLACE_ENTIRE_RECORD, record));
-    RowMutation rowMutation = new RowMutation(tableName, rowId, RowMutationType.REPLACE_ROW, recordMutations);
-    client.mutate(rowMutation);
-  }
-
-  private void waitUntilAllImportsAreCompleted(Iface client, String tableName) throws BlurException, TException,
-      InterruptedException {
-    while (true) {
-      Thread.sleep(1000);
-      TableStats tableStats = client.tableStats(tableName);
-      if (tableStats.getSegmentImportInProgressCount() == 0 && tableStats.getSegmentImportPendingCount() == 0) {
-        return;
-      }
-    }
-  }
-
-  private void generateData(String mrIncWorkingPathStr, String rowId, String recordId, String value) throws IOException {
-    Path path = new Path(new Path(mrIncWorkingPathStr), "new");
-    Writer writer = new SequenceFile.Writer(miniCluster.getFileSystem(), conf, new Path(path, UUID.randomUUID()
-        .toString()), Text.class, BlurRecord.class);
-    BlurRecord blurRecord = new BlurRecord();
-    blurRecord.setRowId(rowId);
-    blurRecord.setRecordId(recordId);
-    blurRecord.setFamily("fam0");
-    blurRecord.addColumn("col0", value);
-    writer.append(new Text(rowId), blurRecord);
-    writer.close();
-  }
-
-  private void generateData(String mrIncWorkingPathStr) throws IOException {
-    generateData(mrIncWorkingPathStr, "row1", "record-" + System.currentTimeMillis(), "val0");
-  }
-
-  private void creatTable(String tableName, Path tables, boolean fastDisable) throws BlurException, TException {
-    creatTable(tableName, tables, fastDisable, null);
-  }
-
-  private void creatTable(String tableName, Path tables, boolean fastDisable, String workingPath) throws BlurException,
-      TException {
-    Path tablePath = new Path(tables, tableName);
-    Iface client = getClient();
-    TableDescriptor tableDescriptor = new TableDescriptor();
-    tableDescriptor.setTableUri(tablePath.toString());
-    tableDescriptor.setName(tableName);
-    tableDescriptor.setShardCount(2);
-    tableDescriptor.putToTableProperties(BlurConstants.BLUR_TABLE_DISABLE_FAST_DIR, Boolean.toString(fastDisable));
-    if (workingPath != null) {
-      tableDescriptor.putToTableProperties(BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH, workingPath);
-    }
-    client.createTable(tableDescriptor);
-
-    ColumnDefinition colDef = new ColumnDefinition();
-    colDef.setFamily("fam0");
-    colDef.setColumnName("col0");
-    colDef.setFieldType("string");
-    client.addColumnDefinition(tableName, colDef);
-  }
-
-  private Iface getClient() {
-    return BlurClient.getClientFromZooKeeperConnectionStr(miniCluster.getZkConnectionString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d8756092/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/update/DriverTestIT.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/update/DriverTestIT.java b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/update/DriverTestIT.java
new file mode 100644
index 0000000..f9b839c
--- /dev/null
+++ b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/update/DriverTestIT.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib.update;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.blur.MiniCluster;
+import org.apache.blur.mapreduce.lib.BlurRecord;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.SuiteCluster;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.ColumnDefinition;
+import org.apache.blur.thrift.generated.FetchResult;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.RecordMutation;
+import org.apache.blur.thrift.generated.RecordMutationType;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.blur.thrift.generated.RowMutation;
+import org.apache.blur.thrift.generated.RowMutationType;
+import org.apache.blur.thrift.generated.Selector;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.thrift.generated.TableStats;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.Text;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class DriverTestIT {
+
+  private static MiniCluster miniCluster;
+  private static Configuration conf;
+
+  @BeforeClass
+  public static void startup() throws IOException, BlurException, TException {
+    SuiteCluster.setupMiniCluster(DriverTestIT.class);
+    miniCluster = SuiteCluster.getMiniCluster();
+    conf = miniCluster.getMRConfiguration();
+  }
+
+  @AfterClass
+  public static void shutdown() throws IOException {
+    SuiteCluster.shutdownMiniCluster(DriverTestIT.class);
+  }
+
+  @Test
+  public void testDriverAddSingleRowWithSingleRecord() throws Exception {
+    FileSystem fileSystem = miniCluster.getFileSystem();
+    Path root = new Path(fileSystem.getUri() + "/");
+
+    String tableName = "testDriverAddSingleRowWithSingleRecord";
+    creatTable(tableName, new Path(root, "tables"), true);
+
+    Driver driver = new Driver();
+    driver.setConf(conf);
+
+    String mrIncWorkingPathStr = new Path(root, "working").toString();
+    generateData(mrIncWorkingPathStr);
+    String outputPathStr = new Path(root, "output").toString();
+    String blurZkConnection = miniCluster.getZkConnectionString();
+
+    assertEquals(0, driver.run(new String[] { tableName, mrIncWorkingPathStr, outputPathStr, blurZkConnection, "1" }));
+
+    Iface client = getClient();
+    client.loadData(tableName, outputPathStr);
+
+    waitUntilAllImportsAreCompleted(client, tableName);
+
+    TableStats tableStats = client.tableStats(tableName);
+    assertEquals(1, tableStats.getRowCount());
+    assertEquals(1, tableStats.getRecordCount());
+  }
+
+  @Test
+  public void testDriverAddSingleRecordToExistingRow() throws Exception {
+    FileSystem fileSystem = miniCluster.getFileSystem();
+    Path root = new Path(fileSystem.getUri() + "/");
+
+    String tableName = "testDriverAddSingleRecordToExistingRow";
+    Iface client = getClient();
+    creatTable(tableName, new Path(root, "tables"), true);
+    addRow(client, tableName, "row1", "record1", "value1");
+
+    Driver driver = new Driver();
+    driver.setConf(conf);
+
+    String mrIncWorkingPathStr = new Path(root, "working").toString();
+    generateData(mrIncWorkingPathStr);
+    String outputPathStr = new Path(root, "output").toString();
+    String blurZkConnection = miniCluster.getZkConnectionString();
+
+    assertEquals(0, driver.run(new String[] { tableName, mrIncWorkingPathStr, outputPathStr, blurZkConnection, "1" }));
+
+    client.loadData(tableName, outputPathStr);
+
+    waitUntilAllImportsAreCompleted(client, tableName);
+
+    TableStats tableStats = client.tableStats(tableName);
+    assertEquals(1, tableStats.getRowCount());
+    assertEquals(2, tableStats.getRecordCount());
+  }
+
+  @Test
+  public void testDriverUpdateRecordToExistingRow() throws Exception {
+    FileSystem fileSystem = miniCluster.getFileSystem();
+    Path root = new Path(fileSystem.getUri() + "/");
+
+    String tableName = "testDriverUpdateRecordToExistingRow";
+    Iface client = getClient();
+    creatTable(tableName, new Path(root, "tables"), true);
+    String rowId = "row1";
+    String recordId = "record1";
+    addRow(client, tableName, rowId, recordId, "value1");
+
+    Driver driver = new Driver();
+    driver.setConf(conf);
+
+    String mrIncWorkingPathStr = new Path(root, "working").toString();
+    generateData(mrIncWorkingPathStr, rowId, recordId, "value2");
+    String outputPathStr = new Path(root, "output").toString();
+    String blurZkConnection = miniCluster.getZkConnectionString();
+
+    assertEquals(0, driver.run(new String[] { tableName, mrIncWorkingPathStr, outputPathStr, blurZkConnection, "1" }));
+    {
+      Selector selector = new Selector();
+      selector.setRowId(rowId);
+      FetchResult fetchRow = client.fetchRow(tableName, selector);
+      Row row = fetchRow.getRowResult().getRow();
+      assertEquals(rowId, row.getId());
+      List<Record> records = row.getRecords();
+      assertEquals(1, records.size());
+      Record record = records.get(0);
+      assertEquals(recordId, record.getRecordId());
+      List<Column> columns = record.getColumns();
+      assertEquals(1, columns.size());
+      Column column = columns.get(0);
+      assertEquals("col0", column.getName());
+      assertEquals("value1", column.getValue());
+    }
+
+    client.loadData(tableName, outputPathStr);
+
+    waitUntilAllImportsAreCompleted(client, tableName);
+
+    TableStats tableStats = client.tableStats(tableName);
+    assertEquals(1, tableStats.getRowCount());
+    assertEquals(1, tableStats.getRecordCount());
+
+    {
+      Selector selector = new Selector();
+      selector.setRowId(rowId);
+      FetchResult fetchRow = client.fetchRow(tableName, selector);
+      Row row = fetchRow.getRowResult().getRow();
+      assertEquals(rowId, row.getId());
+      List<Record> records = row.getRecords();
+      assertEquals(1, records.size());
+      Record record = records.get(0);
+      assertEquals(recordId, record.getRecordId());
+      List<Column> columns = record.getColumns();
+      assertEquals(1, columns.size());
+      Column column = columns.get(0);
+      assertEquals("col0", column.getName());
+      assertEquals("value2", column.getValue());
+    }
+  }
+
+  @Test
+  public void testBulkTableUpdateCommandUpdateRecordToExistingRow() throws Exception {
+    FileSystem fileSystem = miniCluster.getFileSystem();
+    Path root = new Path(fileSystem.getUri() + "/");
+
+    String tableName = "testBulkTableUpdateCommandUpdateRecordToExistingRow";
+    Iface client = getClient();
+    Path mrIncWorkingPath = new Path(new Path(root, "working"), tableName);
+    creatTable(tableName, new Path(root, "tables"), true, mrIncWorkingPath.toString());
+    String rowId = "row1";
+    String recordId = "record1";
+    addRow(client, tableName, rowId, recordId, "value1");
+
+    generateData(mrIncWorkingPath.toString(), rowId, recordId, "value2");
+
+    {
+      Selector selector = new Selector();
+      selector.setRowId(rowId);
+      FetchResult fetchRow = client.fetchRow(tableName, selector);
+      Row row = fetchRow.getRowResult().getRow();
+      assertEquals(rowId, row.getId());
+      List<Record> records = row.getRecords();
+      assertEquals(1, records.size());
+      Record record = records.get(0);
+      assertEquals(recordId, record.getRecordId());
+      List<Column> columns = record.getColumns();
+      assertEquals(1, columns.size());
+      Column column = columns.get(0);
+      assertEquals("col0", column.getName());
+      assertEquals("value1", column.getValue());
+    }
+
+    BulkTableUpdateCommand bulkTableUpdateCommand = new BulkTableUpdateCommand();
+    bulkTableUpdateCommand.setAutoLoad(true);
+    bulkTableUpdateCommand.setTable(tableName);
+    bulkTableUpdateCommand.setWaitForDataBeVisible(true);
+    bulkTableUpdateCommand.addExtraConfig(conf);
+    assertEquals(0, (int) bulkTableUpdateCommand.run(getClient()));
+
+    TableStats tableStats = client.tableStats(tableName);
+    assertEquals(1, tableStats.getRowCount());
+    assertEquals(1, tableStats.getRecordCount());
+
+    {
+      Selector selector = new Selector();
+      selector.setRowId(rowId);
+      FetchResult fetchRow = client.fetchRow(tableName, selector);
+      Row row = fetchRow.getRowResult().getRow();
+      assertEquals(rowId, row.getId());
+      List<Record> records = row.getRecords();
+      assertEquals(1, records.size());
+      Record record = records.get(0);
+      assertEquals(recordId, record.getRecordId());
+      List<Column> columns = record.getColumns();
+      assertEquals(1, columns.size());
+      Column column = columns.get(0);
+      assertEquals("col0", column.getName());
+      assertEquals("value2", column.getValue());
+    }
+  }
+
+  private void addRow(Iface client, String tableName, String rowId, String recordId, String value)
+      throws BlurException, TException {
+    List<RecordMutation> recordMutations = new ArrayList<RecordMutation>();
+    List<Column> columns = new ArrayList<Column>();
+    columns.add(new Column("col0", value));
+    Record record = new Record(recordId, "fam0", columns);
+    recordMutations.add(new RecordMutation(RecordMutationType.REPLACE_ENTIRE_RECORD, record));
+    RowMutation rowMutation = new RowMutation(tableName, rowId, RowMutationType.REPLACE_ROW, recordMutations);
+    client.mutate(rowMutation);
+  }
+
+  private void waitUntilAllImportsAreCompleted(Iface client, String tableName) throws BlurException, TException,
+      InterruptedException {
+    while (true) {
+      Thread.sleep(1000);
+      TableStats tableStats = client.tableStats(tableName);
+      if (tableStats.getSegmentImportInProgressCount() == 0 && tableStats.getSegmentImportPendingCount() == 0) {
+        return;
+      }
+    }
+  }
+
+  private void generateData(String mrIncWorkingPathStr, String rowId, String recordId, String value) throws IOException {
+    Path path = new Path(new Path(mrIncWorkingPathStr), "new");
+    Writer writer = new SequenceFile.Writer(miniCluster.getFileSystem(), conf, new Path(path, UUID.randomUUID()
+        .toString()), Text.class, BlurRecord.class);
+    BlurRecord blurRecord = new BlurRecord();
+    blurRecord.setRowId(rowId);
+    blurRecord.setRecordId(recordId);
+    blurRecord.setFamily("fam0");
+    blurRecord.addColumn("col0", value);
+    writer.append(new Text(rowId), blurRecord);
+    writer.close();
+  }
+
+  private void generateData(String mrIncWorkingPathStr) throws IOException {
+    generateData(mrIncWorkingPathStr, "row1", "record-" + System.currentTimeMillis(), "val0");
+  }
+
+  private void creatTable(String tableName, Path tables, boolean fastDisable) throws BlurException, TException {
+    creatTable(tableName, tables, fastDisable, null);
+  }
+
+  private void creatTable(String tableName, Path tables, boolean fastDisable, String workingPath) throws BlurException,
+      TException {
+    Path tablePath = new Path(tables, tableName);
+    Iface client = getClient();
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setTableUri(tablePath.toString());
+    tableDescriptor.setName(tableName);
+    tableDescriptor.setShardCount(2);
+    tableDescriptor.putToTableProperties(BlurConstants.BLUR_TABLE_DISABLE_FAST_DIR, Boolean.toString(fastDisable));
+    if (workingPath != null) {
+      tableDescriptor.putToTableProperties(BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH, workingPath);
+    }
+    client.createTable(tableDescriptor);
+
+    ColumnDefinition colDef = new ColumnDefinition();
+    colDef.setFamily("fam0");
+    colDef.setColumnName("col0");
+    colDef.setFieldType("string");
+    client.addColumnDefinition(tableName, colDef);
+  }
+
+  private Iface getClient() {
+    return BlurClient.getClientFromZooKeeperConnectionStr(miniCluster.getZkConnectionString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d8756092/blur-query/src/main/java/org/apache/blur/lucene/search/IndexSearcherCloseableBase.java
----------------------------------------------------------------------
diff --git a/blur-query/src/main/java/org/apache/blur/lucene/search/IndexSearcherCloseableBase.java b/blur-query/src/main/java/org/apache/blur/lucene/search/IndexSearcherCloseableBase.java
index 829fef5..4d473c4 100644
--- a/blur-query/src/main/java/org/apache/blur/lucene/search/IndexSearcherCloseableBase.java
+++ b/blur-query/src/main/java/org/apache/blur/lucene/search/IndexSearcherCloseableBase.java
@@ -78,6 +78,8 @@ public abstract class IndexSearcherCloseableBase extends IndexSearcher implement
             Throwable cause = e.getCause();
             if (cause instanceof IOException) {
               throw (IOException) cause;
+            } else if (cause instanceof RuntimeException) {
+              throw (RuntimeException) cause;
             } else {
               throw new RuntimeException(cause);
             }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d8756092/blur-shell/pom.xml
----------------------------------------------------------------------
diff --git a/blur-shell/pom.xml b/blur-shell/pom.xml
index abc720c..af351ee 100644
--- a/blur-shell/pom.xml
+++ b/blur-shell/pom.xml
@@ -55,12 +55,24 @@
 		<plugins>
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-source-plugin</artifactId>
 				<executions>
 					<execution>
 						<id>attach-sources</id>
 						<goals>
 							<goal>jar</goal>
+							<goal>test-jar</goal>
 						</goals>
 					</execution>
 				</executions>

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d8756092/blur-shell/src/main/java/org/apache/blur/shell/RemoveShardServerCommand.java
----------------------------------------------------------------------
diff --git a/blur-shell/src/main/java/org/apache/blur/shell/RemoveShardServerCommand.java b/blur-shell/src/main/java/org/apache/blur/shell/RemoveShardServerCommand.java
index e1c3941..9c022e3 100644
--- a/blur-shell/src/main/java/org/apache/blur/shell/RemoveShardServerCommand.java
+++ b/blur-shell/src/main/java/org/apache/blur/shell/RemoveShardServerCommand.java
@@ -28,6 +28,7 @@ import org.apache.blur.thirdparty.thrift_0_9_0.TException;
 import org.apache.blur.thrift.generated.Blur;
 import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.zookeeper.ZooKeeperClient;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -46,7 +47,7 @@ public class RemoveShardServerCommand extends Command {
         BlurConfiguration configuration = new BlurConfiguration();
         String connectString = configuration.get(BlurConstants.BLUR_ZOOKEEPER_CONNECTION);
         int sessionTimeout = configuration.getInt(BlurConstants.BLUR_ZOOKEEPER_TIMEOUT, 30000);
-        zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
+        zooKeeper = new ZooKeeperClient(connectString, sessionTimeout, new Watcher() {
           @Override
           public void process(WatchedEvent event) {
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d8756092/blur-status/pom.xml
----------------------------------------------------------------------
diff --git a/blur-status/pom.xml b/blur-status/pom.xml
index 5c3fb38..021e120 100644
--- a/blur-status/pom.xml
+++ b/blur-status/pom.xml
@@ -73,12 +73,24 @@
 		<plugins>
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-source-plugin</artifactId>
 				<executions>
 					<execution>
 						<id>attach-sources</id>
 						<goals>
 							<goal>jar</goal>
+							<goal>test-jar</goal>
 						</goals>
 					</execution>
 				</executions>

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d8756092/blur-util/pom.xml
----------------------------------------------------------------------
diff --git a/blur-util/pom.xml b/blur-util/pom.xml
index 462f5f8..a238b7d 100644
--- a/blur-util/pom.xml
+++ b/blur-util/pom.xml
@@ -98,24 +98,20 @@
 	</repositories>
 
 	<build>
-		<pluginManagement>
-			<plugins>
-				<plugin>
-					<groupId>org.apache.maven.plugins</groupId>
-					<artifactId>maven-jar-plugin</artifactId>
-					<executions>
-						<execution>
-							<goals>
-								<goal>test-jar</goal>
-							</goals>
-						</execution>
-					</executions>
-				</plugin>
-			</plugins>
-		</pluginManagement>
 		<plugins>
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-source-plugin</artifactId>
 				<executions>
 					<execution>

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d8756092/blur-util/src/main/java/org/apache/blur/concurrent/Executors.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/concurrent/Executors.java b/blur-util/src/main/java/org/apache/blur/concurrent/Executors.java
index 05a1153..5e47aa8 100644
--- a/blur-util/src/main/java/org/apache/blur/concurrent/Executors.java
+++ b/blur-util/src/main/java/org/apache/blur/concurrent/Executors.java
@@ -43,7 +43,7 @@ public class Executors {
 
   public static ExecutorService newThreadPool(BlockingQueue<Runnable> workQueue, String prefix, int threadCount,
       boolean watch) {
-    BlurThreadPoolExecutor executorService = new BlurThreadPoolExecutor(threadCount, threadCount, 60L,
+    BlurThreadPoolExecutor executorService = new BlurThreadPoolExecutor(threadCount, threadCount, 10L,
         TimeUnit.SECONDS, workQueue, new BlurThreadFactory(prefix));
     executorService.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
     executorService.add(new UserThreadBoundaryProcessor());


Mime
View raw message