incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject [30/92] [abbrv] [partial] Fixed BLUR-126.
Date Tue, 11 Jun 2013 02:41:16 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecord.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecord.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecord.java
new file mode 100644
index 0000000..7c12a76
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecord.java
@@ -0,0 +1,178 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.blur.utils.ReaderBlurRecord;
+import org.apache.hadoop.io.Writable;
+
+public class BlurRecord implements Writable, ReaderBlurRecord {
+
+  private String _rowId;
+  private String _recordId;
+  private String _family;
+
+  private List<BlurColumn> _columns = new ArrayList<BlurColumn>();
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    _rowId = IOUtil.readString(in);
+    _recordId = IOUtil.readString(in);
+    _family = IOUtil.readString(in);
+    int size = IOUtil.readVInt(in);
+    _columns.clear();
+    for (int i = 0; i < size; i++) {
+      BlurColumn column = new BlurColumn();
+      column.readFields(in);
+      _columns.add(column);
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    IOUtil.writeString(out, _rowId);
+    IOUtil.writeString(out, _recordId);
+    IOUtil.writeString(out, _family);
+    IOUtil.writeVInt(out, _columns.size());
+    for (BlurColumn column : _columns) {
+      column.write(out);
+    }
+  }
+
+  public String getRowId() {
+    return _rowId;
+  }
+
+  public void setRowId(String rowId) {
+    this._rowId = rowId;
+  }
+
+  public String getRecordId() {
+    return _recordId;
+  }
+
+  public void setRecordId(String recordId) {
+    this._recordId = recordId;
+  }
+
+  public String getFamily() {
+    return _family;
+  }
+
+  public void setFamily(String family) {
+    this._family = family;
+  }
+
+  public List<BlurColumn> getColumns() {
+    return _columns;
+  }
+
+  public void setColumns(List<BlurColumn> columns) {
+    this._columns = columns;
+  }
+
+  public void clearColumns() {
+    _columns.clear();
+  }
+
+  public void addColumn(BlurColumn column) {
+    _columns.add(column);
+  }
+
+  public void addColumn(String name, String value) {
+    BlurColumn blurColumn = new BlurColumn();
+    blurColumn.setName(name);
+    blurColumn.setValue(value);
+    addColumn(blurColumn);
+  }
+
+  @Override
+  public void setRecordIdStr(String value) {
+    setRecordId(value);
+  }
+
+  @Override
+  public void setFamilyStr(String family) {
+    setFamily(family);
+  }
+
+  public void reset() {
+    clearColumns();
+    _rowId = null;
+    _recordId = null;
+    _family = null;
+  }
+
+  @Override
+  public void setRowIdStr(String rowId) {
+    setRowId(rowId);
+  }
+
+  @Override
+  public String toString() {
+    return "{rowId=" + _rowId + ", recordId=" + _recordId + ", family=" + _family + ", columns=" + _columns + "}";
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((_columns == null) ? 0 : _columns.hashCode());
+    result = prime * result + ((_family == null) ? 0 : _family.hashCode());
+    result = prime * result + ((_recordId == null) ? 0 : _recordId.hashCode());
+    result = prime * result + ((_rowId == null) ? 0 : _rowId.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    BlurRecord other = (BlurRecord) obj;
+    if (_columns == null) {
+      if (other._columns != null)
+        return false;
+    } else if (!_columns.equals(other._columns))
+      return false;
+    if (_family == null) {
+      if (other._family != null)
+        return false;
+    } else if (!_family.equals(other._family))
+      return false;
+    if (_recordId == null) {
+      if (other._recordId != null)
+        return false;
+    } else if (!_recordId.equals(other._recordId))
+      return false;
+    if (_rowId == null) {
+      if (other._rowId != null)
+        return false;
+    } else if (!_rowId.equals(other._rowId))
+      return false;
+    return true;
+  }
+
+}
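
A quick way to sanity-check the Writable implementation above is to round-trip a record through in-memory streams. A minimal sketch, assuming BlurRecord and IOUtil from this patch are on the classpath:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.blur.mapreduce.lib.BlurRecord;

    public class BlurRecordRoundTrip {
      public static void main(String[] args) throws IOException {
        BlurRecord record = new BlurRecord();
        record.setRowId("row-1");
        record.setRecordId("rec-1");
        record.setFamily("cf1");
        record.addColumn("col1", "val1");

        // Serialize through the Writable interface...
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        record.write(new DataOutputStream(bytes));

        // ...and read it back into a fresh instance.
        BlurRecord copy = new BlurRecord();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(record.equals(copy)); // true, per equals/hashCode above
      }
    }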

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordReader.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordReader.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordReader.java
new file mode 100644
index 0000000..5f4fec6
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordReader.java
@@ -0,0 +1,90 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordReader;
+
+
+public abstract class BlurRecordReader extends RecordReader<Text, BlurRecord> {
+
+//  private IndexReader reader;
+//  private Directory directory;
+//  private int startingDocId;
+//  private int endingDocId;
+//  private int position;
+//  private Text rowid = new Text();
+//  private BlurRecord record = new BlurRecord();
+//
+//  @Override
+//  public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+//    BlurInputSplit blurSplit = (BlurInputSplit) split;
+//    Path path = blurSplit.getIndexPath();
+//    String segmentName = blurSplit.getSegmentName();
+//    startingDocId = blurSplit.getStartingDocId();
+//    endingDocId = blurSplit.getEndingDocId();
+//    directory = new HdfsDirectory(context.getConfiguration(), path);
+//
+//    IndexCommit commit = Utils.findLatest(directory);
+//    reader = Utils.openSegmentReader(directory, commit, segmentName, Utils.getTermInfosIndexDivisor(context.getConfiguration()));
+//    int maxDoc = reader.maxDoc();
+//    if (endingDocId >= maxDoc) {
+//      endingDocId = maxDoc - 1;
+//    }
+//    position = startingDocId - 1;
+//  }
+//
+//  @Override
+//  public boolean nextKeyValue() throws IOException, InterruptedException {
+//    do {
+//      position++;
+//      if (position > endingDocId) {
+//        return false;
+//      }
+//    } while (reader.isDeleted(position));
+//    readDocument();
+//    return true;
+//  }
+//
+//  private void readDocument() throws CorruptIndexException, IOException {
+//    Document document = reader.document(position);
+//    record.reset();
+//    rowid.set(RowDocumentUtil.readRecord(document, record));
+//  }
+//
+//  @Override
+//  public Text getCurrentKey() throws IOException, InterruptedException {
+//    return rowid;
+//  }
+//
+//  @Override
+//  public BlurRecord getCurrentValue() throws IOException, InterruptedException {
+//    return record;
+//  }
+//
+//  @Override
+//  public float getProgress() throws IOException, InterruptedException {
+//    int total = endingDocId - startingDocId;
+//    return (float) position / (float) total;
+//  }
+//
+//  @Override
+//  public void close() throws IOException {
+//    reader.close();
+//    directory.close();
+//  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CopyRateDirectory.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CopyRateDirectory.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CopyRateDirectory.java
new file mode 100644
index 0000000..886f2d6
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CopyRateDirectory.java
@@ -0,0 +1,124 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+
+/**
+ * A {@link Directory} decorator that marks every write on a
+ * {@link RateCounter}, capturing the copy rate of a directory copy.
+ */
+public class CopyRateDirectory extends Directory {
+
+  private final Directory _directory;
+  private final RateCounter _copyRateCounter;
+
+  public CopyRateDirectory(Directory dir, RateCounter copyRateCounter) {
+    _directory = dir;
+    _copyRateCounter = copyRateCounter;
+  }
+
+  public IndexOutput createOutput(String name, IOContext context) throws IOException {
+    return wrap(_directory.createOutput(name, context));
+  }
+
+  private IndexOutput wrap(IndexOutput output) {
+    return new CopyRateIndexOutput(output, _copyRateCounter);
+  }
+
+  static class CopyRateIndexOutput extends IndexOutput {
+
+    private final IndexOutput _indexOutput;
+    private final RateCounter _copyRateCounter;
+
+    public CopyRateIndexOutput(IndexOutput output, RateCounter copyRateCounter) {
+      _indexOutput = output;
+      _copyRateCounter = copyRateCounter;
+    }
+
+    public void copyBytes(DataInput input, long numBytes) throws IOException {
+      _indexOutput.copyBytes(input, numBytes);
+      _copyRateCounter.mark(numBytes);
+    }
+
+    public void writeByte(byte b) throws IOException {
+      _indexOutput.writeByte(b);
+      _copyRateCounter.mark();
+    }
+
+    public void flush() throws IOException {
+      _indexOutput.flush();
+    }
+
+    public void close() throws IOException {
+      _indexOutput.close();
+    }
+
+    public long getFilePointer() {
+      return _indexOutput.getFilePointer();
+    }
+
+    @SuppressWarnings("deprecation")
+    public void seek(long pos) throws IOException {
+      _indexOutput.seek(pos);
+    }
+
+    public void writeBytes(byte[] b, int offset, int length) throws IOException {
+      _indexOutput.writeBytes(b, offset, length);
+      _copyRateCounter.mark(length);
+    }
+
+    public long length() throws IOException {
+      return _indexOutput.length();
+    }
+  }
+
+  public String[] listAll() throws IOException {
+    return _directory.listAll();
+  }
+
+  public boolean fileExists(String name) throws IOException {
+    return _directory.fileExists(name);
+  }
+
+  public void deleteFile(String name) throws IOException {
+    _directory.deleteFile(name);
+  }
+
+  public long fileLength(String name) throws IOException {
+    return _directory.fileLength(name);
+  }
+
+  public void sync(Collection<String> names) throws IOException {
+    _directory.sync(names);
+  }
+
+  public IndexInput openInput(String name, IOContext context) throws IOException {
+    return _directory.openInput(name, context);
+  }
+
+  public void close() throws IOException {
+    _directory.close();
+  }
+
+}
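
A minimal sketch of wiring this decorator up inside a task, assuming access to a Hadoop task context for the counter; the counter group/name, the destination directory, and the path are all hypothetical:

    // Inside a mapper/reducer with access to a Hadoop context:
    RateCounter copyRate = new RateCounter(context.getCounter("Blur", "Copy Rate")); // hypothetical names
    Directory dest = FSDirectory.open(new File("/tmp/index")); // any destination Directory works
    Directory tracked = new CopyRateDirectory(dest, copyRate);
    // Copy through `tracked`; every writeByte/writeBytes/copyBytes marks the counter.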

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
new file mode 100644
index 0000000..6ae329b
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
@@ -0,0 +1,65 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+public class CsvBlurDriver {
+
+  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException,
+      BlurException, TException {
+    Configuration configuration = new Configuration();
+    String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
+    if (otherArgs.length != 4) {
+      System.err
+          .println("Usage: csvindexer <thrift controller connection str> <tablename> <column family definitions> <in>");
+      System.exit(2);
+    }
+    int c = 0;
+    final String controllerConnectionStr = otherArgs[c++];
+    final String tableName = otherArgs[c++];
+    final String columnDefs = otherArgs[c++];
+    final String input = otherArgs[c++];
+
+    final Iface client = BlurClient.getClient(controllerConnectionStr);
+    TableDescriptor tableDescriptor = client.describe(tableName);
+
+    Job job = new Job(configuration, "Blur indexer [" + tableName + "] [" + input + "]");
+    job.setJarByClass(CsvBlurDriver.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(input));
+    CsvBlurMapper.setColumns(job, columnDefs);
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+
+    boolean waitForCompletion = job.waitForCompletion(true);
+    System.exit(waitForCompletion ? 0 : 1);
+  }
+}
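
For reference, a hypothetical invocation matching the usage string above (the jar name, controller host, table, column definitions, and input path are all made up):

    hadoop jar blur-mapred-*.jar org.apache.blur.mapreduce.lib.CsvBlurDriver \
        controller1:40010 table1 "cf1:col1,col2,col3" /user/me/csv-input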

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriverFamilyPerInput.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriverFamilyPerInput.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriverFamilyPerInput.java
new file mode 100644
index 0000000..fbe4253
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriverFamilyPerInput.java
@@ -0,0 +1,73 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+public class CsvBlurDriverFamilyPerInput {
+
+  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException,
+      BlurException, TException {
+    Configuration configuration = new Configuration();
+    String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
+    if (otherArgs.length < 4) {
+      System.err
+          .println("Usage: csvindexer <thrift controller connection str> <tablename> <column family definitions> <family=input path> ...");
+      System.exit(2);
+    }
+    int c = 0;
+    final String controllerConnectionStr = otherArgs[c++];
+    final String tableName = otherArgs[c++];
+    final String columnDefs = otherArgs[c++];
+
+    final Iface client = BlurClient.getClient(controllerConnectionStr);
+    TableDescriptor tableDescriptor = client.describe(tableName);
+
+    Job job = new Job(configuration, "Blur indexer [" + tableName + "] Multiple Inputs");
+    job.setJarByClass(CsvBlurDriverFamilyPerInput.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TextInputFormat.class);
+
+    CsvBlurMapper.setColumns(job, columnDefs);
+    CsvBlurMapper.setFamilyNotInFile(job, true);
+
+    for (int i = c; i < otherArgs.length; i++) {
+      final String input = otherArgs[i];
+      int indexOf = input.indexOf('=');
+      String family = input.substring(0, indexOf);
+      String pathStr = input.substring(indexOf + 1);
+      FileInputFormat.addInputPath(job, new Path(pathStr));
+      CsvBlurMapper.addFamilyPath(job, family, new Path(pathStr));
+    }
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+
+    boolean waitForCompletion = job.waitForCompletion(true);
+    System.exit(waitForCompletion ? 0 : 1);
+  }
+}
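
Each trailing argument takes the form family=path; a hypothetical invocation (host and paths are made up):

    hadoop jar blur-mapred-*.jar org.apache.blur.mapreduce.lib.CsvBlurDriverFamilyPerInput \
        controller1:40010 table1 "cf1:col1,col2|cf2:col1" cf1=/user/me/cf1 cf2=/user/me/cf2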

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
new file mode 100644
index 0000000..e5e73fb
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
@@ -0,0 +1,362 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import com.google.common.base.Splitter;
+
+/**
+ * Parses a standard CSV file into {@link BlurMutate} objects. Use the static
+ * addColumns and setSeparator methods to configure the class.
+ */
+public class CsvBlurMapper extends BaseBlurMapper<LongWritable, Text> {
+
+  public static final String BLUR_CSV_FAMILYISNOTINFILE = "blur.csv.familyisnotinfile";
+  public static final String BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES = "blur.csv.family.path.mappings.families";
+  public static final String BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILY_PREFIX = "blur.csv.family.path.mappings.family.";
+  public static final String BLUR_CSV_SEPARATOR = "blur.csv.separator";
+  public static final String BLUR_CSV_FAMILY_COLUMN_PREFIX = "blur.csv.family.";
+  public static final String BLUR_CSV_FAMILIES = "blur.csv.families";
+
+  private Map<String, List<String>> columnNameMap;
+  private String separator = ",";
+  private Splitter splitter;
+  private boolean familyNotInFile;
+  private String familyFromPath;
+
+  /**
+   * Add a mapping for a family to a path. This is to be used when an entire
+   * path is to be processed as a single family and the data itself does not
+   * contain the family.<br/>
+   * <br/>
+   * 
+   * NOTE: the familyNotInFile property must be set before this method can be
+   * called.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param family
+   *          the family.
+   * @param path
+   *          the path.
+   */
+  public static void addFamilyPath(Job job, String family, Path path) {
+    addFamilyPath(job.getConfiguration(), family, path);
+  }
+
+  /**
+   * Sets the property familyIsNotInFile so that the parser knows that the
+   * family is not to be parsed. It is to be used in conjunction with the
+   * addFamilyPath method.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param familyIsNotInFile
+   *          boolean.
+   */
+  public static void setFamilyNotInFile(Job job, boolean familyIsNotInFile) {
+    setFamilyNotInFile(job.getConfiguration(), familyIsNotInFile);
+  }
+
+  /**
+   * Add a mapping for a family to a path. This is to be used when an entire
+   * path is to be processed as a single family and the data itself does not
+   * contain the family.<br/>
+   * <br/>
+   * 
+   * NOTE: the familyNotInFile property must be set before this method can be
+   * called.
+   * 
+   * @param configuration
+   *          the configuration to setup.
+   * @param family
+   *          the family.
+   * @param path
+   *          the path.
+   */
+  public static void addFamilyPath(Configuration configuration, String family, Path path) {
+    Collection<String> mappings = configuration.getStringCollection(BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES);
+    if (mappings == null) {
+      mappings = new TreeSet<String>();
+    }
+    mappings.add(family);
+    configuration.setStrings(BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES, mappings.toArray(new String[mappings.size()]));
+    configuration.set(BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILY_PREFIX + family, path.toString());
+  }
+
+  /**
+   * Sets the property familyIsNotInFile so that the parser knows that the
+   * family is not to be parsed. It is to be used in conjunction with the
+   * addFamilyPath method.
+   * 
+   * @param configuration
+   *          the configuration to setup.
+   * @param familyIsNotInFile
+   *          boolean.
+   */
+  public static void setFamilyNotInFile(Configuration configuration, boolean familyIsNotInFile) {
+    configuration.setBoolean(BLUR_CSV_FAMILYISNOTINFILE, familyIsNotInFile);
+  }
+
+  public static boolean isFamilyNotInFile(Configuration configuration) {
+    return configuration.getBoolean(BLUR_CSV_FAMILYISNOTINFILE, false);
+  }
+
+  /**
+   * Sets all the family and column definitions.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param strDefinition
+   *          the string definition. <br/>
+   * <br/>
+   *          Example:<br/>
+   *          "cf1:col1,col2,col3|cf2:col1,col2,col3"<br/>
+   *          Where "cf1" is a family name that contains columns "col1", "col2"
+   *          and "col3" and a second family of "cf2" with columns "col1",
+   *          "col2", and "col3".
+   */
+  public static void setColumns(Job job, String strDefinition) {
+    setColumns(job.getConfiguration(), strDefinition);
+  }
+
+  /**
+   * Sets all the family and column definitions.
+   * 
+   * @param configuration
+   *          the configuration to setup.
+   * @param strDefinition
+   *          the string definition. <br/>
+   * <br/>
+   *          Example:<br/>
+   *          "cf1:col1,col2,col3|cf2:col1,col2,col3"<br/>
+   *          Where "cf1" is a family name that contains columns "col1", "col2"
+   *          and "col3" and a second family of "cf2" with columns "col1",
+   *          "col2", and "col3".
+   */
+  public static void setColumns(Configuration configuration, String strDefinition) {
+    Iterable<String> familyDefs = Splitter.on('|').split(strDefinition);
+    for (String familyDef : familyDefs) {
+      int indexOf = familyDef.indexOf(':');
+      if (indexOf < 0) {
+        throwMalformedDefinition(strDefinition);
+      }
+      String family = familyDef.substring(0, indexOf);
+      Iterable<String> cols = Splitter.on(',').split(familyDef.substring(indexOf + 1));
+      List<String> colnames = new ArrayList<String>();
+      for (String columnName : cols) {
+        colnames.add(columnName);
+      }
+      if (family.trim().isEmpty() || colnames.isEmpty()) {
+        throwMalformedDefinition(strDefinition);
+      }
+      addColumns(configuration, family, colnames.toArray(new String[colnames.size()]));
+    }
+  }
+
+  private static void throwMalformedDefinition(String strDefinition) {
+    throw new RuntimeException("Family and column definition string not valid [" + strDefinition
+        + "] should look like \"family1:colname1,colname2|family2:colname1,colname2,colname3\"");
+  }
+
+  /**
+   * Adds the column layout for the given family.
+   * 
+   * @param job
+   *          the job to apply the layout.
+   * @param family
+   *          the family name.
+   * @param columns
+   *          the column names.
+   */
+  public static void addColumns(Job job, String family, String... columns) {
+    addColumns(job.getConfiguration(), family, columns);
+  }
+
+  /**
+   * Adds the column layout for the given family.
+   * 
+   * @param configuration
+   *          the configuration to apply the layout.
+   * @param family
+   *          the family name.
+   * @param columns
+   *          the column names.
+   */
+  public static void addColumns(Configuration configuration, String family, String... columns) {
+    Collection<String> families = new TreeSet<String>(configuration.getStringCollection(BLUR_CSV_FAMILIES));
+    families.add(family);
+    configuration.setStrings(BLUR_CSV_FAMILIES, families.toArray(new String[] {}));
+    configuration.setStrings(BLUR_CSV_FAMILY_COLUMN_PREFIX + family, columns);
+  }
+
+  /**
+   * Sets the separator of the file, by default it is ",".
+   * 
+   * @param job
+   *          the job to apply the separator change.
+   * @param separator
+   *          the separator.
+   */
+  public static void setSeparator(Job job, String separator) {
+    setSeparator(job.getConfiguration(), separator);
+  }
+
+  /**
+   * Sets the separator of the file, by default it is ",".
+   * 
+   * @param configuration
+   *          the configuration to apply the separator change.
+   * @param separator
+   *          the separator.
+   */
+  public static void setSeparator(Configuration configuration, String separator) {
+    configuration.set(BLUR_CSV_SEPARATOR, separator);
+  }
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    Configuration configuration = context.getConfiguration();
+    Collection<String> familyNames = configuration.getStringCollection(BLUR_CSV_FAMILIES);
+    columnNameMap = new HashMap<String, List<String>>();
+    for (String family : familyNames) {
+      String[] columnsNames = configuration.getStrings(BLUR_CSV_FAMILY_COLUMN_PREFIX + family);
+      columnNameMap.put(family, Arrays.asList(columnsNames));
+    }
+    // Read the configured separator before building the splitter; otherwise
+    // the splitter would always be built on the default separator.
+    separator = configuration.get(BLUR_CSV_SEPARATOR, separator);
+    splitter = Splitter.on(separator);
+    familyNotInFile = isFamilyNotInFile(configuration);
+    if (familyNotInFile) {
+      Path fileCurrentlyProcessing = getCurrentFile(context);
+      Collection<String> families = configuration.getStringCollection(BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES);
+      for (String family : families) {
+        String pathStr = configuration.get(BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILY_PREFIX + family);
+        Path path = new Path(pathStr);
+        path = path.makeQualified(path.getFileSystem(configuration));
+        if (isParent(path, fileCurrentlyProcessing)) {
+          familyFromPath = family;
+          break;
+        }
+      }
+    }
+  }
+
+  private boolean isParent(Path possibleParent, Path child) {
+    if (child == null) {
+      return false;
+    }
+    if (possibleParent.equals(child.getParent())) {
+      return true;
+    }
+    return isParent(possibleParent, child.getParent());
+  }
+
+  private Path getCurrentFile(Context context) throws IOException {
+    InputSplit split = context.getInputSplit();
+    if (split != null) {
+      FileSplit inputSplit = (FileSplit) split;
+      Path path = inputSplit.getPath();
+      return path.makeQualified(path.getFileSystem(context.getConfiguration()));
+    }
+    return null;
+  }
+
+  @Override
+  protected void map(LongWritable k, Text value, Context context) throws IOException, InterruptedException {
+    BlurRecord record = _mutate.getRecord();
+    record.clearColumns();
+    String str = value.toString();
+
+    Iterable<String> split = splitter.split(str);
+    List<String> list = toList(split);
+
+    if (list.size() < 3) {
+      throw new IOException("Record [" + str + "] too short.");
+    }
+    int column = 0;
+    record.setRowId(list.get(column++));
+    record.setRecordId(list.get(column++));
+    String family;
+    int offset;
+    if (familyNotInFile) {
+      family = familyFromPath;
+      offset = 2;
+    } else {
+      family = list.get(column++);
+      offset = 3;
+    }
+    record.setFamily(family);
+
+    List<String> columnNames = columnNameMap.get(family);
+    if (columnNames == null) {
+      throw new IOException("Family [" + family + "] is missing in the definition.");
+    }
+    if (list.size() - offset != columnNames.size()) {
+      throw new IOException("Record [" + str + "] too short, does not match defined record [rowid,recordid,family"
+          + getColumnNames(columnNames) + "].");
+    }
+
+    for (int i = 0; i < columnNames.size(); i++) {
+      record.addColumn(columnNames.get(i), list.get(i + offset));
+      _fieldCounter.increment(1);
+    }
+    _key.set(record.getRowId());
+    _mutate.setMutateType(MUTATE_TYPE.REPLACE);
+    context.write(_key, _mutate);
+    _recordCounter.increment(1);
+    context.progress();
+  }
+
+  public void setFamilyFromPath(String familyFromPath) {
+    this.familyFromPath = familyFromPath;
+  }
+
+  private String getColumnNames(List<String> columnNames) {
+    StringBuilder builder = new StringBuilder();
+    for (String c : columnNames) {
+      builder.append(',').append(c);
+    }
+    return builder.toString();
+  }
+
+  private List<String> toList(Iterable<String> split) {
+    List<String> lst = new ArrayList<String>();
+    for (String s : split) {
+      lst.add(s);
+    }
+    return lst;
+  }
+
+}
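
Pulling the static setup methods above together, a sketch of configuring a job for family-per-path input; the separator and paths are hypothetical, and setFamilyNotInFile is set before addFamilyPath per the javadoc:

    Job job = new Job(new Configuration(), "csv example"); // throws IOException
    CsvBlurMapper.setColumns(job, "cf1:col1,col2|cf2:col1"); // family/column layout
    CsvBlurMapper.setSeparator(job, "\t");                   // e.g. tab-separated input
    CsvBlurMapper.setFamilyNotInFile(job, true);             // rows carry only rowid, recordid, columns
    CsvBlurMapper.addFamilyPath(job, "cf1", new Path("/data/cf1"));
    CsvBlurMapper.addFamilyPath(job, "cf2", new Path("/data/cf2"));
    job.setMapperClass(CsvBlurMapper.class);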

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/DefaultBlurReducer.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/DefaultBlurReducer.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/DefaultBlurReducer.java
new file mode 100644
index 0000000..016fb51
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/DefaultBlurReducer.java
@@ -0,0 +1,79 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * This class is to be used in conjunction with {@link BlurOutputFormat}.
+ * <p>
+ * Below is a basic example of how to use the {@link BlurOutputFormat} and the
+ * {@link DefaultBlurReducer} together to build indexes. Once the job has
+ * successfully completed, the indexes will be imported by the running shard
+ * servers and placed online. This is a polling mechanism in the shard servers;
+ * by default they poll every 10 seconds.
+ * 
+ * <pre>
+ * Job job = new Job(conf, "blur index");
+ * job.setJarByClass(BlurOutputFormatTest.class);
+ * job.setMapperClass(CsvBlurMapper.class);
+ * job.setReducerClass(DefaultBlurReducer.class);
+ * job.setNumReduceTasks(1);
+ * job.setInputFormatClass(TrackingTextInputFormat.class);
+ * job.setOutputKeyClass(Text.class);
+ * job.setOutputValueClass(BlurMutate.class);
+ * job.setOutputFormatClass(BlurOutputFormat.class);
+ * 
+ * FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+ * CsvBlurMapper.addColumns(job, "cf1", "col");
+ * 
+ * TableDescriptor tableDescriptor = new TableDescriptor();
+ * tableDescriptor.setShardCount(1);
+ * tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
+ * tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/out").toString());
+ * BlurOutputFormat.setTableDescriptor(job, tableDescriptor);
+ * </pre>
+ */
+public class DefaultBlurReducer extends Reducer<Text, BlurMutate, Text, BlurMutate> {
+
+  @Override
+  protected void setup(final Context context) throws IOException, InterruptedException {
+    BlurOutputFormat.setProgressable(context);
+    BlurOutputFormat.setGetCounter(new GetCounter() {
+      @Override
+      public Counter getCounter(Enum<?> counterName) {
+        return context.getCounter(counterName);
+      }
+    });
+  }
+
+  @Override
+  protected void reduce(Text key, Iterable<BlurMutate> values, Context context) throws IOException,
+      InterruptedException {
+    for (BlurMutate value : values) {
+      context.write(key, value);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GetCounter.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GetCounter.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GetCounter.java
new file mode 100644
index 0000000..a078e76
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GetCounter.java
@@ -0,0 +1,25 @@
+package org.apache.blur.mapreduce.lib;
+
+import org.apache.hadoop.mapreduce.Counter;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+public interface GetCounter {
+  
+  Counter getCounter(Enum<?> counterName);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/IOUtil.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/IOUtil.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/IOUtil.java
new file mode 100644
index 0000000..46c030f
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/IOUtil.java
@@ -0,0 +1,58 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class IOUtil {
+
+  public static final String UTF_8 = "UTF-8";
+
+  public static String readString(DataInput input) throws IOException {
+    int length = readVInt(input);
+    byte[] buffer = new byte[length];
+    input.readFully(buffer);
+    return new String(buffer, UTF_8);
+  }
+
+  public static void writeString(DataOutput output, String s) throws IOException {
+    byte[] bs = s.getBytes(UTF_8);
+    writeVInt(output, bs.length);
+    output.write(bs);
+  }
+
+  public static int readVInt(DataInput input) throws IOException {
+    byte b = input.readByte();
+    int i = b & 0x7F;
+    for (int shift = 7; (b & 0x80) != 0; shift += 7) {
+      b = input.readByte();
+      i |= (b & 0x7F) << shift;
+    }
+    return i;
+  }
+
+  public static void writeVInt(DataOutput output, int i) throws IOException {
+    while ((i & ~0x7F) != 0) {
+      output.writeByte((byte) ((i & 0x7f) | 0x80));
+      i >>>= 7;
+    }
+    output.writeByte((byte) i);
+  }
+
+}
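
The vint scheme above packs seven data bits per byte and uses the high bit as a continuation flag (the same encoding Lucene and Hadoop use). A self-contained round-trip sketch, assuming IOUtil from this patch is on the classpath:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.blur.mapreduce.lib.IOUtil;

    public class IOUtilExample {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        IOUtil.writeVInt(out, 300);       // 300 encodes as two bytes: 0xAC 0x02
        IOUtil.writeString(out, "hello"); // vint length prefix, then UTF-8 bytes

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        System.out.println(IOUtil.readVInt(in));   // 300
        System.out.println(IOUtil.readString(in)); // hello
      }
    }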

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/ProgressableDirectory.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/ProgressableDirectory.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/ProgressableDirectory.java
new file mode 100644
index 0000000..6a96166
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/ProgressableDirectory.java
@@ -0,0 +1,280 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.hadoop.util.Progressable;
+import org.apache.lucene.store.BufferedIndexInput;
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.LockFactory;
+
+/**
+ * The {@link ProgressableDirectory} allows progress to be reported while
+ * Lucene is blocked in a merge. This prevents the task from being killed for
+ * failing to report progress during the blocked merge.
+ */
+public class ProgressableDirectory extends Directory {
+
+  private Directory _directory;
+  private Progressable _progressable;
+
+  public ProgressableDirectory(Directory directory, Progressable progressable) {
+    _directory = directory;
+    _progressable = progressable == null ? new Progressable() {
+      @Override
+      public void progress() {
+        // no-op when no progressable is supplied
+      }
+    } : progressable;
+  }
+
+  @Override
+  public void clearLock(String name) throws IOException {
+    _directory.clearLock(name);
+  }
+
+  @Override
+  public void close() throws IOException {
+    _directory.close();
+  }
+
+  private IndexInput wrapInput(String name, IndexInput openInput) {
+    return new ProgressableIndexInput(name, openInput, 16384, _progressable);
+  }
+
+  private IndexOutput wrapOutput(IndexOutput createOutput) {
+    return new ProgressableIndexOutput(createOutput, _progressable);
+  }
+
+  public IndexOutput createOutput(String name, IOContext context) throws IOException {
+    return wrapOutput(_directory.createOutput(name, context));
+  }
+
+  @Override
+  public void deleteFile(String name) throws IOException {
+    _directory.deleteFile(name);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return _directory.equals(obj);
+  }
+
+  @Override
+  public boolean fileExists(String name) throws IOException {
+    return _directory.fileExists(name);
+  }
+
+  @Override
+  public long fileLength(String name) throws IOException {
+    return _directory.fileLength(name);
+  }
+
+  @Override
+  public LockFactory getLockFactory() {
+    return _directory.getLockFactory();
+  }
+
+  @Override
+  public String getLockID() {
+    return _directory.getLockID();
+  }
+
+  @Override
+  public int hashCode() {
+    return _directory.hashCode();
+  }
+
+  @Override
+  public String[] listAll() throws IOException {
+    return _directory.listAll();
+  }
+
+  @Override
+  public Lock makeLock(String name) {
+    return _directory.makeLock(name);
+  }
+
+  @Override
+  public IndexInput openInput(String name, IOContext context) throws IOException {
+    return wrapInput(name, _directory.openInput(name, context));
+  }
+
+  @Override
+  public void setLockFactory(LockFactory lockFactory) throws IOException {
+    _directory.setLockFactory(lockFactory);
+  }
+
+  @Override
+  public void sync(Collection<String> names) throws IOException {
+    _directory.sync(names);
+  }
+
+  @Override
+  public String toString() {
+    return _directory.toString();
+  }
+
+  @SuppressWarnings("deprecation")
+  static class ProgressableIndexOutput extends IndexOutput {
+
+    private Progressable _progressable;
+    private IndexOutput _indexOutput;
+
+    public ProgressableIndexOutput(IndexOutput indexOutput, Progressable progressable) {
+      _indexOutput = indexOutput;
+      _progressable = progressable;
+    }
+
+    @Override
+    public void close() throws IOException {
+      _indexOutput.close();
+      _progressable.progress();
+    }
+
+    @Override
+    public void copyBytes(DataInput input, long numBytes) throws IOException {
+      _indexOutput.copyBytes(input, numBytes);
+      _progressable.progress();
+    }
+
+    @Override
+    public void flush() throws IOException {
+      _indexOutput.flush();
+      _progressable.progress();
+    }
+
+    @Override
+    public long getFilePointer() {
+      return _indexOutput.getFilePointer();
+    }
+
+    @Override
+    public long length() throws IOException {
+      return _indexOutput.length();
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+      _indexOutput.seek(pos);
+      _progressable.progress();
+    }
+
+    @Override
+    public void setLength(long length) throws IOException {
+      _indexOutput.setLength(length);
+      _progressable.progress();
+    }
+
+    @Override
+    public String toString() {
+      return _indexOutput.toString();
+    }
+
+    @Override
+    public void writeByte(byte b) throws IOException {
+      _indexOutput.writeByte(b);
+    }
+
+    @Override
+    public void writeBytes(byte[] b, int offset, int length) throws IOException {
+      _indexOutput.writeBytes(b, offset, length);
+      _progressable.progress();
+    }
+
+    @Override
+    public void writeBytes(byte[] b, int length) throws IOException {
+      _indexOutput.writeBytes(b, length);
+      _progressable.progress();
+    }
+
+    @Override
+    public void writeInt(int i) throws IOException {
+      _indexOutput.writeInt(i);
+    }
+
+    @Override
+    public void writeLong(long i) throws IOException {
+      _indexOutput.writeLong(i);
+    }
+
+    @Override
+    public void writeString(String s) throws IOException {
+      _indexOutput.writeString(s);
+    }
+
+    @Override
+    public void writeStringStringMap(Map<String, String> map) throws IOException {
+      _indexOutput.writeStringStringMap(map);
+    }
+
+  }
+
+  static class ProgressableIndexInput extends BufferedIndexInput {
+
+    private IndexInput _indexInput;
+    private final long _length;
+    private Progressable _progressable;
+
+    ProgressableIndexInput(String name, IndexInput indexInput, int buffer, Progressable progressable) {
+      super(name, buffer);
+      _indexInput = indexInput;
+      _length = indexInput.length();
+      _progressable = progressable;
+    }
+
+    @Override
+    protected void readInternal(byte[] b, int offset, int length) throws IOException {
+      long filePointer = getFilePointer();
+      if (filePointer != _indexInput.getFilePointer()) {
+        _indexInput.seek(filePointer);
+      }
+      _indexInput.readBytes(b, offset, length);
+      _progressable.progress();
+    }
+
+    @Override
+    protected void seekInternal(long pos) throws IOException {
+
+    }
+
+    @Override
+    public void close() throws IOException {
+      _indexInput.close();
+    }
+
+    @Override
+    public long length() {
+      return _length;
+    }
+
+    @Override
+    public ProgressableIndexInput clone() {
+      ProgressableIndexInput clone = (ProgressableIndexInput) super.clone();
+      clone._indexInput = (IndexInput) _indexInput.clone();
+      return clone;
+    }
+  }
+}
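
A sketch of wrapping an HDFS-backed index directory inside a task. The HdfsDirectory constructor is taken from the commented-out reader earlier in this commit, `context` is assumed to be a Hadoop task context (which is a Progressable), and the index path is hypothetical:

    Path indexPath = new Path("/blur/tables/table1/shard-0"); // hypothetical location
    Directory hdfs = new HdfsDirectory(context.getConfiguration(), indexPath);
    Directory dir = new ProgressableDirectory(hdfs, context);
    // Hand `dir` to an IndexWriter; a long blocking merge now keeps reporting progress.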

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/RateCounter.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/RateCounter.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/RateCounter.java
new file mode 100644
index 0000000..ff8ee20
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/RateCounter.java
@@ -0,0 +1,64 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.mapreduce.Counter;
+
+/**
+ * Turns a standard Hadoop {@link Counter} into a rate counter that
+ * periodically publishes the average per-second rate over the report window.
+ */
+public class RateCounter {
+
+  private final Counter _counter;
+  private final long _reportTime;
+  private final long _rateTime;
+  private long _lastReport;
+  private long _count = 0;
+
+  public RateCounter(Counter counter) {
+    this(counter, TimeUnit.SECONDS, 10);
+  }
+
+  public RateCounter(Counter counter, TimeUnit unit, long reportTime) {
+    _counter = counter;
+    _lastReport = System.nanoTime();
+    _reportTime = unit.toNanos(reportTime);
+    // Window length in whole seconds; guard against a zero divisor when the
+    // report window is shorter than one second.
+    _rateTime = Math.max(1, unit.toSeconds(reportTime));
+  }
+
+  public void mark() {
+    mark(1l);
+  }
+
+  public void mark(long n) {
+    long now = System.nanoTime();
+    if (_lastReport + _reportTime < now) {
+      // Publish the average per-second rate for the window that just ended,
+      // then start a new window.
+      long rate = _count / _rateTime;
+      _counter.setValue(rate);
+      _lastReport = System.nanoTime();
+      _count = 0;
+    }
+    _count += n;
+  }
+
+  public void close() {
+    _counter.setValue(0);
+  }
+
+}
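
A sketch of driving this from a task; the counter group and name are made up:

    // In e.g. Mapper.setup(Context context):
    RateCounter bytesPerSecond = new RateCounter(context.getCounter("Blur", "Bytes/s"));
    // In the hot loop, after handling `len` bytes:
    bytesPerSecond.mark(len);
    // Every 10 seconds (the default window) the counter is set to count / window-seconds.
    // When the task finishes:
    bytesPerSecond.close(); // resets the counter value to 0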

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-mapred/src/test/java/org/apache/blur/mapreduce/BlurTaskTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/BlurTaskTest.java b/blur-mapred/src/test/java/org/apache/blur/mapreduce/BlurTaskTest.java
new file mode 100644
index 0000000..e04476e
--- /dev/null
+++ b/blur-mapred/src/test/java/org/apache/blur/mapreduce/BlurTaskTest.java
@@ -0,0 +1,63 @@
+package org.apache.blur.mapreduce;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+@SuppressWarnings("deprecation")
+public class BlurTaskTest {
+  private static final File TMPDIR = new File(System.getProperty("blur.tmp.dir", "/tmp"));
+
+  @Test
+  public void testGetNumReducersBadPath() {
+
+    BlurTask task = new BlurTask();
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(5);
+    tableDescriptor.setTableUri(new File(TMPDIR, "blur34746545").toURI().toString());
+    tableDescriptor.setName("blur34746545");
+    task.setTableDescriptor(tableDescriptor);
+    assertEquals(5, task.getNumReducers(new Configuration()));
+  }
+
+  @Test
+  public void testGetNumReducersValidPath() {
+    new File(TMPDIR, "blurTestShards/shard-1/").mkdirs();
+    new File(TMPDIR, "blurTestShards/shard-2/").mkdirs();
+    new File(TMPDIR, "blurTestShards/shard-3/").mkdirs();
+    try {
+      BlurTask task = new BlurTask();
+      TableDescriptor tableDescriptor = new TableDescriptor();
+      tableDescriptor.setShardCount(5);
+      tableDescriptor.setTableUri(new File(TMPDIR, "blurTestShards").toURI().toString());
+      tableDescriptor.setName("blurTestShards");
+      task.setTableDescriptor(tableDescriptor);
+      assertEquals(3, task.getNumReducers(new Configuration()));
+    } finally {
+      new File(TMPDIR, "blurTestShards/shard-1/").delete();
+      new File(TMPDIR, "blurTestShards/shard-2/").delete();
+      new File(TMPDIR, "blurTestShards/shard-3/").delete();
+      new File(TMPDIR, "blurTestShards/").delete();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
new file mode 100644
index 0000000..915ee89
--- /dev/null
+++ b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
@@ -0,0 +1,337 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.util.Collection;
+import java.util.TreeSet;
+
+import org.apache.blur.store.buffer.BufferStore;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.thrift.generated.AnalyzerDefinition;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TestMapReduceLocal.TrackingTextInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.lucene.index.DirectoryReader;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class BlurOutputFormatTest {
+
+  private static Configuration conf = new Configuration();
+  private static FileSystem localFs;
+  private static MiniMRCluster mr;
+  private static Path TEST_ROOT_DIR;
+  private static JobConf jobConf;
+  private Path outDir = new Path(TEST_ROOT_DIR + "/out");
+  private Path inDir = new Path(TEST_ROOT_DIR + "/in");
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    System.setProperty("test.build.data", "./target/BlurOutputFormatTest/data");
+    TEST_ROOT_DIR = new Path(System.getProperty("test.build.data", "/tmp"));
+    System.setProperty("hadoop.log.dir", "./target/BlurOutputFormatTest/hadoop_log");
+    try {
+      localFs = FileSystem.getLocal(conf);
+    } catch (IOException io) {
+      throw new RuntimeException("problem getting local fs", io);
+    }
+    mr = new MiniMRCluster(2, "file:///", 3);
+    jobConf = mr.createJobConf();
+    BufferStore.init(128, 128);
+  }
+
+  @AfterClass
+  public static void teardown() {
+    if (mr != null) {
+      mr.shutdown();
+    }
+  }
+
+  @Test
+  public void testBlurOutputFormat() throws IOException, InterruptedException, ClassNotFoundException {
+    localFs.delete(inDir, true);
+    localFs.delete(outDir, true);
+    writeRecordsFile("in/part1", 1, 1, 1, 1, "cf1");
+    writeRecordsFile("in/part2", 1, 1, 2, 1, "cf1");
+
+    Job job = new Job(jobConf, "blur index");
+    job.setJarByClass(BlurOutputFormatTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(1);
+    tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
+    tableDescriptor.setTableUri(tableUri);
+    
+    createShardDirectories(outDir, 1);
+    
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+
+    assertTrue(job.waitForCompletion(true));
+    Counters ctrs = job.getCounters();
+    System.out.println("Counters: " + ctrs);
+
+    Path path = new Path(tableUri, BlurUtil.getShardName(0));
+    Collection<Path> committedTasks = getCommittedTasks(path);
+    assertEquals(1, committedTasks.size());
+    DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, committedTasks.iterator().next()));
+    assertEquals(2, reader.numDocs());
+    reader.close();
+  }
+
+  private Collection<Path> getCommittedTasks(Path path) throws IOException {
+    Collection<Path> result = new TreeSet<Path>();
+    FileSystem fileSystem = path.getFileSystem(jobConf);
+    FileStatus[] listStatus = fileSystem.listStatus(path);
+    for (FileStatus fileStatus : listStatus) {
+      Path p = fileStatus.getPath();
+      if (fileStatus.isDir() && p.getName().endsWith(".commit")) {
+        result.add(p);
+      }
+    }
+    return result;
+  }
+
+  @Test
+  public void testBlurOutputFormatOverFlowTest() throws IOException, InterruptedException, ClassNotFoundException {
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+
+    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
+    writeRecordsFile("in/part2", 1, 50, 2000, 100, "cf1"); // 100 * 50 = 5,000
+
+    Job job = new Job(jobConf, "blur index");
+    job.setJarByClass(BlurOutputFormatTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(1);
+    tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
+    tableDescriptor.setTableUri(tableUri);
+
+    createShardDirectories(outDir, 1);
+    
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    BlurOutputFormat.setIndexLocally(job, true);
+    BlurOutputFormat.setOptimizeInFlight(job, false);
+
+    assertTrue(job.waitForCompletion(true));
+    Counters ctrs = job.getCounters();
+    System.out.println("Counters: " + ctrs);
+
+    Path path = new Path(tableUri, BlurUtil.getShardName(0));
+    Collection<Path> committedTasks = getCommittedTasks(path);
+    assertEquals(1, committedTasks.size());
+
+    DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, committedTasks.iterator().next()));
+    assertEquals(80000, reader.numDocs());
+    reader.close();
+  }
+
+  @Test
+  public void testBlurOutputFormatOverFlowMultipleReducersTest() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+
+    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
+    writeRecordsFile("in/part2", 1, 50, 2000, 100, "cf1"); // 100 * 50 = 5,000
+
+    Job job = new Job(jobConf, "blur index");
+    job.setJarByClass(BlurOutputFormatTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+      
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(2);
+    tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
+    tableDescriptor.setTableUri(tableUri);
+    
+    createShardDirectories(outDir, 2);
+    
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    BlurOutputFormat.setIndexLocally(job, false);
+
+    assertTrue(job.waitForCompletion(true));
+    Counters ctrs = job.getCounters();
+    System.out.println("Counters: " + ctrs);
+
+    long total = 0;
+    for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
+      Path path = new Path(tableUri, BlurUtil.getShardName(i));
+      Collection<Path> committedTasks = getCommittedTasks(path);
+      assertEquals(1, committedTasks.size());
+
+      DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, committedTasks.iterator().next()));
+      total += reader.numDocs();
+      reader.close();
+    }
+    assertEquals(80000, total);
+
+  }
+
+  @Test
+  public void testBlurOutputFormatOverFlowMultipleReducersWithReduceMultiplierTest() throws IOException,
+      InterruptedException, ClassNotFoundException {
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+
+    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
+    writeRecordsFile("in/part2", 1, 50, 2000, 100, "cf1"); // 100 * 50 = 5,000
+
+    Job job = new Job(jobConf, "blur index");
+    job.setJarByClass(BlurOutputFormatTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(7);
+    tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
+    tableDescriptor.setTableUri(tableUri);
+
+    createShardDirectories(outDir, 7);
+    
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    int multiple = 2;
+    BlurOutputFormat.setReducerMultiplier(job, multiple);
+
+    assertTrue(job.waitForCompletion(true));
+    Counters ctrs = job.getCounters();
+    System.out.println("Counters: " + ctrs);
+
+    long total = 0;
+    for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
+      Path path = new Path(tableUri, BlurUtil.getShardName(i));
+      Collection<Path> committedTasks = getCommittedTasks(path);
+      assertTrue(multiple >= committedTasks.size());
+      for (Path p : committedTasks) {
+        DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, p));
+        total += reader.numDocs();
+        reader.close();
+      }
+    }
+    assertEquals(80000, total);
+
+  }
+  
+  @Test(expected = IllegalArgumentException.class)
+  public void testBlurOutputFormatValidateReducerCount() throws IOException, InterruptedException, ClassNotFoundException {
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+    writeRecordsFile("in/part1", 1, 1, 1, 1, "cf1");
+    writeRecordsFile("in/part2", 1, 1, 2, 1, "cf1");
+
+    Job job = new Job(jobConf, "blur index");
+    job.setJarByClass(BlurOutputFormatTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TrackingTextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(1);
+    tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
+    tableDescriptor.setTableUri(tableUri);
+    
+    createShardDirectories(outDir, 1);
+    
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    BlurOutputFormat.setReducerMultiplier(job, 2);
+    job.setNumReduceTasks(4);
+    job.submit();
+    
+  }
+
+  public static String readFile(String name) throws IOException {
+    DataInputStream f = localFs.open(new Path(TEST_ROOT_DIR + "/" + name));
+    BufferedReader b = new BufferedReader(new InputStreamReader(f));
+    StringBuilder result = new StringBuilder();
+    String line = b.readLine();
+    while (line != null) {
+      result.append(line);
+      result.append('\n');
+      line = b.readLine();
+    }
+    b.close();
+    return result.toString();
+  }
+
+  private Path writeRecordsFile(String name, int startingRowId, int numberOfRows, int startRecordId,
+      int numberOfRecords, String family) throws IOException {
+    // "1,1,cf1,val1"
+    Path file = new Path(TEST_ROOT_DIR + "/" + name);
+    localFs.delete(file, false);
+    DataOutputStream f = localFs.create(file);
+    PrintWriter writer = new PrintWriter(f);
+    for (int row = 0; row < numberOfRows; row++) {
+      for (int record = 0; record < numberOfRecords; record++) {
+        writer.println(getRecord(row + startingRowId, record + startRecordId, family));
+      }
+    }
+    writer.close();
+    return file;
+  }
+
+  private void createShardDirectories(Path outDir, int shardCount) throws IOException {
+    localFs.mkdirs(outDir);
+    for (int i = 0; i < shardCount; i++) {
+      localFs.mkdirs(new Path(outDir, BlurUtil.getShardName(i)));
+    }
+  }
+  private String getRecord(int rowId, int recordId, String family) {
+    return rowId + "," + recordId + "," + family + ",valuetoindex";
+  }
+}

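For readers skimming the tests above, this is the driver wiring they all share, distilled into one hypothetical class. BlurIndexDriver, the argument-based paths, and the use of the stock TextInputFormat (in place of the test-only TrackingTextInputFormat) are placeholders; every BlurOutputFormat and CsvBlurMapper call is taken verbatim from the tests, and the inline comments give the apparent intent of each knob based on the test names and assertions.

import org.apache.blur.mapreduce.lib.BlurOutputFormat;
import org.apache.blur.mapreduce.lib.CsvBlurMapper;
import org.apache.blur.thrift.generated.AnalyzerDefinition;
import org.apache.blur.thrift.generated.TableDescriptor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class BlurIndexDriver {
  public static void main(String[] args) throws Exception {
    // args[0] = input directory of CSV files, args[1] = table URI (placeholders).
    Job job = new Job(new Configuration(), "blur index");
    job.setJarByClass(BlurIndexDriver.class);
    job.setMapperClass(CsvBlurMapper.class);
    job.setInputFormatClass(TextInputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    CsvBlurMapper.addColumns(job, "cf1", "col");

    TableDescriptor tableDescriptor = new TableDescriptor();
    tableDescriptor.setShardCount(1);
    tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
    tableDescriptor.setTableUri(args[1]);

    BlurOutputFormat.setupJob(job, tableDescriptor);
    // Optional knobs the tests exercise:
    BlurOutputFormat.setIndexLocally(job, true);      // stage the index locally before copying out
    BlurOutputFormat.setOptimizeInFlight(job, false); // skip optimizing segments mid-job
    BlurOutputFormat.setReducerMultiplier(job, 2);    // reducers = shard count * multiplier

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

The last test documents the guard rail around that final knob: once setupJob has fixed the reducer count (shard count times multiplier), overriding it with job.setNumReduceTasks throws IllegalArgumentException at submit time.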
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
new file mode 100644
index 0000000..2e28f6c
--- /dev/null
+++ b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
@@ -0,0 +1,66 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.blur.mapreduce.lib.CsvBlurMapper;
+import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CsvBlurMapperTest {
+
+  private MapDriver<LongWritable, Text, Text, BlurMutate> _mapDriver;
+  private CsvBlurMapper _mapper;
+
+  @Before
+  public void setUp() throws IOException {
+    _mapper = new CsvBlurMapper();
+    _mapDriver = MapDriver.newMapDriver(_mapper);
+  }
+
+  @Test
+  public void testMapperWithFamilyInData() {
+    Configuration configuration = _mapDriver.getConfiguration();
+    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
+    _mapDriver.withInput(new LongWritable(), new Text("rowid1,record1,cf1,value1,value2"));
+    _mapDriver.withOutput(new Text("rowid1"), new BlurMutate(MUTATE_TYPE.REPLACE, "rowid1", "record1", "cf1")
+        .addColumn("col1", "value1").addColumn("col2", "value2"));
+    _mapDriver.runTest();
+  }
+
+  @Test
+  public void testMapperFamilyPerPath() {
+    Configuration configuration = _mapDriver.getConfiguration();
+    CsvBlurMapper.setFamilyNotInFile(configuration, true);
+    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
+    CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
+    _mapper.setFamilyFromPath("cf1");
+
+    _mapDriver.withInput(new LongWritable(), new Text("rowid1,record1,value1,value2"));
+    _mapDriver.withOutput(new Text("rowid1"), new BlurMutate(MUTATE_TYPE.REPLACE, "rowid1", "record1", "cf1")
+        .addColumn("col1", "value1").addColumn("col2", "value2"));
+    _mapDriver.runTest();
+  }
+
+}
\ No newline at end of file

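The two cases above exercise the CsvBlurMapper configuration surface: setColumns takes a single spec string of the form "family:col1,col2|family2:...", and each input line is "rowid,recordid,family,value1,value2,..." unless setFamilyNotInFile/addFamilyPath switch the mapper to deriving the family from the input path (the test calls setFamilyFromPath("cf1") directly to simulate that derivation). A hypothetical configuration sketch, using only calls that appear in the test; the path "/data/cf1" is a placeholder:

import org.apache.blur.mapreduce.lib.CsvBlurMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class CsvBlurMapperConfigSketch {
  public static void main(String[] args) {
    Configuration configuration = new Configuration();

    // One spec string declares every family and its ordered columns;
    // values on each CSV line are bound to the declared columns in order.
    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
    // Default line layout: rowid,recordid,family,value1,value2,...

    // Family-per-path mode: the family is taken from the input path instead,
    // so lines shrink to rowid,recordid,value1,value2,...
    CsvBlurMapper.setFamilyNotInFile(configuration, true);
    CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/data/cf1")); // hypothetical path
  }
}

As the withOutput expectations show, the mapper emits one BlurMutate of MUTATE_TYPE.REPLACE per input line, keyed by the row id.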
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-query/pom.xml
----------------------------------------------------------------------
diff --git a/blur-query/pom.xml b/blur-query/pom.xml
new file mode 100644
index 0000000..ca48a51
--- /dev/null
+++ b/blur-query/pom.xml
@@ -0,0 +1,137 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.apache.blur</groupId>
+		<artifactId>blur</artifactId>
+		<version>0.1.5</version>
+		<relativePath>../pom.xml</relativePath>
+	</parent>
+	<groupId>org.apache.blur</groupId>
+	<artifactId>blur-query</artifactId>
+	<packaging>jar</packaging>
+	<name>Blur Query</name>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-util</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-store</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-thrift</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.lucene</groupId>
+			<artifactId>lucene-core</artifactId>
+			<version>${lucene.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.lucene</groupId>
+			<artifactId>lucene-codecs</artifactId>
+			<version>${lucene.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.lucene</groupId>
+			<artifactId>lucene-analyzers-common</artifactId>
+			<version>${lucene.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.lucene</groupId>
+			<artifactId>lucene-queryparser</artifactId>
+			<version>${lucene.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>commons-cli</groupId>
+			<artifactId>commons-cli</artifactId>
+			<version>${commons-cli.version}</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>com.googlecode.concurrentlinkedhashmap</groupId>
+			<artifactId>concurrentlinkedhashmap-lru</artifactId>
+			<version>${concurrentlinkedhashmap-lru.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>log4j</groupId>
+			<artifactId>log4j</artifactId>
+			<version>${log4j.version}</version>
+			<scope>provided</scope>
+			<exclusions>
+				<exclusion>
+					<groupId>javax.mail</groupId>
+					<artifactId>mail</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>javax.jms</groupId>
+					<artifactId>jms</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jdmk</groupId>
+					<artifactId>jmxtools</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jmx</groupId>
+					<artifactId>jmxri</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+	</dependencies>
+
+	<repositories>
+		<repository>
+			<id>libdir</id>
+			<url>file://${basedir}/../lib</url>
+		</repository>
+	</repositories>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<configuration>
+					<source>1.6</source>
+					<target>1.6</target>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

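For reference, a downstream module would consume this new artifact with a standard Maven dependency declaration (a sketch; the coordinates come from the pom above, and sibling modules in this tree would normally use ${project.version} rather than the literal version):

<dependency>
	<groupId>org.apache.blur</groupId>
	<artifactId>blur-query</artifactId>
	<version>0.1.5</version>
</dependency>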
