incubator-blur-commits mailing list archives

From: amccu...@apache.org
Subject: [10/13] git commit: Adding blur indexer project.
Date: Tue, 30 Aug 2016 01:57:55 GMT
Adding blur indexer project.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/98359a40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/98359a40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/98359a40

Branch: refs/heads/master
Commit: 98359a409be47ceb9f06468e3bf701f299c35fe2
Parents: 1d103e4
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Aug 29 19:16:45 2016 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Aug 29 19:16:45 2016 -0400

----------------------------------------------------------------------
 .../apache/blur/indexer/BlurIndexCounter.java   |  27 +
 .../org/apache/blur/indexer/ClusterDriver.java  | 365 ++++++++++++++
 .../HdfsConfigurationNamespaceMerge.java        | 131 +++++
 .../apache/blur/indexer/IndexerJobDriver.java   | 498 +++++++++++++++++++
 .../blur/indexer/InputSplitPruneUtil.java       | 149 ++++++
 .../blur/indexer/MergeSortRowIdMatcher.java     | 386 ++++++++++++++
 .../ExistingDataIndexLookupMapper.java          | 230 +++++++++
 .../indexer/mapreduce/ExistingDataMapper.java   |  49 ++
 .../indexer/mapreduce/LookupBuilderMapper.java  |  34 ++
 .../indexer/mapreduce/LookupBuilderReducer.java | 184 +++++++
 .../blur/indexer/mapreduce/NewDataMapper.java   |  85 ++++
 .../mapreduce/PrunedBlurInputFormat.java        |  74 +++
 .../PrunedSequenceFileInputFormat.java          |  76 +++
 .../mapreduce/lib/update/BlurIndexCounter.java  |  27 -
 .../mapreduce/lib/update/ClusterDriver.java     | 378 --------------
 .../blur/mapreduce/lib/update/FasterDriver.java | 486 ------------------
 .../update/HdfsConfigurationNamespaceMerge.java | 131 -----
 .../lib/update/InputSplitPruneUtil.java         | 149 ------
 .../lib/update/LookupBuilderMapper.java         |  34 --
 .../lib/update/LookupBuilderReducer.java        | 181 -------
 .../lib/update/MapperForExistingDataMod.java    |  46 --
 .../MapperForExistingDataWithIndexLookup.java   | 228 ---------
 .../lib/update/MapperForNewDataMod.java         |  82 ---
 .../lib/update/MergeSortRowIdMatcher.java       | 388 ---------------
 .../lib/update/PrunedBlurInputFormat.java       |  73 ---
 .../update/PrunedSequenceFileInputFormat.java   |  75 ---
 distribution-bin/pom.xml                        |   5 +
 27 files changed, 2293 insertions(+), 2278 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/indexer/BlurIndexCounter.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/indexer/BlurIndexCounter.java b/blur-indexer/src/main/java/org/apache/blur/indexer/BlurIndexCounter.java
new file mode 100644
index 0000000..d143d12
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/indexer/BlurIndexCounter.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.indexer;
+
+public enum BlurIndexCounter {
+
+  NEW_RECORDS, ROW_IDS_FROM_INDEX, ROW_IDS_TO_UPDATE_FROM_NEW_DATA, ROW_IDS_FROM_NEW_DATA,
+
+  INPUT_FORMAT_MAPPER, INPUT_FORMAT_EXISTING_RECORDS,
+
+  LOOKUP_MAPPER, LOOKUP_MAPPER_EXISTING_RECORDS, LOOKUP_MAPPER_ROW_LOOKUP_ATTEMPT
+
+}
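
The enum above defines a standard Hadoop counter group. A minimal sketch of how these counters are consumed (the mapper class and key/value types below are illustrative, not part of this commit): a map task increments them through its task context.

  import java.io.IOException;

  import org.apache.blur.indexer.BlurIndexCounter;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Mapper;

  // Illustrative only: bump a BlurIndexCounter from inside a map task.
  public class ExampleCounterMapper extends Mapper<Text, Text, Text, Text> {
    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
      context.getCounter(BlurIndexCounter.NEW_RECORDS).increment(1);
      context.write(key, value);
    }
  }

The per-task values surface in the job counters and, as IndexerJobDriver does below via TaskReport, can be read back per reduce task once the job completes.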

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/indexer/ClusterDriver.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/indexer/ClusterDriver.java b/blur-indexer/src/main/java/org/apache/blur/indexer/ClusterDriver.java
new file mode 100644
index 0000000..b53add2
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/indexer/ClusterDriver.java
@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.indexer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.mapreduce.lib.BlurInputFormat;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.thrift.generated.TableStats;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+public class ClusterDriver extends Configured implements Tool {
+
+  private static final Log LOG = LogFactory.getLog(ClusterDriver.class);
+  private static final String BLUR_ENV = "blur.env";
+  private static final String SEP = "_";
+  private static final String IMPORT = "import";
+
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(new Configuration(), new ClusterDriver(), args));
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    int c = 0;
+    final String blurEnv = args[c++];
+    final String blurZkConnection = args[c++];
+    final String extraConfig = args[c++];
+    final int reducerMultiplier = 1;
+    final Configuration conf = getConf();
+
+    final ExecutorService service = Executors.newCachedThreadPool();
+    final AtomicBoolean running = new AtomicBoolean();
+    running.set(true);
+
+    // Load configs for all filesystems.
+    Path path = new Path(extraConfig);
+    Configuration mergeHdfsConfigs = HdfsConfigurationNamespaceMerge.mergeHdfsConfigs(path.getFileSystem(conf), path);
+    conf.addResource(mergeHdfsConfigs);
+    conf.set(BlurConstants.BLUR_ZOOKEEPER_CONNECTION, blurZkConnection);
+    conf.set(BLUR_ENV, blurEnv);
+
+    final Iface client = BlurClient.getClientFromZooKeeperConnectionStr(blurZkConnection);
+
+    stopAllExistingMRJobs(blurEnv, conf);
+    cleanUpOldImportDirs(client, conf);
+    moveInprogressDirsBackToNew(client, conf);
+    unlockLockedTables(client);
+
+    Map<String, Future<Void>> futures = new HashMap<String, Future<Void>>();
+    while (running.get()) {
+      LOG.debug("Starting index update check for blur cluster [" + blurZkConnection + "].");
+      try {
+        List<String> tableList = client.tableList();
+        startMissingIndexerThreads(tableList, service, futures, blurZkConnection, conf, client, reducerMultiplier);
+      } catch (TException t) {
+        LOG.error("Unknown Blur Thrift Error, Retrying...", t);
+      }
+      Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+    }
+    return 0;
+  }
+
+  private void unlockLockedTables(Iface client) throws BlurException, TException {
+    List<String> tableList = client.tableList();
+    for (String table : tableList) {
+      TableDescriptor tableDescriptor = client.describe(table);
+      if (tableDescriptor.isEnabled()) {
+        unlockLockedTables(client, table);
+      }
+    }
+  }
+
+  private void unlockLockedTables(Iface client, String table) throws BlurException, TException {
+    Map<String, List<String>> listSnapshots = client.listSnapshots(table);
+    for (Entry<String, List<String>> e : listSnapshots.entrySet()) {
+      List<String> value = e.getValue();
+      if (value.contains(IndexerJobDriver.MRUPDATE_SNAPSHOT)) {
+        LOG.info("Unlocking table [{0}]", table);
+        client.removeSnapshot(table, IndexerJobDriver.MRUPDATE_SNAPSHOT);
+        return;
+      }
+    }
+  }
+
+  private void moveInprogressDirsBackToNew(Iface client, Configuration conf) throws BlurException, TException,
+      IOException {
+    List<String> tableList = client.tableList();
+    for (String table : tableList) {
+      String mrIncWorkingPathStr = getMRIncWorkingPathStr(client, table);
+      Path mrIncWorkingPath = new Path(mrIncWorkingPathStr);
+      Path newData = new Path(mrIncWorkingPath, IndexerJobDriver.NEW);
+      Path inprogressData = new Path(mrIncWorkingPath, IndexerJobDriver.INPROGRESS);
+      FileSystem fileSystem = inprogressData.getFileSystem(conf);
+      FileStatus[] listStatus = fileSystem.listStatus(inprogressData);
+      for (FileStatus fileStatus : listStatus) {
+        Path src = fileStatus.getPath();
+        Path dst = new Path(newData, src.getName());
+        if (fileSystem.rename(src, dst)) {
+          LOG.info("Moved [{0}] to [{1}] to be reprocessed.", src, dst);
+        } else {
+          LOG.error("Could not move [{0}] to [{1}] to be reprocessed.", src, dst);
+        }
+      }
+    }
+  }
+
+  private void cleanUpOldImportDirs(Iface client, Configuration conf) throws BlurException, TException, IOException {
+    List<String> tableList = client.tableList();
+    for (String table : tableList) {
+      cleanUpOldImportDirs(client, conf, table);
+    }
+  }
+
+  private void cleanUpOldImportDirs(Iface client, Configuration conf, String table) throws BlurException, TException,
+      IOException {
+    TableDescriptor descriptor = client.describe(table);
+    String tableUri = descriptor.getTableUri();
+    Path tablePath = new Path(tableUri);
+    FileSystem fileSystem = tablePath.getFileSystem(getConf());
+    Path importPath = new Path(tablePath, IMPORT);
+    if (fileSystem.exists(importPath)) {
+      for (FileStatus fileStatus : fileSystem.listStatus(importPath)) {
+        Path path = fileStatus.getPath();
+        LOG.info("Removing failed import [{0}]", path);
+        fileSystem.delete(path, true);
+      }
+    }
+  }
+
+  private void stopAllExistingMRJobs(String blurEnv, Configuration conf) throws YarnException, IOException,
+      InterruptedException {
+    Cluster cluster = new Cluster(conf);
+    JobStatus[] allJobStatuses = cluster.getAllJobStatuses();
+    for (JobStatus jobStatus : allJobStatuses) {
+      if (jobStatus.isJobComplete()) {
+        continue;
+      }
+      String jobFile = jobStatus.getJobFile();
+      JobID jobID = jobStatus.getJobID();
+      Job job = cluster.getJob(jobID);
+      FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+      Configuration configuration = new Configuration(false);
+      Path path = new Path(jobFile);
+      Path makeQualified = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
+      if (hasReadAccess(fileSystem, makeQualified)) {
+        try (FSDataInputStream in = fileSystem.open(makeQualified)) {
+          configuration.addResource(copy(in));
+        }
+        String jobBlurEnv = configuration.get(BLUR_ENV);
+        LOG.info("Checking job [{0}] has env [{1}] current env set to [{2}]", jobID, jobBlurEnv, blurEnv);
+        if (blurEnv.equals(jobBlurEnv)) {
+          LOG.info("Killing running job [{0}]", jobID);
+          job.killJob();
+        }
+      }
+    }
+  }
+
+  private static InputStream copy(FSDataInputStream input) throws IOException {
+    try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+      IOUtils.copy(input, outputStream);
+      return new ByteArrayInputStream(outputStream.toByteArray());
+    }
+  }
+
+  private static boolean hasReadAccess(FileSystem fileSystem, Path p) {
+    try {
+      fileSystem.access(p, FsAction.READ);
+      return true;
+    } catch (IOException e) {
+      return false;
+    }
+  }
+
+  private Callable<Void> getCallable(final String blurZkConnection, final Configuration conf, final Iface client,
+      final String table, final int reducerMultiplier) {
+    return new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        String originalThreadName = Thread.currentThread().getName();
+        try {
+          Thread.currentThread().setName(table);
+          if (!isEnabled(client, table)) {
+            LOG.info("Table [" + table + "] is not enabled.");
+            return null;
+          }
+          waitForDataToLoad(client, table);
+          LOG.debug("Starting index update for table [" + table + "].");
+          final String mrIncWorkingPathStr = getMRIncWorkingPathStr(client, table);
+          final String outputPathStr = getOutputPathStr(client, table);
+          Path path = new Path(outputPathStr);
+          FileSystem fileSystem = path.getFileSystem(getConf());
+
+          Configuration configuration = new Configuration(conf);
+          BlurInputFormat.setMaxNumberOfMaps(configuration, 10000);
+
+          IndexerJobDriver driver = new IndexerJobDriver();
+          driver.setConf(configuration);
+          try {
+            driver.run(new String[] { table, mrIncWorkingPathStr, outputPathStr, blurZkConnection,
+                Integer.toString(reducerMultiplier) });
+          } finally {
+            if (fileSystem.exists(path)) {
+              fileSystem.delete(path, true);
+            }
+          }
+          return null;
+        } finally {
+          Thread.currentThread().setName(originalThreadName);
+        }
+      }
+    };
+  }
+
+  private void startMissingIndexerThreads(List<String> tableList, ExecutorService service,
+      Map<String, Future<Void>> futures, final String blurZkConnection, final Configuration conf, final Iface client,
+      int reducerMultiplier) throws BlurException, TException {
+    Set<String> tables = new HashSet<String>(tableList);
+
+    // remove futures that are complete
+    for (String table : tables) {
+      Future<Void> future = futures.get(table);
+      if (future != null) {
+        if (future.isDone()) {
+          try {
+            future.get();
+          } catch (InterruptedException e) {
+            LOG.error("Unknown error while processing table [" + table + "].", e);
+          } catch (ExecutionException e) {
+            LOG.error("Unknown error while processing table [" + table + "].", e.getCause());
+          }
+          futures.remove(table);
+        } else {
+          LOG.info("Update for table [" + table + "] still running.");
+        }
+      }
+    }
+
+    // start missing tables
+    for (String table : tables) {
+      if (!futures.containsKey(table)) {
+        if (isEnabled(client, table)) {
+          Future<Void> future = service.submit(getCallable(blurZkConnection, conf, client, table, reducerMultiplier));
+          futures.put(table, future);
+        }
+      }
+    }
+  }
+
+  public static void waitForDataToLoad(Iface client, String table) throws BlurException, TException,
+      InterruptedException {
+    if (isFullyLoaded(client.tableStats(table))) {
+      return;
+    }
+    while (true) {
+      TableStats tableStats = client.tableStats(table);
+      if (isFullyLoaded(tableStats)) {
+        LOG.info("Data load complete in table [" + table + "] [" + tableStats + "]");
+        return;
+      }
+      LOG.info("Waiting for data to load in table [" + table + "] [" + tableStats + "]");
+      Thread.sleep(5000);
+    }
+  }
+
+  private static boolean isFullyLoaded(TableStats tableStats) {
+    if (tableStats.getSegmentImportInProgressCount() == 0 && tableStats.getSegmentImportPendingCount() == 0) {
+      return true;
+    }
+    return false;
+  }
+
+  private boolean isEnabled(Iface client, String table) throws BlurException, TException {
+    TableDescriptor tableDescriptor = client.describe(table);
+    return tableDescriptor.isEnabled();
+  }
+
+  private void mkdirs(FileSystem fileSystem, Path path) throws IOException {
+    if (fileSystem.exists(path)) {
+      return;
+    }
+    LOG.info("Creating path [" + path + "].");
+    if (!fileSystem.mkdirs(path)) {
+      LOG.error("Path [" + path + "] could not be created.");
+    }
+  }
+
+  public static String getMRIncWorkingPathStr(Iface client, String table) throws BlurException, TException, IOException {
+    TableDescriptor descriptor = client.describe(table);
+    Map<String, String> tableProperties = descriptor.getTableProperties();
+    if (tableProperties != null) {
+      String workingPath = tableProperties.get(BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH);
+      if (workingPath != null) {
+        return workingPath;
+      }
+    }
+    throw new IOException("Table [" + table + "] does not have the property ["
+        + BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH + "] set up correctly.");
+  }
+
+  private String getOutputPathStr(Iface client, String table) throws BlurException, TException, IOException {
+    TableDescriptor descriptor = client.describe(table);
+    String tableUri = descriptor.getTableUri();
+    Path tablePath = new Path(tableUri);
+    FileSystem fileSystem = tablePath.getFileSystem(getConf());
+    Path importPath = new Path(tablePath, IMPORT);
+    mkdirs(fileSystem, importPath);
+    return new Path(importPath, IMPORT + SEP + System.currentTimeMillis() + SEP + UUID.randomUUID().toString())
+        .toString();
+  }
+
+}
\ No newline at end of file
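
ClusterDriver is a long-running Tool: it takes a blur.env value, a Blur ZooKeeper connection string, and a directory of extra HDFS configuration files, then polls the cluster and submits an IndexerJobDriver run per enabled table. A minimal launch sketch (the class name and argument values below are placeholders, not from this commit):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.util.ToolRunner;

  import org.apache.blur.indexer.ClusterDriver;

  public class LaunchClusterDriver {
    public static void main(String[] args) throws Exception {
      // Arguments: <blur env> <blur zk connection> <extra hdfs config dir> (placeholder values).
      String[] driverArgs = { "prod", "zkhost:2181/blur", "hdfs:///blur/conf/hdfs" };
      System.exit(ToolRunner.run(new Configuration(), new ClusterDriver(), driverArgs));
    }
  }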

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/indexer/HdfsConfigurationNamespaceMerge.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/indexer/HdfsConfigurationNamespaceMerge.java b/blur-indexer/src/main/java/org/apache/blur/indexer/HdfsConfigurationNamespaceMerge.java
new file mode 100644
index 0000000..93ded33
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/indexer/HdfsConfigurationNamespaceMerge.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.indexer;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class HdfsConfigurationNamespaceMerge {
+
+  private static final String DFS_NAMESERVICES = "dfs.nameservices";
+  private static final Log LOG = LogFactory.getLog(HdfsConfigurationNamespaceMerge.class);
+
+  public static void main(String[] args) throws IOException {
+    Path p = new Path("./src/main/scripts/conf/hdfs");
+
+    Configuration configuration = mergeHdfsConfigs(p.getFileSystem(new Configuration()), p);
+
+    // configuration.writeXml(System.out);
+
+    Collection<String> nameServices = configuration.getStringCollection(DFS_NAMESERVICES);
+    for (String name : nameServices) {
+      Path path = new Path("hdfs://" + name + "/");
+      FileSystem fileSystem = path.getFileSystem(configuration);
+      FileStatus[] listStatus = fileSystem.listStatus(path);
+      for (FileStatus fileStatus : listStatus) {
+        System.out.println(fileStatus.getPath());
+      }
+    }
+  }
+
+  private static boolean checkHostName(String host) {
+    try {
+      InetAddress.getAllByName(host);
+      return true;
+    } catch (UnknownHostException e) {
+      LOG.warn("Host not found [" + host + "]");
+      return false;
+    }
+  }
+
+  public static Configuration mergeHdfsConfigs(FileSystem fs, Path p) throws IOException {
+    List<Configuration> configList = new ArrayList<Configuration>();
+    gatherConfigs(fs, p, configList);
+    return merge(configList);
+  }
+
+  public static Configuration merge(List<Configuration> configList) throws IOException {
+    Configuration merge = new Configuration(false);
+    Set<String> nameServices = new HashSet<String>();
+    for (Configuration configuration : configList) {
+      String nameService = configuration.get(DFS_NAMESERVICES);
+      if (nameServices.contains(nameService)) {
+        throw new IOException("Multiple confs define namespace [" + nameService + "]");
+      }
+      nameServices.add(nameService);
+      if (shouldAdd(configuration, nameService)) {
+        for (Entry<String, String> e : configuration) {
+          String key = e.getKey();
+          if (key.contains(nameService)) {
+            String value = e.getValue();
+            merge.set(key, value);
+          }
+        }
+      }
+    }
+    merge.set(DFS_NAMESERVICES, StringUtils.join(nameServices, ","));
+    return merge;
+  }
+
+  private static boolean shouldAdd(Configuration configuration, String nameService) {
+    for (Entry<String, String> e : configuration) {
+      String key = e.getKey();
+      if (key.contains(nameService) && key.startsWith("dfs.namenode.rpc-address.")) {
+        return checkHostName(getHost(e.getValue()));
+      }
+    }
+    return false;
+  }
+
+  private static String getHost(String host) {
+    return host.substring(0, host.indexOf(":"));
+  }
+
+  public static void gatherConfigs(FileSystem fs, Path p, List<Configuration> configList) throws IOException {
+    if (fs.isFile(p)) {
+      if (p.getName().endsWith(".xml")) {
+        LOG.info("Loading file [" + p + "]");
+        Configuration configuration = new Configuration(false);
+        configuration.addResource(p);
+        configList.add(configuration);
+      } else {
+        LOG.info("Skipping file [" + p + "]");
+      }
+    } else {
+      FileStatus[] listStatus = fs.listStatus(p);
+      for (FileStatus fileStatus : listStatus) {
+        gatherConfigs(fs, fileStatus.getPath(), configList);
+      }
+    }
+  }
+
+}
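
This utility folds a directory of per-namespace *.xml files into a single Configuration keyed by dfs.nameservices, which is how ClusterDriver above reaches tables that live on different HDFS clusters. A usage sketch (the config directory path and class name are placeholders):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;

  import org.apache.blur.indexer.HdfsConfigurationNamespaceMerge;

  public class MergeConfigsExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // Directory containing one hdfs-site style .xml file per namespace (placeholder path).
      Path confDir = new Path("/etc/blur/hdfs");
      Configuration merged = HdfsConfigurationNamespaceMerge.mergeHdfsConfigs(confDir.getFileSystem(conf), confDir);
      // Same pattern ClusterDriver uses: layer the merged nameservice settings onto the job conf.
      conf.addResource(merged);
    }
  }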

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/indexer/IndexerJobDriver.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/indexer/IndexerJobDriver.java b/blur-indexer/src/main/java/org/apache/blur/indexer/IndexerJobDriver.java
new file mode 100644
index 0000000..9fea980
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/indexer/IndexerJobDriver.java
@@ -0,0 +1,498 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.indexer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.indexer.mapreduce.ExistingDataIndexLookupMapper;
+import org.apache.blur.indexer.mapreduce.ExistingDataMapper;
+import org.apache.blur.indexer.mapreduce.LookupBuilderMapper;
+import org.apache.blur.indexer.mapreduce.LookupBuilderReducer;
+import org.apache.blur.indexer.mapreduce.NewDataMapper;
+import org.apache.blur.indexer.mapreduce.PrunedBlurInputFormat;
+import org.apache.blur.indexer.mapreduce.PrunedSequenceFileInputFormat;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.mapreduce.lib.BlurInputFormat;
+import org.apache.blur.mapreduce.lib.BlurOutputFormat;
+import org.apache.blur.mapreduce.lib.update.IndexKey;
+import org.apache.blur.mapreduce.lib.update.IndexKeyPartitioner;
+import org.apache.blur.mapreduce.lib.update.IndexKeyWritableComparator;
+import org.apache.blur.mapreduce.lib.update.IndexValue;
+import org.apache.blur.mapreduce.lib.update.UpdateReducer;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.thrift.generated.TableStats;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class IndexerJobDriver extends Configured implements Tool {
+
+  public static final String BLUR_UPDATE_ID = "blur.update.id";
+  private static final String BLUR_EXEC_TYPE = "blur.exec.type";
+  public static final String TMP = "tmp";
+
+  public enum EXEC {
+    MR_ONLY, MR_WITH_LOOKUP, AUTOMATIC
+  }
+
+  public static final String MRUPDATE_SNAPSHOT = "mrupdate-snapshot";
+  public static final String CACHE = "cache";
+  public static final String COMPLETE = "complete";
+  public static final String INPROGRESS = "inprogress";
+  public static final String NEW = "new";
+  private static final Log LOG = LogFactory.getLog(IndexerJobDriver.class);
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new IndexerJobDriver(), args);
+    System.exit(res);
+  }
+
+  static class PartitionedInputResult {
+    final Path _partitionedInputData;
+    final Counters _counters;
+    final long[] _rowIdsFromNewData;
+    final long[] _rowIdsToUpdateFromNewData;
+    final long[] _rowIdsFromIndex;
+
+    PartitionedInputResult(Path partitionedInputData, Counters counters, int shards, TaskReport[] taskReports) {
+      _partitionedInputData = partitionedInputData;
+      _counters = counters;
+      _rowIdsFromNewData = new long[shards];
+      _rowIdsToUpdateFromNewData = new long[shards];
+      _rowIdsFromIndex = new long[shards];
+      for (TaskReport tr : taskReports) {
+        int id = tr.getTaskID().getId();
+        Counters taskCounters = tr.getTaskCounters();
+        Counter total = taskCounters.findCounter(BlurIndexCounter.ROW_IDS_FROM_NEW_DATA);
+        _rowIdsFromNewData[id] = total.getValue();
+        Counter update = taskCounters.findCounter(BlurIndexCounter.ROW_IDS_TO_UPDATE_FROM_NEW_DATA);
+        _rowIdsToUpdateFromNewData[id] = update.getValue();
+        Counter index = taskCounters.findCounter(BlurIndexCounter.ROW_IDS_FROM_INDEX);
+        _rowIdsFromIndex[id] = index.getValue();
+      }
+    }
+
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    int c = 0;
+    if (args.length < 5) {
+      System.err
+          .println("Usage: Driver <table> <mr inc working path> <output path> <zk connection> <reducer multiplier> <extra config files...>");
+      return 1;
+    }
+    String table = args[c++];
+    String mrIncWorkingPathStr = args[c++];
+    String outputPathStr = args[c++];
+    String blurZkConnection = args[c++];
+    int reducerMultipler = Integer.parseInt(args[c++]);
+    for (; c < args.length; c++) {
+      String externalConfigFileToAdd = args[c];
+      getConf().addResource(new Path(externalConfigFileToAdd));
+    }
+
+    Path outputPath = new Path(outputPathStr);
+    Path mrIncWorkingPath = new Path(mrIncWorkingPathStr);
+    FileSystem fileSystem = mrIncWorkingPath.getFileSystem(getConf());
+
+    Path newData = new Path(mrIncWorkingPath, NEW);
+    Path inprogressData = new Path(mrIncWorkingPath, INPROGRESS);
+    Path completeData = new Path(mrIncWorkingPath, COMPLETE);
+    Path fileCache = new Path(mrIncWorkingPath, CACHE);
+    Path tmpPathDontDelete = new Path(mrIncWorkingPath, TMP);
+
+    Path tmpPath = new Path(tmpPathDontDelete, UUID.randomUUID().toString());
+
+    fileSystem.mkdirs(newData);
+    fileSystem.mkdirs(inprogressData);
+    fileSystem.mkdirs(completeData);
+    fileSystem.mkdirs(fileCache);
+
+    List<Path> srcPathList = new ArrayList<Path>();
+    for (FileStatus fileStatus : fileSystem.listStatus(newData)) {
+      srcPathList.add(fileStatus.getPath());
+    }
+    if (srcPathList.isEmpty()) {
+      return 0;
+    }
+
+    List<Path> inprogressPathList = new ArrayList<Path>();
+    boolean success = false;
+    Iface client = null;
+
+    EXEC exec = EXEC.valueOf(getConf().get(BLUR_EXEC_TYPE, EXEC.AUTOMATIC.name()).toUpperCase());
+
+    String uuid = UUID.randomUUID().toString();
+
+    try {
+      client = BlurClient.getClientFromZooKeeperConnectionStr(blurZkConnection);
+      TableDescriptor descriptor = client.describe(table);
+      Map<String, String> tableProperties = descriptor.getTableProperties();
+      String fastDir = tableProperties.get("blur.table.disable.fast.dir");
+      if (fastDir == null || !fastDir.equals("true")) {
+        LOG.error("Table [{0}] does not have blur.table.disable.fast.dir enabled, not supported in fast MR update.", table);
+        return 1;
+      }
+
+      waitForOtherSnapshotsToBeRemoved(client, table, MRUPDATE_SNAPSHOT);
+      client.createSnapshot(table, MRUPDATE_SNAPSHOT);
+      TableStats tableStats = client.tableStats(table);
+
+      inprogressPathList = movePathList(fileSystem, inprogressData, srcPathList);
+
+      switch (exec) {
+      case MR_ONLY:
+        success = runMrOnly(descriptor, inprogressPathList, table, fileCache, outputPath, reducerMultipler);
+        break;
+      case MR_WITH_LOOKUP:
+        success = runMrWithLookup(uuid, descriptor, inprogressPathList, table, fileCache, outputPath, reducerMultipler,
+            tmpPath, tableStats, MRUPDATE_SNAPSHOT);
+        break;
+      case AUTOMATIC:
+        success = runAutomatic(uuid, descriptor, inprogressPathList, table, fileCache, outputPath, reducerMultipler,
+            tmpPath, tableStats, MRUPDATE_SNAPSHOT);
+        break;
+      default:
+        throw new RuntimeException("Exec type [" + exec + "] not supported.");
+      }
+    } finally {
+      if (success) {
+        LOG.info("Associate lookup cache with new data!");
+        associateLookupCache(uuid, fileCache, outputPath);
+        LOG.info("Indexing job succeeded!");
+        client.loadData(table, outputPathStr);
+        LOG.info("Load data called");
+        movePathList(fileSystem, completeData, inprogressPathList);
+        LOG.info("Input data moved to complete");
+        ClusterDriver.waitForDataToLoad(client, table);
+        LOG.info("Data loaded");
+      } else {
+        LOG.error("Indexing job failed!");
+        movePathList(fileSystem, newData, inprogressPathList);
+      }
+      fileSystem.delete(tmpPath, true);
+      if (client != null) {
+        client.removeSnapshot(table, MRUPDATE_SNAPSHOT);
+      }
+    }
+
+    if (success) {
+      return 0;
+    } else {
+      return 1;
+    }
+  }
+
+  private void associateLookupCache(String uuid, Path fileCache, Path outputPath) throws IOException {
+    FileSystem fileSystem = fileCache.getFileSystem(getConf());
+    cleanupExtraFileFromSpecX(fileSystem, uuid, fileCache);
+    associateLookupCache(fileSystem, uuid, fileSystem.getFileStatus(fileCache), outputPath);
+  }
+
+  private void cleanupExtraFileFromSpecX(FileSystem fileSystem, String uuid, Path fileCache) throws IOException {
+    FileStatus[] listStatus = fileSystem.listStatus(fileCache);
+    List<FileStatus> uuidPaths = new ArrayList<FileStatus>();
+    for (FileStatus fs : listStatus) {
+      Path path = fs.getPath();
+      if (fs.isDirectory()) {
+        cleanupExtraFileFromSpecX(fileSystem, uuid, path);
+      } else if (path.getName().startsWith(uuid)) {
+        uuidPaths.add(fs);
+      }
+    }
+    if (uuidPaths.size() > 1) {
+      deleteIncomplete(fileSystem, uuidPaths);
+    }
+  }
+
+  private void deleteIncomplete(FileSystem fileSystem, List<FileStatus> uuidPaths) throws IOException {
+    long max = 0;
+    FileStatus keeper = null;
+    for (FileStatus fs : uuidPaths) {
+      long len = fs.getLen();
+      if (len > max) {
+        keeper = fs;
+        max = len;
+      }
+    }
+    for (FileStatus fs : uuidPaths) {
+      if (fs != keeper) {
+        LOG.info("Deleting incomplete cache file [{0}]", fs.getPath());
+        fileSystem.delete(fs.getPath(), false);
+      }
+    }
+  }
+
+  private void associateLookupCache(FileSystem fileSystem, String uuid, FileStatus fileCache, Path outputPath)
+      throws IOException {
+    Path path = fileCache.getPath();
+    if (fileCache.isDirectory()) {
+      FileStatus[] listStatus = fileSystem.listStatus(path);
+      for (FileStatus fs : listStatus) {
+        associateLookupCache(fileSystem, uuid, fs, outputPath);
+      }
+    } else if (path.getName().startsWith(uuid)) {
+      Path parent = path.getParent();
+      String shardName = parent.getName();
+      Path indexPath = findOutputDirPath(outputPath, shardName);
+      LOG.info("Path found for shard [{0}] outputPath [{1}]", shardName, outputPath);
+      String id = MergeSortRowIdMatcher.getIdForSingleSegmentIndex(getConf(), indexPath);
+      Path file = new Path(path.getParent(), id + ".seq");
+      MergeSortRowIdMatcher.commitWriter(getConf(), file, path);
+    }
+  }
+
+  private Path findOutputDirPath(Path outputPath, String shardName) throws IOException {
+    FileSystem fileSystem = outputPath.getFileSystem(getConf());
+    Path shardPath = new Path(outputPath, shardName);
+    if (!fileSystem.exists(shardPath)) {
+      throw new IOException("Shard path [" + shardPath + "] does not exist.");
+    }
+    FileStatus[] listStatus = fileSystem.listStatus(shardPath, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith(".commit");          
+      }
+    });
+    if (listStatus.length == 1) {
+      FileStatus fs = listStatus[0];
+      return fs.getPath();
+    } else {
+      throw new IOException("More than one sub dir [" + shardPath + "]");
+    }
+  }
+
+  private boolean runAutomatic(String uuid, TableDescriptor descriptor, List<Path> inprogressPathList, String table,
+      Path fileCache, Path outputPath, int reducerMultipler, Path tmpPath, TableStats tableStats, String snapshot)
+      throws ClassNotFoundException, IOException, InterruptedException {
+    PartitionedInputResult result = buildPartitionedInputData(uuid, tmpPath, descriptor, inprogressPathList, snapshot,
+        fileCache);
+
+    Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");
+
+    InputSplitPruneUtil.setBlurLookupRowIdFromNewDataCounts(job, table, result._rowIdsFromNewData);
+    InputSplitPruneUtil.setBlurLookupRowIdUpdateFromNewDataCounts(job, table, result._rowIdsToUpdateFromNewData);
+    InputSplitPruneUtil.setBlurLookupRowIdFromIndexCounts(job, table, result._rowIdsFromIndex);
+    InputSplitPruneUtil.setTable(job, table);
+
+    BlurInputFormat.setLocalCachePath(job, fileCache);
+
+    // Existing data - This adds the existing data files, which are opened
+    // first and streamed through for all documents.
+    {
+      Path tablePath = new Path(descriptor.getTableUri());
+      BlurInputFormat.addTable(job, descriptor, MRUPDATE_SNAPSHOT);
+      MultipleInputs.addInputPath(job, tablePath, PrunedBlurInputFormat.class, ExistingDataMapper.class);
+    }
+
+    // Existing data - This adds the row id lookup
+    {
+      ExistingDataIndexLookupMapper.setSnapshot(job, MRUPDATE_SNAPSHOT);
+      FileInputFormat.addInputPath(job, result._partitionedInputData);
+      MultipleInputs.addInputPath(job, result._partitionedInputData, PrunedSequenceFileInputFormat.class,
+          ExistingDataIndexLookupMapper.class);
+    }
+
+    // New Data
+    for (Path p : inprogressPathList) {
+      FileInputFormat.addInputPath(job, p);
+      MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, NewDataMapper.class);
+    }
+
+    BlurOutputFormat.setOutputPath(job, outputPath);
+    BlurOutputFormat.setupJob(job, descriptor);
+
+    job.setReducerClass(UpdateReducer.class);
+    job.setMapOutputKeyClass(IndexKey.class);
+    job.setMapOutputValueClass(IndexValue.class);
+    job.setPartitionerClass(IndexKeyPartitioner.class);
+    job.setGroupingComparatorClass(IndexKeyWritableComparator.class);
+
+    BlurOutputFormat.setReducerMultiplier(job, reducerMultipler);
+
+    boolean success = job.waitForCompletion(true);
+    Counters counters = job.getCounters();
+    LOG.info("Counters [" + counters + "]");
+    return success;
+  }
+
+  private boolean runMrWithLookup(String uuid, TableDescriptor descriptor, List<Path> inprogressPathList, String table,
+      Path fileCache, Path outputPath, int reducerMultipler, Path tmpPath, TableStats tableStats, String snapshot)
+      throws ClassNotFoundException, IOException, InterruptedException {
+    PartitionedInputResult result = buildPartitionedInputData(uuid, tmpPath, descriptor, inprogressPathList, snapshot,
+        fileCache);
+
+    Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");
+
+    ExistingDataIndexLookupMapper.setSnapshot(job, MRUPDATE_SNAPSHOT);
+    FileInputFormat.addInputPath(job, result._partitionedInputData);
+    MultipleInputs.addInputPath(job, result._partitionedInputData, SequenceFileInputFormat.class,
+        ExistingDataIndexLookupMapper.class);
+
+    for (Path p : inprogressPathList) {
+      FileInputFormat.addInputPath(job, p);
+      MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, NewDataMapper.class);
+    }
+
+    BlurOutputFormat.setOutputPath(job, outputPath);
+    BlurOutputFormat.setupJob(job, descriptor);
+
+    job.setReducerClass(UpdateReducer.class);
+    job.setMapOutputKeyClass(IndexKey.class);
+    job.setMapOutputValueClass(IndexValue.class);
+    job.setPartitionerClass(IndexKeyPartitioner.class);
+    job.setGroupingComparatorClass(IndexKeyWritableComparator.class);
+
+    BlurOutputFormat.setReducerMultiplier(job, reducerMultipler);
+
+    boolean success = job.waitForCompletion(true);
+    Counters counters = job.getCounters();
+    LOG.info("Counters [" + counters + "]");
+    return success;
+  }
+
+  private boolean runMrOnly(TableDescriptor descriptor, List<Path> inprogressPathList, String table, Path fileCache,
+      Path outputPath, int reducerMultipler) throws IOException, ClassNotFoundException, InterruptedException {
+    Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");
+    Path tablePath = new Path(descriptor.getTableUri());
+    BlurInputFormat.setLocalCachePath(job, fileCache);
+    BlurInputFormat.addTable(job, descriptor, MRUPDATE_SNAPSHOT);
+    MultipleInputs.addInputPath(job, tablePath, BlurInputFormat.class, ExistingDataMapper.class);
+
+    for (Path p : inprogressPathList) {
+      FileInputFormat.addInputPath(job, p);
+      MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, NewDataMapper.class);
+    }
+
+    BlurOutputFormat.setOutputPath(job, outputPath);
+    BlurOutputFormat.setupJob(job, descriptor);
+
+    job.setReducerClass(UpdateReducer.class);
+    job.setMapOutputKeyClass(IndexKey.class);
+    job.setMapOutputValueClass(IndexValue.class);
+    job.setPartitionerClass(IndexKeyPartitioner.class);
+    job.setGroupingComparatorClass(IndexKeyWritableComparator.class);
+
+    BlurOutputFormat.setReducerMultiplier(job, reducerMultipler);
+
+    boolean success = job.waitForCompletion(true);
+    Counters counters = job.getCounters();
+    LOG.info("Counters [" + counters + "]");
+    return success;
+  }
+
+  private PartitionedInputResult buildPartitionedInputData(String uuid, Path tmpPath, TableDescriptor descriptor,
+      List<Path> inprogressPathList, String snapshot, Path fileCachePath) throws IOException, ClassNotFoundException,
+      InterruptedException {
+    Job job = Job.getInstance(getConf(), "Partitioning data for table [" + descriptor.getName() + "]");
+    job.getConfiguration().set(BLUR_UPDATE_ID, uuid);
+
+    // Needed for the bloom filter path information.
+    BlurOutputFormat.setTableDescriptor(job, descriptor);
+    BlurInputFormat.setLocalCachePath(job, fileCachePath);
+    ExistingDataIndexLookupMapper.setSnapshot(job, snapshot);
+
+    for (Path p : inprogressPathList) {
+      FileInputFormat.addInputPath(job, p);
+    }
+    Path outputPath = new Path(tmpPath, UUID.randomUUID().toString());
+    job.setJarByClass(getClass());
+    job.setMapperClass(LookupBuilderMapper.class);
+    job.setReducerClass(LookupBuilderReducer.class);
+
+    int shardCount = descriptor.getShardCount();
+    job.setNumReduceTasks(shardCount);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(NullWritable.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(BooleanWritable.class);
+    FileOutputFormat.setOutputPath(job, outputPath);
+    if (job.waitForCompletion(true)) {
+      return new PartitionedInputResult(outputPath, job.getCounters(), shardCount, job.getTaskReports(TaskType.REDUCE));
+    } else {
+      throw new IOException("Partitioning failed!");
+    }
+  }
+
+  private void waitForOtherSnapshotsToBeRemoved(Iface client, String table, String snapshot) throws BlurException,
+      TException, InterruptedException {
+    while (true) {
+      Map<String, List<String>> listSnapshots = client.listSnapshots(table);
+      boolean mrupdateSnapshots = false;
+      for (Entry<String, List<String>> e : listSnapshots.entrySet()) {
+        List<String> value = e.getValue();
+        if (value.contains(snapshot)) {
+          mrupdateSnapshots = true;
+        }
+      }
+      if (!mrupdateSnapshots) {
+        return;
+      } else {
+        LOG.info(snapshot + " Snapshot for table [{0}] already exists", table);
+        Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+        LOG.info("Retrying");
+      }
+    }
+  }
+
+  private List<Path> movePathList(FileSystem fileSystem, Path dstDir, List<Path> lst) throws IOException {
+    List<Path> result = new ArrayList<Path>();
+    for (Path src : lst) {
+      Path dst = new Path(dstDir, src.getName());
+      if (fileSystem.rename(src, dst)) {
+        LOG.info("Moving [{0}] to [{1}]", src, dst);
+        result.add(dst);
+      } else {
+        LOG.error("Could not move [{0}] to [{1}]", src, dst);
+      }
+    }
+    return result;
+  }
+
+}
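
IndexerJobDriver is the per-table update job. It expects <table> <mr inc working path> <output path> <zk connection> <reducer multiplier> plus optional extra config files, and the blur.exec.type property selects between MR_ONLY, MR_WITH_LOOKUP, and AUTOMATIC. A launch sketch (the class name and argument values are placeholders):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.util.ToolRunner;

  import org.apache.blur.indexer.IndexerJobDriver;

  public class RunIndexerJob {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // Optional: force the execution strategy instead of the AUTOMATIC default.
      conf.set("blur.exec.type", "MR_WITH_LOOKUP");
      // <table> <mr inc working path> <output path> <zk connection> <reducer multiplier> (placeholder values).
      String[] jobArgs = { "table1", "hdfs:///blur/mrinc/table1", "hdfs:///blur/tables/table1/import/run1",
          "zkhost:2181/blur", "1" };
      System.exit(ToolRunner.run(conf, new IndexerJobDriver(), jobArgs));
    }
  }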

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/indexer/InputSplitPruneUtil.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/indexer/InputSplitPruneUtil.java b/blur-indexer/src/main/java/org/apache/blur/indexer/InputSplitPruneUtil.java
new file mode 100644
index 0000000..043b071
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/indexer/InputSplitPruneUtil.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.indexer;
+
+import org.apache.blur.utils.ShardUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+public class InputSplitPruneUtil {
+
+  private static final String BLUR_LOOKUP_ROWID_UPDATE_FROM_NEW_DATA_COUNT_PREFIX = "blur.lookup.rowid.update.from.new.data.count";
+  private static final String BLUR_LOOKUP_ROWID_FROM_NEW_DATA_COUNT_PREFIX = "blur.lookup.rowid.from.new.data.count.";
+  private static final String BLUR_LOOKUP_ROWID_FROM_INDEX_COUNT_PREFIX = "blur.lookup.rowid.from.index.count.";
+
+  private static final String BLUR_LOOKUP_TABLE = "blur.lookup.table";
+  private static final String BLUR_LOOKUP_RATIO_PER_SHARD = "blur.lookup.ratio.per.shard";
+  private static final String BLUR_LOOKUP_MAX_TOTAL_PER_SHARD = "blur.lookup.max.total.per.shard";
+
+  private static final double DEFAULT_LOOKUP_RATIO = 0.5;
+  private static final long DEFAULT_LOOKUP_MAX_TOTAL = Long.MAX_VALUE;
+
+  public static boolean shouldLookupExecuteOnShard(Configuration configuration, String table, int shard) {
+    double lookupRatio = getLookupRatio(configuration);
+    long maxLookupCount = getMaxLookupCount(configuration);
+    long rowIdFromNewDataCount = getBlurLookupRowIdFromNewDataCount(configuration, table, shard);
+    long rowIdUpdateFromNewDataCount = getBlurLookupRowIdUpdateFromNewDataCount(configuration, table, shard);
+    long rowIdFromIndexCount = getBlurLookupRowIdFromIndexCount(configuration, table, shard);
+    return shouldLookupRun(rowIdFromIndexCount, rowIdFromNewDataCount, rowIdUpdateFromNewDataCount, lookupRatio,
+        maxLookupCount);
+  }
+
+  private static boolean shouldLookupRun(long rowIdFromIndexCount, long rowIdFromNewDataCount,
+      long rowIdUpdateFromNewDataCount, double lookupRatio, long maxLookupCount) {
+    if (rowIdUpdateFromNewDataCount > maxLookupCount) {
+      return false;
+    }
+    double d = (double) rowIdUpdateFromNewDataCount / (double) rowIdFromIndexCount;
+    if (d <= lookupRatio) {
+      return true;
+    }
+    return false;
+  }
+
+  public static double getLookupRatio(Configuration configuration) {
+    return configuration.getDouble(BLUR_LOOKUP_RATIO_PER_SHARD, DEFAULT_LOOKUP_RATIO);
+  }
+
+  private static long getMaxLookupCount(Configuration configuration) {
+    return configuration.getLong(BLUR_LOOKUP_MAX_TOTAL_PER_SHARD, DEFAULT_LOOKUP_MAX_TOTAL);
+  }
+
+  public static void setTable(Job job, String table) {
+    setTable(job.getConfiguration(), table);
+  }
+
+  public static void setTable(Configuration configuration, String table) {
+    configuration.set(BLUR_LOOKUP_TABLE, table);
+  }
+
+  public static String getTable(Configuration configuration) {
+    return configuration.get(BLUR_LOOKUP_TABLE);
+  }
+
+  public static String getBlurLookupRowIdFromIndexCountName(String table) {
+    return BLUR_LOOKUP_ROWID_FROM_INDEX_COUNT_PREFIX + table;
+  }
+
+  public static String getBlurLookupRowIdFromNewDataCountName(String table) {
+    return BLUR_LOOKUP_ROWID_FROM_NEW_DATA_COUNT_PREFIX + table;
+  }
+
+  public static String getBlurLookupRowIdUpdateFromNewDataCountName(String table) {
+    return BLUR_LOOKUP_ROWID_UPDATE_FROM_NEW_DATA_COUNT_PREFIX + table;
+  }
+
+  public static long getBlurLookupRowIdUpdateFromNewDataCount(Configuration configuration, String table, int shard) {
+    String[] strings = configuration.getStrings(getBlurLookupRowIdUpdateFromNewDataCountName(table));
+    return getCount(strings, shard);
+  }
+
+  public static long getBlurLookupRowIdFromNewDataCount(Configuration configuration, String table, int shard) {
+    String[] strings = configuration.getStrings(getBlurLookupRowIdFromNewDataCountName(table));
+    return getCount(strings, shard);
+  }
+
+  public static long getBlurLookupRowIdFromIndexCount(Configuration configuration, String table, int shard) {
+    String[] strings = configuration.getStrings(getBlurLookupRowIdFromIndexCountName(table));
+    return getCount(strings, shard);
+  }
+
+  public static void setBlurLookupRowIdFromNewDataCounts(Job job, String table, long[] counts) {
+    setBlurLookupRowIdFromNewDataCounts(job.getConfiguration(), table, counts);
+  }
+
+  public static void setBlurLookupRowIdFromNewDataCounts(Configuration configuration, String table, long[] counts) {
+    configuration.setStrings(getBlurLookupRowIdFromNewDataCountName(table), toStrings(counts));
+  }
+
+  public static void setBlurLookupRowIdUpdateFromNewDataCounts(Job job, String table, long[] counts) {
+    setBlurLookupRowIdUpdateFromNewDataCounts(job.getConfiguration(), table, counts);
+  }
+
+  public static void setBlurLookupRowIdUpdateFromNewDataCounts(Configuration configuration, String table, long[] counts) {
+    configuration.setStrings(getBlurLookupRowIdUpdateFromNewDataCountName(table), toStrings(counts));
+  }
+
+  public static void setBlurLookupRowIdFromIndexCounts(Job job, String table, long[] counts) {
+    setBlurLookupRowIdFromIndexCounts(job.getConfiguration(), table, counts);
+  }
+
+  public static void setBlurLookupRowIdFromIndexCounts(Configuration configuration, String table, long[] counts) {
+    configuration.setStrings(getBlurLookupRowIdFromIndexCountName(table), toStrings(counts));
+  }
+
+  public static long getCount(String[] strings, int shard) {
+    return Long.parseLong(strings[shard]);
+  }
+
+  public static int getShardFromDirectoryPath(Path path) {
+    return ShardUtil.getShardIndex(path.getName());
+  }
+
+  public static String[] toStrings(long[] counts) {
+    if (counts == null) {
+      return null;
+    }
+    String[] strs = new String[counts.length];
+    for (int i = 0; i < counts.length; i++) {
+      strs[i] = Long.toString(counts[i]);
+    }
+    return strs;
+  }
+
+}
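
These helpers pass the per-shard row id counts gathered by the partitioning job through the Configuration so the pruned input formats can decide, shard by shard, whether the lookup path should run. A sketch of both sides (the class name, table name, and count values are placeholders):

  import org.apache.hadoop.mapreduce.Job;

  import org.apache.blur.indexer.InputSplitPruneUtil;

  public class PruneCountsExample {
    public static void example(Job job) {
      // Driver side: record the per-shard counts (two shards, placeholder values).
      long[] fromNewData = { 1000L, 2500L };
      long[] toUpdateFromNewData = { 10L, 900L };
      long[] fromIndex = { 100000L, 1200L };
      InputSplitPruneUtil.setTable(job, "table1");
      InputSplitPruneUtil.setBlurLookupRowIdFromNewDataCounts(job, "table1", fromNewData);
      InputSplitPruneUtil.setBlurLookupRowIdUpdateFromNewDataCounts(job, "table1", toUpdateFromNewData);
      InputSplitPruneUtil.setBlurLookupRowIdFromIndexCounts(job, "table1", fromIndex);

      // Input-format side: shard 0 updates few rows relative to its index, so lookup runs (true);
      // shard 1 touches most of its rows, so the ratio check rejects the lookup (false).
      boolean lookupShard0 = InputSplitPruneUtil.shouldLookupExecuteOnShard(job.getConfiguration(), "table1", 0);
      boolean lookupShard1 = InputSplitPruneUtil.shouldLookupExecuteOnShard(job.getConfiguration(), "table1", 1);
      System.out.println(lookupShard0 + " " + lookupShard1);
    }
  }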

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/indexer/MergeSortRowIdMatcher.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/indexer/MergeSortRowIdMatcher.java b/blur-indexer/src/main/java/org/apache/blur/indexer/MergeSortRowIdMatcher.java
new file mode 100644
index 0000000..66cd2ac
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/indexer/MergeSortRowIdMatcher.java
@@ -0,0 +1,386 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.indexer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.index.AtomicReaderUtil;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.store.hdfs.DirectoryDecorator;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.HdfsBlockLocation;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DeflateCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.zlib.ZlibFactory;
+import org.apache.hadoop.util.Progressable;
+import org.apache.lucene.index.AtomicReaderContext;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.SegmentInfo;
+import org.apache.lucene.index.SegmentInfoPerCommit;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.SegmentReader;
+import org.apache.lucene.index.Terms;
+import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.BytesRef;
+
+public class MergeSortRowIdMatcher {
+
+  private static final String DEL = ".del";
+  private static final Log LOG = LogFactory.getLog(MergeSortRowIdMatcher.class);
+  private static final Progressable NO_OP = new Progressable() {
+    @Override
+    public void progress() {
+
+    }
+  };
+  private static final long _10_SECONDS = TimeUnit.SECONDS.toNanos(10);
+
+  public interface Action {
+    void found(Text rowId) throws IOException;
+  }
+
+  private final MyReader[] _readers;
+  private final Configuration _configuration;
+  private final Path _cachePath;
+  private final IndexCommit _indexCommit;
+  private final Directory _directory;
+  private final Progressable _progressable;
+
+  private DirectoryReader _reader;
+
+  public MergeSortRowIdMatcher(Directory directory, long generation, Configuration configuration, Path cachePath)
+      throws IOException {
+    this(directory, generation, configuration, cachePath, null);
+  }
+
+  public MergeSortRowIdMatcher(Directory directory, long generation, Configuration configuration, Path cachePath,
+      Progressable progressable) throws IOException {
+    List<IndexCommit> listCommits = DirectoryReader.listCommits(directory);
+    _indexCommit = findIndexCommit(listCommits, generation);
+    _configuration = configuration;
+    _cachePath = cachePath;
+    _directory = directory;
+    _progressable = progressable == null ? NO_OP : progressable;
+    _readers = openReaders();
+  }
+
+  public void lookup(Text rowId, Action action) throws IOException {
+    if (lookup(rowId)) {
+      action.found(rowId);
+    }
+  }
+
+  private boolean lookup(Text rowId) throws IOException {
+    advanceReadersIfNeeded(rowId);
+    sortReaders();
+    return checkReaders(rowId);
+  }
+
+  private boolean checkReaders(Text rowId) {
+    for (MyReader reader : _readers) {
+      int compareTo = reader.getCurrentRowId().compareTo(rowId);
+      if (compareTo == 0) {
+        return true;
+      } else if (compareTo > 0) {
+        return false;
+      }
+    }
+    return false;
+  }
+
+  private void advanceReadersIfNeeded(Text rowId) throws IOException {
+    _progressable.progress();
+    for (MyReader reader : _readers) {
+      if (rowId.compareTo(reader.getCurrentRowId()) > 0) {
+        advanceReader(reader, rowId);
+      }
+    }
+  }
+
+  private void advanceReader(MyReader reader, Text rowId) throws IOException {
+    while (reader.next()) {
+      if (rowId.compareTo(reader.getCurrentRowId()) <= 0) {
+        return;
+      }
+    }
+  }
+
+  private static final Comparator<MyReader> COMP = new Comparator<MyReader>() {
+    @Override
+    public int compare(MyReader o1, MyReader o2) {
+      return o1.getCurrentRowId().compareTo(o2.getCurrentRowId());
+    }
+  };
+
+  private void sortReaders() {
+    Arrays.sort(_readers, COMP);
+  }
+
+  private MyReader[] openReaders() throws IOException {
+    Collection<SegmentKey> segmentKeys = getSegmentKeys();
+    MyReader[] readers = new MyReader[segmentKeys.size()];
+    int i = 0;
+    for (SegmentKey segmentKey : segmentKeys) {
+      readers[i++] = openReader(segmentKey);
+    }
+    return readers;
+  }
+
+  private MyReader openReader(SegmentKey segmentKey) throws IOException {
+    Path file = getCacheFilePath(segmentKey);
+    FileSystem fileSystem = _cachePath.getFileSystem(_configuration);
+    if (!fileSystem.exists(file)) {
+      createCacheFile(file, segmentKey);
+    }
+    Reader reader = new SequenceFile.Reader(_configuration, SequenceFile.Reader.file(file));
+    return new MyReader(reader);
+  }
+
+  private void createCacheFile(Path file, SegmentKey segmentKey) throws IOException {
+    LOG.info("Building cache for segment [{0}] to [{1}]", segmentKey, file);
+    Path tmpPath = getTmpWriterPath(file.getParent());
+    try (Writer writer = createWriter(_configuration, tmpPath)) {
+      DirectoryReader reader = getReader();
+      for (AtomicReaderContext context : reader.leaves()) {
+        SegmentReader segmentReader = AtomicReaderUtil.getSegmentReader(context.reader());
+        if (segmentReader.getSegmentName().equals(segmentKey.getSegmentName())) {
+          writeRowIds(writer, segmentReader);
+          break;
+        }
+      }
+    }
+    commitWriter(_configuration, file, tmpPath);
+  }
+
+  public static void commitWriter(Configuration configuration, Path file, Path tmpPath) throws IOException {
+    FileSystem fileSystem = tmpPath.getFileSystem(configuration);
+    LOG.info("Commit tmp [{0}] to file [{1}]", tmpPath, file);
+    if (!fileSystem.rename(tmpPath, file)) {
+      LOG.warn("Could not commit tmp file [{0}] to file [{1}]", tmpPath, file);
+    }
+  }
+
+  public static Path getTmpWriterPath(Path dir) {
+    return new Path(dir, UUID.randomUUID().toString() + ".tmp");
+  }
+
+  public static Writer createWriter(Configuration configuration, Path tmpPath) throws IOException {
+    return SequenceFile.createWriter(configuration, SequenceFile.Writer.file(tmpPath),
+        SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(NullWritable.class),
+        SequenceFile.Writer.compression(CompressionType.BLOCK, getCodec(configuration)));
+  }
+
+  private static CompressionCodec getCodec(Configuration configuration) {
+    if (ZlibFactory.isNativeZlibLoaded(configuration)) {
+      return new GzipCodec();
+    }
+    return new DeflateCodec();
+  }
+
+  private void writeRowIds(Writer writer, SegmentReader segmentReader) throws IOException {
+    Terms terms = segmentReader.terms(BlurConstants.ROW_ID);
+    if (terms == null) {
+      return;
+    }
+    TermsEnum termsEnum = terms.iterator(null);
+    BytesRef rowId;
+    long s = System.nanoTime();
+    while ((rowId = termsEnum.next()) != null) {
+      long n = System.nanoTime();
+      if (n - s > _10_SECONDS) {
+        _progressable.progress();
+        s = System.nanoTime();
+      }
+      writer.append(new Text(rowId.utf8ToString()), NullWritable.get());
+    }
+  }
+
+  private IndexCommit findIndexCommit(List<IndexCommit> listCommits, long generation) throws IOException {
+    for (IndexCommit commit : listCommits) {
+      if (commit.getGeneration() == generation) {
+        return commit;
+      }
+    }
+    throw new IOException("Generation [" + generation + "] not found.");
+  }
+
+  static class SegmentKey {
+
+    final String _segmentName;
+    final String _id;
+
+    SegmentKey(String segmentName, String id) throws IOException {
+      _segmentName = segmentName;
+      _id = id;
+    }
+
+    String getSegmentName() {
+      return _segmentName;
+    }
+
+    @Override
+    public String toString() {
+      return _id;
+    }
+  }
+
+  private DirectoryReader getReader() throws IOException {
+    if (_reader == null) {
+      _reader = DirectoryReader.open(_indexCommit);
+    }
+    return _reader;
+  }
+
+  private Collection<SegmentKey> getSegmentKeys() throws IOException {
+    List<SegmentKey> keys = new ArrayList<SegmentKey>();
+    SegmentInfos segmentInfos = new SegmentInfos();
+    segmentInfos.read(_directory, _indexCommit.getSegmentsFileName());
+    for (SegmentInfoPerCommit segmentInfoPerCommit : segmentInfos) {
+      String name = segmentInfoPerCommit.info.name;
+      String id = getId(segmentInfoPerCommit.info);
+      keys.add(new SegmentKey(name, id));
+    }
+    return keys;
+  }
+
+  private String getId(SegmentInfo si) throws IOException {
+    HdfsDirectory dir = getHdfsDirectory(si.dir);
+    Set<String> files = new TreeSet<String>(si.files());
+    return getId(_configuration, dir, files);
+  }
+
+  private static String getId(Configuration configuration, HdfsDirectory dir, Set<String> files) throws IOException {
+    long ts = 0;
+    String file = null;
+    for (String f : files) {
+      if (f.endsWith(DEL)) {
+        continue;
+      }
+      long fileModified = dir.getFileModified(f);
+      if (fileModified > ts) {
+        ts = fileModified;
+        file = f;
+      }
+    }
+
+    Path path = dir.getPath();
+    FileSystem fileSystem = path.getFileSystem(configuration);
+    Path realFile = new Path(path, file);
+    if (!fileSystem.exists(realFile)) {
+      realFile = dir.getRealFilePathFromSymlink(file);
+      if (!fileSystem.exists(realFile)) {
+        throw new IOException("Lucene file [" + file + "] for dir [" + path + "] can not be found.");
+      }
+    }
+    return getFirstBlockId(fileSystem, realFile);
+  }
+
+  public static String getIdForSingleSegmentIndex(Configuration configuration, Path indexPath) throws IOException {
+    HdfsDirectory dir = new HdfsDirectory(configuration, indexPath);
+    Set<String> files = new TreeSet<String>(Arrays.asList(dir.listAll()));
+    return getId(configuration, dir, files);
+  }
+
+  private static String getFirstBlockId(FileSystem fileSystem, Path realFile) throws IOException {
+    FileStatus fileStatus = fileSystem.getFileStatus(realFile);
+    BlockLocation[] locations = fileSystem.getFileBlockLocations(fileStatus, 0, 1);
+    HdfsBlockLocation location = (HdfsBlockLocation) locations[0];
+    LocatedBlock locatedBlock = location.getLocatedBlock();
+    ExtendedBlock block = locatedBlock.getBlock();
+    return toNiceString(block.getBlockId());
+  }
+
+  private static String toNiceString(long blockId) {
+    return "b" + blockId;
+  }
+
+  private static HdfsDirectory getHdfsDirectory(Directory dir) {
+    if (dir instanceof HdfsDirectory) {
+      return (HdfsDirectory) dir;
+    } else if (dir instanceof DirectoryDecorator) {
+      DirectoryDecorator dd = (DirectoryDecorator) dir;
+      return getHdfsDirectory(dd.getOriginalDirectory());
+    } else {
+      throw new RuntimeException("Unknown directory type.");
+    }
+  }
+
+  private Path getCacheFilePath(SegmentKey segmentKey) {
+    return new Path(_cachePath, segmentKey + ".seq");
+  }
+
+  static class MyReader {
+
+    final Reader _reader;
+    final Text _rowId = new Text();
+    boolean _finished = false;
+
+    public MyReader(Reader reader) {
+      _reader = reader;
+    }
+
+    public Text getCurrentRowId() {
+      return _rowId;
+    }
+
+    public boolean next() throws IOException {
+      if (_finished) {
+        return false;
+      }
+      if (_reader.next(_rowId)) {
+        return true;
+      }
+      _finished = true;
+      return false;
+    }
+
+    public boolean isFinished() {
+      return _finished;
+    }
+  }
+
+  public static Path getCachePath(Path cachePath, String table, String shardName) {
+    return new Path(new Path(cachePath, table), shardName);
+  }
+}
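
A rough driver-side sketch of how MergeSortRowIdMatcher might be used to test new row ids
against an existing shard index; the shard path, cache location, commit generation, and
row ids below are assumptions, and the merge-sort advance logic requires the caller to
present row ids in sorted order:

    // Hypothetical caller of MergeSortRowIdMatcher (paths and generation are made up).
    import java.io.IOException;
    import org.apache.blur.indexer.MergeSortRowIdMatcher;
    import org.apache.blur.indexer.MergeSortRowIdMatcher.Action;
    import org.apache.blur.store.hdfs.HdfsDirectory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;

    public class MatcherSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path shardPath = new Path("hdfs:///blur/tables/table1/shard-00000000");
        Path cache = MergeSortRowIdMatcher.getCachePath(new Path("/tmp/rowid-cache"), "table1", "shard-00000000");
        long generation = 42L; // index commit generation to open (assumed known)
        MergeSortRowIdMatcher matcher = new MergeSortRowIdMatcher(new HdfsDirectory(conf, shardPath), generation,
            conf, cache);
        Action action = new Action() {
          @Override
          public void found(Text rowId) throws IOException {
            System.out.println("rowId already present in index: " + rowId);
          }
        };
        // Row ids must be fed in sorted order so the per-segment readers only move forward.
        Text[] sortedNewRowIds = new Text[] { new Text("row-001"), new Text("row-007") };
        for (Text rowId : sortedNewRowIds) {
          matcher.lookup(rowId, action);
        }
      }
    }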

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/ExistingDataIndexLookupMapper.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/ExistingDataIndexLookupMapper.java b/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/ExistingDataIndexLookupMapper.java
new file mode 100644
index 0000000..90e5f9c
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/ExistingDataIndexLookupMapper.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.indexer.mapreduce;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.indexer.BlurIndexCounter;
+import org.apache.blur.manager.BlurPartitioner;
+import org.apache.blur.manager.writer.SnapshotIndexDeletionPolicy;
+import org.apache.blur.mapreduce.lib.BlurOutputFormat;
+import org.apache.blur.mapreduce.lib.BlurRecord;
+import org.apache.blur.mapreduce.lib.update.IndexKey;
+import org.apache.blur.mapreduce.lib.update.IndexValue;
+import org.apache.blur.store.BlockCacheDirectoryFactoryV2;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.FetchRecordResult;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.RowDocumentUtil;
+import org.apache.blur.utils.ShardUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.AtomicReader;
+import org.apache.lucene.index.AtomicReaderContext;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.Directory;
+
+import com.google.common.io.Closer;
+
+public class ExistingDataIndexLookupMapper extends Mapper<Text, BooleanWritable, IndexKey, IndexValue> {
+
+  private static final Log LOG = LogFactory.getLog(ExistingDataIndexLookupMapper.class);
+  private static final String BLUR_SNAPSHOT = "blur.snapshot";
+  
+  private Counter _existingRecords;
+  private Counter _rowLookup;
+  private BlurPartitioner _blurPartitioner;
+  private Path _tablePath;
+  private int _numberOfShardsInTable;
+  private Configuration _configuration;
+  private String _snapshot;
+  private int _indexShard = -1;
+  private DirectoryReader _reader;
+  private IndexSearcher _indexSearcher;
+  private long _totalNumberOfBytes;
+  private Closer _closer;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    Counter counter = context.getCounter(BlurIndexCounter.LOOKUP_MAPPER);
+    counter.increment(1);
+
+    _configuration = context.getConfiguration();
+    _existingRecords = context.getCounter(BlurIndexCounter.LOOKUP_MAPPER_EXISTING_RECORDS);
+    _rowLookup = context.getCounter(BlurIndexCounter.LOOKUP_MAPPER_ROW_LOOKUP_ATTEMPT);
+    _blurPartitioner = new BlurPartitioner();
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(_configuration);
+    _numberOfShardsInTable = tableDescriptor.getShardCount();
+    _tablePath = new Path(tableDescriptor.getTableUri());
+    _snapshot = getSnapshot(_configuration);
+    _totalNumberOfBytes = _configuration.getLong(LookupBuilderReducer.BLUR_CACHE_DIR_TOTAL_BYTES, 128 * 1024 * 1024);
+    _closer = Closer.create();
+  }
+
+  @Override
+  protected void map(Text key, BooleanWritable value, Context context) throws IOException, InterruptedException {
+    if (value.get()) {
+      String rowId = key.toString();
+      LOG.debug("Looking up rowid [" + rowId + "]");
+      _rowLookup.increment(1);
+      IndexSearcher indexSearcher = getIndexSearcher(rowId);
+      Term term = new Term(BlurConstants.ROW_ID, rowId);
+      RowCollector collector = getCollector(context);
+      indexSearcher.search(new TermQuery(term), collector);
+      LOG.debug("Looking for rowid [" + rowId + "] has [" + collector.records + "] records");
+    }
+  }
+
+  @Override
+  protected void cleanup(Mapper<Text, BooleanWritable, IndexKey, IndexValue>.Context context) throws IOException,
+      InterruptedException {
+    _closer.close();
+  }
+
+  static class RowCollector extends Collector {
+
+    private AtomicReader reader;
+    private Mapper<Text, BooleanWritable, IndexKey, IndexValue>.Context _context;
+    private Counter _existingRecords;
+    int records;
+
+    RowCollector(Mapper<Text, BooleanWritable, IndexKey, IndexValue>.Context context, Counter existingRecords) {
+      _context = context;
+      _existingRecords = existingRecords;
+    }
+
+    @Override
+    public void setScorer(Scorer scorer) throws IOException {
+
+    }
+
+    @Override
+    public void setNextReader(AtomicReaderContext context) throws IOException {
+      reader = context.reader();
+    }
+
+    @Override
+    public void collect(int doc) throws IOException {
+      Document document = reader.document(doc);
+      FetchRecordResult result = RowDocumentUtil.getRecord(document);
+      String rowid = result.getRowid();
+      Record record = result.getRecord();
+      String recordId = record.getRecordId();
+      IndexKey oldDataKey = IndexKey.oldData(rowid, recordId);
+      try {
+        _context.write(oldDataKey, new IndexValue(toBlurRecord(rowid, record)));
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+      records++;
+      _existingRecords.increment(1L);
+    }
+
+    private BlurRecord toBlurRecord(String rowId, Record record) {
+      BlurRecord blurRecord = new BlurRecord();
+      blurRecord.setRowId(rowId);
+      blurRecord.setRecordId(record.getRecordId());
+      blurRecord.setFamily(record.getFamily());
+      List<Column> columns = record.getColumns();
+      for (Column column : columns) {
+        blurRecord.addColumn(column.getName(), column.getValue());
+      }
+      return blurRecord;
+    }
+
+    @Override
+    public boolean acceptsDocsOutOfOrder() {
+      return false;
+    }
+  }
+
+  private RowCollector getCollector(Mapper<Text, BooleanWritable, IndexKey, IndexValue>.Context context) {
+    return new RowCollector(context, _existingRecords);
+  }
+
+  private IndexSearcher getIndexSearcher(String rowId) throws IOException {
+    int shard = _blurPartitioner.getShard(rowId, _numberOfShardsInTable);
+    if (_indexSearcher != null) {
+      if (shard != _indexShard) {
+        throw new IOException("Input data is not partitioned correctly.");
+      }
+      return _indexSearcher;
+    } else {
+      _indexShard = shard;
+      Path shardPath = new Path(_tablePath, ShardUtil.getShardName(_indexShard));
+      HdfsDirectory hdfsDirectory = new HdfsDirectory(_configuration, shardPath);
+      SnapshotIndexDeletionPolicy policy = new SnapshotIndexDeletionPolicy(_configuration,
+          SnapshotIndexDeletionPolicy.getGenerationsPath(shardPath));
+      Long generation = policy.getGeneration(_snapshot);
+      if (generation == null) {
+        hdfsDirectory.close();
+        throw new IOException("Snapshot [" + _snapshot + "] not found in shard [" + shardPath + "]");
+      }
+
+      BlurConfiguration bc = new BlurConfiguration();
+      BlockCacheDirectoryFactoryV2 blockCacheDirectoryFactoryV2 = new BlockCacheDirectoryFactoryV2(bc,
+          _totalNumberOfBytes);
+      _closer.register(blockCacheDirectoryFactoryV2);
+      Directory dir = blockCacheDirectoryFactoryV2.newDirectory("table", "shard", hdfsDirectory, null);
+
+      List<IndexCommit> listCommits = DirectoryReader.listCommits(dir);
+      IndexCommit indexCommit = findIndexCommit(listCommits, generation, shardPath);
+      _reader = DirectoryReader.open(indexCommit);
+      return _indexSearcher = new IndexSearcher(_reader);
+    }
+  }
+
+  public static IndexCommit findIndexCommit(List<IndexCommit> listCommits, long generation, Path shardDir)
+      throws IOException {
+    for (IndexCommit commit : listCommits) {
+      if (commit.getGeneration() == generation) {
+        return commit;
+      }
+    }
+    throw new IOException("Generation [" + generation + "] not found in shard [" + shardDir + "]");
+  }
+
+  public static void setSnapshot(Job job, String snapshot) {
+    setSnapshot(job.getConfiguration(), snapshot);
+  }
+
+  public static void setSnapshot(Configuration configuration, String snapshot) {
+    configuration.set(BLUR_SNAPSHOT, snapshot);
+  }
+
+  public static String getSnapshot(Configuration configuration) {
+    return configuration.get(BLUR_SNAPSHOT);
+  }
+}
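
A small illustration of the static snapshot helpers at the end of this mapper; the job
name and snapshot name are assumptions, and Job.getInstance presumes a Hadoop 2 style API:

    // Hypothetical job setup using setSnapshot/getSnapshot.
    import org.apache.blur.indexer.mapreduce.ExistingDataIndexLookupMapper;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class SnapshotConfigSketch {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "blur-index-update");
        // Record which Blur snapshot the lookup mapper should open.
        ExistingDataIndexLookupMapper.setSnapshot(job, "pre-update-snapshot");
        // The mapper's setup() reads the same property back from the task configuration.
        String snapshot = ExistingDataIndexLookupMapper.getSnapshot(job.getConfiguration());
        System.out.println("snapshot = " + snapshot);
      }
    }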

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/ExistingDataMapper.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/ExistingDataMapper.java b/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/ExistingDataMapper.java
new file mode 100644
index 0000000..5cb0948
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/ExistingDataMapper.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.indexer.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.blur.indexer.BlurIndexCounter;
+import org.apache.blur.mapreduce.lib.BlurRecord;
+import org.apache.blur.mapreduce.lib.TableBlurRecord;
+import org.apache.blur.mapreduce.lib.update.IndexKey;
+import org.apache.blur.mapreduce.lib.update.IndexValue;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Mapper;
+
+public class ExistingDataMapper extends Mapper<Text, TableBlurRecord, IndexKey, IndexValue> {
+
+  private Counter _existingRecords;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    Counter counter = context.getCounter(BlurIndexCounter.INPUT_FORMAT_MAPPER);
+    counter.increment(1);
+    _existingRecords = context.getCounter(BlurIndexCounter.INPUT_FORMAT_EXISTING_RECORDS);
+  }
+
+  @Override
+  protected void map(Text key, TableBlurRecord value, Context context) throws IOException, InterruptedException {
+    BlurRecord blurRecord = value.getBlurRecord();
+    IndexKey oldDataKey = IndexKey.oldData(blurRecord.getRowId(), blurRecord.getRecordId());
+    context.write(oldDataKey, new IndexValue(blurRecord));
+    _existingRecords.increment(1L);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/98359a40/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/LookupBuilderMapper.java
----------------------------------------------------------------------
diff --git a/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/LookupBuilderMapper.java b/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/LookupBuilderMapper.java
new file mode 100644
index 0000000..8b3dcb7
--- /dev/null
+++ b/blur-indexer/src/main/java/org/apache/blur/indexer/mapreduce/LookupBuilderMapper.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.indexer.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.blur.mapreduce.lib.BlurRecord;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+public class LookupBuilderMapper extends Mapper<Text, BlurRecord, Text, NullWritable> {
+
+  @Override
+  protected void map(Text key, BlurRecord value, Mapper<Text, BlurRecord, Text, NullWritable>.Context context)
+      throws IOException, InterruptedException {
+    context.write(new Text(value.getRowId()), NullWritable.get());
+  }
+
+}
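
One plausible wiring of LookupBuilderMapper into a job; the actual wiring lives in
IndexerJobDriver elsewhere in this commit, so the input/output paths, formats, and the
reducer output types below are assumptions for illustration only:

    // Hypothetical lookup-builder job: row ids from new data in, per-rowId lookup flags out.
    import org.apache.blur.indexer.mapreduce.LookupBuilderMapper;
    import org.apache.blur.indexer.mapreduce.LookupBuilderReducer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BooleanWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    public class LookupBuilderJobSketch {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "blur-lookup-builder");
        job.setJarByClass(LookupBuilderJobSketch.class);
        FileInputFormat.addInputPath(job, new Path("/data/new-mutations"));
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setMapperClass(LookupBuilderMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setReducerClass(LookupBuilderReducer.class);
        // Assumed reducer output types, matching what ExistingDataIndexLookupMapper consumes.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BooleanWritable.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path("/data/rowid-lookup"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }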

