incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Adding a feature to change the document buffering in the mapreduce program from fixed document size to a heap based strategy. See BlurOutputFormat.setDocumentBufferStrategy method.
Date Thu, 19 Jun 2014 18:39:33 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/apache-blur-0.2 cd2673f8a -> 79803a133


Adding a feature to change the document buffering in the mapreduce program from fixed document
size to a heap based strategy.  See BlurOutputFormat.setDocumentBufferStrategy method.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/79803a13
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/79803a13
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/79803a13

Branch: refs/heads/apache-blur-0.2
Commit: 79803a13360f430afbb02a60b3fde0776bce011d
Parents: cd2673f
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Thu Jun 19 14:39:13 2014 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Jun 19 14:39:13 2014 -0400

----------------------------------------------------------------------
 .../blur/mapreduce/lib/BlurOutputFormat.java    | 36 ++++++++++++-
 .../mapreduce/lib/DocumentBufferStrategy.java   | 57 ++++++++++++++++++++
 .../lib/DocumentBufferStrategyFixedSize.java    | 44 +++++++++++++++
 .../lib/DocumentBufferStrategyHeapSize.java     | 53 ++++++++++++++++++
 .../blur/mapreduce/lib/DocumentSizeOf.java      | 31 +++++++++++
 .../mapreduce/lib/GenericBlurRecordWriter.java  | 36 +++++++------
 .../mapreduce/lib/BlurOutputFormatTest.java     |  2 +
 .../blur/mapreduce/lib/BlurOutputFormat.java    | 33 ++++++++++++
 .../mapreduce/lib/DocumentBufferStrategy.java   | 57 ++++++++++++++++++++
 .../lib/DocumentBufferStrategyFixedSize.java    | 44 +++++++++++++++
 .../lib/DocumentBufferStrategyHeapSize.java     | 53 ++++++++++++++++++
 .../blur/mapreduce/lib/DocumentSizeOf.java      | 31 +++++++++++
 .../mapreduce/lib/GenericBlurRecordWriter.java  | 36 +++++++------
 .../mapreduce/lib/BlurOutputFormatTest.java     |  2 +
 14 files changed, 479 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/79803a13/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
index 7bbc567..bf68e66 100644
--- a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
+++ b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
@@ -19,6 +19,7 @@ package org.apache.blur.mapreduce.lib;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 
 import org.apache.blur.thirdparty.thrift_0_9_0.TException;
 import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TJSONProtocol;
@@ -65,11 +66,12 @@ import org.apache.hadoop.util.Progressable;
  * 
  */
 public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
-
   public static final String BLUR_OUTPUT_REDUCER_MULTIPLIER = "blur.output.reducer.multiplier";
   public static final String BLUR_OUTPUT_OPTIMIZEINFLIGHT = "blur.output.optimizeinflight";
   public static final String BLUR_OUTPUT_INDEXLOCALLY = "blur.output.indexlocally";
   public static final String BLUR_OUTPUT_MAX_DOCUMENT_BUFFER_SIZE = "blur.output.max.document.buffer.size";
+  public static final String BLUR_OUTPUT_MAX_DOCUMENT_BUFFER_HEAP_SIZE = "blur.output.max.document.buffer.heap.size";
+  public static final String BLUR_OUTPUT_DOCUMENT_BUFFER_STRATEGY = "blur.output.document.buffer.strategy";
   public static final String BLUR_TABLE_DESCRIPTOR = "blur.table.descriptor";
   public static final String BLUR_OUTPUT_PATH = "blur.output.path";
 
@@ -121,7 +123,7 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
 
   @Override
   public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException,
InterruptedException {
-    return new BlurOutputCommitter(context.getTaskAttemptID().isMap(), context.getNumReduceTasks());
+    return new BlurOutputCommitter();
   }
 
   public static TableDescriptor getTableDescriptor(Configuration configuration) throws IOException
{
@@ -237,6 +239,36 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
     return configuration.getInt(BLUR_OUTPUT_MAX_DOCUMENT_BUFFER_SIZE, 1000);
   }
 
