incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject [31/51] [partial] Fixed BLUR-126.
Date Thu, 06 Jun 2013 18:58:03 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-mapred/pom.xml
----------------------------------------------------------------------
diff --git a/blur-mapred/pom.xml b/blur-mapred/pom.xml
new file mode 100644
index 0000000..5e09359
--- /dev/null
+++ b/blur-mapred/pom.xml
@@ -0,0 +1,116 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+	license agreements. See the NOTICE file distributed with this work for additional 
+	information regarding copyright ownership. The ASF licenses this file to 
+	you under the Apache License, Version 2.0 (the "License"); you may not use 
+	this file except in compliance with the License. You may obtain a copy of 
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+	by applicable law or agreed to in writing, software distributed under the 
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+	OF ANY KIND, either express or implied. See the License for the specific 
+	language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.apache.blur</groupId>
+		<artifactId>blur</artifactId>
+		<version>0.1.5</version>
+		<relativePath>../pom.xml</relativePath>
+	</parent>
+	<groupId>org.apache.blur</groupId>
+	<artifactId>blur-mapred</artifactId>
+	<packaging>jar</packaging>
+	<name>Blur Map Reduce</name>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.zookeeper</groupId>
+			<artifactId>zookeeper</artifactId>
+			<version>${zookeeper.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-store</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-util</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>log4j</groupId>
+			<artifactId>log4j</artifactId>
+			<version>${log4j.version}</version>
+			<scope>provided</scope>
+			<exclusions>
+				<exclusion>
+					<groupId>javax.mail</groupId>
+					<artifactId>mail</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>javax.jms</groupId>
+					<artifactId>jms</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jdmk</groupId>
+					<artifactId>jmxtools</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jmx</groupId>
+					<artifactId>jmxri</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.mrunit</groupId>
+			<artifactId>mrunit</artifactId>
+			<version>${mrunit.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-test</artifactId>
+			<version>${hadoop.version}</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+
+	<repositories>
+		<repository>
+			<id>libdir</id>
+			<url>file://${basedir}/../lib</url>
+		</repository>
+	</repositories>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<configuration>
+					<source>1.6</source>
+					<target>1.6</target>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-mapred/src/main/java/org/apache/blur/mapred/AbstractOutputCommitter.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapred/AbstractOutputCommitter.java b/blur-mapred/src/main/java/org/apache/blur/mapred/AbstractOutputCommitter.java
new file mode 100644
index 0000000..1545475
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapred/AbstractOutputCommitter.java
@@ -0,0 +1,96 @@
+package org.apache.blur.mapred;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.mapreduce.lib.BlurOutputFormat;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.TaskAttemptID;
+
+public abstract class AbstractOutputCommitter extends OutputCommitter {
+
+  private final static Log LOG = LogFactory.getLog(AbstractOutputCommitter.class);
+
+  @Override
+  public void setupJob(JobContext jobContext) throws IOException {
+    // no job-level setup required; shard directories are created in commitJob
+  }
+
+  @Override
+  public void commitJob(JobContext jobContext) throws IOException {
+    // look through all the shards for attempts that need to be cleaned up.
+    // also find all the attempts that are finished
+    // then rename all the attempts jobs to commits
+    LOG.info("Commiting Job [{0}]", jobContext.getJobID());
+    Configuration configuration = jobContext.getConfiguration();
+    Path tableOutput = BlurOutputFormat.getOutputPath(configuration);
+    makeSureNoEmptyShards(configuration, tableOutput);
+    FileSystem fileSystem = tableOutput.getFileSystem(configuration);
+    for (FileStatus fileStatus : fileSystem.listStatus(tableOutput)) {
+      if (isShard(fileStatus)) {
+        commitJob(jobContext, fileStatus.getPath());
+      }
+    }
+
+  }
+
+  private void makeSureNoEmptyShards(Configuration configuration, Path tableOutput) throws IOException {
+    FileSystem fileSystem = tableOutput.getFileSystem(configuration);
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
+    int shardCount = tableDescriptor.getShardCount();
+    for (int i = 0; i < shardCount; i++) {
+      String shardName = BlurUtil.getShardName(i);
+      fileSystem.mkdirs(new Path(tableOutput, shardName));
+    }
+  }
+
+  private void commitJob(JobContext jobContext, Path shardPath) throws IOException {
+    FileSystem fileSystem = shardPath.getFileSystem(jobContext.getConfiguration());
+    FileStatus[] listStatus = fileSystem.listStatus(shardPath);
+    for (FileStatus fileStatus : listStatus) {
+      Path path = fileStatus.getPath();
+      String name = path.getName();
+      if (fileStatus.isDir() && name.endsWith(".task_complete")) {
+        String taskAttemptName = getTaskAttemptName(name);
+        TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptName);
+        if (taskAttemptID.getJobID().equals(jobContext.getJobID())) {
+          fileSystem.rename(path, new Path(shardPath, taskAttemptName + ".commit"));
+          LOG.info("Committing [{0}] in path [{1}]", taskAttemptID, path);
+        }
+      }
+    }
+  }
+
+  private String getTaskAttemptName(String name) {
+    int lastIndexOf = name.lastIndexOf('.');
+    return name.substring(0, lastIndexOf);
+  }
+
+  private boolean isShard(FileStatus fileStatus) {
+    return isShard(fileStatus.getPath());
+  }
+
+  private boolean isShard(Path path) {
+    return path.getName().startsWith(BlurConstants.SHARD_PREFIX);
+  }
+
+  @Override
+  public void abortJob(JobContext jobContext, int status) throws IOException {
+    LOG.info("abortJob [{0}]", jobContext.getJobID());
+  }
+
+  @Override
+  public void cleanupJob(JobContext context) throws IOException {
+    LOG.info("cleanupJob [{0}]", context.getJobID());
+  }
+
+}
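
For illustration, the directory-naming convention the committer relies on,
reduced to a local-filesystem sketch (the real code above uses Hadoop's
FileSystem API; the shard path argument is a placeholder):

import java.io.File;

public class CommitNamingSketch {
  public static void main(String[] args) {
    // e.g. args[0] = /tmp/table/shard-00000000
    File shard = new File(args[0]);
    File[] entries = shard.listFiles();
    if (entries == null) {
      return;
    }
    for (File f : entries) {
      String name = f.getName();
      if (f.isDirectory() && name.endsWith(".task_complete")) {
        // "<taskAttemptId>.task_complete" becomes "<taskAttemptId>.commit"
        String attempt = name.substring(0, name.lastIndexOf('.'));
        f.renameTo(new File(shard, attempt + ".commit"));
      }
    }
  }
}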

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurMapper.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurMapper.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurMapper.java
new file mode 100644
index 0000000..2c4ec1f
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurMapper.java
@@ -0,0 +1,62 @@
+package org.apache.blur.mapreduce;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.blur.mapreduce.lib.BlurMutate;
+import org.apache.blur.mapreduce.lib.BlurRecord;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Mapper;
+
+@Deprecated
+public abstract class BlurMapper<KEY, VALUE> extends Mapper<KEY, VALUE, Text, BlurMutate> {
+
+  protected BlurMutate _mutate;
+  protected Text _key;
+  protected BlurTask _blurTask;
+  protected Counter _recordCounter;
+  protected Counter _fieldCounter;
+
+  @Override
+  public void run(Context context) throws IOException, InterruptedException {
+    setup(context);
+    long maxRecordCount = _blurTask == null ? Long.MAX_VALUE : _blurTask.getMaxRecordCount();
+    if (maxRecordCount == -1) {
+      maxRecordCount = Long.MAX_VALUE;
+    }
+    for (long l = 0; l < maxRecordCount && context.nextKeyValue(); l++) {
+      map(context.getCurrentKey(), context.getCurrentValue(), context);
+    }
+    cleanup(context);
+  }
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    _blurTask = BlurTask.read(context.getConfiguration());
+    _mutate = new BlurMutate();
+    _mutate.setRecord(new BlurRecord());
+    _key = new Text();
+    _recordCounter = context.getCounter(BlurTask.getCounterGroupName(), BlurTask.getRecordCounterName());
+    _fieldCounter = context.getCounter(BlurTask.getCounterGroupName(), BlurTask.getFieldCounterName());
+  }
+
+  @Override
+  protected abstract void map(KEY key, VALUE value, Context context) throws IOException, InterruptedException;
+
+}
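
For reference, the record cap enforced in run() above reduces to the following
bounded-iteration pattern (standalone sketch; the Iterator stands in for the
MapReduce context, and the values are placeholders):

import java.util.Arrays;
import java.util.Iterator;

public class BoundedRunSketch {
  public static void main(String[] args) {
    // in BlurTask a max record count of -1 means "no limit" (Long.MAX_VALUE)
    long maxRecordCount = 2;
    Iterator<String> records = Arrays.asList("a", "b", "c").iterator();
    // stop at the cap even if input remains, mirroring BlurMapper.run()
    for (long l = 0; l < maxRecordCount && records.hasNext(); l++) {
      System.out.println("map(" + records.next() + ")");
    }
  }
}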

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurReducer.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurReducer.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurReducer.java
new file mode 100644
index 0000000..f694c99
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurReducer.java
@@ -0,0 +1,517 @@
+package org.apache.blur.mapreduce;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
+import static org.apache.blur.utils.BlurConstants.RECORD_ID;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.lucene.search.FairSimilarity;
+import org.apache.blur.manager.writer.TransactionRecorder;
+import org.apache.blur.mapreduce.BlurTask.INDEXING_TYPE;
+import org.apache.blur.mapreduce.lib.BlurColumn;
+import org.apache.blur.mapreduce.lib.BlurMutate;
+import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
+import org.apache.blur.mapreduce.lib.BlurOutputFormat;
+import org.apache.blur.mapreduce.lib.BlurRecord;
+import org.apache.blur.mapreduce.lib.DefaultBlurReducer;
+import org.apache.blur.mapreduce.lib.ProgressableDirectory;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Selector;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.blur.utils.Converter;
+import org.apache.blur.utils.IterableConverter;
+import org.apache.blur.utils.ResetableDocumentStoredFieldVisitor;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.index.TieredMergePolicy;
+import org.apache.lucene.store.BufferedIndexInput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.NoLockFactory;
+import org.apache.lucene.util.IOUtils;
+
+/**
+ * This class is deprecated; please use {@link BlurOutputFormat} in
+ * combination with {@link DefaultBlurReducer}.
+ */
+@Deprecated
+public class BlurReducer extends Reducer<Text, BlurMutate, Text, BlurMutate> {
+
+  static class LuceneFileComparator implements Comparator<String> {
+
+    private Directory _directory;
+
+    public LuceneFileComparator(Directory directory) {
+      _directory = directory;
+    }
+
+    @Override
+    public int compare(String o1, String o2) {
+      try {
+        long fileLength1 = _directory.fileLength(o1);
+        long fileLength2 = _directory.fileLength(o2);
+        if (fileLength1 == fileLength2) {
+          return o1.compareTo(o2);
+        }
+        // larger files sort first; compare without casting the long
+        // difference to int, which can overflow for files over 2 GB
+        return fileLength2 > fileLength1 ? 1 : -1;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  protected static final Log LOG = LogFactory.getLog(BlurReducer.class);
+  protected static final long REPORT_PERIOD = TimeUnit.SECONDS.toMillis(10);
+  protected static final double MB = 1024 * 1024;
+  protected IndexWriter _writer;
+  protected Directory _directory;
+  protected BlurAnalyzer _analyzer;
+  protected BlurTask _blurTask;
+
+  protected Counter _recordCounter;
+  protected Counter _rowCounter;
+  protected Counter _fieldCounter;
+  protected Counter _rowBreak;
+  protected Counter _rowFailures;
+  protected StringBuilder _builder = new StringBuilder();
+  protected byte[] _copyBuf;
+  protected Configuration _configuration;
+  protected long _start;
+  protected long _previousRow;
+  protected long _previousRecord;
+  protected long _prev;
+  protected IndexReader _reader;
+  protected Map<String, Document> _newDocs = new HashMap<String, Document>();
+  protected Set<String> _recordIdsToDelete = new HashSet<String>();
+  protected Term _rowIdTerm = new Term(BlurConstants.ROW_ID);
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    _blurTask = BlurTask.read(context.getConfiguration());
+    _configuration = context.getConfiguration();
+    setupCounters(context);
+    setupAnalyzer(context);
+    setupDirectory(context);
+    setupWriter(context);
+    if (_blurTask.getIndexingType() == INDEXING_TYPE.UPDATE) {
+      _reader = DirectoryReader.open(_directory);
+    }
+  }
+
+  protected void setupCounters(Context context) {
+    _rowCounter = context.getCounter(BlurTask.getCounterGroupName(), BlurTask.getRowCounterName());
+    _recordCounter = context.getCounter(BlurTask.getCounterGroupName(), BlurTask.getRecordCounterName());
+    _fieldCounter = context.getCounter(BlurTask.getCounterGroupName(), BlurTask.getFieldCounterName());
+    _rowBreak = context.getCounter(BlurTask.getCounterGroupName(), BlurTask.getRowBreakCounterName());
+    _rowFailures = context.getCounter(BlurTask.getCounterGroupName(), BlurTask.getRowFailureCounterName());
+    _start = System.currentTimeMillis();
+    _prev = System.currentTimeMillis();
+  }
+
+  @Override
+  protected void reduce(Text key, Iterable<BlurMutate> values, Context context) throws IOException,
+      InterruptedException {
+    if (!index(key, values, context)) {
+      _rowFailures.increment(1);
+    }
+  }
+
+  protected boolean index(Text key, Iterable<BlurMutate> values, Context context) throws IOException {
+    int recordCount = 0;
+    _newDocs.clear();
+    _recordIdsToDelete.clear();
+    boolean rowIdSet = false;
+
+    for (BlurMutate mutate : values) {
+      BlurRecord record = mutate.getRecord();
+      if (!rowIdSet) {
+        String rowId = record.getRowId();
+        _rowIdTerm = new Term(BlurConstants.ROW_ID, rowId);
+        rowIdSet = true;
+      }
+      if (mutate.getMutateType() == MUTATE_TYPE.DELETE) {
+        _recordIdsToDelete.add(record.getRecordId());
+        continue;
+      }
+      Document document = toDocument(record, _builder);
+      _newDocs.put(record.getRecordId(), document);
+
+      context.progress();
+      recordCount++;
+      if (recordCount >= _blurTask.getMaxRecordsPerRow()) {
+        return false;
+      }
+      if (_blurTask.getIndexingType() == INDEXING_TYPE.UPDATE) {
+        fetchOldRecords();
+      }
+    }
+
+    List<Document> docs = documentsToIndex(new ArrayList<Document>(_newDocs.values()));
+    if (docs.size() > 0) {
+      docs.get(0).add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
+    }
+
+    switch (_blurTask.getIndexingType()) {
+    case REBUILD:
+      _writer.addDocuments(docs);
+      break;
+    case UPDATE:
+      _writer.updateDocuments(_rowIdTerm, docs);
+      break;
+    default:
+      break;
+    }
+
+    _recordCounter.increment(recordCount);
+    _rowCounter.increment(1);
+    if (_prev + REPORT_PERIOD < System.currentTimeMillis()) {
+      long records = _recordCounter.getValue();
+      long rows = _rowCounter.getValue();
+
+      long now = System.currentTimeMillis();
+
+      double overAllSeconds = (now - _start) / 1000.0;
+      double overAllRecordRate = records / overAllSeconds;
+      double overAllRowsRate = rows / overAllSeconds;
+
+      double seconds = (now - _prev) / 1000.0;
+      double recordRate = (records - _previousRecord) / seconds;
+      double rowsRate = (rows - _previousRow) / seconds;
+
+      String status = String.format(
+          "Totals [%d Row, %d Records], Avg Rates [%.1f Row/s, %.1f Records/s] Rates [%.1f Row/s, %.1f Records/s]",
+          rows, records, overAllRowsRate, overAllRecordRate, rowsRate, recordRate);
+
+      LOG.info(status);
+      context.setStatus(status);
+
+      _previousRecord = records;
+      _previousRow = rows;
+      _prev = now;
+    }
+    return true;
+  }
+
+  protected List<Document> documentsToIndex(List<Document> list) {
+    return list;
+  }
+
+  protected void fetchOldRecords() throws IOException {
+    List<Document> docs = BlurUtil.fetchDocuments(_reader, _rowIdTerm, new ResetableDocumentStoredFieldVisitor(),
+        new Selector());
+    for (Document document : docs) {
+      String recordId = document.get(RECORD_ID);
+      // add them to the new records if the new records do not contain them.
+      if (!_newDocs.containsKey(recordId)) {
+        _newDocs.put(recordId, document);
+      }
+    }
+
+    // delete all records that should be removed.
+    for (String recordId : _recordIdsToDelete) {
+      _newDocs.remove(recordId);
+    }
+  }
+
+  @Override
+  protected void cleanup(Context context) throws IOException, InterruptedException {
+    switch (_blurTask.getIndexingType()) {
+    case UPDATE:
+      cleanupFromUpdate(context);
+      return;
+    case REBUILD:
+      cleanupFromRebuild(context);
+      return;
+    default:
+      break;
+    }
+  }
+
+  protected void cleanupFromUpdate(Context context) throws IOException {
+    _writer.commit();
+    _writer.close();
+  }
+
+  protected void cleanupFromRebuild(Context context) throws IOException, InterruptedException {
+    _writer.commit();
+    _writer.close();
+
+    IndexReader reader = DirectoryReader.open(_directory);
+
+    TableDescriptor descriptor = _blurTask.getTableDescriptor();
+
+    Path directoryPath = _blurTask.getDirectoryPath(context);
+    remove(directoryPath);
+
+    NoLockFactory lockFactory = NoLockFactory.getNoLockFactory();
+
+    Directory destDirectory = getDestDirectory(context.getConfiguration(), descriptor, directoryPath);
+    destDirectory.setLockFactory(lockFactory);
+
+    boolean optimize = _blurTask.getOptimize();
+
+    if (optimize) {
+      context.setStatus("Starting Copy-Optimize Phase");
+      IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, _analyzer);
+      TieredMergePolicy policy = (TieredMergePolicy) conf.getMergePolicy();
+      policy.setUseCompoundFile(false);
+      long s = System.currentTimeMillis();
+      IndexWriter writer = new IndexWriter(getBiggerBuffers(destDirectory), conf);
+      writer.addIndexes(reader);
+      writer.close();
+      long e = System.currentTimeMillis();
+      context.setStatus("Copying phase took [" + (e - s) + " ms]");
+      LOG.info("Copying phase took [" + (e - s) + " ms]");
+    } else {
+      context.setStatus("Starting Copy-Optimize Phase");
+      long s = System.currentTimeMillis();
+      List<String> files = getFilesOrderedBySize(_directory);
+      long totalBytesToCopy = getTotalBytes(_directory);
+      long totalBytesCopied = 0;
+      long startTime = System.currentTimeMillis();
+      for (String file : files) {
+        totalBytesCopied += copy(_directory, destDirectory, file, file, context, totalBytesCopied, totalBytesToCopy,
+            startTime);
+      }
+      long e = System.currentTimeMillis();
+      context.setStatus("Copying phase took [" + (e - s) + " ms]");
+      LOG.info("Copying phase took [" + (e - s) + " ms]");
+    }
+  }
+
+  protected Directory getBiggerBuffers(Directory destDirectory) {
+    return new BufferedDirectory(destDirectory, 32768);
+  }
+
+  protected Directory getDestDirectory(Configuration configuration, TableDescriptor descriptor, Path directoryPath)
+      throws IOException {
+    return new HdfsDirectory(configuration, directoryPath);
+  }
+
+  protected CompressionCodec getInstance(String compressionClass) throws IOException {
+    try {
+      CompressionCodec codec = (CompressionCodec) Class.forName(compressionClass).newInstance();
+      if (codec instanceof Configurable) {
+        Configurable configurable = (Configurable) codec;
+        configurable.setConf(_configuration);
+      }
+      return codec;
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  protected void remove(Path directoryPath) throws IOException {
+    FileSystem fileSystem = FileSystem.get(directoryPath.toUri(), _configuration);
+    fileSystem.delete(directoryPath, true);
+  }
+
+  protected long getTotalBytes(Directory directory) throws IOException {
+    long total = 0;
+    for (String file : directory.listAll()) {
+      total += directory.fileLength(file);
+    }
+    return total;
+  }
+
+  protected long copy(Directory from, Directory to, String src, String dest, Context context, long totalBytesCopied,
+      long totalBytesToCopy, long startTime) throws IOException {
+    IndexOutput os = to.createOutput(dest, new IOContext());
+    IndexInput is = from.openInput(src, new IOContext());
+    IOException priorException = null;
+    try {
+      return copyBytes(is, os, is.length(), context, totalBytesCopied, totalBytesToCopy, startTime, src);
+    } catch (IOException ioe) {
+      priorException = ioe;
+    } finally {
+      IOUtils.closeWhileHandlingException(priorException, os, is);
+    }
+    return 0; // unreachable: closeWhileHandlingException rethrows the prior exception
+  }
+
+  protected long copyBytes(IndexInput in, IndexOutput out, long numBytes, Context context, long totalBytesCopied,
+      long totalBytesToCopy, long startTime, String src) throws IOException {
+    if (_copyBuf == null) {
+      _copyBuf = new byte[BufferedIndexInput.BUFFER_SIZE];
+    }
+    long start = System.currentTimeMillis();
+    long copied = 0;
+    while (numBytes > 0) {
+      if (start + REPORT_PERIOD < System.currentTimeMillis()) {
+        report(context, totalBytesCopied + copied, totalBytesToCopy, startTime, src);
+        start = System.currentTimeMillis();
+      }
+      final int toCopy = (int) (numBytes > _copyBuf.length ? _copyBuf.length : numBytes);
+      in.readBytes(_copyBuf, 0, toCopy);
+      out.writeBytes(_copyBuf, 0, toCopy);
+      numBytes -= toCopy;
+      copied += toCopy;
+      context.progress();
+    }
+    return copied;
+  }
+
+  protected List<String> getFilesOrderedBySize(final Directory directory) throws IOException {
+    List<String> files = new ArrayList<String>(Arrays.asList(directory.listAll()));
+    Collections.sort(files, new LuceneFileComparator(_directory));
+    return files;
+  }
+
+  protected void setupDirectory(Context context) throws IOException {
+    TableDescriptor descriptor = _blurTask.getTableDescriptor();
+    switch (_blurTask.getIndexingType()) {
+    case UPDATE:
+      Path directoryPath = _blurTask.getDirectoryPath(context);
+      _directory = getDestDirectory(context.getConfiguration(), descriptor, directoryPath);
+
+      NoLockFactory lockFactory = NoLockFactory.getNoLockFactory();
+      _directory.setLockFactory(lockFactory);
+      return;
+    case REBUILD:
+      File dir = new File(System.getProperty("java.io.tmpdir"));
+      File path = new File(dir, "index");
+      rm(path);
+      LOG.info("Using local path [" + path + "] for indexing.");
+
+      Directory localDirectory = FSDirectory.open(path);
+      _directory = new ProgressableDirectory(localDirectory, context);
+      return;
+    default:
+      break;
+    }
+  }
+
+  protected String getNodeName(Context context) {
+    return context.getTaskAttemptID().toString();
+  }
+
+  protected void rm(File path) {
+    if (!path.exists()) {
+      return;
+    }
+    if (path.isDirectory()) {
+      for (File f : path.listFiles()) {
+        rm(f);
+      }
+    }
+    path.delete();
+  }
+
+  protected <T> T nullCheck(T o) {
+    if (o == null) {
+      throw new NullPointerException();
+    }
+    return o;
+  }
+
+  protected void setupWriter(Context context) throws IOException {
+    nullCheck(_directory);
+    nullCheck(_analyzer);
+    IndexWriterConfig config = new IndexWriterConfig(LUCENE_VERSION, _analyzer);
+    config.setSimilarity(new FairSimilarity());
+    config.setRAMBufferSizeMB(_blurTask.getRamBufferSizeMB());
+    TieredMergePolicy mergePolicy = (TieredMergePolicy) config.getMergePolicy();
+    mergePolicy.setUseCompoundFile(false);
+    _writer = new IndexWriter(_directory, config);
+  }
+
+  protected void setupAnalyzer(Context context) {
+    _analyzer = new BlurAnalyzer(_blurTask.getTableDescriptor().getAnalyzerDefinition());
+  }
+
+  protected Document toDocument(BlurRecord record, StringBuilder builder) {
+    Document document = new Document();
+    document.add(new Field(BlurConstants.ROW_ID, record.getRowId(), TransactionRecorder.ID_TYPE));
+    document.add(new Field(BlurConstants.RECORD_ID, record.getRecordId(), TransactionRecorder.ID_TYPE));
+
+    String columnFamily = record.getFamily();
+    TransactionRecorder.addColumns(document, _analyzer, builder, columnFamily, new IterableConverter<BlurColumn, Column>(
+        record.getColumns(), new Converter<BlurColumn, Column>() {
+          @Override
+          public Column convert(BlurColumn from) throws Exception {
+            _fieldCounter.increment(1);
+            return new Column(from.getName(), from.getValue());
+          }
+        }));
+    return document;
+  }
+
+  protected static void report(Context context, long totalBytesCopied, long totalBytesToCopy, long startTime, String src) {
+    long now = System.currentTimeMillis();
+    double seconds = (now - startTime) / 1000.0;
+    double rate = totalBytesCopied / seconds;
+    String time = estimateTimeToComplete(rate, totalBytesCopied, totalBytesToCopy);
+
+    String status = String
+        .format(
+            "%.1f Complete - Time Remaining [%s s], Copy rate [%.1f MB/s], Total Copied [%.1f MB], Total To Copy [%.1f MB]",
+            getPerComplete(totalBytesCopied, totalBytesToCopy), time, getMb(rate), getMb(totalBytesCopied),
+            getMb(totalBytesToCopy));
+    LOG.info(status);
+    context.setStatus(status);
+  }
+
+  protected static double getPerComplete(long totalBytesCopied, long totalBytesToCopy) {
+    return ((double) totalBytesCopied / (double) totalBytesToCopy) * 100.0;
+  }
+
+  protected static double getMb(double b) {
+    return b / MB;
+  }
+
+  protected static String estimateTimeToComplete(double rate, long totalBytesCopied, long totalBytesToCopy) {
+    long whatsLeft = totalBytesToCopy - totalBytesCopied;
+    long secondsLeft = (long) (whatsLeft / rate);
+    return BlurUtil.humanizeTime(secondsLeft, TimeUnit.SECONDS);
+  }
+}
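
The progress math in report() and estimateTimeToComplete() is plain rate
arithmetic; a standalone sketch with made-up numbers:

public class CopyRateSketch {
  public static void main(String[] args) {
    long copied = 512L * 1024 * 1024;  // 512 MB copied so far
    long total = 2048L * 1024 * 1024;  // 2 GB to copy in total
    double seconds = 40.0;             // elapsed wall-clock time
    double rate = copied / seconds;    // bytes per second
    long secondsLeft = (long) ((total - copied) / rate);
    System.out.printf("%.1f%% done, ~%d s remaining%n",
        copied * 100.0 / total, secondsLeft);
  }
}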

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurTask.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurTask.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurTask.java
new file mode 100644
index 0000000..8542a43
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurTask.java
@@ -0,0 +1,289 @@
+package org.apache.blur.mapreduce;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.manager.clusterstatus.ZookeeperClusterStatus;
+import org.apache.blur.manager.clusterstatus.ZookeeperPathConstants;
+import org.apache.blur.mapreduce.lib.BlurMutate;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TBinaryProtocol;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TIOStreamTransport;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+
+@Deprecated
+public class BlurTask implements Writable {
+
+  public enum INDEXING_TYPE {
+    REBUILD, UPDATE
+  }
+
+  private static final String BLUR_BLURTASK = "blur.blurtask";
+  private static final Log LOG = LogFactory.getLog(BlurTask.class);
+
+  public static String getCounterGroupName() {
+    return "Blur";
+  }
+
+  public static String getRowCounterName() {
+    return "Rows";
+  }
+
+  public static String getFieldCounterName() {
+    return "Fields";
+  }
+
+  public static String getRecordCounterName() {
+    return "Records";
+  }
+
+  public static String getRowBreakCounterName() {
+    return "Row Retries";
+  }
+
+  public static String getRowFailureCounterName() {
+    return "Row Failures";
+  }
+
+  private int _ramBufferSizeMB = 256;
+  private long _maxRecordCount = Long.MAX_VALUE;
+  private TableDescriptor _tableDescriptor;
+  private int _maxRecordsPerRow = 16384;
+  private boolean _optimize = true;
+  private INDEXING_TYPE _indexingType = INDEXING_TYPE.REBUILD;
+  private transient ZooKeeper _zooKeeper;
+
+  public String getShardName(TaskAttemptContext context) {
+    TaskAttemptID taskAttemptID = context.getTaskAttemptID();
+    int id = taskAttemptID.getTaskID().getId();
+    return BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, id);
+  }
+
+  public Path getDirectoryPath(TaskAttemptContext context) {
+    String shardName = getShardName(context);
+    return new Path(new Path(_tableDescriptor.tableUri), shardName);
+  }
+
+  public int getNumReducers(Configuration configuration) {
+    Path tablePath = new Path(_tableDescriptor.tableUri);
+    try {
+      int num = _tableDescriptor.shardCount;
+      FileSystem fileSystem = FileSystem.get(tablePath.toUri(), configuration);
+      if (!fileSystem.exists(tablePath)) {
+        return num;
+      }
+      FileStatus[] files = fileSystem.listStatus(tablePath);
+      int shardCount = 0;
+      for (FileStatus fileStatus : files) {
+        if (fileStatus.isDir()) {
+          String name = fileStatus.getPath().getName();
+          if (name.startsWith(BlurConstants.SHARD_PREFIX)) {
+            shardCount++;
+          }
+        }
+      }
+
+      if (shardCount == 0) {
+        return num;
+      }
+      if (shardCount != num) {
+        LOG.warn("Asked for " + num + " reducers, but existing table " + _tableDescriptor.name + " has " + shardCount
+            + " shards. Using " + shardCount + " reducers");
+      }
+      return shardCount;
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to connect to filesystem", e);
+    }
+  }
+
+  public int getRamBufferSizeMB() {
+    return _ramBufferSizeMB;
+  }
+
+  public void setRamBufferSizeMB(int ramBufferSizeMB) {
+    _ramBufferSizeMB = ramBufferSizeMB;
+  }
+
+  public long getMaxRecordCount() {
+    return _maxRecordCount;
+  }
+
+  public void setMaxRecordCount(long maxRecordCount) {
+    _maxRecordCount = maxRecordCount;
+  }
+
+  public void setTableDescriptor(TableDescriptor tableDescriptor) {
+    _tableDescriptor = tableDescriptor;
+  }
+
+  public TableDescriptor getTableDescriptor() {
+    return _tableDescriptor;
+  }
+
+  public Job configureJob(Configuration configuration) throws IOException {
+    if (getIndexingType() == INDEXING_TYPE.UPDATE) {
+      checkTable();
+    }
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    DataOutputStream output = new DataOutputStream(os);
+    write(output);
+    output.close();
+    String blurTask = new String(Base64.encodeBase64(os.toByteArray()));
+    configuration.set(BLUR_BLURTASK, blurTask);
+
+    Job job = new Job(configuration, "Blur Indexer");
+    job.setReducerClass(BlurReducer.class);
+    job.setOutputKeyClass(BytesWritable.class);
+    job.setOutputValueClass(BlurMutate.class);
+    job.setNumReduceTasks(getNumReducers(configuration));
+    return job;
+  }
+
+  private void checkTable() {
+    ZookeeperClusterStatus status = new ZookeeperClusterStatus(_zooKeeper);
+    // check if table exists
+    String cluster = _tableDescriptor.cluster;
+    String table = _tableDescriptor.name;
+    if (!status.exists(false, cluster, table)) {
+      throw new RuntimeException("Table [" + table + "] in cluster [" + cluster + "] does not exist.");
+    }
+    // check if table is locked
+    try {
+      List<String> children = _zooKeeper.getChildren(ZookeeperPathConstants.getLockPath(cluster, table), false);
+      if (!children.isEmpty()) {
+        throw new RuntimeException("Table [" + table + "] in cluster [" + cluster
+            + "] has write locks enabled, cannot perform update.");
+      }
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+
+  }
+
+  public static BlurTask read(Configuration configuration) throws IOException {
+    String base64String = configuration.get(BLUR_BLURTASK);
+    if (base64String == null) {
+      return null;
+    }
+    byte[] blurTaskBs = Base64.decodeBase64(base64String);
+    BlurTask blurTask = new BlurTask();
+    blurTask.readFields(new DataInputStream(new ByteArrayInputStream(blurTaskBs)));
+    return blurTask;
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    _maxRecordCount = input.readLong();
+    _ramBufferSizeMB = input.readInt();
+    _optimize = input.readBoolean();
+    _indexingType = INDEXING_TYPE.valueOf(readString(input));
+    byte[] data = new byte[input.readInt()];
+    input.readFully(data);
+    ByteArrayInputStream is = new ByteArrayInputStream(data);
+    TIOStreamTransport trans = new TIOStreamTransport(is);
+    TBinaryProtocol protocol = new TBinaryProtocol(trans);
+    _tableDescriptor = new TableDescriptor();
+    try {
+      _tableDescriptor.read(protocol);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private String readString(DataInput input) throws IOException {
+    int length = input.readInt();
+    byte[] buf = new byte[length];
+    input.readFully(buf);
+    return new String(buf);
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    output.writeLong(_maxRecordCount);
+    output.writeInt(_ramBufferSizeMB);
+    output.writeBoolean(_optimize);
+    writeString(output, _indexingType.name());
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    TIOStreamTransport trans = new TIOStreamTransport(os);
+    TBinaryProtocol protocol = new TBinaryProtocol(trans);
+    try {
+      _tableDescriptor.write(protocol);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+    os.close();
+    byte[] bs = os.toByteArray();
+    output.writeInt(bs.length);
+    output.write(bs);
+  }
+
+  private void writeString(DataOutput output, String s) throws IOException {
+    byte[] bs = s.getBytes();
+    output.writeInt(bs.length);
+    output.write(bs);
+  }
+
+  public int getMaxRecordsPerRow() {
+    return _maxRecordsPerRow;
+  }
+
+  public void setMaxRecordsPerRow(int maxRecordsPerRow) {
+    _maxRecordsPerRow = maxRecordsPerRow;
+  }
+
+  public boolean getOptimize() {
+    return _optimize;
+  }
+
+  public void setOptimize(boolean optimize) {
+    _optimize = optimize;
+  }
+
+  public INDEXING_TYPE getIndexingType() {
+    return _indexingType;
+  }
+
+  public void setIndexingType(INDEXING_TYPE indexingType) {
+    _indexingType = indexingType;
+  }
+}
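
A hypothetical driver for this deprecated path might look like the sketch
below. configureJob() serializes the BlurTask into the job configuration as
Base64 under "blur.blurtask", which BlurTask.read() recovers on the task side.
The TableDescriptor setters are thrift-generated and the values are
placeholders:

import org.apache.blur.mapreduce.BlurTask;
import org.apache.blur.thrift.generated.TableDescriptor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class BlurTaskDriverSketch {
  public static void main(String[] args) throws Exception {
    TableDescriptor descriptor = new TableDescriptor();
    descriptor.setName("test-table");
    descriptor.setTableUri("hdfs://namenode/blur/test-table");
    descriptor.setShardCount(4);

    BlurTask task = new BlurTask();
    task.setTableDescriptor(descriptor);
    task.setIndexingType(BlurTask.INDEXING_TYPE.REBUILD);

    // sets the reducer, output classes and reducer count; the caller still
    // wires up the mapper, input format and input paths before submitting
    Job job = task.configureJob(new Configuration());
  }
}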

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-mapred/src/main/java/org/apache/blur/mapreduce/BufferedDirectory.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/BufferedDirectory.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/BufferedDirectory.java
new file mode 100644
index 0000000..855f1d5
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/BufferedDirectory.java
@@ -0,0 +1,145 @@
+package org.apache.blur.mapreduce;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.lucene.store.BufferedIndexInput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.LockFactory;
+
+public class BufferedDirectory extends Directory {
+
+  private Directory _directory;
+  private int _buffer;
+
+  public BufferedDirectory(Directory directory, int buffer) {
+    _directory = directory;
+    _buffer = buffer;
+  }
+
+  @Override
+  public void close() throws IOException {
+    _directory.close();
+  }
+
+  @Override
+  public IndexOutput createOutput(String name, IOContext context) throws IOException {
+    return _directory.createOutput(name, context);
+  }
+
+  @Override
+  public void deleteFile(String name) throws IOException {
+    _directory.deleteFile(name);
+  }
+
+  @Override
+  public boolean fileExists(String name) throws IOException {
+    return _directory.fileExists(name);
+  }
+
+  @Override
+  public long fileLength(String name) throws IOException {
+    return _directory.fileLength(name);
+  }
+
+  @Override
+  public String[] listAll() throws IOException {
+    return _directory.listAll();
+  }
+
+  @Override
+  public IndexInput openInput(String name, IOContext context) throws IOException {
+    return new BigBufferIndexInput(name, _directory.openInput(name, context), _buffer);
+  }
+
+  public static class BigBufferIndexInput extends BufferedIndexInput {
+
+    private IndexInput _input;
+    private long _length;
+
+    public BigBufferIndexInput(String name, IndexInput input, int buffer) {
+      super(name, buffer);
+      _input = input;
+      _length = input.length();
+    }
+
+    @Override
+    protected void readInternal(byte[] b, int offset, int length) throws IOException {
+      _input.seek(getFilePointer());
+      _input.readBytes(b, offset, length);
+    }
+
+    @Override
+    protected void seekInternal(long pos) throws IOException {
+      // no-op: readInternal seeks the delegate to getFilePointer() on each read
+    }
+
+    @Override
+    public void close() throws IOException {
+      _input.close();
+    }
+
+    @Override
+    public long length() {
+      return _length;
+    }
+
+    @Override
+    public BigBufferIndexInput clone() {
+      BigBufferIndexInput clone = (BigBufferIndexInput) super.clone();
+      clone._input = (IndexInput) _input.clone();
+      return clone;
+    }
+  }
+
+  @Override
+  public void clearLock(String name) throws IOException {
+    _directory.clearLock(name);
+  }
+
+  @Override
+  public LockFactory getLockFactory() {
+    return _directory.getLockFactory();
+  }
+
+  @Override
+  public String getLockID() {
+    return _directory.getLockID();
+  }
+
+  @Override
+  public Lock makeLock(String name) {
+    return _directory.makeLock(name);
+  }
+
+  @Override
+  public void setLockFactory(LockFactory lockFactory) throws IOException {
+    _directory.setLockFactory(lockFactory);
+  }
+
+  @Override
+  public void sync(Collection<String> names) throws IOException {
+    _directory.sync(names);
+  }
+
+}
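
Usage sketch for BufferedDirectory: wrap any Lucene Directory so that
openInput() reads go through a larger buffer (32 KB here, matching
getBiggerBuffers() in BlurReducer); the index path is a placeholder:

import java.io.File;

import org.apache.blur.mapreduce.BufferedDirectory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class BufferedDirectorySketch {
  public static void main(String[] args) throws Exception {
    Directory raw = FSDirectory.open(new File("/tmp/index")); // Lucene 4.x API
    Directory buffered = new BufferedDirectory(raw, 32768);
    for (String file : buffered.listAll()) {
      // metadata calls are delegated; only openInput() is re-buffered
      System.out.println(file + " " + buffered.fileLength(file) + " bytes");
    }
    buffered.close();
  }
}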

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BaseBlurMapper.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BaseBlurMapper.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BaseBlurMapper.java
new file mode 100644
index 0000000..a796810
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BaseBlurMapper.java
@@ -0,0 +1,49 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * Base mapper class for Blur map reduce classes.
+ * 
+ * @param <KEY>
+ * @param <VALUE>
+ */
+public abstract class BaseBlurMapper<KEY, VALUE> extends Mapper<KEY, VALUE, Text, BlurMutate> {
+  protected BlurMutate _mutate;
+  protected Text _key;
+  protected Counter _recordCounter;
+  protected Counter _fieldCounter;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    _mutate = new BlurMutate();
+    _mutate.setRecord(new BlurRecord());
+    _key = new Text();
+    _recordCounter = context.getCounter(BlurCounters.RECORD_COUNT);
+    _fieldCounter = context.getCounter(BlurCounters.FIELD_COUNT);
+  }
+
+  @Override
+  protected abstract void map(KEY key, VALUE value, Context context) throws IOException, InterruptedException;
+
+}
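
A minimal hypothetical subclass of BaseBlurMapper; the line-oriented input,
family name and column layout are assumptions for illustration only:

import java.io.IOException;

import org.apache.blur.mapreduce.lib.BaseBlurMapper;
import org.apache.blur.mapreduce.lib.BlurColumn;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class SimpleBlurMapper extends BaseBlurMapper<LongWritable, Text> {
  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String line = value.toString();
    _mutate.getRecord().setRowId(line);
    _mutate.getRecord().setRecordId(line);
    _mutate.setFamily("doc"); // placeholder column family
    _mutate.addColumn(new BlurColumn("body", line));
    _key.set(line);
    context.write(_key, _mutate);
    _recordCounter.increment(1);
    _fieldCounter.increment(1);
  }
}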

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurColumn.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurColumn.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurColumn.java
new file mode 100644
index 0000000..d32a3bd
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurColumn.java
@@ -0,0 +1,109 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+public class BlurColumn implements Writable {
+
+  private String name;
+  private String value;
+
+  public BlurColumn() {
+  }
+
+  public BlurColumn(String name, String value) {
+    this.name = name;
+    this.value = value;
+  }
+
+  public boolean hasNull() {
+    return name == null || value == null;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    name = IOUtil.readString(in);
+    value = IOUtil.readString(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    IOUtil.writeString(out, name);
+    IOUtil.writeString(out, value);
+  }
+
+  public String getValue() {
+    return value;
+  }
+
+  public void setValue(String value) {
+    this.value = value;
+  }
+
+  @Override
+  public String toString() {
+    return "{name=" + name + ", value=" + value + "}";
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((name == null) ? 0 : name.hashCode());
+    result = prime * result + ((value == null) ? 0 : value.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    BlurColumn other = (BlurColumn) obj;
+    if (name == null) {
+      if (other.name != null)
+        return false;
+    } else if (!name.equals(other.name))
+      return false;
+    if (value == null) {
+      if (other.value != null)
+        return false;
+    } else if (!value.equals(other.value))
+      return false;
+    return true;
+  }
+
+}
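
Round-trip sketch for the Writable implementation above: serialize a column to
a byte array and read it back.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.blur.mapreduce.lib.BlurColumn;

public class BlurColumnRoundTrip {
  public static void main(String[] args) throws Exception {
    BlurColumn column = new BlurColumn("title", "hello");
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    column.write(new DataOutputStream(bytes));

    BlurColumn copy = new BlurColumn();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(copy); // prints {name=title, value=hello}
  }
}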

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java
new file mode 100644
index 0000000..ba8da67
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java
@@ -0,0 +1,26 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * The enum class used for all the internal counters during map reduce jobs.
+ */
+public enum BlurCounters {
+  RECORD_COUNT, FIELD_COUNT, ROW_COUNT, RECORD_RATE, COPY_RATE, ROW_RATE, RECORD_DUPLICATE_COUNT, ROW_OVERFLOW_COUNT
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMutate.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMutate.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMutate.java
new file mode 100644
index 0000000..3fdb97c
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMutate.java
@@ -0,0 +1,157 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+public class BlurMutate implements Writable {
+
+  public enum MUTATE_TYPE {
+    /*ADD(0), UPDATE(1),*/ DELETE(2), REPLACE(3);
+    private int _value;
+
+    private MUTATE_TYPE(int value) {
+      _value = value;
+    }
+
+    public int getValue() {
+      return _value;
+    }
+
+    public MUTATE_TYPE find(int value) {
+      switch (value) {
+   // TODO: updates through MapReduce are going to be disabled
+//      case 0:
+//        return ADD;
+//      case 1:
+//        return UPDATE;
+      case 2:
+        return DELETE;
+      case 3:
+        return REPLACE;
+      default:
+        throw new RuntimeException("Value [" + value + "] not found.");
+      }
+    }
+  }
+
+  private MUTATE_TYPE _mutateType = MUTATE_TYPE.REPLACE;
+  private BlurRecord _record = new BlurRecord();
+
+  public BlurMutate() {
+
+  }
+
+  public BlurMutate(MUTATE_TYPE type, BlurRecord record) {
+    _mutateType = type;
+    _record = record;
+  }
+
+  public BlurMutate(MUTATE_TYPE type, String rowId, String recordId) {
+    _mutateType = type;
+    _record.setRowId(rowId);
+    _record.setRecordId(recordId);
+  }
+
+  public BlurMutate(MUTATE_TYPE type, String rowId, String recordId, String family) {
+    _mutateType = type;
+    _record.setRowId(rowId);
+    _record.setRecordId(recordId);
+    _record.setFamily(family);
+  }
+
+  public BlurMutate addColumn(BlurColumn column) {
+    _record.addColumn(column);
+    return this;
+  }
+
+  public BlurMutate addColumn(String name, String value) {
+    return addColumn(new BlurColumn(name, value));
+  }
+
+  public BlurRecord getRecord() {
+    return _record;
+  }
+
+  public void setRecord(BlurRecord record) {
+    _record = record;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    IOUtil.writeVInt(out, _mutateType.getValue());
+    _record.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    _mutateType = _mutateType.find(IOUtil.readVInt(in));
+    _record.readFields(in);
+  }
+
+  public MUTATE_TYPE getMutateType() {
+    return _mutateType;
+  }
+
+  public BlurMutate setMutateType(MUTATE_TYPE mutateType) {
+    _mutateType = mutateType;
+    return this;
+  }
+
+  @Override
+  public String toString() {
+    return "BlurMutate [mutateType=" + _mutateType + ", record=" + _record + "]";
+  }
+
+  public BlurMutate setFamily(String family) {
+    _record.setFamily(family);
+    return this;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((_mutateType == null) ? 0 : _mutateType.hashCode());
+    result = prime * result + ((_record == null) ? 0 : _record.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    BlurMutate other = (BlurMutate) obj;
+    if (_mutateType != other._mutateType)
+      return false;
+    if (_record == null) {
+      if (other._record != null)
+        return false;
+    } else if (!_record.equals(other._record))
+      return false;
+    return true;
+  }
+
+}
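
Because BlurMutate implements Writable, it can round-trip through Hadoop's serialization. A minimal sketch of that round trip, using the constructors and builder methods above (row and column values are hypothetical):

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    BlurMutate mutate = new BlurMutate(BlurMutate.MUTATE_TYPE.REPLACE, "row1", "record1", "cf1");
    mutate.addColumn("col", "value");
    mutate.write(new DataOutputStream(bytes));

    BlurMutate copy = new BlurMutate();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    // With the assignment in readFields above, copy.equals(mutate) holds.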

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
new file mode 100644
index 0000000..6b485a9
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
@@ -0,0 +1,96 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.mapred.AbstractOutputCommitter;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
+
+public class BlurOutputCommitter extends AbstractOutputCommitter {
+
+  private static final Log LOG = LogFactory.getLog(BlurOutputCommitter.class);
+
+  private Path _newIndex;
+  private Configuration _configuration;
+  private TaskAttemptID _taskAttemptID;
+  private Path _indexPath;
+  private final boolean _runTaskCommit;
+  private TableDescriptor _tableDescriptor;
+
+  public BlurOutputCommitter() {
+    _runTaskCommit = true;
+  }
+
+  public BlurOutputCommitter(boolean isMap, int numberOfReducers) {
+    _runTaskCommit = !(isMap && numberOfReducers != 0);
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
+    return _runTaskCommit;
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext context) throws IOException {
+
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext context) throws IOException {
+    setup(context);
+    FileSystem fileSystem = _newIndex.getFileSystem(_configuration);
+    if (fileSystem.exists(_newIndex) && !fileSystem.isFile(_newIndex)) {
+      Path dst = new Path(_indexPath, _taskAttemptID.toString() + ".task_complete");
+      LOG.info("Committing [{0}] to [{1}]", _newIndex, dst);
+      fileSystem.rename(_newIndex, dst);
+    } else {
+      throw new IOException("Path [" + _newIndex + "] does not exist, can not commit.");
+    }
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext context) throws IOException {
+    setup(context);
+    FileSystem fileSystem = _newIndex.getFileSystem(_configuration);
+    LOG.info("abortTask - Deleting [{0}]", _newIndex);
+    fileSystem.delete(_newIndex, true);
+  }
+
+  private void setup(TaskAttemptContext context) throws IOException {
+    _configuration = context.getConfiguration();
+    _tableDescriptor = BlurOutputFormat.getTableDescriptor(_configuration);
+    int shardCount = _tableDescriptor.getShardCount();
+    int attemptId = context.getTaskAttemptID().getTaskID().getId();
+    int shardId = attemptId % shardCount;
+    _taskAttemptID = context.getTaskAttemptID();
+    Path tableOutput = BlurOutputFormat.getOutputPath(_configuration);
+    String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, shardId);
+    _indexPath = new Path(tableOutput, shardName);
+    _newIndex = new Path(_indexPath, _taskAttemptID.toString() + ".tmp");
+  }
+
+}
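
The committer above is rename based: each task writes into a temporary directory named after its attempt id, and commitTask renames that directory to mark it complete. A sketch of the layout and the shard assignment, with hypothetical names and numbers:

    // during the task:  <tableUri>/<shardName>/attempt_..._0001.tmp
    // after commitTask: <tableUri>/<shardName>/attempt_..._0001.task_complete
    int shardCount = 16;                  // tableDescriptor.getShardCount()
    int attemptId = 35;                   // context.getTaskAttemptID().getTaskID().getId()
    int shardId = attemptId % shardCount; // attempt 35 lands in shard 3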

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
new file mode 100644
index 0000000..ae91eb8
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
@@ -0,0 +1,576 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.lucene.LuceneVersionConstant;
+import org.apache.blur.manager.writer.TransactionRecorder;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TJSONProtocol;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TIOStreamTransport;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.Progressable;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.TieredMergePolicy;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.NoLockFactory;
+
+/**
+ * {@link BlurOutputFormat} is used to index data and deliver the indexes to
+ * the proper Blur table for searching. A typical usage of this class would be
+ * as follows:
+ * 
+ * <pre>
+ * Iface client = BlurClient.getClient("controller1:40010");
+ * 
+ * TableDescriptor tableDescriptor = client.describe(tableName);
+ * 
+ * Job job = new Job(jobConf, "blur index");
+ * job.setJarByClass(BlurOutputFormatTest.class);
+ * job.setMapperClass(CsvBlurMapper.class);
+ * job.setInputFormatClass(TextInputFormat.class);
+ * 
+ * FileInputFormat.addInputPath(job, new Path(input));
+ * CsvBlurMapper.addColumns(job, "cf1", "col");
+ * 
+ * BlurOutputFormat.setupJob(job, tableDescriptor);
+ * BlurOutputFormat.setIndexLocally(job, true);
+ * BlurOutputFormat.setOptimizeInFlight(job, false);
+ * 
+ * job.waitForCompletion(true);
+ * </pre>
+ */
+public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
+
+  public static final String BLUR_OUTPUT_REDUCER_MULTIPLIER = "blur.output.reducer.multiplier";
+  public static final String BLUR_OUTPUT_OPTIMIZEINFLIGHT = "blur.output.optimizeinflight";
+  public static final String BLUR_OUTPUT_INDEXLOCALLY = "blur.output.indexlocally";
+  public static final String BLUR_OUTPUT_MAX_DOCUMENT_BUFFER_SIZE = "blur.output.max.document.buffer.size";
+  public static final String BLUR_TABLE_DESCRIPTOR = "blur.table.descriptor";
+  public static final String BLUR_OUTPUT_PATH = "blur.output.path";
+
+  private static final String JAVA_IO_TMPDIR = "java.io.tmpdir";
+  private static final String MAPRED_OUTPUT_COMMITTER_CLASS = "mapred.output.committer.class";
+  private static ThreadLocal<Progressable> _progressable = new ThreadLocal<Progressable>();
+  private static ThreadLocal<GetCounter> _getCounter = new ThreadLocal<GetCounter>();
+
+  static void setProgressable(Progressable progressable) {
+    _progressable.set(progressable);
+  }
+
+  static Progressable getProgressable() {
+    return _progressable.get();
+  }
+
+  static void setGetCounter(GetCounter getCounter) {
+    _getCounter.set(getCounter);
+  }
+
+  static GetCounter getGetCounter() {
+    return _getCounter.get();
+  }
+
+  @Override
+  public void checkOutputSpecs(JobContext context) throws IOException,
+      InterruptedException {
+    Configuration config = context.getConfiguration();
+    TableDescriptor tableDescriptor = getTableDescriptor(config);
+    if (tableDescriptor == null) {
+      throw new IOException("setTableDescriptor needs to be called first.");
+    }
+    int shardCount = tableDescriptor.getShardCount();
+    FileSystem fileSystem = getOutputPath(config).getFileSystem(config);
+    Path tablePath = new Path(tableDescriptor.getTableUri());
+    if (fileSystem.exists(tablePath)) {
+      BlurUtil.validateShardCount(shardCount, fileSystem, tablePath);
+    } else {
+      throw new IOException("Table path [" + tablePath + "] does not exist for table [" + tableDescriptor.getName() + "].");
+    }
+
+    int reducers = context.getNumReduceTasks();
+    int reducerMultiplier = getReducerMultiplier(config);
+    int validNumberOfReducers = reducerMultiplier * shardCount;
+    if (reducers > 0 && reducers != validNumberOfReducers) {
+      throw new IllegalArgumentException("Invalid number of reducers [ " + reducers +" ]." + " Number of Reducers should be [ " + validNumberOfReducers + " ].");
+    }
+  }
+
+  @Override
+  public RecordWriter<Text, BlurMutate> getRecordWriter(TaskAttemptContext context) throws IOException,
+      InterruptedException {
+    TaskAttemptID taskAttemptID = context.getTaskAttemptID();
+    int id = taskAttemptID.getTaskID().getId();
+    return new BlurRecordWriter(context.getConfiguration(), new BlurAnalyzer(), id, taskAttemptID.toString() + ".tmp");
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+    return new BlurOutputCommitter(context.getTaskAttemptID().isMap(), context.getNumReduceTasks());
+  }
+
+  public static TableDescriptor getTableDescriptor(Configuration configuration) throws IOException {
+    String tableDesStr = configuration.get(BLUR_TABLE_DESCRIPTOR);
+    if (tableDesStr == null) {
+      return null;
+    }
+    ByteArrayInputStream inputStream = new ByteArrayInputStream(tableDesStr.getBytes());
+    TIOStreamTransport transport = new TIOStreamTransport(inputStream);
+    TJSONProtocol protocol = new TJSONProtocol(transport);
+    TableDescriptor descriptor = new TableDescriptor();
+    try {
+      descriptor.read(protocol);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+    transport.close();
+    return descriptor;
+  }
+
+  /**
+   * This will multiply the number of reducers for this job. For example, if
+   * the table has 256 shards the normal number of reducers is 256. However, if
+   * the reducer multiplier is set to 4 then the number of reducers will be
+   * 1024 and each shard will get 4 new segments instead of the normal 1.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param multiple
+   *          the multiple to use.
+   * @throws IOException
+   */
+  public static void setReducerMultiplier(Job job, int multiple) throws IOException {
+    TableDescriptor tableDescriptor = getTableDescriptor(job.getConfiguration());
+    if (tableDescriptor == null) {
+      throw new IOException("setTableDescriptor needs to be called first.");
+    }
+    job.setNumReduceTasks(tableDescriptor.getShardCount() * multiple);
+    Configuration configuration = job.getConfiguration();
+    configuration.setInt(BLUR_OUTPUT_REDUCER_MULTIPLIER, multiple);
+  }
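+
+  // Example with hypothetical numbers: for a table with 256 shards,
+  // setReducerMultiplier(job, 4) calls job.setNumReduceTasks(256 * 4), and
+  // checkOutputSpecs will then accept exactly 1024 reducers for this job.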
+  
+  public static int getReducerMultiplier(Configuration configuration) {
+    return configuration.getInt(BLUR_OUTPUT_REDUCER_MULTIPLIER, 1);
+  }
+
+  /**
+   * Sets the {@link TableDescriptor} for this job.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param tableDescriptor
+   *          the {@link TableDescriptor}.
+   * @throws IOException
+   */
+  public static void setTableDescriptor(Job job, TableDescriptor tableDescriptor) throws IOException {
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    TIOStreamTransport transport = new TIOStreamTransport(outputStream);
+    TJSONProtocol protocol = new TJSONProtocol(transport);
+    try {
+      tableDescriptor.write(protocol);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+    transport.close();
+    Configuration configuration = job.getConfiguration();
+    configuration.set(BLUR_TABLE_DESCRIPTOR, new String(outputStream.toByteArray()));
+    setOutputPath(job, new Path(tableDescriptor.getTableUri()));
+  }
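+
+  // The descriptor round-trips through the job configuration as a Thrift JSON
+  // string: setTableDescriptor above serializes it with TJSONProtocol into the
+  // BLUR_TABLE_DESCRIPTOR property and getTableDescriptor deserializes it.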
+
+  /**
+   * Sets the maximum number of documents that the buffer will hold in memory
+   * before overflowing to disk. By default this is 1000, which will probably
+   * be too low for most systems.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param maxDocumentBufferSize
+   *          the maxDocumentBufferSize.
+   */
+  public static void setMaxDocumentBufferSize(Job job, int maxDocumentBufferSize) {
+    setMaxDocumentBufferSize(job.getConfiguration(), maxDocumentBufferSize);
+  }
+
+  /**
+   * Sets the maximum number of documents that the buffer will hold in memory
+   * before overflowing to disk. By default this is 1000, which will probably
+   * be too low for most systems.
+   * 
+   * @param configuration
+   *          the configuration to setup.
+   * @param maxDocumentBufferSize
+   *          the maxDocumentBufferSize.
+   */
+  public static void setMaxDocumentBufferSize(Configuration configuration, int maxDocumentBufferSize) {
+    configuration.setInt(BLUR_OUTPUT_MAX_DOCUMENT_BUFFER_SIZE, maxDocumentBufferSize);
+  }
+
+  public static int getMaxDocumentBufferSize(Configuration configuration) {
+    return configuration.getInt(BLUR_OUTPUT_MAX_DOCUMENT_BUFFER_SIZE, 1000);
+  }
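+
+  // Note: the buffer holds the Lucene Documents of the current row only. When
+  // a single row exceeds this size, the documents spill into a temporary local
+  // index (see BlurRecordWriter.flushToTmpIndexIfNeeded) and ROW_OVERFLOW_COUNT
+  // is incremented when that row is finally flushed.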
+
+  public static void setOutputPath(Job job, Path path) {
+    Configuration configuration = job.getConfiguration();
+    configuration.set(BLUR_OUTPUT_PATH, path.toString());
+    configuration.set(MAPRED_OUTPUT_COMMITTER_CLASS, BlurOutputCommitter.class.getName());
+  }
+
+  public static Path getOutputPath(Configuration configuration) {
+    return new Path(configuration.get(BLUR_OUTPUT_PATH));
+  }
+
+  /**
+   * Enabled by default, this will enable local indexing on the machine where
+   * the task is running. Then, when the {@link RecordWriter} closes, the
+   * index is copied to the remote destination in HDFS.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param b
+   *          true to enable, false to disable.
+   */
+  public static void setIndexLocally(Job job, boolean b) {
+    setIndexLocally(job.getConfiguration(), b);
+  }
+
+  /**
+   * Enabled by default, this will enable local indexing on the machine where
+   * the task is running. Then, when the {@link RecordWriter} closes, the
+   * index is copied to the remote destination in HDFS.
+   * 
+   * @param configuration
+   *          the configuration to setup.
+   * @param b
+   *          true to enable, false to disable.
+   */
+  public static void setIndexLocally(Configuration configuration, boolean b) {
+    configuration.setBoolean(BLUR_OUTPUT_INDEXLOCALLY, b);
+  }
+
+  public static boolean isIndexLocally(Configuration configuration) {
+    return configuration.getBoolean(BLUR_OUTPUT_INDEXLOCALLY, true);
+  }
+
+  /**
+   * Enabled by default, this will optimize the index while copying from the
+   * local index to the remote destination in HDFS. Used in conjunction with
+   * setIndexLocally.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param b
+   *          true to enable, false to disable.
+   */
+  public static void setOptimizeInFlight(Job job, boolean b) {
+    setOptimizeInFlight(job.getConfiguration(), b);
+  }
+
+  /**
+   * Enabled by default, this will optimize the index while copying from the
+   * local index to the remote destination in HDFS. Used in conjunction with
+   * setIndexLocally.
+   * 
+   * @param configuration
+   *          the configuration to setup.
+   * @param b
+   *          true to enable, false to disable.
+   */
+  public static void setOptimizeInFlight(Configuration configuration, boolean b) {
+    configuration.setBoolean(BLUR_OUTPUT_OPTIMIZEINFLIGHT, b);
+  }
+
+  public static boolean isOptimizeInFlight(Configuration configuration) {
+    return configuration.getBoolean(BLUR_OUTPUT_OPTIMIZEINFLIGHT, true);
+  }
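+
+  // When enabled, the local segments are rewritten into the destination
+  // through IndexWriter.addIndexes (see copyAndOptimizeInFlightDir below)
+  // instead of being copied file by file, trading task time for a merged index.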
+
+  static class BlurRecordWriter extends RecordWriter<Text, BlurMutate> {
+
+    private static final Log LOG = LogFactory.getLog(BlurRecordWriter.class);
+
+    private final Text _prevKey = new Text();
+    private final Map<String, Document> _documents = new TreeMap<String, Document>();
+    private final IndexWriter _writer;
+    private final BlurAnalyzer _analyzer;
+    private final StringBuilder _builder = new StringBuilder();
+    private final Directory _finalDir;
+    private final Directory _localDir;
+    private final File _localPath;
+    private final int _maxDocumentBufferSize;
+    private final IndexWriterConfig _conf;
+    private final Path _newIndex;
+    private final boolean _indexLocally;
+    private final boolean _optimizeInFlight;
+    private Counter _fieldCount;
+    private Counter _recordCount;
+    private Counter _rowCount;
+    private Counter _recordDuplicateCount;
+    private boolean _countersSetup = false;
+    private RateCounter _recordRateCounter;
+    private RateCounter _rowRateCounter;
+    private RateCounter _copyRateCounter;
+    private IndexWriter _localTmpWriter;
+    private boolean _usingLocalTmpindex;
+    private File _localTmpPath;
+    private ProgressableDirectory _localTmpDir;
+    private Counter _rowOverFlowCount;
+
+    public BlurRecordWriter(Configuration configuration, BlurAnalyzer blurAnalyzer, int attemptId, String tmpDirName)
+        throws IOException {
+
+      _indexLocally = BlurOutputFormat.isIndexLocally(configuration);
+      _optimizeInFlight = BlurOutputFormat.isOptimizeInFlight(configuration);
+
+      TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
+      int shardCount = tableDescriptor.getShardCount();
+      int shardId = attemptId % shardCount;
+
+      _maxDocumentBufferSize = BlurOutputFormat.getMaxDocumentBufferSize(configuration);
+      Path tableOutput = BlurOutputFormat.getOutputPath(configuration);
+      String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, shardId);
+      Path indexPath = new Path(tableOutput, shardName);
+      _newIndex = new Path(indexPath, tmpDirName);
+      _finalDir = new ProgressableDirectory(new HdfsDirectory(configuration, _newIndex),
+          BlurOutputFormat.getProgressable());
+      _finalDir.setLockFactory(NoLockFactory.getNoLockFactory());
+
+      _analyzer = new BlurAnalyzer(tableDescriptor.getAnalyzerDefinition());
+      _conf = new IndexWriterConfig(LuceneVersionConstant.LUCENE_VERSION, _analyzer);
+      TieredMergePolicy mergePolicy = (TieredMergePolicy) _conf.getMergePolicy();
+      mergePolicy.setUseCompoundFile(false);
+
+      if (_indexLocally) {
+        String localDirPath = System.getProperty(JAVA_IO_TMPDIR);
+        _localPath = new File(localDirPath, UUID.randomUUID().toString() + ".tmp");
+        _localDir = new ProgressableDirectory(FSDirectory.open(_localPath), BlurOutputFormat.getProgressable());
+        _writer = new IndexWriter(_localDir, _conf.clone());
+      } else {
+        _localPath = null;
+        _localDir = null;
+        _writer = new IndexWriter(_finalDir, _conf.clone());
+      }
+    }
+
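+    /**
+     * Note: keys are expected to arrive grouped (sorted) by row id; each time
+     * the key changes, the documents buffered for the previous row are
+     * flushed as one complete row.
+     */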
+    @Override
+    public void write(Text key, BlurMutate value) throws IOException, InterruptedException {
+      if (!_countersSetup) {
+        setupCounter();
+        _countersSetup = true;
+      }
+      if (!_prevKey.equals(key)) {
+        flush();
+        _prevKey.set(key);
+      }
+      add(value);
+    }
+
+    private void setupCounter() {
+      GetCounter getCounter = BlurOutputFormat.getGetCounter();
+      _fieldCount = getCounter.getCounter(BlurCounters.FIELD_COUNT);
+      _recordCount = getCounter.getCounter(BlurCounters.RECORD_COUNT);
+      _recordDuplicateCount = getCounter.getCounter(BlurCounters.RECORD_DUPLICATE_COUNT);
+      _rowCount = getCounter.getCounter(BlurCounters.ROW_COUNT);
+      _rowOverFlowCount = getCounter.getCounter(BlurCounters.ROW_OVERFLOW_COUNT);
+      _recordRateCounter = new RateCounter(getCounter.getCounter(BlurCounters.RECORD_RATE));
+      _rowRateCounter = new RateCounter(getCounter.getCounter(BlurCounters.ROW_RATE));
+      _copyRateCounter = new RateCounter(getCounter.getCounter(BlurCounters.COPY_RATE));
+    }
+
+    private void add(BlurMutate value) throws IOException {
+      BlurRecord blurRecord = value.getRecord();
+      Record record = getRecord(blurRecord);
+      String recordId = record.getRecordId();
+      Document document = TransactionRecorder.convert(blurRecord.getRowId(), record, _builder, _analyzer);
+      if (_documents.isEmpty()) {
+        document.add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
+      }
+      Document dup = _documents.put(recordId, document);
+      if (dup != null) {
+        _recordDuplicateCount.increment(1);
+      } else {
+        _fieldCount.increment(document.getFields().size());
+        _recordCount.increment(1);
+      }
+      flushToTmpIndexIfNeeded();
+    }
+
+    private void flushToTmpIndexIfNeeded() throws IOException {
+      if (_documents.size() > _maxDocumentBufferSize) {
+        flushToTmpIndex();
+      }
+    }
+
+    private void flushToTmpIndex() throws IOException {
+      if (_documents.isEmpty()) {
+        return;
+      }
+      _usingLocalTmpindex = true;
+      if (_localTmpWriter == null) {
+        String localDirPath = System.getProperty(JAVA_IO_TMPDIR);
+        _localTmpPath = new File(localDirPath, UUID.randomUUID().toString() + ".tmp");
+        _localTmpDir = new ProgressableDirectory(FSDirectory.open(_localTmpPath), BlurOutputFormat.getProgressable());
+        _localTmpWriter = new IndexWriter(_localTmpDir, _conf.clone());
+      }
+      _localTmpWriter.addDocuments(_documents.values());
+      _documents.clear();
+    }
+
+    private void resetLocalTmp() {
+      _usingLocalTmpindex = false;
+      _localTmpWriter = null;
+      _localTmpDir = null;
+      rm(_localTmpPath);
+      _localTmpPath = null;
+    }
+
+    private Record getRecord(BlurRecord value) {
+      Record record = new Record();
+      record.setRecordId(value.getRecordId());
+      record.setFamily(value.getFamily());
+      for (BlurColumn col : value.getColumns()) {
+        record.addToColumns(new Column(col.getName(), col.getValue()));
+      }
+      return record;
+    }
+
+    private void flush() throws CorruptIndexException, IOException {
+      if (_usingLocalTmpindex) {
+        flushToTmpIndex();
+        _localTmpWriter.close(false);
+        DirectoryReader reader = DirectoryReader.open(_localTmpDir);
+        _recordRateCounter.mark(reader.numDocs());
+        _writer.addIndexes(reader);
+        reader.close();
+        resetLocalTmp();
+        _rowOverFlowCount.increment(1);
+      } else {
+        if (_documents.isEmpty()) {
+          return;
+        }
+        _writer.addDocuments(_documents.values());
+        _recordRateCounter.mark(_documents.size());
+        _documents.clear();
+      }
+      _rowRateCounter.mark();
+      _rowCount.increment(1);
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+      flush();
+      _writer.close();
+      _recordRateCounter.close();
+      _rowRateCounter.close();
+      if (_indexLocally) {
+        if (_optimizeInFlight) {
+          copyAndOptimizeInFlightDir();
+        } else {
+          copyDir();
+        }
+      }
+      _copyRateCounter.close();
+    }
+
+    private void copyAndOptimizeInFlightDir() throws IOException {
+      CopyRateDirectory copyRateDirectory = new CopyRateDirectory(_finalDir, _copyRateCounter);
+      copyRateDirectory.setLockFactory(NoLockFactory.getNoLockFactory());
+      DirectoryReader reader = DirectoryReader.open(_localDir);
+      IndexWriter writer = new IndexWriter(copyRateDirectory, _conf.clone());
+      writer.addIndexes(reader);
+      writer.close();
+      reader.close();
+      rm(_localPath);
+    }
+
+    private void copyDir() throws IOException {
+      CopyRateDirectory copyRateDirectory = new CopyRateDirectory(_finalDir, _copyRateCounter);
+      String[] fileNames = _localDir.listAll();
+      for (String fileName : fileNames) {
+        LOG.info("Copying [{0}] to [{1}]", fileName, _newIndex);
+        _localDir.copy(copyRateDirectory, fileName, fileName, IOContext.DEFAULT);
+      }
+      rm(_localPath);
+    }
+
+    private void rm(File file) {
+      if (!file.exists()) {
+        return;
+      }
+      if (file.isDirectory()) {
+        for (File f : file.listFiles()) {
+          rm(f);
+        }
+      }
+      file.delete();
+    }
+  }
+
+  /**
+   * Sets up the output portion of the map reduce job. This also affects the
+   * map side of the job when it is a map and reduce job.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param tableDescriptor
+   *          the table descriptor to write the output of the indexing job.
+   * @throws IOException
+   */
+  public static void setupJob(Job job, TableDescriptor tableDescriptor) throws IOException {
+    job.setReducerClass(DefaultBlurReducer.class);
+    job.setNumReduceTasks(tableDescriptor.getShardCount());
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(BlurMutate.class);
+    job.setOutputFormatClass(BlurOutputFormat.class);
+    setTableDescriptor(job, tableDescriptor);
+  }
+
+}
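
One ordering caveat for callers: setReducerMultiplier reads the table descriptor back out of the job configuration, so setupJob (or setTableDescriptor) must be called before it. A sketch, with a hypothetical multiplier of 4:

    BlurOutputFormat.setupJob(job, tableDescriptor); // stores the descriptor
    BlurOutputFormat.setReducerMultiplier(job, 4);   // shardCount * 4 reducers
    // Reversing the two calls makes setReducerMultiplier throw
    // IOException("setTableDescriptor needs to be called first.")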

