incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject [3/5] git commit: Adding a mapred common library. This should allow a single project for the mapred code. The next step will be to remove the blur-mapred-hadoop1 and 2 projects.
Date Sun, 12 Apr 2015 13:26:37 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e8165b77/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/v2/DirectIndexingDriver.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/v2/DirectIndexingDriver.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/v2/DirectIndexingDriver.java
deleted file mode 100644
index 5d17aef..0000000
--- a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/v2/DirectIndexingDriver.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.v2;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.blur.analysis.FieldManager;
-import org.apache.blur.mapreduce.lib.BlurRecord;
-import org.apache.blur.mapreduce.lib.GenericBlurRecordWriter;
-import org.apache.blur.server.TableContext;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.analysis.TokenStream;
-import org.apache.lucene.analysis.tokenattributes.OffsetAttribute;
-import org.apache.lucene.analysis.tokenattributes.PositionIncrementAttribute;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.index.FieldInfo.IndexOptions;
-import org.apache.lucene.index.IndexableField;
-import org.apache.lucene.index.IndexableFieldType;
-import org.apache.lucene.util.IOUtils;
-
-public class DirectIndexingDriver implements Tool {
-
-  public static class DirectIndexingMapper extends Mapper<IntWritable, BlurRecord, LuceneKeyWritable, NullWritable> {
-
-    private FieldManager _fieldManager;
-    private TableContext _tableContext;
-    private Analyzer _analyzer;
-
-    // private fieldState.reset();
-
-    @Override
-    protected void setup(Context context) throws IOException, InterruptedException {
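-      // TODO: _tableContext is never assigned before this call, so setup would fail
-      // with a NullPointerException; it still needs to be created from the job
-      // configuration.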
-      _fieldManager = _tableContext.getFieldManager();
-      _analyzer = _fieldManager.getAnalyzerForIndex();
-    }
-
-    @Override
-    protected void map(IntWritable key, BlurRecord record, Context context) throws IOException, InterruptedException {
-      int documentId = key.get();
-      String rowId = record.getRowId();
-      List<Field> fields = _fieldManager.getFields(rowId, GenericBlurRecordWriter.getRecord(record));
-      // write(documentId, fields, context);
-    }
-
-//    public void processFields(final IndexableField[] fields, final int count) throws IOException {
-//
-//      fieldState.reset();
-//
-//      final boolean doInvert = consumer.start(fields, count);
-//
-//      for (int i = 0; i < count; i++) {
-//
-//        final IndexableField field = fields[i];
-//        final IndexableFieldType fieldType = field.fieldType();
-//
-//        // TODO FI: this should be "genericized" to querying
-//        // consumer if it wants to see this particular field
-//        // tokenized.
-//        if (fieldType.indexed() && doInvert) {
-//          final boolean analyzed = fieldType.tokenized() && docState.analyzer != null;
-//
-//          // if the field omits norms, the boost cannot be indexed.
-//          if (fieldType.omitNorms() && field.boost() != 1.0f) {
-//            throw new UnsupportedOperationException("You cannot set an index-time boost: norms are omitted for field '"
-//                + field.name() + "'");
-//          }
-//
-//          // only bother checking offsets if something will consume them.
-//          // TODO: after we fix analyzers, also check if termVectorOffsets will
-//          // be indexed.
-//          final boolean checkOffsets = fieldType.indexOptions() == IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS;
-//          int lastStartOffset = 0;
-//
-//          if (i > 0) {
-//            fieldState.position += analyzed ? docState.analyzer.getPositionIncrementGap(fieldInfo.name) : 0;
-//          }
-//
-//          final TokenStream stream = field.tokenStream(docState.analyzer);
-//          // reset the TokenStream to the first token
-//          stream.reset();
-//
-//          boolean success2 = false;
-//
-//          try {
-//            boolean hasMoreTokens = stream.incrementToken();
-//
-//            fieldState.attributeSource = stream;
-//
-//            OffsetAttribute offsetAttribute = fieldState.attributeSource.addAttribute(OffsetAttribute.class);
-//            PositionIncrementAttribute posIncrAttribute = fieldState.attributeSource
-//                .addAttribute(PositionIncrementAttribute.class);
-//
-//            if (hasMoreTokens) {
-//              consumer.start(field);
-//
-//              do {
-//                // If we hit an exception in stream.next below
-//                // (which is fairly common, eg if analyzer
-//                // chokes on a given document), then it's
-//                // non-aborting and (above) this one document
-//                // will be marked as deleted, but still
-//                // consume a docID
-//
-//                final int posIncr = posIncrAttribute.getPositionIncrement();
-//                if (posIncr < 0) {
-//                  throw new IllegalArgumentException("position increment must be >=0 (got " + posIncr + ")");
-//                }
-//                if (fieldState.position == 0 && posIncr == 0) {
-//                  throw new IllegalArgumentException("first position increment must be > 0 (got 0)");
-//                }
-//                int position = fieldState.position + posIncr;
-//                if (position > 0) {
-//                  // NOTE: confusing: this "mirrors" the
-//                  // position++ we do below
-//                  position--;
-//                } else if (position < 0) {
-//                  throw new IllegalArgumentException("position overflow for field '" + field.name() + "'");
-//                }
-//
-//                // position is legal, we can safely place it in fieldState now.
-//                // not sure if anything will use fieldState after non-aborting
-//                // exc...
-//                fieldState.position = position;
-//
-//                if (posIncr == 0)
-//                  fieldState.numOverlap++;
-//
-//                if (checkOffsets) {
-//                  int startOffset = fieldState.offset + offsetAttribute.startOffset();
-//                  int endOffset = fieldState.offset + offsetAttribute.endOffset();
-//                  if (startOffset < 0 || endOffset < startOffset) {
-//                    throw new IllegalArgumentException(
-//                        "startOffset must be non-negative, and endOffset must be >= startOffset, " + "startOffset="
-//                            + startOffset + ",endOffset=" + endOffset);
-//                  }
-//                  if (startOffset < lastStartOffset) {
-//                    throw new IllegalArgumentException("offsets must not go backwards startOffset=" + startOffset
-//                        + " is < lastStartOffset=" + lastStartOffset);
-//                  }
-//                  lastStartOffset = startOffset;
-//                }
-//
-//                boolean success = false;
-//                try {
-//                  // If we hit an exception in here, we abort
-//                  // all buffered documents since the last
-//                  // flush, on the likelihood that the
-//                  // internal state of the consumer is now
-//                  // corrupt and should not be flushed to a
-//                  // new segment:
-//                  consumer.add();
-//                  success = true;
-//                } finally {
-//                  if (!success) {
-//                    docState.docWriter.setAborting();
-//                  }
-//                }
-//                fieldState.length++;
-//                fieldState.position++;
-//              } while (stream.incrementToken());
-//            }
-//            // trigger streams to perform end-of-stream operations
-//            stream.end();
-//
-//            fieldState.offset += offsetAttribute.endOffset();
-//            success2 = true;
-//          } finally {
-//            if (!success2) {
-//              IOUtils.closeWhileHandlingException(stream);
-//            } else {
-//              stream.close();
-//            }
-//          }
-//
-//          fieldState.offset += analyzed ? docState.analyzer.getOffsetGap(fieldInfo.name) : 0;
-//          fieldState.boost *= field.boost();
-//        }
-//
-//        // LUCENE-2387: don't hang onto the field, so GC can
-//        // reclaim
-//        fields[i] = null;
-//      }
-//
-//      consumer.finish();
-//      endConsumer.finish();
-//    }
-
-  }
-
-  private Configuration _conf;
-
-  @Override
-  public Configuration getConf() {
-    return _conf;
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    _conf = conf;
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-
-    String in = args[0];
-
-    Job job = new Job(getConf(), "Lucene Direct Indexing");
-    job.setJarByClass(DirectIndexingDriver.class);
-    job.setMapperClass(DirectIndexingMapper.class);
-    job.setInputFormatClass(SequenceFileInputFormat.class);
-
-    job.setOutputFormatClass(NullOutputFormat.class);
-    job.setOutputKeyClass(LuceneKeyWritable.class);
-    job.setOutputValueClass(NullWritable.class);
-
-    FileInputFormat.addInputPath(job, new Path(in));
-
-    if (!job.waitForCompletion(true)) {
-      return 1;
-    }
-    return 0;
-  }
-
-  public static void main(String[] args) throws Exception {
-    System.exit(ToolRunner.run(new DirectIndexingDriver(), args));
-  }
-
-}

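For reference, the deleted driver follows the standard Hadoop Tool pattern: ToolRunner injects the Configuration through setConf(...) and then invokes run(...). A minimal sketch of an invocation, with a hypothetical input path:

    Configuration conf = new Configuration();
    int exitCode = ToolRunner.run(conf, new DirectIndexingDriver(),
        new String[] { "hdfs://namenode/tmp/direct-indexing-input" });
    System.exit(exitCode);
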
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e8165b77/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/v2/DocumentWritable.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/v2/DocumentWritable.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/v2/DocumentWritable.java
deleted file mode 100644
index 8a88d8a..0000000
--- a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/v2/DocumentWritable.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.v2;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field.Store;
-import org.apache.lucene.document.FieldType;
-import org.apache.lucene.document.StringField;
-import org.apache.lucene.index.IndexableField;
-
-public class DocumentWritable implements Writable {
-
-  private static final String UTF_8 = "UTF-8";
-  private List<IndexableField> _document = new ArrayList<IndexableField>();
-
-  public DocumentWritable() {
-
-  }
-
-  public DocumentWritable(List<IndexableField> document) {
-    _document = document;
-  }
-
-  public void clear() {
-    _document.clear();
-  }
-
-  public void add(IndexableField field) {
-    _document.add(field);
-  }
-
-  public DocumentWritable(Document document) {
-    _document = document.getFields();
-  }
-
-  public List<IndexableField> getDocument() {
-    return _document;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    int numberOfFields = _document.size();
-    WritableUtils.writeVInt(out, numberOfFields);
-    for (int i = 0; i < numberOfFields; i++) {
-      IndexableField field = _document.get(i);
-      write(out, field);
-    }
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    _document.clear();
-    int numberOfFields = WritableUtils.readVInt(in);
-    for (int i = 0; i < numberOfFields; i++) {
-      IndexableField field = readField(in);
-      _document.add(field);
-    }
-  }
-
-  private IndexableField readField(DataInput in) throws IOException {
-    LUCENE_FIELD_TYPE type = LUCENE_FIELD_TYPE.lookupByValue(in.readByte());
-    String name = readString(in);
-    switch (type) {
-    case StringField:
-      return readStringField(in, name);
-    default:
-      throw new IOException("Type [" + type + "] not supported.");
-    }
-  }
-
-  private void write(DataOutput out, IndexableField field) throws IOException {
-    LUCENE_FIELD_TYPE type = LUCENE_FIELD_TYPE.lookupByClass(field.getClass());
-    out.writeByte(type.value());
-    writeString(out, field.name());
-    switch (type) {
-    case StringField:
-      writeStringField(out, (StringField) field);
-      return;
-    default:
-      throw new IOException("Type [" + type + "] not supported.");
-    }
-  }
-
-  private void writeStringField(DataOutput out, StringField stringField) throws IOException {
-    FieldType fieldType = stringField.fieldType();
-    if (fieldType.equals(StringField.TYPE_STORED)) {
-      out.writeBoolean(true);
-    } else if (fieldType.equals(StringField.TYPE_NOT_STORED)) {
-      out.writeBoolean(false);
-    } else {
-      throw new IOException("Non default FieldTypes for StringField not supported.");
-    }
-    writeString(out, stringField.stringValue());
-  }
-
-  private IndexableField readStringField(DataInput in, String name) throws IOException {
-    boolean stored = in.readBoolean();
-    String value = readString(in);
-    if (stored) {
-      return new StringField(name, value, Store.YES);
-    } else {
-      return new StringField(name, value, Store.NO);
-    }
-  }
-
-  private String readString(DataInput in) throws IOException {
-    int length = WritableUtils.readVInt(in);
-    byte[] buf = new byte[length];
-    in.readFully(buf);
-    return new String(buf, UTF_8);
-  }
-
-  private void writeString(DataOutput out, String value) throws IOException {
-    byte[] bs = value.getBytes(UTF_8);
-    WritableUtils.writeVInt(out, bs.length);
-    out.write(bs);
-  }
-
-}

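As a sanity check on the wire format above, a DocumentWritable round-trips through ordinary Java data streams like any other Writable; note that the serializer currently supports StringField only. A minimal sketch with made-up field values, assuming the java.io and Lucene imports from the class above:

    DocumentWritable written = new DocumentWritable();
    written.add(new StringField("id", "row-1", Store.YES));

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    written.write(new DataOutputStream(bytes));

    DocumentWritable read = new DocumentWritable();
    read.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    // read.getDocument() now holds one stored StringField "id" = "row-1".
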
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e8165b77/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/v2/LUCENE_FIELD_TYPE.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/v2/LUCENE_FIELD_TYPE.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/v2/LUCENE_FIELD_TYPE.java
deleted file mode 100644
index 373f4ac..0000000
--- a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/v2/LUCENE_FIELD_TYPE.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.v2;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.lucene.index.IndexableField;
-
-public enum LUCENE_FIELD_TYPE {
-
-  BinaryDocValuesField(org.apache.lucene.document.BinaryDocValuesField.class, (byte) 0),
-
-  DoubleField(org.apache.lucene.document.DoubleField.class, (byte) 1),
-
-  FloatField(org.apache.lucene.document.FloatField.class, (byte) 2),
-
-  IntField(org.apache.lucene.document.IntField.class, (byte) 3),
-
-  LongField(org.apache.lucene.document.LongField.class, (byte) 4),
-
-  NumericDocValuesField(org.apache.lucene.document.NumericDocValuesField.class, (byte) 5),
-
-  DoubleDocValuesField(org.apache.lucene.document.DoubleDocValuesField.class, (byte) 6),
-
-  FloatDocValuesField(org.apache.lucene.document.FloatDocValuesField.class, (byte) 7),
-
-  SortedDocValuesField(org.apache.lucene.document.SortedDocValuesField.class, (byte) 8),
-
-  SortedSetDocValuesField(org.apache.lucene.document.SortedSetDocValuesField.class, (byte) 9),
-
-  StoredField(org.apache.lucene.document.StoredField.class, (byte) 10),
-
-  StringField(org.apache.lucene.document.StringField.class, (byte) 11),
-
-  TextField(org.apache.lucene.document.TextField.class, (byte) 12);
-
-  private final byte _value;
-  private final Class<? extends IndexableField> _clazz;
-
-  private LUCENE_FIELD_TYPE(Class<? extends IndexableField> clazz, byte b) {
-    _value = b;
-    _clazz = clazz;
-  }
-
-  public Class<? extends IndexableField> fieldClass() {
-    return _clazz;
-  }
-
-  public byte value() {
-    return _value;
-  }
-
-  public static LUCENE_FIELD_TYPE lookupByValue(byte value) {
-    LUCENE_FIELD_TYPE type = _lookupByValue.get(value);
-    if (type == null) {
-      throw new RuntimeException("Type for [" + value + "] not found.");
-    }
-    return type;
-  }
-
-  public static LUCENE_FIELD_TYPE lookupByClass(Class<? extends IndexableField> value) {
-    LUCENE_FIELD_TYPE type = _lookupByClass.get(value);
-    if (type == null) {
-      throw new RuntimeException("Type for [" + value + "] not found.");
-    }
-    return type;
-  }
-
-  private final static Map<Class<? extends IndexableField>, LUCENE_FIELD_TYPE> _lookupByClass = new ConcurrentHashMap<Class<? extends IndexableField>, LUCENE_FIELD_TYPE>();
-  private final static Map<Byte, LUCENE_FIELD_TYPE> _lookupByValue = new ConcurrentHashMap<Byte, LUCENE_FIELD_TYPE>();
-
-  static {
-    LUCENE_FIELD_TYPE[] values = LUCENE_FIELD_TYPE.values();
-    for (LUCENE_FIELD_TYPE type : values) {
-      _lookupByClass.put(type.fieldClass(), type);
-      _lookupByValue.put(type.value(), type);
-    }
-  }
-}
\ No newline at end of file

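The two static maps make the byte tag and the field class interchangeable, which is what DocumentWritable relies on when it writes type.value() and later resolves it with lookupByValue. A short sketch of the round trip:

    byte tag = LUCENE_FIELD_TYPE.lookupByClass(StringField.class).value(); // 11 per the enum above
    LUCENE_FIELD_TYPE type = LUCENE_FIELD_TYPE.lookupByValue(tag);         // back to StringField
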
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e8165b77/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/v2/LuceneKeyWritable.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/v2/LuceneKeyWritable.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/v2/LuceneKeyWritable.java
deleted file mode 100644
index bca3c4f..0000000
--- a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/v2/LuceneKeyWritable.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.v2;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.lucene.util.BytesRef;
-
-public class LuceneKeyWritable implements WritableComparable<LuceneKeyWritable> {
-
-  public static enum Type {
-    SHARD_FIELD_TEXT((byte) 0), SHARD_FIELD_TEXT_DOCUMENTID((byte) 1), SHARD_FIELD_TEXT_DOCUMENTID_POSITION((byte) 2);
-
-    private final byte _value;
-
-    private Type(byte value) {
-      _value = value;
-    }
-
-    public byte value() {
-      return _value;
-    }
-
-    public static Type lookup(byte value) {
-      switch (value) {
-      case 0:
-        return SHARD_FIELD_TEXT;
-      case 1:
-        return SHARD_FIELD_TEXT_DOCUMENTID;
-      case 2:
-        return SHARD_FIELD_TEXT_DOCUMENTID_POSITION;
-      default:
-        throw new RuntimeException("Value [" + value + "] not found.");
-      }
-    }
-  }
-
-  private int _shardId;
-  private int _fieldId;
-  private BytesRef _text = new BytesRef();
-  private Type _type;
-  private int _documentId;
-  private int _position;
-
-  public LuceneKeyWritable() {
-
-  }
-
-  public LuceneKeyWritable(int shardId, int fieldId, BytesRef text, Type type, int documentId, int position) {
-    _shardId = shardId;
-    _fieldId = fieldId;
-    _text = text;
-    _type = type;
-    _documentId = documentId;
-    _position = position;
-  }
-
-  public int getShardId() {
-    return _shardId;
-  }
-
-  public int getFieldId() {
-    return _fieldId;
-  }
-
-  public BytesRef getText() {
-    return _text;
-  }
-
-  public Type getType() {
-    return _type;
-  }
-
-  public int getDocumentId() {
-    return _documentId;
-  }
-
-  public int getPosition() {
-    return _position;
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    _shardId = in.readInt();
-    _fieldId = in.readInt();
-    read(in, _text);
-    _type = Type.lookup(in.readByte());
-    switch (_type) {
-    case SHARD_FIELD_TEXT:
-      return;
-    case SHARD_FIELD_TEXT_DOCUMENTID:
-      _documentId = in.readInt();
-      return;
-    case SHARD_FIELD_TEXT_DOCUMENTID_POSITION:
-      _documentId = in.readInt();
-      _position = in.readInt();
-      return;
-    default:
-      throw new IOException("Type [" + _type + "] not supported.");
-    }
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(_shardId);
-    out.writeInt(_fieldId);
-    write(out, _text);
-    out.writeByte(_type.value());
-    switch (_type) {
-    case SHARD_FIELD_TEXT:
-      return;
-    case SHARD_FIELD_TEXT_DOCUMENTID:
-      out.writeInt(_documentId);
-      return;
-    case SHARD_FIELD_TEXT_DOCUMENTID_POSITION:
-      out.writeInt(_documentId);
-      out.writeInt(_position);
-      return;
-    default:
-      throw new IOException("Type [" + _type + "] not supported.");
-    }
-  }
-
-  private void write(DataOutput out, BytesRef ref) throws IOException {
-    out.writeInt(ref.length);
-    out.write(ref.bytes, ref.offset, ref.length);
-  }
-
-  private void read(DataInput in, BytesRef ref) throws IOException {
-    int len = in.readInt();
-    if (ref.bytes.length < len) {
-      ref.grow(len);
-    }
-    in.readFully(ref.bytes, 0, len);
-    ref.offset = 0;
-    ref.length = len;
-  }
-
-  @Override
-  public int compareTo(LuceneKeyWritable o) {
-    int compareTo = _shardId - o._shardId;
-    if (compareTo == 0) {
-      compareTo = _fieldId - o._fieldId;
-      if (compareTo == 0) {
-        compareTo = _text.compareTo(o._text);
-        switch (_type) {
-        case SHARD_FIELD_TEXT:
-          return compareTo;
-        case SHARD_FIELD_TEXT_DOCUMENTID:
-          if (compareTo == 0) {
-            compareTo = _documentId - o._documentId;
-          }
-          return compareTo;
-        case SHARD_FIELD_TEXT_DOCUMENTID_POSITION: {
-          if (compareTo == 0) {
-            compareTo = _documentId - o._documentId;
-            if (compareTo == 0) {
-              compareTo = _position - o._position;
-            }
-          }
-          return compareTo;
-        }
-        default:
-          throw new RuntimeException("Type [" + _type + "] not supported.");
-        }
-      }
-    }
-    return compareTo;
-  }
-}

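The comparator orders keys by shard, then field, then text, then document id, then position, with the later components only consulted as the key's Type allows. A small sketch of the position tie-break:

    BytesRef text = new BytesRef("term");
    LuceneKeyWritable a = new LuceneKeyWritable(0, 1, text, Type.SHARD_FIELD_TEXT_DOCUMENTID_POSITION, 5, 0);
    LuceneKeyWritable b = new LuceneKeyWritable(0, 1, text, Type.SHARD_FIELD_TEXT_DOCUMENTID_POSITION, 5, 3);
    // a.compareTo(b) < 0: shard, field, text and document id are equal, so position decides.
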
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e8165b77/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java b/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
deleted file mode 100644
index 340d2b3..0000000
--- a/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-import java.io.IOException;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.blur.mapreduce.lib.CsvBlurDriver.ControllerPool;
-import org.apache.blur.thrift.generated.Blur.Iface;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.SnappyCodec;
-import org.apache.hadoop.mapreduce.Job;
-import org.junit.Before;
-import org.junit.Test;
-
-public class CsvBlurDriverTest {
-
-  protected String tableUri = "file:///tmp/tmppath";
-  protected int shardCount = 13;
-
-  @Before
-  public void setup() throws IOException {
-    Configuration configuration = new Configuration();
-    Path path1 = new Path("file:///tmp/test1");
-    Path path2 = new Path("file:///tmp/test2");
-    FileSystem fileSystem = path1.getFileSystem(configuration);
-    fileSystem.mkdirs(path1);
-    fileSystem.mkdirs(path2);
-  }
-
-  @Test
-  public void testCsvBlurDriverTestFail1() throws Exception {
-    Configuration configuration = new Configuration();
-    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
-      @Override
-      public Iface getClient(String controllerConnectionStr) {
-        return null;
-      }
-    };
-    AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
-    assertNull(CsvBlurDriver.setupJob(configuration, controllerPool, ref, new String[] {}));
-  }
-
-  @Test
-  public void testCsvBlurDriverTest() throws Exception {
-    Configuration configurationSetup = new Configuration();
-    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
-      @Override
-      public Iface getClient(String controllerConnectionStr) {
-        return getMockIface();
-      }
-    };
-    AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
-    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, ref, "-c", "host:40010", "-d", "family1",
-        "col1", "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i",
-        "file:///tmp/test2");
-    assertNotNull(job);
-    Configuration configuration = job.getConfiguration();
-    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
-    assertEquals("table1", tableDescriptor.getName());
-    Collection<String> inputs = configuration.getStringCollection("mapred.input.dir");
-    assertEquals(2, inputs.size());
-    Map<String, List<String>> familyAndColumnNameMap = CsvBlurMapper.getFamilyAndColumnNameMap(configuration);
-    assertEquals(2, familyAndColumnNameMap.size());
-  }
-
-  @Test
-  public void testCsvBlurDriverTest2() throws Exception {
-    Configuration configurationSetup = new Configuration();
-    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
-      @Override
-      public Iface getClient(String controllerConnectionStr) {
-        return getMockIface();
-      }
-    };
-    AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
-    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, ref, "-c", "host:40010", "-d", "family1",
-        "col1", "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i",
-        "file:///tmp/test2", "-S", "-C", "1000000", "2000000");
-    assertNotNull(job);
-    Configuration configuration = job.getConfiguration();
-    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
-    assertEquals("table1", tableDescriptor.getName());
-    Collection<String> inputs = configuration.getStringCollection("mapred.input.dir");
-    assertEquals(2, inputs.size());
-    Map<String, List<String>> familyAndColumnNameMap = CsvBlurMapper.getFamilyAndColumnNameMap(configuration);
-    assertEquals(2, familyAndColumnNameMap.size());
-  }
-
-  @Test
-  public void testCsvBlurDriverTest3() throws Exception {
-    Configuration configurationSetup = new Configuration();
-    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
-      @Override
-      public Iface getClient(String controllerConnectionStr) {
-        return getMockIface();
-      }
-    };
-    AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
-    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, ref, "-c", "host:40010", "-d", "family1",
-        "col1", "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i",
-        "file:///tmp/test2", "-S", "-C", "1000000", "2000000", "-p", "SNAPPY");
-    assertNotNull(job);
-    Configuration configuration = job.getConfiguration();
-    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
-    assertEquals("table1", tableDescriptor.getName());
-    Collection<String> inputs = configuration.getStringCollection("mapred.input.dir");
-    assertEquals(2, inputs.size());
-    Map<String, List<String>> familyAndColumnNameMap = CsvBlurMapper.getFamilyAndColumnNameMap(configuration);
-    assertEquals(2, familyAndColumnNameMap.size());
-    assertEquals("true", configuration.get(CsvBlurDriver.MAPRED_COMPRESS_MAP_OUTPUT));
-    assertEquals(SnappyCodec.class.getName(), configuration.get(CsvBlurDriver.MAPRED_MAP_OUTPUT_COMPRESSION_CODEC));
-  }
-
-  @Test
-  public void multiplierParamShouldIncreaseReduceTasks() throws Exception {
-    Configuration configurationSetup = new Configuration();
-    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
-      @Override
-      public Iface getClient(String controllerConnectionStr) {
-        return getMockIface();
-      }
-    };
-    int multiplierParam = 10;
-    AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
-    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, ref, "-c", "host:40010", "-d", "family1",
-        "col1", "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i",
-        "file:///tmp/test2", "-S", "-C", "1000000", "2000000", "-p", "SNAPPY", "-r", Integer.toString(multiplierParam));
-    assertNotNull(job);
-
-    assertEquals(multiplierParam * shardCount, job.getNumReduceTasks());
-  }
-
-  protected Iface getMockIface() {
-    InvocationHandler handler = new InvocationHandler() {
-
-      @Override
-      public Object invoke(Object o, Method method, Object[] args) throws Throwable {
-        if (method.getName().equals("describe")) {
-          TableDescriptor tableDescriptor = new TableDescriptor();
-          tableDescriptor.setName((String) args[0]);
-          tableDescriptor.setTableUri(tableUri);
-          tableDescriptor.setShardCount(shardCount);
-          return tableDescriptor;
-        }
-        throw new RuntimeException("not implemented.");
-      }
-    };
-    return (Iface) Proxy.newProxyInstance(Iface.class.getClassLoader(), new Class[] { Iface.class }, handler);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e8165b77/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java b/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
deleted file mode 100644
index 47aa8e5..0000000
--- a/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mrunit.mapreduce.MapDriver;
-import org.junit.Before;
-import org.junit.Test;
-
-public class CsvBlurMapperTest {
-
-  private MapDriver<Writable, Text, Text, BlurMutate> _mapDriver;
-  private CsvBlurMapper _mapper;
-
-  @Before
-  public void setUp() throws IOException {
-    _mapper = new CsvBlurMapper();
-    _mapDriver = MapDriver.newMapDriver(_mapper);
-  }
-
-  @Test
-  public void testMapperWithFamilyInData() {
-    Configuration configuration = _mapDriver.getConfiguration();
-    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
-    _mapDriver.withInput(new LongWritable(), new Text("rowid1,record1,cf1,value1,value2"));
-    _mapDriver.withOutput(new Text("rowid1"), new BlurMutate(MUTATE_TYPE.REPLACE, "rowid1", "record1", "cf1")
-        .addColumn("col1", "value1").addColumn("col2", "value2"));
-    _mapDriver.runTest();
-  }
-
-  @Test
-  public void testMapperFamilyPerPath() {
-    Configuration configuration = _mapDriver.getConfiguration();
-    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
-    CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
-    _mapper.setFamilyFromPath("cf1");
-
-    _mapDriver.withInput(new LongWritable(), new Text("rowid1,record1,value1,value2"));
-    _mapDriver.withOutput(new Text("rowid1"), new BlurMutate(MUTATE_TYPE.REPLACE, "rowid1", "record1", "cf1")
-        .addColumn("col1", "value1").addColumn("col2", "value2"));
-    _mapDriver.runTest();
-  }
-  
-  @Test
-  public void testMapperAutoGenerateRecordId() {
-    Configuration configuration = _mapDriver.getConfiguration();
-    CsvBlurMapper.setAutoGenerateRecordIdAsHashOfData(configuration, true);
-    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
-    CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
-    _mapper.setFamilyFromPath("cf1");
-
-    _mapDriver.withInput(new LongWritable(), new Text("rowid1,value1,value2"));
-    _mapDriver.withOutput(new Text("rowid1"), new BlurMutate(MUTATE_TYPE.REPLACE, "rowid1", "-25nqln3n2vb4cayex9y9tpxx3", "cf1")
-        .addColumn("col1", "value1").addColumn("col2", "value2"));
-    _mapDriver.runTest();
-  }
-  
-  @Test
-  public void testMapperAutoGenerateRowId() {
-    Configuration configuration = _mapDriver.getConfiguration();
-    CsvBlurMapper.setAutoGenerateRowIdAsHashOfData(configuration, true);
-    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
-    CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
-    _mapper.setFamilyFromPath("cf1");
-
-    _mapDriver.withInput(new LongWritable(), new Text("record1,value1,value2"));
-    _mapDriver.withOutput(new Text("-50b4uzohynr7j7s9pve7ytz66"), new BlurMutate(MUTATE_TYPE.REPLACE, "-50b4uzohynr7j7s9pve7ytz66", "record1", "cf1")
-        .addColumn("col1", "value1").addColumn("col2", "value2"));
-    _mapDriver.runTest();
-  }
-  
-  @Test
-  public void testMapperAutoGenerateRowIdAndRecordId() {
-    Configuration configuration = _mapDriver.getConfiguration();
-    CsvBlurMapper.setAutoGenerateRecordIdAsHashOfData(configuration, true);
-    CsvBlurMapper.setAutoGenerateRowIdAsHashOfData(configuration, true);
-    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
-    CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
-    _mapper.setFamilyFromPath("cf1");
-
-    _mapDriver.withInput(new LongWritable(), new Text("value1,value2"));
-    _mapDriver.withOutput(new Text("5q0tme15ph3h5pns8sv3u5wy2"), new BlurMutate(MUTATE_TYPE.REPLACE, "5q0tme15ph3h5pns8sv3u5wy2", "5q0tme15ph3h5pns8sv3u5wy2", "cf1")
-        .addColumn("col1", "value1").addColumn("col2", "value2"));
-    _mapDriver.runTest();
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e8165b77/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/v2/DirectIndexingDriverMiniClusterTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/v2/DirectIndexingDriverMiniClusterTest.java b/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/v2/DirectIndexingDriverMiniClusterTest.java
deleted file mode 100644
index 2adfbfb..0000000
--- a/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/v2/DirectIndexingDriverMiniClusterTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-package org.apache.blur.mapreduce.lib.v2;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.blur.server.TableContext;
-import org.apache.blur.store.buffer.BufferStore;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MiniMRCluster;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class DirectIndexingDriverMiniClusterTest {
-
-  private static Configuration conf = new Configuration();
-  private static FileSystem localFs;
-  private static MiniMRCluster mr;
-  private static final Path TEST_ROOT_DIR = new Path("./target/tmp/DirectIndexingDriverTest_tmp");
-  private static JobConf jobConf;
-  private static final Path outDir = new Path(TEST_ROOT_DIR + "/out");
-  private static final Path inDir = new Path(TEST_ROOT_DIR + "/in");
-
-  @BeforeClass
-  public static void setupTest() throws Exception {
-    System.setProperty("test.build.data", "./target/DirectIndexingDriverTest/data");
-    System.setProperty("hadoop.log.dir", "./target/DirectIndexingDriverTest/hadoop_log");
-    try {
-      localFs = FileSystem.getLocal(conf);
-    } catch (IOException io) {
-      throw new RuntimeException("problem getting local fs", io);
-    }
-    mr = new MiniMRCluster(1, "file:///", 1);
-    jobConf = mr.createJobConf();
-    BufferStore.initNewBuffer(128, 128 * 128);
-  }
-
-  @AfterClass
-  public static void teardown() {
-    if (mr != null) {
-      mr.shutdown();
-    }
-    rm(new File("build"));
-  }
-
-  private static void rm(File file) {
-    if (!file.exists()) {
-      return;
-    }
-    if (file.isDirectory()) {
-      for (File f : file.listFiles()) {
-        rm(f);
-      }
-    }
-    file.delete();
-  }
-
-  @Before
-  public void setup() throws IllegalArgumentException, IOException {
-    TableContext.clear();
-    if (localFs.exists(inDir)) {
-      assertTrue(localFs.delete(inDir, true));
-    }
-    if (localFs.exists(outDir)) {
-      assertTrue(localFs.delete(outDir, true));
-    }
-  }
-
-  @Test
-  public void testDirectIndexingDriver() throws Exception {
-//    DirectIndexingDriverTest.createInputDocument(localFs, jobConf, inDir);
-//    DirectIndexingDriver driver = new DirectIndexingDriver();
-//    driver.setConf(jobConf);
-//    driver.run(new String[] { inDir.toString(), outDir.toString() });
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e8165b77/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/v2/DirectIndexingDriverTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/v2/DirectIndexingDriverTest.java b/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/v2/DirectIndexingDriverTest.java
deleted file mode 100644
index d8f9a68..0000000
--- a/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/v2/DirectIndexingDriverTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib.v2;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.lucene.document.StringField;
-import org.apache.lucene.document.Field.Store;
-import org.junit.Test;
-
-public class DirectIndexingDriverTest {
-
-//  @Test
-  public void testIndexing() throws Exception {
-    Configuration configuration = new Configuration();
-    Path path = new Path("./tmp/test_DirectIndexingDriverTest_input/");
-    FileSystem fileSystem = path.getFileSystem(configuration);
-    createInputDocument(fileSystem, configuration, path);
-    DirectIndexingDriver directIndexingDriver = new DirectIndexingDriver();
-    directIndexingDriver.setConf(configuration);
-    directIndexingDriver.run(new String[] { path.toString() });
-  }
-
-  public static void createInputDocument(FileSystem fileSystem, Configuration configuration, Path path)
-      throws IOException {
-    Writer writer = SequenceFile.createWriter(fileSystem, configuration, new Path(path, "data"), IntWritable.class,
-        DocumentWritable.class);
-    IntWritable docId = new IntWritable();
-    DocumentWritable documentWritable = new DocumentWritable();
-    int numberOfFields = 10;
-    Random random = new Random();
-    for (int i = 0; i < 100; i++) {
-      docId.set(i);
-      documentWritable.clear();
-      populate(numberOfFields, random, documentWritable);
-      writer.append(docId, documentWritable);
-    }
-    writer.close();
-  }
-
-  public static void populate(int numberOfFields, Random random, DocumentWritable documentWritable) {
-    for (int i = 0; i < numberOfFields; i++) {
-      long l = random.nextLong();
-      documentWritable.add(new StringField("f" + i, Long.toString(l), Store.YES));
-    }
-  }
-}

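The sequence file written by createInputDocument can be read back with the matching reader to inspect the generated input; a sketch using the same variables as the helper above:

    SequenceFile.Reader reader = new SequenceFile.Reader(fileSystem, new Path(path, "data"), configuration);
    IntWritable docId = new IntWritable();
    DocumentWritable doc = new DocumentWritable();
    while (reader.next(docId, doc)) {
      System.out.println(docId.get() + " -> " + doc.getDocument().size() + " fields");
    }
    reader.close();
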
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e8165b77/blur-mapred-hadoop2/pom.xml
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/pom.xml b/blur-mapred-hadoop2/pom.xml
index 1c9a7c6..4b4720c 100644
--- a/blur-mapred-hadoop2/pom.xml
+++ b/blur-mapred-hadoop2/pom.xml
@@ -35,6 +35,11 @@ under the License.
 
 	<dependencies>
 		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-mapred-common</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
 			<groupId>org.apache.zookeeper</groupId>
 			<artifactId>zookeeper</artifactId>
 			<version>${zookeeper.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e8165b77/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BaseBlurMapper.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BaseBlurMapper.java b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BaseBlurMapper.java
deleted file mode 100644
index 037edec..0000000
--- a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BaseBlurMapper.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Mapper;
-
-/**
- * Base mapper class for Blur map reduce classes.
- * 
- * @param <KEY>
- *          the input key type consumed by the mapper.
- * @param <VALUE>
- *          the input value type consumed by the mapper.
- */
-public abstract class BaseBlurMapper<KEY, VALUE> extends Mapper<KEY, VALUE, Text, BlurMutate> {
-  protected BlurMutate _mutate;
-  protected Text _key;
-  protected Counter _recordCounter;
-  protected Counter _columnCounter;
-
-  @Override
-  protected void setup(Context context) throws IOException, InterruptedException {
-    _mutate = new BlurMutate();
-    _mutate.setRecord(new BlurRecord());
-    _key = new Text();
-    _recordCounter = context.getCounter(BlurCounters.RECORD_COUNT);
-    _columnCounter = context.getCounter(BlurCounters.COLUMN_COUNT);
-  }
-
-  @Override
-  protected abstract void map(KEY key, VALUE value, Context context) throws IOException, InterruptedException;
-
-}

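Concrete mappers only need to implement map(); setup() above has already prepared the reusable key, mutate, and counters. A hypothetical subclass, using only the BlurMutate API exercised elsewhere in this patch (the one-column input format is invented for illustration):

    public class SimpleBlurMapper extends BaseBlurMapper<LongWritable, Text> {
      @Override
      protected void map(LongWritable key, Text value, Context context) throws IOException,
          InterruptedException {
        // Invented input format: "rowid,recordid,value"
        String[] parts = value.toString().split(",");
        BlurMutate mutate = new BlurMutate(MUTATE_TYPE.REPLACE, parts[0], parts[1], "cf1")
            .addColumn("col1", parts[2]);
        _key.set(parts[0]);
        context.write(_key, mutate);
        _recordCounter.increment(1);
        _columnCounter.increment(1);
      }
    }
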
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e8165b77/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurColumn.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurColumn.java b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurColumn.java
deleted file mode 100644
index d32a3bd..0000000
--- a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurColumn.java
+++ /dev/null
@@ -1,109 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-
-public class BlurColumn implements Writable {
-
-  private String name;
-  private String value;
-
-  public BlurColumn() {
-  }
-
-  public BlurColumn(String name, String value) {
-    this.name = name;
-    this.value = value;
-  }
-
-  public boolean hasNull() {
-    return name == null || value == null;
-  }
-
-  public String getName() {
-    return name;
-  }
-
-  public void setName(String name) {
-    this.name = name;
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    name = IOUtil.readString(in);
-    value = IOUtil.readString(in);
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    IOUtil.writeString(out, name);
-    IOUtil.writeString(out, value);
-  }
-
-  public String getValue() {
-    return value;
-  }
-
-  public void setValue(String value) {
-    this.value = value;
-  }
-
-  @Override
-  public String toString() {
-    return "{name=" + name + ", value=" + value + "}";
-  }
-
-  @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + ((name == null) ? 0 : name.hashCode());
-    result = prime * result + ((value == null) ? 0 : value.hashCode());
-    return result;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    BlurColumn other = (BlurColumn) obj;
-    if (name == null) {
-      if (other.name != null)
-        return false;
-    } else if (!name.equals(other.name))
-      return false;
-    if (value == null) {
-      if (other.value != null)
-        return false;
-    } else if (!value.equals(other.value))
-      return false;
-    return true;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e8165b77/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java
deleted file mode 100644
index 0691dce..0000000
--- a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * The enum class used for all the internal counters during map reduce jobs.
- */
-public enum BlurCounters {
-  RECORD_COUNT, LUCENE_FIELD_COUNT, ROW_COUNT, RECORD_RATE, COPY_RATE, ROW_RATE, RECORD_DUPLICATE_COUNT, ROW_OVERFLOW_COUNT, ROW_DELETE_COUNT, COLUMN_COUNT
-
-}

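After a job completes, these counters can be read back through the standard Hadoop counter API; a sketch, assuming a finished Job named job:

    Counters counters = job.getCounters();
    long records = counters.findCounter(BlurCounters.RECORD_COUNT).getValue();
    long rows = counters.findCounter(BlurCounters.ROW_COUNT).getValue();
    System.out.println("records=" + records + ", rows=" + rows);
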
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e8165b77/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurMapReduceUtil.java b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurMapReduceUtil.java
deleted file mode 100644
index 41769d0..0000000
--- a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurMapReduceUtil.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.blur.mapreduce.lib;
-
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import lucene.security.DocumentVisibility;
-
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.StringUtils;
-
-/**
- * This utility code was taken from HBase to locate classes and the jar files
- * to add to the MapReduce Job.
- */
-public class BlurMapReduceUtil {
-
-  private final static Log LOG = LogFactory.getLog(BlurMapReduceUtil.class);
-
-  /**
-   * Add the Blur dependency jars as well as jars for any of the configured job
-   * classes to the job configuration, so that JobClient will ship them to the
-   * cluster and add them to the DistributedCache.
-   */
-  public static void addDependencyJars(Job job) throws IOException {
-    try {
-      addDependencyJars(job.getConfiguration(), org.apache.zookeeper.ZooKeeper.class, job.getMapOutputKeyClass(),
-          job.getMapOutputValueClass(), job.getInputFormatClass(), job.getOutputKeyClass(), job.getOutputValueClass(),
-          job.getOutputFormatClass(), job.getPartitionerClass(), job.getCombinerClass(), DocumentVisibility.class);
-      addAllJarsInBlurLib(job.getConfiguration());
-    } catch (ClassNotFoundException e) {
-      throw new IOException(e);
-    }
-  }
-
-  /**
-   * Adds all the jars found in the same directory as the blur jar files.
-   * 
-   * @param conf
-   *          the job configuration to receive the jar paths
-   * @throws IOException
-   */
-  public static void addAllJarsInBlurLib(Configuration conf) throws IOException {
-    FileSystem localFs = FileSystem.getLocal(conf);
-    Set<String> jars = new HashSet<String>();
-    jars.addAll(conf.getStringCollection("tmpjars"));
-
-    String property = System.getProperty("java.class.path");
-    // Note: assumes a Unix-style ':' classpath separator.
-    String[] files = property.split(":");
-
-    String blurLibPath = getPath("blur-", files);
-    if (blurLibPath == null) {
-      return;
-    }
-    List<String> paths = getPaths(blurLibPath, files);
-    for (String pathStr : paths) {
-      Path path = new Path(pathStr);
-      if (!localFs.exists(path)) {
-        LOG.warn("Could not validate jar file " + path);
-        continue;
-      }
-      jars.add(path.makeQualified(localFs.getUri(), localFs.getWorkingDirectory()).toString());
-    }
-    if (jars.isEmpty()) {
-      return;
-    }
-    conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[0])));
-  }
-
-  private static List<String> getPaths(String path, String[] files) {
-    List<String> paths = new ArrayList<String>();
-    for (String file : files) {
-      if (file.startsWith(path)) {
-        paths.add(file);
-      }
-    }
-    return paths;
-  }
-
-  private static String getPath(String startsWith, String[] files) {
-    for (String file : files) {
-      int lastIndexOf = file.lastIndexOf('/');
-      String fileName = file.substring(lastIndexOf + 1);
-      if (fileName.startsWith(startsWith)) {
-        return file.substring(0, lastIndexOf);
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Add the jars containing the given classes to the job's configuration such
-   * that JobClient will ship them to the cluster and add them to the
-   * DistributedCache.
-   */
-  public static void addDependencyJars(Configuration conf, Class<?>... classes) throws IOException {
-    FileSystem localFs = FileSystem.getLocal(conf);
-    Set<String> jars = new HashSet<String>();
-    // Add jars that are already in the tmpjars variable
-    jars.addAll(conf.getStringCollection("tmpjars"));
-
-    // Add jars containing the specified classes
-    for (Class<?> clazz : classes) {
-      if (clazz == null) {
-        continue;
-      }
-
-      String pathStr = findOrCreateJar(clazz);
-      if (pathStr == null) {
-        LOG.warn("Could not find jar for class " + clazz + " in order to ship it to the cluster.");
-        continue;
-      }
-      Path path = new Path(pathStr);
-      if (!localFs.exists(path)) {
-        LOG.warn("Could not validate jar file " + path + " for class " + clazz);
-        continue;
-      }
-      jars.add(path.makeQualified(localFs.getUri(), localFs.getWorkingDirectory()).toString());
-    }
-    if (jars.isEmpty()) {
-      return;
-    }
-
-    conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[0])));
-  }
-
-  /**
-   * If org.apache.hadoop.util.JarFinder is available (0.23+ hadoop), finds the
-   * Jar for a class or creates it if it doesn't exist. If the class is in a
-   * directory in the classpath, it creates a Jar on the fly with the contents
-   * of the directory and returns the path to that Jar. If a Jar is created, it
-   * is created in the system temporary directory.
-   * 
-   * Otherwise, returns an existing jar that contains a class of the same name.
-   * 
-   * @param my_class
-   *          the class to find.
-   * @return a jar file that contains the class, or null.
-   * @throws IOException
-   */
-  private static String findOrCreateJar(Class<?> my_class) throws IOException {
-    try {
-      Class<?> jarFinder = Class.forName("org.apache.hadoop.util.JarFinder");
-      // hadoop-0.23 has a JarFinder class that will create the jar
-      // if it doesn't exist. Note that this is needed to run the mapreduce
-      // unit tests post-0.23, because mapreduce v2 requires the relevant jars
-      // to be in the mr cluster to do output, split, etc. At unit test time,
-      // the blur jars do not exist, so we need to create some. Note that we
-      // can safely fall back to findContainingJar for pre-0.23 mapreduce.
-      Method m = jarFinder.getMethod("getJar", Class.class);
-      return (String) m.invoke(null, my_class);
-    } catch (InvocationTargetException ite) {
-      // the method was invoked successfully but threw its own exception
-      throw new IOException(ite.getCause());
-    } catch (Exception e) {
-      // Ignore all other exceptions; they indicate a reflection failure.
-    }
-
-    LOG.debug("New JarFinder: org.apache.hadoop.util.JarFinder.getJar not available. Using old findContainingJar.");
-    return findContainingJar(my_class);
-  }
-
-  /**
-   * Find a jar that contains a class of the same name, if any. It will return a
-   * jar file, even if that is not the first thing on the class path that has a
-   * class with the same name.
-   * 
-   * This is shamelessly copied from JobConf
-   * 
-   * @param my_class
-   *          the class to find.
-   * @return a jar file that contains the class, or null.
-   * @throws IOException
-   */
-  private static String findContainingJar(Class<?> my_class) {
-    ClassLoader loader = my_class.getClassLoader();
-    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
-    try {
-      for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
-        URL url = itr.nextElement();
-        if ("jar".equals(url.getProtocol())) {
-          String toReturn = url.getPath();
-          if (toReturn.startsWith("file:")) {
-            toReturn = toReturn.substring("file:".length());
-          }
-          // URLDecoder is a misnamed class, since it actually decodes
-          // x-www-form-urlencoded MIME type rather than actual
-          // URL encoding (which the file path has). Therefore it would
-          // decode +s to ' 's which is incorrect (spaces are actually
-          // either unencoded or encoded as "%20"). Replace +s first, so
-          // that they are kept sacred during the decoding process.
-          toReturn = toReturn.replaceAll("\\+", "%2B");
-          toReturn = URLDecoder.decode(toReturn, "UTF-8");
-          return toReturn.replaceAll("!.*$", "");
-        }
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    return null;
-  }
-}
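
As a usage sketch, a job driver would call addDependencyJars(Job) after the mapper, formats, and key/value classes are configured, since the method reflectively inspects those settings. The driver method and job name below are illustrative:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class DependencyJarExample {
      public static Job createJob(Configuration conf) throws IOException {
        Job job = Job.getInstance(conf, "blur-indexing");  // illustrative job name
        // ... set mapper, input/output formats, and key/value classes here ...
        // Ship the Blur jars and the jars of the configured classes via tmpjars.
        BlurMapReduceUtil.addDependencyJars(job);
        return job;
      }
    }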

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e8165b77/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurMutate.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurMutate.java b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurMutate.java
deleted file mode 100644
index 36d7f4f..0000000
--- a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurMutate.java
+++ /dev/null
@@ -1,178 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.blur.thrift.generated.Record;
-import org.apache.blur.thrift.generated.Row;
-import org.apache.hadoop.io.Writable;
-
-/**
- * {@link BlurMutate} carries the {@link Record}s bound for the {@link Row} for
- * indexing. If this mutate represents a delete of the {@link Row}, the
- * recordId of the {@link BlurRecord} is ignored.
- */
-public class BlurMutate implements Writable {
-
-  /**
-   * The {@link MUTATE_TYPE} controls how the {@link Row} is mutated. DELETE
-   * indicates that the {@link Row} is to be deleted. REPLACE indicates that the
-   * group of mutates is to replace the existing {@link Row}.
-   * 
-   * If both a DELETE and a REPLACE exist for a single {@link Row} in the
-   * {@link BlurOutputFormat}, then the {@link Row} will be replaced, not just
-   * deleted.
-   */
-  public enum MUTATE_TYPE {
-    /* ADD(0), UPDATE(1), */DELETE(2), REPLACE(3);
-    private int _value;
-
-    private MUTATE_TYPE(int value) {
-      _value = value;
-    }
-
-    public int getValue() {
-      return _value;
-    }
-
-    public MUTATE_TYPE find(int value) {
-      switch (value) {
-      // @TODO Updates through MR are going to be disabled
-      // case 0:
-      // return ADD;
-      // case 1:
-      // return UPDATE;
-      case 2:
-        return DELETE;
-      case 3:
-        return REPLACE;
-      default:
-        throw new RuntimeException("Value [" + value + "] not found.");
-      }
-    }
-  }
-
-  private MUTATE_TYPE _mutateType = MUTATE_TYPE.REPLACE;
-  private BlurRecord _record = new BlurRecord();
-
-  public BlurMutate() {
-
-  }
-
-  public BlurMutate(MUTATE_TYPE type, BlurRecord record) {
-    _mutateType = type;
-    _record = record;
-  }
-
-  public BlurMutate(MUTATE_TYPE type, String rowId) {
-    _mutateType = type;
-    _record.setRowId(rowId);
-  }
-
-  public BlurMutate(MUTATE_TYPE type, String rowId, String recordId) {
-    _mutateType = type;
-    _record.setRowId(rowId);
-    _record.setRecordId(recordId);
-  }
-
-  public BlurMutate(MUTATE_TYPE type, String rowId, String recordId, String family) {
-    _mutateType = type;
-    _record.setRowId(rowId);
-    _record.setRecordId(recordId);
-    _record.setFamily(family);
-  }
-
-  public BlurMutate addColumn(BlurColumn column) {
-    _record.addColumn(column);
-    return this;
-  }
-
-  public BlurMutate addColumn(String name, String value) {
-    return addColumn(new BlurColumn(name, value));
-  }
-
-  public BlurRecord getRecord() {
-    return _record;
-  }
-
-  public void setRecord(BlurRecord record) {
-    _record = record;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    IOUtil.writeVInt(out, _mutateType.getValue());
-    _record.write(out);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    _mutateType = _mutateType.find(IOUtil.readVInt(in));
-    _record.readFields(in);
-  }
-
-  public MUTATE_TYPE getMutateType() {
-    return _mutateType;
-  }
-
-  public BlurMutate setMutateType(MUTATE_TYPE mutateType) {
-    _mutateType = mutateType;
-    return this;
-  }
-
-  @Override
-  public String toString() {
-    return "BlurMutate [mutateType=" + _mutateType + ", record=" + _record + "]";
-  }
-
-  public BlurMutate setFamily(String family) {
-    _record.setFamily(family);
-    return this;
-  }
-
-  @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + ((_mutateType == null) ? 0 : _mutateType.hashCode());
-    result = prime * result + ((_record == null) ? 0 : _record.hashCode());
-    return result;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    BlurMutate other = (BlurMutate) obj;
-    if (_mutateType != other._mutateType)
-      return false;
-    if (_record == null) {
-      if (other._record != null)
-        return false;
-    } else if (!_record.equals(other._record))
-      return false;
-    return true;
-  }
-
-}
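
To illustrate the intended shape of the data, a row replacement is expressed as one or more REPLACE mutates that share a rowId, each carrying the columns of one record. A sketch with made-up ids and column values:

    public class MutateExample {
      public static BlurMutate replaceRecord() {
        // Row id, record id, family, and columns are illustrative only.
        return new BlurMutate(BlurMutate.MUTATE_TYPE.REPLACE, "row-1", "rec-1", "fam0")
            .addColumn("col1", "val1")  // addColumn returns this, so calls chain
            .addColumn("col2", "val2");
      }
    }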

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e8165b77/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
deleted file mode 100644
index a5b176e..0000000
--- a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
+++ /dev/null
@@ -1,221 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.blur.utils.BlurConstants;
-import org.apache.blur.utils.ShardUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobStatus.State;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskType;
-
-public class BlurOutputCommitter extends OutputCommitter {
-
-  private static final Log LOG = LogFactory.getLog(BlurOutputCommitter.class);
-
-  @Override
-  public void setupJob(JobContext jobContext) throws IOException {
-    LOG.info("Running setup job.");
-  }
-
-  @Override
-  public void commitJob(JobContext jobContext) throws IOException {
-    // Look through all the shards for attempts that need to be cleaned up,
-    // find all the attempts that have finished, and then rename the completed
-    // attempt directories to commits.
-    LOG.info("Committing Job [{0}]", jobContext.getJobID());
-    Configuration configuration = jobContext.getConfiguration();
-    Path tableOutput = BlurOutputFormat.getOutputPath(configuration);
-    LOG.info("TableOutput path [{0}]", tableOutput);
-    makeSureNoEmptyShards(configuration, tableOutput);
-    FileSystem fileSystem = tableOutput.getFileSystem(configuration);
-    for (FileStatus fileStatus : fileSystem.listStatus(tableOutput)) {
-      LOG.info("Checking file status [{0}] with path [{1}]", fileStatus, fileStatus.getPath());
-      if (isShard(fileStatus)) {
-        commitOrAbortJob(jobContext, fileStatus.getPath(), true);
-      }
-    }
-    LOG.info("Committing Complete [{0}]", jobContext.getJobID());
-  }
-
-  private void makeSureNoEmptyShards(Configuration configuration, Path tableOutput) throws IOException {
-    FileSystem fileSystem = tableOutput.getFileSystem(configuration);
-    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
-    int shardCount = tableDescriptor.getShardCount();
-    for (int i = 0; i < shardCount; i++) {
-      String shardName = ShardUtil.getShardName(i);
-      fileSystem.mkdirs(new Path(tableOutput, shardName));
-    }
-  }
-
-  private void commitOrAbortJob(JobContext jobContext, Path shardPath, boolean commit) throws IOException {
-    LOG.info("CommitOrAbort [{0}] path [{1}]", commit, shardPath);
-    FileSystem fileSystem = shardPath.getFileSystem(jobContext.getConfiguration());
-    FileStatus[] listStatus = fileSystem.listStatus(shardPath, new PathFilter() {
-      @Override
-      public boolean accept(Path path) {
-        LOG.info("Checking path [{0}]", path);
-        return path.getName().endsWith(".task_complete");
-      }
-    });
-    for (FileStatus fileStatus : listStatus) {
-      Path path = fileStatus.getPath();
-      LOG.info("Trying to commitOrAbort [{0}]", path);
-      String name = path.getName();
-      boolean taskComplete = name.endsWith(".task_complete");
-      if (fileStatus.isDirectory()) {
-        String taskAttemptName = getTaskAttemptName(name);
-        if (taskAttemptName == null) {
-          LOG.info("Dir name [{0}] is not a task attempt", name);
-          continue;
-        }
-        TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptName);
-        if (taskAttemptID.getJobID().equals(jobContext.getJobID())) {
-          if (commit) {
-            if (taskComplete) {
-              fileSystem.rename(path, new Path(shardPath, taskAttemptName + ".commit"));
-              LOG.info("Committing [{0}] in path [{1}]", taskAttemptID, path);
-            } else {
-              fileSystem.delete(path, true);
-              LOG.info("Deleting tmp dir [{0}] in path [{1}]", taskAttemptID, path);
-            }
-          } else {
-            fileSystem.delete(path, true);
-            LOG.info("Deleting aborted job dir [{0}] in path [{1}]", taskAttemptID, path);
-          }
-        } else {
-          LOG.warn("TaskAttempt JobID [{0}] does not match JobContext JobId [{1}]", taskAttemptID.getJobID(),
-              jobContext.getJobID());
-        }
-      }
-    }
-  }
-
-  private String getTaskAttemptName(String name) {
-    int lastIndexOf = name.lastIndexOf('.');
-    if (lastIndexOf < 0) {
-      return null;
-    }
-    return name.substring(0, lastIndexOf);
-  }
-
-  private boolean isShard(FileStatus fileStatus) {
-    return isShard(fileStatus.getPath());
-  }
-
-  private boolean isShard(Path path) {
-    return path.getName().startsWith(BlurConstants.SHARD_PREFIX);
-  }
-
-  @Override
-  public void abortJob(JobContext jobContext, State state) throws IOException {
-    LOG.info("Abort Job [{0}]", jobContext.getJobID());
-    Configuration configuration = jobContext.getConfiguration();
-    Path tableOutput = BlurOutputFormat.getOutputPath(configuration);
-    makeSureNoEmptyShards(configuration, tableOutput);
-    FileSystem fileSystem = tableOutput.getFileSystem(configuration);
-    for (FileStatus fileStatus : fileSystem.listStatus(tableOutput)) {
-      if (isShard(fileStatus)) {
-        commitOrAbortJob(jobContext, fileStatus.getPath(), false);
-      }
-    }
-  }
-
-  @Override
-  public void cleanupJob(JobContext context) throws IOException {
-    LOG.info("Running job cleanup.");
-  }
-
-  @Override
-  public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
-    TaskAttemptID taskAttemptID = context.getTaskAttemptID();
-    TaskType taskType = taskAttemptID.getTaskType();
-    // Map task output only needs a commit when the job has no reduce phase.
-    return !(taskType == TaskType.MAP && context.getNumReduceTasks() != 0);
-  }
-
-  @Override
-  public void setupTask(TaskAttemptContext context) throws IOException {
-    LOG.info("Running task setup.");
-  }
-
-  private static class Conf {
-    Path _newIndex;
-    Configuration _configuration;
-    TaskAttemptID _taskAttemptID;
-    Path _indexPath;
-    TableDescriptor _tableDescriptor;
-  }
-
-  @Override
-  public void commitTask(TaskAttemptContext context) throws IOException {
-    LOG.info("Running commit task.");
-    Conf conf = setup(context);
-    FileSystem fileSystem = conf._newIndex.getFileSystem(conf._configuration);
-    if (fileSystem.exists(conf._newIndex) && !fileSystem.isFile(conf._newIndex)) {
-      Path dst = new Path(conf._indexPath, conf._taskAttemptID.toString() + ".task_complete");
-      LOG.info("Committing [{0}] to [{1}]", conf._newIndex, dst);
-      fileSystem.rename(conf._newIndex, dst);
-    } else {
-      throw new IOException("Path [" + conf._newIndex + "] does not exist, cannot commit.");
-    }
-  }
-
-  @Override
-  public void abortTask(TaskAttemptContext context) throws IOException {
-    LOG.info("Running abort task.");
-    Conf conf = setup(context);
-    FileSystem fileSystem = conf._newIndex.getFileSystem(conf._configuration);
-    LOG.info("abortTask - Deleting [{0}]", conf._newIndex);
-    fileSystem.delete(conf._newIndex, true);
-  }
-
-  private Conf setup(TaskAttemptContext context) throws IOException {
-    LOG.info("Setting up committer with task attempt [{0}]", context.getTaskAttemptID().toString());
-    Conf conf = new Conf();
-    conf._configuration = context.getConfiguration();
-    conf._tableDescriptor = BlurOutputFormat.getTableDescriptor(conf._configuration);
-    int shardCount = conf._tableDescriptor.getShardCount();
-    int attemptId = context.getTaskAttemptID().getTaskID().getId();
-    int shardId = attemptId % shardCount;
-    conf._taskAttemptID = context.getTaskAttemptID();
-    Path tableOutput = BlurOutputFormat.getOutputPath(conf._configuration);
-    String shardName = ShardUtil.getShardName(BlurConstants.SHARD_PREFIX, shardId);
-    conf._indexPath = new Path(tableOutput, shardName);
-    conf._newIndex = new Path(conf._indexPath, conf._taskAttemptID.toString() + ".tmp");
-    return conf;
-  }
-
-}
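
One detail worth noting from setup() above: a task's target shard is its task id modulo the table's shard count, so output lands in shard directories deterministically. A small worked sketch of just that arithmetic, with an illustrative shard count:

    public class ShardAssignmentExample {
      public static void main(String[] args) {
        int shardCount = 4;  // would come from the TableDescriptor
        for (int attemptId = 0; attemptId < 6; attemptId++) {
          int shardId = attemptId % shardCount;  // same rule as BlurOutputCommitter.setup()
          System.out.println("task " + attemptId + " -> shard-" + shardId);
        }
      }
    }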

