incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [4/4] git commit: Massive changes to the map reduce code, working on BLUR-83.
Date Tue, 14 May 2013 16:27:11 GMT
Massive changes to the map reduce code, working on BLUR-83.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/e3699254
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/e3699254
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/e3699254

Branch: refs/heads/0.1.5
Commit: e3699254be257f138ee58f86ba51471469625b62
Parents: ddb9ad7
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue May 14 12:23:24 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue May 14 12:23:24 2013 -0400

----------------------------------------------------------------------
 .../indexserver/DefaultBlurIndexWarmup.java        |    2 +-
 .../apache/blur/manager/writer/BlurNRTIndex.java   |   35 +++-
 .../apache/blur/manager/writer/IndexImporter.java  |  143 +++++++++++++++
 .../apache/blur/thrift/ThriftBlurShardServer.java  |    5 +
 .../java/org/apache/blur/thrift/ThriftServer.java  |   12 ++
 .../src/test/java/org/apache/blur/MiniCluster.java |    8 +-
 src/blur-mapred/pom.xml                            |   14 ++-
 .../blur/mapred/AbstractOutputCommitter.java       |   83 +++++++++
 .../apache/blur/mapred/BlurOutputCommitter.java    |   29 +++
 .../java/org/apache/blur/mapreduce/BlurColumn.java |   32 ++++
 .../java/org/apache/blur/mapreduce/BlurMapper.java |   11 +-
 .../java/org/apache/blur/mapreduce/BlurMutate.java |   87 ++++++++-
 .../java/org/apache/blur/mapreduce/BlurRecord.java |   44 +++++-
 .../org/apache/blur/mapreduce/BlurReducer.java     |    8 +-
 .../java/org/apache/blur/mapreduce/BlurTask.java   |   13 +-
 .../mapreduce/csv/BlurExampleIndexerRebuild.java   |   79 ++++++++
 .../mapreduce/csv/BlurExampleIndexerUpdate.java    |   62 +++++++
 .../apache/blur/mapreduce/csv/CsvBlurMapper.java   |  124 +++++++++++++
 .../example/BlurExampleIndexerRebuild.java         |   79 --------
 .../example/BlurExampleIndexerUpdate.java          |   62 -------
 .../blur/mapreduce/example/BlurExampleMapper.java  |   51 -----
 .../blur/mapreduce/lib/BlurOutputCommitter.java    |   57 ++++--
 .../blur/mapreduce/lib/BlurOutputFormat.java       |   88 ++++++++-
 .../blur/mapreduce/lib/BlurRecordWriter.java       |   88 +++++-----
 .../blur/mapreduce/lib/DefaultBlurReducer.java     |   18 ++
 .../org/apache/blur/mapreduce/BlurReducerTest.java |   57 ++++++
 .../blur/mapreduce/lib/BlurOutputFormatTest.java   |  101 ++++++++++
 .../apache/blur/lucene/LuceneVersionConstant.java  |    2 +-
 .../refcounter/DirectoryReferenceCounter.java      |    8 +-
 .../blur/store/blockcache/BlockDirectory.java      |    8 +-
 .../apache/blur/store/hdfs/DirectoryDecorator.java |    9 +
 .../org/apache/blur/store/hdfs/HdfsDirectory.java  |   18 +-
 src/pom.xml                                        |    1 +
 33 files changed, 1129 insertions(+), 309 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e3699254/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
