incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject git commit: More updates to map reduce. Getting closer to a functioning system.
Date Tue, 11 Dec 2012 02:07:12 GMT
Updated Branches:
  refs/heads/0.2-dev-removing-old-thrift beb05ce9b -> 8e6a0bf82


More updates to map reduce.  Getting closer to a functioning system.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/8e6a0bf8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/8e6a0bf8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/8e6a0bf8

Branch: refs/heads/0.2-dev-removing-old-thrift
Commit: 8e6a0bf828ecc3939f4fa12d606d737eec888480
Parents: beb05ce
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Dec 10 21:06:37 2012 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Dec 10 21:06:37 2012 -0500

----------------------------------------------------------------------
 .../apache/blur/utils/ThriftLuceneConversion.java  |   21 ++-
 .../apache/blur/example/BlurExampleIndexer.java    |   53 +++++
 .../blur/example/BlurExampleIndexerRebuild.java    |   79 +++++++
 .../org/apache/blur/example/BlurExampleMapper.java |   49 +++++
 .../blur/hadoop/io/AddDocumentsWritable.java       |   42 +++-
 .../apache/blur/hadoop/io/DeleteTermsWritable.java |   59 +++++
 .../apache/blur/hadoop/io/DocumentWritable.java    |    5 +
 .../apache/blur/hadoop/io/ScoreDocWritable.java    |   15 +-
 .../org/apache/blur/hadoop/io/TermWritable.java    |   47 ++++
 .../blur/hadoop/io/UpdatePackageWritable.java      |   47 ++++
 .../blur/hadoop/io/UpdatePackagesWritable.java     |   59 +++++
 .../org/apache/blur/mapred/BlurInputFormat.java    |   64 ++++--
 .../java/org/apache/blur/mapred/BlurReader.java    |  166 +++++++++++++++
 .../org/apache/blur/mapred/BlurRecordReader.java   |  108 ----------
 .../example/BlurExampleIndexerRebuild.java         |   79 -------
 .../example/BlurExampleIndexerUpdate.java          |   62 ------
 .../blur/mapreduce/example/BlurExampleMapper.java  |   51 -----
 .../apache/blur/mapreduce/lib/BlurInputFormat.java |    4 +-
 .../apache/blur/mapreduce/lib/BlurInputSplit.java  |    6 +-
 .../apache/blur/mapreduce/lib/BlurJobSetup.java    |   31 ---
 .../blur/mapreduce/lib/BlurMapReduceUtil.java      |   46 ++++
 .../org/apache/blur/mapreduce/lib/BlurMapper.java  |    9 +
 .../blur/mapreduce/lib/BlurOutputFormat.java       |   64 +++++-
 .../org/apache/blur/mapreduce/lib/BlurReader.java  |   29 +--
 .../org/apache/blur/mapreduce/lib/BlurWriter.java  |   88 ++++++--
 25 files changed, 865 insertions(+), 418 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8e6a0bf8/src/blur-core/src/main/java/org/apache/blur/utils/ThriftLuceneConversion.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/utils/ThriftLuceneConversion.java b/src/blur-core/src/main/java/org/apache/blur/utils/ThriftLuceneConversion.java
index affa0a6..741b0d1 100644
--- a/src/blur-core/src/main/java/org/apache/blur/utils/ThriftLuceneConversion.java
+++ b/src/blur-core/src/main/java/org/apache/blur/utils/ThriftLuceneConversion.java
@@ -17,6 +17,7 @@ import org.apache.blur.thrift.generated.TYPE;
 import org.apache.blur.thrift.generated.TopFieldDocs;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.DoubleField;
 import org.apache.lucene.document.Field.Store;
@@ -29,12 +30,15 @@ import org.apache.lucene.document.StringField;
 import org.apache.lucene.document.TextField;
 import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.index.Term;
+import org.apache.lucene.queryparser.classic.ParseException;
+import org.apache.lucene.queryparser.classic.QueryParser;
 import org.apache.lucene.search.Filter;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.Sort;
 import org.apache.lucene.search.SortField.Type;
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.Version;
 
 public class ThriftLuceneConversion {
 
@@ -54,7 +58,7 @@ public class ThriftLuceneConversion {
     }
     return topDocs;
   }
-  
+
   public static org.apache.lucene.search.TopFieldDocs setShardIndexTopFieldDocs(int shardIndex, org.apache.lucene.search.TopFieldDocs topDocs) {
     org.apache.lucene.search.ScoreDoc[] scoreDocs = topDocs.scoreDocs;
     for (int i = 0; i < scoreDocs.length; i++) {
@@ -462,4 +466,19 @@ public class ThriftLuceneConversion {
     LOG.error("Not supported YET!");
     return null;
   }
+
+  public static byte[] parseQuery(String luceneQuery, Version matchVersion, String field, Analyzer analyzer) throws ParseException, IOException {
+    Query query = new QueryParser(matchVersion, field, analyzer).parse(luceneQuery);
+    QueryWritable queryWritable = new QueryWritable(query);
+    DataOutputBuffer buffer = new DataOutputBuffer();
+    queryWritable.write(buffer);
+    buffer.close();
+    return trim(buffer);
+  }
+
+  private static byte[] trim(DataOutputBuffer buffer) {
+    byte[] buf = new byte[buffer.getLength()];
+    System.arraycopy(buffer.getData(), 0, buf, 0, buf.length);
+    return buf;
+  }
 }
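
For reference, the new parseQuery helper turns a Lucene query string into the raw byte form that the map reduce code passes around. The round-trip sketch below is not part of this commit; it assumes QueryWritable follows the usual Writable conventions (no-arg constructor plus readFields) and sits in org.apache.blur.utils alongside this class, since it is referenced above without an import. Version.LUCENE_40, the query string, and the class name are placeholders.

package org.apache.blur.utils;

import java.io.IOException;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.queryparser.classic.ParseException;
import org.apache.lucene.util.Version;

public class ParseQueryExample {
  public static void main(String[] args) throws IOException, ParseException {
    // Serialize a query string into the byte form produced by parseQuery().
    byte[] bytes = ThriftLuceneConversion.parseQuery("body:hadoop", Version.LUCENE_40,
        "body", new StandardAnalyzer(Version.LUCENE_40));

    // Read the bytes back through the Writable interface (a no-arg constructor
    // on QueryWritable is assumed; it is not shown in this commit).
    DataInputBuffer in = new DataInputBuffer();
    in.reset(bytes, bytes.length);
    QueryWritable queryWritable = new QueryWritable();
    queryWritable.readFields(in);
  }
}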

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8e6a0bf8/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleIndexer.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleIndexer.java b/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleIndexer.java
new file mode 100644
index 0000000..359f279
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleIndexer.java
@@ -0,0 +1,53 @@
+package org.apache.blur.example;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.blur.mapreduce.lib.BlurMapReduceUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+public class BlurExampleIndexer {
+
+  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
+    Configuration configuration = new Configuration();
+    String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
+    if (otherArgs.length != 3) {
+      System.err.println("Usage: blurexampleindexer <in> <connection string> <table>");
+      System.exit(2);
+    }
+
+    String input = otherArgs[0];
+    String connectionStr = otherArgs[1];
+    String table = otherArgs[2];
+
+    Job job = new Job(configuration, "Blur Indexer");
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setNumReduceTasks(0);
+    job.setMapperClass(BlurExampleMapper.class);
+    BlurMapReduceUtil.setupWriterJob(job, connectionStr, table);
+
+    FileInputFormat.addInputPath(job, new Path(input));
+    boolean waitForCompletion = job.waitForCompletion(true);
+    System.exit(waitForCompletion ? 0 : 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8e6a0bf8/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleIndexerRebuild.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleIndexerRebuild.java b/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleIndexerRebuild.java
new file mode 100644
index 0000000..51ca611
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleIndexerRebuild.java
@@ -0,0 +1,79 @@
+package org.apache.blur.example;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.blur.mapreduce.BlurTask;
+import org.apache.blur.mapreduce.BlurTask.INDEXING_TYPE;
+import org.apache.blur.thrift.generated.AnalyzerDefinition;
+import org.apache.blur.thrift.generated.ColumnDefinition;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+
+
+public class BlurExampleIndexerRebuild {
+
+  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
+    Configuration configuration = new Configuration();
+    String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
+    if (otherArgs.length != 2) {
+      System.err.println("Usage: blurindexer <in> <out>");
+      System.exit(2);
+    }
+
+    AnalyzerDefinition ad = new AnalyzerDefinition();
+    ad.defaultDefinition = new ColumnDefinition(StandardAnalyzer.class.getName(), true, null);
+
+    TableDescriptor descriptor = new TableDescriptor();
+    descriptor.analyzerDefinition = ad;
+    descriptor.compressionBlockSize = 32768;
+    descriptor.compressionClass = DefaultCodec.class.getName();
+    descriptor.isEnabled = true;
+    descriptor.name = "test-table";
+    descriptor.shardCount = 1;
+    descriptor.cluster = "default";
+    descriptor.tableUri = "./blur-testing";
+
+    BlurTask blurTask = new BlurTask();
+    blurTask.setTableDescriptor(descriptor);
+    blurTask.setIndexingType(INDEXING_TYPE.REBUILD);
+    blurTask.setOptimize(false);
+    Job job = blurTask.configureJob(configuration);
+    job.setJarByClass(BlurExampleIndexerRebuild.class);
+    job.setMapperClass(BlurExampleMapper.class);
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
+    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1], "job-" + System.currentTimeMillis()));
+    long s = System.currentTimeMillis();
+    boolean waitForCompletion = job.waitForCompletion(true);
+    long e = System.currentTimeMillis();
+    System.out.println("Completed in [" + (e - s) + " ms]");
+    System.exit(waitForCompletion ? 0 : 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8e6a0bf8/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleMapper.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleMapper.java b/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleMapper.java
new file mode 100644
index 0000000..77378de
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleMapper.java
@@ -0,0 +1,49 @@
+package org.apache.blur.example;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.blur.hadoop.io.AddDocumentsWritable;
+import org.apache.blur.hadoop.io.DocumentWritable;
+import org.apache.blur.mapreduce.lib.BlurMapper;
+import org.apache.blur.thrift.generated.Field;
+import org.apache.blur.thrift.generated.TYPE;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+public class BlurExampleMapper extends BlurMapper<LongWritable, Text> {
+
+  private DocumentWritable documentWritable = new DocumentWritable();
+  private AddDocumentsWritable addDocumentsWritable = new AddDocumentsWritable();
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    addDocumentsWritable.setDocumentWritable(documentWritable);
+  }
+
+  @Override
+  protected void map(LongWritable k, Text value, Context context) throws IOException, InterruptedException {
+    documentWritable.addToFields(new Field("body", ByteBuffer.wrap(value.getBytes()), TYPE.TEXT, 1.0));
+
+    // NOTE: We are using the original value as the key to distribute the documents
+    // evenly across all of the reducers (if any).
+    context.write(value, addDocumentsWritable);
+    context.progress();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8e6a0bf8/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/AddDocumentsWritable.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/AddDocumentsWritable.java b/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/AddDocumentsWritable.java
index dc35e1a..22fa9d7 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/AddDocumentsWritable.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/AddDocumentsWritable.java
@@ -6,28 +6,58 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.blur.thrift.generated.Document;
 import org.apache.hadoop.io.Writable;
 
 public class AddDocumentsWritable extends MutateWritable implements Writable {
-  
+
   private List<DocumentWritable> documents = new ArrayList<DocumentWritable>();
 
-  public List<DocumentWritable> getDocuments() {
+  public List<DocumentWritable> getDocumentWritables() {
     return documents;
   }
 
-  public void setDocuments(List<DocumentWritable> documents) {
+  public void setDocumentWritables(List<DocumentWritable> documents) {
     this.documents = documents;
   }
 
+  public List<? extends Document> getDocuments() {
+    return documents;
+  }
+
+  public void setDocuments(List<Document> docs) {
+    documents.clear();
+    for (Document document : docs) {
+      if (document instanceof DocumentWritable) {
+        documents.add((DocumentWritable) document);
+      } else {
+        documents.add(new DocumentWritable(document));
+      }
+    }
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
-    
+    out.writeInt(documents.size());
+    for (DocumentWritable documentWritable : documents) {
+      documentWritable.write(out);
+    }
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    
+    documents.clear();
+    int size = in.readInt();
+    for (int i = 0; i < size; i++) {
+      DocumentWritable documentWritable = new DocumentWritable();
+      documentWritable.readFields(in);
+      documents.add(documentWritable);
+    }
+  }
+
+  public void setDocumentWritable(DocumentWritable documentWritable) {
+    documents.clear();
+    documents.add(documentWritable);
   }
-  
+
 }
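
AddDocumentsWritable now carries real serialization logic, so a round trip through Hadoop's in-memory buffers is enough to sanity-check it. The sketch below is not part of this commit; the Field constructor arguments mirror the ones used in BlurExampleMapper above, and the class name is hypothetical.

package org.apache.blur.hadoop.io;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.blur.thrift.generated.Field;
import org.apache.blur.thrift.generated.TYPE;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class AddDocumentsWritableRoundTrip {
  public static void main(String[] args) throws IOException {
    // Build a single-document add mutation, mirroring BlurExampleMapper.
    DocumentWritable document = new DocumentWritable();
    document.addToFields(new Field("body", ByteBuffer.wrap("hello world".getBytes()), TYPE.TEXT, 1.0));

    AddDocumentsWritable add = new AddDocumentsWritable();
    add.setDocumentWritable(document);

    // Serialize to an in-memory buffer.
    DataOutputBuffer out = new DataOutputBuffer();
    add.write(out);

    // Deserialize into a fresh instance and check the document count.
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    AddDocumentsWritable copy = new AddDocumentsWritable();
    copy.readFields(in);
    System.out.println("documents read back: " + copy.getDocumentWritables().size());
  }
}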

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8e6a0bf8/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/DeleteTermsWritable.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/DeleteTermsWritable.java b/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/DeleteTermsWritable.java
new file mode 100644
index 0000000..5ae1fb5
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/DeleteTermsWritable.java
@@ -0,0 +1,59 @@
+package org.apache.blur.hadoop.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.blur.thrift.generated.Term;
+import org.apache.hadoop.io.Writable;
+
+public class DeleteTermsWritable extends MutateWritable implements Writable {
+
+  private List<TermWritable> termWritables = new ArrayList<TermWritable>();
+
+  public List<TermWritable> getTermWritables() {
+    return termWritables;
+  }
+
+  public void setTermWritables(List<TermWritable> termWritables) {
+    this.termWritables = termWritables;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(termWritables.size());
+    for (TermWritable termWritable : termWritables) {
+      termWritable.write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    termWritables.clear();
+    int size = in.readInt();
+    for (int i = 0; i < size; i++) {
+      TermWritable termWritable = new TermWritable();
+      termWritable.readFields(in);
+      termWritables.add(termWritable);
+    }
+  }
+
+  public void setTerms(List<Term> terms) {
+    termWritables.clear();
+    for (Term term : terms) {
+      if (term instanceof TermWritable) {
+        termWritables.add((TermWritable) term);
+      } else {
+        termWritables.add(new TermWritable(term));
+      }
+    }
+
+  }
+
+  public List<? extends Term> getTerms() {
+    return termWritables;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8e6a0bf8/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/DocumentWritable.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/DocumentWritable.java b/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/DocumentWritable.java
index d9ebe4c..05310d6 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/DocumentWritable.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/DocumentWritable.java
@@ -9,6 +9,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TCompactProtocol;
 
+@SuppressWarnings("serial")
 public class DocumentWritable extends Document implements Writable {
 
   public DocumentWritable() {
@@ -44,4 +45,8 @@ public class DocumentWritable extends Document implements Writable {
       throw new IOException(e);
     }
   }
+
+  public void setDocument(Document document) {
+    setFields(document.getFields());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8e6a0bf8/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/ScoreDocWritable.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/ScoreDocWritable.java b/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/ScoreDocWritable.java
index 037e953..c03d5c0 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/ScoreDocWritable.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/ScoreDocWritable.java
@@ -9,16 +9,17 @@ import org.apache.hadoop.io.Writable;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TCompactProtocol;
 
+@SuppressWarnings("serial")
 public class ScoreDocWritable extends ScoreDoc implements Writable {
-  
+
   public ScoreDocWritable() {
-    
+
   }
-  
+
   public ScoreDocWritable(ScoreDoc doc) {
     super(doc);
   }
-  
+
   public ScoreDocWritable(ScoreDocWritable doc) {
     super(doc);
   }
@@ -44,4 +45,10 @@ public class ScoreDocWritable extends ScoreDoc implements Writable {
       throw new IOException(e);
     }
   }
+
+  public void setScoreDoc(ScoreDoc scoreDoc) {
+    setDocLocation(scoreDoc.getDocLocation());
+    setFields(scoreDoc.getFields());
+    setScore(scoreDoc.getScore());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8e6a0bf8/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/TermWritable.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/TermWritable.java b/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/TermWritable.java
new file mode 100644
index 0000000..7dab682
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/TermWritable.java
@@ -0,0 +1,47 @@
+package org.apache.blur.hadoop.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.blur.thrift.generated.Term;
+import org.apache.hadoop.io.Writable;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+
+@SuppressWarnings("serial")
+public class TermWritable extends Term implements Writable {
+
+  public TermWritable() {
+  }
+
+  public TermWritable(Term term) {
+    super(term);
+  }
+
+  public TermWritable(TermWritable term) {
+    super(term);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    HadoopTransport transport = new HadoopTransport(out);
+    TCompactProtocol protocol = new TCompactProtocol(transport);
+    try {
+      this.write(protocol);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    HadoopTransport transport = new HadoopTransport(in);
+    TCompactProtocol protocol = new TCompactProtocol(transport);
+    try {
+      this.read(protocol);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8e6a0bf8/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/UpdatePackageWritable.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/UpdatePackageWritable.java b/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/UpdatePackageWritable.java
new file mode 100644
index 0000000..625a7c8
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/UpdatePackageWritable.java
@@ -0,0 +1,47 @@
+package org.apache.blur.hadoop.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.blur.thrift.generated.UpdatePackage;
+import org.apache.hadoop.io.Writable;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+
+@SuppressWarnings("serial")
+public class UpdatePackageWritable extends UpdatePackage implements Writable {
+
+  public UpdatePackageWritable() {
+  }
+
+  public UpdatePackageWritable(UpdatePackage updatePackage) {
+    super(updatePackage);
+  }
+
+  public UpdatePackageWritable(UpdatePackageWritable updatePackage) {
+    super(updatePackage);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    HadoopTransport transport = new HadoopTransport(out);
+    TCompactProtocol protocol = new TCompactProtocol(transport);
+    try {
+      this.write(protocol);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    HadoopTransport transport = new HadoopTransport(in);
+    TCompactProtocol protocol = new TCompactProtocol(transport);
+    try {
+      this.read(protocol);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8e6a0bf8/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/UpdatePackagesWritable.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/UpdatePackagesWritable.java b/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/UpdatePackagesWritable.java
new file mode 100644
index 0000000..c05a298
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/UpdatePackagesWritable.java
@@ -0,0 +1,59 @@
+package org.apache.blur.hadoop.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.blur.thrift.generated.UpdatePackage;
+import org.apache.hadoop.io.Writable;
+
+public class UpdatePackagesWritable extends MutateWritable implements Writable {
+
+  private List<UpdatePackageWritable> updatePackageWritables = new ArrayList<UpdatePackageWritable>();
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    int size = updatePackageWritables.size();
+    out.writeInt(size);
+    for (UpdatePackageWritable writable : updatePackageWritables) {
+      writable.write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    updatePackageWritables.clear();
+    int size = in.readInt();
+    for (int i = 0; i < size; i++) {
+      UpdatePackageWritable updatePackageWritable = new UpdatePackageWritable();
+      updatePackageWritable.readFields(in);
+      updatePackageWritables.add(updatePackageWritable);
+    }
+  }
+
+  public List<? extends UpdatePackage> getUpdatePackages() {
+    return updatePackageWritables;
+  }
+
+  public void setUpdatePackages(List<? extends UpdatePackage> updatePackages) {
+    updatePackageWritables.clear();
+    for (UpdatePackage updatePackage : updatePackages) {
+      if (updatePackage instanceof UpdatePackageWritable) {
+        updatePackageWritables.add((UpdatePackageWritable) updatePackage);
+      } else {
+        updatePackageWritables.add(new UpdatePackageWritable(updatePackage));
+      }
+    }
+  }
+
+  public List<UpdatePackageWritable> getUpdatePackageWritables() {
+    return updatePackageWritables;
+  }
+
+  public void setUpdatePackageWritables(List<UpdatePackageWritable> updatePackageWritables) {
+    this.updatePackageWritables = updatePackageWritables;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8e6a0bf8/src/blur-mapred/src/main/java/org/apache/blur/mapred/BlurInputFormat.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapred/BlurInputFormat.java b/src/blur-mapred/src/main/java/org/apache/blur/mapred/BlurInputFormat.java
index db17abf..2a142d5 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapred/BlurInputFormat.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapred/BlurInputFormat.java
@@ -16,27 +16,53 @@ package org.apache.blur.mapred;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import org.apache.blur.mapreduce.BlurRecord;
-import org.apache.hadoop.io.Text;
+import static org.apache.blur.mapreduce.lib.BlurMapReduceUtil.BLUR_CONNECTION_STR;
+import static org.apache.blur.mapreduce.lib.BlurMapReduceUtil.BLUR_TABLE_NAME;
+
+import java.io.IOException;
+
+import org.apache.blur.hadoop.io.DocumentWritable;
+import org.apache.blur.hadoop.io.ScoreDocWritable;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.thrift.TException;
+
+
+public class BlurInputFormat implements InputFormat<ScoreDocWritable, DocumentWritable> {
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    Configuration configuration = job;
+    String connectionStr = configuration.get(BLUR_CONNECTION_STR);
+    String tableName = configuration.get(BLUR_TABLE_NAME);
+    Iface client = BlurClient.getClient(connectionStr);
+    try {
+      TableDescriptor describe = client.describe(tableName);
+      int shardCount = describe.getShardCount();
+      InputSplit[] splits = new InputSplit[shardCount];
+      for (int i = 0; i < shardCount; i++) {
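+        // Placeholder: an actual BlurInputSplit per shard still needs to be created here.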
+        splits[i] = null;
+      }
+      return splits;
+    } catch (BlurException e) {
+      throw new IOException(e);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+  }
 
+  @Override
+  public RecordReader<ScoreDocWritable, DocumentWritable> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    return new BlurReader(split, job, reporter);
+  }
 
-public abstract class BlurInputFormat implements InputFormat<Text, BlurRecord> {
-
-//  @Override
-//  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-//    List<?> splits = new ArrayList<Object>();
-//    Path[] paths = FileInputFormat.getInputPaths(job);
-//    for (Path path : paths) {
-//      org.apache.blur.mapreduce.lib.BlurInputFormat.findAllSegments((Configuration) job, path, splits);
-//    }
-//    return splits.toArray(new InputSplit[] {});
-//  }
-//
-//  @Override
-//  public RecordReader<Text, BlurRecord> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
-//    reporter.setStatus(split.toString());
-//    return new BlurRecordReader(split, job);
-//  }
 
 }
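
The rewritten org.apache.blur.mapred.BlurInputFormat targets the old mapred API, so a job drives it through JobConf and a manual RecordReader loop rather than the mapreduce classes. The sketch below is not part of this commit, the connection string and table name are placeholders, and because getSplits() above still returns null placeholder splits it shows only the wiring, not a run that completes.

package org.apache.blur.mapred;

import static org.apache.blur.mapreduce.lib.BlurMapReduceUtil.BLUR_CONNECTION_STR;
import static org.apache.blur.mapreduce.lib.BlurMapReduceUtil.BLUR_TABLE_NAME;

import java.io.IOException;

import org.apache.blur.hadoop.io.DocumentWritable;
import org.apache.blur.hadoop.io.ScoreDocWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class BlurInputFormatExample {
  public static void main(String[] args) throws IOException {
    JobConf job = new JobConf();
    job.set(BLUR_CONNECTION_STR, "localhost:40010");
    job.set(BLUR_TABLE_NAME, "test-table");

    BlurInputFormat inputFormat = new BlurInputFormat();
    for (InputSplit split : inputFormat.getSplits(job, 1)) {
      RecordReader<ScoreDocWritable, DocumentWritable> reader = inputFormat.getRecordReader(split, job, Reporter.NULL);
      ScoreDocWritable key = reader.createKey();
      DocumentWritable value = reader.createValue();
      while (reader.next(key, value)) {
        // Process one search hit (key) and its fetched document (value).
      }
      reader.close();
    }
  }
}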

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8e6a0bf8/src/blur-mapred/src/main/java/org/apache/blur/mapred/BlurReader.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapred/BlurReader.java b/src/blur-mapred/src/main/java/org/apache/blur/mapred/BlurReader.java
new file mode 100644
index 0000000..1b737c4
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapred/BlurReader.java
@@ -0,0 +1,166 @@
+package org.apache.blur.mapred;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.mapreduce.lib.BlurMapReduceUtil.BLUR_CONNECTION_STR;
+import static org.apache.blur.mapreduce.lib.BlurMapReduceUtil.BLUR_FIELDS;
+import static org.apache.blur.mapreduce.lib.BlurMapReduceUtil.BLUR_SESSION_ID;
+import static org.apache.blur.mapreduce.lib.BlurMapReduceUtil.BLUR_TABLE_NAME;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.blur.hadoop.io.DocumentWritable;
+import org.apache.blur.hadoop.io.ScoreDocWritable;
+import org.apache.blur.mapreduce.lib.BlurInputSplit;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.Document;
+import org.apache.blur.thrift.generated.QueryArgs;
+import org.apache.blur.thrift.generated.ScoreDoc;
+import org.apache.blur.thrift.generated.Session;
+import org.apache.blur.thrift.generated.TopFieldDocs;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.thrift.TException;
+
+
+public class BlurReader implements RecordReader<ScoreDocWritable, DocumentWritable> {
+
+  private Iface client;
+  private Session session;
+  private Set<String> fields;
+  private QueryArgs queryArgs;
+
+  private long totalHits = -1;
+  private long totalPosition = -1;
+
+  private int position = 0;
+  private List<ScoreDoc> scoreDocs = new ArrayList<ScoreDoc>();
+  private List<Document> documents = new ArrayList<Document>();
+  
+  public BlurReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    BlurInputSplit blurInputSplit = (BlurInputSplit) split;
+    
+    Configuration configuration = job;
+
+    String connectionStr = configuration.get(BLUR_CONNECTION_STR);
+    String sessionId = configuration.get(BLUR_SESSION_ID);
+    String tableName = configuration.get(BLUR_TABLE_NAME);
+    String[] fieldArray = configuration.getStrings(BLUR_FIELDS);
+    if (fieldArray != null) {
+      fields = new HashSet<String>(Arrays.asList(fieldArray));
+    }
+
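+    // queryArgs still needs to be rebuilt from the BLUR_QUERY configuration value
+    // before it can be used here.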
+    configureForThisSplit(blurInputSplit, queryArgs);
+
+    session = new Session(sessionId, tableName);
+    client = BlurClient.getClient(connectionStr);
+    search();
+  }
+
+  @Override
+  public boolean next(ScoreDocWritable key, DocumentWritable value) throws IOException {
+    totalPosition++;
+    position++;
+    if (totalPosition >= totalHits) {
+      return false;
+    }
+    if (position >= documents.size()) {
+      search();
+      position++;
+      if (totalPosition < totalHits) {
+        key.setScoreDoc(scoreDocs.get(position));
+        value.setDocument(documents.get(position));
+        return true;
+      } else {
+        return false;
+      }
+    }
+    key.setScoreDoc(scoreDocs.get(position));
+    value.setDocument(documents.get(position));
+    return true;
+  }
+
+  @Override
+  public ScoreDocWritable createKey() {
+    return new ScoreDocWritable();
+  }
+
+  @Override
+  public DocumentWritable createValue() {
+    return new DocumentWritable();
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return totalPosition;
+  }
+
+  @Override
+  public void close() throws IOException {
+    
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    if (totalHits < 0) {
+      return 0;
+    }
+    return totalPosition / (float) totalHits;
+  }
+
+  private void search() throws IOException {
+    try {
+      if (!scoreDocs.isEmpty()) {
+        queryArgs.setAfter(scoreDocs.get(scoreDocs.size() - 1));
+      }
+      List<TopFieldDocs> results = client.search(session, queryArgs);
+      TopFieldDocs topFieldDocs = results.get(0);
+      totalHits = topFieldDocs.getTotalHits();
+      scoreDocs = topFieldDocs.getScoreDocs();
+      documents = client.doc(session, getDocLocations(topFieldDocs), fields);
+      position = -1;
+    } catch (BlurException e) {
+      throw new IOException(e);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+  }
+  
+  private List<Long> getDocLocations(TopFieldDocs topFieldDocs) {
+    List<ScoreDoc> scoreDocs = topFieldDocs.scoreDocs;
+    List<Long> docLocations = new ArrayList<Long>();
+    for (ScoreDoc scoreDoc : scoreDocs) {
+      docLocations.add(scoreDoc.getDocLocation());
+    }
+    return docLocations;
+  }
+  
+  private void configureForThisSplit(BlurInputSplit split, QueryArgs queryArgs) {
+    int shard = split.getShard();
+    queryArgs.setShardIndexes(Arrays.asList(shard));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8e6a0bf8/src/blur-mapred/src/main/java/org/apache/blur/mapred/BlurRecordReader.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapred/BlurRecordReader.java b/src/blur-mapred/src/main/java/org/apache/blur/mapred/BlurRecordReader.java
deleted file mode 100644
index 7b34289..0000000
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapred/BlurRecordReader.java
+++ /dev/null
@@ -1,108 +0,0 @@
-package org.apache.blur.mapred;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.blur.mapreduce.BlurRecord;
-import org.apache.blur.mapreduce.lib.BlurInputSplit;
-import org.apache.blur.mapreduce.lib.Utils;
-import org.apache.blur.store.hdfs.HdfsDirectory;
-import org.apache.blur.utils.RowDocumentUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.index.IndexCommit;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.store.Directory;
-
-
-public abstract class BlurRecordReader implements RecordReader<Text, BlurRecord> {
-
-//  private IndexReader reader;
-//  private Directory directory;
-//  private int startingDocId;
-//  private int endingDocId;
-//  private int position;
-//
-//  public BlurRecordReader(InputSplit split, JobConf job) throws IOException {
-//    BlurInputSplit blurSplit = (BlurInputSplit) split;
-//    Path path = blurSplit.getIndexPath();
-//    String segmentName = blurSplit.getSegmentName();
-//    startingDocId = blurSplit.getStartingDocId();
-//    endingDocId = blurSplit.getEndingDocId();
-//    directory = new HdfsDirectory(path);
-//
-//    IndexCommit commit = Utils.findLatest(directory);
-//    reader = Utils.openSegmentReader(directory, commit, segmentName, Utils.getTermInfosIndexDivisor(job));
-//    int maxDoc = reader.maxDoc();
-//    if (endingDocId >= maxDoc) {
-//      endingDocId = maxDoc - 1;
-//    }
-//    position = startingDocId - 1;
-//  }
-//
-//  @Override
-//  public boolean next(Text key, BlurRecord value) throws IOException {
-//    do {
-//      position++;
-//      if (position > endingDocId) {
-//        return false;
-//      }
-//    } while (reader.isDeleted(position));
-//    readDocument(key, value);
-//    return true;
-//  }
-//
-//  private void readDocument(Text rowid, BlurRecord record) throws CorruptIndexException, IOException {
-//    Document document = reader.document(position);
-//    record.reset();
-//    rowid.set(RowDocumentUtil.readRecord(document, record));
-//  }
-//
-//  @Override
-//  public Text createKey() {
-//    return new Text();
-//  }
-//
-//  @Override
-//  public BlurRecord createValue() {
-//    return new BlurRecord();
-//  }
-//
-//  @Override
-//  public long getPos() throws IOException {
-//    return position;
-//  }
-//
-//  @Override
-//  public void close() throws IOException {
-//    reader.close();
-//    directory.close();
-//  }
-//
-//  @Override
-//  public float getProgress() throws IOException {
-//    int total = endingDocId - startingDocId;
-//    return (float) position / (float) total;
-//  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8e6a0bf8/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleIndexerRebuild.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleIndexerRebuild.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleIndexerRebuild.java
deleted file mode 100644
index 29eba4e..0000000
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleIndexerRebuild.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package org.apache.blur.mapreduce.example;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.blur.mapreduce.BlurTask;
-import org.apache.blur.mapreduce.BlurTask.INDEXING_TYPE;
-import org.apache.blur.thrift.generated.AnalyzerDefinition;
-import org.apache.blur.thrift.generated.ColumnDefinition;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
-
-
-public class BlurExampleIndexerRebuild {
-
-  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
-    Configuration configuration = new Configuration();
-    String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
-    if (otherArgs.length != 2) {
-      System.err.println("Usage: blurindexer <in> <out>");
-      System.exit(2);
-    }
-
-    AnalyzerDefinition ad = new AnalyzerDefinition();
-    ad.defaultDefinition = new ColumnDefinition(StandardAnalyzer.class.getName(), true, null);
-
-    TableDescriptor descriptor = new TableDescriptor();
-    descriptor.analyzerDefinition = ad;
-    descriptor.compressionBlockSize = 32768;
-    descriptor.compressionClass = DefaultCodec.class.getName();
-    descriptor.isEnabled = true;
-    descriptor.name = "test-table";
-    descriptor.shardCount = 1;
-    descriptor.cluster = "default";
-    descriptor.tableUri = "./blur-testing";
-
-    BlurTask blurTask = new BlurTask();
-    blurTask.setTableDescriptor(descriptor);
-    blurTask.setIndexingType(INDEXING_TYPE.REBUILD);
-    blurTask.setOptimize(false);
-    Job job = blurTask.configureJob(configuration);
-    job.setJarByClass(BlurExampleIndexerRebuild.class);
-    job.setMapperClass(BlurExampleMapper.class);
-    job.setInputFormatClass(TextInputFormat.class);
-    job.setOutputFormatClass(TextOutputFormat.class);
-
-    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
-    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1], "job-" + System.currentTimeMillis()));
-    long s = System.currentTimeMillis();
-    boolean waitForCompletion = job.waitForCompletion(true);
-    long e = System.currentTimeMillis();
-    System.out.println("Completed in [" + (e - s) + " ms]");
-    System.exit(waitForCompletion ? 0 : 1);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8e6a0bf8/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleIndexerUpdate.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleIndexerUpdate.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleIndexerUpdate.java
deleted file mode 100644
index 87cb59d..0000000
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleIndexerUpdate.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package org.apache.blur.mapreduce.example;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.blur.manager.clusterstatus.ZookeeperClusterStatus;
-import org.apache.blur.mapreduce.BlurTask;
-import org.apache.blur.mapreduce.BlurTask.INDEXING_TYPE;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.util.GenericOptionsParser;
-
-
-public class BlurExampleIndexerUpdate {
-
-  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
-    Configuration configuration = new Configuration();
-    String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
-    if (otherArgs.length != 2) {
-      System.err.println("Usage: blurindexer <in> <out>");
-      System.exit(2);
-    }
-
-    ZookeeperClusterStatus status = new ZookeeperClusterStatus("localhost");
-    TableDescriptor descriptor = status.getTableDescriptor(false, "default", "test-table");
-
-    BlurTask blurTask = new BlurTask();
-    blurTask.setTableDescriptor(descriptor);
-    blurTask.setIndexingType(INDEXING_TYPE.UPDATE);
-    Job job = blurTask.configureJob(configuration);
-    job.setJarByClass(BlurExampleIndexerUpdate.class);
-    job.setMapperClass(BlurExampleMapper.class);
-    job.setInputFormatClass(TextInputFormat.class);
-    job.setOutputFormatClass(TextOutputFormat.class);
-
-    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
-    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1], "job-" + System.currentTimeMillis()));
-    boolean waitForCompletion = job.waitForCompletion(true);
-    System.exit(waitForCompletion ? 0 : 1);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8e6a0bf8/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleMapper.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleMapper.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleMapper.java
deleted file mode 100644
index 20b86d2..0000000
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleMapper.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package org.apache.blur.mapreduce.example;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.UUID;
-
-import org.apache.blur.mapreduce.BlurMapper;
-import org.apache.blur.mapreduce.BlurRecord;
-import org.apache.blur.mapreduce.BlurMutate.MUTATE_TYPE;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-
-
-public class BlurExampleMapper extends BlurMapper<LongWritable, Text> {
-
-  @Override
-  protected void map(LongWritable k, Text value, Context context) throws IOException, InterruptedException {
-    BlurRecord record = _mutate.getRecord();
-    record.clearColumns();
-    String str = value.toString();
-    String[] split = str.split("\\t");
-    record.setRowId(UUID.randomUUID().toString());
-    record.setRecordId(UUID.randomUUID().toString());
-    record.setFamily("cf1");
-    for (int i = 0; i < split.length; i++) {
-      record.addColumn("c" + i, split[i]);
-      _fieldCounter.increment(1);
-    }
-    byte[] bs = record.getRowId().getBytes();
-    _key.set(bs, 0, bs.length);
-    _mutate.setMutateType(MUTATE_TYPE.ADD);
-    context.write(_key, _mutate);
-    _recordCounter.increment(1);
-    context.progress();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8e6a0bf8/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
index e69b6d2..734edd3 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
@@ -16,8 +16,8 @@ package org.apache.blur.mapreduce.lib;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import static org.apache.blur.mapreduce.lib.BlurJobSetup.BLUR_CONNECTION_STR;
-import static org.apache.blur.mapreduce.lib.BlurJobSetup.BLUR_TABLE_NAME;
+import static org.apache.blur.mapreduce.lib.BlurMapReduceUtil.BLUR_CONNECTION_STR;
+import static org.apache.blur.mapreduce.lib.BlurMapReduceUtil.BLUR_TABLE_NAME;
 
 import java.io.IOException;
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8e6a0bf8/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputSplit.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputSplit.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputSplit.java
index 4c76d9b..ec3b0c6 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputSplit.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputSplit.java
@@ -27,7 +27,7 @@ import java.io.IOException;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
 
-public class BlurInputSplit extends InputSplit implements Writable {
+public class BlurInputSplit extends InputSplit implements org.apache.hadoop.mapred.InputSplit, Writable {
 
   private int shard;
 
@@ -58,13 +58,13 @@ public class BlurInputSplit extends InputSplit implements Writable {
   }
 
   @Override
-  public long getLength() throws IOException, InterruptedException {
+  public long getLength() {
     // Get the result size of this shard.
     return 0;
   }
 
   @Override
-  public String[] getLocations() throws IOException, InterruptedException {
+  public String[] getLocations() {
     // Get the location of this shard.
     return new String[] {};
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8e6a0bf8/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurJobSetup.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurJobSetup.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurJobSetup.java
deleted file mode 100644
index 8faeda8..0000000
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurJobSetup.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-import java.io.IOException;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.blur.thrift.generated.QueryArgs;
-import org.apache.blur.utils.BlurUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-
-public class BlurJobSetup {
-  
-  public static final String BLUR_QUERY = "blur.query";
-  public static final String BLUR_FIELDS = "blur.fields";
-  public static final String BLUR_TABLE_NAME = "blur.tableName";
-  public static final String BLUR_SESSION_ID = "blur.sessionId";
-  public static final String BLUR_CONNECTION_STR = "blur.connectionStr";
-  public static final String BLUR_WAIT_TO_BE_VISIBLE = "blur.waitToBeVisible";
-  public static final String BLUR_WRITE_AHEAD_LOG = "blur.writeAheadLog";
-  
-  public static void setupJob(Job job, String connectionStr, String tableName, Set<String> fields, QueryArgs queryArgs) throws IOException {
-    Configuration configuration = job.getConfiguration();
-    configuration.set(BLUR_CONNECTION_STR, connectionStr);
-    configuration.set(BLUR_SESSION_ID, UUID.randomUUID().toString());
-    configuration.set(BLUR_TABLE_NAME, tableName);
-    configuration.set(BLUR_QUERY, BlurUtil.toJson(queryArgs));
-    configuration.setStrings(BLUR_FIELDS, fields.toArray(new String[fields.size()]));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8e6a0bf8/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMapReduceUtil.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMapReduceUtil.java
new file mode 100644
index 0000000..1adc46e
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMapReduceUtil.java
@@ -0,0 +1,46 @@
+package org.apache.blur.mapreduce.lib;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.blur.thrift.generated.QueryArgs;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+
+public class BlurMapReduceUtil {
+
+  public static final String BLUR_QUERY = "blur.query";
+  public static final String BLUR_FIELDS = "blur.fields";
+  public static final String BLUR_TABLE_NAME = "blur.tableName";
+  public static final String BLUR_SESSION_ID = "blur.sessionId";
+  public static final String BLUR_CONNECTION_STR = "blur.connectionStr";
+  public static final String BLUR_WAIT_TO_BE_VISIBLE = "blur.waitToBeVisible";
+  public static final String BLUR_WRITE_AHEAD_LOG = "blur.writeAheadLog";
+  public static final String BLUR_AUTO_ASSIGN_SHARD = "blur.auto.assign.shard";
+
+  public static void setupReaderJob(Job job, String connectionStr, String tableName, Set<String> fields, QueryArgs queryArgs) throws IOException {
+    Configuration configuration = job.getConfiguration();
+    configuration.set(BLUR_CONNECTION_STR, connectionStr);
+    configuration.set(BLUR_SESSION_ID, UUID.randomUUID().toString());
+    configuration.set(BLUR_TABLE_NAME, tableName);
+    configuration.set(BLUR_QUERY, BlurUtil.toJson(queryArgs));
+    configuration.setStrings(BLUR_FIELDS, fields.toArray(new String[fields.size()]));
+    job.setInputFormatClass(BlurInputFormat.class);
+  }
+
+  public static void setupWriterJob(Job job, String connectionStr, String tableName, boolean waitToBeVisible, boolean writeAheadLog, boolean autoAssignShards) throws IOException {
+    Configuration configuration = job.getConfiguration();
+    configuration.set(BLUR_CONNECTION_STR, connectionStr);
+    configuration.set(BLUR_TABLE_NAME, tableName);
+    configuration.setBoolean(BLUR_WAIT_TO_BE_VISIBLE, waitToBeVisible);
+    configuration.setBoolean(BLUR_WRITE_AHEAD_LOG, writeAheadLog);
+    configuration.setBoolean(BLUR_AUTO_ASSIGN_SHARD, autoAssignShards);
+    job.setOutputFormatClass(BlurOutputFormat.class);
+  }
+
+  public static void setupWriterJob(Job job, String connectionStr, String tableName) throws IOException {
+    setupWriterJob(job, connectionStr, tableName, false, false, true);
+  }
+}
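
For context, a rough sketch of how a driver could wire these helpers into a job. Only the BlurMapReduceUtil calls come from this commit; the connection string, table names, field names, and the mapper class are made-up placeholders. Note that the three-argument setupWriterJob defaults to auto-assigned shards with the write ahead log disabled.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.blur.mapreduce.lib.BlurMapReduceUtil;
import org.apache.blur.thrift.generated.QueryArgs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ExampleBlurJobDriver {

  public static void main(String[] args) throws Exception {
    Configuration configuration = new Configuration();
    Job job = new Job(configuration, "blur-example");
    job.setJarByClass(ExampleBlurJobDriver.class);

    // Hypothetical query and field set; building a real QueryArgs is outside this commit.
    QueryArgs queryArgs = new QueryArgs();
    Set<String> fields = new HashSet<String>(Arrays.asList("fam.col1", "fam.col2"));

    // Read rows matching the query from one table, write mutations to another
    // (addresses and table names are placeholders).
    BlurMapReduceUtil.setupReaderJob(job, "controller1:40010", "source_table", fields, queryArgs);
    BlurMapReduceUtil.setupWriterJob(job, "controller1:40010", "dest_table"); // auto-assign shards

    job.setMapperClass(ExampleBlurMapper.class); // hypothetical BlurMapper subclass, sketched below
    job.setNumReduceTasks(0); // map-only; BlurOutputFormat forces auto-assignment in map tasks

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}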

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8e6a0bf8/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMapper.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMapper.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMapper.java
new file mode 100644
index 0000000..ed02d23
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMapper.java
@@ -0,0 +1,9 @@
+package org.apache.blur.mapreduce.lib;
+
+import org.apache.blur.hadoop.io.MutateWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+public abstract class BlurMapper<KEY, VALUE> extends Mapper<KEY, VALUE, WritableComparable<?>, MutateWritable> {
+
+}
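
A hypothetical concrete mapper, not part of this commit, shows the contract BlurMapper fixes: subclasses read whatever input they like and emit MutateWritable mutations keyed by any WritableComparable. This is roughly what the ExampleBlurMapper referenced in the driver sketch above could look like; the input types, field name, and NullWritable key are assumptions, while the Document/Field construction follows the commented-out example in BlurWriter further down.

import java.io.IOException;
import java.util.Arrays;

import org.apache.blur.hadoop.io.AddDocumentsWritable;
import org.apache.blur.thrift.generated.Document;
import org.apache.blur.thrift.generated.Field;
import org.apache.blur.thrift.generated.TYPE;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

public class ExampleBlurMapper extends BlurMapper<LongWritable, Text> {

  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    // Turn each input line into a single-field document (field name is assumed).
    Field field = new Field();
    field.setName("example.body");
    field.setType(TYPE.TEXT);
    field.setValue(value.toString().getBytes("UTF-8"));

    Document document = new Document();
    document.addToFields(field);

    AddDocumentsWritable add = new AddDocumentsWritable();
    add.setDocuments(Arrays.asList(document));

    // Keys are ignored by BlurWriter, so NullWritable works as a placeholder key.
    context.write(NullWritable.get(), add);
  }
}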

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8e6a0bf8/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
index 374a7a6..f4688c6 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
@@ -16,32 +16,82 @@ package org.apache.blur.mapreduce.lib;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import static org.apache.blur.mapreduce.lib.BlurMapReduceUtil.BLUR_AUTO_ASSIGN_SHARD;
+import static org.apache.blur.mapreduce.lib.BlurMapReduceUtil.BLUR_CONNECTION_STR;
+import static org.apache.blur.mapreduce.lib.BlurMapReduceUtil.BLUR_TABLE_NAME;
+
 import java.io.IOException;
 
-import org.apache.blur.mapreduce.BlurRecord;
-import org.apache.hadoop.io.Text;
+import org.apache.blur.hadoop.io.MutateWritable;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
 
+/**
+ * The {@link BlurOutputFormat} writes {@link MutateWritable} objects (
+ * {@link AddDocumentsWritable}, {@link DeleteTermsWritable}, or
+ * {@link UpdatePackagesWritable}) to the enabled Blur table.<br/>
+ * <br/>
+ * 
+ * The following configuration options need to be set via
+ * {@link BlurMapReduceUtil}.<br/>
+ * <br/>
+ * BLUR_TABLE_NAME - Required - Sets the name of the Blur table that receives the updates.<br/>
+ * BLUR_CONNECTION_STR - Required - Sets the connection string for the cluster.<br/>
+ * BLUR_WAIT_TO_BE_VISIBLE - Optional (Default false) - Sets whether the
+ * OutputFormat should wait for all the updates to be visible before closing.<br/>
+ * BLUR_WRITE_AHEAD_LOG - Optional (Default false) - Sets whether the data
+ * should be written to the write ahead log before performing the operation.<br/>
+ * BLUR_AUTO_ASSIGN_SHARD - Optional (Default false) - Sets whether the Blur server
+ * assigns each update to a shard automatically instead of using the task id as the shard index.<br/>
+ * <br/>
+ * 
+ * NOTE: If the OutputFormat is run in a map task, auto assignment is forced on.
+ */
+public class BlurOutputFormat extends OutputFormat<Writable, MutateWritable> {
 
-public class BlurOutputFormat extends OutputFormat<Text, BlurRecord> {
+  private static final Log LOG = LogFactory.getLog(BlurOutputFormat.class);
 
   @Override
   public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
-
+    Configuration configuration = context.getConfiguration();
+    String connectionStr = configuration.get(BLUR_CONNECTION_STR);
+    if (connectionStr == null) {
+      throw new IOException("\"" + BLUR_CONNECTION_STR + "\" is not set in the configuration.");
+    }
+    String table = configuration.get(BLUR_TABLE_NAME);
+    if (table == null) {
+      throw new IOException("\"" + BLUR_TABLE_NAME + "\" is not set in the configuration.");
+    }
   }
 
   @Override
-  public RecordWriter<Text, BlurRecord> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
-    return null;
+  public RecordWriter<Writable, MutateWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+    TaskAttemptID taskAttemptID = context.getTaskAttemptID();
+    Configuration configuration = context.getConfiguration();
+    int shardIndex = -1;
+    if (taskAttemptID.isMap()) {
+      LOG.info("Blur output in a map is forced to have auto assigned shards. (MutateOptions.shardIndex = -1)");
+    } else if (configuration.getBoolean(BLUR_AUTO_ASSIGN_SHARD, false)) {
+      LOG.info("Blur output is set to have auto assigned shards. (MutateOptions.shardIndex = -1)");
+    } else {
+      TaskID taskID = taskAttemptID.getTaskID();
+      shardIndex = taskID.getId();
+      LOG.info("Blur output is set to shard index [" + shardIndex + "].");
+    }
+    return new BlurWriter(configuration, shardIndex);
   }
 
   @Override
   public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
-    return null;
+    return new BlurOutputCommitter(context);
   }
 
 }
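
One consequence of the getRecordWriter logic above: in a reduce task with auto-assignment turned off, the task id doubles as the MutateOptions shard index, so each reducer writes to exactly one shard. A rough driver-side sketch of that arrangement follows; the shard count, address, and table name are assumptions, and a partitioner that routes each mutation to the matching reducer would still be needed.

import java.io.IOException;

import org.apache.blur.mapreduce.lib.BlurMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;

public class ShardAlignedSetup {

  // Assumed: the target table has 16 shards, so reducer i owns shard i.
  private static final int SHARD_COUNT = 16;

  public static void configure(Job job) throws IOException {
    BlurMapReduceUtil.setupWriterJob(job, "controller1:40010", "dest_table",
        false /* waitToBeVisible */, true /* writeAheadLog */, false /* autoAssignShards */);
    job.setNumReduceTasks(SHARD_COUNT); // TaskID.getId() of each reducer becomes its shard index
  }
}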

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8e6a0bf8/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurReader.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurReader.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurReader.java
index 662daf9..8c3bcbc 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurReader.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurReader.java
@@ -16,9 +16,9 @@ package org.apache.blur.mapreduce.lib;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import static org.apache.blur.mapreduce.lib.BlurJobSetup.BLUR_CONNECTION_STR;
-import static org.apache.blur.mapreduce.lib.BlurJobSetup.BLUR_SESSION_ID;
-import static org.apache.blur.mapreduce.lib.BlurJobSetup.BLUR_TABLE_NAME;
+import static org.apache.blur.mapreduce.lib.BlurMapReduceUtil.BLUR_CONNECTION_STR;
+import static org.apache.blur.mapreduce.lib.BlurMapReduceUtil.BLUR_SESSION_ID;
+import static org.apache.blur.mapreduce.lib.BlurMapReduceUtil.BLUR_TABLE_NAME;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -68,21 +68,6 @@ public class BlurReader extends RecordReader<ScoreDocWritable, DocumentWritable>
       fields = new HashSet<String>(Arrays.asList(fieldArray));
     }
 
-    // queryArgs = new QueryArgs();
-    // queryArgs.setNumberToFetch(1000);
-    // // read query args here
-    //
-    // Query query = new MatchAllDocsQuery();
-    //
-    // QueryWritable queryWritable = new QueryWritable(query);
-    // DataOutputBuffer buffer = new DataOutputBuffer();
-    // queryWritable.write(buffer);
-    // buffer.close();
-    //
-    // queryArgs.setQuery(trim(buffer));
-    //
-    // // ////////
-
     configureForThisSplit((BlurInputSplit) split, queryArgs);
 
     session = new Session(sessionId, tableName);
@@ -90,12 +75,6 @@ public class BlurReader extends RecordReader<ScoreDocWritable, DocumentWritable>
     search();
   }
 
-  // private static byte[] trim(DataOutputBuffer buffer) {
-  // byte[] buf = new byte[buffer.getLength()];
-  // System.arraycopy(buffer.getData(), 0, buf, 0, buf.length);
-  // return buf;
-  // }
-
   @Override
   public void close() throws IOException {
 
@@ -140,11 +119,13 @@ public class BlurReader extends RecordReader<ScoreDocWritable, DocumentWritable>
 
   @Override
   public ScoreDocWritable getCurrentKey() throws IOException, InterruptedException {
+    //@TODO reuse an object.
     return new ScoreDocWritable(scoreDocs.get(position));
   }
 
   @Override
   public DocumentWritable getCurrentValue() throws IOException, InterruptedException {
+    //@TODO reuse an object.
     return new DocumentWritable(documents.get(position));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8e6a0bf8/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurWriter.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurWriter.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurWriter.java
index b80caf5..3bf6d21 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurWriter.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurWriter.java
@@ -1,38 +1,66 @@
 package org.apache.blur.mapreduce.lib;
 
-import static org.apache.blur.mapreduce.lib.BlurJobSetup.BLUR_CONNECTION_STR;
-import static org.apache.blur.mapreduce.lib.BlurJobSetup.BLUR_TABLE_NAME;
-import static org.apache.blur.mapreduce.lib.BlurJobSetup.BLUR_WAIT_TO_BE_VISIBLE;
-import static org.apache.blur.mapreduce.lib.BlurJobSetup.BLUR_WRITE_AHEAD_LOG;
+import static org.apache.blur.mapreduce.lib.BlurMapReduceUtil.BLUR_CONNECTION_STR;
+import static org.apache.blur.mapreduce.lib.BlurMapReduceUtil.BLUR_TABLE_NAME;
+import static org.apache.blur.mapreduce.lib.BlurMapReduceUtil.BLUR_WAIT_TO_BE_VISIBLE;
+import static org.apache.blur.mapreduce.lib.BlurMapReduceUtil.BLUR_WRITE_AHEAD_LOG;
 
 import java.io.IOException;
 import java.util.List;
 
 import org.apache.blur.hadoop.io.AddDocumentsWritable;
+import org.apache.blur.hadoop.io.DeleteTermsWritable;
 import org.apache.blur.hadoop.io.MutateWritable;
+import org.apache.blur.hadoop.io.UpdatePackagesWritable;
 import org.apache.blur.thrift.BlurClient;
 import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.Document;
 import org.apache.blur.thrift.generated.MutateOptions;
+import org.apache.blur.thrift.generated.Term;
+import org.apache.blur.thrift.generated.UpdatePackage;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.thrift.TException;
 
-public class BlurWriter extends RecordWriter<BytesWritable, MutateWritable> {
+public class BlurWriter extends RecordWriter<Writable, MutateWritable> {
 
-  
   private Iface client;
   private MutateOptions options;
 
-  public static void main(String[] args) {
-    Configuration configuration = new Configuration();
-    configuration.set(BLUR_CONNECTION_STR, "127.0.0.1:40020");
-    configuration.set(BLUR_TABLE_NAME, "test_table");
-
-  }
+  // public static void main(String[] args) throws IOException,
+  // InterruptedException, ParseException, BlurException, TException {
+  // Configuration configuration = new Configuration();
+  // configuration.set(BLUR_CONNECTION_STR, "127.0.0.1:40020");
+  // configuration.set(BLUR_TABLE_NAME, "test_table");
+  //
+  // Iface client = BlurClient.getClient("127.0.0.1:40020");
+  // QueryArgs queryArgs = new QueryArgs();
+  // queryArgs.setQuery(ThriftLuceneConversion.parseQuery("test:hi",
+  // Version.LUCENE_40, "", new StandardAnalyzer(Version.LUCENE_40)));
+  // List<TopFieldDocs> list = client.search(new
+  // Session(UUID.randomUUID().toString(), "test_table"), queryArgs);
+  // long totalHits = list.get(0).getTotalHits();
+  // System.out.println(totalHits);
+  //
+  // BlurWriter writer = new BlurWriter(configuration, 0);
+  //
+  // AddDocumentsWritable value = new AddDocumentsWritable();
+  // List<Document> documents = new ArrayList<Document>();
+  // Document document = new Document();
+  // Field field = new Field();
+  // field.setName("test");
+  // field.setType(TYPE.TEXT);
+  // field.setValue("hi".getBytes());
+  // document.addToFields(field);
+  // documents.add(document);
+  // value.setDocuments(documents);
+  // writer.write(null, value);
+  //
+  // writer.write(null, value);
+  // }
 
   public BlurWriter(Configuration configuration, int shardIndex) {
     String connectionStr = configuration.get(BLUR_CONNECTION_STR);
@@ -43,26 +71,44 @@ public class BlurWriter extends RecordWriter<BytesWritable, MutateWritable> {
     options = new MutateOptions(table, shardIndex, waitToBeVisible, writeAheadLog);
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  public void write(BytesWritable key, MutateWritable value) throws IOException, InterruptedException {
+  public void write(Writable key, MutateWritable value) throws IOException, InterruptedException {
+    if (value == null) {
+      return;
+    }
     if (value instanceof AddDocumentsWritable) {
       AddDocumentsWritable addDocumentsWritable = (AddDocumentsWritable) value;
-      List<Document> docs = getDocs(addDocumentsWritable.getDocuments());
       try {
-        client.addDocuments(options, docs);
+        client.addDocuments(options, (List<Document>) addDocumentsWritable.getDocuments());
+      } catch (BlurException e) {
+        throw new IOException(e);
+      } catch (TException e) {
+        throw new IOException(e);
+      }
+    } else if (value instanceof DeleteTermsWritable) {
+      DeleteTermsWritable deleteTermsWritable = (DeleteTermsWritable) value;
+      try {
+        client.deleteDocuments(options, (List<Term>) deleteTermsWritable.getTerms());
       } catch (BlurException e) {
         throw new IOException(e);
       } catch (TException e) {
         throw new IOException(e);
       }
+    } else if (value instanceof UpdatePackagesWritable) {
+      UpdatePackagesWritable updatePackagesWritable = (UpdatePackagesWritable) value;
+      try {
+        client.updateDocuments(options, (List<UpdatePackage>) updatePackagesWritable.getUpdatePackages());
+      } catch (BlurException e) {
+        throw new IOException(e);
+      } catch (TException e) {
+        throw new IOException(e);
+      }
+    } else {
+      throw new RuntimeException("MutateWritable type [" + value.getClass() + "] of instance [" + value + "] is not supported.");
     }
   }
 
-  @SuppressWarnings("unchecked")
-  private List<Document> getDocs(List<?> documents) {
-    return (List<Document>) documents;
-  }
-
   @Override
   public void close(TaskAttemptContext context) throws IOException, InterruptedException {
 

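Downstream of this writer, any reducer (or mapper in a map-only job) simply emits concrete MutateWritable instances and lets write() dispatch on the runtime type. A hypothetical pass-through reducer showing that contract; the NullWritable key is an arbitrary choice, since BlurWriter ignores keys.

import java.io.IOException;

import org.apache.blur.hadoop.io.MutateWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical reducer (not in this commit): forwards every mutation emitted by
// the mappers to BlurWriter, which routes it to addDocuments/deleteDocuments/updateDocuments.
public class PassThroughMutateReducer
    extends Reducer<WritableComparable<?>, MutateWritable, Writable, MutateWritable> {

  @Override
  protected void reduce(WritableComparable<?> key, Iterable<MutateWritable> values, Context context)
      throws IOException, InterruptedException {
    for (MutateWritable mutation : values) {
      context.write(NullWritable.get(), mutation);
    }
  }
}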
