incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject [08/11] Reconfiguring projects again, some of the unit tests for hadoop2 now run successfully.
Date Thu, 01 May 2014 20:49:07 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7afca43b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/DefaultBlurReducer.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/DefaultBlurReducer.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/DefaultBlurReducer.java
new file mode 100644
index 0000000..99f2ac5
--- /dev/null
+++ b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/DefaultBlurReducer.java
@@ -0,0 +1,89 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * This class is to be used in conjunction with {@link BlurOutputFormat}.
+ * <br/><br/>
+ * 
+ * Here is a basic example of how to use both the {@link BlurOutputFormat} and
+ * the {@link DefaultBlurReducer} together to build indexes.
+ * <br/><br/>
+ * 
+ * Once this job has successfully completed, the indexes will be imported by
+ * the running shard servers and placed online. This is a polling mechanism in
+ * the shard servers; by default they poll every 10 seconds.
+ * 
+ * <pre>
+ * Job job = new Job(conf, "blur index");
+ * job.setJarByClass(BlurOutputFormatTest.class);
+ * job.setMapperClass(CsvBlurMapper.class);
+ * job.setReducerClass(DefaultBlurReducer.class);
+ * job.setNumReduceTasks(1);
+ * job.setInputFormatClass(TrackingTextInputFormat.class);
+ * job.setOutputKeyClass(Text.class);
+ * job.setOutputValueClass(BlurMutate.class);
+ * job.setOutputFormatClass(BlurOutputFormat.class);
+ * 
+ * FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+ * CsvBlurMapper.addColumns(job, "cf1", "col");
+ * 
+ * TableDescriptor tableDescriptor = new TableDescriptor();
+ * tableDescriptor.setShardCount(1);
+ * tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
+ * tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/out").toString());
+ * BlurOutputFormat.setTableDescriptor(job, tableDescriptor);
+ * </pre>
+ */
+public class DefaultBlurReducer extends Reducer<Writable, BlurMutate, Writable, BlurMutate> {
+
+  @Override
+  protected void setup(final Context context) throws IOException, InterruptedException {
+    BlurOutputFormat.setProgressable(context);
+    BlurOutputFormat.setGetCounter(new GetCounter() {
+      @Override
+      public Counter getCounter(Enum<?> counterName) {
+        return context.getCounter(counterName);
+      }
+    });
+  }
+
+  @Override
+  protected void reduce(Writable key, Iterable<BlurMutate> values, Context context) throws IOException,
+      InterruptedException {
+    Text textKey = getTextKey(key);
+    for (BlurMutate value : values) {
+      context.write(textKey, value);
+    }
+  }
+
+  protected Text getTextKey(Writable key) {
+    if (key instanceof Text) {
+      return (Text) key;
+    }
+    throw new IllegalArgumentException("Key is not of type Text, you will need to "
+        + "subclass DefaultBlurReducer and override the \"getTextKey\" method.");
+  }
+}
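
The getTextKey hook above is what makes the reducer reusable with map output keys other than Text. A hypothetical sketch, assuming the job's map output key is a LongWritable (the subclass name and conversion are illustrative only, and the class is assumed to live in or import org.apache.blur.mapreduce.lib):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class LongKeyBlurReducer extends DefaultBlurReducer {

  private final Text _textKey = new Text();

  @Override
  protected Text getTextKey(Writable key) {
    if (key instanceof LongWritable) {
      // Convert the numeric key into the Text row id the output format expects.
      _textKey.set(Long.toString(((LongWritable) key).get()));
      return _textKey;
    }
    return super.getTextKey(key);
  }
}

A job using it would simply register job.setReducerClass(LongKeyBlurReducer.class) in place of DefaultBlurReducer.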

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7afca43b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
new file mode 100644
index 0000000..e138cd5
--- /dev/null
+++ b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
@@ -0,0 +1,346 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.apache.blur.analysis.FieldManager;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.lucene.LuceneVersionConstant;
+import org.apache.blur.lucene.codec.Blur022Codec;
+import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.blur.utils.RowDocumentUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.NoMergePolicy;
+import org.apache.lucene.index.TieredMergePolicy;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.NoLockFactory;
+
+public class GenericBlurRecordWriter {
+
+  private static final Log LOG = LogFactory.getLog(GenericBlurRecordWriter.class);
+  private static final String JAVA_IO_TMPDIR = "java.io.tmpdir";
+
+  private final Text _prevKey = new Text();
+  private final Map<String, List<Field>> _documents = new TreeMap<String, List<Field>>();
+  private final IndexWriter _writer;
+  private final FieldManager _fieldManager;
+  private final Directory _finalDir;
+  private final Directory _localDir;
+  private final File _localPath;
+  private final int _maxDocumentBufferSize;
+  private final IndexWriterConfig _conf;
+  private final IndexWriterConfig _overFlowConf;
+  private final Path _newIndex;
+  private final boolean _indexLocally;
+  private final boolean _optimizeInFlight;
+  private Counter _columnCount;
+  private Counter _fieldCount;
+  private Counter _recordCount;
+  private Counter _rowCount;
+  private Counter _recordDuplicateCount;
+  private Counter _rowOverFlowCount;
+  private Counter _rowDeleteCount;
+  private RateCounter _recordRateCounter;
+  private RateCounter _rowRateCounter;
+  private RateCounter _copyRateCounter;
+  private boolean _countersSetup = false;
+  private IndexWriter _localTmpWriter;
+  private boolean _usingLocalTmpindex;
+  private File _localTmpPath;
+  private ProgressableDirectory _localTmpDir;
+  private String _deletedRowId;
+
+  public GenericBlurRecordWriter(Configuration configuration, int attemptId, String tmpDirName) throws IOException {
+
+    _indexLocally = BlurOutputFormat.isIndexLocally(configuration);
+    _optimizeInFlight = BlurOutputFormat.isOptimizeInFlight(configuration);
+
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
+    int shardCount = tableDescriptor.getShardCount();
+    int shardId = attemptId % shardCount;
+
+    _maxDocumentBufferSize = BlurOutputFormat.getMaxDocumentBufferSize(configuration);
+    Path tableOutput = BlurOutputFormat.getOutputPath(configuration);
+    String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, shardId);
+    Path indexPath = new Path(tableOutput, shardName);
+    _newIndex = new Path(indexPath, tmpDirName);
+    _finalDir = new ProgressableDirectory(new HdfsDirectory(configuration, _newIndex), getProgressable());
+    _finalDir.setLockFactory(NoLockFactory.getNoLockFactory());
+
+    TableContext tableContext = TableContext.create(tableDescriptor);
+    _fieldManager = tableContext.getFieldManager();
+    Analyzer analyzer = _fieldManager.getAnalyzerForIndex();
+
+    _conf = new IndexWriterConfig(LuceneVersionConstant.LUCENE_VERSION, analyzer);
+    _conf.setCodec(new Blur022Codec());
+    _conf.setSimilarity(tableContext.getSimilarity());
+    TieredMergePolicy mergePolicy = (TieredMergePolicy) _conf.getMergePolicy();
+    mergePolicy.setUseCompoundFile(false);
+
+    _overFlowConf = _conf.clone();
+    _overFlowConf.setMergePolicy(NoMergePolicy.NO_COMPOUND_FILES);
+
+    if (_indexLocally) {
+      String localDirPath = System.getProperty(JAVA_IO_TMPDIR);
+      _localPath = new File(localDirPath, UUID.randomUUID().toString() + ".tmp");
+      _localDir = new ProgressableDirectory(FSDirectory.open(_localPath), getProgressable());
+      _writer = new IndexWriter(_localDir, _conf.clone());
+    } else {
+      _localPath = null;
+      _localDir = null;
+      _writer = new IndexWriter(_finalDir, _conf.clone());
+    }
+  }
+
+  private Progressable getProgressable() {
+    return new Progressable() {
+      @Override
+      public void progress() {
+        Progressable progressable = BlurOutputFormat.getProgressable();
+        if (progressable != null) {
+          progressable.progress();
+        }
+      }
+    };
+  }
+
+  public void write(Text key, BlurMutate value) throws IOException {
+    if (!_countersSetup) {
+      setupCounter();
+      _countersSetup = true;
+    }
+    if (!_prevKey.equals(key)) {
+      flush();
+      _prevKey.set(key);
+    }
+    add(value);
+  }
+
+  private void setupCounter() {
+    GetCounter getCounter = BlurOutputFormat.getGetCounter();
+    _fieldCount = getCounter.getCounter(BlurCounters.LUCENE_FIELD_COUNT);
+    _columnCount = getCounter.getCounter(BlurCounters.COLUMN_COUNT);
+    _recordCount = getCounter.getCounter(BlurCounters.RECORD_COUNT);
+    _recordDuplicateCount = getCounter.getCounter(BlurCounters.RECORD_DUPLICATE_COUNT);
+    _rowCount = getCounter.getCounter(BlurCounters.ROW_COUNT);
+    _rowDeleteCount = getCounter.getCounter(BlurCounters.ROW_DELETE_COUNT);
+    _rowOverFlowCount = getCounter.getCounter(BlurCounters.ROW_OVERFLOW_COUNT);
+    _recordRateCounter = new RateCounter(getCounter.getCounter(BlurCounters.RECORD_RATE));
+    _rowRateCounter = new RateCounter(getCounter.getCounter(BlurCounters.ROW_RATE));
+    _copyRateCounter = new RateCounter(getCounter.getCounter(BlurCounters.COPY_RATE));
+  }
+
+  private void add(BlurMutate value) throws IOException {
+    BlurRecord blurRecord = value.getRecord();
+    Record record = getRecord(blurRecord);
+    String recordId = record.getRecordId();
+    if (value.getMutateType() == MUTATE_TYPE.DELETE) {
+      _deletedRowId = blurRecord.getRowId();
+      return;
+    }
+    if (_countersSetup) {
+      _columnCount.increment(record.getColumns().size());
+    }
+    List<Field> document = RowDocumentUtil.getDoc(_fieldManager, blurRecord.getRowId(), record);
+    List<Field> dup = _documents.put(recordId, document);
+    if (_countersSetup) {
+      if (dup != null) {
+        _recordDuplicateCount.increment(1);
+      } else {
+        _fieldCount.increment(document.size());
+        _recordCount.increment(1);
+      }
+    }
+    flushToTmpIndexIfNeeded();
+  }
+
+  private void flushToTmpIndexIfNeeded() throws IOException {
+    if (_documents.size() > _maxDocumentBufferSize) {
+      flushToTmpIndex();
+    }
+  }
+
+  private void flushToTmpIndex() throws IOException {
+    if (_documents.isEmpty()) {
+      return;
+    }
+    _usingLocalTmpindex = true;
+    if (_localTmpWriter == null) {
+      String localDirPath = System.getProperty(JAVA_IO_TMPDIR);
+      _localTmpPath = new File(localDirPath, UUID.randomUUID().toString() + ".tmp");
+      _localTmpDir = new ProgressableDirectory(FSDirectory.open(_localTmpPath), BlurOutputFormat.getProgressable());
+      _localTmpWriter = new IndexWriter(_localTmpDir, _overFlowConf.clone());
+      // The local tmp writer has merging disabled, so the first document added
+      // is doc 0 and therefore becomes the prime doc.
+      List<List<Field>> docs = new ArrayList<List<Field>>(_documents.values());
+      docs.get(0).add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
+      _localTmpWriter.addDocuments(docs);
+    } else {
+      _localTmpWriter.addDocuments(_documents.values());
+    }
+    _documents.clear();
+  }
+
+  private void resetLocalTmp() {
+    _usingLocalTmpindex = false;
+    _localTmpWriter = null;
+    _localTmpDir = null;
+    rm(_localTmpPath);
+    _localTmpPath = null;
+  }
+
+  private Record getRecord(BlurRecord value) {
+    Record record = new Record();
+    record.setRecordId(value.getRecordId());
+    record.setFamily(value.getFamily());
+    for (BlurColumn col : value.getColumns()) {
+      record.addToColumns(new Column(col.getName(), col.getValue()));
+    }
+    return record;
+  }
+
+  private void flush() throws CorruptIndexException, IOException {
+    if (_usingLocalTmpindex) {
+      // Since we have already flushed to disk, we do not need to index the
+      // delete.
+      flushToTmpIndex();
+      _localTmpWriter.close(false);
+      DirectoryReader reader = DirectoryReader.open(_localTmpDir);
+      if (_countersSetup) {
+        _recordRateCounter.mark(reader.numDocs());
+      }
+      _writer.addIndexes(reader);
+      reader.close();
+      resetLocalTmp();
+      if (_countersSetup) {
+        _rowOverFlowCount.increment(1);
+      }
+    } else {
+      if (_documents.isEmpty()) {
+        if (_deletedRowId != null) {
+          _writer.addDocument(getDeleteDoc());
+          if (_countersSetup) {
+            _rowDeleteCount.increment(1);
+          }
+        }
+      } else {
+        List<List<Field>> docs = new ArrayList<List<Field>>(_documents.values());
+        docs.get(0).add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
+        _writer.addDocuments(docs);
+        if (_countersSetup) {
+          _recordRateCounter.mark(_documents.size());
+        }
+        _documents.clear();
+      }
+    }
+    _deletedRowId = null;
+    if (_countersSetup) {
+      _rowRateCounter.mark();
+      _rowCount.increment(1);
+    }
+  }
+
+  private Document getDeleteDoc() {
+    Document document = new Document();
+    document.add(new StringField(BlurConstants.ROW_ID, _deletedRowId, Store.NO));
+    document.add(new StringField(BlurConstants.DELETE_MARKER, BlurConstants.DELETE_MARKER_VALUE, Store.NO));
+    return document;
+  }
+
+  public void close() throws IOException {
+    flush();
+    _writer.close();
+    if (_countersSetup) {
+      _recordRateCounter.close();
+      _rowRateCounter.close();
+    }
+    if (_indexLocally) {
+      if (_optimizeInFlight) {
+        copyAndOptimizeInFlightDir();
+      } else {
+        copyDir();
+      }
+    }
+    if (_countersSetup) {
+      _copyRateCounter.close();
+    }
+  }
+
+  private void copyAndOptimizeInFlightDir() throws IOException {
+    CopyRateDirectory copyRateDirectory = new CopyRateDirectory(_finalDir, _copyRateCounter);
+    copyRateDirectory.setLockFactory(NoLockFactory.getNoLockFactory());
+    DirectoryReader reader = DirectoryReader.open(_localDir);
+    IndexWriter writer = new IndexWriter(copyRateDirectory, _conf.clone());
+    writer.addIndexes(reader);
+    writer.close();
+    rm(_localPath);
+  }
+
+  private void copyDir() throws IOException {
+    CopyRateDirectory copyRateDirectory = new CopyRateDirectory(_finalDir, _copyRateCounter);
+    String[] fileNames = _localDir.listAll();
+    for (String fileName : fileNames) {
+      LOG.info("Copying [{0}] to [{1}]", fileName, _newIndex);
+      _localDir.copy(copyRateDirectory, fileName, fileName, IOContext.DEFAULT);
+    }
+    rm(_localPath);
+  }
+
+  private void rm(File file) {
+    if (!file.exists()) {
+      return;
+    }
+    if (file.isDirectory()) {
+      for (File f : file.listFiles()) {
+        rm(f);
+      }
+    }
+    file.delete();
+  }
+
+}
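
BlurOutputFormat itself is not part of this diff, but the class above is the piece its RecordWriter would delegate to. A hypothetical wrapper, assuming the standard org.apache.hadoop.mapreduce API (the class name and tmp-directory naming are illustrative, and the Blur classes are assumed to be imported from org.apache.blur.mapreduce.lib):

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;

public class SketchBlurRecordWriter extends RecordWriter<Text, BlurMutate> {

  private final GenericBlurRecordWriter _writer;

  public SketchBlurRecordWriter(TaskAttemptContext context) throws IOException {
    TaskAttemptID taskAttemptID = context.getTaskAttemptID();
    // The task id selects the target shard (attemptId % shardCount) and the
    // attempt name keeps concurrent attempts in separate tmp directories.
    _writer = new GenericBlurRecordWriter(context.getConfiguration(), taskAttemptID.getTaskID().getId(),
        taskAttemptID.toString() + ".tmp");
  }

  @Override
  public void write(Text key, BlurMutate value) throws IOException {
    _writer.write(key, value);
  }

  @Override
  public void close(TaskAttemptContext context) throws IOException, InterruptedException {
    _writer.close();
  }
}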

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7afca43b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/GetCounter.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/GetCounter.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/GetCounter.java
new file mode 100644
index 0000000..6814ac2
--- /dev/null
+++ b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/GetCounter.java
@@ -0,0 +1,29 @@
+package org.apache.blur.mapreduce.lib;
+
+import org.apache.hadoop.mapreduce.Counter;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Callback used by the Blur output code to look up a Hadoop {@link Counter} by
+ * its enum name.
+ */
+public interface GetCounter {
+  
+  Counter getCounter(Enum<?> counterName);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7afca43b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/IOUtil.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/IOUtil.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/IOUtil.java
new file mode 100644
index 0000000..46c030f
--- /dev/null
+++ b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/IOUtil.java
@@ -0,0 +1,58 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class IOUtil {
+
+  public static final String UTF_8 = "UTF-8";
+
+  public static String readString(DataInput input) throws IOException {
+    int length = readVInt(input);
+    byte[] buffer = new byte[length];
+    input.readFully(buffer);
+    return new String(buffer, UTF_8);
+  }
+
+  public static void writeString(DataOutput output, String s) throws IOException {
+    byte[] bs = s.getBytes(UTF_8);
+    writeVInt(output, bs.length);
+    output.write(bs);
+  }
+
+  public static int readVInt(DataInput input) throws IOException {
+    byte b = input.readByte();
+    int i = b & 0x7F;
+    for (int shift = 7; (b & 0x80) != 0; shift += 7) {
+      b = input.readByte();
+      i |= (b & 0x7F) << shift;
+    }
+    return i;
+  }
+
+  public static void writeVInt(DataOutput output, int i) throws IOException {
+    while ((i & ~0x7F) != 0) {
+      output.writeByte((byte) ((i & 0x7f) | 0x80));
+      i >>>= 7;
+    }
+    output.writeByte((byte) i);
+  }
+
+}
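
The readVInt/writeVInt pair uses the common variable-length encoding: seven data bits per byte with the high bit as a continuation flag, so small non-negative values take a single byte, and writeString prefixes the UTF-8 bytes with such a length. A minimal round-trip sketch, assuming IOUtil is imported from org.apache.blur.mapreduce.lib:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class IOUtilRoundTrip {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    IOUtil.writeVInt(out, 300);        // encodes as two bytes: 0xAC 0x02
    IOUtil.writeString(out, "row-1");  // VInt length, then UTF-8 bytes
    out.flush();

    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    System.out.println(IOUtil.readVInt(in));   // 300
    System.out.println(IOUtil.readString(in)); // row-1
  }
}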

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7afca43b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/ProgressableDirectory.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/ProgressableDirectory.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/ProgressableDirectory.java
new file mode 100644
index 0000000..004e1fa
--- /dev/null
+++ b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/ProgressableDirectory.java
@@ -0,0 +1,289 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.hadoop.util.Progressable;
+import org.apache.lucene.store.BufferedIndexInput;
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.LockFactory;
+
+/**
+ * The {@link ProgressableDirectory} allows progress to be recorded while
+ * Lucene is blocked in a merge. This prevents the task from being killed for
+ * not reporting progress during the blocking merge.
+ */
+public class ProgressableDirectory extends Directory {
+
+  private static final Log LOG = LogFactory.getLog(ProgressableDirectory.class);
+
+  private final Directory _directory;
+  private final Progressable _progressable;
+
+  public ProgressableDirectory(Directory directory, Progressable progressable) {
+    _directory = directory;
+    if (progressable == null) {
+      LOG.warn("Progressable is null.");
+      _progressable = new Progressable() {
+        @Override
+        public void progress() {
+
+        }
+      };
+    } else {
+      _progressable = progressable;
+    }
+  }
+
+  @Override
+  public void clearLock(String name) throws IOException {
+    _directory.clearLock(name);
+  }
+
+  @Override
+  public void close() throws IOException {
+    _directory.close();
+  }
+
+  private IndexInput wrapInput(String name, IndexInput openInput) {
+    return new ProgressableIndexInput(name, openInput, 16384, _progressable);
+  }
+
+  private IndexOutput wrapOutput(IndexOutput createOutput) {
+    return new ProgressableIndexOutput(createOutput, _progressable);
+  }
+
+  public IndexOutput createOutput(String name, IOContext context) throws IOException {
+    return wrapOutput(_directory.createOutput(name, context));
+  }
+
+  @Override
+  public void deleteFile(String name) throws IOException {
+    _directory.deleteFile(name);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return _directory.equals(obj);
+  }
+
+  @Override
+  public boolean fileExists(String name) throws IOException {
+    return _directory.fileExists(name);
+  }
+
+  @Override
+  public long fileLength(String name) throws IOException {
+    return _directory.fileLength(name);
+  }
+
+  @Override
+  public LockFactory getLockFactory() {
+    return _directory.getLockFactory();
+  }
+
+  @Override
+  public String getLockID() {
+    return _directory.getLockID();
+  }
+
+  @Override
+  public int hashCode() {
+    return _directory.hashCode();
+  }
+
+  @Override
+  public String[] listAll() throws IOException {
+    return _directory.listAll();
+  }
+
+  @Override
+  public Lock makeLock(String name) {
+    return _directory.makeLock(name);
+  }
+
+  @Override
+  public IndexInput openInput(String name, IOContext context) throws IOException {
+    return wrapInput(name, _directory.openInput(name, context));
+  }
+
+  @Override
+  public void setLockFactory(LockFactory lockFactory) throws IOException {
+    _directory.setLockFactory(lockFactory);
+  }
+
+  @Override
+  public void sync(Collection<String> names) throws IOException {
+    _directory.sync(names);
+  }
+
+  @Override
+  public String toString() {
+    return _directory.toString();
+  }
+
+  @SuppressWarnings("deprecation")
+  static class ProgressableIndexOutput extends IndexOutput {
+
+    private Progressable _progressable;
+    private IndexOutput _indexOutput;
+
+    public ProgressableIndexOutput(IndexOutput indexOutput, Progressable progressable) {
+      _indexOutput = indexOutput;
+      _progressable = progressable;
+    }
+
+    @Override
+    public void close() throws IOException {
+      _indexOutput.close();
+      _progressable.progress();
+    }
+
+    @Override
+    public void copyBytes(DataInput input, long numBytes) throws IOException {
+      _indexOutput.copyBytes(input, numBytes);
+      _progressable.progress();
+    }
+
+    @Override
+    public void flush() throws IOException {
+      _indexOutput.flush();
+      _progressable.progress();
+    }
+
+    @Override
+    public long getFilePointer() {
+      return _indexOutput.getFilePointer();
+    }
+
+    @Override
+    public long length() throws IOException {
+      return _indexOutput.length();
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+      _indexOutput.seek(pos);
+      _progressable.progress();
+    }
+
+    @Override
+    public void setLength(long length) throws IOException {
+      _indexOutput.setLength(length);
+      _progressable.progress();
+    }
+
+    @Override
+    public String toString() {
+      return _indexOutput.toString();
+    }
+
+    @Override
+    public void writeByte(byte b) throws IOException {
+      _indexOutput.writeByte(b);
+    }
+
+    @Override
+    public void writeBytes(byte[] b, int offset, int length) throws IOException {
+      _indexOutput.writeBytes(b, offset, length);
+      _progressable.progress();
+    }
+
+    @Override
+    public void writeBytes(byte[] b, int length) throws IOException {
+      _indexOutput.writeBytes(b, length);
+      _progressable.progress();
+    }
+
+    @Override
+    public void writeInt(int i) throws IOException {
+      _indexOutput.writeInt(i);
+    }
+
+    @Override
+    public void writeLong(long i) throws IOException {
+      _indexOutput.writeLong(i);
+    }
+
+    @Override
+    public void writeString(String s) throws IOException {
+      _indexOutput.writeString(s);
+    }
+
+    @Override
+    public void writeStringStringMap(Map<String, String> map) throws IOException {
+      _indexOutput.writeStringStringMap(map);
+    }
+
+  }
+
+  static class ProgressableIndexInput extends BufferedIndexInput {
+
+    private IndexInput _indexInput;
+    private final long _length;
+    private Progressable _progressable;
+
+    ProgressableIndexInput(String name, IndexInput indexInput, int buffer, Progressable progressable) {
+      super("ProgressableIndexInput(" + indexInput.toString() + ")", buffer);
+      _indexInput = indexInput;
+      _length = indexInput.length();
+      _progressable = progressable;
+    }
+
+    @Override
+    protected void readInternal(byte[] b, int offset, int length) throws IOException {
+      long filePointer = getFilePointer();
+      if (filePointer != _indexInput.getFilePointer()) {
+        _indexInput.seek(filePointer);
+      }
+      _indexInput.readBytes(b, offset, length);
+      _progressable.progress();
+    }
+
+    @Override
+    protected void seekInternal(long pos) throws IOException {
+
+    }
+
+    @Override
+    public void close() throws IOException {
+      _indexInput.close();
+    }
+
+    @Override
+    public long length() {
+      return _length;
+    }
+
+    @Override
+    public ProgressableIndexInput clone() {
+      ProgressableIndexInput clone = (ProgressableIndexInput) super.clone();
+      clone._indexInput = (IndexInput) _indexInput.clone();
+      return clone;
+    }
+  }
+}
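
Every call is forwarded to the wrapped Directory, with progress() reported around the I/O-heavy operations. A minimal sketch of wrapping an index directory, mirroring how GenericBlurRecordWriter builds its final directory (the method and variable names are illustrative, and the Blur classes are assumed to be imported from their packages). If no progressable has been registered through BlurOutputFormat.setProgressable, the wrapper logs a warning and falls back to a no-op:

import java.io.IOException;

import org.apache.blur.store.hdfs.HdfsDirectory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.NoLockFactory;

public class ProgressAwareIndexing {

  // Wrap the HDFS-backed directory so long, blocking merges keep reporting
  // progress to the MapReduce framework instead of timing the task out.
  static IndexWriter openShardWriter(Configuration configuration, Path shardTmpPath, IndexWriterConfig conf)
      throws IOException {
    Directory hdfsDir = new HdfsDirectory(configuration, shardTmpPath);
    Directory dir = new ProgressableDirectory(hdfsDir, BlurOutputFormat.getProgressable());
    dir.setLockFactory(NoLockFactory.getNoLockFactory());
    return new IndexWriter(dir, conf.clone());
  }
}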

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7afca43b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/RateCounter.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/RateCounter.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/RateCounter.java
new file mode 100644
index 0000000..694759e
--- /dev/null
+++ b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/RateCounter.java
@@ -0,0 +1,64 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.mapreduce.Counter;
+
+/**
+ * This class turns a standard Hadoop counter into a rate counter.
+ */
+public class RateCounter {
+
+  private final Counter _counter;
+  private final long _reportTime;
+  private final long _rateTime;
+  private long _lastReport;
+  private long _count = 0;
+
+  public RateCounter(Counter counter) {
+    this(counter, TimeUnit.SECONDS, 1);
+  }
+
+  public RateCounter(Counter counter, TimeUnit unit, long reportTime) {
+    _counter = counter;
+    _lastReport = System.nanoTime();
+    _reportTime = unit.toNanos(reportTime);
+    _rateTime = unit.toSeconds(reportTime);
+  }
+
+  public void mark() {
+    mark(1l);
+  }
+
+  public void mark(long n) {
+    long now = System.nanoTime();
+    if (_lastReport + _reportTime < now) {
+      long rate = _count / _rateTime;
+      _counter.setValue(rate);
+      _lastReport = System.nanoTime();
+      _count = 0;
+    }
+    _count += n;
+  }
+
+  public void close() {
+    _counter.setValue(0);
+  }
+
+}
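
In use, mark() is called once per event (or mark(n) after a batch of n) and the wrapped counter is periodically set to an approximate per-second rate rather than a running total; close() zeroes it when the writer shuts down. A short sketch along the lines of how GenericBlurRecordWriter wires its counters (the GetCounter is assumed to have been registered through BlurOutputFormat.setGetCounter):

import org.apache.blur.mapreduce.lib.BlurCounters;
import org.apache.blur.mapreduce.lib.GetCounter;
import org.apache.blur.mapreduce.lib.RateCounter;

public class RateCounterSketch {

  // Report an approximate records-per-second rate while indexing.
  static void indexAll(GetCounter getCounter, int totalRecords) {
    RateCounter recordRate = new RateCounter(getCounter.getCounter(BlurCounters.RECORD_RATE));
    for (int i = 0; i < totalRecords; i++) {
      // ... index one record here ...
      recordRate.mark();
    }
    recordRate.close(); // resets the counter to 0 once the writer is done
  }
}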

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7afca43b/blur-mapred-hadoop2/pom.xml
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/pom.xml b/blur-mapred-hadoop2/pom.xml
index e541417..4127e4e 100644
--- a/blur-mapred-hadoop2/pom.xml
+++ b/blur-mapred-hadoop2/pom.xml
@@ -35,8 +35,14 @@ under the License.
 
 	<dependencies>
 		<dependency>
+			<groupId>org.apache.zookeeper</groupId>
+			<artifactId>zookeeper</artifactId>
+			<version>${zookeeper.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
 			<groupId>org.apache.blur</groupId>
-			<artifactId>blur-mapred-common</artifactId>
+			<artifactId>blur-core</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 		<dependency>
@@ -48,6 +54,16 @@ under the License.
 		</dependency>
 		<dependency>
 			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-store</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-util</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
 			<artifactId>blur-util</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7afca43b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BaseBlurMapper.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BaseBlurMapper.java b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BaseBlurMapper.java
new file mode 100644
index 0000000..037edec
--- /dev/null
+++ b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BaseBlurMapper.java
@@ -0,0 +1,49 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * Base mapper class for Blur map reduce classes.
+ * 
+ * @param <KEY>
+ * @param <VALUE>
+ */
+public abstract class BaseBlurMapper<KEY, VALUE> extends Mapper<KEY, VALUE, Text, BlurMutate> {
+  protected BlurMutate _mutate;
+  protected Text _key;
+  protected Counter _recordCounter;
+  protected Counter _columnCounter;
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    _mutate = new BlurMutate();
+    _mutate.setRecord(new BlurRecord());
+    _key = new Text();
+    _recordCounter = context.getCounter(BlurCounters.RECORD_COUNT);
+    _columnCounter = context.getCounter(BlurCounters.COLUMN_COUNT);
+  }
+
+  @Override
+  protected abstract void map(KEY key, VALUE value, Context context) throws IOException, InterruptedException;
+
+}
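
A hypothetical concrete mapper built on this base class, assuming a simple "rowId,recordId,value" line layout (the real CsvBlurMapper referenced in the DefaultBlurReducer javadoc is more configurable; the column family and column name below are illustrative):

package org.apache.blur.mapreduce.lib;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class SimpleLineMapper extends BaseBlurMapper<LongWritable, Text> {

  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    // Assumed input layout: rowId,recordId,value
    String[] parts = value.toString().split(",");
    BlurRecord record = new BlurRecord();
    record.setRowId(parts[0]);
    record.setRecordId(parts[1]);
    record.setFamily("cf1");
    record.addColumn(new BlurColumn("col", parts[2]));
    _mutate.setRecord(record);
    _mutate.setMutateType(BlurMutate.MUTATE_TYPE.REPLACE);
    _key.set(parts[0]);
    context.write(_key, _mutate);
    _recordCounter.increment(1);
    _columnCounter.increment(1);
  }
}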

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7afca43b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurColumn.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurColumn.java b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurColumn.java
new file mode 100644
index 0000000..d32a3bd
--- /dev/null
+++ b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurColumn.java
@@ -0,0 +1,109 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+public class BlurColumn implements Writable {
+
+  private String name;
+  private String value;
+
+  public BlurColumn() {
+  }
+
+  public BlurColumn(String name, String value) {
+    this.name = name;
+    this.value = value;
+  }
+
+  public boolean hasNull() {
+    if (name == null || value == null) {
+      return true;
+    }
+    return false;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    name = IOUtil.readString(in);
+    value = IOUtil.readString(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    IOUtil.writeString(out, name);
+    IOUtil.writeString(out, value);
+  }
+
+  public String getValue() {
+    return value;
+  }
+
+  public void setValue(String value) {
+    this.value = value;
+  }
+
+  @Override
+  public String toString() {
+    return "{name=" + name + ", value=" + value + "}";
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((name == null) ? 0 : name.hashCode());
+    result = prime * result + ((value == null) ? 0 : value.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    BlurColumn other = (BlurColumn) obj;
+    if (name == null) {
+      if (other.name != null)
+        return false;
+    } else if (!name.equals(other.name))
+      return false;
+    if (value == null) {
+      if (other.value != null)
+        return false;
+    } else if (!value.equals(other.value))
+      return false;
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7afca43b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java
new file mode 100644
index 0000000..0691dce
--- /dev/null
+++ b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java
@@ -0,0 +1,26 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * The enum class used for all the internal counters during map reduce jobs.
+ */
+public enum BlurCounters {
+  RECORD_COUNT, LUCENE_FIELD_COUNT, ROW_COUNT, RECORD_RATE, COPY_RATE, ROW_RATE, RECORD_DUPLICATE_COUNT, ROW_OVERFLOW_COUNT, ROW_DELETE_COUNT, COLUMN_COUNT
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7afca43b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurMapReduceUtil.java b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurMapReduceUtil.java
new file mode 100644
index 0000000..ae2bcde
--- /dev/null
+++ b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurMapReduceUtil.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.blur.mapreduce.lib;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This utility code was taken from HBase to locate classes and the jars files
+ * to add to the MapReduce Job.
+ */
+public class BlurMapReduceUtil {
+
+  private final static Log LOG = LogFactory.getLog(BlurMapReduceUtil.class);
+
+  /**
+   * Add the Blur dependency jars as well as jars for any of the configured job
+   * classes to the job configuration, so that JobClient will ship them to the
+   * cluster and add them to the DistributedCache.
+   */
+  public static void addDependencyJars(Job job) throws IOException {
+    try {
+      addDependencyJars(job.getConfiguration(), org.apache.zookeeper.ZooKeeper.class, job.getMapOutputKeyClass(),
+          job.getMapOutputValueClass(), job.getInputFormatClass(), job.getOutputKeyClass(), job.getOutputValueClass(),
+          job.getOutputFormatClass(), job.getPartitionerClass(), job.getCombinerClass());
+      addAllJarsInBlurLib(job.getConfiguration());
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Adds all the jars in the same path as the blur jar files.
+   * @param conf
+   * @throws IOException
+   */
+  public static void addAllJarsInBlurLib(Configuration conf) throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Set<String> jars = new HashSet<String>();
+    jars.addAll(conf.getStringCollection("tmpjars"));
+
+    String property = System.getProperty("java.class.path");
+    String[] files = property.split("\\:");
+
+    String blurLibPath = getPath("blur-", files);
+    if (blurLibPath == null) {
+      return;
+    }
+    List<String> pathes = getPathes(blurLibPath, files);
+    for (String pathStr : pathes) {
+      Path path = new Path(pathStr);
+      if (!localFs.exists(path)) {
+        LOG.warn("Could not validate jar file " + path);
+        continue;
+      }
+      jars.add(path.makeQualified(localFs.getUri(), localFs.getWorkingDirectory()).toString());
+    }
+    if (jars.isEmpty()) {
+      return;
+    }
+    conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[0])));
+  }
+
+  private static List<String> getPathes(String path, String[] files) {
+    List<String> pathes = new ArrayList<String>();
+    for (String file : files) {
+      if (file.startsWith(path)) {
+        pathes.add(file);
+      }
+    }
+    return pathes;
+  }
+
+  private static String getPath(String startsWith, String[] files) {
+    for (String file : files) {
+      int lastIndexOf = file.lastIndexOf('/');
+      String fileName = file.substring(lastIndexOf + 1);
+      if (fileName.startsWith(startsWith)) {
+        return file.substring(0, lastIndexOf);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Add the jars containing the given classes to the job's configuration such
+   * that JobClient will ship them to the cluster and add them to the
+   * DistributedCache.
+   */
+  public static void addDependencyJars(Configuration conf, Class<?>... classes) throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Set<String> jars = new HashSet<String>();
+    // Add jars that are already in the tmpjars variable
+    jars.addAll(conf.getStringCollection("tmpjars"));
+
+    // Add jars containing the specified classes
+    for (Class<?> clazz : classes) {
+      if (clazz == null) {
+        continue;
+      }
+
+      String pathStr = findOrCreateJar(clazz);
+      if (pathStr == null) {
+        LOG.warn("Could not find jar for class " + clazz + " in order to ship it to the cluster.");
+        continue;
+      }
+      Path path = new Path(pathStr);
+      if (!localFs.exists(path)) {
+        LOG.warn("Could not validate jar file " + path + " for class " + clazz);
+        continue;
+      }
+      jars.add(path.makeQualified(localFs.getUri(), localFs.getWorkingDirectory()).toString());
+    }
+    if (jars.isEmpty()) {
+      return;
+    }
+
+    conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[0])));
+  }
+
+  /**
+   * If org.apache.hadoop.util.JarFinder is available (0.23+ hadoop), finds the
+   * Jar for a class or creates it if it doesn't exist. If the class is in a
+   * directory in the classpath, it creates a Jar on the fly with the contents
+   * of the directory and returns the path to that Jar. If a Jar is created, it
+   * is created in the system temporary directory.
+   * 
+   * Otherwise, returns an existing jar that contains a class of the same name.
+   * 
+   * @param my_class
+   *          the class to find.
+   * @return a jar file that contains the class, or null.
+   * @throws IOException
+   */
+  private static String findOrCreateJar(Class<?> my_class) throws IOException {
+    try {
+      Class<?> jarFinder = Class.forName("org.apache.hadoop.util.JarFinder");
+      // hadoop-0.23 has a JarFinder class that will create the jar
+      // if it doesn't exist. Note that this is needed to run the mapreduce
+      // unit tests post-0.23, because mapreduce v2 requires the relevant jars
+      // to be in the mr cluster to do output, split, etc. At unit test time,
+      // the hbase jars do not exist, so we need to create some. Note that we
+      // can safely fall back to findContainingJars for pre-0.23 mapreduce.
+      Method m = jarFinder.getMethod("getJar", Class.class);
+      return (String) m.invoke(null, my_class);
+    } catch (InvocationTargetException ite) {
+      // function was properly called, but threw its own exception
+      throw new IOException(ite.getCause());
+    } catch (Exception e) {
+      // ignore all other exceptions. related to reflection failure
+    }
+
+    LOG.debug("New JarFinder: org.apache.hadoop.util.JarFinder.getJar " + "not available.  Using old findContainingJar");
+    return findContainingJar(my_class);
+  }
+
+  /**
+   * Find a jar that contains a class of the same name, if any. It will return a
+   * jar file, even if that is not the first thing on the class path that has a
+   * class with the same name.
+   * 
+   * This is shamelessly copied from JobConf
+   * 
+   * @param my_class
+   *          the class to find.
+   * @return a jar file that contains the class, or null.
+   * @throws IOException
+   */
+  private static String findContainingJar(Class<?> my_class) {
+    ClassLoader loader = my_class.getClassLoader();
+    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
+    try {
+      for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
+        URL url = itr.nextElement();
+        if ("jar".equals(url.getProtocol())) {
+          String toReturn = url.getPath();
+          if (toReturn.startsWith("file:")) {
+            toReturn = toReturn.substring("file:".length());
+          }
+          // URLDecoder is a misnamed class, since it actually decodes
+          // x-www-form-urlencoded MIME type rather than actual
+          // URL encoding (which the file path has). Therefore it would
+          // decode +s to ' 's which is incorrect (spaces are actually
+          // either unencoded or encoded as "%20"). Replace +s first, so
+          // that they are kept sacred during the decoding process.
+          toReturn = toReturn.replaceAll("\\+", "%2B");
+          toReturn = URLDecoder.decode(toReturn, "UTF-8");
+          return toReturn.replaceAll("!.*$", "");
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return null;
+  }
+}
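
In a job driver, a single call before submission ships the Blur jars (plus the jars of the configured job classes) to the cluster through the "tmpjars" configuration property. An abbreviated sketch of such a driver, reusing the wiring from the DefaultBlurReducer javadoc example (the driver class name is a placeholder and most of the job configuration is omitted):

import org.apache.blur.mapreduce.lib.BlurMapReduceUtil;
import org.apache.blur.mapreduce.lib.BlurOutputFormat;
import org.apache.blur.mapreduce.lib.CsvBlurMapper;
import org.apache.blur.mapreduce.lib.DefaultBlurReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class BlurIndexDriver {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "blur index");
    job.setJarByClass(BlurIndexDriver.class);
    job.setMapperClass(CsvBlurMapper.class);
    job.setReducerClass(DefaultBlurReducer.class);
    job.setOutputFormatClass(BlurOutputFormat.class);
    // ... input path, output key/value classes and table descriptor as in the
    // DefaultBlurReducer javadoc example ...
    BlurMapReduceUtil.addDependencyJars(job); // populates "tmpjars"
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}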

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7afca43b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurMutate.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurMutate.java b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurMutate.java
new file mode 100644
index 0000000..36d7f4f
--- /dev/null
+++ b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurMutate.java
@@ -0,0 +1,178 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * {@link BlurMutate} carries the {@link Record}s bound for the {@link Row} for
+ * indexing. If this mutate represents a delete of the {@link Row}, the recordId
+ * of the {@link BlurRecord} is ignored.
+ */
+public class BlurMutate implements Writable {
+
+  /**
+   * The {@link MUTATE_TYPE} controls the mutating of the {@link Row}. DELETE
+   * indicates that the {@link Row} is to be deleted. REPLACE indicates that the
+ * group of mutates is to replace the existing {@link Row}.
+   * 
+   * If both a DELETE and a REPLACE exist for a single {@link Row} in the
+   * {@link BlurOutputFormat} then the {@link Row} will be replaced not just
+   * deleted.
+   */
+  public enum MUTATE_TYPE {
+    /* ADD(0), UPDATE(1), */DELETE(2), REPLACE(3);
+    private int _value;
+
+    private MUTATE_TYPE(int value) {
+      _value = value;
+    }
+
+    public int getValue() {
+      return _value;
+    }
+
+    public MUTATE_TYPE find(int value) {
+      switch (value) {
+      // @TODO Updates through MR is going to be disabled
+      // case 0:
+      // return ADD;
+      // case 1:
+      // return UPDATE;
+      case 2:
+        return DELETE;
+      case 3:
+        return REPLACE;
+      default:
+        throw new RuntimeException("Value [" + value + "] not found.");
+      }
+    }
+  }
+
+  private MUTATE_TYPE _mutateType = MUTATE_TYPE.REPLACE;
+  private BlurRecord _record = new BlurRecord();
+
+  public BlurMutate() {
+
+  }
+
+  public BlurMutate(MUTATE_TYPE type, BlurRecord record) {
+    _mutateType = type;
+    _record = record;
+  }
+
+  public BlurMutate(MUTATE_TYPE type, String rowId) {
+    _mutateType = type;
+    _record.setRowId(rowId);
+  }
+
+  public BlurMutate(MUTATE_TYPE type, String rowId, String recordId) {
+    _mutateType = type;
+    _record.setRowId(rowId);
+    _record.setRecordId(recordId);
+  }
+
+  public BlurMutate(MUTATE_TYPE type, String rowId, String recordId, String family) {
+    _mutateType = type;
+    _record.setRowId(rowId);
+    _record.setRecordId(recordId);
+    _record.setFamily(family);
+  }
+
+  public BlurMutate addColumn(BlurColumn column) {
+    _record.addColumn(column);
+    return this;
+  }
+
+  public BlurMutate addColumn(String name, String value) {
+    return addColumn(new BlurColumn(name, value));
+  }
+
+  public BlurRecord getRecord() {
+    return _record;
+  }
+
+  public void setRecord(BlurRecord record) {
+    _record = record;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    IOUtil.writeVInt(out, _mutateType.getValue());
+    _record.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    _mutateType = _mutateType.find(IOUtil.readVInt(in));
+    _record.readFields(in);
+  }
+
+  public MUTATE_TYPE getMutateType() {
+    return _mutateType;
+  }
+
+  public BlurMutate setMutateType(MUTATE_TYPE mutateType) {
+    _mutateType = mutateType;
+    return this;
+  }
+
+  @Override
+  public String toString() {
+    return "BlurMutate [mutateType=" + _mutateType + ", record=" + _record + "]";
+  }
+
+  public BlurMutate setFamily(String family) {
+    _record.setFamily(family);
+    return this;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((_mutateType == null) ? 0 : _mutateType.hashCode());
+    result = prime * result + ((_record == null) ? 0 : _record.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    BlurMutate other = (BlurMutate) obj;
+    if (_mutateType != other._mutateType)
+      return false;
+    if (_record == null) {
+      if (other._record != null)
+        return false;
+    } else if (!_record.equals(other._record))
+      return false;
+    return true;
+  }
+
+}
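
A rough usage sketch (not part of this change): a mapper feeding BlurOutputFormat emits BlurMutate values keyed by row id. The mapper shell, row/record ids, family, and column names below are illustrative placeholders only.

import java.io.IOException;

import org.apache.blur.mapreduce.lib.BlurMutate;
import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ExampleBlurMutateMapper extends Mapper<LongWritable, Text, Text, BlurMutate> {

  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    // REPLACE: the group of mutates emitted for "row-1" replaces any existing row.
    BlurMutate replace = new BlurMutate(MUTATE_TYPE.REPLACE, "row-1", "record-1", "cf1")
        .addColumn("col", value.toString());
    context.write(new Text("row-1"), replace);

    // DELETE: only the rowId matters here; the recordId is ignored for deletes.
    BlurMutate delete = new BlurMutate(MUTATE_TYPE.DELETE, "row-2");
    context.write(new Text("row-2"), delete);
  }
}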

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7afca43b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
new file mode 100644
index 0000000..d906974
--- /dev/null
+++ b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
@@ -0,0 +1,220 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.mapreduce.TaskType;
+
+public class BlurOutputCommitter extends OutputCommitter {
+
+  private static final Log LOG = LogFactory.getLog(BlurOutputCommitter.class);
+
+  private Path _newIndex;
+  private Configuration _configuration;
+  private TaskAttemptID _taskAttemptID;
+  private Path _indexPath;
+  private final boolean _runTaskCommit;
+  private TableDescriptor _tableDescriptor;
+
+  public BlurOutputCommitter(TaskType taskType, int numReduceTasks) {
+    if (taskType == TaskType.MAP && numReduceTasks != 0) {
+      _runTaskCommit = false;
+    } else {
+      _runTaskCommit = true;
+    }
+  }
+
+  @Override
+  public void setupJob(JobContext jobContext) throws IOException {
+    LOG.info("Running setup job.");
+  }
+
+  @Override
+  public void commitJob(JobContext jobContext) throws IOException {
+    // look through all the shards for attempts that need to be cleaned up.
+    // also find all the attempts that are finished
+    // then rename all the attempts jobs to commits
+    LOG.info("Commiting Job [{0}]", jobContext.getJobID());
+    Configuration configuration = jobContext.getConfiguration();
+    Path tableOutput = BlurOutputFormat.getOutputPath(configuration);
+    LOG.info("TableOutput path [{0}]", tableOutput);
+    makeSureNoEmptyShards(configuration, tableOutput);
+    FileSystem fileSystem = tableOutput.getFileSystem(configuration);
+    for (FileStatus fileStatus : fileSystem.listStatus(tableOutput)) {
+      LOG.info("Checking file status [{0}] with path [{1}]", fileStatus, fileStatus.getPath());
+      if (isShard(fileStatus)) {
+        commitOrAbortJob(jobContext, fileStatus.getPath(), true);
+      }
+    }
+    LOG.info("Commiting Complete [{0}]", jobContext.getJobID());
+  }
+
+  private void makeSureNoEmptyShards(Configuration configuration, Path tableOutput) throws IOException {
+    FileSystem fileSystem = tableOutput.getFileSystem(configuration);
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
+    int shardCount = tableDescriptor.getShardCount();
+    for (int i = 0; i < shardCount; i++) {
+      String shardName = BlurUtil.getShardName(i);
+      fileSystem.mkdirs(new Path(tableOutput, shardName));
+    }
+  }
+
+  private void commitOrAbortJob(JobContext jobContext, Path shardPath, boolean commit) throws IOException {
+    LOG.info("CommitOrAbort [{0}] path [{1}]", commit, shardPath);
+    FileSystem fileSystem = shardPath.getFileSystem(jobContext.getConfiguration());
+    FileStatus[] listStatus = fileSystem.listStatus(shardPath, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        LOG.info("Checking path [{0}]", path);
+        if (path.getName().endsWith(".task_complete")) {
+          return true;
+        }
+        return false;
+      }
+    });
+    for (FileStatus fileStatus : listStatus) {
+      Path path = fileStatus.getPath();
+      LOG.info("Trying to commitOrAbort [{0}]", path);
+      String name = path.getName();
+      boolean taskComplete = name.endsWith(".task_complete");
+      if (fileStatus.isDirectory()) {
+        String taskAttemptName = getTaskAttemptName(name);
+        if (taskAttemptName == null) {
+          LOG.info("Dir name [{0}] not task attempt", name);
+          continue;
+        }
+        TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptName);
+        if (taskAttemptID.getJobID().equals(jobContext.getJobID())) {
+          if (commit) {
+            if (taskComplete) {
+              fileSystem.rename(path, new Path(shardPath, taskAttemptName + ".commit"));
+              LOG.info("Committing [{0}] in path [{1}]", taskAttemptID, path);
+            } else {
+              fileSystem.delete(path, true);
+              LOG.info("Deleteing tmp dir [{0}] in path [{1}]", taskAttemptID, path);
+            }
+          } else {
+            fileSystem.delete(path, true);
+            LOG.info("Deleteing aborted job dir [{0}] in path [{1}]", taskAttemptID, path);
+          }
+        } else {
+          LOG.warn("TaskAttempt JobID [{0}] does not match JobContext JobId [{1}]", taskAttemptID.getJobID(),
+              jobContext.getJobID());
+        }
+      }
+    }
+  }
+
+  private String getTaskAttemptName(String name) {
+    int lastIndexOf = name.lastIndexOf('.');
+    if (lastIndexOf < 0) {
+      return null;
+    }
+    return name.substring(0, lastIndexOf);
+  }
+
+  private boolean isShard(FileStatus fileStatus) {
+    return isShard(fileStatus.getPath());
+  }
+
+  private boolean isShard(Path path) {
+    return path.getName().startsWith(BlurConstants.SHARD_PREFIX);
+  }
+
+  @Override
+  public void abortJob(JobContext jobContext, State state) throws IOException {
+    LOG.info("Abort Job [{0}]", jobContext.getJobID());
+    Configuration configuration = jobContext.getConfiguration();
+    Path tableOutput = BlurOutputFormat.getOutputPath(configuration);
+    makeSureNoEmptyShards(configuration, tableOutput);
+    FileSystem fileSystem = tableOutput.getFileSystem(configuration);
+    for (FileStatus fileStatus : fileSystem.listStatus(tableOutput)) {
+      if (isShard(fileStatus)) {
+        commitOrAbortJob(jobContext, fileStatus.getPath(), false);
+      }
+    }
+  }
+
+  @Override
+  public void cleanupJob(JobContext context) throws IOException {
+    LOG.info("Running job cleanup.");
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
+    return _runTaskCommit;
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext context) throws IOException {
+    LOG.info("Running task setup.");
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext context) throws IOException {
+    LOG.info("Running commit task.");
+    setup(context);
+    FileSystem fileSystem = _newIndex.getFileSystem(_configuration);
+    if (fileSystem.exists(_newIndex) && !fileSystem.isFile(_newIndex)) {
+      Path dst = new Path(_indexPath, _taskAttemptID.toString() + ".task_complete");
+      LOG.info("Committing [{0}] to [{1}]", _newIndex, dst);
+      fileSystem.rename(_newIndex, dst);
+    } else {
+      throw new IOException("Path [" + _newIndex + "] does not exist, can not commit.");
+    }
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext context) throws IOException {
+    LOG.info("Running abort task.");
+    setup(context);
+    FileSystem fileSystem = _newIndex.getFileSystem(_configuration);
+    LOG.info("abortTask - Deleting [{0}]", _newIndex);
+    fileSystem.delete(_newIndex, true);
+  }
+
+  private void setup(TaskAttemptContext context) throws IOException {
+    _configuration = context.getConfiguration();
+    _tableDescriptor = BlurOutputFormat.getTableDescriptor(_configuration);
+    int shardCount = _tableDescriptor.getShardCount();
+    int attemptId = context.getTaskAttemptID().getTaskID().getId();
+    int shardId = attemptId % shardCount;
+    _taskAttemptID = context.getTaskAttemptID();
+    Path tableOutput = BlurOutputFormat.getOutputPath(_configuration);
+    String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, shardId);
+    _indexPath = new Path(tableOutput, shardName);
+    _newIndex = new Path(_indexPath, _taskAttemptID.toString() + ".tmp");
+  }
+
+}
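
A small illustration (assumed values, not part of this change) of the path naming the committer relies on: each task attempt writes into <attempt>.tmp under its shard directory, commitTask renames it to <attempt>.task_complete, and commitJob renames completed attempts to <attempt>.commit (or deletes leftovers on abort).

import org.apache.blur.utils.BlurConstants;
import org.apache.blur.utils.BlurUtil;
import org.apache.hadoop.fs.Path;

public class ShardPathExample {

  public static void main(String[] args) {
    // Assumed values; in the committer these come from the TaskAttemptContext
    // and the TableDescriptor stored in the job configuration.
    int shardCount = 4;
    int attemptId = 6;                     // taskAttemptID.getTaskID().getId()
    int shardId = attemptId % shardCount;  // task 6 on a 4-shard table -> shard 2

    Path tableOutput = new Path("hdfs://namenode/blur/tables/example");
    Path indexPath = new Path(tableOutput, BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, shardId));

    String attempt = "attempt_201405010001_0001_r_000006_0"; // illustrative attempt id
    Path working = new Path(indexPath, attempt + ".tmp");                // written by the RecordWriter
    Path taskComplete = new Path(indexPath, attempt + ".task_complete"); // after commitTask
    Path committed = new Path(indexPath, attempt + ".commit");           // after commitJob

    System.out.println(working + " -> " + taskComplete + " -> " + committed);
  }
}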

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7afca43b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
new file mode 100644
index 0000000..20d2e33
--- /dev/null
+++ b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
@@ -0,0 +1,341 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TJSONProtocol;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TIOStreamTransport;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * {@link BlurOutputFormat} is used to index data and deliver the indexes to
+ * the proper Blur table for searching. A typical usage of this class would be
+ * as follows.<br/>
+ * <br/>
+ * 
+ * <br/>
+ * {@link Iface} client = {@link BlurClient}.getClient("controller1:40010");<br/>
+ * <br/>
+ * TableDescriptor tableDescriptor = client.describe(tableName);<br/>
+ * <br/>
+ * Job job = new Job(jobConf, "blur index");<br/>
+ * job.setJarByClass(BlurOutputFormatTest.class);<br/>
+ * job.setMapperClass(CsvBlurMapper.class);<br/>
+ * job.setInputFormatClass(TextInputFormat.class);<br/>
+ * <br/>
+ * FileInputFormat.addInputPath(job, new Path(input));<br/>
+ * CsvBlurMapper.addColumns(job, "cf1", "col");<br/>
+ * <br/>
+ * BlurOutputFormat.setupJob(job, tableDescriptor);<br/>
+ * BlurOutputFormat.setIndexLocally(job, true);<br/>
+ * BlurOutputFormat.setOptimizeInFlight(job, false);<br/>
+ * <br/>
+ * job.waitForCompletion(true);<br/>
+ * 
+ */
+public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
+
+  public static final String BLUR_OUTPUT_REDUCER_MULTIPLIER = "blur.output.reducer.multiplier";
+  public static final String BLUR_OUTPUT_OPTIMIZEINFLIGHT = "blur.output.optimizeinflight";
+  public static final String BLUR_OUTPUT_INDEXLOCALLY = "blur.output.indexlocally";
+  public static final String BLUR_OUTPUT_MAX_DOCUMENT_BUFFER_SIZE = "blur.output.max.document.buffer.size";
+  public static final String BLUR_TABLE_DESCRIPTOR = "blur.table.descriptor";
+  public static final String BLUR_OUTPUT_PATH = "blur.output.path";
+
+  private static final String MAPRED_OUTPUT_COMMITTER_CLASS = "mapred.output.committer.class";
+  private static ThreadLocal<Progressable> _progressable = new ThreadLocal<Progressable>();
+  private static ThreadLocal<GetCounter> _getCounter = new ThreadLocal<GetCounter>();
+
+  public static void setProgressable(Progressable progressable) {
+    _progressable.set(progressable);
+  }
+
+  public static Progressable getProgressable() {
+    return _progressable.get();
+  }
+
+  public static void setGetCounter(GetCounter getCounter) {
+    _getCounter.set(getCounter);
+  }
+
+  public static GetCounter getGetCounter() {
+    return _getCounter.get();
+  }
+
+  @Override
+  public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+    CheckOutputSpecs.checkOutputSpecs(context.getConfiguration(), context.getNumReduceTasks());
+  }
+
+  @Override
+  public RecordWriter<Text, BlurMutate> getRecordWriter(TaskAttemptContext context) throws IOException,
+      InterruptedException {
+    int id = context.getTaskAttemptID().getTaskID().getId();
+    TaskAttemptID taskAttemptID = context.getTaskAttemptID();
+    final GenericBlurRecordWriter writer = new GenericBlurRecordWriter(context.getConfiguration(), id,
+        taskAttemptID.toString() + ".tmp");
+    return new RecordWriter<Text, BlurMutate>() {
+
+      @Override
+      public void write(Text key, BlurMutate value) throws IOException, InterruptedException {
+        writer.write(key, value);
+      }
+
+      @Override
+      public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+        writer.close();
+      }
+    };
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+    TaskAttemptID taskAttemptID = context.getTaskAttemptID();
+    TaskType taskType = taskAttemptID.getTaskType();
+    return new BlurOutputCommitter(taskType, context.getNumReduceTasks());
+  }
+
+  public static TableDescriptor getTableDescriptor(Configuration configuration) throws IOException {
+    String tableDesStr = configuration.get(BLUR_TABLE_DESCRIPTOR);
+    if (tableDesStr == null) {
+      return null;
+    }
+    ByteArrayInputStream inputStream = new ByteArrayInputStream(tableDesStr.getBytes());
+    TIOStreamTransport transport = new TIOStreamTransport(inputStream);
+    TJSONProtocol protocol = new TJSONProtocol(transport);
+    TableDescriptor descriptor = new TableDescriptor();
+    try {
+      descriptor.read(protocol);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+    transport.close();
+    return descriptor;
+  }
+
+  /**
+   * This will multiply the number of reducers for this job. For example, if the
+   * table has 256 shards the normal number of reducers is 256. However, if the
+   * reducer multiplier is set to 4 then the number of reducers will be 1024 and
+   * each shard will get 4 new segments instead of the normal 1.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param multiple
+   *          the multiple to use.
+   * @throws IOException
+   */
+  public static void setReducerMultiplier(Job job, int multiple) throws IOException {
+    TableDescriptor tableDescriptor = getTableDescriptor(job.getConfiguration());
+    if (tableDescriptor == null) {
+      throw new IOException("setTableDescriptor needs to be called first.");
+    }
+    job.setNumReduceTasks(tableDescriptor.getShardCount() * multiple);
+    Configuration configuration = job.getConfiguration();
+    configuration.setInt(BLUR_OUTPUT_REDUCER_MULTIPLIER, multiple);
+  }
+
+  public static int getReducerMultiplier(Configuration configuration) {
+    return configuration.getInt(BLUR_OUTPUT_REDUCER_MULTIPLIER, 1);
+  }
+
+  /**
+   * Sets the {@link TableDescriptor} for this job.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param tableDescriptor
+   *          the {@link TableDescriptor}.
+   * @throws IOException
+   */
+  public static void setTableDescriptor(Job job, TableDescriptor tableDescriptor) throws IOException {
+    setTableDescriptor(job.getConfiguration(), tableDescriptor);
+  }
+
+  /**
+   * Sets the {@link TableDescriptor} for this job.
+   * 
+   * @param configuration
+   *          the configuration to setup.
+   * @param tableDescriptor
+   *          the {@link TableDescriptor}.
+   * @throws IOException
+   */
+  public static void setTableDescriptor(Configuration configuration, TableDescriptor tableDescriptor)
+      throws IOException {
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    TIOStreamTransport transport = new TIOStreamTransport(outputStream);
+    TJSONProtocol protocol = new TJSONProtocol(transport);
+    try {
+      tableDescriptor.write(protocol);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+    transport.close();
+    configuration.set(BLUR_TABLE_DESCRIPTOR, new String(outputStream.toByteArray()));
+    setOutputPath(configuration, new Path(tableDescriptor.getTableUri()));
+  }
+
+  /**
+   * Sets the maximum number of documents that the buffer will hold in memory
+   * before overflowing to disk. By default this is 1000, which will probably be
+   * very low for most systems.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param maxDocumentBufferSize
+   *          the maxDocumentBufferSize.
+   */
+  public static void setMaxDocumentBufferSize(Job job, int maxDocumentBufferSize) {
+    setMaxDocumentBufferSize(job.getConfiguration(), maxDocumentBufferSize);
+  }
+
+  /**
+   * Sets the maximum number of documents that the buffer will hold in memory
+   * before overflowing to disk. By default this is 1000, which will probably be
+   * very low for most systems.
+   * 
+   * @param configuration
+   *          the configuration to setup.
+   * @param maxDocumentBufferSize
+   *          the maxDocumentBufferSize.
+   */
+  public static void setMaxDocumentBufferSize(Configuration configuration, int maxDocumentBufferSize) {
+    configuration.setInt(BLUR_OUTPUT_MAX_DOCUMENT_BUFFER_SIZE, maxDocumentBufferSize);
+  }
+
+  public static int getMaxDocumentBufferSize(Configuration configuration) {
+    return configuration.getInt(BLUR_OUTPUT_MAX_DOCUMENT_BUFFER_SIZE, 1000);
+  }
+
+  public static void setOutputPath(Job job, Path path) {
+    setOutputPath(job.getConfiguration(), path);
+  }
+
+  public static void setOutputPath(Configuration configuration, Path path) {
+    configuration.set(BLUR_OUTPUT_PATH, path.toString());
+    configuration.set(MAPRED_OUTPUT_COMMITTER_CLASS, BlurOutputCommitter.class.getName());
+  }
+
+  public static Path getOutputPath(Configuration configuration) {
+    return new Path(configuration.get(BLUR_OUTPUT_PATH));
+  }
+
+  /**
+   * Enabled by default, this will enable local indexing on the machine where
+   * the task is running. Then when the {@link RecordWriter} closes, the index
+   * is copied to the remote destination in HDFS.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param b
+   *          true to enable, false to disable.
+   */
+  public static void setIndexLocally(Job job, boolean b) {
+    setIndexLocally(job.getConfiguration(), b);
+  }
+
+  /**
+   * Enabled by default, this will enable local indexing on the machine where
+   * the task is running. Then when the {@link RecordWriter} closes, the index
+   * is copied to the remote destination in HDFS.
+   * 
+   * @param configuration
+   *          the configuration to setup.
+   * @param b
+   *          true to enable, false to disable.
+   */
+  public static void setIndexLocally(Configuration configuration, boolean b) {
+    configuration.setBoolean(BLUR_OUTPUT_INDEXLOCALLY, b);
+  }
+
+  public static boolean isIndexLocally(Configuration configuration) {
+    return configuration.getBoolean(BLUR_OUTPUT_INDEXLOCALLY, true);
+  }
+
+  /**
+   * Enabled by default, this will optimize the index while copying from the
+   * local index to the remote destination in HDFS. Used in conjunction with
+   * setIndexLocally.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param b
+   *          true to enable, false to disable.
+   */
+  public static void setOptimizeInFlight(Job job, boolean b) {
+    setOptimizeInFlight(job.getConfiguration(), b);
+  }
+
+  /**
+   * Enabled by default, this will optimize the index while copying from the
+   * local index to the remote destination in HDFS. Used in conjunction with
+   * setIndexLocally.
+   * 
+   * @param configuration
+   *          the configuration to setup.
+   * @param b
+   *          true to enable, false to disable.
+   */
+  public static void setOptimizeInFlight(Configuration configuration, boolean b) {
+    configuration.setBoolean(BLUR_OUTPUT_OPTIMIZEINFLIGHT, b);
+  }
+
+  public static boolean isOptimizeInFlight(Configuration configuration) {
+    return configuration.getBoolean(BLUR_OUTPUT_OPTIMIZEINFLIGHT, true);
+  }
+
+  /**
+   * Sets up the output portion of the map reduce job. This also affects the
+   * map side of a map and reduce job.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param tableDescriptor
+   *          the table descriptor to write the output of the indexing job.
+   * @throws IOException
+   */
+  public static void setupJob(Job job, TableDescriptor tableDescriptor) throws IOException {
+    job.setReducerClass(DefaultBlurReducer.class);
+    job.setNumReduceTasks(tableDescriptor.getShardCount());
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(BlurMutate.class);
+    job.setOutputFormatClass(BlurOutputFormat.class);
+    setTableDescriptor(job, tableDescriptor);
+    BlurMapReduceUtil.addDependencyJars(job);
+    BlurMapReduceUtil.addAllJarsInBlurLib(job.getConfiguration());
+  }
+
+}
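
Expanding the javadoc snippet above into a complete driver sketch (the controller address, table name, and input path are placeholders; setReducerMultiplier is optional and must be called after setupJob has stored the table descriptor):

import org.apache.blur.mapreduce.lib.BlurOutputFormat;
import org.apache.blur.mapreduce.lib.CsvBlurMapper;
import org.apache.blur.thrift.BlurClient;
import org.apache.blur.thrift.generated.Blur.Iface;
import org.apache.blur.thrift.generated.TableDescriptor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class ExampleBlurIndexDriver {

  public static void main(String[] args) throws Exception {
    Iface client = BlurClient.getClient("controller1:40010");
    TableDescriptor tableDescriptor = client.describe("example_table");

    Job job = new Job(new Configuration(), "blur index");
    job.setJarByClass(ExampleBlurIndexDriver.class);
    job.setMapperClass(CsvBlurMapper.class);
    job.setInputFormatClass(TextInputFormat.class);

    FileInputFormat.addInputPath(job, new Path("/data/in"));
    CsvBlurMapper.addColumns(job, "cf1", "col");

    // Wires in DefaultBlurReducer, BlurOutputFormat, the key/value classes,
    // and sets the reducer count to the table's shard count.
    BlurOutputFormat.setupJob(job, tableDescriptor);
    BlurOutputFormat.setIndexLocally(job, true);
    BlurOutputFormat.setOptimizeInFlight(job, false);
    // Optional: 4 reducers per shard, so each shard gets 4 new segments.
    BlurOutputFormat.setReducerMultiplier(job, 4);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}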

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7afca43b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurRecord.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurRecord.java b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurRecord.java
new file mode 100644
index 0000000..7c12a76
--- /dev/null
+++ b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurRecord.java
@@ -0,0 +1,178 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.blur.utils.ReaderBlurRecord;
+import org.apache.hadoop.io.Writable;
+
+public class BlurRecord implements Writable, ReaderBlurRecord {
+
+  private String _rowId;
+  private String _recordId;
+  private String _family;
+
+  private List<BlurColumn> _columns = new ArrayList<BlurColumn>();
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    _rowId = IOUtil.readString(in);
+    _recordId = IOUtil.readString(in);
+    _family = IOUtil.readString(in);
+    int size = IOUtil.readVInt(in);
+    _columns.clear();
+    for (int i = 0; i < size; i++) {
+      BlurColumn column = new BlurColumn();
+      column.readFields(in);
+      _columns.add(column);
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    IOUtil.writeString(out, _rowId);
+    IOUtil.writeString(out, _recordId);
+    IOUtil.writeString(out, _family);
+    IOUtil.writeVInt(out, _columns.size());
+    for (BlurColumn column : _columns) {
+      column.write(out);
+    }
+  }
+
+  public String getRowId() {
+    return _rowId;
+  }
+
+  public void setRowId(String rowId) {
+    this._rowId = rowId;
+  }
+
+  public String getRecordId() {
+    return _recordId;
+  }
+
+  public void setRecordId(String recordId) {
+    this._recordId = recordId;
+  }
+
+  public String getFamily() {
+    return _family;
+  }
+
+  public void setFamily(String family) {
+    this._family = family;
+  }
+
+  public List<BlurColumn> getColumns() {
+    return _columns;
+  }
+
+  public void setColumns(List<BlurColumn> columns) {
+    this._columns = columns;
+  }
+
+  public void clearColumns() {
+    _columns.clear();
+  }
+
+  public void addColumn(BlurColumn column) {
+    _columns.add(column);
+  }
+
+  public void addColumn(String name, String value) {
+    BlurColumn blurColumn = new BlurColumn();
+    blurColumn.setName(name);
+    blurColumn.setValue(value);
+    addColumn(blurColumn);
+  }
+
+  @Override
+  public void setRecordIdStr(String value) {
+    setRecordId(value);
+  }
+
+  @Override
+  public void setFamilyStr(String family) {
+    setFamily(family);
+  }
+
+  public void reset() {
+    clearColumns();
+    _rowId = null;
+    _recordId = null;
+    _family = null;
+  }
+
+  @Override
+  public void setRowIdStr(String rowId) {
+    setRowId(rowId);
+  }
+
+  @Override
+  public String toString() {
+    return "{rowId=" + _rowId + ", recordId=" + _recordId + ", family=" + _family + ", columns=" + _columns + "}";
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((_columns == null) ? 0 : _columns.hashCode());
+    result = prime * result + ((_family == null) ? 0 : _family.hashCode());
+    result = prime * result + ((_recordId == null) ? 0 : _recordId.hashCode());
+    result = prime * result + ((_rowId == null) ? 0 : _rowId.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    BlurRecord other = (BlurRecord) obj;
+    if (_columns == null) {
+      if (other._columns != null)
+        return false;
+    } else if (!_columns.equals(other._columns))
+      return false;
+    if (_family == null) {
+      if (other._family != null)
+        return false;
+    } else if (!_family.equals(other._family))
+      return false;
+    if (_recordId == null) {
+      if (other._recordId != null)
+        return false;
+    } else if (!_recordId.equals(other._recordId))
+      return false;
+    if (_rowId == null) {
+      if (other._rowId != null)
+        return false;
+    } else if (!_rowId.equals(other._rowId))
+      return false;
+    return true;
+  }
+
+}
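
A minimal round-trip sketch (illustrative values only) of BlurRecord through its Writable interface, the same path the MR framework uses when shuffling BlurMutate values:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.blur.mapreduce.lib.BlurRecord;

public class BlurRecordRoundTrip {

  public static void main(String[] args) throws Exception {
    BlurRecord record = new BlurRecord();
    record.setRowId("row-1");
    record.setRecordId("record-1");
    record.setFamily("cf1");
    record.addColumn("col", "value");

    // Serialize through write(DataOutput), as the shuffle would.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    record.write(new DataOutputStream(bytes));

    // Deserialize into a fresh instance through readFields(DataInput).
    BlurRecord copy = new BlurRecord();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

    // equals() compares rowId, recordId, family, and columns.
    System.out.println(record.equals(copy)); // expected: true
  }
}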

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7afca43b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordReader.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordReader.java b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordReader.java
new file mode 100644
index 0000000..5f4fec6
--- /dev/null
+++ b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordReader.java
@@ -0,0 +1,90 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordReader;
+
+
+public abstract class BlurRecordReader extends RecordReader<Text, BlurRecord> {
+
+//  private IndexReader reader;
+//  private Directory directory;
+//  private int startingDocId;
+//  private int endingDocId;
+//  private int position;
+//  private Text rowid = new Text();
+//  private BlurRecord record = new BlurRecord();
+//
+//  @Override
+//  public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+//    BlurInputSplit blurSplit = (BlurInputSplit) split;
+//    Path path = blurSplit.getIndexPath();
+//    String segmentName = blurSplit.getSegmentName();
+//    startingDocId = blurSplit.getStartingDocId();
+//    endingDocId = blurSplit.getEndingDocId();
+//    directory = new HdfsDirectory(context.getConfiguration(), path);
+//
+//    IndexCommit commit = Utils.findLatest(directory);
+//    reader = Utils.openSegmentReader(directory, commit, segmentName, Utils.getTermInfosIndexDivisor(context.getConfiguration()));
+//    int maxDoc = reader.maxDoc();
+//    if (endingDocId >= maxDoc) {
+//      endingDocId = maxDoc - 1;
+//    }
+//    position = startingDocId - 1;
+//  }
+//
+//  @Override
+//  public boolean nextKeyValue() throws IOException, InterruptedException {
+//    do {
+//      position++;
+//      if (position > endingDocId) {
+//        return false;
+//      }
+//    } while (reader.isDeleted(position));
+//    readDocument();
+//    return true;
+//  }
+//
+//  private void readDocument() throws CorruptIndexException, IOException {
+//    Document document = reader.document(position);
+//    record.reset();
+//    rowid.set(RowDocumentUtil.readRecord(document, record));
+//  }
+//
+//  @Override
+//  public Text getCurrentKey() throws IOException, InterruptedException {
+//    return rowid;
+//  }
+//
+//  @Override
+//  public BlurRecord getCurrentValue() throws IOException, InterruptedException {
+//    return record;
+//  }
+//
+//  @Override
+//  public float getProgress() throws IOException, InterruptedException {
+//    int total = endingDocId - startingDocId;
+//    return (float) position / (float) total;
+//  }
+//
+//  @Override
+//  public void close() throws IOException {
+//    reader.close();
+//    directory.close();
+//  }
+}