index b7a626c..5f18dc0 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
@@ -36,7 +36,7 @@ public class DefaultBlurIndexWarmup extends BlurIndexWarmup {
   @Override
   public void warmBlurIndex(final TableDescriptor table, final String shard, IndexReader reader,
       AtomicBoolean isClosed, ReleaseReader releaseReader) throws IOException {
-    LOG.info("Runngin warmup for reader [{0}]", reader);
+    LOG.info("Running warmup for reader [{0}]", reader);
     try {
       FieldBasedWarmer warmer = new FieldBasedWarmer(table);
       for (IndexReaderContext context : reader.getContext().leaves()) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e3699254/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
index c6fabd5..c10c713 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
@@ -24,6 +24,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.blur.index.IndexWriter;
 import org.apache.blur.log.Log;
@@ -63,8 +65,11 @@ public class BlurNRTIndex extends BlurIndex {
   private final ShardContext _shardContext;
   private final TransactionRecorder _recorder;
   private final TrackingIndexWriter _trackingWriter;
-
+  // This lock is used during a import of data from the file system. For example
+  // after a mapreduce program.
+  private final ReadWriteLock _lock = new ReentrantReadWriteLock();
   private long _lastRefresh = 0;
+  private IndexImporter _indexImporter;
 
   public BlurNRTIndex(ShardContext shardContext, SharedMergeScheduler mergeScheduler, IndexInputCloser closer,
       Directory directory, DirectoryReferenceFileGC gc, final ExecutorService searchExecutor) throws IOException {
@@ -96,6 +101,7 @@ public class BlurNRTIndex extends BlurIndex {
     };
 
     _trackingWriter = new TrackingIndexWriter(_writer);
+    _indexImporter = new IndexImporter(_trackingWriter, _lock, _shardContext, TimeUnit.SECONDS, 10);
     _nrtManagerRef.set(new NRTManager(_trackingWriter, _searcherFactory, APPLY_ALL_DELETES));
     // start commiter
 
@@ -114,19 +120,29 @@ public class BlurNRTIndex extends BlurIndex {
 
   @Override
   public void replaceRow(boolean waitToBeVisible, boolean wal, Row row) throws IOException {
-    List<Record> records = row.records;
-    if (records == null || records.isEmpty()) {
-      deleteRow(waitToBeVisible, wal, row.id);
-      return;
+    _lock.readLock().lock();
+    try {
+      List<Record> records = row.records;
+      if (records == null || records.isEmpty()) {
+        deleteRow(waitToBeVisible, wal, row.id);
+        return;
+      }
+      long generation = _recorder.replaceRow(wal, row, _trackingWriter);
+      waitToBeVisible(waitToBeVisible, generation);
+    } finally {
+      _lock.readLock().unlock();
     }
-    long generation = _recorder.replaceRow(wal, row, _trackingWriter);
-    waitToBeVisible(waitToBeVisible, generation);
   }
 
   @Override
   public void deleteRow(boolean waitToBeVisible, boolean wal, String rowId) throws IOException {
-    long generation = _recorder.deleteRow(wal, rowId, _trackingWriter);
-    waitToBeVisible(waitToBeVisible, generation);
+    _lock.readLock().lock();
+    try {
+      long generation = _recorder.deleteRow(wal, rowId, _trackingWriter);
+      waitToBeVisible(waitToBeVisible, generation);
+    } finally {
+      _lock.readLock().unlock();
+    }
   }
 
   /**
@@ -147,6 +163,7 @@ public class BlurNRTIndex extends BlurIndex {
     // @TODO make sure that locks are cleaned up.
     if (!_isClosed.get()) {
       _isClosed.set(true);
+      _indexImporter.close();
       _committer.interrupt();
       _refresher.close();
       try {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e3699254/src/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
new file mode 100644
index 0000000..5c09ab3
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
@@ -0,0 +1,143 @@
package org.apache.blur.manager.writer;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;

import org.apache.blur.log.Log;
import org.apache.blur.log.LogFactory;
import org.apache.blur.server.ShardContext;
import org.apache.blur.store.hdfs.HdfsDirectory;
import org.apache.blur.utils.BlurConstants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.CompositeReaderContext;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.Fields;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.NRTManager.TrackingIndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;

/**
 * Periodically scans the shard's HDFS directory for sub-directories named
 * "*.commit" (produced by bulk map reduce indexing jobs), deletes any existing
 * rows that are about to be replaced, and merges each incoming index into the
 * live writer. The supplied write lock is held for the duration of an import
 * so that normal mutations are blocked while data is swapped in.
 */
public class IndexImporter extends TimerTask implements Closeable {
  private final static Log LOG = LogFactory.getLog(IndexImporter.class);

  private final TrackingIndexWriter _trackingWriter;
  private final ReadWriteLock _lock;
  private final ShardContext _shardContext;
  private final Timer _timer;

  /**
   * @param trackingWriter the writer the imported indexes are added to.
   * @param lock shared with the owning index; write-locked during imports.
   * @param shardContext identifies the table/shard and its HDFS location.
   * @param refreshUnit unit for the scan period.
   * @param refreshAmount scan period, in refreshUnit units.
   */
  public IndexImporter(TrackingIndexWriter trackingWriter, ReadWriteLock lock, ShardContext shardContext,
      TimeUnit refreshUnit, long refreshAmount) {
    _trackingWriter = trackingWriter;
    _lock = lock;
    _shardContext = shardContext;
    // Daemon timer so a forgotten importer never keeps the JVM alive.
    _timer = new Timer("IndexImporter [" + shardContext.getShard() + "/" + shardContext.getTableContext().getTable()
        + "]", true);
    long period = refreshUnit.toMillis(refreshAmount);
    _timer.schedule(this, period, period);
  }

  @Override
  public void close() throws IOException {
    _timer.cancel();
    _timer.purge();
  }

  @Override
  public void run() {
    Path path = _shardContext.getHdfsDirPath();
    Configuration configuration = _shardContext.getTableContext().getConfiguration();
    try {
      FileSystem fileSystem = path.getFileSystem(configuration);
      SortedSet<FileStatus> listStatus = sort(fileSystem.listStatus(path));
      List<HdfsDirectory> indexesToImport = new ArrayList<HdfsDirectory>();
      for (FileStatus fileStatus : listStatus) {
        Path file = fileStatus.getPath();
        if (fileStatus.isDir() && file.getName().endsWith(".commit")) {
          HdfsDirectory hdfsDirectory = new HdfsDirectory(configuration, file);
          if (!DirectoryReader.indexExists(hdfsDirectory)) {
            LOG.error("Directory found at [{0}] is not a valid index.", file);
            // Not imported, so release the directory instead of leaking it.
            hdfsDirectory.close();
          } else {
            indexesToImport.add(hdfsDirectory);
          }
        }
      }
      if (indexesToImport.isEmpty()) {
        return;
      }
      String table = _shardContext.getTableContext().getTable();
      String shard = _shardContext.getShard();
      for (Directory directory : indexesToImport) {
        LOG.info("About to import [{0}] into [{1}/{2}]", directory, shard, table);
      }
      LOG.info("Obtaining lock on [{0}/{1}]", shard, table);
      _lock.writeLock().lock();
      try {
        IndexWriter indexWriter = _trackingWriter.getIndexWriter();
        for (HdfsDirectory directory : indexesToImport) {
          LOG.info("Starting import [{0}], committing on [{1}/{2}]", directory, shard, table);
          indexWriter.commit();
          // Remove any existing rows that the incoming index replaces before
          // adding it, so a row never appears twice.
          applyDeletes(directory, indexWriter);
          LOG.info("Add index [{0}] [{1}/{2}]", directory, shard, table);
          indexWriter.addIndexes(directory);
          LOG.info("Finishing import [{0}], committing on [{1}/{2}]", directory, shard, table);
          indexWriter.commit();
          Path dirPath = directory.getPath();
          LOG.info("Cleaning up old directory [{0}] for [{1}/{2}]", dirPath, shard, table);
          fileSystem.delete(dirPath, true);
          LOG.info("Import complete on [{0}/{1}]", shard, table);
        }
      } finally {
        _lock.writeLock().unlock();
      }
    } catch (IOException e) {
      LOG.error("Unknown error while trying to refresh imports.", e);
    } catch (Throwable t) {
      // A TimerTask that lets an exception escape cancels the whole Timer,
      // which would silently stop all future imports. Log and keep running.
      LOG.error("Unexpected error while trying to refresh imports.", t);
    }
  }

  /** Returns the statuses in their natural sorted order for deterministic import order. */
  private SortedSet<FileStatus> sort(FileStatus[] listStatus) {
    SortedSet<FileStatus> result = new TreeSet<FileStatus>();
    for (FileStatus fileStatus : listStatus) {
      result.add(fileStatus);
    }
    return result;
  }

  /**
   * Deletes from the live index every row id present in the incoming
   * directory, so the subsequent addIndexes() acts as a replace.
   */
  private void applyDeletes(Directory directory, IndexWriter indexWriter) throws IOException {
    DirectoryReader reader = DirectoryReader.open(directory);
    try {
      LOG.info("Applying deletes in reader [{0}]", reader);
      CompositeReaderContext compositeReaderContext = reader.getContext();
      List<AtomicReaderContext> leaves = compositeReaderContext.leaves();
      for (AtomicReaderContext context : leaves) {
        AtomicReader atomicReader = context.reader();
        Fields fields = atomicReader.fields();
        Terms terms = fields.terms(BlurConstants.ROW_ID);
        if (terms == null) {
          // Segment contains no row id field; nothing to delete.
          continue;
        }
        TermsEnum termsEnum = terms.iterator(null);
        BytesRef ref = null;
        while ((ref = termsEnum.next()) != null) {
          // Deep copy because the enum may reuse the BytesRef on next().
          Term term = new Term(BlurConstants.ROW_ID, BytesRef.deepCopyOf(ref));
          indexWriter.deleteDocuments(term);
        }
      }
    } finally {
      reader.close();
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e3699254/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index 425e04a..538b02f 100644
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -59,6 +59,7 @@ import org.apache.blur.manager.indexserver.DistributedIndexServer;
 import org.apache.blur.manager.writer.BlurIndexRefresher;
 import org.apache.blur.metrics.JSONReporter;
 import org.apache.blur.metrics.JSONReporterServlet;
+import org.apache.blur.server.ShardServerEventHandler;
 import org.apache.blur.store.blockcache.BlockCache;
 import org.apache.blur.store.blockcache.BlockDirectory;
 import org.apache.blur.store.blockcache.BlockDirectoryCache;
@@ -70,6 +71,7 @@ import org.apache.blur.utils.BlurUtil;
 import org.apache.blur.zookeeper.ZkUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.thrift.protocol.TJSONProtocol;
+import org.apache.thrift.server.TServerEventHandler;
 import org.apache.thrift.server.TServlet;
 import org.apache.zookeeper.ZooKeeper;
 import org.mortbay.jetty.servlet.ServletHolder;
@@ -213,6 +215,8 @@ public class ThriftBlurShardServer extends ThriftServer {
 
     int threadCount = configuration.getInt(BLUR_SHARD_SERVER_THRIFT_THREAD_COUNT, 32);
 
+    ShardServerEventHandler eventHandler = new ShardServerEventHandler();
+    
     final ThriftBlurShardServer server = new ThriftBlurShardServer();
     server.setNodeName(nodeName);
     server.setBindAddress(bindAddress);
@@ -220,6 +224,7 @@ public class ThriftBlurShardServer extends ThriftServer {
     server.setThreadCount(threadCount);
     server.setIface(iface);
     server.setConfiguration(configuration);
+    server.setEventHandler(eventHandler);
 
     // This will shutdown the server when the correct path is set in zk
     BlurShutdown shutdown = new BlurShutdown() {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e3699254/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
index 49e6b9a..e2b0fe9 100644
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
@@ -35,6 +35,7 @@ import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.thrift.server.TThreadedSelectorServer;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TServerEventHandler;
 import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TNonblockingServerSocket;
 import org.apache.thrift.transport.TTransportException;
@@ -55,6 +56,7 @@ public class ThriftServer {
   private ExecutorService _executorService;
   private ExecutorService _queryExexutorService;
   private ExecutorService _mutateExecutorService;
+  private TServerEventHandler _eventHandler;
 
   public static void printUlimits() throws IOException {
     ProcessBuilder processBuilder = new ProcessBuilder("ulimit", "-a");
@@ -114,6 +116,7 @@ public class ThriftServer {
     args.transportFactory(new TFramedTransport.Factory());
     args.protocolFactory(new TBinaryProtocol.Factory(true, true));
     _server = new TThreadedSelectorServer(args);
+    _server.setServerEventHandler(_eventHandler);
     LOG.info("Starting server [{0}]", _nodeName);
     _server.serve();
   }
@@ -192,4 +195,13 @@ public class ThriftServer {
   public void setShutdown(BlurShutdown shutdown) {
     this._shutdown = shutdown;
   }
+
+  public TServerEventHandler getEventHandler() {
+    return _eventHandler;
+  }
+
+  public void setEventHandler(TServerEventHandler eventHandler) {
+    _eventHandler = eventHandler;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e3699254/src/blur-core/src/test/java/org/apache/blur/MiniCluster.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/MiniCluster.java b/src/blur-core/src/test/java/org/apache/blur/MiniCluster.java
index 94ed435..c2ffc9d 100644
--- a/src/blur-core/src/test/java/org/apache/blur/MiniCluster.java
+++ b/src/blur-core/src/test/java/org/apache/blur/MiniCluster.java
@@ -88,8 +88,8 @@ public abstract class MiniCluster {
 
   public static void main(String[] args) throws IOException, InterruptedException, KeeperException, BlurException,
       TException {
-    startDfs("./tmp");
-    startZooKeeper("./tmp");
+    startDfs("./tmp/hdfs");
+    startZooKeeper("./tmp/zk");
     startControllers(1);
     startShards(1);
 
@@ -129,8 +129,8 @@ public abstract class MiniCluster {
   }
 
   public static void startBlurCluster(String path, int controllerCount, int shardCount) {
-    startDfs(path);
-    startZooKeeper(path);
+    startDfs(path + "/hdfs");
+    startZooKeeper(path + "/zk");
     setupBuffers();
     startControllers(controllerCount);
     startShards(shardCount);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e3699254/src/blur-mapred/pom.xml
----------------------------------------------------------------------
diff --git a/src/blur-mapred/pom.xml b/src/blur-mapred/pom.xml
index 29511b8..5e09359 100644
--- a/src/blur-mapred/pom.xml
+++ b/src/blur-mapred/pom.xml
@@ -16,7 +16,7 @@
 		<groupId>org.apache.blur</groupId>
 		<artifactId>blur</artifactId>
 		<version>0.1.5</version>
-        <relativePath>../pom.xml</relativePath>
+		<relativePath>../pom.xml</relativePath>
 	</parent>
 	<groupId>org.apache.blur</groupId>
 	<artifactId>blur-mapred</artifactId>
@@ -69,6 +69,18 @@
 				</exclusion>
 			</exclusions>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.mrunit</groupId>
+			<artifactId>mrunit</artifactId>
+			<version>${mrunit.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-test</artifactId>
+			<version>${hadoop.version}</version>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 
 	<repositories>

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e3699254/src/blur-mapred/src/main/java/org/apache/blur/mapred/AbstractOutputCommitter.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapred/AbstractOutputCommitter.java b/src/blur-mapred/src/main/java/org/apache/blur/mapred/AbstractOutputCommitter.java
new file mode 100644
index 0000000..65bc9ea
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapred/AbstractOutputCommitter.java
@@ -0,0 +1,83 @@
package org.apache.blur.mapred;

import java.io.IOException;

import org.apache.blur.log.Log;
import org.apache.blur.log.LogFactory;
import org.apache.blur.mapreduce.lib.BlurOutputFormat;
import org.apache.blur.utils.BlurConstants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.TaskAttemptID;

/**
 * Job level output committer for the old (mapred) API. On successful job
 * completion it walks every shard directory under the table output path and
 * renames each finished attempt directory from "&lt;attempt&gt;.task_complete"
 * to "&lt;attempt&gt;.commit" so the shard servers can import it.
 */
public abstract class AbstractOutputCommitter extends OutputCommitter {

  private final static Log LOG = LogFactory.getLog(AbstractOutputCommitter.class);

  @Override
  public void setupJob(JobContext jobContext) throws IOException {
    // No job level setup is required.
  }

  @Override
  public void commitJob(JobContext jobContext) throws IOException {
    // look through all the shards for attempts that need to be cleaned up.
    // also find all the attempts that are finished
    // then rename all the attempts jobs to commits
    LOG.info("Committing Job [{0}]", jobContext.getJobID());
    Configuration configuration = jobContext.getConfiguration();
    Path tableOutput = BlurOutputFormat.getOutputPath(configuration);
    FileSystem fileSystem = tableOutput.getFileSystem(configuration);
    for (FileStatus fileStatus : fileSystem.listStatus(tableOutput)) {
      if (isShard(fileStatus)) {
        commitJob(jobContext, fileStatus.getPath());
      }
    }
  }

  /**
   * Promotes every completed task attempt inside the given shard directory
   * that belongs to this job from ".task_complete" to ".commit".
   */
  private void commitJob(JobContext jobContext, Path shardPath) throws IOException {
    FileSystem fileSystem = shardPath.getFileSystem(jobContext.getConfiguration());
    FileStatus[] listStatus = fileSystem.listStatus(shardPath);
    for (FileStatus fileStatus : listStatus) {
      Path path = fileStatus.getPath();
      String name = path.getName();
      if (fileStatus.isDir() && name.endsWith(".task_complete")) {
        String taskAttemptName = getTaskAttemptName(name);
        TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptName);
        // Only promote attempts from this job; leftovers from other jobs are
        // left untouched. Guard against an unparsable attempt name.
        if (taskAttemptID != null && taskAttemptID.getJobID().equals(jobContext.getJobID())) {
          fileSystem.rename(path, new Path(shardPath, taskAttemptName + ".commit"));
          LOG.info("Committing [{0}] in path [{1}]", taskAttemptID, path);
        }
      }
    }
  }

  /** Strips the trailing ".task_complete" (everything after the last dot). */
  private String getTaskAttemptName(String name) {
    int lastIndexOf = name.lastIndexOf('.');
    return name.substring(0, lastIndexOf);
  }

  private boolean isShard(FileStatus fileStatus) {
    return isShard(fileStatus.getPath());
  }

  /** A shard directory is identified purely by its name prefix. */
  private boolean isShard(Path path) {
    return path.getName().startsWith(BlurConstants.SHARD_PREFIX);
  }

  @Override
  public void abortJob(JobContext jobContext, int status) throws IOException {
    // Was a stray System.out.println; use the logger with some context.
    LOG.info("Aborting Job [{0}] with status [{1}]", jobContext.getJobID(), status);
  }

  @Override
  public void cleanupJob(JobContext context) throws IOException {
    // Was a stray System.out.println; use the logger with some context.
    LOG.info("Cleanup of Job [{0}]", context.getJobID());
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e3699254/src/blur-mapred/src/main/java/org/apache/blur/mapred/BlurOutputCommitter.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapred/BlurOutputCommitter.java b/src/blur-mapred/src/main/java/org/apache/blur/mapred/BlurOutputCommitter.java
new file mode 100644
index 0000000..1c3d2ca
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapred/BlurOutputCommitter.java
@@ -0,0 +1,29 @@
package org.apache.blur.mapred;

import java.io.IOException;

import org.apache.hadoop.mapred.TaskAttemptContext;

/**
 * Concrete committer whose per-task hooks are all no-ops. The real commit
 * work happens at the job level in {@link AbstractOutputCommitter};
 * individual task attempts have nothing to set up, commit, or abort here.
 */
public class BlurOutputCommitter extends AbstractOutputCommitter {

  @Override
  public void setupTask(TaskAttemptContext taskContext) throws IOException {
    // Nothing to prepare for a task attempt.
  }

  @Override
  public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
    // Task attempts never have output of their own to commit.
    return false;
  }

  @Override
  public void commitTask(TaskAttemptContext taskContext) throws IOException {
    // Intentionally empty; needsTaskCommit() always returns false.
  }

  @Override
  public void abortTask(TaskAttemptContext taskContext) throws IOException {
    // No per-task state exists, so there is nothing to clean up.
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e3699254/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurColumn.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurColumn.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurColumn.java
index 2ddc11f..e394ac0 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurColumn.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurColumn.java
@@ -74,4 +74,36 @@ public class BlurColumn implements Writable {
   public String toString() {
     return "{name=" + name + ", value=" + value + "}";
   }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((name == null) ? 0 : name.hashCode());
+    result = prime * result + ((value == null) ? 0 : value.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    BlurColumn other = (BlurColumn) obj;
+    if (name == null) {
+      if (other.name != null)
+        return false;
+    } else if (!name.equals(other.name))
+      return false;
+    if (value == null) {
+      if (other.value != null)
+        return false;
+    } else if (!value.equals(other.value))
+      return false;
+    return true;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e3699254/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurMapper.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurMapper.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurMapper.java
index 4e301eb..9a3575e 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurMapper.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurMapper.java
@@ -18,14 +18,14 @@ package org.apache.blur.mapreduce;
  */
 import java.io.IOException;
 
-import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Mapper;
 
-public abstract class BlurMapper<KEY, VALUE> extends Mapper<KEY, VALUE, BytesWritable, BlurMutate> {
+public abstract class BlurMapper<KEY, VALUE> extends Mapper<KEY, VALUE, Text, BlurMutate> {
 
   protected BlurMutate _mutate;
-  protected BytesWritable _key;
+  protected Text _key;
   protected BlurTask _blurTask;
   protected Counter _recordCounter;
   protected Counter _fieldCounter;
@@ -33,7 +33,7 @@ public abstract class BlurMapper<KEY, VALUE> extends Mapper<KEY, VALUE, BytesWri
   @Override
   public void run(Context context) throws IOException, InterruptedException {
     setup(context);
-    long maxRecordCount = _blurTask.getMaxRecordCount();
+    long maxRecordCount = _blurTask == null ? Long.MAX_VALUE : _blurTask.getMaxRecordCount();
     if (maxRecordCount == -1) {
       maxRecordCount = Long.MAX_VALUE;
     }
@@ -47,7 +47,8 @@ public abstract class BlurMapper<KEY, VALUE> extends Mapper<KEY, VALUE, BytesWri
   protected void setup(Context context) throws IOException, InterruptedException {
     _blurTask = BlurTask.read(context.getConfiguration());
     _mutate = new BlurMutate();
-    _key = new BytesWritable();
+    _mutate.setRecord(new BlurRecord());
+    _key = new Text();
     _recordCounter = context.getCounter(BlurTask.getCounterGroupName(), BlurTask.getRecordCounterName());
     _fieldCounter = context.getCounter(BlurTask.getCounterGroupName(), BlurTask.getFieldCounterName());
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e3699254/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurMutate.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurMutate.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurMutate.java
index 573f9c3..c7615ea 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurMutate.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurMutate.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.io.Writable;
 public class BlurMutate implements Writable {
 
   public enum MUTATE_TYPE {
-    ADD(0), UPDATE(1), DELETE(2);
+    /*ADD(0), UPDATE(1),*/ DELETE(2), REPLACE(3);
     private int _value;
 
     private MUTATE_TYPE(int value) {
@@ -38,21 +38,55 @@ public class BlurMutate implements Writable {
 
     public MUTATE_TYPE find(int value) {
       switch (value) {
-      case 0:
-        return ADD;
-      case 1:
-        return UPDATE;
+   // @TODO Updates through MR is going to be disabled
+//      case 0:
+//        return ADD;
+//      case 1:
+//        return UPDATE;
       case 2:
         return DELETE;
+      case 3:
+        return REPLACE;
       default:
         throw new RuntimeException("Value [" + value + "] not found.");
       }
     }
   }
 
-  private MUTATE_TYPE _mutateType = MUTATE_TYPE.UPDATE;
+  private MUTATE_TYPE _mutateType = MUTATE_TYPE.REPLACE;
   private BlurRecord _record = new BlurRecord();
 
+  public BlurMutate() {
+
+  }
+
+  public BlurMutate(MUTATE_TYPE type, BlurRecord record) {
+    _mutateType = type;
+    _record = record;
+  }
+
+  public BlurMutate(MUTATE_TYPE type, String rowId, String recordId) {
+    _mutateType = type;
+    _record.setRowId(rowId);
+    _record.setRecordId(recordId);
+  }
+
+  public BlurMutate(MUTATE_TYPE type, String rowId, String recordId, String family) {
+    _mutateType = type;
+    _record.setRowId(rowId);
+    _record.setRecordId(recordId);
+    _record.setFamily(family);
+  }
+
+  public BlurMutate addColumn(BlurColumn column) {
+    _record.addColumn(column);
+    return this;
+  }
+
+  public BlurMutate addColumn(String name, String value) {
+    return addColumn(new BlurColumn(name, value));
+  }
+
   public BlurRecord getRecord() {
     return _record;
   }
@@ -77,8 +111,47 @@ public class BlurMutate implements Writable {
     return _mutateType;
   }
 
-  public void setMutateType(MUTATE_TYPE mutateType) {
+  public BlurMutate setMutateType(MUTATE_TYPE mutateType) {
     _mutateType = mutateType;
+    return this;
+  }
+
+  @Override
+  public String toString() {
+    return "BlurMutate [mutateType=" + _mutateType + ", record=" + _record + "]";
+  }
+
+  public BlurMutate setFamily(String family) {
+    _record.setFamily(family);
+    return this;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((_mutateType == null) ? 0 : _mutateType.hashCode());
+    result = prime * result + ((_record == null) ? 0 : _record.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    BlurMutate other = (BlurMutate) obj;
+    if (_mutateType != other._mutateType)
+      return false;
+    if (_record == null) {
+      if (other._record != null)
+        return false;
+    } else if (!_record.equals(other._record))
+      return false;
+    return true;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e3699254/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurRecord.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurRecord.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurRecord.java
index b45bcb3..9d2868b 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurRecord.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurRecord.java
@@ -25,7 +25,6 @@ import java.util.List;
 import org.apache.blur.utils.ReaderBlurRecord;
 import org.apache.hadoop.io.Writable;
 
-
 public class BlurRecord implements Writable, ReaderBlurRecord {
 
   private String _rowId;
@@ -133,4 +132,47 @@ public class BlurRecord implements Writable, ReaderBlurRecord {
     return "{rowId=" + _rowId + ", recordId=" + _recordId + ", family=" + _family + ", columns=" + _columns + "}";
   }
 
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((_columns == null) ? 0 : _columns.hashCode());
+    result = prime * result + ((_family == null) ? 0 : _family.hashCode());
+    result = prime * result + ((_recordId == null) ? 0 : _recordId.hashCode());
+    result = prime * result + ((_rowId == null) ? 0 : _rowId.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    BlurRecord other = (BlurRecord) obj;
+    if (_columns == null) {
+      if (other._columns != null)
+        return false;
+    } else if (!_columns.equals(other._columns))
+      return false;
+    if (_family == null) {
+      if (other._family != null)
+        return false;
+    } else if (!_family.equals(other._family))
+      return false;
+    if (_recordId == null) {
+      if (other._recordId != null)
+        return false;
+    } else if (!_recordId.equals(other._recordId))
+      return false;
+    if (_rowId == null) {
+      if (other._rowId != null)
+        return false;
+    } else if (!_rowId.equals(other._rowId))
+      return false;
+    return true;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e3699254/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurReducer.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurReducer.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurReducer.java
index cb6beee..d71d983 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurReducer.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurReducer.java
@@ -52,7 +52,7 @@ import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Reducer;
@@ -75,7 +75,7 @@ import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.NoLockFactory;
 import org.apache.lucene.util.IOUtils;
 
-public class BlurReducer extends Reducer<BytesWritable, BlurMutate, BytesWritable, BlurMutate> {
+public class BlurReducer extends Reducer<Text, BlurMutate, Text, BlurMutate> {
 
   static class LuceneFileComparator implements Comparator<String> {
 
@@ -149,14 +149,14 @@ public class BlurReducer extends Reducer<BytesWritable, BlurMutate, BytesWritabl
   }
 
   @Override
-  protected void reduce(BytesWritable key, Iterable<BlurMutate> values, Context context) throws IOException,
+  protected void reduce(Text key, Iterable<BlurMutate> values, Context context) throws IOException,
       InterruptedException {
     if (!index(key, values, context)) {
       _rowFailures.increment(1);
     }
   }
 
-  protected boolean index(BytesWritable key, Iterable<BlurMutate> values, Context context) throws IOException {
+  protected boolean index(Text key, Iterable<BlurMutate> values, Context context) throws IOException {
     int recordCount = 0;
     _newDocs.clear();
     _recordIdsToDelete.clear();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e3699254/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurTask.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurTask.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurTask.java
index 7e57070..3af51ec 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurTask.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurTask.java
@@ -48,7 +48,6 @@ import org.apache.thrift.transport.TIOStreamTransport;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 
-
 public class BlurTask implements Writable {
 
   public enum INDEXING_TYPE {
@@ -124,7 +123,8 @@ public class BlurTask implements Writable {
         return num;
       }
       if (shardCount != num) {
-        LOG.warn("Asked for " + num + " reducers, but existing table " + _tableDescriptor.name + " has " + shardCount + " shards. Using " + shardCount + " reducers");
+        LOG.warn("Asked for " + num + " reducers, but existing table " + _tableDescriptor.name + " has " + shardCount
+            + " shards. Using " + shardCount + " reducers");
       }
       return shardCount;
     } catch (IOException e) {
@@ -187,7 +187,8 @@ public class BlurTask implements Writable {
     try {
       List<String> children = _zooKeeper.getChildren(ZookeeperPathConstants.getLockPath(cluster, table), false);
       if (!children.isEmpty()) {
-        throw new RuntimeException("Table [" + table + "] in cluster [" + cluster + "] has write locks enabled, cannot perform update.");
+        throw new RuntimeException("Table [" + table + "] in cluster [" + cluster
+            + "] has write locks enabled, cannot perform update.");
       }
     } catch (KeeperException e) {
       throw new RuntimeException(e);
@@ -198,7 +199,11 @@ public class BlurTask implements Writable {
   }
 
   public static BlurTask read(Configuration configuration) throws IOException {
-    byte[] blurTaskBs = Base64.decodeBase64(configuration.get(BLUR_BLURTASK));
+    String base64String = configuration.get(BLUR_BLURTASK);
+    if (base64String == null) {
+      return null;
+    }
+    byte[] blurTaskBs = Base64.decodeBase64(base64String);
     BlurTask blurTask = new BlurTask();
     blurTask.readFields(new DataInputStream(new ByteArrayInputStream(blurTaskBs)));
     return blurTask;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e3699254/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/csv/BlurExampleIndexerRebuild.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/csv/BlurExampleIndexerRebuild.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/csv/BlurExampleIndexerRebuild.java
new file mode 100644
index 0000000..65ae8c8
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/csv/BlurExampleIndexerRebuild.java
@@ -0,0 +1,79 @@
+package org.apache.blur.mapreduce.csv;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.blur.mapreduce.BlurTask;
+import org.apache.blur.mapreduce.BlurTask.INDEXING_TYPE;
+import org.apache.blur.thrift.generated.AnalyzerDefinition;
+import org.apache.blur.thrift.generated.ColumnDefinition;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+
+
+public class BlurExampleIndexerRebuild {
+
+  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
+    Configuration configuration = new Configuration();
+    String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
+    if (otherArgs.length != 2) {
+      System.err.println("Usage: blurindexer <in> <out>");
+      System.exit(2);
+    }
+
+    AnalyzerDefinition ad = new AnalyzerDefinition();
+    ad.defaultDefinition = new ColumnDefinition(StandardAnalyzer.class.getName(), true, null);
+
+    TableDescriptor descriptor = new TableDescriptor();
+    descriptor.analyzerDefinition = ad;
+    descriptor.compressionBlockSize = 32768;
+    descriptor.compressionClass = DefaultCodec.class.getName();
+    descriptor.isEnabled = true;
+    descriptor.name = "test-table";
+    descriptor.shardCount = 1;
+    descriptor.cluster = "default";
+    descriptor.tableUri = "./blur-testing";
+
+    BlurTask blurTask = new BlurTask();
+    blurTask.setTableDescriptor(descriptor);
+    blurTask.setIndexingType(INDEXING_TYPE.REBUILD);
+    blurTask.setOptimize(false);
+    Job job = blurTask.configureJob(configuration);
+    job.setJarByClass(BlurExampleIndexerRebuild.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
+    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1], "job-" + System.currentTimeMillis()));
+    long s = System.currentTimeMillis();
+    boolean waitForCompletion = job.waitForCompletion(true);
+    long e = System.currentTimeMillis();
+    System.out.println("Completed in [" + (e - s) + " ms]");
+    System.exit(waitForCompletion ? 0 : 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e3699254/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/csv/BlurExampleIndexerUpdate.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/csv/BlurExampleIndexerUpdate.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/csv/BlurExampleIndexerUpdate.java
new file mode 100644
index 0000000..68bc076
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/csv/BlurExampleIndexerUpdate.java
@@ -0,0 +1,62 @@
+package org.apache.blur.mapreduce.csv;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.blur.manager.clusterstatus.ZookeeperClusterStatus;
+import org.apache.blur.mapreduce.BlurTask;
+import org.apache.blur.mapreduce.BlurTask.INDEXING_TYPE;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+
+public class BlurExampleIndexerUpdate {
+
+  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
+    Configuration configuration = new Configuration();
+    String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
+    if (otherArgs.length != 2) {
+      System.err.println("Usage: blurindexer <in> <out>");
+      System.exit(2);
+    }
+
+    ZookeeperClusterStatus status = new ZookeeperClusterStatus("localhost");
+    TableDescriptor descriptor = status.getTableDescriptor(false, "default", "test-table");
+
+    BlurTask blurTask = new BlurTask();
+    blurTask.setTableDescriptor(descriptor);
+    blurTask.setIndexingType(INDEXING_TYPE.UPDATE);
+    Job job = blurTask.configureJob(configuration);
+    job.setJarByClass(BlurExampleIndexerUpdate.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
+    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1], "job-" + System.currentTimeMillis()));
+    boolean waitForCompletion = job.waitForCompletion(true);
+    System.exit(waitForCompletion ? 0 : 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e3699254/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/csv/CsvBlurMapper.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/csv/CsvBlurMapper.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/csv/CsvBlurMapper.java
new file mode 100644
index 0000000..a75b4e6
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/csv/CsvBlurMapper.java
@@ -0,0 +1,124 @@
+package org.apache.blur.mapreduce.csv;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.blur.mapreduce.BlurMapper;
+import org.apache.blur.mapreduce.BlurMutate.MUTATE_TYPE;
+import org.apache.blur.mapreduce.BlurRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+
+import com.google.common.base.Splitter;
+
+public class CsvBlurMapper extends BlurMapper<LongWritable, Text> {
+
+  public static final String BLUR_CSV_FAMILY_COLUMN_PREFIX = "blur.csv.family.";
+  public static final String BLUR_CSV_FAMILIES = "blur.csv.families";
+
+  private Map<String, List<String>> columnNameMap;
+  private String separator = ",";
+  private Splitter splitter;
+
+  public static void addColumns(Configuration configuration, String family, String... columns) {
+    Collection<String> families = new TreeSet<String>(configuration.getStringCollection(BLUR_CSV_FAMILIES));
+    families.add(family);
+    configuration.setStrings(BLUR_CSV_FAMILIES, families.toArray(new String[] {}));
+    configuration.setStrings(BLUR_CSV_FAMILY_COLUMN_PREFIX + family, columns);
+  }
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    Configuration configuration = context.getConfiguration();
+    Collection<String> familyNames = configuration.getStringCollection(BLUR_CSV_FAMILIES);
+    columnNameMap = new HashMap<String, List<String>>();
+    for (String family : familyNames) {
+      String[] columnsNames = configuration.getStrings(BLUR_CSV_FAMILY_COLUMN_PREFIX + family);
+      columnNameMap.put(family, Arrays.asList(columnsNames));
+    }
+    splitter = Splitter.on(separator);
+  }
+
+  @Override
+  protected void map(LongWritable k, Text value, Context context) throws IOException, InterruptedException {
+    BlurRecord record = _mutate.getRecord();
+    record.clearColumns();
+    String str = value.toString();
+
+    Iterable<String> split = splitter.split(str);
+    List<String> list = toList(split);
+
+    if (list.size() < 3) {
+      throw new IOException("Record [" + str + "] too short.");
+    }
+
+    record.setRowId(list.get(0));
+    record.setRecordId(list.get(1));
+    String family = list.get(2);
+    record.setFamily(family);
+
+    List<String> columnNames = columnNameMap.get(family);
+    if (columnNames == null) {
+      throw new IOException("Family [" + family + "] is missing in the definition.");
+    }
+    if (list.size() - 3 != columnNames.size()) {
+      throw new IOException("Record [" + str + "] too short, does not match defined record [rowid,recordid,family"
+          + getColumnNames(columnNames) + "].");
+    }
+
+    for (int i = 0; i < columnNames.size(); i++) {
+      record.addColumn(columnNames.get(i), list.get(i + 3));
+      _fieldCounter.increment(1);
+    }
+    _key.set(record.getRowId());
+    _mutate.setMutateType(MUTATE_TYPE.REPLACE);
+    context.write(_key, _mutate);
+    _recordCounter.increment(1);
+    context.progress();
+  }
+
+  private String getColumnNames(List<String> columnNames) {
+    StringBuilder builder = new StringBuilder();
+    for (String c : columnNames) {
+      builder.append(',').append(c);
+    }
+    return builder.toString();
+  }
+
+  private List<String> toList(Iterable<String> split) {
+    List<String> lst = new ArrayList<String>();
+    for (String s : split) {
+      lst.add(s);
+    }
+    return lst;
+  }
+
+  public static void addColumns(Job job, String family, String columns) {
+    addColumns(job.getConfiguration(), family, columns);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e3699254/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleIndexerRebuild.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleIndexerRebuild.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleIndexerRebuild.java
deleted file mode 100644
index 29eba4e..0000000
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleIndexerRebuild.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package org.apache.blur.mapreduce.example;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.blur.mapreduce.BlurTask;
-import org.apache.blur.mapreduce.BlurTask.INDEXING_TYPE;
-import org.apache.blur.thrift.generated.AnalyzerDefinition;
-import org.apache.blur.thrift.generated.ColumnDefinition;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
-
-
-public class BlurExampleIndexerRebuild {
-
-  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
-    Configuration configuration = new Configuration();
-    String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
-    if (otherArgs.length != 2) {
-      System.err.println("Usage: blurindexer <in> <out>");
-      System.exit(2);
-    }
-
-    AnalyzerDefinition ad = new AnalyzerDefinition();
-    ad.defaultDefinition = new ColumnDefinition(StandardAnalyzer.class.getName(), true, null);
-
-    TableDescriptor descriptor = new TableDescriptor();
-    descriptor.analyzerDefinition = ad;
-    descriptor.compressionBlockSize = 32768;
-    descriptor.compressionClass = DefaultCodec.class.getName();
-    descriptor.isEnabled = true;
-    descriptor.name = "test-table";
-    descriptor.shardCount = 1;
-    descriptor.cluster = "default";
-    descriptor.tableUri = "./blur-testing";
-
-    BlurTask blurTask = new BlurTask();
-    blurTask.setTableDescriptor(descriptor);
-    blurTask.setIndexingType(INDEXING_TYPE.REBUILD);
-    blurTask.setOptimize(false);
-    Job job = blurTask.configureJob(configuration);
-    job.setJarByClass(BlurExampleIndexerRebuild.class);
-    job.setMapperClass(BlurExampleMapper.class);
-    job.setInputFormatClass(TextInputFormat.class);
-    job.setOutputFormatClass(TextOutputFormat.class);
-
-    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
-    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1], "job-" + System.currentTimeMillis()));
-    long s = System.currentTimeMillis();
-    boolean waitForCompletion = job.waitForCompletion(true);
-    long e = System.currentTimeMillis();
-    System.out.println("Completed in [" + (e - s) + " ms]");
-    System.exit(waitForCompletion ? 0 : 1);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e3699254/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleIndexerUpdate.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleIndexerUpdate.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleIndexerUpdate.java
deleted file mode 100644
index 87cb59d..0000000
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleIndexerUpdate.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package org.apache.blur.mapreduce.example;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.blur.manager.clusterstatus.ZookeeperClusterStatus;
-import org.apache.blur.mapreduce.BlurTask;
-import org.apache.blur.mapreduce.BlurTask.INDEXING_TYPE;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.util.GenericOptionsParser;
-
-
-public class BlurExampleIndexerUpdate {
-
-  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
-    Configuration configuration = new Configuration();
-    String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
-    if (otherArgs.length != 2) {
-      System.err.println("Usage: blurindexer <in> <out>");
-      System.exit(2);
-    }
-
-    ZookeeperClusterStatus status = new ZookeeperClusterStatus("localhost");
-    TableDescriptor descriptor = status.getTableDescriptor(false, "default", "test-table");
-
-    BlurTask blurTask = new BlurTask();
-    blurTask.setTableDescriptor(descriptor);
-    blurTask.setIndexingType(INDEXING_TYPE.UPDATE);
-    Job job = blurTask.configureJob(configuration);
-    job.setJarByClass(BlurExampleIndexerUpdate.class);
-    job.setMapperClass(BlurExampleMapper.class);
-    job.setInputFormatClass(TextInputFormat.class);
-    job.setOutputFormatClass(TextOutputFormat.class);
-
-    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
-    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1], "job-" + System.currentTimeMillis()));
-    boolean waitForCompletion = job.waitForCompletion(true);
-    System.exit(waitForCompletion ? 0 : 1);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e3699254/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleMapper.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleMapper.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleMapper.java
deleted file mode 100644
index 20b86d2..0000000
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleMapper.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package org.apache.blur.mapreduce.example;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.UUID;
-
-import org.apache.blur.mapreduce.BlurMapper;
-import org.apache.blur.mapreduce.BlurRecord;
-import org.apache.blur.mapreduce.BlurMutate.MUTATE_TYPE;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-
-
-public class BlurExampleMapper extends BlurMapper<LongWritable, Text> {
-
-  @Override
-  protected void map(LongWritable k, Text value, Context context) throws IOException, InterruptedException {
-    BlurRecord record = _mutate.getRecord();
-    record.clearColumns();
-    String str = value.toString();
-    String[] split = str.split("\\t");
-    record.setRowId(UUID.randomUUID().toString());
-    record.setRecordId(UUID.randomUUID().toString());
-    record.setFamily("cf1");
-    for (int i = 0; i < split.length; i++) {
-      record.addColumn("c" + i, split[i]);
-      _fieldCounter.increment(1);
-    }
-    byte[] bs = record.getRowId().getBytes();
-    _key.set(bs, 0, bs.length);
-    _mutate.setMutateType(MUTATE_TYPE.ADD);
-    context.write(_key, _mutate);
-    _recordCounter.increment(1);
-    context.progress();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e3699254/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
index f5abf65..83fcde1 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
@@ -18,39 +18,58 @@ package org.apache.blur.mapreduce.lib;
  */
 import java.io.IOException;
 
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.blur.mapred.AbstractOutputCommitter;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
 
-public class BlurOutputCommitter extends OutputCommitter {
+public class BlurOutputCommitter extends AbstractOutputCommitter {
 
-  public BlurOutputCommitter(TaskAttemptContext context) {
-
-  }
+  private Path _newIndex;
+  private Configuration _configuration;
+  private TaskAttemptID _taskAttemptID;
+  private Path _indexPath;
 
   @Override
-  public void setupJob(JobContext jobContext) throws IOException {
-
+  public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
+    return true;
   }
 
   @Override
-  public void setupTask(TaskAttemptContext taskContext) throws IOException {
-
+  public void setupTask(TaskAttemptContext context) throws IOException {
+    
   }
 
   @Override
-  public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
-    return false;
+  public void commitTask(TaskAttemptContext context) throws IOException {
+    setup(context);
+    FileSystem fileSystem = _newIndex.getFileSystem(_configuration);
+    if (fileSystem.exists(_newIndex) && !fileSystem.isFile(_newIndex)) {
+      fileSystem.rename(_newIndex, new Path(_indexPath, _taskAttemptID.toString() + ".task_complete"));
+    } else {
+      throw new IOException("Path [" + _newIndex + "] does not exist, can not commit.");
+    }
   }
 
   @Override
-  public void commitTask(TaskAttemptContext taskContext) throws IOException {
-
+  public void abortTask(TaskAttemptContext context) throws IOException {
+    setup(context);
+    FileSystem fileSystem = _newIndex.getFileSystem(_configuration);
+    fileSystem.delete(_indexPath, true);
   }
-
-  @Override
-  public void abortTask(TaskAttemptContext taskContext) throws IOException {
-
+  
+  private void setup(TaskAttemptContext context) {
+    _configuration = context.getConfiguration();
+    int shardId = context.getTaskAttemptID().getTaskID().getId();
+    _taskAttemptID = context.getTaskAttemptID();
+    Path tableOutput = BlurOutputFormat.getOutputPath(_configuration);
+    String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, shardId);
+    _indexPath = new Path(tableOutput, shardName);
+    _newIndex = new Path(_indexPath, _taskAttemptID.toString() + ".tmp");
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e3699254/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
index b3fd106..b36cc19 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
@@ -18,30 +18,106 @@ package org.apache.blur.mapreduce.lib;
  */
 import java.io.IOException;
 
-import org.apache.blur.mapreduce.BlurRecord;
+import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.mapreduce.BlurMutate;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 
+public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
 
-public class BlurOutputFormat extends OutputFormat<Text, BlurRecord> {
+  private static final String MAPRED_OUTPUT_COMMITTER_CLASS = "mapred.output.committer.class";
+  public static final String BLUR_OUTPUT_PATH = "blur.output.path";
 
   @Override
   public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
-
   }
 
   @Override
-  public RecordWriter<Text, BlurRecord> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
-    return new BlurRecordWriter(context);
+  public RecordWriter<Text, BlurMutate> getRecordWriter(TaskAttemptContext context) throws IOException,
+      InterruptedException {
+    int id = context.getTaskAttemptID().getTaskID().getId();
+    TaskAttemptID taskAttemptID = context.getTaskAttemptID();
+    return new BlurRecordWriter(context.getConfiguration(), new BlurAnalyzer(), id, taskAttemptID.toString() + ".tmp");
   }
 
   @Override
   public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
-    return new BlurOutputCommitter(context);
+    int numReduceTasks = context.getNumReduceTasks();
+    if (numReduceTasks != 0) {
+      try {
+        Class<? extends OutputFormat<?, ?>> outputFormatClass = context.getOutputFormatClass();
+        if (outputFormatClass.equals(BlurOutputFormat.class)) {
+          // Then only reducer needs committer.
+          if (context.getTaskAttemptID().isMap()) {
+            return getDoNothing();
+          }
+        }
+      } catch (ClassNotFoundException e) {
+        throw new IOException(e);
+      }
+    }
+    return new BlurOutputCommitter();
+  }
+
+  private OutputCommitter getDoNothing() {
+    return new OutputCommitter() {
+      
+      @Override
+      public void commitJob(JobContext jobContext) throws IOException {
+      }
+
+      @Override
+      public void cleanupJob(JobContext context) throws IOException {
+      }
+
+      @Override
+      public void abortJob(JobContext jobContext, State state) throws IOException {
+      }
+
+      @Override
+      public void setupTask(TaskAttemptContext taskContext) throws IOException {
+        
+      }
+      
+      @Override
+      public void setupJob(JobContext jobContext) throws IOException {
+        
+      }
+      
+      @Override
+      public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
+        return false;
+      }
+      
+      @Override
+      public void commitTask(TaskAttemptContext taskContext) throws IOException {
+        
+      }
+      
+      @Override
+      public void abortTask(TaskAttemptContext taskContext) throws IOException {
+        
+      }
+    };
+  }
+
+  public static void setOutputPath(Job job, Path path) {
+    Configuration configuration = job.getConfiguration();
+    configuration.set(BLUR_OUTPUT_PATH, path.toString());
+    configuration.set(MAPRED_OUTPUT_COMMITTER_CLASS, BlurOutputCommitter.class.getName());
+  }
+
+  public static Path getOutputPath(Configuration configuration) {
+    return new Path(configuration.get(BLUR_OUTPUT_PATH));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e3699254/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordWriter.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordWriter.java
index f754ab3..a40fa49 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordWriter.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordWriter.java
@@ -20,11 +20,17 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.blur.analysis.BlurAnalyzer;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
+import org.apache.blur.lucene.LuceneVersionConstant;
+import org.apache.blur.manager.writer.TransactionRecorder;
 import org.apache.blur.mapreduce.BlurColumn;
+import org.apache.blur.mapreduce.BlurMutate;
 import org.apache.blur.mapreduce.BlurRecord;
 import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Record;
 import org.apache.blur.utils.BlurConstants;
 import org.apache.blur.utils.BlurUtil;
 import org.apache.hadoop.conf.Configuration;
@@ -32,84 +38,78 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.analysis.core.KeywordAnalyzer;
 import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.Field.Index;
 import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.StringField;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.NoLockFactory;
-import org.apache.lucene.util.Version;
 
-public class BlurRecordWriter extends RecordWriter<Text, BlurRecord> {
+public class BlurRecordWriter extends RecordWriter<Text, BlurMutate> {
 
   private static Log LOG = LogFactory.getLog(BlurRecordWriter.class);
 
-  private Text prevKey = new Text();
-  private List<Document> documents = new ArrayList<Document>();
-  private IndexWriter writer;
+  private final Text _prevKey = new Text();
+  private final List<Document> _documents = new ArrayList<Document>();
+  private final IndexWriter _writer;
+  private final BlurAnalyzer _analyzer;
+  private final StringBuilder _builder = new StringBuilder();
 
-  public BlurRecordWriter(TaskAttemptContext context) throws IOException {
-    Configuration configuration = context.getConfiguration();
-    String outputPath = configuration.get("mapred.output.dir");
-    int id = context.getTaskAttemptID().getTaskID().getId();
-    String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, id);
-    Path basePath = new Path(outputPath);
-    Path indexPath = new Path(basePath, shardName);
-
-    // @TODO
-    Analyzer analyzer = new KeywordAnalyzer();
-
-    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_35, analyzer);
-
-    // @TODO setup compressed directory, read compression codec from config,
-    // setup progressable dir, setup lock factory
-    Directory dir = new HdfsDirectory(configuration, indexPath);
+  public BlurRecordWriter(Configuration configuration, BlurAnalyzer blurAnalyzer, int shardId, String tmpDirName) throws IOException {
+    Path tableOutput = BlurOutputFormat.getOutputPath(configuration);
+    String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, shardId);
+    Path indexPath = new Path(tableOutput, shardName);
+    Path newIndex = new Path(indexPath,tmpDirName);
+    _analyzer = blurAnalyzer;
+    IndexWriterConfig conf = new IndexWriterConfig(LuceneVersionConstant.LUCENE_VERSION, _analyzer);
+    Directory dir = new HdfsDirectory(configuration, newIndex);
     dir.setLockFactory(NoLockFactory.getNoLockFactory());
-    writer = new IndexWriter(dir, conf);
+    _writer = new IndexWriter(dir, conf);
   }
 
   @Override
-  public void write(Text key, BlurRecord value) throws IOException, InterruptedException {
-    if (!prevKey.equals(key)) {
+  public void write(Text key, BlurMutate value) throws IOException, InterruptedException {
+    if (!_prevKey.equals(key)) {
       flush();
-      prevKey.set(key);
+      _prevKey.set(key);
     }
     add(value);
   }
 
-  private void add(BlurRecord value) {
-    List<BlurColumn> columns = value.getColumns();
-    String family = value.getFamily();
-    Document document = new Document();
-    document.add(new Field(BlurConstants.ROW_ID, value.getRowId(), Store.YES, Index.NOT_ANALYZED_NO_NORMS));
-    document.add(new Field(BlurConstants.RECORD_ID, value.getRecordId(), Store.YES, Index.NOT_ANALYZED_NO_NORMS));
-    for (BlurColumn column : columns) {
-      document.add(convert(family, column));
+  private void add(BlurMutate value) {
+    BlurRecord blurRecord = value.getRecord();
+    Record record = getRecord(blurRecord);
+    Document document = TransactionRecorder.convert(blurRecord.getRowId(), record, _builder, _analyzer);
+    if (_documents.size() == 0) {
+      document.add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
     }
-    documents.add(document);
+    _documents.add(document);
     LOG.error("Needs to use blur analyzer and field converter");
   }
 
-  private Field convert(String family, BlurColumn column) {
-    return new Field(family + "." + column.getName(), column.getValue(), Store.YES, Index.ANALYZED_NO_NORMS);
+  private Record getRecord(BlurRecord value) {
+    Record record = new Record();
+    record.setRecordId(value.getRecordId());
+    record.setFamily(value.getFamily());
+    for (BlurColumn col : value.getColumns()) {
+      record.addToColumns(new Column(col.getName(), col.getValue()));
+    }
+    return record;
   }
 
   private void flush() throws CorruptIndexException, IOException {
-    if (documents.isEmpty()) {
+    if (_documents.isEmpty()) {
       return;
     }
-    writer.addDocuments(documents);
-    documents.clear();
+    _writer.addDocuments(_documents);
+    _documents.clear();
   }
 
   @Override
   public void close(TaskAttemptContext context) throws IOException, InterruptedException {
     flush();
-    writer.close();
+    _writer.close();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e3699254/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/DefaultBlurReducer.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/DefaultBlurReducer.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/DefaultBlurReducer.java
new file mode 100644
index 0000000..3e283dd
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/DefaultBlurReducer.java
@@ -0,0 +1,18 @@
+package org.apache.blur.mapreduce.lib;
+
+import java.io.IOException;
+
+import org.apache.blur.mapreduce.BlurMutate;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+public class DefaultBlurReducer extends Reducer<Text, BlurMutate, Text, BlurMutate> {
+
+  @Override
+  protected void reduce(Text key, Iterable<BlurMutate> values, Context context) throws IOException,
+      InterruptedException {
+    for (BlurMutate value : values) {
+      context.write(key, value);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e3699254/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/BlurReducerTest.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/BlurReducerTest.java b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/BlurReducerTest.java
new file mode 100644
index 0000000..08541c1
--- /dev/null
+++ b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/BlurReducerTest.java
@@ -0,0 +1,57 @@
+package org.apache.blur.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.blur.mapreduce.BlurMutate.MUTATE_TYPE;
+import org.apache.blur.mapreduce.csv.CsvBlurMapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
+import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
+import org.junit.Before;
+import org.junit.Test;
+
+public class BlurReducerTest {
+
+  MapDriver<LongWritable, Text, Text, BlurMutate> mapDriver;
+  ReduceDriver<Text, BlurMutate, Text, BlurMutate> reduceDriver;
+  MapReduceDriver<LongWritable, Text, Text, BlurMutate, Text, BlurMutate> mapReduceDriver;
+
+  @Before
+  public void setUp() throws IOException {
+    CsvBlurMapper mapper = new CsvBlurMapper();
+
+    mapDriver = MapDriver.newMapDriver(mapper);
+    Configuration configuration = mapDriver.getConfiguration();
+    CsvBlurMapper.addColumns(configuration, "cf1", "col1", "col2");
+
+    // Configuration configuration = new Configuration();
+    // BlurTask blurTask = new BlurTask();
+    // blurTask.configureJob(configuration);
+    // mapDriver.setConfiguration(configuration);
+    BlurReducer reducer = new BlurReducer();
+    reduceDriver = ReduceDriver.newReduceDriver(reducer);
+
+    mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
+
+  }
+
+  @Test
+  public void testMapper() {
+    mapDriver.withInput(new LongWritable(), new Text("rowid1,record1,cf1,value1,value2"));
+    mapDriver.withOutput(new Text("rowid1"), new BlurMutate(MUTATE_TYPE.REPLACE, "rowid1", "record1", "cf1").addColumn("col1", "value1").addColumn("col2", "value2"));
+    mapDriver.runTest();
+  }
+
+  @Test
+  public void testReducer() {
+    // List<IntWritable> values = new ArrayList<IntWritable>();
+    // values.add(new IntWritable(1));
+    // values.add(new IntWritable(1));
+    // reduceDriver.withInput(new Text("6"), values);
+    // reduceDriver.withOutput(new Text("6"), new IntWritable(2));
+    // reduceDriver.runTest();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e3699254/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
new file mode 100644
index 0000000..e728d41
--- /dev/null
+++ b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
@@ -0,0 +1,101 @@
+package org.apache.blur.mapreduce.lib;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import org.apache.blur.mapreduce.BlurMutate;
+import org.apache.blur.mapreduce.csv.CsvBlurMapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TestMapReduceLocal.TrackingTextInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class BlurOutputFormatTest {
+
+  private static Configuration conf = new Configuration();
+  private static FileSystem localFs;
+  private static MiniMRCluster mr;
+  private static Path TEST_ROOT_DIR;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    System.setProperty("test.build.data", "./target/BlurOutputFormatTest/data");
+    TEST_ROOT_DIR = new Path(System.getProperty("test.build.data", "/tmp"));
+    System.setProperty("hadoop.log.dir", "./target/BlurOutputFormatTest/hadoop_log");
+    try {
+      localFs = FileSystem.getLocal(conf);
+    } catch (IOException io) {
+      throw new RuntimeException("problem getting local fs", io);
+    }
+    mr = new MiniMRCluster(2, "file:///", 3);
+  }
+
+  @AfterClass
+  public static void teardown() {
+    if (mr != null) {
+      mr.shutdown();
+    }
+  }
+
+  @Test
+  public void testBlurOutputFormat() throws IOException, InterruptedException, ClassNotFoundException {
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+    writeFile("in/part1", "1,1,cf1,val1");
+    writeFile("in/part2", "1,2,cf1,val2");
+    Job job = new Job(conf, "blur index");
+    job.setJarByClass(BlurOutputFormatTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setReducerClass(DefaultBlurReducer.class);
+    job.setNumReduceTasks(4);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(BlurMutate.class);
+    job.setOutputFormatClass(BlurOutputFormat.class);
+
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    BlurOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
+
+    assertTrue(job.waitForCompletion(true));
+    Counters ctrs = job.getCounters();
+    System.out.println("Counters: " + ctrs);
+  }
+
+  public static String readFile(String name) throws IOException {
+    DataInputStream f = localFs.open(new Path(TEST_ROOT_DIR + "/" + name));
+    BufferedReader b = new BufferedReader(new InputStreamReader(f));
+    StringBuilder result = new StringBuilder();
+    String line = b.readLine();
+    while (line != null) {
+      result.append(line);
+      result.append('\n');
+      line = b.readLine();
+    }
+    b.close();
+    return result.toString();
+  }
+
+  public static Path writeFile(String name, String data) throws IOException {
+    Path file = new Path(TEST_ROOT_DIR + "/" + name);
+    localFs.delete(file, false);
+    DataOutputStream f = localFs.create(file);
+    f.write(data.getBytes());
+    f.close();
+    return file;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e3699254/src/blur-store/src/main/java/org/apache/blur/lucene/LuceneVersionConstant.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/lucene/LuceneVersionConstant.java b/src/blur-store/src/main/java/org/apache/blur/lucene/LuceneVersionConstant.java
index b4ca0f9..20ae366 100644
--- a/src/blur-store/src/main/java/org/apache/blur/lucene/LuceneVersionConstant.java
+++ b/src/blur-store/src/main/java/org/apache/blur/lucene/LuceneVersionConstant.java
@@ -23,6 +23,6 @@ public class LuceneVersionConstant {
   /**
    * All Lucene Version references should refer to this constant.
    */
-  public static final Version LUCENE_VERSION = Version.LUCENE_41;
+  public static final Version LUCENE_VERSION = Version.LUCENE_43;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e3699254/src/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceCounter.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceCounter.java b/src/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceCounter.java
index 1f21190..250cc69 100644
--- a/src/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceCounter.java
+++ b/src/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceCounter.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
+import org.apache.blur.store.hdfs.DirectoryDecorator;
 import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
@@ -32,7 +33,7 @@ import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.Lock;
 import org.apache.lucene.store.LockFactory;
 
-public class DirectoryReferenceCounter extends Directory {
+public class DirectoryReferenceCounter extends Directory implements DirectoryDecorator {
 
   private final static Log LOG = LogFactory.getLog(DirectoryReferenceCounter.class);
   private Directory directory;
@@ -279,4 +280,9 @@ public class DirectoryReferenceCounter extends Directory {
     }
   }
 
+  @Override
+  public Directory getOriginalDirectory() {
+    return directory;
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e3699254/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectory.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectory.java b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectory.java
index dfdba07..87b35c1 100644
--- a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectory.java
+++ b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectory.java
@@ -24,6 +24,7 @@ import java.util.Set;
 
 import org.apache.blur.store.buffer.BufferStore;
 import org.apache.blur.store.buffer.ReusedBufferedIndexInput;
+import org.apache.blur.store.hdfs.DirectoryDecorator;
 import org.apache.blur.store.hdfs.HdfsDirectory;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
@@ -33,7 +34,7 @@ import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.Lock;
 import org.apache.lucene.store.LockFactory;
 
-public class BlockDirectory extends Directory {
+public class BlockDirectory extends Directory implements DirectoryDecorator {
 
   public static final long BLOCK_SHIFT = 13; // 2^13 = 8,192 bytes per block
   public static final long BLOCK_MOD = 0x1FFF;
@@ -313,4 +314,9 @@ public class BlockDirectory extends Directory {
     return _directory;
   }
 
+  @Override
+  public Directory getOriginalDirectory() {
+    return _directory;
+  }
+
 }


Mime
View raw message