incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject [2/2] git commit: Adding incremental row updates via MapReduce with Hive integration; documentation to come.
Date Tue, 21 Apr 2015 13:59:36 GMT
Adding incremental row updates via MapReduce with Hive integration; documentation to come.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/67984a6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/67984a6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/67984a6a

Branch: refs/heads/master
Commit: 67984a6a774b17eb4cf8b6234100fcc2972a9c5a
Parents: 2b2cc90
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Apr 21 09:59:16 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Apr 21 09:59:16 2015 -0400

----------------------------------------------------------------------
 .../hive/BlurHiveMRLoaderOutputCommitter.java   |  91 ++++++
 .../apache/blur/hive/BlurHiveOutputFormat.java  |  44 ++-
 .../blur/hive/BlurHiveStorageHandler.java       |  28 +-
 .../java/org/apache/blur/hive/BlurSerDe.java    |  23 +-
 .../org/apache/blur/hive/BlurSerDeTest.java     |  85 +++++-
 .../blur/mapreduce/lib/BlurInputFormat.java     | 203 +++++--------
 .../blur/mapreduce/lib/GenericRecordReader.java | 229 +++++++++++++++
 .../blur/mapreduce/lib/update/Driver.java       | 150 ++++++++++
 .../blur/mapreduce/lib/update/IndexKey.java     | 207 +++++++++++++
 .../lib/update/IndexKeyPartitioner.java         |  33 +++
 .../lib/update/IndexKeyWritableComparator.java  |  36 +++
 .../blur/mapreduce/lib/update/IndexValue.java   |  66 +++++
 .../lib/update/MapperForExistingData.java       |  46 +++
 .../mapreduce/lib/update/MapperForNewData.java  |  82 ++++++
 .../mapreduce/lib/update/UpdateReducer.java     |  83 ++++++
 .../blur/mapreduce/lib/BlurInputFormatTest.java |  61 +++-
 .../blur/mapreduce/lib/update/DriverTest.java   | 289 +++++++++++++++++++
 .../blur/mapreduce/lib/update/IndexKeyTest.java |  55 ++++
 18 files changed, 1645 insertions(+), 166 deletions(-)
----------------------------------------------------------------------
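
Since the documentation is still to come, here is a rough sketch of the
end-to-end flow as reconstructed from BlurSerDeTest and DriverTest in this
commit: a table created with the blur.mr.update.working.path table property
routes Hive inserts into <workingPath>/tmp/<loadId>; on job commit the load
is promoted to <workingPath>/new/<loadId>; the update Driver MapReduce job
then merges everything queued under new/ with a snapshot of the existing
index, and the result is imported with loadData. A minimal sketch mirroring
BlurSerDeTest#test3 (the paths and ZooKeeper string below are placeholders,
not part of this commit):

    import org.apache.blur.mapreduce.lib.update.Driver;
    import org.apache.blur.thrift.BlurClient;
    import org.apache.blur.thrift.generated.Blur.Iface;
    import org.apache.hadoop.conf.Configuration;

    public class IncrementalUpdateExample {
      public static void main(String[] args) throws Exception {
        String table = "test";
        String mrWorkingPath = "hdfs://namenode/mrworkingpath"; // blur.mr.update.working.path
        String outputPath = "hdfs://namenode/indexoutput";      // merged index output
        String zkConnection = "zkhost:2181";

        // Merge everything queued under <workingPath>/new with the current index.
        Driver driver = new Driver();
        driver.setConf(new Configuration());
        int rc = driver.run(new String[] { table, mrWorkingPath, outputPath, zkConnection });

        if (rc == 0) {
          // Import the freshly written segments into the live table.
          Iface client = BlurClient.getClientFromZooKeeperConnectionStr(zkConnection);
          client.loadData(table, outputPath);
        }
      }
    }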


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/67984a6a/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveMRLoaderOutputCommitter.java
----------------------------------------------------------------------
diff --git a/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveMRLoaderOutputCommitter.java b/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveMRLoaderOutputCommitter.java
new file mode 100644
index 0000000..b14063d
--- /dev/null
+++ b/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveMRLoaderOutputCommitter.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.hive;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+
+public class BlurHiveMRLoaderOutputCommitter extends OutputCommitter {
+
+  private static final Log LOG = LogFactory.getLog(BlurHiveMRLoaderOutputCommitter.class);
+
+  @Override
+  public void setupJob(JobContext jobContext) throws IOException {
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext taskContext) throws IOException {
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
+    return false;
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext taskContext) throws IOException {
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext taskContext) throws IOException {
+
+  }
+
+  @Override
+  public void abortJob(JobContext context, int status) throws IOException {
+    finishBulkJob(context, false);
+  }
+
+  @Override
+  public void cleanupJob(JobContext context) throws IOException {
+
+  }
+
+  @Override
+  public void commitJob(JobContext context) throws IOException {
+    finishBulkJob(context, true);
+  }
+
+  private void finishBulkJob(JobContext context, boolean apply) throws IOException {
+    Configuration configuration = context.getConfiguration();
+    String workingPathStr = configuration.get(BlurSerDe.BLUR_MR_UPDATE_WORKING_PATH);
+    Path workingPath = new Path(workingPathStr);
+    Path tmpDir = new Path(workingPath, "tmp");
+    FileSystem fileSystem = tmpDir.getFileSystem(configuration);
+    String loadId = configuration.get(BlurSerDe.BLUR_MR_LOAD_ID);
+    Path loadPath = new Path(tmpDir, loadId);
+
+    if (apply) {
+      Path newDataPath = new Path(workingPath, "new");
+      Path dst = new Path(newDataPath, loadId);
+      if (!fileSystem.rename(loadPath, dst)) {
+        LOG.error("Could move data from src [" + loadPath + "] to dst [" + dst + "]");
+      }
+    } else {
+      fileSystem.delete(loadPath, true);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/67984a6a/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputFormat.java
----------------------------------------------------------------------
diff --git a/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputFormat.java b/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputFormat.java
index 158f807..2e5e2dc 100644
--- a/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputFormat.java
+++ b/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputFormat.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.blur.manager.BlurPartitioner;
@@ -44,6 +45,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
@@ -77,13 +80,48 @@ public class BlurHiveOutputFormat implements HiveOutputFormat<Text, BlurRecord>
   public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc,
       Path finalOutPath, Class<? extends Writable> valueClass, boolean isCompressed, Properties tableProperties,
       Progressable progress) throws IOException {
+    if (BlurSerDe.shouldUseMRWorkingPath(jc)) {
+      return getMrWorkingPathWriter(jc);
+    }
+    return getBulkRecordWriter(jc);
+  }
+
+  private org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getMrWorkingPathWriter(
+      Configuration configuration) throws IOException {
+    String workingPathStr = configuration.get(BlurSerDe.BLUR_MR_UPDATE_WORKING_PATH);
+    Path workingPath = new Path(workingPathStr);
+    Path tmpDir = new Path(workingPath, "tmp");
+    FileSystem fileSystem = tmpDir.getFileSystem(configuration);
+    String loadId = configuration.get(BlurSerDe.BLUR_MR_LOAD_ID);
+    Path loadPath = new Path(tmpDir, loadId);
+
+    final Writer writer = new SequenceFile.Writer(fileSystem, configuration, new Path(loadPath, UUID.randomUUID()
+        .toString()), Text.class, BlurRecord.class);
+
+    return new org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter() {
+
+      @Override
+      public void write(Writable w) throws IOException {
+        BlurRecord blurRecord = (BlurRecord) w;
+        String rowId = blurRecord.getRowId();
+        writer.append(new Text(rowId), blurRecord);
+      }
+
+      @Override
+      public void close(boolean abort) throws IOException {
+        writer.close();
+      }
+    };
+  }
 
-    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(jc);
-    String conStr = jc.get(BlurSerDe.BLUR_CONTROLLER_CONNECTION_STR);
+  private org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getBulkRecordWriter(Configuration configuration)
+      throws IOException {
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
+    String conStr = configuration.get(BlurSerDe.BLUR_CONTROLLER_CONNECTION_STR);
     final Iface controllerClient = BlurClient.getClient(conStr);
     final String table = tableDescriptor.getName();
     final int numberOfShardsInTable = tableDescriptor.getShardCount();
-    final String bulkId = getBulkId(jc);
+    final String bulkId = getBulkId(configuration);
     return new org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter() {
 
       private BlurPartitioner _blurPartitioner = new BlurPartitioner();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/67984a6a/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveStorageHandler.java
----------------------------------------------------------------------
diff --git a/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveStorageHandler.java b/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveStorageHandler.java
index 13a63dd..99380d4 100644
--- a/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveStorageHandler.java
+++ b/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveStorageHandler.java
@@ -50,17 +50,23 @@ public class BlurHiveStorageHandler extends DefaultStorageHandler {
 
   @Override
   public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
-    try {
-      String bulkId = UUID.randomUUID().toString();
-      String connectionStr = jobConf.get(BlurSerDe.BLUR_CONTROLLER_CONNECTION_STR);
-      Iface client = BlurClient.getClient(connectionStr);
-      client.bulkMutateStart(bulkId);
-      BlurHiveOutputFormat.setBulkId(jobConf, bulkId);
-      jobConf.setOutputCommitter(BlurHiveOutputCommitter.class);
-    } catch (BlurException e) {
-      throw new RuntimeException(e);
-    } catch (TException e) {
-      throw new RuntimeException(e);
+    if (BlurSerDe.shouldUseMRWorkingPath(jobConf)) {
+      String loadId = UUID.randomUUID().toString();
+      jobConf.set(BlurSerDe.BLUR_MR_LOAD_ID, loadId);
+      jobConf.setOutputCommitter(BlurHiveMRLoaderOutputCommitter.class);
+    } else {
+      try {
+        String bulkId = UUID.randomUUID().toString();
+        String connectionStr = jobConf.get(BlurSerDe.BLUR_CONTROLLER_CONNECTION_STR);
+        Iface client = BlurClient.getClient(connectionStr);
+        client.bulkMutateStart(bulkId);
+        BlurHiveOutputFormat.setBulkId(jobConf, bulkId);
+        jobConf.setOutputCommitter(BlurHiveOutputCommitter.class);
+      } catch (BlurException e) {
+        throw new RuntimeException(e);
+      } catch (TException e) {
+        throw new RuntimeException(e);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/67984a6a/blur-hive/src/main/java/org/apache/blur/hive/BlurSerDe.java
----------------------------------------------------------------------
diff --git a/blur-hive/src/main/java/org/apache/blur/hive/BlurSerDe.java b/blur-hive/src/main/java/org/apache/blur/hive/BlurSerDe.java
index 150ad20..1f09493 100644
--- a/blur-hive/src/main/java/org/apache/blur/hive/BlurSerDe.java
+++ b/blur-hive/src/main/java/org/apache/blur/hive/BlurSerDe.java
@@ -43,12 +43,15 @@ import org.apache.hadoop.io.Writable;
 
 public class BlurSerDe extends AbstractSerDe {
 
+  public static final String BLUR_MR_UPDATE_WORKING_PATH = "blur.mr.update.working.path";
+  public static final String BLUR_MR_UPDATE_DISABLED = "blur.mr.update.disabled";
   public static final String BLUR_BLOCKING_APPLY = "blur.blocking.apply";
   public static final String BLUR_CONTROLLER_CONNECTION_STR = "blur.controller.connection.str";
   public static final String FAMILY = "blur.family";
   public static final String TABLE = "blur.table";
   public static final String ZK = BlurConstants.BLUR_ZOOKEEPER_CONNECTION;
-  
+  public static final String BLUR_MR_LOAD_ID = "blur.mr.load.id";
+
   private String _family;
   private Map<String, ColumnDefinition> _schema;
   private ObjectInspector _objectInspector;
@@ -82,6 +85,16 @@ public class BlurSerDe extends AbstractSerDe {
       }
       if (conf != null) {
         TableDescriptor tableDescriptor = client.describe(table);
+        Map<String, String> tableProperties = tableDescriptor.getTableProperties();
+        if (tableProperties != null) {
+          String workingPath = tableProperties.get(BLUR_MR_UPDATE_WORKING_PATH);
+          if (workingPath != null) {
+            if (!conf.getBoolean(BLUR_MR_UPDATE_DISABLED, false)) {
+              conf.set(BLUR_MR_UPDATE_WORKING_PATH, workingPath);
+            }
+          }
+        }
+
         BlurOutputFormat.setTableDescriptor(conf, tableDescriptor);
         conf.set(BLUR_CONTROLLER_CONNECTION_STR, getControllerConnectionStr(client));
       }
@@ -161,4 +174,12 @@ public class BlurSerDe extends AbstractSerDe {
     return BlurRecord.class;
   }
 
+  public static boolean shouldUseMRWorkingPath(Configuration configuration) {
+    String workingPath = configuration.get(BlurSerDe.BLUR_MR_UPDATE_WORKING_PATH);
+    return workingPath != null;
+  }
+
 }
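
The SerDe change above copies blur.mr.update.working.path from the table
properties into the job configuration unless blur.mr.update.disabled is set,
which is what switches BlurHiveOutputFormat over to the working-path writer.
A minimal sketch of opting out for a single Hive session over JDBC, patterned
after BlurSerDeTest#runLoad (URL, user, and table names are placeholders):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class DisableMrUpdateForSession {
      public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        Connection connection = DriverManager.getConnection("jdbc:hive2://localhost:10000", "hive", "");
        Statement statement = connection.createStatement();
        // With the property set to true, inserts bypass the MR working path and
        // use the bulk-mutate path (see BlurHiveStorageHandler#configureJobConf).
        statement.execute("set blur.mr.update.disabled=true");
        statement.execute("insert into table testtable select * from loadtable");
        connection.close();
      }
    }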

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/67984a6a/blur-hive/src/test/java/org/apache/blur/hive/BlurSerDeTest.java
----------------------------------------------------------------------
diff --git a/blur-hive/src/test/java/org/apache/blur/hive/BlurSerDeTest.java b/blur-hive/src/test/java/org/apache/blur/hive/BlurSerDeTest.java
index aea485f..fda4f20 100644
--- a/blur-hive/src/test/java/org/apache/blur/hive/BlurSerDeTest.java
+++ b/blur-hive/src/test/java/org/apache/blur/hive/BlurSerDeTest.java
@@ -40,6 +40,7 @@ import java.util.Properties;
 import org.apache.blur.MiniCluster;
 import org.apache.blur.mapreduce.lib.BlurColumn;
 import org.apache.blur.mapreduce.lib.BlurRecord;
+import org.apache.blur.mapreduce.lib.update.Driver;
 import org.apache.blur.thirdparty.thrift_0_9_0.TException;
 import org.apache.blur.thrift.BlurClient;
 import org.apache.blur.thrift.generated.Blur.Iface;
@@ -49,6 +50,7 @@ import org.apache.blur.thrift.generated.BlurResults;
 import org.apache.blur.thrift.generated.ColumnDefinition;
 import org.apache.blur.thrift.generated.Query;
 import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.thrift.generated.TableStats;
 import org.apache.blur.utils.GCWatcher;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -107,13 +109,17 @@ public class BlurSerDeTest {
     testDirectory.delete();
     miniCluster = new MiniCluster();
     miniCluster.startBlurCluster(new File(testDirectory, "cluster").getAbsolutePath(), 2, 3, true, externalProcesses);
+    miniCluster.startMrMiniCluster();
   }
 
   @AfterClass
-  public static void shutdownCluster() {
+  public static void shutdownCluster() throws IOException {
+    miniCluster.stopMrMiniCluster();
     miniCluster.shutdownBlurCluster();
   }
 
+  private String _mrWorkingPath;
+
   @Before
   public void setup() throws BlurException, TException, IOException {
     String controllerConnectionStr = miniCluster.getControllerConnectionStr();
@@ -124,6 +130,10 @@ public class BlurSerDeTest {
       tableDescriptor.setName(TEST);
       tableDescriptor.setShardCount(1);
       tableDescriptor.setTableUri(miniCluster.getFileSystemUri().toString() + "/blur/tables/test");
+
+      _mrWorkingPath = miniCluster.getFileSystemUri().toString() + "/mrworkingpath";
+      tableDescriptor.putToTableProperties(BlurSerDe.BLUR_MR_UPDATE_WORKING_PATH, _mrWorkingPath);
+
       client.createTable(tableDescriptor);
 
       Map<String, String> props = new HashMap<String, String>();
@@ -258,7 +268,67 @@ public class BlurSerDeTest {
   @Test
   public void test2() throws SQLException, ClassNotFoundException, IOException, BlurException, TException,
       InterruptedException {
-    miniCluster.startMrMiniCluster();
+    int totalRecords = runLoad(true);
+
+    Iface client = BlurClient.getClientFromZooKeeperConnectionStr(miniCluster.getZkConnectionString());
+    BlurQuery blurQuery = new BlurQuery();
+    Query query = new Query();
+    query.setQuery("*");
+    blurQuery.setQuery(query);
+    BlurResults results = client.query(TEST, blurQuery);
+    assertEquals(totalRecords, results.getTotalResults());
+  }
+
+  @Test
+  public void test3() throws Exception {
+
+    Path mrWorkingPath = new Path(_mrWorkingPath);
+    FileSystem fileSystem = miniCluster.getFileSystem();
+    fileSystem.mkdirs(mrWorkingPath);
+    fileSystem.mkdirs(new Path(mrWorkingPath, "tmp"));
+    fileSystem.mkdirs(new Path(mrWorkingPath, Driver.NEW));
+    fileSystem.mkdirs(new Path(mrWorkingPath, Driver.COMPLETE));
+    fileSystem.mkdirs(new Path(mrWorkingPath, Driver.CACHE));
+    fileSystem.mkdirs(new Path(mrWorkingPath, Driver.INPROGRESS));
+
+    int totalRecords = runLoad(false);
+
+    Driver driver = new Driver();
+    driver.setConf(miniCluster.getMRConfiguration());
+
+    String outputPathStr = miniCluster.getFileSystemUri().toString() + "/indexoutput";
+    String blurZkConnection = miniCluster.getZkConnectionString();
+
+    assertEquals(0, driver.run(new String[] { TEST, _mrWorkingPath, outputPathStr, blurZkConnection }));
+
+    Iface client = BlurClient.getClientFromZooKeeperConnectionStr(blurZkConnection);
+
+    client.loadData(TEST, outputPathStr);
+
+    waitUntilAllImportsAreCompleted(client, TEST);
+
+    BlurQuery blurQuery = new BlurQuery();
+    Query query = new Query();
+    query.setQuery("*");
+    blurQuery.setQuery(query);
+    BlurResults results = client.query(TEST, blurQuery);
+    assertEquals(totalRecords, results.getTotalResults());
+  }
+
+  private void waitUntilAllImportsAreCompleted(Iface client, String tableName) throws BlurException, TException,
+      InterruptedException {
+    while (true) {
+      Thread.sleep(1000);
+      TableStats tableStats = client.tableStats(tableName);
+      if (tableStats.getSegmentImportInProgressCount() == 0 && tableStats.getSegmentImportPendingCount() == 0) {
+        return;
+      }
+    }
+  }
+
+  private int runLoad(boolean disableMrUpdate) throws IOException, InterruptedException, ClassNotFoundException,
+      SQLException {
+
     Configuration configuration = miniCluster.getMRConfiguration();
     HiveConf hiveConf = new HiveConf(configuration, getClass());
     hiveConf.set("hive.server2.thrift.port", "0");
@@ -272,6 +342,7 @@ public class BlurSerDeTest {
     String userName = UserGroupInformation.getCurrentUser().getShortUserName();
     Connection connection = DriverManager.getConnection("jdbc:hive2://localhost:" + port, userName, "");
 
+    run(connection, "set blur.mr.update.disabled=" + disableMrUpdate);
     run(connection, "set hive.metastore.warehouse.dir=" + WAREHOUSE.toURI().toString());
     run(connection, "create database if not exists testdb");
     run(connection, "use testdb");
@@ -293,16 +364,8 @@ public class BlurSerDeTest {
     run(connection, "select * from loadtable");
     run(connection, "set " + BlurSerDe.BLUR_BLOCKING_APPLY + "=true");
     run(connection, "insert into table testtable select * from loadtable");
-    miniCluster.stopMrMiniCluster();
     connection.close();
-
-    Iface client = BlurClient.getClientFromZooKeeperConnectionStr(miniCluster.getZkConnectionString());
-    BlurQuery blurQuery = new BlurQuery();
-    Query query = new Query();
-    query.setQuery("*");
-    blurQuery.setQuery(query);
-    BlurResults results = client.query(TEST, blurQuery);
-    assertEquals(totalRecords, results.getTotalResults());
+    return totalRecords;
   }
 
   private void generateData(File file, int totalRecords) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/67984a6a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
index b479774..c89196e 100644
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
@@ -29,15 +29,13 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import org.apache.blur.lucene.codec.Blur024Codec;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
 import org.apache.blur.manager.writer.SnapshotIndexDeletionPolicy;
 import org.apache.blur.store.hdfs.DirectoryUtil;
 import org.apache.blur.store.hdfs.HdfsDirectory;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.utils.BlurConstants;
-import org.apache.blur.utils.RowDocumentUtil;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -51,19 +49,16 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.lucene.codecs.StoredFieldsReader;
-import org.apache.lucene.document.DocumentStoredFieldVisitor;
 import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.FieldInfos;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.SegmentInfo;
 import org.apache.lucene.index.SegmentInfoPerCommit;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.IOContext;
-import org.apache.lucene.util.Bits;
 
 public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
+  private static final String BLUR_INPUTFORMAT_FILE_CACHE_PATH = "blur.inputformat.file.cache.path";
+
   private static final Log LOG = LogFactory.getLog(BlurInputFormat.class);
 
   private static final String BLUR_TABLE_PATH_MAPPING = "blur.table.path.mapping.";
@@ -73,11 +68,23 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
   @Override
   public List<InputSplit> getSplits(JobContext context) throws IOException {
     Path[] dirs = getInputPaths(context);
-    Configuration configuration = context.getConfiguration();
+    List<BlurInputSplit> splits = getSplits(context.getConfiguration(), dirs);
+    return toList(splits);
+  }
+
+  private List<InputSplit> toList(List<BlurInputSplit> splits) {
+    List<InputSplit> inputSplits = new ArrayList<InputSplit>();
+    for (BlurInputSplit inputSplit : splits) {
+      inputSplits.add(inputSplit);
+    }
+    return inputSplits;
+  }
+
+  public static List<BlurInputSplit> getSplits(Configuration configuration, Path[] dirs) throws IOException {
     int threads = configuration.getInt(BLUR_INPUT_FORMAT_DISCOVERY_THREADS, 10);
     ExecutorService service = Executors.newFixedThreadPool(threads);
     try {
-      List<InputSplit> splits = new ArrayList<InputSplit>();
+      List<BlurInputSplit> splits = new ArrayList<BlurInputSplit>();
       for (Path dir : dirs) {
         Text table = BlurInputFormat.getTableFromPath(configuration, dir);
         String snapshot = getSnapshotForTable(configuration, table.toString());
@@ -125,8 +132,8 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
     throw new IOException("Snaphost not found for table [" + tableName + "]");
   }
 
-  private List<InputSplit> getSegmentSplits(final Path dir, ExecutorService service, final Configuration configuration,
-      final Text table, final Text snapshot) throws IOException {
+  private static List<BlurInputSplit> getSegmentSplits(final Path dir, ExecutorService service,
+      final Configuration configuration, final Text table, final Text snapshot) throws IOException {
 
     FileSystem fileSystem = dir.getFileSystem(configuration);
     FileStatus[] shardDirs = fileSystem.listStatus(dir, new PathFilter() {
@@ -136,18 +143,18 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
       }
     });
 
-    List<Future<List<InputSplit>>> futures = new ArrayList<Future<List<InputSplit>>>();
+    List<Future<List<BlurInputSplit>>> futures = new ArrayList<Future<List<BlurInputSplit>>>();
     for (final FileStatus shardFileStatus : shardDirs) {
-      futures.add(service.submit(new Callable<List<InputSplit>>() {
+      futures.add(service.submit(new Callable<List<BlurInputSplit>>() {
         @Override
-        public List<InputSplit> call() throws Exception {
+        public List<BlurInputSplit> call() throws Exception {
           return getSegmentSplits(shardFileStatus.getPath(), configuration, table, snapshot);
         }
       }));
     }
 
-    List<InputSplit> results = new ArrayList<InputSplit>();
-    for (Future<List<InputSplit>> future : futures) {
+    List<BlurInputSplit> results = new ArrayList<BlurInputSplit>();
+    for (Future<List<BlurInputSplit>> future : futures) {
       try {
         results.addAll(future.get());
       } catch (InterruptedException e) {
@@ -164,10 +171,10 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
     return results;
   }
 
-  private List<InputSplit> getSegmentSplits(Path shardDir, Configuration configuration, Text table, Text snapshot)
-      throws IOException {
+  private static List<BlurInputSplit> getSegmentSplits(Path shardDir, Configuration configuration, Text table,
+      Text snapshot) throws IOException {
     final long start = System.nanoTime();
-    List<InputSplit> splits = new ArrayList<InputSplit>();
+    List<BlurInputSplit> splits = new ArrayList<BlurInputSplit>();
     Directory directory = getDirectory(configuration, table.toString(), shardDir);
     try {
       SnapshotIndexDeletionPolicy policy = new SnapshotIndexDeletionPolicy(configuration,
@@ -187,7 +194,7 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
       for (SegmentInfoPerCommit commit : segmentInfos) {
         SegmentInfo segmentInfo = commit.info;
         if (commit.getDelCount() == segmentInfo.getDocCount()) {
-          LOG.info("Segment [" + segmentInfo.name + "] in dir [" + shardDir + "] has all records deleted.");
+          LOG.info("Segment [{0}] in dir [{1}] has all records deleted.", segmentInfo.name, shardDir);
         } else {
           String name = segmentInfo.name;
           Set<String> files = segmentInfo.files();
@@ -202,11 +209,12 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
     } finally {
       directory.close();
       final long end = System.nanoTime();
-      LOG.info("Found split in shard [" + shardDir + "] in [" + (end - start) / 1000000000.0 + " ms].");
+      LOG.info("Found split in shard [{0}] in [{1} ms].", shardDir, (end - start) / 1000000000.0);
     }
   }
 
-  private IndexCommit findIndexCommit(List<IndexCommit> listCommits, long generation, Path shardDir) throws IOException {
+  private static IndexCommit findIndexCommit(List<IndexCommit> listCommits, long generation, Path shardDir)
+      throws IOException {
     for (IndexCommit commit : listCommits) {
       if (commit.getGeneration() == generation) {
         return commit;
@@ -218,121 +226,44 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
   @Override
   public RecordReader<Text, TableBlurRecord> createRecordReader(InputSplit split, TaskAttemptContext context)
       throws IOException, InterruptedException {
-    BlurRecordReader blurRecordReader = new BlurRecordReader();
-    blurRecordReader.initialize(split, context);
-    return blurRecordReader;
-  }
-
-  public static class BlurRecordReader extends RecordReader<Text, TableBlurRecord> {
-
-    private boolean _setup;
-    private Text _rowId;
-    private TableBlurRecord _tableBlurRecord;
-    private Bits _liveDocs;
-    private StoredFieldsReader _fieldsReader;
-    private Directory _directory;
+    final GenericRecordReader genericRecordReader = new GenericRecordReader();
+    genericRecordReader.initialize((BlurInputSplit) split, context.getConfiguration());
+    return new RecordReader<Text, TableBlurRecord>() {
 
-    private int _docId = -1;
-    private int _maxDoc;
-    private Text _table;
-
-    @Override
-    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
-      if (_setup) {
-        return;
+      @Override
+      public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+        genericRecordReader.initialize((BlurInputSplit) split, context.getConfiguration());
       }
-      _setup = true;
 
-      Configuration configuration = context.getConfiguration();
-      BlurInputSplit blurInputSplit = (BlurInputSplit) split;
-
-      _table = blurInputSplit.getTable();
-
-      _directory = getDirectory(configuration, _table.toString(), blurInputSplit.getDir());
-
-      SegmentInfos segmentInfos = new SegmentInfos();
-      segmentInfos.read(_directory, blurInputSplit.getSegmentsName());
-      SegmentInfoPerCommit commit = findSegmentInfoPerCommit(segmentInfos, blurInputSplit);
-
-      Blur024Codec blur024Codec = new Blur024Codec();
-      IOContext iocontext = IOContext.READ;
-      SegmentInfo segmentInfo = commit.info;
-      String segmentName = segmentInfo.name;
-      FieldInfos fieldInfos = blur024Codec.fieldInfosFormat().getFieldInfosReader()
-          .read(_directory, segmentName, iocontext);
-      if (commit.getDelCount() > 0) {
-        _liveDocs = blur024Codec.liveDocsFormat().readLiveDocs(_directory, commit, iocontext);
+      @Override
+      public boolean nextKeyValue() throws IOException, InterruptedException {
+        return genericRecordReader.nextKeyValue();
       }
-      _fieldsReader = blur024Codec.storedFieldsFormat().fieldsReader(_directory, segmentInfo, fieldInfos, iocontext);
-
-      _maxDoc = commit.info.getDocCount();
-    }
 
-    private SegmentInfoPerCommit findSegmentInfoPerCommit(SegmentInfos segmentInfos, BlurInputSplit blurInputSplit)
-        throws IOException {
-      String segmentInfoName = blurInputSplit.getSegmentInfoName();
-      for (SegmentInfoPerCommit commit : segmentInfos) {
-        if (commit.info.name.equals(segmentInfoName)) {
-          return commit;
-        }
+      @Override
+      public Text getCurrentKey() throws IOException, InterruptedException {
+        return genericRecordReader.getCurrentKey();
       }
-      throw new IOException("SegmentInfoPerCommit of [" + segmentInfoName + "] not found.");
-    }
 
-    @Override
-    public boolean nextKeyValue() throws IOException, InterruptedException {
-      if (_docId >= _maxDoc) {
-        return false;
-      }
-      while (true) {
-        _docId++;
-        if (_docId >= _maxDoc) {
-          return false;
-        }
-        if (_liveDocs == null) {
-          fetchBlurRecord();
-          return true;
-        } else if (_liveDocs.get(_docId)) {
-          fetchBlurRecord();
-          return true;
-        }
+      @Override
+      public TableBlurRecord getCurrentValue() throws IOException, InterruptedException {
+        return genericRecordReader.getCurrentValue();
       }
-    }
-
-    private void fetchBlurRecord() throws IOException {
-      DocumentStoredFieldVisitor visitor = new DocumentStoredFieldVisitor();
-      _fieldsReader.visitDocument(_docId, visitor);
-      BlurRecord blurRecord = new BlurRecord();
-      String rowId = RowDocumentUtil.readRecord(visitor.getDocument(), blurRecord);
-      blurRecord.setRowId(rowId);
-      _rowId = new Text(rowId);
-      _tableBlurRecord = new TableBlurRecord(_table, blurRecord);
-    }
-
-    @Override
-    public Text getCurrentKey() throws IOException, InterruptedException {
-      return _rowId;
-    }
 
-    @Override
-    public TableBlurRecord getCurrentValue() throws IOException, InterruptedException {
-      return _tableBlurRecord;
-    }
-
-    @Override
-    public float getProgress() throws IOException, InterruptedException {
-      return (float) _docId / (float) _maxDoc;
-    }
+      @Override
+      public float getProgress() throws IOException, InterruptedException {
+        return genericRecordReader.getProgress();
+      }
 
-    @Override
-    public void close() throws IOException {
-      _fieldsReader.close();
-      _directory.close();
-    }
+      @Override
+      public void close() throws IOException {
+        genericRecordReader.close();
+      }
 
+    };
   }
 
-  public static class BlurInputSplit extends InputSplit implements Writable {
+  public static class BlurInputSplit extends InputSplit implements org.apache.hadoop.mapred.InputSplit, Writable {
 
     private static final String UTF_8 = "UTF-8";
     private long _fileLength;
@@ -354,12 +285,12 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
     }
 
     @Override
-    public long getLength() throws IOException, InterruptedException {
+    public long getLength() throws IOException {
       return _fileLength;
     }
 
     @Override
-    public String[] getLocations() throws IOException, InterruptedException {
+    public String[] getLocations() throws IOException {
       // @TODO create locations for fdt file
       return new String[] {};
     }
@@ -413,6 +344,22 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
 
   }
 
+  public static void setLocalCachePath(Job job, Path fileCachePath) {
+    setLocalCachePath(job.getConfiguration(), fileCachePath);
+  }
+
+  public static void setLocalCachePath(Configuration configuration, Path fileCachePath) {
+    configuration.set(BLUR_INPUTFORMAT_FILE_CACHE_PATH, fileCachePath.toString());
+  }
+
+  public static Path getLocalCachePath(Configuration configuration) {
+    String p = configuration.get(BLUR_INPUTFORMAT_FILE_CACHE_PATH);
+    if (p == null) {
+      return null;
+    }
+    return new Path(p);
+  }
+
   public static void addTable(Job job, TableDescriptor tableDescriptor, String snapshot)
       throws IllegalArgumentException, IOException {
     String tableName = tableDescriptor.getName();
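
BlurInputFormat now exposes a file-cache hook: when
blur.inputformat.file.cache.path is set, the new GenericRecordReader copies
each segment's .fdt/.fdx/.del/.fnm files into the cache directory and reads
from the copies. A minimal sketch of wiring it into a job, following the
pattern in the update Driver (the cache path below is a placeholder):

    import org.apache.blur.mapreduce.lib.BlurInputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;

    public class InputFormatCacheSetup {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "blur-input-example");
        // Segment files are copied once and revalidated by length and
        // last-modified stamp on later runs (see GenericRecordReader#isValid).
        BlurInputFormat.setLocalCachePath(job, new Path("hdfs://namenode/mrworkingpath/cache"));
        // BlurInputFormat.addTable(job, tableDescriptor, snapshotName) would then
        // register the table snapshot to read, as the update Driver does.
      }
    }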

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/67984a6a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericRecordReader.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericRecordReader.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericRecordReader.java
new file mode 100644
index 0000000..259bf72
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericRecordReader.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.lucene.codec.Blur024Codec;
+import org.apache.blur.mapreduce.lib.BlurInputFormat.BlurInputSplit;
+import org.apache.blur.store.blockcache.LastModified;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.utils.RowDocumentUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.lucene.codecs.StoredFieldsReader;
+import org.apache.lucene.document.DocumentStoredFieldVisitor;
+import org.apache.lucene.index.FieldInfos;
+import org.apache.lucene.index.SegmentInfo;
+import org.apache.lucene.index.SegmentInfoPerCommit;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.Bits;
+
+public class GenericRecordReader {
+
+  private static final String LASTMOD = ".lastmod";
+
+  private static final Log LOG = LogFactory.getLog(GenericRecordReader.class);
+
+  private boolean _setup;
+  private Text _rowId;
+  private TableBlurRecord _tableBlurRecord;
+  private Bits _liveDocs;
+  private StoredFieldsReader _fieldsReader;
+  private Directory _directory;
+  private Directory _readingDirectory;
+
+  private int _docId = -1;
+  private int _maxDoc;
+  private Text _table;
+
+  public void initialize(BlurInputSplit blurInputSplit, Configuration configuration) throws IOException {
+    if (_setup) {
+      return;
+    }
+    _setup = true;
+    _table = blurInputSplit.getTable();
+    Path localCachePath = BlurInputFormat.getLocalCachePath(configuration);
+    LOG.info("Local cache path [{0}]", localCachePath);
+    _directory = BlurInputFormat.getDirectory(configuration, _table.toString(), blurInputSplit.getDir());
+
+    SegmentInfos segmentInfos = new SegmentInfos();
+    segmentInfos.read(_directory, blurInputSplit.getSegmentsName());
+    SegmentInfoPerCommit commit = findSegmentInfoPerCommit(segmentInfos, blurInputSplit);
+
+    SegmentInfo segmentInfo = commit.info;
+    if (localCachePath != null) {
+      _readingDirectory = copyFilesLocally(configuration, _directory, _table.toString(), blurInputSplit.getDir(),
+          localCachePath, segmentInfo.files());
+    } else {
+      _readingDirectory = _directory;
+    }
+
+    Blur024Codec blur024Codec = new Blur024Codec();
+    IOContext iocontext = IOContext.READ;
+
+    String segmentName = segmentInfo.name;
+    FieldInfos fieldInfos = blur024Codec.fieldInfosFormat().getFieldInfosReader()
+        .read(_readingDirectory, segmentName, iocontext);
+    if (commit.getDelCount() > 0) {
+      _liveDocs = blur024Codec.liveDocsFormat().readLiveDocs(_readingDirectory, commit, iocontext);
+    }
+    _fieldsReader = blur024Codec.storedFieldsFormat().fieldsReader(_readingDirectory, segmentInfo, fieldInfos,
+        iocontext);
+
+    _maxDoc = commit.info.getDocCount();
+  }
+
+  private static Directory copyFilesLocally(Configuration configuration, Directory dir, String table, Path shardDir,
+      Path localCachePath, Set<String> files) throws IOException {
+    LOG.info("Copying files need to local cache for faster reads [{0}].", shardDir);
+    Path localShardPath = new Path(new Path(localCachePath, table), shardDir.getName());
+    HdfsDirectory localDir = new HdfsDirectory(configuration, localShardPath);
+    for (String name : files) {
+      if (!isValidFileToCache(name)) {
+        continue;
+      }
+      LOG.info("Valid file for local copy [{0}].", name);
+      if (!isValid(localDir, dir, name)) {
+        LastModified lastModified = (LastModified) dir;
+        long fileModified = lastModified.getFileModified(name);
+
+        IndexInput input = dir.openInput(name, IOContext.READONCE);
+        IndexOutput output = localDir.createOutput(name, IOContext.READONCE);
+        output.copyBytes(input, input.length());
+        output.close();
+        input.close();
+        IndexOutput lastMod = localDir.createOutput(name + LASTMOD, IOContext.DEFAULT);
+        lastMod.writeLong(fileModified);
+        lastMod.close();
+      }
+    }
+    return localDir;
+  }
+
+  private static boolean isValidFileToCache(String name) {
+    if (name.endsWith(".fdt")) {
+      return true;
+    } else if (name.endsWith(".fdx")) {
+      return true;
+    } else if (name.endsWith(".del")) {
+      return true;
+    } else if (name.endsWith(".fnm")) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  private static boolean isValid(HdfsDirectory localDir, Directory remoteDir, String name) throws IOException {
+    LastModified lastModified = (LastModified) remoteDir;
+    long fileModified = lastModified.getFileModified(name);
+    long fileLength = remoteDir.fileLength(name);
+
+    if (localDir.fileExists(name)) {
+      LOG.info("Cache file exists [{0}]", name);
+      if (localDir.fileLength(name) == fileLength) {
+        LOG.info("Cache file length matches [{0}]", name);
+        if (localDir.fileExists(name + LASTMOD)) {
+          LOG.info("Cache file last mod file exists [{0}]", name);
+          IndexInput input = localDir.openInput(name + LASTMOD, IOContext.DEFAULT);
+          long lastMod = input.readLong();
+          if (lastMod == fileModified) {
+            LOG.info("Cache file last mod matches [{0}]", name);
+            return true;
+          } else {
+            LOG.info("Cache file last mod does not match [{0}]", name);
+          }
+        } else {
+          LOG.info("Cache file last mod file does not exist [{0}]", name);
+        }
+      } else {
+        LOG.info("Cache file length does not match [{0}]", name);
+      }
+    } else {
+      LOG.info("Cache file does not exist [{0}]", name);
+    }
+    return false;
+  }
+
+  private SegmentInfoPerCommit findSegmentInfoPerCommit(SegmentInfos segmentInfos, BlurInputSplit blurInputSplit)
+      throws IOException {
+    String segmentInfoName = blurInputSplit.getSegmentInfoName();
+    for (SegmentInfoPerCommit commit : segmentInfos) {
+      if (commit.info.name.equals(segmentInfoName)) {
+        return commit;
+      }
+    }
+    throw new IOException("SegmentInfoPerCommit of [" + segmentInfoName + "] not found.");
+  }
+
+  public boolean nextKeyValue() throws IOException {
+    if (_docId >= _maxDoc) {
+      return false;
+    }
+    while (true) {
+      _docId++;
+      if (_docId >= _maxDoc) {
+        return false;
+      }
+      if (_liveDocs == null) {
+        fetchBlurRecord();
+        return true;
+      } else if (_liveDocs.get(_docId)) {
+        fetchBlurRecord();
+        return true;
+      }
+    }
+  }
+
+  private void fetchBlurRecord() throws IOException {
+    DocumentStoredFieldVisitor visitor = new DocumentStoredFieldVisitor();
+    _fieldsReader.visitDocument(_docId, visitor);
+    BlurRecord blurRecord = new BlurRecord();
+    String rowId = RowDocumentUtil.readRecord(visitor.getDocument(), blurRecord);
+    blurRecord.setRowId(rowId);
+    _rowId = new Text(rowId);
+    _tableBlurRecord = new TableBlurRecord(_table, blurRecord);
+  }
+
+  public Text getCurrentKey() throws IOException {
+    return _rowId;
+  }
+
+  public TableBlurRecord getCurrentValue() throws IOException {
+    return _tableBlurRecord;
+  }
+
+  public float getProgress() throws IOException {
+    return (float) _docId / (float) _maxDoc;
+  }
+
+  public void close() throws IOException {
+    _fieldsReader.close();
+    _directory.close();
+    if (_readingDirectory != _directory) {
+      _readingDirectory.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/67984a6a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/Driver.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/Driver.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/Driver.java
new file mode 100644
index 0000000..05ee5a2
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/Driver.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib.update;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.mapreduce.lib.BlurInputFormat;
+import org.apache.blur.mapreduce.lib.BlurOutputFormat;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class Driver extends Configured implements Tool {
+
+  public static final String CACHE = "cache";
+  public static final String COMPLETE = "complete";
+  public static final String INPROGRESS = "inprogress";
+  public static final String NEW = "new";
+  private static final Log LOG = LogFactory.getLog(Driver.class);
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new Driver(), args);
+    System.exit(res);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    int c = 0;
+    String table = args[c++];
+    String mrIncWorkingPathStr = args[c++];
+    String outputPathStr = args[c++];
+    String blurZkConnection = args[c++];
+
+    Path outputPath = new Path(outputPathStr);
+    Path mrIncWorkingPath = new Path(mrIncWorkingPathStr);
+    FileSystem fileSystem = mrIncWorkingPath.getFileSystem(getConf());
+
+    Path newData = new Path(mrIncWorkingPath, NEW);
+    Path inprogressData = new Path(mrIncWorkingPath, INPROGRESS);
+    Path completeData = new Path(mrIncWorkingPath, COMPLETE);
+    Path fileCache = new Path(mrIncWorkingPath, CACHE);
+
+    fileSystem.mkdirs(newData);
+    fileSystem.mkdirs(inprogressData);
+    fileSystem.mkdirs(completeData);
+    fileSystem.mkdirs(fileCache);
+
+    List<Path> inprogressPathList = new ArrayList<Path>();
+    boolean success = false;
+    try {
+      List<Path> srcPathList = new ArrayList<Path>();
+      for (FileStatus fileStatus : fileSystem.listStatus(newData)) {
+        srcPathList.add(fileStatus.getPath());
+      }
+
+      inprogressPathList = movePathList(fileSystem, inprogressData, srcPathList);
+
+      Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");
+
+      Iface client = BlurClient.getClientFromZooKeeperConnectionStr(blurZkConnection);
+      String snapshotId = UUID.randomUUID().toString();
+      client.createSnapshot(table, snapshotId);
+      TableDescriptor descriptor = client.describe(table);
+      Path tablePath = new Path(descriptor.getTableUri());
+
+      BlurInputFormat.setLocalCachePath(job, fileCache);
+      BlurInputFormat.addTable(job, descriptor, snapshotId);
+      MultipleInputs.addInputPath(job, tablePath, BlurInputFormat.class, MapperForExistingData.class);
+      for (Path p : inprogressPathList) {
+        FileInputFormat.addInputPath(job, p);
+        MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, MapperForNewData.class);
+      }
+
+      BlurOutputFormat.setOutputPath(job, outputPath);
+      BlurOutputFormat.setupJob(job, descriptor);
+
+      job.setReducerClass(UpdateReducer.class);
+      job.setMapOutputKeyClass(IndexKey.class);
+      job.setMapOutputValueClass(IndexValue.class);
+      job.setPartitionerClass(IndexKeyPartitioner.class);
+      job.setGroupingComparatorClass(IndexKeyWritableComparator.class);
+
+      success = job.waitForCompletion(true);
+      Counters counters = job.getCounters();
+      LOG.info("Counters [" + counters + "]");
+
+    } finally {
+      if (success) {
+        LOG.info("Indexing job succeeded!");
+        movePathList(fileSystem, completeData, inprogressPathList);
+      } else {
+        LOG.error("Indexing job failed!");
+        movePathList(fileSystem, newData, inprogressPathList);
+      }
+    }
+
+    if (success) {
+      return 0;
+    } else {
+      return 1;
+    }
+
+  }
+
+  private List<Path> movePathList(FileSystem fileSystem, Path dstDir, List<Path> lst) throws IOException {
+    List<Path> result = new ArrayList<Path>();
+    for (Path src : lst) {
+      Path dst = new Path(dstDir, src.getName());
+      if (fileSystem.rename(src, dst)) {
+        LOG.info("Moving [{0}] to [{1}]", src, dst);
+        result.add(dst);
+      } else {
+        LOG.error("Could not move [{0}] to [{1}]", src, dst);
+      }
+    }
+    return result;
+  }
+
+}
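
The working path acts as a small state machine: loads wait in new/, move to
inprogress/ while a job runs, then to complete/ on success or back to new/ on
failure, while cache/ backs the BlurInputFormat file cache. Driver#run creates
the directories on demand, but they can also be bootstrapped up front as
BlurSerDeTest#test3 does (the path below is a placeholder):

    import org.apache.blur.mapreduce.lib.update.Driver;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class BootstrapWorkingPath {
      public static void main(String[] args) throws Exception {
        Path workingPath = new Path("hdfs://namenode/mrworkingpath");
        FileSystem fileSystem = workingPath.getFileSystem(new Configuration());
        fileSystem.mkdirs(new Path(workingPath, "tmp"));             // Hive loads staged here
        fileSystem.mkdirs(new Path(workingPath, Driver.NEW));        // promoted loads, job input
        fileSystem.mkdirs(new Path(workingPath, Driver.INPROGRESS)); // inputs of the running job
        fileSystem.mkdirs(new Path(workingPath, Driver.COMPLETE));   // inputs of finished jobs
        fileSystem.mkdirs(new Path(workingPath, Driver.CACHE));      // BlurInputFormat file cache
      }
    }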

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/67984a6a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/IndexKey.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/IndexKey.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/IndexKey.java
new file mode 100644
index 0000000..4fef751
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/IndexKey.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib.update;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+
+public class IndexKey implements WritableComparable<IndexKey> {
+
+  public enum TYPE {
+    NEW_DATA_MARKER((byte) 0), OLD_DATA((byte) 1), NEW_DATA((byte) 2);
+
+    private byte _type;
+
+    private TYPE(byte type) {
+      _type = type;
+    }
+
+    public byte getType() {
+      return _type;
+    }
+
+    public static TYPE findType(byte type) {
+      switch (type) {
+      case 0:
+        return NEW_DATA_MARKER;
+      case 1:
+        return OLD_DATA;
+      case 2:
+        return NEW_DATA;
+      default:
+        throw new RuntimeException("Type [" + type + "] not found.");
+      }
+    }
+  }
+
+  private Text _rowId = new Text();
+  private TYPE _type;
+  private Text _recordId = new Text();
+  private long _timestamp;
+
+  public static IndexKey newData(String rowId, String recordId, long timestamp) {
+    return newData(new Text(rowId), new Text(recordId), timestamp);
+  }
+
+  public static IndexKey newData(Text rowId, Text recordId, long timestamp) {
+    IndexKey updateKey = new IndexKey();
+    updateKey._rowId = rowId;
+    updateKey._recordId = recordId;
+    updateKey._timestamp = timestamp;
+    updateKey._type = TYPE.NEW_DATA;
+    return updateKey;
+  }
+
+  public static IndexKey newDataMarker(String rowId) {
+    return newDataMarker(new Text(rowId));
+  }
+
+  public static IndexKey newDataMarker(Text rowId) {
+    IndexKey updateKey = new IndexKey();
+    updateKey._rowId = rowId;
+    updateKey._type = TYPE.NEW_DATA_MARKER;
+    return updateKey;
+  }
+
+  public static IndexKey oldData(String rowId, String recordId) {
+    return oldData(new Text(rowId), new Text(recordId));
+  }
+
+  public static IndexKey oldData(Text rowId, Text recordId) {
+    IndexKey updateKey = new IndexKey();
+    updateKey._rowId = rowId;
+    updateKey._recordId = recordId;
+    updateKey._timestamp = 0L;
+    updateKey._type = TYPE.OLD_DATA;
+    return updateKey;
+  }
+
+  public Text getRowId() {
+    return _rowId;
+  }
+
+  public TYPE getType() {
+    return _type;
+  }
+
+  public Text getRecordId() {
+    return _recordId;
+  }
+
+  public long getTimestamp() {
+    return _timestamp;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    _rowId.write(out);
+    out.write(_type.getType());
+    switch (_type) {
+    case NEW_DATA_MARKER:
+      break;
+    default:
+      _recordId.write(out);
+      out.writeLong(_timestamp);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    _rowId.readFields(in);
+    _type = TYPE.findType(in.readByte());
+    switch (_type) {
+    case NEW_DATA_MARKER:
+      _recordId = new Text();
+      _timestamp = 0L;
+      break;
+    default:
+      _recordId.readFields(in);
+      _timestamp = in.readLong();
+    }
+  }
+
+  @Override
+  public int compareTo(IndexKey o) {
+    int compareTo = _rowId.compareTo(o._rowId);
+    if (compareTo == 0) {
+      if (_type == TYPE.NEW_DATA_MARKER) {
+        compareTo = _type.compareTo(o._type);
+      } else {
+        compareTo = _recordId.compareTo(o._recordId);
+        if (compareTo == 0) {
+          compareTo = _type.compareTo(o._type);
+          if (compareTo == 0) {
+            compareTo = compare(_timestamp, o._timestamp);
+          }
+        }
+      }
+    }
+    return compareTo;
+  }
+
+  private int compare(long a, long b) {
+    return (a < b) ? -1 : ((a == b) ? 0 : 1);
+  }
+
+  @Override
+  public String toString() {
+    return "IndexKey [_rowId=" + _rowId + ", _type=" + _type + ", _recordId=" + _recordId + ", _timestamp="
+        + _timestamp + "]";
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((_recordId == null) ? 0 : _recordId.hashCode());
+    result = prime * result + ((_rowId == null) ? 0 : _rowId.hashCode());
+    result = prime * result + (int) (_timestamp ^ (_timestamp >>> 32));
+    result = prime * result + ((_type == null) ? 0 : _type.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    IndexKey other = (IndexKey) obj;
+    if (_recordId == null) {
+      if (other._recordId != null)
+        return false;
+    } else if (!_recordId.equals(other._recordId))
+      return false;
+    if (_rowId == null) {
+      if (other._rowId != null)
+        return false;
+    } else if (!_rowId.equals(other._rowId))
+      return false;
+    if (_timestamp != other._timestamp)
+      return false;
+    if (_type != other._type)
+      return false;
+    return true;
+  }
+
+}
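
A quick illustration of the ordering IndexKey.compareTo produces within one row: the new data marker sorts ahead of all data keys, data keys then sort by record id, and versions of the same record sort oldest to newest. The sketch below is hypothetical (not part of the commit) and assumes the TYPE enum declared earlier in this file lists NEW_DATA_MARKER before OLD_DATA before NEW_DATA, which the reducer's first-key check and last-wins loop both depend on.

    import java.util.Arrays;

    import org.apache.blur.mapreduce.lib.update.IndexKey;

    public class IndexKeySortDemo {
      public static void main(String[] args) {
        IndexKey[] keys = { IndexKey.newData("row1", "rec1", 2000L),
            IndexKey.oldData("row1", "rec1"),
            IndexKey.newData("row1", "rec1", 1000L),
            IndexKey.newDataMarker("row1") };
        Arrays.sort(keys);
        for (IndexKey key : keys) {
          System.out.println(key);
        }
        // Prints the marker first, then rec1 OLD_DATA, then rec1 NEW_DATA at
        // timestamps 1000 and 2000 - the newest version of a record sorts last.
      }
    }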

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/67984a6a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/IndexKeyPartitioner.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/IndexKeyPartitioner.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/IndexKeyPartitioner.java
new file mode 100644
index 0000000..5bd5d19
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/IndexKeyPartitioner.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib.update;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
+
+public class IndexKeyPartitioner extends Partitioner<IndexKey, IndexValue> {
+
+  private HashPartitioner<Text, Writable> _partitioner = new HashPartitioner<Text, Writable>();
+
+  @Override
+  public int getPartition(IndexKey key, IndexValue value, int numPartitions) {
+    return _partitioner.getPartition(key.getRowId(), value, numPartitions);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/67984a6a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/IndexKeyWritableComparator.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/IndexKeyWritableComparator.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/IndexKeyWritableComparator.java
new file mode 100644
index 0000000..41c0fdc
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/IndexKeyWritableComparator.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib.update;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+public class IndexKeyWritableComparator extends WritableComparator {
+
+  public IndexKeyWritableComparator() {
+    super(IndexKey.class, true);
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public int compare(WritableComparable a, WritableComparable b) {
+    IndexKey keya = (IndexKey) a;
+    IndexKey keyb = (IndexKey) b;
+    return keya.getRowId().compareTo(keyb.getRowId());
+  }
+
+}
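
Together with IndexKeyPartitioner above, this comparator forms a standard MapReduce secondary sort: the partitioner routes every key for a row to the same reducer, and this grouping comparator collapses all of a row's keys into a single reduce group, while the full IndexKey.compareTo still orders the keys inside that group. A hypothetical sketch of the job wiring this pair expects (the real setup lives in Driver.java elsewhere in this commit; the job name here is made up):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class UpdateJobWiring {
      public static Job wire(Configuration conf) throws Exception {
        Job job = Job.getInstance(conf, "blur-row-update");
        job.setMapOutputKeyClass(IndexKey.class);
        job.setMapOutputValueClass(IndexValue.class);
        // Route all keys for a row to one reducer.
        job.setPartitionerClass(IndexKeyPartitioner.class);
        // Collapse a row's keys into a single reduce call.
        job.setGroupingComparatorClass(IndexKeyWritableComparator.class);
        job.setReducerClass(UpdateReducer.class);
        return job;
      }
    }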

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/67984a6a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/IndexValue.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/IndexValue.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/IndexValue.java
new file mode 100644
index 0000000..2f8a61e
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/IndexValue.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib.update;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.blur.mapreduce.lib.BlurRecord;
+import org.apache.hadoop.io.Writable;
+
+public class IndexValue implements Writable {
+
+  private BlurRecord _blurRecord;
+
+  public IndexValue() {
+
+  }
+
+  public IndexValue(BlurRecord blurRecord) {
+    _blurRecord = blurRecord;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    if (_blurRecord == null) {
+      out.writeBoolean(false);
+    } else {
+      out.writeBoolean(true);
+      _blurRecord.write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    if (in.readBoolean()) {
+      _blurRecord = new BlurRecord();
+      _blurRecord.readFields(in);
+    } else {
+      _blurRecord = null;
+    }
+  }
+
+  public BlurRecord getBlurRecord() {
+    return _blurRecord;
+  }
+
+  public void setBlurRecord(BlurRecord blurRecord) {
+    this._blurRecord = blurRecord;
+  }
+
+}
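
The boolean written first is a null flag: the new data markers emitted by MapperForNewData below carry an IndexValue with no BlurRecord, and the flag lets that empty value survive the shuffle. A minimal, hypothetical round trip of the encoding:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.blur.mapreduce.lib.update.IndexValue;

    public class IndexValueRoundTrip {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // A null record serializes as a single 'false' byte.
        new IndexValue().write(new DataOutputStream(bytes));
        IndexValue copy = new IndexValue();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy.getBlurRecord()); // prints null
      }
    }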

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/67984a6a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingData.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingData.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingData.java
new file mode 100644
index 0000000..b7d96ff
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForExistingData.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib.update;
+
+import java.io.IOException;
+
+import org.apache.blur.mapreduce.lib.BlurRecord;
+import org.apache.blur.mapreduce.lib.TableBlurRecord;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Mapper;
+
+public class MapperForExistingData extends Mapper<Text, TableBlurRecord, IndexKey, IndexValue> {
+
+  private Counter _existingRecords;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    _existingRecords = context.getCounter("Blur", "Existing Records Read");
+  }
+
+  @Override
+  protected void map(Text key, TableBlurRecord value, Context context) throws IOException, InterruptedException {
+    BlurRecord blurRecord = value.getBlurRecord();
+    IndexKey oldDataKey = IndexKey.oldData(blurRecord.getRowId(), blurRecord.getRecordId());
+    context.write(oldDataKey, new IndexValue(blurRecord));
+    _existingRecords.increment(1L);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/67984a6a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForNewData.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForNewData.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForNewData.java
new file mode 100644
index 0000000..0fa1aa3
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/MapperForNewData.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib.update;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+
+import org.apache.blur.mapreduce.lib.BlurRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+public class MapperForNewData extends Mapper<Text, BlurRecord, IndexKey, IndexValue> {
+
+  private static final IndexValue EMPTY_RECORD = new IndexValue();
+  private long _timestamp;
+  private Counter _newRecords;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    InputSplit inputSplit = context.getInputSplit();
+    FileSplit fileSplit = getFileSplit(inputSplit);
+    Path path = fileSplit.getPath();
+    FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
+    FileStatus fileStatus = fileSystem.getFileStatus(path);
+    _timestamp = fileStatus.getModificationTime();
+    _newRecords = context.getCounter("Blur", "New Records Read");
+  }
+
+  private FileSplit getFileSplit(InputSplit inputSplit) throws IOException {
+    if (inputSplit instanceof FileSplit) {
+      return (FileSplit) inputSplit;
+    }
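+    // MultipleInputs wraps each split in TaggedInputSplit, which is package
+    // private in Hadoop, so the wrapped FileSplit is reached via reflection.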
+    if (inputSplit.getClass().getName().equals("org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit")) {
+      try {
+        Field declaredField = inputSplit.getClass().getDeclaredField("inputSplit");
+        declaredField.setAccessible(true);
+        return getFileSplit((InputSplit) declaredField.get(inputSplit));
+      } catch (NoSuchFieldException e) {
+        throw new IOException(e);
+      } catch (SecurityException e) {
+        throw new IOException(e);
+      } catch (IllegalArgumentException e) {
+        throw new IOException(e);
+      } catch (IllegalAccessException e) {
+        throw new IOException(e);
+      }
+    } else {
+      throw new IOException("Unknown input split type [" + inputSplit + "] [" + inputSplit.getClass() + "]");
+    }
+  }
+
+  @Override
+  protected void map(Text key, BlurRecord blurRecord, Context context) throws IOException, InterruptedException {
+    IndexKey newDataKey = IndexKey.newData(blurRecord.getRowId(), blurRecord.getRecordId(), _timestamp);
+    context.write(newDataKey, new IndexValue(blurRecord));
+    _newRecords.increment(1L);
+
+    IndexKey newDataMarker = IndexKey.newDataMarker(blurRecord.getRowId());
+    context.write(newDataMarker, EMPTY_RECORD);
+  }
+
+}
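
The TaggedInputSplit handling above only appears when a job feeds its mappers through MultipleInputs, which tags each split with the mapper that should process it. A hypothetical sketch of how this mapper and MapperForExistingData would be combined (the path variables and the SequenceFile input format for new data are assumptions, not taken from this commit; the actual wiring is in Driver.java):

    import org.apache.blur.mapreduce.lib.BlurInputFormat;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

    public class UpdateInputWiring {
      public static void wire(Job job, Path newDataPath, Path existingDataPath) {
        // New records, assumed staged as SequenceFiles of <Text, BlurRecord>.
        MultipleInputs.addInputPath(job, newDataPath, SequenceFileInputFormat.class,
            MapperForNewData.class);
        // Existing records, read back out of the Blur table snapshot.
        MultipleInputs.addInputPath(job, existingDataPath, BlurInputFormat.class,
            MapperForExistingData.class);
      }
    }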

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/67984a6a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/UpdateReducer.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/UpdateReducer.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/UpdateReducer.java
new file mode 100644
index 0000000..c1b65ec
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/UpdateReducer.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib.update;
+
+import java.io.IOException;
+
+import org.apache.blur.mapreduce.lib.BlurMutate;
+import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
+import org.apache.blur.mapreduce.lib.BlurOutputFormat;
+import org.apache.blur.mapreduce.lib.BlurRecord;
+import org.apache.blur.mapreduce.lib.GetCounter;
+import org.apache.blur.mapreduce.lib.update.IndexKey.TYPE;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Reducer;
+
+public class UpdateReducer extends Reducer<IndexKey, IndexValue, Text, BlurMutate> {
+
+  @Override
+  protected void setup(final Context context) throws IOException, InterruptedException {
+    BlurOutputFormat.setProgressable(context);
+    BlurOutputFormat.setGetCounter(new GetCounter() {
+      @Override
+      public Counter getCounter(Enum<?> counterName) {
+        return context.getCounter(counterName);
+      }
+    });
+  }
+
+  @Override
+  protected void reduce(IndexKey key, Iterable<IndexValue> values, Context context) throws IOException,
+      InterruptedException {
+    if (key.getType() != TYPE.NEW_DATA_MARKER) {
+      // The group's first key is not a new data marker; no new data for this row, skip it.
+      return;
+    } else {
+      BlurRecord prevBlurRecord = null;
+      String prevRecordId = null;
+      for (IndexValue value : values) {
+        BlurRecord br = value.getBlurRecord();
+        if (br == null) {
+          // Null records are the new data markers (one was emitted per new
+          // record for this row); skip them.
+          continue;
+        }
+
+        // Safe Copy
+        BlurRecord blurRecord = new BlurRecord(br);
+        String recordId = blurRecord.getRecordId();
+        if (prevRecordId == null || prevRecordId.equals(recordId)) {
+          // reassign to new record.
+          prevBlurRecord = blurRecord;
+          prevRecordId = recordId;
+        } else {
+          // The record id changed; flush the previous record and start
+          // tracking the new one.
+          context.write(new Text(prevBlurRecord.getRowId()), toMutate(prevBlurRecord));
+          prevBlurRecord = blurRecord;
+          prevRecordId = recordId;
+        }
+      }
+      if (prevBlurRecord != null) {
+        context.write(new Text(prevBlurRecord.getRowId()), toMutate(prevBlurRecord));
+      }
+    }
+  }
+
+  private BlurMutate toMutate(BlurRecord blurRecord) {
+    return new BlurMutate(MUTATE_TYPE.REPLACE, blurRecord);
+  }
+
+}
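
With the marker sorting first and record versions sorting oldest to newest, a reduce group for one row arrives as the stream sketched below, and the loop above keeps only the last version of each record id. A worked example with made-up ids and timestamps:

    // One reduce group for rowId "row1", keys shown in sort order:
    //
    //   NEW_DATA_MARKER row1                -> null record, skipped
    //   OLD_DATA        row1/rec1           -> old version of rec1
    //   NEW_DATA        row1/rec1 @ t=2000  -> newer version of rec1, wins
    //   OLD_DATA        row1/rec2           -> only version of rec2, kept
    //
    // Emitted: REPLACE(rec1 new version), REPLACE(rec2 old version).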

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/67984a6a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java
index 45d482c..f7d5d1e 100644
--- a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java
+++ b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java
@@ -16,16 +16,17 @@
  */
 package org.apache.blur.mapreduce.lib;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.Map.Entry;
 import java.util.TreeMap;
+import java.util.UUID;
 
 import org.apache.blur.MiniCluster;
 import org.apache.blur.store.buffer.BufferStore;
@@ -109,21 +110,53 @@ public class BlurInputFormatTest {
   }
 
   @Test
-  public void testBlurInputFormatFastDisabled() throws IOException, BlurException, TException, ClassNotFoundException,
-      InterruptedException {
-    String tableName = "testBlurInputFormatFastDisabled";
-    runTest(tableName, true);
+  public void testBlurInputFormatFastDisabledNoFileCache() throws IOException, BlurException, TException,
+      ClassNotFoundException, InterruptedException {
+    String tableName = "testBlurInputFormatFastDisabledNoFileCache";
+    runTest(tableName, true, null);
+  }
+
+  @Test
+  public void testBlurInputFormatFastEnabledNoFileCache() throws IOException, BlurException, TException,
+      ClassNotFoundException, InterruptedException {
+    String tableName = "testBlurInputFormatFastEnabledNoFileCache";
+    runTest(tableName, false, null);
+  }
+
+  @Test
+  public void testBlurInputFormatFastDisabledFileCache() throws IOException, BlurException, TException,
+      ClassNotFoundException, InterruptedException {
+    String tableName = "testBlurInputFormatFastDisabledFileCache";
+    Path fileCache = new Path(miniCluster.getFileSystemUri() + "/filecache");
+    runTest(tableName, true, fileCache);
+    FileSystem fileSystem = miniCluster.getFileSystem();
+    // @TODO write some assertions.
+    // RemoteIterator<LocatedFileStatus> listFiles =
+    // fileSystem.listFiles(fileCache, true);
+    // while (listFiles.hasNext()) {
+    // LocatedFileStatus locatedFileStatus = listFiles.next();
+    // System.out.println(locatedFileStatus.getPath());
+    // }
   }
 
   @Test
-  public void testBlurInputFormatFastEnabled() throws IOException, BlurException, TException, ClassNotFoundException,
-      InterruptedException {
-    String tableName = "testBlurInputFormatFastEnabled";
-    runTest(tableName, false);
+  public void testBlurInputFormatFastEnabledFileCache() throws IOException, BlurException, TException,
+      ClassNotFoundException, InterruptedException {
+    String tableName = "testBlurInputFormatFastEnabledFileCache";
+    Path fileCache = new Path(miniCluster.getFileSystemUri() + "/filecache");
+    runTest(tableName, false, fileCache);
+    FileSystem fileSystem = miniCluster.getFileSystem();
+    // @TODO write some assertions.
+    // RemoteIterator<LocatedFileStatus> listFiles =
+    // fileSystem.listFiles(fileCache, true);
+    // while (listFiles.hasNext()) {
+    // LocatedFileStatus locatedFileStatus = listFiles.next();
+    // System.out.println(locatedFileStatus.getPath());
+    // }
   }
 
-  private void runTest(String tableName, boolean disableFast) throws IOException, BlurException, TException,
-      InterruptedException, ClassNotFoundException {
+  private void runTest(String tableName, boolean disableFast, Path fileCache) throws IOException, BlurException,
+      TException, InterruptedException, ClassNotFoundException {
     FileSystem fileSystem = miniCluster.getFileSystem();
     Path root = new Path(fileSystem.getUri() + "/");
 
@@ -148,6 +181,10 @@ public class BlurInputFormatTest {
     String snapshot = UUID.randomUUID().toString();
     client.createSnapshot(tableName, snapshot);
 
+    if (fileCache != null) {
+      BlurInputFormat.setLocalCachePath(job, fileCache);
+    }
+
     BlurInputFormat.addTable(job, tableDescriptor, snapshot);
     FileOutputFormat.setOutputPath(job, output);
 