+  /**
+   * Reads the {@link #BLUR_OUTPUT_MAX_DOCUMENT_BUFFER_HEAP_SIZE} setting.
+   *
+   * @param configuration
+   *          the configuration to read from.
+   * @return the configured value, or 32 * 1024 * 1024 when the key is not set
+   *         (presumably a size in bytes -- confirm against the heap size
+   *         buffer strategy that consumes it).
+   */
+  public static int getMaxDocumentBufferHeapSize(Configuration configuration) {
+    return configuration.getInt(BLUR_OUTPUT_MAX_DOCUMENT_BUFFER_HEAP_SIZE, 32 * 1024 * 1024);
+  }
+
+  /**
+   * Stores the maximum document buffer heap size under
+   * {@link #BLUR_OUTPUT_MAX_DOCUMENT_BUFFER_HEAP_SIZE}.
+   *
+   * @param configuration
+   *          the configuration to write to.
+   * @param maxDocumentBufferHeapSize
+   *          the limit to store (presumably in bytes -- confirm).
+   */
+  public static void setMaxDocumentBufferHeapSize(Configuration configuration, int maxDocumentBufferHeapSize)
{
+    configuration.setInt(BLUR_OUTPUT_MAX_DOCUMENT_BUFFER_HEAP_SIZE, maxDocumentBufferHeapSize);
+  }
+
+  /**
+   * Convenience overload that applies the limit to the configuration of the
+   * given job.
+   *
+   * @param job
+   *          the job whose configuration is updated.
+   * @param maxDocumentBufferHeapSize
+   *          the limit to store (presumably in bytes -- confirm).
+   */
+  public static void setMaxDocumentBufferHeapSize(Job job, int maxDocumentBufferHeapSize)
{
+    setMaxDocumentBufferHeapSize(job.getConfiguration(), maxDocumentBufferHeapSize);
+  }
+
+  /**
+   * Creates the {@link DocumentBufferStrategy} selected by
+   * {@link #BLUR_OUTPUT_DOCUMENT_BUFFER_STRATEGY}, falling back to
+   * {@link DocumentBufferStrategyFixedSize} when the key is not set.
+   *
+   * @param configuration
+   *          the configuration to read the strategy class from; it is also
+   *          passed to the constructor of the strategy.
+   * @return a new strategy instance.
+   * @throws RuntimeException
+   *           if the class has no public constructor taking a
+   *           {@link Configuration} or the constructor fails; the original
+   *           exception is kept as the cause.
+   */
+  public static DocumentBufferStrategy getDocumentBufferStrategy(Configuration configuration) {
+    Class<? extends DocumentBufferStrategy> clazz = configuration.getClass(BLUR_OUTPUT_DOCUMENT_BUFFER_STRATEGY,
+        DocumentBufferStrategyFixedSize.class, DocumentBufferStrategy.class);
+    try {
+      // Varargs instead of raw Class[] / Object[] arrays avoids the unchecked warning.
+      Constructor<? extends DocumentBufferStrategy> constructor = clazz.getConstructor(Configuration.class);
+      return constructor.newInstance(configuration);
+    } catch (Exception e) {
+      // Name the offending class so a misconfigured job can be diagnosed from the message alone.
+      throw new RuntimeException("Could not create document buffer strategy [" + clazz.getName() + "]", e);
+    }
+  }
+  
+  /**
+   * Convenience overload that selects the buffer strategy on the
+   * configuration of the given job.
+   *
+   * @param job
+   *          the job whose configuration is updated.
+   * @param documentBufferStrategyClass
+   *          the strategy implementation to use.
+   */
+  public static void setDocumentBufferStrategy(Job job, Class<? extends DocumentBufferStrategy>
documentBufferStrategyClass) {
+    setDocumentBufferStrategy(job.getConfiguration(), documentBufferStrategyClass);
+  }
+  
+  /**
+   * Stores the strategy class under
+   * {@link #BLUR_OUTPUT_DOCUMENT_BUFFER_STRATEGY}. getDocumentBufferStrategy
+   * instantiates it through a public constructor taking a
+   * {@link Configuration}, so the class has to provide one.
+   *
+   * @param configuration
+   *          the configuration to write to.
+   * @param documentBufferStrategyClass
+   *          the strategy implementation to use.
+   */
+  public static void setDocumentBufferStrategy(Configuration configuration, Class<? extends
DocumentBufferStrategy> documentBufferStrategyClass) {
+    configuration.setClass(BLUR_OUTPUT_DOCUMENT_BUFFER_STRATEGY, documentBufferStrategyClass,
DocumentBufferStrategy.class);
+  }
+
   public static void setOutputPath(Job job, Path path) {
     setOutputPath(job.getConfiguration(), path);
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/79803a13/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/DocumentBufferStrategy.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/DocumentBufferStrategy.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/DocumentBufferStrategy.java
new file mode 100644
index 0000000..8b6f255
--- /dev/null
+++ b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/DocumentBufferStrategy.java
@@ -0,0 +1,57 @@
+package org.apache.blur.mapreduce.lib;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.lucene.document.Field;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Base class for the in-memory document buffer. Documents are held in a map
+ * keyed by record id until the caller drains them with
+ * {@link #getAndClearBuffer()}; subclasses decide when the buffer counts as
+ * full.
+ */
+public abstract class DocumentBufferStrategy {
+
+  // TreeMap: values() iterates in ascending record id order, and a second add
+  // for the same record id replaces the earlier document.
+  protected final Map<String, List<Field>> _documents = new TreeMap<String,
List<Field>>();
+
+  /**
+   * @param configuration
+   *          not used by this base class; the concrete strategies added in
+   *          this commit read their limits from it.
+   */
+  public DocumentBufferStrategy(Configuration configuration) {
+    
+  }
+  
+  /**
+   * @return true once the buffer has reached the limit of the concrete
+   *         strategy.
+   */
+  public abstract boolean isFull();
+
+  /**
+   * Hook called by {@link #add(String, List)} for every document, before the
+   * document is stored.
+   */
+  protected abstract void newDocument(List<Field> document);
+
+  /**
+   * Reports the document to {@link #newDocument(List)} and then buffers it
+   * under the given record id.
+   *
+   * @return the document previously buffered under the same record id, or
+   *         null if there was none.
+   */
+  public List<Field> add(String recordId, List<Field> document) {
+    newDocument(document);
+    return _documents.put(recordId, document);
+  }
+
+  /**
+   * Copies the buffered documents into a new list and empties the buffer.
+   *
+   * @return the documents that were buffered, in record id order.
+   */
+  public List<List<Field>> getAndClearBuffer() {
+    List<List<Field>> docs = new ArrayList<List<Field>>(_documents.values());
+    _documents.clear();
+    return docs;
+  }
+
+  /**
+   * @return true if no documents are currently buffered.
+   */
+  public boolean isEmpty() {
+    return _documents.isEmpty();
+  }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/79803a13/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/DocumentBufferStrategyFixedSize.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/DocumentBufferStrategyFixedSize.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/DocumentBufferStrategyFixedSize.java
new file mode 100644
index 0000000..18a5f8e
--- /dev/null
+++ b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/DocumentBufferStrategyFixedSize.java
@@ -0,0 +1,44 @@
+package org.apache.blur.mapreduce.lib;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.lucene.document.Field;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Buffer strategy that is full once a fixed number of documents is buffered.
+ * The limit is read from
+ * {@link BlurOutputFormat#getMaxDocumentBufferSize(Configuration)}.
+ */
+public class DocumentBufferStrategyFixedSize extends DocumentBufferStrategy {
+
+  // Number of buffered documents at which isFull() starts returning true.
+  private final int _maxDocumentBufferSize;
+
+  public DocumentBufferStrategyFixedSize(Configuration configuration) {
+    super(configuration);
+    _maxDocumentBufferSize = BlurOutputFormat.getMaxDocumentBufferSize(configuration);
+  }
+
+  /**
+   * @return true when the number of buffered documents has reached the
+   *         configured maximum.
+   */
+  @Override
+  public boolean isFull() {
+    return _documents.size() >= _maxDocumentBufferSize;
+  }
+
+  @Override
+  protected void newDocument(List<Field> document) {
+    // no op: fullness is derived from the size of the buffer map itself, so
+    // there is nothing to track per document.
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/79803a13/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/DocumentBufferStrategyHeapSize.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/DocumentBufferStrategyHeapSize.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/DocumentBufferStrategyHeapSize.java
new file mode 100644
index 0000000..4e6b626
--- /dev/null
+++ b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/DocumentBufferStrategyHeapSize.java
@@ -0,0 +1,53 @@
+package org.apache.blur.mapreduce.lib;
+
+import java.util.List;
+
+import org.apache.blur.utils.RamUsageEstimator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.lucene.document.Field;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Buffer strategy that is full once the estimated size of the buffered
+ * documents reaches the limit read from
+ * {@link BlurOutputFormat#getMaxDocumentBufferHeapSize(Configuration)}. Sizes
+ * are estimates produced by {@link RamUsageEstimator#sizeOf}.
+ */
+public class DocumentBufferStrategyHeapSize extends DocumentBufferStrategy {
+
+  // Estimated size at which isFull() starts returning true.
+  private final long _maxDocumentBufferHeapSize;
+  // Running sum of the estimated sizes of the documents currently buffered.
+  private long _currentHeapSize;
+
+  public DocumentBufferStrategyHeapSize(Configuration configuration) {
+    super(configuration);
+    _maxDocumentBufferHeapSize = BlurOutputFormat.getMaxDocumentBufferHeapSize(configuration);
+  }
+
+  /**
+   * @return true when the estimated size of the buffered documents has reached
+   *         the configured maximum.
+   */
+  @Override
+  public boolean isFull() {
+    return _currentHeapSize >= _maxDocumentBufferHeapSize;
+  }
+
+  @Override
+  protected void newDocument(List<Field> document) {
+    _currentHeapSize += RamUsageEstimator.sizeOf(document);
+  }
+
+  /**
+   * Buffers the document and keeps the size estimate in step with the buffer:
+   * when the record id was already present the replaced document leaves the
+   * buffer, so its estimated size is removed from the running total again.
+   *
+   * @return the document previously buffered under the same record id, or
+   *         null if there was none.
+   */
+  @Override
+  public List<Field> add(String recordId, List<Field> document) {
+    List<Field> replaced = super.add(recordId, document);
+    if (replaced != null) {
+      // Clamp at zero in case the replaced document changed since it was added.
+      _currentHeapSize = Math.max(0L, _currentHeapSize - RamUsageEstimator.sizeOf(replaced));
+    }
+    return replaced;
+  }
+
+  /**
+   * Drains the buffer and resets the size estimate to zero.
+   *
+   * @return the documents that were buffered.
+   */
+  @Override
+  public List<List<Field>> getAndClearBuffer() {
+    List<List<Field>> docs = super.getAndClearBuffer();
+    _currentHeapSize = 0;
+    return docs;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/79803a13/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/DocumentSizeOf.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/DocumentSizeOf.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/DocumentSizeOf.java
new file mode 100644
index 0000000..7914838
--- /dev/null
+++ b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/DocumentSizeOf.java
@@ -0,0 +1,31 @@
+package org.apache.blur.mapreduce.lib;
+
+import java.util.List;
+
+import org.apache.blur.utils.RamUsageEstimator;
+import org.apache.lucene.document.Field;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Small helper for estimating the size of a document.
+ */
+public class DocumentSizeOf {
+
+  /**
+   * @param fields
+   *          the fields of one document.
+   * @return the value reported by {@link RamUsageEstimator#sizeOf} for the
+   *         given list.
+   */
+  public static long sizeOf(List<Field> fields) {
+    return RamUsageEstimator.sizeOf(fields);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/79803a13/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
index 0ec7dd3..ebe9a7b 100644
--- a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
+++ b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
@@ -18,10 +18,7 @@ package org.apache.blur.mapreduce.lib;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -66,18 +63,19 @@ public class GenericBlurRecordWriter {
   private static final Counter NULL_COUNTER = new NullCounter();
 
   private final Text _prevKey = new Text();
-  private final Map<String, List<Field>> _documents = new TreeMap<String,
List<Field>>();
+
   private final IndexWriter _writer;
   private final FieldManager _fieldManager;
   private final Directory _finalDir;
   private final Directory _localDir;
   private final File _localPath;
-  private final int _maxDocumentBufferSize;
+  // private final int _maxDocumentBufferSize;
   private final IndexWriterConfig _conf;
   private final IndexWriterConfig _overFlowConf;
   private final Path _newIndex;
   private final boolean _indexLocally;
   private final boolean _optimizeInFlight;
+  private final DocumentBufferStrategy _documentBufferStrategy;
   private Counter _columnCount = NULL_COUNTER;
   private Counter _fieldCount = NULL_COUNTER;
   private Counter _recordCount = NULL_COUNTER;
@@ -98,6 +96,7 @@ public class GenericBlurRecordWriter {
 
   public GenericBlurRecordWriter(Configuration configuration, int attemptId, String tmpDirName)
throws IOException {
     _configuration = configuration;
+    _documentBufferStrategy = BlurOutputFormat.getDocumentBufferStrategy(_configuration);
     _indexLocally = BlurOutputFormat.isIndexLocally(_configuration);
     _optimizeInFlight = BlurOutputFormat.isOptimizeInFlight(_configuration);
 
@@ -105,7 +104,6 @@ public class GenericBlurRecordWriter {
     int shardCount = tableDescriptor.getShardCount();
     int shardId = attemptId % shardCount;
 
-    _maxDocumentBufferSize = BlurOutputFormat.getMaxDocumentBufferSize(_configuration);
     Path tableOutput = BlurOutputFormat.getOutputPath(_configuration);
     String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, shardId);
     Path indexPath = new Path(tableOutput, shardName);
@@ -203,7 +201,7 @@ public class GenericBlurRecordWriter {
       _columnCount.increment(record.getColumns().size());
     }
     List<Field> document = RowDocumentUtil.getDoc(_fieldManager, blurRecord.getRowId(),
record);
-    List<Field> dup = _documents.put(recordId, document);
+    List<Field> dup = _documentBufferStrategy.add(recordId, document);
     if (_countersSetup) {
       if (dup != null) {
         _recordDuplicateCount.increment(1);
@@ -216,13 +214,14 @@ public class GenericBlurRecordWriter {
   }
 
   private void flushToTmpIndexIfNeeded() throws IOException {
-    if (_documents.size() > _maxDocumentBufferSize) {
+    if (_documentBufferStrategy.isFull()) {
+      LOG.info("Document Buffer is full overflow to disk.");
       flushToTmpIndex();
     }
   }
 
   private void flushToTmpIndex() throws IOException {
-    if (_documents.isEmpty()) {
+    if (_documentBufferStrategy.isEmpty()) {
       return;
     }
     _usingLocalTmpindex = true;
@@ -235,13 +234,17 @@ public class GenericBlurRecordWriter {
       // The local tmp writer has merging disabled so the first document in is
       // going to be doc 0.
       // Therefore the first document added is the prime doc
-      List<List<Field>> docs = new ArrayList<List<Field>>(_documents.values());
+      List<List<Field>> docs = _documentBufferStrategy.getAndClearBuffer();
       docs.get(0).add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE,
Store.NO));
-      _localTmpWriter.addDocuments(docs);
+      for (List<Field> doc : docs) {
+        _localTmpWriter.addDocument(doc);
+      }
     } else {
-      _localTmpWriter.addDocuments(_documents.values());
+      List<List<Field>> docs = _documentBufferStrategy.getAndClearBuffer();
+      for (List<Field> doc : docs) {
+        _localTmpWriter.addDocument(doc);
+      }
     }
-    _documents.clear();
   }
 
   private void resetLocalTmp() {
@@ -280,7 +283,7 @@ public class GenericBlurRecordWriter {
         _rowOverFlowCount.increment(1);
       }
     } else {
-      if (_documents.isEmpty()) {
+      if (_documentBufferStrategy.isEmpty()) {
         if (_deletedRowId != null) {
           _writer.addDocument(getDeleteDoc());
           if (_countersSetup) {
@@ -288,13 +291,12 @@ public class GenericBlurRecordWriter {
           }
         }
       } else {
-        List<List<Field>> docs = new ArrayList<List<Field>>(_documents.values());
+        List<List<Field>> docs = _documentBufferStrategy.getAndClearBuffer();
         docs.get(0).add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE,
Store.NO));
         _writer.addDocuments(docs);
         if (_countersSetup) {
-          _recordRateCounter.mark(_documents.size());
+          _recordRateCounter.mark(docs.size());
         }
-        _documents.clear();
       }
     }
     _deletedRowId = null;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/79803a13/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java b/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
index 811dba5..416b868 100644
--- a/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
+++ b/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
@@ -220,6 +220,8 @@ public class BlurOutputFormatTest {
 
     BlurOutputFormat.setupJob(job, tableDescriptor);
     BlurOutputFormat.setIndexLocally(job, false);
+    BlurOutputFormat.setDocumentBufferStrategy(job, DocumentBufferStrategyHeapSize.class);
+    BlurOutputFormat.setMaxDocumentBufferHeapSize(job, 128 * 1024);
 
     assertTrue(job.waitForCompletion(true));
     Counters ctrs = job.getCounters();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/79803a13/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
index c959997..00c163a 100644
--- a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
+++ b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
@@ -19,6 +19,7 @@ package org.apache.blur.mapreduce.lib;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 
 import org.apache.blur.thirdparty.thrift_0_9_0.TException;
 import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TJSONProtocol;
@@ -70,6 +71,8 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
   public static final String BLUR_OUTPUT_OPTIMIZEINFLIGHT = "blur.output.optimizeinflight";
   public static final String BLUR_OUTPUT_INDEXLOCALLY = "blur.output.indexlocally";
   public static final String BLUR_OUTPUT_MAX_DOCUMENT_BUFFER_SIZE = "blur.output.max.document.buffer.size";
+  public static final String BLUR_OUTPUT_MAX_DOCUMENT_BUFFER_HEAP_SIZE = "blur.output.max.document.buffer.heap.size";
+  public static final String BLUR_OUTPUT_DOCUMENT_BUFFER_STRATEGY = "blur.output.document.buffer.strategy";
   public static final String BLUR_TABLE_DESCRIPTOR = "blur.table.descriptor";
   public static final String BLUR_OUTPUT_PATH = "blur.output.path";
 
@@ -237,6 +240,36 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
     return configuration.getInt(BLUR_OUTPUT_MAX_DOCUMENT_BUFFER_SIZE, 1000);
   }
 
+  /**
+   * Reads the {@link #BLUR_OUTPUT_MAX_DOCUMENT_BUFFER_HEAP_SIZE} setting.
+   *
+   * @param configuration
+   *          the configuration to read from.
+   * @return the configured value, or 32 * 1024 * 1024 when the key is not set
+   *         (presumably a size in bytes -- confirm against the heap size
+   *         buffer strategy that consumes it).
+   */
+  public static int getMaxDocumentBufferHeapSize(Configuration configuration) {
+    return configuration.getInt(BLUR_OUTPUT_MAX_DOCUMENT_BUFFER_HEAP_SIZE, 32 * 1024 * 1024);
+  }
+
+  /**
+   * Stores the maximum document buffer heap size under
+   * {@link #BLUR_OUTPUT_MAX_DOCUMENT_BUFFER_HEAP_SIZE}.
+   *
+   * @param configuration
+   *          the configuration to write to.
+   * @param maxDocumentBufferHeapSize
+   *          the limit to store (presumably in bytes -- confirm).
+   */
+  public static void setMaxDocumentBufferHeapSize(Configuration configuration, int maxDocumentBufferHeapSize)
{
+    configuration.setInt(BLUR_OUTPUT_MAX_DOCUMENT_BUFFER_HEAP_SIZE, maxDocumentBufferHeapSize);
+  }
+
+  /**
+   * Convenience overload that applies the limit to the configuration of the
+   * given job.
+   *
+   * @param job
+   *          the job whose configuration is updated.
+   * @param maxDocumentBufferHeapSize
+   *          the limit to store (presumably in bytes -- confirm).
+   */
+  public static void setMaxDocumentBufferHeapSize(Job job, int maxDocumentBufferHeapSize)
{
+    setMaxDocumentBufferHeapSize(job.getConfiguration(), maxDocumentBufferHeapSize);
+  }
+
+  /**
+   * Creates the {@link DocumentBufferStrategy} selected by
+   * {@link #BLUR_OUTPUT_DOCUMENT_BUFFER_STRATEGY}, falling back to
+   * {@link DocumentBufferStrategyFixedSize} when the key is not set.
+   *
+   * @param configuration
+   *          the configuration to read the strategy class from; it is also
+   *          passed to the constructor of the strategy.
+   * @return a new strategy instance.
+   * @throws RuntimeException
+   *           if the class has no public constructor taking a
+   *           {@link Configuration} or the constructor fails; the original
+   *           exception is kept as the cause.
+   */
+  public static DocumentBufferStrategy getDocumentBufferStrategy(Configuration configuration) {
+    Class<? extends DocumentBufferStrategy> clazz = configuration.getClass(BLUR_OUTPUT_DOCUMENT_BUFFER_STRATEGY,
+        DocumentBufferStrategyFixedSize.class, DocumentBufferStrategy.class);
+    try {
+      // Varargs instead of raw Class[] / Object[] arrays avoids the unchecked warning.
+      Constructor<? extends DocumentBufferStrategy> constructor = clazz.getConstructor(Configuration.class);
+      return constructor.newInstance(configuration);
+    } catch (Exception e) {
+      // Name the offending class so a misconfigured job can be diagnosed from the message alone.
+      throw new RuntimeException("Could not create document buffer strategy [" + clazz.getName() + "]", e);
+    }
+  }
+  
+  /**
+   * Convenience overload that selects the buffer strategy on the
+   * configuration of the given job.
+   *
+   * @param job
+   *          the job whose configuration is updated.
+   * @param documentBufferStrategyClass
+   *          the strategy implementation to use.
+   */
+  public static void setDocumentBufferStrategy(Job job, Class<? extends DocumentBufferStrategy>
documentBufferStrategyClass) {
+    setDocumentBufferStrategy(job.getConfiguration(), documentBufferStrategyClass);
+  }
+  
+  /**
+   * Stores the strategy class under
+   * {@link #BLUR_OUTPUT_DOCUMENT_BUFFER_STRATEGY}. getDocumentBufferStrategy
+   * instantiates it through a public constructor taking a
+   * {@link Configuration}, so the class has to provide one.
+   *
+   * @param configuration
+   *          the configuration to write to.
+   * @param documentBufferStrategyClass
+   *          the strategy implementation to use.
+   */
+  public static void setDocumentBufferStrategy(Configuration configuration, Class<? extends
DocumentBufferStrategy> documentBufferStrategyClass) {
+    configuration.setClass(BLUR_OUTPUT_DOCUMENT_BUFFER_STRATEGY, documentBufferStrategyClass,
DocumentBufferStrategy.class);
+  }
+
   public static void setOutputPath(Job job, Path path) {
     setOutputPath(job.getConfiguration(), path);
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/79803a13/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/DocumentBufferStrategy.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/DocumentBufferStrategy.java b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/DocumentBufferStrategy.java
new file mode 100644
index 0000000..8b6f255
--- /dev/null
+++ b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/DocumentBufferStrategy.java
@@ -0,0 +1,57 @@
+package org.apache.blur.mapreduce.lib;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.lucene.document.Field;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Base class for the in-memory document buffer. Documents are held in a map
+ * keyed by record id until the caller drains them with
+ * {@link #getAndClearBuffer()}; subclasses decide when the buffer counts as
+ * full.
+ */
+public abstract class DocumentBufferStrategy {
+
+  // TreeMap: values() iterates in ascending record id order, and a second add
+  // for the same record id replaces the earlier document.
+  protected final Map<String, List<Field>> _documents = new TreeMap<String,
List<Field>>();
+
+  /**
+   * @param configuration
+   *          not used by this base class; the concrete strategies added in
+   *          this commit read their limits from it.
+   */
+  public DocumentBufferStrategy(Configuration configuration) {
+    
+  }
+  
+  /**
+   * @return true once the buffer has reached the limit of the concrete
+   *         strategy.
+   */
+  public abstract boolean isFull();
+
+  /**
+   * Hook called by {@link #add(String, List)} for every document, before the
+   * document is stored.
+   */
+  protected abstract void newDocument(List<Field> document);
+
+  /**
+   * Reports the document to {@link #newDocument(List)} and then buffers it
+   * under the given record id.
+   *
+   * @return the document previously buffered under the same record id, or
+   *         null if there was none.
+   */
+  public List<Field> add(String recordId, List<Field> document) {
+    newDocument(document);
+    return _documents.put(recordId, document);
+  }
+
+  /**
+   * Copies the buffered documents into a new list and empties the buffer.
+   *
+   * @return the documents that were buffered, in record id order.
+   */
+  public List<List<Field>> getAndClearBuffer() {
+    List<List<Field>> docs = new ArrayList<List<Field>>(_documents.values());
+    _documents.clear();
+    return docs;
+  }
+
+  /**
+   * @return true if no documents are currently buffered.
+   */
+  public boolean isEmpty() {
+    return _documents.isEmpty();
+  }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/79803a13/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/DocumentBufferStrategyFixedSize.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/DocumentBufferStrategyFixedSize.java b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/DocumentBufferStrategyFixedSize.java
new file mode 100644
index 0000000..18a5f8e
--- /dev/null
+++ b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/DocumentBufferStrategyFixedSize.java
@@ -0,0 +1,44 @@
+package org.apache.blur.mapreduce.lib;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.lucene.document.Field;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Buffer strategy that is full once a fixed number of documents is buffered.
+ * The limit is read from
+ * {@link BlurOutputFormat#getMaxDocumentBufferSize(Configuration)}.
+ */
+public class DocumentBufferStrategyFixedSize extends DocumentBufferStrategy {
+
+  // Number of buffered documents at which isFull() starts returning true.
+  private final int _maxDocumentBufferSize;
+
+  public DocumentBufferStrategyFixedSize(Configuration configuration) {
+    super(configuration);
+    _maxDocumentBufferSize = BlurOutputFormat.getMaxDocumentBufferSize(configuration);
+  }
+
+  /**
+   * @return true when the number of buffered documents has reached the
+   *         configured maximum.
+   */
+  @Override
+  public boolean isFull() {
+    return _documents.size() >= _maxDocumentBufferSize;
+  }
+
+  @Override
+  protected void newDocument(List<Field> document) {
+    // no op: fullness is derived from the size of the buffer map itself, so
+    // there is nothing to track per document.
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/79803a13/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/DocumentBufferStrategyHeapSize.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/DocumentBufferStrategyHeapSize.java b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/DocumentBufferStrategyHeapSize.java
new file mode 100644
index 0000000..4e6b626
--- /dev/null
+++ b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/DocumentBufferStrategyHeapSize.java
@@ -0,0 +1,53 @@
+package org.apache.blur.mapreduce.lib;
+
+import java.util.List;
+
+import org.apache.blur.utils.RamUsageEstimator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.lucene.document.Field;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+public class DocumentBufferStrategyHeapSize extends DocumentBufferStrategy {
+
+  private final long _maxDocumentBufferHeapSize;
+  private long _currentHeapSize;
+
+  public DocumentBufferStrategyHeapSize(Configuration configuration) {
+    super(configuration);
+    _maxDocumentBufferHeapSize = BlurOutputFormat.getMaxDocumentBufferHeapSize(configuration);
+  }
+
+  @Override
+  public boolean isFull() {
+    return _currentHeapSize >= _maxDocumentBufferHeapSize;
+  }
+
+  @Override
+  protected void newDocument(List<Field> document) {
+    _currentHeapSize += RamUsageEstimator.sizeOf(document);
+  }
+
+  @Override
+  public List<List<Field>> getAndClearBuffer() {
+    List<List<Field>> docs = super.getAndClearBuffer();
+    _currentHeapSize = 0;
+    return docs;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/79803a13/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/DocumentSizeOf.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/DocumentSizeOf.java b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/DocumentSizeOf.java
new file mode 100644
index 0000000..7914838
--- /dev/null
+++ b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/DocumentSizeOf.java
@@ -0,0 +1,31 @@
+package org.apache.blur.mapreduce.lib;
+
+import java.util.List;
+
+import org.apache.blur.utils.RamUsageEstimator;
+import org.apache.lucene.document.Field;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class DocumentSizeOf {
+
+  public static long sizeOf(List<Field> fields) {
+    return RamUsageEstimator.sizeOf(fields);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/79803a13/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
index 0ec7dd3..ebe9a7b 100644
--- a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
+++ b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
@@ -18,10 +18,7 @@ package org.apache.blur.mapreduce.lib;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -66,18 +63,19 @@ public class GenericBlurRecordWriter {
   private static final Counter NULL_COUNTER = new NullCounter();
 
   private final Text _prevKey = new Text();
-  private final Map<String, List<Field>> _documents = new TreeMap<String, List<Field>>();
+
   private final IndexWriter _writer;
   private final FieldManager _fieldManager;
   private final Directory _finalDir;
   private final Directory _localDir;
   private final File _localPath;
-  private final int _maxDocumentBufferSize;
+  // private final int _maxDocumentBufferSize;
   private final IndexWriterConfig _conf;
   private final IndexWriterConfig _overFlowConf;
   private final Path _newIndex;
   private final boolean _indexLocally;
   private final boolean _optimizeInFlight;
+  private final DocumentBufferStrategy _documentBufferStrategy;
   private Counter _columnCount = NULL_COUNTER;
   private Counter _fieldCount = NULL_COUNTER;
   private Counter _recordCount = NULL_COUNTER;
@@ -98,6 +96,7 @@ public class GenericBlurRecordWriter {
 
  public GenericBlurRecordWriter(Configuration configuration, int attemptId, String tmpDirName) throws IOException {
     _configuration = configuration;
+    _documentBufferStrategy = BlurOutputFormat.getDocumentBufferStrategy(_configuration);
     _indexLocally = BlurOutputFormat.isIndexLocally(_configuration);
     _optimizeInFlight = BlurOutputFormat.isOptimizeInFlight(_configuration);
 
@@ -105,7 +104,6 @@ public class GenericBlurRecordWriter {
     int shardCount = tableDescriptor.getShardCount();
     int shardId = attemptId % shardCount;
 
-    _maxDocumentBufferSize = BlurOutputFormat.getMaxDocumentBufferSize(_configuration);
     Path tableOutput = BlurOutputFormat.getOutputPath(_configuration);
     String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, shardId);
     Path indexPath = new Path(tableOutput, shardName);
@@ -203,7 +201,7 @@ public class GenericBlurRecordWriter {
       _columnCount.increment(record.getColumns().size());
     }
     List<Field> document = RowDocumentUtil.getDoc(_fieldManager, blurRecord.getRowId(), record);
-    List<Field> dup = _documents.put(recordId, document);
+    List<Field> dup = _documentBufferStrategy.add(recordId, document);
     if (_countersSetup) {
       if (dup != null) {
         _recordDuplicateCount.increment(1);
@@ -216,13 +214,14 @@ public class GenericBlurRecordWriter {
   }
 
   private void flushToTmpIndexIfNeeded() throws IOException {
-    if (_documents.size() > _maxDocumentBufferSize) {
+    if (_documentBufferStrategy.isFull()) {
+      LOG.info("Document Buffer is full overflow to disk.");
       flushToTmpIndex();
     }
   }
 
   private void flushToTmpIndex() throws IOException {
-    if (_documents.isEmpty()) {
+    if (_documentBufferStrategy.isEmpty()) {
       return;
     }
     _usingLocalTmpindex = true;
@@ -235,13 +234,17 @@ public class GenericBlurRecordWriter {
       // The local tmp writer has merging disabled so the first document in is
       // going to be doc 0.
       // Therefore the first document added is the prime doc
-      List<List<Field>> docs = new ArrayList<List<Field>>(_documents.values());
+      List<List<Field>> docs = _documentBufferStrategy.getAndClearBuffer();
       docs.get(0).add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
-      _localTmpWriter.addDocuments(docs);
+      for (List<Field> doc : docs) {
+        _localTmpWriter.addDocument(doc);
+      }
     } else {
-      _localTmpWriter.addDocuments(_documents.values());
+      List<List<Field>> docs = _documentBufferStrategy.getAndClearBuffer();
+      for (List<Field> doc : docs) {
+        _localTmpWriter.addDocument(doc);
+      }
     }
-    _documents.clear();
   }
 
   private void resetLocalTmp() {
@@ -280,7 +283,7 @@ public class GenericBlurRecordWriter {
         _rowOverFlowCount.increment(1);
       }
     } else {
-      if (_documents.isEmpty()) {
+      if (_documentBufferStrategy.isEmpty()) {
         if (_deletedRowId != null) {
           _writer.addDocument(getDeleteDoc());
           if (_countersSetup) {
@@ -288,13 +291,12 @@ public class GenericBlurRecordWriter {
           }
         }
       } else {
-        List<List<Field>> docs = new ArrayList<List<Field>>(_documents.values());
+        List<List<Field>> docs = _documentBufferStrategy.getAndClearBuffer();
         docs.get(0).add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
         _writer.addDocuments(docs);
         if (_countersSetup) {
-          _recordRateCounter.mark(_documents.size());
+          _recordRateCounter.mark(docs.size());
         }
-        _documents.clear();
       }
     }
     _deletedRowId = null;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/79803a13/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java b/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
index 2cdeeb8..8d25470 100644
--- a/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
+++ b/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
@@ -235,6 +235,8 @@ public class BlurOutputFormatTest {
 
     BlurOutputFormat.setupJob(job, tableDescriptor);
     BlurOutputFormat.setIndexLocally(job, false);
+    BlurOutputFormat.setDocumentBufferStrategy(job, DocumentBufferStrategyHeapSize.class);
+    BlurOutputFormat.setMaxDocumentBufferHeapSize(job, 128 * 1024);
 
     assertTrue(job.waitForCompletion(true));
     Counters ctrs = job.getCounters();


Mime
View raw message