incubator-blur-commits mailing list archives

From: amccu...@apache.org
Subject: [1/5] git commit: Renaming the blur-mapred-common project back to blur-mapred and removing old hadoop1 and hadoop2 projects because they are no longer needed.
Date: Sun, 12 Apr 2015 16:16:40 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master 5b047efb2 -> 2fa413560


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fa41356/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
new file mode 100644
index 0000000..7a0aabf
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.analysis.FieldManager;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.lucene.LuceneVersionConstant;
+import org.apache.blur.lucene.codec.Blur024Codec;
+import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.RowDocumentUtil;
+import org.apache.blur.utils.ShardUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.AtomicReader;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.SlowCompositeReaderWrapper;
+import org.apache.lucene.index.TieredMergePolicy;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.NoLockFactory;
+import org.apache.lucene.store.SimpleFSDirectory;
+
+/**
+ * Writes {@link BlurMutate}s, grouped by row key, into a Lucene index. The
+ * index may be built on local disk first and copied to the final
+ * {@link HdfsDirectory} on close, and rows that outgrow the in-memory buffer
+ * overflow into a temporary local index.
+ */
+public class GenericBlurRecordWriter {
+
+  private static final Log LOG = LogFactory.getLog(GenericBlurRecordWriter.class);
+  private static final String JAVA_IO_TMPDIR = "java.io.tmpdir";
+  private static final Counter NULL_COUNTER = NullCounter.getNullCounter();
+
+  private final Text _prevKey = new Text();
+
+  private final IndexWriter _writer;
+  private final FieldManager _fieldManager;
+  private final Directory _finalDir;
+  private final Directory _localDir;
+  private final File _localPath;
+  // private final int _maxDocumentBufferSize;
+  private final IndexWriterConfig _conf;
+  private final IndexWriterConfig _overFlowConf;
+  private final Path _newIndex;
+  private final boolean _indexLocally;
+  private final boolean _optimizeInFlight;
+  private final DocumentBufferStrategy _documentBufferStrategy;
+  private Counter _columnCount = NULL_COUNTER;
+  private Counter _fieldCount = NULL_COUNTER;
+  private Counter _recordCount = NULL_COUNTER;
+  private Counter _rowCount = NULL_COUNTER;
+  private Counter _recordDuplicateCount = NULL_COUNTER;
+  private Counter _rowOverFlowCount = NULL_COUNTER;
+  private Counter _rowDeleteCount = NULL_COUNTER;
+  private RateCounter _recordRateCounter = new RateCounter(NULL_COUNTER);
+  private RateCounter _rowRateCounter = new RateCounter(NULL_COUNTER);
+  private RateCounter _copyRateCounter = new RateCounter(NULL_COUNTER);
+  private boolean _countersSetup = false;
+  private IndexWriter _localTmpWriter;
+  private boolean _usingLocalTmpIndex;
+  private File _localTmpPath;
+  private ProgressableDirectory _localTmpDir;
+  private String _deletedRowId;
+  private Configuration _configuration;
+
+  public GenericBlurRecordWriter(Configuration configuration, int attemptId, String tmpDirName) throws IOException {
+    _configuration = configuration;
+    _documentBufferStrategy = BlurOutputFormat.getDocumentBufferStrategy(_configuration);
+    _indexLocally = BlurOutputFormat.isIndexLocally(_configuration);
+    _optimizeInFlight = BlurOutputFormat.isOptimizeInFlight(_configuration);
+
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(_configuration);
+    int shardCount = tableDescriptor.getShardCount();
+    int shardId = attemptId % shardCount;
+
+    Path tableOutput = BlurOutputFormat.getOutputPath(_configuration);
+    String shardName = ShardUtil.getShardName(BlurConstants.SHARD_PREFIX, shardId);
+    Path indexPath = new Path(tableOutput, shardName);
+    _newIndex = new Path(indexPath, tmpDirName);
+    _finalDir = new ProgressableDirectory(new HdfsDirectory(_configuration, _newIndex), getProgressable());
+    _finalDir.setLockFactory(NoLockFactory.getNoLockFactory());
+
+    TableContext tableContext = TableContext.create(tableDescriptor);
+    _fieldManager = tableContext.getFieldManager();
+    Analyzer analyzer = _fieldManager.getAnalyzerForIndex();
+
+    _conf = new IndexWriterConfig(LuceneVersionConstant.LUCENE_VERSION, analyzer);
+    _conf.setCodec(new Blur024Codec());
+    _conf.setSimilarity(tableContext.getSimilarity());
+    TieredMergePolicy mergePolicy = (TieredMergePolicy) _conf.getMergePolicy();
+    mergePolicy.setUseCompoundFile(false);
+
+    _overFlowConf = _conf.clone();
+
+    if (_indexLocally) {
+      String localDirPath = System.getProperty(JAVA_IO_TMPDIR);
+      _localPath = new File(localDirPath, UUID.randomUUID().toString() + ".tmp");
+      SimpleFSDirectory directory = new SimpleFSDirectory(_localPath);
+      _localDir = new ProgressableDirectory(directory, getProgressable());
+      _writer = new IndexWriter(_localDir, _conf.clone());
+    } else {
+      _localPath = null;
+      _localDir = null;
+      _writer = new IndexWriter(_finalDir, _conf.clone());
+    }
+  }
+
+  private Progressable getProgressable() {
+    final Progressable prg = BlurOutputFormat.getProgressable();
+    return new Progressable() {
+
+      private Progressable _progressable = prg;
+      private long _lastWarn = 0;
+      private boolean _progressSetupLogged = false;
+
+      @Override
+      public void progress() {
+        if (_progressable != null) {
+          _progressable.progress();
+          if (!_progressSetupLogged) {
+            LOG.info("Progress has been set up correctly.");
+            _progressSetupLogged = true;
+          }
+        } else {
+          Progressable progressable = BlurOutputFormat.getProgressable();
+          if (progressable != null) {
+            _progressable = progressable;
+          } else {
+            long now = System.nanoTime();
+            if (_lastWarn + TimeUnit.SECONDS.toNanos(10) < now) {
+              LOG.warn("Progress not being reported.");
+              _lastWarn = System.nanoTime();
+            }
+          }
+        }
+      }
+    };
+  }
+
+  public void write(Text key, BlurMutate value) throws IOException {
+    if (!_countersSetup) {
+      setupCounter();
+      _countersSetup = true;
+    }
+    if (!_prevKey.equals(key)) {
+      flush();
+      _prevKey.set(key);
+    }
+    add(value);
+  }
+
+  private void setupCounter() {
+    GetCounter getCounter = BlurOutputFormat.getGetCounter();
+    if (getCounter != null) {
+      _fieldCount = getCounter.getCounter(BlurCounters.LUCENE_FIELD_COUNT);
+      _columnCount = getCounter.getCounter(BlurCounters.COLUMN_COUNT);
+      _recordCount = getCounter.getCounter(BlurCounters.RECORD_COUNT);
+      _recordDuplicateCount = getCounter.getCounter(BlurCounters.RECORD_DUPLICATE_COUNT);
+      _rowCount = getCounter.getCounter(BlurCounters.ROW_COUNT);
+      _rowDeleteCount = getCounter.getCounter(BlurCounters.ROW_DELETE_COUNT);
+      _rowOverFlowCount = getCounter.getCounter(BlurCounters.ROW_OVERFLOW_COUNT);
+      _recordRateCounter = new RateCounter(getCounter.getCounter(BlurCounters.RECORD_RATE));
+      _rowRateCounter = new RateCounter(getCounter.getCounter(BlurCounters.ROW_RATE));
+      _copyRateCounter = new RateCounter(getCounter.getCounter(BlurCounters.COPY_RATE));
+    }
+  }
+
+  private void add(BlurMutate value) throws IOException {
+    BlurRecord blurRecord = value.getRecord();
+    Record record = getRecord(blurRecord);
+    String recordId = record.getRecordId();
+    if (value.getMutateType() == MUTATE_TYPE.DELETE) {
+      _deletedRowId = blurRecord.getRowId();
+      return;
+    }
+    if (_countersSetup) {
+      _columnCount.increment(record.getColumns().size());
+    }
+    List<Field> document = RowDocumentUtil.getDoc(_fieldManager, blurRecord.getRowId(), record);
+    List<Field> dup = _documentBufferStrategy.add(recordId, document);
+    if (_countersSetup) {
+      if (dup != null) {
+        _recordDuplicateCount.increment(1);
+      } else {
+        _fieldCount.increment(document.size());
+        _recordCount.increment(1);
+      }
+    }
+    flushToTmpIndexIfNeeded();
+  }
+
+  private void flushToTmpIndexIfNeeded() throws IOException {
+    if (_documentBufferStrategy.isFull()) {
+      LOG.info("Document buffer is full; overflowing to disk.");
+      flushToTmpIndex();
+    }
+  }
+
+  private void flushToTmpIndex() throws IOException {
+    if (_documentBufferStrategy.isEmpty()) {
+      return;
+    }
+    _usingLocalTmpIndex = true;
+    if (_localTmpWriter == null) {
+      String localDirPath = System.getProperty(JAVA_IO_TMPDIR);
+      _localTmpPath = new File(localDirPath, UUID.randomUUID().toString() + ".tmp");
+      SimpleFSDirectory directory = new SimpleFSDirectory(_localTmpPath);
+      _localTmpDir = new ProgressableDirectory(directory, getProgressable());
+      _localTmpWriter = new IndexWriter(_localTmpDir, _overFlowConf.clone());
+    }
+    // The local tmp writer has merging disabled, so the first document added
+    // is doc 0 and therefore serves as the prime doc.
+    List<List<Field>> docs = _documentBufferStrategy.getAndClearBuffer();
+    for (List<Field> doc : docs) {
+      _localTmpWriter.addDocument(doc);
+    }
+  }
+
+  private void resetLocalTmp() {
+    _usingLocalTmpIndex = false;
+    _localTmpWriter = null;
+    _localTmpDir = null;
+    rm(_localTmpPath);
+    _localTmpPath = null;
+  }
+
+  public static Record getRecord(BlurRecord value) {
+    Record record = new Record();
+    record.setRecordId(value.getRecordId());
+    record.setFamily(value.getFamily());
+    for (BlurColumn col : value.getColumns()) {
+      record.addToColumns(new Column(col.getName(), col.getValue()));
+    }
+    return record;
+  }
+
+  private void flush() throws CorruptIndexException, IOException {
+    if (!_usingLocalTmpIndex && _documentBufferStrategy.isEmpty() && _deletedRowId == null) {
+      // Nothing buffered and no delete pending (e.g. the very first call from
+      // write()); avoid counting an empty row.
+      return;
+    }
+    if (_usingLocalTmpIndex) {
+      // Since we have already flushed to disk, we do not need to index the
+      // delete here.
+      flushToTmpIndex();
+      _localTmpWriter.close(false);
+      DirectoryReader reader = DirectoryReader.open(_localTmpDir);
+      AtomicReader atomicReader = SlowCompositeReaderWrapper.wrap(reader);
+      AtomicReader primeDocAtomicReader = PrimeDocOverFlowHelper.addPrimeDoc(atomicReader);
+      if (_countersSetup) {
+        _recordRateCounter.mark(reader.numDocs());
+      }
+      _writer.addIndexes(primeDocAtomicReader);
+      primeDocAtomicReader.close();
+      resetLocalTmp();
+      _writer.maybeMerge();
+      if (_countersSetup) {
+        _rowOverFlowCount.increment(1);
+      }
+    } else {
+      if (_documentBufferStrategy.isEmpty()) {
+        if (_deletedRowId != null) {
+          _writer.addDocument(getDeleteDoc());
+          if (_countersSetup) {
+            _rowDeleteCount.increment(1);
+          }
+        }
+      } else {
+        List<List<Field>> docs = _documentBufferStrategy.getAndClearBuffer();
+        docs.get(0).add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
+        _writer.addDocuments(docs);
+        if (_countersSetup) {
+          _recordRateCounter.mark(docs.size());
+        }
+      }
+    }
+    _deletedRowId = null;
+    if (_countersSetup) {
+      _rowRateCounter.mark();
+      _rowCount.increment(1);
+    }
+  }
+
+  private Document getDeleteDoc() {
+    Document document = new Document();
+    document.add(new StringField(BlurConstants.ROW_ID, _deletedRowId, Store.NO));
+    document.add(new StringField(BlurConstants.DELETE_MARKER, BlurConstants.DELETE_MARKER_VALUE, Store.NO));
+    return document;
+  }
+
+  public void close() throws IOException {
+    flush();
+    _writer.close();
+    if (_countersSetup) {
+      _recordRateCounter.close();
+      _rowRateCounter.close();
+    }
+    if (_indexLocally) {
+      if (_optimizeInFlight) {
+        copyAndOptimizeInFlightDir();
+      } else {
+        copyDir();
+      }
+    }
+    if (_countersSetup) {
+      _copyRateCounter.close();
+    }
+  }
+
+  private void copyAndOptimizeInFlightDir() throws IOException {
+    CopyRateDirectory copyRateDirectory = new CopyRateDirectory(_finalDir, _copyRateCounter);
+    copyRateDirectory.setLockFactory(NoLockFactory.getNoLockFactory());
+    DirectoryReader reader = DirectoryReader.open(_localDir);
+    IndexWriter writer = new IndexWriter(copyRateDirectory, _conf.clone());
+    writer.addIndexes(reader);
+    writer.close();
+    rm(_localPath);
+  }
+
+  private void copyDir() throws IOException {
+    CopyRateDirectory copyRateDirectory = new CopyRateDirectory(_finalDir, _copyRateCounter);
+    String[] fileNames = _localDir.listAll();
+    for (String fileName : fileNames) {
+      LOG.info("Copying [{0}] to [{1}]", fileName, _newIndex);
+      _localDir.copy(copyRateDirectory, fileName, fileName, IOContext.DEFAULT);
+    }
+    rm(_localPath);
+  }
+
+  private void rm(File file) {
+    if (!file.exists()) {
+      return;
+    }
+    if (file.isDirectory()) {
+      for (File f : file.listFiles()) {
+        rm(f);
+      }
+    }
+    file.delete();
+  }
+
+}
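
The write() path above relies on the MapReduce shuffle delivering all mutates of a row under one consecutive key, so a key change is the signal that the previous row is complete. A standalone sketch of that grouping pattern, with plain strings standing in for Text keys and BlurMutates (hypothetical data, not part of this commit):

import java.util.Arrays;
import java.util.List;

public class KeyChangeGroupingSketch {
  public static void main(String[] args) {
    // Hypothetical shuffled input: rowId -> recordId, already sorted by key.
    List<String[]> keyed = Arrays.asList(new String[] { "row1", "rec1" },
        new String[] { "row1", "rec2" }, new String[] { "row2", "rec3" });
    String prevKey = null;
    StringBuilder buffer = new StringBuilder();
    for (String[] kv : keyed) {
      if (prevKey != null && !prevKey.equals(kv[0])) {
        System.out.println("flush row " + prevKey + ": [" + buffer.toString().trim() + "]");
        buffer.setLength(0);
      }
      prevKey = kv[0];
      buffer.append(kv[1]).append(' ');
    }
    // close() performs the final flush for the last row.
    System.out.println("flush row " + prevKey + ": [" + buffer.toString().trim() + "]");
  }
}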

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fa41356/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GetCounter.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GetCounter.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GetCounter.java
new file mode 100644
index 0000000..6814ac2
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GetCounter.java
@@ -0,0 +1,29 @@
+package org.apache.blur.mapreduce.lib;
+
+import org.apache.hadoop.mapreduce.Counter;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Looks up a Hadoop {@link Counter} by its enum name, decoupling the record
+ * writer from any specific task context.
+ */
+public interface GetCounter {
+  
+  Counter getCounter(Enum<?> counterName);
+
+}
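
A minimal sketch of an implementation backed by a live task context; this class is hypothetical, and the real instance is obtained through BlurOutputFormat.getGetCounter() as used in GenericBlurRecordWriter above:

import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;

// Hypothetical GetCounter bridging to a mapper/reducer context.
public class ContextGetCounter implements GetCounter {

  private final TaskInputOutputContext<?, ?, ?, ?> _context;

  public ContextGetCounter(TaskInputOutputContext<?, ?, ?, ?> context) {
    _context = context;
  }

  @Override
  public Counter getCounter(Enum<?> counterName) {
    return _context.getCounter(counterName);
  }
}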

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fa41356/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/IOUtil.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/IOUtil.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/IOUtil.java
new file mode 100644
index 0000000..46c030f
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/IOUtil.java
@@ -0,0 +1,58 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Stream helpers for length-prefixed UTF-8 strings and variable-length
+ * integers (7 bits per byte, high bit as continuation flag).
+ */
+public class IOUtil {
+
+  public static final String UTF_8 = "UTF-8";
+
+  public static String readString(DataInput input) throws IOException {
+    int length = readVInt(input);
+    byte[] buffer = new byte[length];
+    input.readFully(buffer);
+    return new String(buffer, UTF_8);
+  }
+
+  public static void writeString(DataOutput output, String s) throws IOException {
+    byte[] bs = s.getBytes(UTF_8);
+    writeVInt(output, bs.length);
+    output.write(bs);
+  }
+
+  public static int readVInt(DataInput input) throws IOException {
+    byte b = input.readByte();
+    int i = b & 0x7F;
+    for (int shift = 7; (b & 0x80) != 0; shift += 7) {
+      b = input.readByte();
+      i |= (b & 0x7F) << shift;
+    }
+    return i;
+  }
+
+  public static void writeVInt(DataOutput output, int i) throws IOException {
+    while ((i & ~0x7F) != 0) {
+      output.writeByte((byte) ((i & 0x7f) | 0x80));
+      i >>>= 7;
+    }
+    output.writeByte((byte) i);
+  }
+
+}
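
A round-trip sketch of the encoding above: values up to 127 occupy a single byte, larger values spill into additional 7-bit groups with the high bit as a continuation flag, and strings are written as a varint length followed by UTF-8 bytes:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class IOUtilRoundTrip {
  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    IOUtil.writeVInt(out, 127);      // encodes to 1 byte
    IOUtil.writeVInt(out, 128);      // encodes to 2 bytes
    IOUtil.writeString(out, "blur"); // varint length, then UTF-8 bytes
    out.flush();

    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    System.out.println(IOUtil.readVInt(in));   // 127
    System.out.println(IOUtil.readVInt(in));   // 128
    System.out.println(IOUtil.readString(in)); // blur
  }
}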

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fa41356/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/NullCounter.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/NullCounter.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/NullCounter.java
new file mode 100644
index 0000000..af39539
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/NullCounter.java
@@ -0,0 +1,68 @@
+package org.apache.blur.mapreduce.lib;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.hadoop.mapreduce.Counter;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Creates a standalone {@link Counter} that is not registered with any job,
+ * working around Counter being a class in Hadoop 1 and an interface in
+ * Hadoop 2.
+ */
+public class NullCounter {
+
+  public static Counter getNullCounter() {
+    try {
+      // Try hadoop 2 version
+      // Class<?> counterClass =
+      // Class.forName("org.apache.hadoop.mapreduce.Counter");
+      // if (counterClass.isInterface()) {
+      // return (Counter) Proxy.newProxyInstance(Counter.class.getClassLoader(),
+      // new Class[] { Counter.class },
+      // handler);
+      // }
+      Class<?> clazz1 = Class.forName("org.apache.hadoop.mapreduce.counters.GenericCounter");
+      return (Counter) clazz1.newInstance();
+    } catch (ClassNotFoundException e1) {
+      // Try hadoop 1 version
+      try {
+        Class<?> clazz2 = Class.forName("org.apache.hadoop.mapreduce.Counter");
+        Constructor<?> constructor = clazz2.getDeclaredConstructor(new Class[] {});
+        constructor.setAccessible(true);
+        return (Counter) constructor.newInstance(new Object[] {});
+      } catch (ClassNotFoundException e2) {
+        throw new RuntimeException(e2);
+      } catch (NoSuchMethodException e2) {
+        throw new RuntimeException(e2);
+      } catch (SecurityException e2) {
+        throw new RuntimeException(e2);
+      } catch (InstantiationException e2) {
+        throw new RuntimeException(e2);
+      } catch (IllegalAccessException e2) {
+        throw new RuntimeException(e2);
+      } catch (IllegalArgumentException e2) {
+        throw new RuntimeException(e2);
+      } catch (InvocationTargetException e2) {
+        throw new RuntimeException(e2);
+      }
+    } catch (InstantiationException e) {
+      throw new RuntimeException(e);
+    } catch (IllegalAccessException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+}
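
The reflection is needed because Counter is a concrete class in Hadoop 1 but an interface (implemented by GenericCounter) in Hadoop 2, so no single compile-time instantiation works against both. A small usage sketch; the returned counter accepts increments but is never reported to any framework:

import org.apache.hadoop.mapreduce.Counter;

public class NullCounterSketch {
  public static void main(String[] args) {
    // Placeholder counter used by GenericBlurRecordWriter until the real
    // task counters are wired up via GetCounter.
    Counter counter = NullCounter.getNullCounter();
    counter.increment(10);
    System.out.println(counter.getValue()); // 10, but reported nowhere
  }
}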

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fa41356/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/PrimeDocOverFlowHelper.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/PrimeDocOverFlowHelper.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/PrimeDocOverFlowHelper.java
new file mode 100644
index 0000000..672a1c1
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/PrimeDocOverFlowHelper.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib;
+
+import java.io.IOException;
+
+import org.apache.blur.utils.BlurConstants;
+import org.apache.lucene.analysis.core.KeywordAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.AtomicReader;
+import org.apache.lucene.index.AtomicReaderContext;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.FilterAtomicReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.ParallelAtomicReader;
+import org.apache.lucene.index.StoredFieldVisitor;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RAMDirectory;
+import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.Version;
+
+public class PrimeDocOverFlowHelper {
+
+  private static Directory _directory;
+
+  // Build a one-document in-memory index containing only the prime doc marker.
+  static {
+    try {
+      _directory = new RAMDirectory();
+      IndexWriter writer = new IndexWriter(_directory, new IndexWriterConfig(Version.LUCENE_43, new KeywordAnalyzer()));
+      Document document = new Document();
+      document.add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
+      writer.addDocument(document);
+      writer.close();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Joins the one-document prime-doc segment to the given reader with a
+   * {@link ParallelAtomicReader}, so that only doc 0 of the result matches
+   * the prime doc term.
+   */
+  public static AtomicReader addPrimeDoc(AtomicReader atomicReader) throws IOException {
+    AtomicReaderContext context = DirectoryReader.open(_directory).leaves().get(0);
+    return new ParallelAtomicReader(true, setDocSize(context.reader(), atomicReader.maxDoc()), atomicReader);
+  }
+
+  // Fake the doc count of the one-document prime index so that
+  // ParallelAtomicReader accepts it alongside a larger segment.
+  private static AtomicReader setDocSize(AtomicReader reader, final int count) {
+    return new FilterAtomicReader(reader) {
+      @Override
+      public Bits getLiveDocs() {
+        return new Bits() {
+          @Override
+          public boolean get(int index) {
+            return true;
+          }
+
+          @Override
+          public int length() {
+            return count;
+          }
+        };
+      }
+
+      @Override
+      public int numDocs() {
+        return count;
+      }
+
+      @Override
+      public int maxDoc() {
+        return count;
+      }
+
+      @Override
+      public void document(int docID, StoredFieldVisitor visitor) throws IOException {
+        // Do nothing
+      }
+    };
+  }
+}
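
Usage mirrors the overflow branch of GenericBlurRecordWriter.flush() above. A condensed sketch, assuming an existing overflow Directory and a destination IndexWriter (method and parameter names here are illustrative only):

import java.io.IOException;

import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.SlowCompositeReaderWrapper;
import org.apache.lucene.store.Directory;

public class PrimeDocMergeSketch {
  // Merge an overflow segment into 'writer', marking its doc 0 as prime doc.
  static void mergeOverflow(Directory overflowDir, IndexWriter writer) throws IOException {
    DirectoryReader reader = DirectoryReader.open(overflowDir);
    AtomicReader atomic = SlowCompositeReaderWrapper.wrap(reader);
    AtomicReader withPrimeDoc = PrimeDocOverFlowHelper.addPrimeDoc(atomic);
    writer.addIndexes(withPrimeDoc);
    withPrimeDoc.close();
  }
}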

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fa41356/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/ProgressableDirectory.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/ProgressableDirectory.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/ProgressableDirectory.java
new file mode 100644
index 0000000..15700bc
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/ProgressableDirectory.java
@@ -0,0 +1,223 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.hadoop.util.Progressable;
+import org.apache.lucene.store.BufferedIndexInput;
+import org.apache.lucene.store.BufferedIndexOutput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.LockFactory;
+
+/**
+ * The {@link ProgressableDirectory} allows progress to be recorded while
+ * Lucene is blocked in a merge. This prevents the task from being killed for
+ * failing to report progress during the blocked merge.
+ */
+public class ProgressableDirectory extends Directory {
+
+  private static final Log LOG = LogFactory.getLog(ProgressableDirectory.class);
+
+  private final Directory _directory;
+  private final Progressable _progressable;
+
+  public ProgressableDirectory(Directory directory, Progressable progressable) {
+    _directory = directory;
+    if (progressable == null) {
+      LOG.warn("Progressable is null.");
+      _progressable = new Progressable() {
+        @Override
+        public void progress() {
+
+        }
+      };
+    } else {
+      _progressable = progressable;
+    }
+  }
+
+  @Override
+  public void clearLock(String name) throws IOException {
+    _directory.clearLock(name);
+  }
+
+  @Override
+  public void close() throws IOException {
+    _directory.close();
+  }
+
+  private IndexInput wrapInput(String name, IndexInput openInput) {
+    return new ProgressableIndexInput(name, openInput, 16384, _progressable);
+  }
+
+  private IndexOutput wrapOutput(IndexOutput createOutput) {
+    return new ProgressableIndexOutput(createOutput, _progressable);
+  }
+
+  @Override
+  public IndexOutput createOutput(String name, IOContext context) throws IOException {
+    return wrapOutput(_directory.createOutput(name, context));
+  }
+
+  @Override
+  public void deleteFile(String name) throws IOException {
+    _directory.deleteFile(name);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return _directory.equals(obj);
+  }
+
+  @Override
+  public boolean fileExists(String name) throws IOException {
+    return _directory.fileExists(name);
+  }
+
+  @Override
+  public long fileLength(String name) throws IOException {
+    return _directory.fileLength(name);
+  }
+
+  @Override
+  public LockFactory getLockFactory() {
+    return _directory.getLockFactory();
+  }
+
+  @Override
+  public String getLockID() {
+    return _directory.getLockID();
+  }
+
+  @Override
+  public int hashCode() {
+    return _directory.hashCode();
+  }
+
+  @Override
+  public String[] listAll() throws IOException {
+    return _directory.listAll();
+  }
+
+  @Override
+  public Lock makeLock(String name) {
+    return _directory.makeLock(name);
+  }
+
+  @Override
+  public IndexInput openInput(String name, IOContext context) throws IOException {
+    return wrapInput(name, _directory.openInput(name, context));
+  }
+
+  @Override
+  public void setLockFactory(LockFactory lockFactory) throws IOException {
+    _directory.setLockFactory(lockFactory);
+  }
+
+  @Override
+  public void sync(Collection<String> names) throws IOException {
+    _directory.sync(names);
+  }
+
+  @Override
+  public String toString() {
+    return _directory.toString();
+  }
+
+  static class ProgressableIndexOutput extends BufferedIndexOutput {
+
+    private Progressable _progressable;
+    private IndexOutput _indexOutput;
+
+    public ProgressableIndexOutput(IndexOutput indexOutput, Progressable progressable) {
+      _indexOutput = indexOutput;
+      _progressable = progressable;
+    }
+
+    @Override
+    protected void flushBuffer(byte[] b, int offset, int len) throws IOException {
+      _indexOutput.writeBytes(b, offset, len);
+      _progressable.progress();
+    }
+
+    @Override
+    public long length() throws IOException {
+      return _indexOutput.length();
+    }
+
+    @Override
+    public void close() throws IOException {
+      super.close();
+      _indexOutput.close();
+      _progressable.progress();
+    }
+
+  }
+
+  static class ProgressableIndexInput extends BufferedIndexInput {
+
+    private IndexInput _indexInput;
+    private final long _length;
+    private Progressable _progressable;
+
+    ProgressableIndexInput(String name, IndexInput indexInput, int buffer, Progressable progressable) {
+      super("ProgressableIndexInput(" + indexInput.toString() + ")", buffer);
+      _indexInput = indexInput;
+      _length = indexInput.length();
+      _progressable = progressable;
+    }
+
+    @Override
+    protected void readInternal(byte[] b, int offset, int length) throws IOException {
+      long filePointer = getFilePointer();
+      if (filePointer != _indexInput.getFilePointer()) {
+        _indexInput.seek(filePointer);
+      }
+      _indexInput.readBytes(b, offset, length);
+      _progressable.progress();
+    }
+
+    @Override
+    protected void seekInternal(long pos) throws IOException {
+      // Seeks are applied lazily in readInternal.
+    }
+
+    @Override
+    public void close() throws IOException {
+      _indexInput.close();
+    }
+
+    @Override
+    public long length() {
+      return _length;
+    }
+
+    @Override
+    public ProgressableIndexInput clone() {
+      ProgressableIndexInput clone = (ProgressableIndexInput) super.clone();
+      clone._indexInput = (IndexInput) _indexInput.clone();
+      return clone;
+    }
+  }
+}
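
A minimal sketch of wrapping a directory so that all buffered I/O ticks the progress reporter. A stdout Progressable stands in here; in a real task the MapReduce context's progress is used, as in GenericBlurRecordWriter.getProgressable() above:

import org.apache.hadoop.util.Progressable;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;

public class ProgressableDirectorySketch {
  public static void main(String[] args) {
    Progressable progressable = new Progressable() {
      @Override
      public void progress() {
        System.out.println("progress tick"); // context.progress() in a real task
      }
    };
    // Any IndexWriter or IndexReader opened over 'dir' now reports progress
    // on every buffered read and write.
    Directory dir = new ProgressableDirectory(new RAMDirectory(), progressable);
  }
}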

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fa41356/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/RateCounter.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/RateCounter.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/RateCounter.java
new file mode 100644
index 0000000..777cb14
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/RateCounter.java
@@ -0,0 +1,59 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.mapreduce.Counter;
+
+/**
+ * Turns a standard Hadoop counter into a rate counter: the wrapped counter's
+ * value is periodically set to the average number of marks per second.
+ */
+public class RateCounter {
+
+  private final Counter _counter;
+  private final long _reportTime;
+  private long _lastReport;
+  private long _count = 0;
+
+  public RateCounter(Counter counter) {
+    _counter = counter;
+    _lastReport = System.nanoTime();
+    _reportTime = TimeUnit.SECONDS.toNanos(5);
+  }
+
+  public void mark() {
+    mark(1L);
+  }
+
+  public void mark(long n) {
+    long now = System.nanoTime();
+    if (_lastReport + _reportTime < now) {
+      long seconds = TimeUnit.NANOSECONDS.toSeconds(now - _lastReport);
+      long rate = _count / seconds;
+      _counter.setValue(rate);
+      _lastReport = System.nanoTime();
+      _count = 0;
+    }
+    _count += n;
+  }
+
+  public void close() {
+    _counter.setValue(0);
+  }
+
+}
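
A usage sketch: mark once per event and the wrapped counter's value is replaced with the observed events-per-second roughly every five seconds. NullCounter stands in for a real task counter here:

import org.apache.hadoop.mapreduce.Counter;

public class RateCounterSketch {
  public static void main(String[] args) {
    Counter counter = NullCounter.getNullCounter(); // stand-in for a task counter
    RateCounter recordRate = new RateCounter(counter);
    for (int i = 0; i < 1000000; i++) {
      recordRate.mark(); // counter.setValue(rate) fires roughly every 5 seconds
    }
    recordRate.close(); // resets the reported rate to 0
  }
}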

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fa41356/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
new file mode 100644
index 0000000..22d2b56
--- /dev/null
+++ b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
@@ -0,0 +1,238 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+
+import org.apache.blur.MiniCluster;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.store.buffer.BufferStore;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.thrift.generated.TableStats;
+import org.apache.blur.utils.GCWatcher;
+import org.apache.blur.utils.ShardUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class BlurOutputFormatMiniClusterTest {
+
+  private static Configuration conf = new Configuration();
+  private static FileSystem fileSystem;
+  private static Path TEST_ROOT_DIR;
+  private static MiniCluster miniCluster;
+  private Path inDir = new Path(TEST_ROOT_DIR + "/in");
+  private static final File TMPDIR = new File(System.getProperty("blur.tmp.dir",
+      "./target/tmp_BlurOutputFormatMiniClusterTest"));
+
+  @BeforeClass
+  public static void setupTest() throws Exception {
+    GCWatcher.init(0.60);
+    BlurOutputFormatTest.setupJavaHome();
+    LocalFileSystem localFS = FileSystem.getLocal(new Configuration());
+    File testDirectory = new File(TMPDIR, "blur-cluster-test").getAbsoluteFile();
+    testDirectory.mkdirs();
+
+    Path directory = new Path(testDirectory.getPath());
+    FsPermission dirPermissions = localFS.getFileStatus(directory).getPermission();
+    FsAction userAction = dirPermissions.getUserAction();
+    FsAction groupAction = dirPermissions.getGroupAction();
+    FsAction otherAction = dirPermissions.getOtherAction();
+
+    StringBuilder builder = new StringBuilder();
+    builder.append(userAction.ordinal());
+    builder.append(groupAction.ordinal());
+    builder.append(otherAction.ordinal());
+    String dirPermissionNum = builder.toString();
+    System.setProperty("dfs.datanode.data.dir.perm", dirPermissionNum);
+    testDirectory.delete();
+    miniCluster = new MiniCluster();
+    miniCluster.startBlurCluster(new File(testDirectory, "cluster").getAbsolutePath(), 2, 3, true, false);
+
+    // System.setProperty("test.build.data",
+    // "./target/BlurOutputFormatTest/data");
+    // TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
+    // "target/tmp/BlurOutputFormatTest_tmp"));
+    TEST_ROOT_DIR = new Path(miniCluster.getFileSystemUri().toString() + "/blur_test");
+    System.setProperty("hadoop.log.dir", "./target/BlurOutputFormatTest/hadoop_log");
+    try {
+      fileSystem = TEST_ROOT_DIR.getFileSystem(conf);
+    } catch (IOException io) {
+      throw new RuntimeException("problem getting file system", io);
+    }
+
+    FileSystem.setDefaultUri(conf, miniCluster.getFileSystemUri());
+
+    miniCluster.startMrMiniCluster();
+    conf = miniCluster.getMRConfiguration();
+
+    BufferStore.initNewBuffer(128, 128 * 128);
+  }
+
+  @AfterClass
+  public static void teardown() throws IOException {
+    if (miniCluster != null) {
+      miniCluster.stopMrMiniCluster();
+    }
+    miniCluster.shutdownBlurCluster();
+    rm(new File("build"));
+  }
+
+  private static void rm(File file) {
+    if (!file.exists()) {
+      return;
+    }
+    if (file.isDirectory()) {
+      for (File f : file.listFiles()) {
+        rm(f);
+      }
+    }
+    file.delete();
+  }
+
+  @Before
+  public void setup() {
+    TableContext.clear();
+  }
+
+  @Test
+  public void testBlurOutputFormat() throws IOException, InterruptedException, ClassNotFoundException, BlurException,
+      TException {
+    fileSystem.delete(inDir, true);
+    String tableName = "testBlurOutputFormat";
+    writeRecordsFile("in/part1", 1, 1, 1, 1, "cf1");
+    writeRecordsFile("in/part2", 1, 1, 2, 1, "cf1");
+
+    Job job = Job.getInstance(conf, "blur index");
+    job.setJarByClass(BlurOutputFormatMiniClusterTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    String tableUri = new Path(TEST_ROOT_DIR + "/blur/" + tableName).makeQualified(fileSystem.getUri(),
+        fileSystem.getWorkingDirectory()).toString();
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(1);
+    tableDescriptor.setTableUri(tableUri);
+    tableDescriptor.setName(tableName);
+
+    Iface client = getClient();
+    client.createTable(tableDescriptor);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    Path output = new Path(TEST_ROOT_DIR + "/out");
+    BlurOutputFormat.setOutputPath(job, output);
+
+    Path tablePath = new Path(tableUri);
+    Path shardPath = new Path(tablePath, ShardUtil.getShardName(0));
+    FileStatus[] listStatus = fileSystem.listStatus(shardPath);
+    assertEquals(3, listStatus.length);
+    System.out.println("======" + listStatus.length);
+    for (FileStatus fileStatus : listStatus) {
+      System.out.println(fileStatus.getPath());
+    }
+
+    assertTrue(job.waitForCompletion(true));
+    Counters ctrs = job.getCounters();
+    System.out.println("Counters: " + ctrs);
+
+    client.loadData(tableName, output.toString());
+
+    while (true) {
+      TableStats tableStats = client.tableStats(tableName);
+      System.out.println(tableStats);
+      if (tableStats.getRowCount() > 0) {
+        break;
+      }
+      Thread.sleep(100);
+    }
+
+    assertTrue(fileSystem.exists(tablePath));
+    assertFalse(fileSystem.isFile(tablePath));
+
+    FileStatus[] listStatusAfter = fileSystem.listStatus(shardPath);
+
+    assertEquals(11, listStatusAfter.length);
+
+  }
+
+  private Iface getClient() {
+    return BlurClient.getClient(miniCluster.getControllerConnectionStr());
+  }
+
+  public static String readFile(String name) throws IOException {
+    DataInputStream f = fileSystem.open(new Path(TEST_ROOT_DIR + "/" + name));
+    BufferedReader b = new BufferedReader(new InputStreamReader(f));
+    StringBuilder result = new StringBuilder();
+    String line = b.readLine();
+    while (line != null) {
+      result.append(line);
+      result.append('\n');
+      line = b.readLine();
+    }
+    b.close();
+    return result.toString();
+  }
+
+  private Path writeRecordsFile(String name, int startingRowId, int numberOfRows, int startRecordId,
+      int numberOfRecords, String family) throws IOException {
+    // "1,1,cf1,val1"
+    Path file = new Path(TEST_ROOT_DIR + "/" + name);
+    fileSystem.delete(file, false);
+    DataOutputStream f = fileSystem.create(file);
+    PrintWriter writer = new PrintWriter(f);
+    for (int row = 0; row < numberOfRows; row++) {
+      for (int record = 0; record < numberOfRecords; record++) {
+        writer.println(getRecord(row + startingRowId, record + startRecordId, family));
+      }
+    }
+    writer.close();
+    return file;
+  }
+
+  private String getRecord(int rowId, int recordId, String family) {
+    return rowId + "," + recordId + "," + family + ",valuetoindex";
+  }
+}
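
For reference, the call writeRecordsFile("in/part1", 1, 1, 1, 1, "cf1") in the test above produces a single line in the rowid,recordid,family,value layout consumed by CsvBlurMapper:

1,1,cf1,valuetoindex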

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fa41356/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
new file mode 100644
index 0000000..c3ab723
--- /dev/null
+++ b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
@@ -0,0 +1,465 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.util.Collection;
+import java.util.TreeSet;
+
+import org.apache.blur.MiniCluster;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.store.buffer.BufferStore;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.ShardUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.lucene.index.DirectoryReader;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class BlurOutputFormatTest {
+
+  private static Configuration conf = new Configuration();
+  private static FileSystem localFs;
+  private static Path TEST_ROOT_DIR;
+  private static MiniCluster miniCluster;
+  private Path outDir = new Path(TEST_ROOT_DIR + "/out");
+  private Path inDir = new Path(TEST_ROOT_DIR + "/in");
+
+  @BeforeClass
+  public static void setupTest() throws Exception {
+    setupJavaHome();
+    File file = new File("./target/tmp/BlurOutputFormatTest_tmp");
+    String pathStr = file.getAbsoluteFile().toURI().toString();
+    System.setProperty("test.build.data", pathStr + "/data");
+    System.setProperty("hadoop.log.dir", pathStr + "/hadoop_log");
+    try {
+      localFs = FileSystem.getLocal(conf);
+    } catch (IOException io) {
+      throw new RuntimeException("problem getting local fs", io);
+    }
+    TEST_ROOT_DIR = new Path(System.getProperty("test.build.data", pathStr));
+
+    URI uri = new URI("file:///");
+    FileSystem.setDefaultUri(conf, uri);
+    
+    miniCluster = new MiniCluster();
+    miniCluster.startMrMiniCluster(uri.toString());
+    conf = miniCluster.getMRConfiguration();
+
+    BufferStore.initNewBuffer(128, 128 * 128);
+  }
+
+  public static void setupJavaHome() {
+    String str = System.getenv("JAVA_HOME");
+    if (str == null) {
+      String property = System.getProperty("java.home");
+      if (property != null) {
+        throw new RuntimeException("JAVA_HOME is not set; it should probably be [" + property + "].");
+      }
+      throw new RuntimeException("JAVA_HOME is not set.");
+    }
+  }
+
+  @AfterClass
+  public static void teardown() throws IOException {
+    if (miniCluster != null) {
+      miniCluster.stopMrMiniCluster();
+    }
+    rm(new File("build"));
+  }
+
+  private static void rm(File file) {
+    if (!file.exists()) {
+      return;
+    }
+    if (file.isDirectory()) {
+      for (File f : file.listFiles()) {
+        rm(f);
+      }
+    }
+    file.delete();
+  }
+
+  @Before
+  public void setup() {
+    TableContext.clear();
+  }
+
+  @Test
+  public void testBlurOutputFormat() throws IOException, InterruptedException, ClassNotFoundException {
+    localFs.delete(inDir, true);
+    localFs.delete(outDir, true);
+    writeRecordsFile("in/part1", 1, 1, 1, 1, "cf1");
+    writeRecordsFile("in/part2", 1, 1, 2, 1, "cf1");
+
+    Job job = Job.getInstance(conf, "blur index");
+    job.setJarByClass(BlurOutputFormatTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(1);
+    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/table/test").toString());
+    tableDescriptor.setName("test");
+
+    createShardDirectories(outDir, 1);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    Path output = new Path(TEST_ROOT_DIR + "/out");
+    BlurOutputFormat.setOutputPath(job, output);
+
+    assertTrue(job.waitForCompletion(true));
+    Counters ctrs = job.getCounters();
+    System.out.println("Counters: " + ctrs);
+
+    Path path = new Path(output, ShardUtil.getShardName(0));
+    dump(path, conf);
+    Collection<Path> commitedTasks = getCommitedTasks(path);
+    assertEquals(1, commitedTasks.size());
+    DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, commitedTasks.iterator().next()));
+    assertEquals(2, reader.numDocs());
+    reader.close();
+  }
+
+  private void dump(Path path, Configuration conf) throws IOException {
+    FileSystem fileSystem = path.getFileSystem(conf);
+    System.out.println(path);
+    if (!fileSystem.isFile(path)) {
+      FileStatus[] listStatus = fileSystem.listStatus(path);
+      for (FileStatus fileStatus : listStatus) {
+        dump(fileStatus.getPath(), conf);
+      }
+    }
+  }
+
+  private Collection<Path> getCommitedTasks(Path path) throws IOException {
+    Collection<Path> result = new TreeSet<Path>();
+    FileSystem fileSystem = path.getFileSystem(conf);
+    FileStatus[] listStatus = fileSystem.listStatus(path);
+    for (FileStatus fileStatus : listStatus) {
+      Path p = fileStatus.getPath();
+      if (fileStatus.isDir() && p.getName().endsWith(".commit")) {
+        result.add(p);
+      }
+    }
+    return result;
+  }
+
+  @Test
+  public void testBlurOutputFormatOverFlowTest() throws IOException, InterruptedException, ClassNotFoundException {
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+
+    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
+    writeRecordsFile("in/part2", 1, 50, 2000, 100, "cf1"); // 100 * 50 = 5,000
+
+    Job job = Job.getInstance(conf, "blur index");
+    job.setJarByClass(BlurOutputFormatTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(1);
+    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/table/test").toString());
+    tableDescriptor.setName("test");
+
+    createShardDirectories(outDir, 1);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    Path output = new Path(TEST_ROOT_DIR + "/out");
+    BlurOutputFormat.setOutputPath(job, output);
+    BlurOutputFormat.setIndexLocally(job, true);
+    BlurOutputFormat.setOptimizeInFlight(job, false);
+
+    assertTrue(job.waitForCompletion(true));
+    Counters ctrs = job.getCounters();
+    System.out.println("Counters: " + ctrs);
+
+    Path path = new Path(output, ShardUtil.getShardName(0));
+    Collection<Path> commitedTasks = getCommitedTasks(path);
+    assertEquals(1, commitedTasks.size());
+
+    DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, commitedTasks.iterator().next()));
+    assertEquals(80000, reader.numDocs());
+    reader.close();
+  }
+
+  @Test
+  public void testBlurOutputFormatOverFlowMultipleReducersTest() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+
+    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
+    writeRecordsFile("in/part2", 1, 50, 2000, 100, "cf1"); // 100 * 50 = 5,000
+
+    Job job = Job.getInstance(conf, "blur index");
+    job.setJarByClass(BlurOutputFormatTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(2);
+    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/table/test").toString());
+    tableDescriptor.setName("test");
+
+    createShardDirectories(outDir, 2);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    Path output = new Path(TEST_ROOT_DIR + "/out");
+    BlurOutputFormat.setOutputPath(job, output);
+    BlurOutputFormat.setIndexLocally(job, false);
+    BlurOutputFormat.setDocumentBufferStrategy(job, DocumentBufferStrategyHeapSize.class);
+    BlurOutputFormat.setMaxDocumentBufferHeapSize(job, 128 * 1024);
+
+    assertTrue(job.waitForCompletion(true));
+    Counters ctrs = job.getCounters();
+    System.out.println("Counters: " + ctrs);
+
+    long total = 0;
+    for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
+      Path path = new Path(output, ShardUtil.getShardName(i));
+      Collection<Path> commitedTasks = getCommitedTasks(path);
+      assertEquals(1, commitedTasks.size());
+
+      DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, commitedTasks.iterator().next()));
+      total += reader.numDocs();
+      reader.close();
+    }
+    assertEquals(80000, total);
+
+  }
+
+  @Test
+  public void testBlurOutputFormatOverFlowMultipleReducersWithReduceMultiplierTest() throws IOException,
+      InterruptedException, ClassNotFoundException {
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+
+    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
+    writeRecordsFile("in/part2", 1, 50, 2000, 100, "cf1"); // 100 * 50 = 5,000
+
+    Job job = Job.getInstance(conf, "blur index");
+    job.setJarByClass(BlurOutputFormatTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(7);
+    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/table/test").toString());
+    tableDescriptor.setName("test");
+
+    createShardDirectories(outDir, 7);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    Path output = new Path(TEST_ROOT_DIR + "/out");
+    BlurOutputFormat.setOutputPath(job, output);
+    int multiple = 2;
+    BlurOutputFormat.setReducerMultiplier(job, multiple);
+
+    assertTrue(job.waitForCompletion(true));
+    Counters ctrs = job.getCounters();
+    System.out.println("Counters: " + ctrs);
+
+    long total = 0;
+    for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
+      Path path = new Path(output, ShardUtil.getShardName(i));
+      Collection<Path> commitedTasks = getCommitedTasks(path);
+      assertTrue(multiple >= commitedTasks.size());
+      for (Path p : commitedTasks) {
+        DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, p));
+        total += reader.numDocs();
+        reader.close();
+      }
+    }
+    assertEquals(80000, total);
+
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testBlurOutputFormatValidateReducerCount() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+    writeRecordsFile("in/part1", 1, 1, 1, 1, "cf1");
+    writeRecordsFile("in/part2", 1, 1, 2, 1, "cf1");
+
+    Job job = Job.getInstance(conf, "blur index");
+    job.setJarByClass(BlurOutputFormatTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(1);
+    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/table/test").toString());
+    tableDescriptor.setName("test");
+
+    createShardDirectories(outDir, 1);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    Path output = new Path(TEST_ROOT_DIR + "/out");
+    BlurOutputFormat.setOutputPath(job, output);
+    BlurOutputFormat.setReducerMultiplier(job, 2);
+    job.setNumReduceTasks(4);
+    job.submit();
+
+  }
+
+  // TODO: this test fails sometimes due to issues in the MR MiniCluster, so it is disabled.
+  // @Test
+  public void testBlurOutputFormatCleanupDuringJobKillTest() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+
+    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
+    writeRecordsFile("in/part2", 1, 5000, 2000, 100, "cf1"); // 100 * 5000 =
+                                                             // 500,000
+
+    Job job = Job.getInstance(conf, "blur index");
+    job.setJarByClass(BlurOutputFormatTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(2);
+    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/table/test").toString());
+    tableDescriptor.setName("test");
+
+    createShardDirectories(outDir, 2);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    Path output = new Path(TEST_ROOT_DIR + "/out");
+    BlurOutputFormat.setOutputPath(job, output);
+    BlurOutputFormat.setIndexLocally(job, false);
+
+    job.submit();
+    boolean killCalled = false;
+    while (!job.isComplete()) {
+      Thread.sleep(1000);
+      System.out.printf("Killed [%b] Map [%f] Reduce [%f]%n", killCalled, job.mapProgress() * 100,
+          job.reduceProgress() * 100);
+      if (job.reduceProgress() > 0.7 && !killCalled) {
+        job.killJob();
+        killCalled = true;
+      }
+    }
+
+    assertFalse(job.isSuccessful());
+
+    for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
+      Path path = new Path(output, ShardUtil.getShardName(i));
+      FileSystem fileSystem = path.getFileSystem(job.getConfiguration());
+      FileStatus[] listStatus = fileSystem.listStatus(path);
+      assertEquals(toString(listStatus), 0, listStatus.length);
+    }
+  }
+
+  private String toString(FileStatus[] listStatus) {
+    if (listStatus == null || listStatus.length == 0) {
+      return "";
+    }
+    StringBuilder s = new StringBuilder();
+    for (FileStatus fileStatus : listStatus) {
+      if (s.length() > 0) {
+        s.append(',');
+      }
+      s.append(fileStatus.getPath());
+    }
+    return s.toString();
+  }
+
+  public static String readFile(String name) throws IOException {
+    DataInputStream f = localFs.open(new Path(TEST_ROOT_DIR + "/" + name));
+    BufferedReader b = new BufferedReader(new InputStreamReader(f));
+    StringBuilder result = new StringBuilder();
+    String line = b.readLine();
+    while (line != null) {
+      result.append(line);
+      result.append('\n');
+      line = b.readLine();
+    }
+    b.close();
+    return result.toString();
+  }
+
+  private Path writeRecordsFile(String name, int startingRowId, int numberOfRows, int startRecordId,
+      int numberOfRecords, String family) throws IOException {
+    // "1,1,cf1,val1"
+    Path file = new Path(TEST_ROOT_DIR + "/" + name);
+    localFs.delete(file, false);
+    DataOutputStream f = localFs.create(file);
+    PrintWriter writer = new PrintWriter(f);
+    for (int row = 0; row < numberOfRows; row++) {
+      for (int record = 0; record < numberOfRecords; record++) {
+        writer.println(getRecord(row + startingRowId, record + startRecordId, family));
+      }
+    }
+    writer.close();
+    return file;
+  }
+
+  private void createShardDirectories(Path outDir, int shardCount) throws IOException {
+    localFs.mkdirs(outDir);
+    for (int i = 0; i < shardCount; i++) {
+      localFs.mkdirs(new Path(outDir, ShardUtil.getShardName(i)));
+    }
+  }
+
+  private String getRecord(int rowId, int recordId, String family) {
+    return rowId + "," + recordId + "," + family + ",valuetoindex";
+  }
+}
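
For context on how the knobs exercised by these tests fit together, here is a minimal driver sketch using only the APIs introduced in this patch (BlurOutputFormat, CsvBlurMapper, TableDescriptor); the paths and table name are placeholders, not values from the patch:

  import org.apache.blur.mapreduce.lib.BlurOutputFormat;
  import org.apache.blur.mapreduce.lib.CsvBlurMapper;
  import org.apache.blur.thrift.generated.TableDescriptor;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

  public class ExampleBlurIndexDriver {
    public static void main(String[] args) throws Exception {
      Job job = Job.getInstance(new Configuration(), "blur index");
      job.setJarByClass(ExampleBlurIndexDriver.class);
      job.setMapperClass(CsvBlurMapper.class);
      job.setInputFormatClass(TextInputFormat.class);
      FileInputFormat.addInputPath(job, new Path("hdfs:///data/in")); // placeholder input path
      CsvBlurMapper.addColumns(job, "cf1", "col");

      TableDescriptor td = new TableDescriptor();
      td.setShardCount(2);
      td.setTableUri("hdfs:///blur/tables/test"); // placeholder table URI
      td.setName("test");

      // setupJob derives the reducer count from the shard count (times any
      // configured reducer multiplier, per the multiplier tests above).
      BlurOutputFormat.setupJob(job, td);
      BlurOutputFormat.setOutputPath(job, new Path("hdfs:///data/out"));
      BlurOutputFormat.setIndexLocally(job, true);    // build the index on local disk, then copy
      BlurOutputFormat.setOptimizeInFlight(job, false);
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
  }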

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fa41356/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
new file mode 100644
index 0000000..340d2b3
--- /dev/null
+++ b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.blur.mapreduce.lib.CsvBlurDriver.ControllerPool;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CsvBlurDriverTest {
+
+  protected String tableUri = "file:///tmp/tmppath";
+  protected int shardCount = 13;
+
+  @Before
+  public void setup() throws IOException {
+    Configuration configuration = new Configuration();
+    Path path1 = new Path("file:///tmp/test1");
+    Path path2 = new Path("file:///tmp/test2");
+    FileSystem fileSystem = path1.getFileSystem(configuration);
+    fileSystem.mkdirs(path1);
+    fileSystem.mkdirs(path2);
+  }
+
+  @Test
+  public void testCsvBlurDriverTestFail1() throws Exception {
+    Configuration configuration = new Configuration();
+    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
+      @Override
+      public Iface getClient(String controllerConnectionStr) {
+        return null;
+      }
+    };
+    AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
+    assertNull(CsvBlurDriver.setupJob(configuration, controllerPool, ref, new String[] {}));
+  }
+
+  @Test
+  public void testCsvBlurDriverTest() throws Exception {
+    Configuration configurationSetup = new Configuration();
+    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
+      @Override
+      public Iface getClient(String controllerConnectionStr) {
+        return getMockIface();
+      }
+    };
+    AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
+    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, ref, "-c", "host:40010", "-d", "family1",
+        "col1", "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i",
+        "file:///tmp/test2");
+    assertNotNull(job);
+    Configuration configuration = job.getConfiguration();
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
+    assertEquals(tableDescriptor.getName(), "table1");
+    Collection<String> inputs = configuration.getStringCollection("mapred.input.dir");
+    assertEquals(2, inputs.size());
+    Map<String, List<String>> familyAndColumnNameMap = CsvBlurMapper.getFamilyAndColumnNameMap(configuration);
+    assertEquals(2, familyAndColumnNameMap.size());
+  }
+
+  @Test
+  public void testCsvBlurDriverTest2() throws Exception {
+    Configuration configurationSetup = new Configuration();
+    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
+      @Override
+      public Iface getClient(String controllerConnectionStr) {
+        return getMockIface();
+      }
+    };
+    AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
+    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, ref, "-c", "host:40010", "-d", "family1",
+        "col1", "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i",
+        "file:///tmp/test2", "-S", "-C", "1000000", "2000000");
+    assertNotNull(job);
+    Configuration configuration = job.getConfiguration();
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
+    assertEquals(tableDescriptor.getName(), "table1");
+    Collection<String> inputs = configuration.getStringCollection("mapred.input.dir");
+    assertEquals(2, inputs.size());
+    Map<String, List<String>> familyAndColumnNameMap = CsvBlurMapper.getFamilyAndColumnNameMap(configuration);
+    assertEquals(2, familyAndColumnNameMap.size());
+  }
+
+  @Test
+  public void testCsvBlurDriverTest3() throws Exception {
+    Configuration configurationSetup = new Configuration();
+    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
+      @Override
+      public Iface getClient(String controllerConnectionStr) {
+        return getMockIface();
+      }
+    };
+    AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
+    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, ref, "-c", "host:40010", "-d", "family1",
+        "col1", "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i",
+        "file:///tmp/test2", "-S", "-C", "1000000", "2000000", "-p", "SNAPPY");
+    assertNotNull(job);
+    Configuration configuration = job.getConfiguration();
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
+    assertEquals(tableDescriptor.getName(), "table1");
+    Collection<String> inputs = configuration.getStringCollection("mapred.input.dir");
+    assertEquals(2, inputs.size());
+    Map<String, List<String>> familyAndColumnNameMap = CsvBlurMapper.getFamilyAndColumnNameMap(configuration);
+    assertEquals(2, familyAndColumnNameMap.size());
+    assertEquals("true", configuration.get(CsvBlurDriver.MAPRED_COMPRESS_MAP_OUTPUT));
+    assertEquals(SnappyCodec.class.getName(), configuration.get(CsvBlurDriver.MAPRED_MAP_OUTPUT_COMPRESSION_CODEC));
+  }
+
+  @Test
+  public void multiplierParamShouldIncreaseReduceTasks() throws Exception {
+    Configuration configurationSetup = new Configuration();
+    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
+      @Override
+      public Iface getClient(String controllerConnectionStr) {
+        return getMockIface();
+      }
+    };
+    int multiplierParam = 10;
+    AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
+    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, ref, "-c", "host:40010", "-d", "family1",
+        "col1", "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i",
+        "file:///tmp/test2", "-S", "-C", "1000000", "2000000", "-p", "SNAPPY", "-r", Integer.toString(multiplierParam));
+    assertNotNull(job);
+
+    assertEquals(multiplierParam * shardCount, job.getNumReduceTasks());
+  }
+
+  protected Iface getMockIface() {
+    InvocationHandler handler = new InvocationHandler() {
+
+      @Override
+      public Object invoke(Object o, Method method, Object[] args) throws Throwable {
+        if (method.getName().equals("describe")) {
+          TableDescriptor tableDescriptor = new TableDescriptor();
+          tableDescriptor.setName((String) args[0]);
+          tableDescriptor.setTableUri(tableUri);
+          tableDescriptor.setShardCount(shardCount);
+          return tableDescriptor;
+        }
+        throw new RuntimeException("not implemented.");
+      }
+    };
+    return (Iface) Proxy.newProxyInstance(Iface.class.getClassLoader(), new Class[] { Iface.class }, handler);
+  }
+
+}
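
The getMockIface() helper above stubs the Blur Iface with a plain java.lang.reflect.Proxy instead of a mocking library. The same pattern generalizes to any interface; a minimal standalone sketch (the Greeter interface is hypothetical, standing in for Iface):

  import java.lang.reflect.InvocationHandler;
  import java.lang.reflect.Method;
  import java.lang.reflect.Proxy;

  public class ProxyMockExample {
    // Hypothetical interface standing in for Blur's Iface.
    interface Greeter {
      String greet(String name);
    }

    public static void main(String[] args) {
      InvocationHandler handler = new InvocationHandler() {
        @Override
        public Object invoke(Object proxy, Method method, Object[] methodArgs) {
          // Answer only the calls the test cares about; fail loudly on anything else.
          if (method.getName().equals("greet")) {
            return "hello " + methodArgs[0];
          }
          throw new RuntimeException("not implemented: " + method.getName());
        }
      };
      Greeter mock = (Greeter) Proxy.newProxyInstance(
          Greeter.class.getClassLoader(), new Class<?>[] { Greeter.class }, handler);
      System.out.println(mock.greet("blur")); // prints "hello blur"
    }
  }

Failing fast on unstubbed methods, as the test's mock does with describe(), keeps the proxy honest: a test that wanders onto an unexpected call breaks immediately rather than returning null.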

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fa41356/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
new file mode 100644
index 0000000..47aa8e5
--- /dev/null
+++ b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
@@ -0,0 +1,108 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CsvBlurMapperTest {
+
+  private MapDriver<Writable, Text, Text, BlurMutate> _mapDriver;
+  private CsvBlurMapper _mapper;
+
+  @Before
+  public void setUp() throws IOException {
+    _mapper = new CsvBlurMapper();
+    _mapDriver = MapDriver.newMapDriver(_mapper);
+  }
+
+  @Test
+  public void testMapperWithFamilyInData() {
+    Configuration configuration = _mapDriver.getConfiguration();
+    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
+    _mapDriver.withInput(new LongWritable(), new Text("rowid1,record1,cf1,value1,value2"));
+    _mapDriver.withOutput(new Text("rowid1"), new BlurMutate(MUTATE_TYPE.REPLACE, "rowid1", "record1", "cf1")
+        .addColumn("col1", "value1").addColumn("col2", "value2"));
+    _mapDriver.runTest();
+  }
+
+  @Test
+  public void testMapperFamilyPerPath() {
+    Configuration configuration = _mapDriver.getConfiguration();
+    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
+    CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
+    _mapper.setFamilyFromPath("cf1");
+
+    _mapDriver.withInput(new LongWritable(), new Text("rowid1,record1,value1,value2"));
+    _mapDriver.withOutput(new Text("rowid1"), new BlurMutate(MUTATE_TYPE.REPLACE, "rowid1", "record1", "cf1")
+        .addColumn("col1", "value1").addColumn("col2", "value2"));
+    _mapDriver.runTest();
+  }
+  
+  @Test
+  public void testMapperAutoGenerateRecordId() {
+    Configuration configuration = _mapDriver.getConfiguration();
+    CsvBlurMapper.setAutoGenerateRecordIdAsHashOfData(configuration, true);
+    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
+    CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
+    _mapper.setFamilyFromPath("cf1");
+
+    _mapDriver.withInput(new LongWritable(), new Text("rowid1,value1,value2"));
+    _mapDriver.withOutput(new Text("rowid1"), new BlurMutate(MUTATE_TYPE.REPLACE, "rowid1", "-25nqln3n2vb4cayex9y9tpxx3", "cf1")
+        .addColumn("col1", "value1").addColumn("col2", "value2"));
+    _mapDriver.runTest();
+  }
+  
+  @Test
+  public void testMapperAutoGenerateRowId() {
+    Configuration configuration = _mapDriver.getConfiguration();
+    CsvBlurMapper.setAutoGenerateRowIdAsHashOfData(configuration, true);
+    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
+    CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
+    _mapper.setFamilyFromPath("cf1");
+
+    _mapDriver.withInput(new LongWritable(), new Text("record1,value1,value2"));
+    _mapDriver.withOutput(new Text("-50b4uzohynr7j7s9pve7ytz66"), new BlurMutate(MUTATE_TYPE.REPLACE, "-50b4uzohynr7j7s9pve7ytz66", "record1", "cf1")
+        .addColumn("col1", "value1").addColumn("col2", "value2"));
+    _mapDriver.runTest();
+  }
+  
+  @Test
+  public void testMapperAutoGenerateRowIdAndRecordId() {
+    Configuration configuration = _mapDriver.getConfiguration();
+    CsvBlurMapper.setAutoGenerateRecordIdAsHashOfData(configuration, true);
+    CsvBlurMapper.setAutoGenerateRowIdAsHashOfData(configuration, true);
+    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
+    CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
+    _mapper.setFamilyFromPath("cf1");
+
+    _mapDriver.withInput(new LongWritable(), new Text("value1,value2"));
+    _mapDriver.withOutput(new Text("5q0tme15ph3h5pns8sv3u5wy2"), new BlurMutate(MUTATE_TYPE.REPLACE, "5q0tme15ph3h5pns8sv3u5wy2", "5q0tme15ph3h5pns8sv3u5wy2", "cf1")
+        .addColumn("col1", "value1").addColumn("col2", "value2"));
+    _mapDriver.runTest();
+  }
+
+}
\ No newline at end of file
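
The column spec passed to CsvBlurMapper.setColumns above is a pipe-delimited list of family:column,column groups. A hypothetical parser sketch showing the shape of that format (this is an illustration, not Blur's actual implementation):

  import java.util.ArrayList;
  import java.util.LinkedHashMap;
  import java.util.List;
  import java.util.Map;

  public class ColumnSpecExample {
    // Parses "cf1:col1,col2|cf2:col1,col2,col3" into family -> column names.
    static Map<String, List<String>> parse(String spec) {
      Map<String, List<String>> families = new LinkedHashMap<String, List<String>>();
      for (String group : spec.split("\\|")) {
        String[] parts = group.split(":", 2); // family name, then its column list
        List<String> cols = new ArrayList<String>();
        for (String col : parts[1].split(",")) {
          cols.add(col);
        }
        families.put(parts[0], cols);
      }
      return families;
    }

    public static void main(String[] args) {
      System.out.println(parse("cf1:col1,col2|cf2:col1,col2,col3"));
      // {cf1=[col1, col2], cf2=[col1, col2, col3]}
    }
  }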

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fa41356/distribution/pom.xml
----------------------------------------------------------------------
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 7cbc371..97707df 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -36,6 +36,11 @@ under the License.
 	<dependencies>
 		<dependency>
 			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-mapred</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
 			<artifactId>blur-shell</artifactId>
 			<version>${project.version}</version>
 		</dependency>
@@ -95,13 +100,6 @@ under the License.
 					<name>hadoop1</name>
 				</property>
 			</activation>
-			<dependencies>
-				<dependency>
-					<groupId>org.apache.blur</groupId>
-					<artifactId>blur-mapred-hadoop1</artifactId>
-					<version>${project.version}</version>
-				</dependency>
-			</dependencies>
 			<properties>
                 <bin.assembly.file>src/assemble/bin-hadoop1.xml</bin.assembly.file>
 			</properties>
@@ -113,13 +111,6 @@ under the License.
 					<name>hadoop2-mr1</name>
 				</property>
 			</activation>
-			<dependencies>
-				<dependency>
-					<groupId>org.apache.blur</groupId>
-					<artifactId>blur-mapred-hadoop1</artifactId>
-					<version>${project.version}</version>
-				</dependency>
-			</dependencies>
 			<properties>
                 <bin.assembly.file>src/assemble/bin-hadoop2.xml</bin.assembly.file>
 			</properties>
@@ -131,13 +122,6 @@ under the License.
 					<name>hadoop2</name>
 				</property>
 			</activation>
-			<dependencies>
-				<dependency>
-					<groupId>org.apache.blur</groupId>
-					<artifactId>blur-mapred-hadoop2</artifactId>
-					<version>${project.version}</version>
-				</dependency>
-			</dependencies>
 			<properties>
                 <bin.assembly.file>src/assemble/bin-hadoop2.xml</bin.assembly.file>
 			</properties>

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fa41356/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0d43a28..ec4766a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -508,8 +508,7 @@ under the License.
 				<module>blur-thrift</module>
 				<module>blur-query</module>
 				<module>blur-store</module>
-				<module>blur-mapred-common</module>
-				<module>blur-mapred-hadoop1</module>
+				<module>blur-mapred</module>
 				<module>blur-util</module>
 				<module>blur-status</module>
 				<module>blur-shell</module>
@@ -535,8 +534,7 @@ under the License.
 				<module>blur-thrift</module>
 				<module>blur-query</module>
 				<module>blur-store</module>
-				<module>blur-mapred-common</module>
-				<module>blur-mapred-hadoop1</module>
+				<module>blur-mapred</module>
 				<module>blur-util</module>
 				<module>blur-status</module>
 				<module>blur-shell</module>
@@ -562,8 +560,7 @@ under the License.
 				<module>blur-thrift</module>
 				<module>blur-query</module>
 				<module>blur-store</module>
-				<module>blur-mapred-common</module>
-				<module>blur-mapred-hadoop2</module>
+				<module>blur-mapred</module>
 				<module>blur-util</module>
 				<module>blur-status</module>
 				<module>blur-shell</module>

