incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: More MR updates. InputFormats should now work.
Date Thu, 20 Dec 2012 22:05:37 GMT
Updated Branches:
  refs/heads/0.2-dev cf8018888 -> 4e1ce9528


More MR updates.  InputFormats should now work.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/4e1ce952
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/4e1ce952
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/4e1ce952

Branch: refs/heads/0.2-dev
Commit: 4e1ce95282b26acaf307753906dfbfae65c4f2d9
Parents: cf80188
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Thu Dec 20 17:05:06 2012 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Dec 20 17:05:06 2012 -0500

----------------------------------------------------------------------
 .../org/apache/blur/analysis/BlurAnalyzer.java     |    3 +
 .../java/org/apache/blur/thrift/BlurServer.java    |   21 +++-
 .../blur/example/BlurExampleIndexReader.java       |   79 ++++++++++
 .../blur/example/BlurExampleIndexWriter.java       |   67 +++++++++
 .../apache/blur/example/BlurExampleIndexer.java    |   53 -------
 .../org/apache/blur/example/BlurExampleMapper.java |   49 -------
 .../blur/example/BlurExampleReaderMapper.java      |   52 +++++++
 .../blur/example/BlurExampleWriterMapper.java      |   51 +++++++
 .../blur/mapreduce/lib/BlurMapReduceUtil.java      |    8 +-
 .../org/apache/blur/mapreduce/lib/BlurMapper.java  |    9 --
 .../blur/mapreduce/lib/BlurOutputFormat.java       |   44 ++++++-
 .../blur/mapreduce/lib/BlurOutputMapper.java       |    9 ++
 .../org/apache/blur/mapreduce/lib/BlurReader.java  |  111 +++++++++++----
 .../org/apache/blur/mapreduce/lib/BlurWriter.java  |   84 ++++++-----
 14 files changed, 455 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4e1ce952/src/blur-core/src/main/java/org/apache/blur/analysis/BlurAnalyzer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/analysis/BlurAnalyzer.java b/src/blur-core/src/main/java/org/apache/blur/analysis/BlurAnalyzer.java
index 5bde2c5..4f3015b 100644
--- a/src/blur-core/src/main/java/org/apache/blur/analysis/BlurAnalyzer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/analysis/BlurAnalyzer.java
@@ -54,6 +54,9 @@ public final class BlurAnalyzer extends AnalyzerWrapper {
   }
 
   private void convert(org.apache.blur.thrift.generated.Analyzer analyzer) {
+    if (analyzer == null) {
+      return;
+    }
     List<AnalyzerType> analyzerTypes = analyzer.getAnalyzerTypes();
     try {
       for (AnalyzerType analyzerType : analyzerTypes) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4e1ce952/src/blur-core/src/main/java/org/apache/blur/thrift/BlurServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/BlurServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/BlurServer.java
index 0d9c452..62468ae 100644
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/BlurServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/BlurServer.java
@@ -31,6 +31,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -409,7 +410,7 @@ public class BlurServer extends TableAdmin implements Iface {
   @Override
   public List<Generation> addDocuments(MutateOptions options, List<Document> documents) throws BlurException, TException {
     String table = options.getTable();
-    int shardIndex = options.getShardIndex();
+    int shardIndex = getShardIndex(options);
     boolean waitToBeVisible = options.isWaitToBeVisible();
     boolean writeAheadLog = options.isWriteAheadLog();
     List<Generation> generations = new ArrayList<Generation>();
@@ -428,6 +429,18 @@ public class BlurServer extends TableAdmin implements Iface {
     }
   }
 
+  private int getShardIndex(MutateOptions options) {
+    int shardIndex = options.getShardIndex();
+    if (shardIndex < 0) {
+      // @TODO this is going to be very slow
+      TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(true, options.getTable());
+      int shardCount = tableDescriptor.getShardCount();
+      Random random = new Random();
+      return random.nextInt(shardCount);
+    }
+    return shardIndex;
+  }
+
   @Override
   public void blockUntilGenerationIsVisible(List<Generation> generations, boolean forceRefresh) throws BlurException, TException {
     try {
@@ -463,7 +476,7 @@ public class BlurServer extends TableAdmin implements Iface {
   @Override
   public List<Generation> deleteDocumentsByQueries(MutateOptions options, List<ByteBuffer> queries) throws BlurException, TException {
     String table = options.getTable();
-    int shardIndex = options.getShardIndex();
+    int shardIndex = getShardIndex(options);
     boolean waitToBeVisible = options.isWaitToBeVisible();
     boolean writeAheadLog = options.isWriteAheadLog();
     List<Generation> generations = new ArrayList<Generation>();
@@ -485,7 +498,7 @@ public class BlurServer extends TableAdmin implements Iface {
   @Override
   public List<Generation> deleteDocuments(MutateOptions options, List<Term> terms) throws BlurException, TException {
     String table = options.getTable();
-    int shardIndex = options.getShardIndex();
+    int shardIndex = getShardIndex(options);
     boolean waitToBeVisible = options.isWaitToBeVisible();
     boolean writeAheadLog = options.isWriteAheadLog();
     List<Generation> generations = new ArrayList<Generation>();
@@ -507,7 +520,7 @@ public class BlurServer extends TableAdmin implements Iface {
   @Override
   public List<Generation> updateDocuments(MutateOptions options, List<UpdatePackage> updatePackages) throws BlurException, TException {
     String table = options.getTable();
-    int shardIndex = options.getShardIndex();
+    int shardIndex = getShardIndex(options);
     boolean waitToBeVisible = options.isWaitToBeVisible();
     boolean writeAheadLog = options.isWriteAheadLog();
     List<Generation> generations = new ArrayList<Generation>();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4e1ce952/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleIndexReader.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleIndexReader.java b/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleIndexReader.java
new file mode 100644
index 0000000..cd09716
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleIndexReader.java
@@ -0,0 +1,79 @@
+package org.apache.blur.example;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.blur.mapreduce.lib.BlurMapReduceUtil;
+import org.apache.blur.thrift.generated.QueryArgs;
+import org.apache.blur.utils.ThriftLuceneConversion;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.lucene.search.MatchAllDocsQuery;
+
+public class BlurExampleIndexReader {
+
+  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
+    Configuration configuration = new Configuration();
+    String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
+    if (otherArgs.length != 3) {
+      System.err.println("Usage: blurexampleindexreader <connection string> <table> <output>");
+      System.exit(2);
+    }
+
+    String connectionStr = otherArgs[0];
+    String table = otherArgs[1];
+    String output = otherArgs[2];
+
+    Job job = new Job(configuration, "Blur Index Reader");
+    job.setNumReduceTasks(0);
+    job.setMapperClass(BlurExampleReaderMapper.class);
+    QueryArgs queryArgs = new QueryArgs();
+    queryArgs.setQuery(ThriftLuceneConversion.toBytes(new MatchAllDocsQuery()));
+    BlurMapReduceUtil.setupReaderJob(job, connectionStr, table, null, queryArgs);
+    job.setOutputFormatClass(TextOutputFormat.class);
+    job.setOutputKeyClass(LongWritable.class);
+    job.setOutputValueClass(Text.class);
+
+    FileOutputFormat.setOutputPath(job, new Path(output));
+
+    boolean waitForCompletion = job.waitForCompletion(true);
+    if (waitForCompletion) {
+      Counters counters = job.getCounters();
+      Collection<String> groupNames = counters.getGroupNames();
+      for (String groupName : groupNames) {
+        CounterGroup group = counters.getGroup(groupName);
+        String name = group.getName();
+        for (Counter counter : group) {
+          System.out.println(name + ":" + counter.getName() + "=" + counter.getValue());
+        }
+      }
+    } else {
+      System.out.println("Fail");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4e1ce952/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleIndexWriter.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleIndexWriter.java b/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleIndexWriter.java
new file mode 100644
index 0000000..9d6eb54
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleIndexWriter.java
@@ -0,0 +1,67 @@
+package org.apache.blur.example;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.blur.mapreduce.lib.BlurMapReduceUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+public class BlurExampleIndexWriter {
+
+  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
+    Configuration configuration = new Configuration();
+    String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
+    if (otherArgs.length != 3) {
+      System.err.println("Usage: blurexampleindexwriter <in> <connection string> <table>");
+      System.exit(2);
+    }
+
+    String input = otherArgs[0];
+    String connectionStr = otherArgs[1];
+    String table = otherArgs[2];
+
+    Job job = new Job(configuration, "Blur Index Writer");
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setNumReduceTasks(0);
+    job.setMapperClass(BlurExampleWriterMapper.class);
+    BlurMapReduceUtil.setupWriterJob(job, connectionStr, table);
+
+    FileInputFormat.addInputPath(job, new Path(input));
+    boolean waitForCompletion = job.waitForCompletion(true);
+    if (waitForCompletion) {
+      Counters counters = job.getCounters();
+      Collection<String> groupNames = counters.getGroupNames();
+      for (String groupName : groupNames) {
+        CounterGroup group = counters.getGroup(groupName);
+        String name = group.getName();
+        for (Counter counter : group) {
+          System.out.println(name + ":" + counter.getName() + "=" + counter.getValue());
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4e1ce952/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleIndexer.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleIndexer.java b/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleIndexer.java
deleted file mode 100644
index 359f279..0000000
--- a/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleIndexer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package org.apache.blur.example;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.blur.mapreduce.lib.BlurMapReduceUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.util.GenericOptionsParser;
-
-public class BlurExampleIndexer {
-
-  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
-    Configuration configuration = new Configuration();
-    String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
-    if (otherArgs.length != 3) {
-      System.err.println("Usage: blurexampleindexer <in> <connection string> <table>");
-      System.exit(2);
-    }
-
-    String input = otherArgs[0];
-    String connectionStr = otherArgs[1];
-    String table = otherArgs[2];
-
-    Job job = new Job(configuration, "Blur Indexer");
-    job.setInputFormatClass(TextInputFormat.class);
-    job.setNumReduceTasks(0);
-    job.setMapperClass(BlurExampleMapper.class);
-    BlurMapReduceUtil.setupWriterJob(job, connectionStr, table);
-
-    FileInputFormat.addInputPath(job, new Path(input));
-    boolean waitForCompletion = job.waitForCompletion(true);
-    System.exit(waitForCompletion ? 0 : 1);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4e1ce952/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleMapper.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleMapper.java b/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleMapper.java
deleted file mode 100644
index 77378de..0000000
--- a/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleMapper.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package org.apache.blur.example;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.blur.hadoop.io.AddDocumentsWritable;
-import org.apache.blur.hadoop.io.DocumentWritable;
-import org.apache.blur.mapreduce.lib.BlurMapper;
-import org.apache.blur.thrift.generated.Field;
-import org.apache.blur.thrift.generated.TYPE;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-
-public class BlurExampleMapper extends BlurMapper<LongWritable, Text> {
-
-  private DocumentWritable documentWritable = new DocumentWritable();
-  private AddDocumentsWritable addDocumentsWritable = new AddDocumentsWritable();
-
-  @Override
-  protected void setup(Context context) throws IOException, InterruptedException {
-    addDocumentsWritable.setDocumentWritable(documentWritable);
-  }
-
-  @Override
-  protected void map(LongWritable k, Text value, Context context) throws IOException, InterruptedException {
-    documentWritable.addToFields(new Field("body", ByteBuffer.wrap(value.getBytes()), TYPE.TEXT, 1.0));
-
-    // NOTE:We are using the original value to distribute the documents evenly
-    // across all of the reducer (if any).
-    context.write(value, addDocumentsWritable);
-    context.progress();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4e1ce952/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleReaderMapper.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleReaderMapper.java b/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleReaderMapper.java
new file mode 100644
index 0000000..915dc2a
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleReaderMapper.java
@@ -0,0 +1,52 @@
+package org.apache.blur.example;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.blur.hadoop.io.AddDocumentsWritable;
+import org.apache.blur.hadoop.io.DocumentWritable;
+import org.apache.blur.hadoop.io.ScoreDocWritable;
+import org.apache.blur.thrift.generated.Field;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+public class BlurExampleReaderMapper extends Mapper<ScoreDocWritable, DocumentWritable, LongWritable, Text> {
+
+  private DocumentWritable documentWritable = new DocumentWritable();
+  private AddDocumentsWritable addDocumentsWritable = new AddDocumentsWritable();
+  private LongWritable key = new LongWritable();
+  private Text value = new Text();
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    addDocumentsWritable.setDocumentWritable(documentWritable);
+  }
+
+  @Override
+  protected void map(ScoreDocWritable k, DocumentWritable d, Context context) throws IOException, InterruptedException {
+    List<Field> fields = d.getFields();
+    for (Field field : fields) {
+      key.set(k.getDocLocation());
+      value.set(new String(field.getValue()));
+      context.write(key, value);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4e1ce952/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleWriterMapper.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleWriterMapper.java b/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleWriterMapper.java
new file mode 100644
index 0000000..1d397e2
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/example/BlurExampleWriterMapper.java
@@ -0,0 +1,51 @@
+package org.apache.blur.example;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.blur.hadoop.io.AddDocumentsWritable;
+import org.apache.blur.hadoop.io.DocumentWritable;
+import org.apache.blur.mapreduce.lib.BlurOutputMapper;
+import org.apache.blur.thrift.generated.Field;
+import org.apache.blur.thrift.generated.TYPE;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+public class BlurExampleWriterMapper extends BlurOutputMapper<LongWritable, Text> {
+
+  private DocumentWritable documentWritable = new DocumentWritable();
+  private AddDocumentsWritable addDocumentsWritable = new AddDocumentsWritable();
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    addDocumentsWritable.setDocumentWritable(documentWritable);
+  }
+
+  @Override
+  protected void map(LongWritable k, Text value, Context context) throws IOException, InterruptedException {
+    documentWritable.addToFields(new Field("body", ByteBuffer.wrap(value.getBytes()), TYPE.TEXT, 1.0));
+
+    // NOTE:We are using the original value to distribute the documents evenly
+    // across all of the reducer (if there are any reducers).
+    context.write(value, addDocumentsWritable);
+    context.progress();
+    
+    documentWritable.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4e1ce952/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMapReduceUtil.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMapReduceUtil.java
index 1adc46e..d47558e 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMapReduceUtil.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMapReduceUtil.java
@@ -11,6 +11,10 @@ import org.apache.hadoop.mapreduce.Job;
 
 public class BlurMapReduceUtil {
 
+  public enum COUNTERS {
+    RATE, DOCUMENT, FIELD, ADD, UPDATE, DELETE
+  }
+
   public static final String BLUR_QUERY = "blur.query";
   public static final String BLUR_FIELDS = "blur.fields";
   public static final String BLUR_TABLE_NAME = "blur.tableName";
@@ -26,7 +30,9 @@ public class BlurMapReduceUtil {
     configuration.set(BLUR_SESSION_ID, UUID.randomUUID().toString());
     configuration.set(BLUR_TABLE_NAME, tableName);
     configuration.set(BLUR_QUERY, BlurUtil.toJson(queryArgs));
-    configuration.setStrings(BLUR_FIELDS, fields.toArray(new String[fields.size()]));
+    if (fields != null) {
+      configuration.setStrings(BLUR_FIELDS, fields.toArray(new String[fields.size()]));
+    }
     job.setInputFormatClass(BlurInputFormat.class);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4e1ce952/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMapper.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMapper.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMapper.java
deleted file mode 100644
index ed02d23..0000000
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMapper.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-import org.apache.blur.hadoop.io.MutateWritable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
-
-public abstract class BlurMapper<KEY, VALUE> extends Mapper<KEY, VALUE,  WritableComparable<?>, MutateWritable> {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4e1ce952/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
index f4688c6..86edbcf 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
@@ -16,16 +16,22 @@ package org.apache.blur.mapreduce.lib;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import static org.apache.blur.mapreduce.lib.BlurMapReduceUtil.*;
+import static org.apache.blur.mapreduce.lib.BlurMapReduceUtil.BLUR_AUTO_ASSIGN_SHARD;
+import static org.apache.blur.mapreduce.lib.BlurMapReduceUtil.BLUR_CONNECTION_STR;
 import static org.apache.blur.mapreduce.lib.BlurMapReduceUtil.BLUR_TABLE_NAME;
 
 import java.io.IOException;
 
+import org.apache.blur.hadoop.io.AddDocumentsWritable;
+import org.apache.blur.hadoop.io.DeleteTermsWritable;
 import org.apache.blur.hadoop.io.MutateWritable;
+import org.apache.blur.hadoop.io.UpdatePackagesWritable;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -33,6 +39,8 @@ import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.util.Progressable;
 
 /**
  * The {@link BlurOutputFormat} writes {@link MutateWritable} objects (
@@ -53,7 +61,8 @@ import org.apache.hadoop.mapreduce.TaskID;
  * shard or allow the Blur server to assign the location automatically.<br/>
  * <br/>
  * 
- * NOTE: If the OutputFormat is run in a Map task then auto assignment is enabled.
+ * NOTE: If the OutputFormat is run in a Map task then auto assignment is
+ * enabled.
  */
 public class BlurOutputFormat extends OutputFormat<Writable, MutateWritable> {
 
@@ -72,10 +81,39 @@ public class BlurOutputFormat extends OutputFormat<Writable, MutateWritable> {
     }
   }
 
+  @SuppressWarnings({ "rawtypes", "unchecked" })
   @Override
   public RecordWriter<Writable, MutateWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
     TaskAttemptID taskAttemptID = context.getTaskAttemptID();
     Configuration configuration = context.getConfiguration();
+    Counter rateCounter = new Counter() {};
+    Counter documentCounter = new Counter() {};
+    Counter fieldCounter = new Counter() {};
+    Counter addCounter = new Counter() {};
+    Counter updateCounter = new Counter() {};
+    Counter deleteCounter = new Counter() {};
+
+    if (context instanceof TaskInputOutputContext) {
+      TaskInputOutputContext tcontext = ((TaskInputOutputContext) context);
+      rateCounter = tcontext.getCounter(BlurMapReduceUtil.COUNTERS.RATE);
+      documentCounter = tcontext.getCounter(BlurMapReduceUtil.COUNTERS.DOCUMENT);
+      fieldCounter = tcontext.getCounter(BlurMapReduceUtil.COUNTERS.FIELD);
+      addCounter = tcontext.getCounter(BlurMapReduceUtil.COUNTERS.ADD);
+      updateCounter = tcontext.getCounter(BlurMapReduceUtil.COUNTERS.UPDATE);
+      deleteCounter = tcontext.getCounter(BlurMapReduceUtil.COUNTERS.DELETE);
+    } else if (context instanceof org.apache.hadoop.mapred.TaskAttemptContext) {
+      Progressable progressible = ((org.apache.hadoop.mapred.TaskAttemptContext) context).getProgressible();
+      if (progressible instanceof Reporter) {
+        Reporter reporter = (Reporter) progressible;
+        rateCounter = reporter.getCounter(BlurMapReduceUtil.COUNTERS.RATE);
+        documentCounter = reporter.getCounter(BlurMapReduceUtil.COUNTERS.DOCUMENT);
+        fieldCounter = reporter.getCounter(BlurMapReduceUtil.COUNTERS.FIELD);
+        addCounter = reporter.getCounter(BlurMapReduceUtil.COUNTERS.ADD);
+        updateCounter = reporter.getCounter(BlurMapReduceUtil.COUNTERS.UPDATE);
+        deleteCounter = reporter.getCounter(BlurMapReduceUtil.COUNTERS.DELETE);
+      }
+    }
+
     int shardIndex = -1;
     if (taskAttemptID.isMap()) {
       LOG.info("Blur output in a map is forced to have auto assigned shards. (MutateOptions.shardIndex
= -1)");
@@ -86,7 +124,7 @@ public class BlurOutputFormat extends OutputFormat<Writable, MutateWritable> {
       shardIndex = taskID.getId();
       LOG.info("Blur output is set to shard index [" + shardIndex + "].");
     }
-    return new BlurWriter(context.getConfiguration(), shardIndex);
+    return new BlurWriter(context.getConfiguration(), shardIndex, rateCounter, documentCounter, fieldCounter, addCounter, updateCounter, deleteCounter);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4e1ce952/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputMapper.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputMapper.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputMapper.java
new file mode 100644
index 0000000..5ae38af
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputMapper.java
@@ -0,0 +1,9 @@
+package org.apache.blur.mapreduce.lib;
+
+import org.apache.blur.hadoop.io.MutateWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+public abstract class BlurOutputMapper<KEY, VALUE> extends Mapper<KEY, VALUE,  WritableComparable<?>, MutateWritable> {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4e1ce952/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurReader.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurReader.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurReader.java
index 8c3bcbc..8dbc828 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurReader.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurReader.java
@@ -17,6 +17,8 @@ package org.apache.blur.mapreduce.lib;
  * limitations under the License.
  */
 import static org.apache.blur.mapreduce.lib.BlurMapReduceUtil.BLUR_CONNECTION_STR;
+import static org.apache.blur.mapreduce.lib.BlurMapReduceUtil.BLUR_FIELDS;
+import static org.apache.blur.mapreduce.lib.BlurMapReduceUtil.BLUR_QUERY;
 import static org.apache.blur.mapreduce.lib.BlurMapReduceUtil.BLUR_SESSION_ID;
 import static org.apache.blur.mapreduce.lib.BlurMapReduceUtil.BLUR_TABLE_NAME;
 
@@ -37,11 +39,17 @@ import org.apache.blur.thrift.generated.QueryArgs;
 import org.apache.blur.thrift.generated.ScoreDoc;
 import org.apache.blur.thrift.generated.Session;
 import org.apache.blur.thrift.generated.TopFieldDocs;
+import org.apache.blur.utils.BlurUtil;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.util.Progressable;
 import org.apache.thrift.TException;
+
 public class BlurReader extends RecordReader<ScoreDocWritable, DocumentWritable> {
 
   private Iface client;
@@ -56,47 +64,84 @@ public class BlurReader extends RecordReader<ScoreDocWritable, DocumentWritable>
   private List<ScoreDoc> scoreDocs = new ArrayList<ScoreDoc>();
   private List<Document> documents = new ArrayList<Document>();
 
+  private boolean initialized = false;
+
+  private Counter rateCounter = new Counter() {
+  };
+  private Counter documentCounter = new Counter() {
+  };
+  private Counter fieldCounter = new Counter() {
+  };
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
   @Override
   public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
-    Configuration configuration = context.getConfiguration();
-
-    String connectionStr = configuration.get(BLUR_CONNECTION_STR);
-    String sessionId = configuration.get(BLUR_SESSION_ID);
-    String tableName = configuration.get(BLUR_TABLE_NAME);
-    String[] fieldArray = configuration.getStrings(BLUR_TABLE_NAME);
-    if (fieldArray != null) {
-      fields = new HashSet<String>(Arrays.asList(fieldArray));
-    }
+    if (!initialized) {
+      Configuration configuration = context.getConfiguration();
+
+      String connectionStr = configuration.get(BLUR_CONNECTION_STR);
+      String sessionId = configuration.get(BLUR_SESSION_ID);
+      String tableName = configuration.get(BLUR_TABLE_NAME);
+      String[] fieldArray = configuration.getStrings(BLUR_FIELDS);
+      String queryJson = configuration.get(BLUR_QUERY);
+      queryArgs = new QueryArgs();
+      BlurUtil.readJson(queryArgs, queryJson);
+
+      if (fieldArray != null) {
+        fields = new HashSet<String>(Arrays.asList(fieldArray));
+      }
 
-    configureForThisSplit((BlurInputSplit) split, queryArgs);
+      configureForThisSplit((BlurInputSplit) split, queryArgs);
 
-    session = new Session(sessionId, tableName);
-    client = BlurClient.getClient(connectionStr);
-    search();
+      session = new Session(sessionId, tableName);
+      client = BlurClient.getClient(connectionStr);
+      search();
+
+    }
+    initialized = true;
+    if (context instanceof TaskInputOutputContext) {
+      TaskInputOutputContext tcontext = ((TaskInputOutputContext) context);
+      rateCounter = tcontext.getCounter(BlurMapReduceUtil.COUNTERS.RATE);
+      documentCounter = tcontext.getCounter(BlurMapReduceUtil.COUNTERS.DOCUMENT);
+      fieldCounter = tcontext.getCounter(BlurMapReduceUtil.COUNTERS.FIELD);
+    } else if (context instanceof org.apache.hadoop.mapred.TaskAttemptContext) {
+      Progressable progressible = ((org.apache.hadoop.mapred.TaskAttemptContext) context).getProgressible();
+      if (progressible instanceof Reporter) {
+        Reporter reporter = (Reporter) progressible;
+        rateCounter = reporter.getCounter(BlurMapReduceUtil.COUNTERS.RATE);
+        documentCounter = reporter.getCounter(BlurMapReduceUtil.COUNTERS.DOCUMENT);
+        fieldCounter = reporter.getCounter(BlurMapReduceUtil.COUNTERS.FIELD);
+      }
+    }
   }
 
   @Override
   public void close() throws IOException {
-
+    rateCounter.setValue(0);
   }
 
   @Override
   public boolean nextKeyValue() throws IOException, InterruptedException {
-    totalPosition++;
-    position++;
-    if (totalPosition >= totalHits) {
-      return false;
-    }
-    if (position >= documents.size()) {
-      search();
+    try {
+      totalPosition++;
       position++;
-      if (totalPosition < totalHits) {
-        return true;
-      } else {
+      if (totalPosition >= totalHits) {
         return false;
       }
+      if (position >= documents.size()) {
+        search();
+        position++;
+        if (totalPosition < totalHits) {
+          return true;
+        } else {
+          return false;
+        }
+      }
+      return true;
+    } catch (Throwable e) {
+      e.printStackTrace();
+      throw new RuntimeException(e);
     }
-    return true;
   }
 
   private void search() throws IOException {
@@ -111,22 +156,30 @@ public class BlurReader extends RecordReader<ScoreDocWritable, DocumentWritable>
       documents = client.doc(session, getDocLocations(topFieldDocs), fields);
       position = -1;
     } catch (BlurException e) {
+      e.printStackTrace();
       throw new IOException(e);
     } catch (TException e) {
+      e.printStackTrace();
       throw new IOException(e);
     }
   }
 
   @Override
   public ScoreDocWritable getCurrentKey() throws IOException, InterruptedException {
-    //@TODO reuse an object.
+    // @TODO reuse an object.
     return new ScoreDocWritable(scoreDocs.get(position));
   }
 
   @Override
   public DocumentWritable getCurrentValue() throws IOException, InterruptedException {
-  //@TODO reuse an object.
-    return new DocumentWritable(documents.get(position));
+    // @TODO reuse an object.
+    return new DocumentWritable(count(documents.get(position)));
+  }
+
+  private Document count(Document document) {
+    documentCounter.increment(1);
+    fieldCounter.increment(document.getFieldsSize());
+    return document;
   }
 
   @Override
@@ -141,7 +194,7 @@ public class BlurReader extends RecordReader<ScoreDocWritable, DocumentWritable>
     int shard = split.getShard();
     queryArgs.setShardIndexes(Arrays.asList(shard));
   }
-  
+
   private List<Long> getDocLocations(TopFieldDocs topFieldDocs) {
     List<ScoreDoc> scoreDocs = topFieldDocs.scoreDocs;
     List<Long> docLocations = new ArrayList<Long>();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4e1ce952/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurWriter.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurWriter.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurWriter.java
index 3bf6d21..a5be0be 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurWriter.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurWriter.java
@@ -21,6 +21,7 @@ import org.apache.blur.thrift.generated.Term;
 import org.apache.blur.thrift.generated.UpdatePackage;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.thrift.TException;
@@ -29,46 +30,27 @@ public class BlurWriter extends RecordWriter<Writable, MutateWritable> {
 
   private Iface client;
   private MutateOptions options;
+  private Counter documentCounter;
+  private Counter rateCounter;
+  private Counter fieldCounter;
+  private Counter deleteCounter;
+  private Counter updateCounter;
+  private Counter addCounter;
 
-  // public static void main(String[] args) throws IOException,
-  // InterruptedException, ParseException, BlurException, TException {
-  // Configuration configuration = new Configuration();
-  // configuration.set(BLUR_CONNECTION_STR, "127.0.0.1:40020");
-  // configuration.set(BLUR_TABLE_NAME, "test_table");
-  //
-  // Iface client = BlurClient.getClient("127.0.0.1:40020");
-  // QueryArgs queryArgs = new QueryArgs();
-  // queryArgs.setQuery(ThriftLuceneConversion.parseQuery("test:hi",
-  // Version.LUCENE_40, "", new StandardAnalyzer(Version.LUCENE_40)));
-  // List<TopFieldDocs> list = client.search(new
-  // Session(UUID.randomUUID().toString(), "test_table"), queryArgs);
-  // long totalHits = list.get(0).getTotalHits();
-  // System.out.println(totalHits);
-  //
-  // BlurWriter writer = new BlurWriter(configuration, 0);
-  //
-  // AddDocumentsWritable value = new AddDocumentsWritable();
-  // List<Document> documents = new ArrayList<Document>();
-  // Document document = new Document();
-  // Field field = new Field();
-  // field.setName("test");
-  // field.setType(TYPE.TEXT);
-  // field.setValue("hi".getBytes());
-  // document.addToFields(field);
-  // documents.add(document);
-  // value.setDocuments(documents);
-  // writer.write(null, value);
-  //
-  // writer.write(null, value);
-  // }
-
-  public BlurWriter(Configuration configuration, int shardIndex) {
+  public BlurWriter(Configuration configuration, int shardIndex, Counter rateCounter, Counter documentCounter, Counter fieldCounter, Counter addCounter, Counter updateCounter,
+      Counter deleteCounter) {
     String connectionStr = configuration.get(BLUR_CONNECTION_STR);
     String table = configuration.get(BLUR_TABLE_NAME);
     boolean writeAheadLog = configuration.getBoolean(BLUR_WRITE_AHEAD_LOG, false);
     boolean waitToBeVisible = configuration.getBoolean(BLUR_WAIT_TO_BE_VISIBLE, false);
     client = BlurClient.getClient(connectionStr);
     options = new MutateOptions(table, shardIndex, waitToBeVisible, writeAheadLog);
+    this.rateCounter = rateCounter;
+    this.documentCounter = documentCounter;
+    this.fieldCounter = fieldCounter;
+    this.addCounter = addCounter;
+    this.updateCounter = updateCounter;
+    this.deleteCounter = deleteCounter;
   }
 
   @SuppressWarnings("unchecked")
@@ -80,7 +62,7 @@ public class BlurWriter extends RecordWriter<Writable, MutateWritable> {
     if (value instanceof AddDocumentsWritable) {
       AddDocumentsWritable addDocumentsWritable = (AddDocumentsWritable) value;
       try {
-        client.addDocuments(options, (List<Document>) addDocumentsWritable.getDocuments());
+        client.addDocuments(options, countAdds((List<Document>) addDocumentsWritable.getDocuments()));
       } catch (BlurException e) {
         throw new IOException(e);
       } catch (TException e) {
@@ -89,7 +71,7 @@ public class BlurWriter extends RecordWriter<Writable, MutateWritable> {
     } else if (value instanceof DeleteTermsWritable) {
       DeleteTermsWritable deleteTermsWritable = (DeleteTermsWritable) value;
       try {
-        client.deleteDocuments(options, (List<Term>) deleteTermsWritable.getTerms());
+        client.deleteDocuments(options, countDeletes((List<Term>) deleteTermsWritable.getTerms()));
       } catch (BlurException e) {
         throw new IOException(e);
       } catch (TException e) {
@@ -98,7 +80,7 @@ public class BlurWriter extends RecordWriter<Writable, MutateWritable> {
     } else if (value instanceof UpdatePackagesWritable) {
       UpdatePackagesWritable updatePackagesWritable = (UpdatePackagesWritable) value;
       try {
-        client.updateDocuments(options, (List<UpdatePackage>) updatePackagesWritable.getUpdatePackages());
+        client.updateDocuments(options, countUpdates((List<UpdatePackage>) updatePackagesWritable.getUpdatePackages()));
       } catch (BlurException e) {
         throw new IOException(e);
       } catch (TException e) {
@@ -109,9 +91,37 @@ public class BlurWriter extends RecordWriter<Writable, MutateWritable> {
     }
   }
 
+  private List<UpdatePackage> countUpdates(List<UpdatePackage> updatePackages) {
+    updateCounter.increment(1);
+    for (UpdatePackage updatePackage : updatePackages) {
+      List<Document> documents = updatePackage.getDocuments();
+      counterDocuments(documents);
+    }
+    return updatePackages;
+  }
+
+  private List<Term> countDeletes(List<Term> terms) {
+    deleteCounter.increment(1);
+    return terms;
+  }
+
+  private List<Document> countAdds(List<Document> documents) {
+    addCounter.increment(1);
+    counterDocuments(documents);
+    return documents;
+  }
+
+  private void counterDocuments(List<Document> documents) {
+    documentCounter.increment(documents.size());
+    for (Document document : documents) {
+      fieldCounter.increment(document.getFieldsSize());
+    }
+  }
+
   @Override
   public void close(TaskAttemptContext context) throws IOException, InterruptedException {
-
+    rateCounter.setValue(0);
+    context.progress();
   }
 
 }


Mime
View raw message