incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [6/8] git commit: Adding both heap and record limits for a row request. This should help prevent OOM errors in the shard and controller servers.
Date Tue, 23 Jul 2013 18:58:43 GMT
Adds both a heap-size limit and a record-count limit for row fetch requests. This should help prevent OOM errors
in the shard and controller servers.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/31150dbc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/31150dbc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/31150dbc

Branch: refs/heads/master
Commit: 31150dbce2ae4fe66e8344bc7d7a03e9b7f2202f
Parents: e0caab9
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Jul 23 14:54:18 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Jul 23 14:54:18 2013 -0400

----------------------------------------------------------------------
 .../org/apache/blur/manager/IndexManager.java   | 30 +++++---
 .../results/BlurResultIterableSearcher.java     |  9 ++-
 .../blur/thrift/BlurControllerServer.java       |  2 +
 .../org/apache/blur/thrift/BlurShardServer.java |  2 +
 .../java/org/apache/blur/thrift/TableAdmin.java | 22 ++++++
 .../blur/thrift/ThriftBlurControllerServer.java |  2 +
 .../blur/thrift/ThriftBlurShardServer.java      |  5 +-
 .../java/org/apache/blur/utils/BlurUtil.java    | 15 +++-
 .../org/apache/blur/utils/CreateGarbage.java    | 37 ++++++++++
 .../ResetableDocumentStoredFieldVisitor.java    | 72 ++++++++++++++++----
 .../org/apache/blur/utils/BlurUtilsTest.java    |  4 +-
 .../org/apache/blur/mapreduce/BlurReducer.java  |  2 +-
 .../org/apache/blur/utils/BlurConstants.java    |  2 +
 13 files changed, 169 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/31150dbc/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
index b071189..3ad2502 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
@@ -143,6 +143,7 @@ public class IndexManager {
   private Meter _queriesInternalMeter;
   private Timer _fetchTimer;
   private int _fetchCount = 100;
+  private int _maxHeapPerRowFetch = 10000000;
 
   public static AtomicBoolean DEBUG_RUN_SLOW = new AtomicBoolean(false);
 
@@ -230,7 +231,7 @@ public class IndexManager {
 
       Query highlightQuery = getHighlightQuery(selector, table, analyzer);
 
-      fetchRow(searcher.getIndexReader(), table, selector, fetchResult, highlightQuery, analyzer);
+      fetchRow(searcher.getIndexReader(), table, shard, selector, fetchResult, highlightQuery,
analyzer, _maxHeapPerRowFetch);
       if (fetchResult.rowResult != null) {
         if (fetchResult.rowResult.row != null && fetchResult.rowResult.row.records
!= null) {
           _recordsMeter.mark(fetchResult.rowResult.row.records.size());
@@ -389,7 +390,7 @@ public class IndexManager {
         Query facetedQuery = getFacetedQuery(blurQuery, userQuery, facetedCounts, analyzer,
context, postFilter,
             preFilter);
         call = new SimpleQueryParallelCall(running, table, status, _indexServer, facetedQuery,
blurQuery.selector,
-            _queriesInternalMeter, shardServerContext, runSlow, _fetchCount);
+            _queriesInternalMeter, shardServerContext, runSlow, _fetchCount, _maxHeapPerRowFetch);
       } else {
         Query query = getQuery(blurQuery.expertQuery);
         Filter filter = getFilter(blurQuery.expertQuery);
@@ -401,7 +402,7 @@ public class IndexManager {
         }
         Query facetedQuery = getFacetedQuery(blurQuery, userQuery, facetedCounts, analyzer,
context, null, null);
         call = new SimpleQueryParallelCall(running, table, status, _indexServer, facetedQuery,
blurQuery.selector,
-            _queriesInternalMeter, shardServerContext, runSlow, _fetchCount);
+            _queriesInternalMeter, shardServerContext, runSlow, _fetchCount, _maxHeapPerRowFetch);
       }
       MergerBlurResultIterable merger = new MergerBlurResultIterable(blurQuery);
       return ForkJoin.execute(_executor, blurIndexes.entrySet(), call, new Cancel() {
@@ -506,13 +507,13 @@ public class IndexManager {
     return _statusManager.queryStatusIdList(table);
   }
 
-  public static void fetchRow(IndexReader reader, String table, Selector selector, FetchResult
fetchResult,
-      Query highlightQuery) throws CorruptIndexException, IOException {
-    fetchRow(reader, table, selector, fetchResult, highlightQuery, null);
+  public static void fetchRow(IndexReader reader, String table, String shard, Selector selector,
FetchResult fetchResult,
+      Query highlightQuery, int maxHeap) throws CorruptIndexException, IOException {
+    fetchRow(reader, table, shard, selector, fetchResult, highlightQuery, null, maxHeap);
   }
 
-  public static void fetchRow(IndexReader reader, String table, Selector selector, FetchResult
fetchResult,
-      Query highlightQuery, BlurAnalyzer analyzer) throws CorruptIndexException, IOException
{
+  public static void fetchRow(IndexReader reader, String table, String shard, Selector selector,
FetchResult fetchResult,
+      Query highlightQuery, BlurAnalyzer analyzer, int maxHeap) throws CorruptIndexException,
IOException {
     fetchResult.table = table;
     String locationId = selector.locationId;
     int lastSlash = locationId.lastIndexOf('/');
@@ -578,7 +579,7 @@ public class IndexManager {
             docs = HighlightHelper.highlightDocuments(reader, term, fieldVisitor, selector,
highlightQuery, analyzer,
                 preTag, postTag);
           } else {
-            docs = BlurUtil.fetchDocuments(reader, term, fieldVisitor, selector);
+            docs = BlurUtil.fetchDocuments(reader, term, fieldVisitor, selector, maxHeap,
table + "/" + shard);
           }
           fetchResult.rowResult = new FetchRowResult(getRow(docs));
         }
@@ -1075,10 +1076,11 @@ public class IndexManager {
     private final ShardServerContext _shardServerContext;
     private final boolean _runSlow;
     private final int _fetchCount;
+    private final int _maxHeapPerRowFetch;
 
     public SimpleQueryParallelCall(AtomicBoolean running, String table, QueryStatus status,
IndexServer indexServer,
         Query query, Selector selector, Meter queriesInternalMeter, ShardServerContext shardServerContext,
-        boolean runSlow, int fetchCount) {
+        boolean runSlow, int fetchCount, int maxHeapPerRowFetch) {
       _running = running;
       _table = table;
       _status = status;
@@ -1089,6 +1091,7 @@ public class IndexManager {
       _shardServerContext = shardServerContext;
       _runSlow = runSlow;
       _fetchCount = fetchCount;
+      _maxHeapPerRowFetch = maxHeapPerRowFetch;
     }
 
     @Override
@@ -1120,7 +1123,7 @@ public class IndexManager {
         // BlurResultIterableSearcher will close searcher, if shard server
         // context is null.
         return new BlurResultIterableSearcher(_running, rewrite, _table, shard, searcher,
_selector,
-            _shardServerContext == null, _runSlow, _fetchCount);
+            _shardServerContext == null, _runSlow, _fetchCount, _maxHeapPerRowFetch);
       } catch (BlurException e) {
         switch (_status.getQueryStatus().getState()) {
         case INTERRUPTED:
@@ -1175,5 +1178,10 @@ public class IndexManager {
   public void setFetchCount(int fetchCount) {
     _fetchCount = fetchCount;
   }
+  
+  public void setMaxHeapPerRowFetch(int maxHeapPerRowFetch) {
+    _maxHeapPerRowFetch = maxHeapPerRowFetch;
+  }
+  
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/31150dbc/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableSearcher.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableSearcher.java
b/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableSearcher.java
index 2fab74f..465f5a7 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableSearcher.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableSearcher.java
@@ -54,9 +54,11 @@ public class BlurResultIterableSearcher implements BlurResultIterable {
   private final AtomicBoolean _running;
   private final boolean _closeSearcher;
   private final boolean _runSlow;
+  private final int _maxHeapPerRowFetch;
 
   public BlurResultIterableSearcher(AtomicBoolean running, Query query, String table, String
shard,
-      IndexSearcherClosable searcher, Selector selector, boolean closeSearcher, boolean runSlow,
int fetchCount) throws BlurException {
+      IndexSearcherClosable searcher, Selector selector, boolean closeSearcher, boolean runSlow,
int fetchCount,
+      int maxHeapPerRowFetch) throws BlurException {
     _running = running;
     _table = table;
     _query = query;
@@ -66,6 +68,7 @@ public class BlurResultIterableSearcher implements BlurResultIterable {
     _closeSearcher = closeSearcher;
     _runSlow = runSlow;
     _fetchCount = fetchCount;
+    _maxHeapPerRowFetch = maxHeapPerRowFetch;
     performSearch();
   }
 
@@ -91,7 +94,9 @@ public class BlurResultIterableSearcher implements BlurResultIterable {
     _selector.setLocationId(resolveId);
     IndexManager.validSelector(_selector);
     try {
-      IndexManager.fetchRow(_searcher.getIndexReader(), _table, _selector, fetchResult, null);
+
+      IndexManager.fetchRow(_searcher.getIndexReader(), _table, _shard, _selector, fetchResult,
null,
+          _maxHeapPerRowFetch);
     } catch (IOException e) {
       throw new BlurException("Unknown IO error", null, ErrorType.UNKNOWN);
     }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/31150dbc/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
index 06e1ac2..173cff1 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
@@ -302,6 +302,7 @@ public class BlurControllerServer extends TableAdmin implements Iface
{
     checkTable(table);
     String cluster = _clusterStatus.getCluster(true, table);
     _queryChecker.checkQuery(blurQuery);
+    checkSelectorFetchSize(blurQuery.getSelector());
     int shardCount = _clusterStatus.getShardCount(true, cluster, table);
 
     OUTER: for (int retries = 0; retries < _maxDefaultRetries; retries++) {
@@ -428,6 +429,7 @@ public class BlurControllerServer extends TableAdmin implements Iface
{
   @Override
   public FetchResult fetchRow(final String table, final Selector selector) throws BlurException,
TException {
     checkTable(table);
+    checkSelectorFetchSize(selector);
     IndexManager.validSelector(selector);
     String clientHostnamePort = null;
     try {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/31150dbc/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
index c7f18ac..f133dc0 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
@@ -93,6 +93,7 @@ public class BlurShardServer extends TableAdmin implements Iface {
     resetSearchers();
     _queryChecker.checkQuery(blurQuery);
     try {
+      checkSelectorFetchSize(blurQuery.getSelector());
       BlurQuery original = new BlurQuery(blurQuery);
       Selector selector = original.getSelector();
       if (selector != null) {
@@ -149,6 +150,7 @@ public class BlurShardServer extends TableAdmin implements Iface {
   @Override
   public FetchResult fetchRow(String table, Selector selector) throws BlurException, TException
{
     checkTable(_cluster, table);
+    checkSelectorFetchSize(selector);
     try {
       FetchResult fetchResult = new FetchResult();
       _indexManager.fetchRow(table, selector, fetchResult);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/31150dbc/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java b/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
index 9ab5a8a..991ad01 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
@@ -31,6 +31,7 @@ import org.apache.blur.thirdparty.thrift_0_9_0.TException;
 import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.Metric;
+import org.apache.blur.thrift.generated.Selector;
 import org.apache.blur.thrift.generated.ShardState;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.thrift.generated.TableStats;
@@ -44,6 +45,19 @@ public abstract class TableAdmin implements Iface {
   protected ZooKeeper _zookeeper;
   protected ClusterStatus _clusterStatus;
   protected BlurConfiguration _configuration;
+  protected int _maxRecordsPerRowFetchRequest = 1000;
+
+  protected void checkSelectorFetchSize(Selector selector) {
+    if (selector == null) {
+      return;
+    }
+    int maxRecordsToFetch = selector.getMaxRecordsToFetch();
+    if (maxRecordsToFetch > _maxRecordsPerRowFetchRequest) {
+      LOG.warn("Max records to fetch is too high [{0}] max [{1}] in Selector [{2}]", maxRecordsToFetch,
+          _maxRecordsPerRowFetchRequest, selector);
+      selector.setMaxRecordsToFetch(_maxRecordsPerRowFetchRequest);
+    }
+  }
 
   @Override
   public Map<String, Metric> metrics(Set<String> metrics) throws BlurException,
TException {
@@ -406,4 +420,12 @@ public abstract class TableAdmin implements Iface {
   public Map<String, String> configuration() throws BlurException, TException {
     return _configuration.getProperties();
   }
+  
+  public int getMaxRecordsPerRowFetchRequest() {
+    return _maxRecordsPerRowFetchRequest;
+  }
+
+  public void setMaxRecordsPerRowFetchRequest(int _maxRecordsPerRowFetchRequest) {
+    this._maxRecordsPerRowFetchRequest = _maxRecordsPerRowFetchRequest;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/31150dbc/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
index a67c6bf..a6fb587 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
@@ -33,6 +33,7 @@ import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_SERVER_REMOTE_
 import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_SERVER_THRIFT_THREAD_COUNT;
 import static org.apache.blur.utils.BlurConstants.BLUR_GUI_CONTROLLER_PORT;
 import static org.apache.blur.utils.BlurConstants.BLUR_GUI_SHARD_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_MAX_RECORDS_PER_ROW_FETCH_REQUEST;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BIND_PORT;
 import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_CONNECTION;
 import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_TIMEOUT;
@@ -118,6 +119,7 @@ public class ThriftBlurControllerServer extends ThriftServer {
     controllerServer.setMaxFetchDelay(configuration.getInt(BLUR_CONTROLLER_RETRY_MAX_FETCH_DELAY,
2000));
     controllerServer.setMaxMutateDelay(configuration.getInt(BLUR_CONTROLLER_RETRY_MAX_MUTATE_DELAY,
2000));
     controllerServer.setMaxDefaultDelay(configuration.getInt(BLUR_CONTROLLER_RETRY_MAX_DEFAULT_DELAY,
2000));
+    controllerServer.setMaxRecordsPerRowFetchRequest(configuration.getInt(BLUR_MAX_RECORDS_PER_ROW_FETCH_REQUEST,
1000));
 
     controllerServer.init();
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/31150dbc/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index d26bdee..13f6038 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -124,8 +124,7 @@ public class ThriftBlurShardServer extends ThriftServer {
       int webServerPort = baseGuiPort + serverIndex;
 
       // TODO: this got ugly, there has to be a better way to handle all these
-      // params
-      // without reversing the mvn dependancy and making blur-gui on top.
+      // params without reversing the mvn dependancy and making blur-gui on top.
       httpServer = new HttpJettyServer(bindPort, webServerPort, configuration.getInt(BLUR_CONTROLLER_BIND_PORT,
-1),
           configuration.getInt(BLUR_SHARD_BIND_PORT, -1), configuration.getInt(BLUR_GUI_CONTROLLER_PORT,
-1),
           configuration.getInt(BLUR_GUI_SHARD_PORT, -1), "shard");
@@ -207,6 +206,7 @@ public class ThriftBlurShardServer extends ThriftServer {
     indexManager.setFilterCache(filterCache);
     indexManager.setClusterStatus(clusterStatus);
     indexManager.setFetchCount(configuration.getInt(BLUR_SHARD_FETCHCOUNT, 100));
+    indexManager.setMaxHeapPerRowFetch(configuration.getInt(BLUR_MAX_HEAP_PER_ROW_FETCH,
10000000));
     indexManager.init();
 
     final BlurShardServer shardServer = new BlurShardServer();
@@ -216,6 +216,7 @@ public class ThriftBlurShardServer extends ThriftServer {
     shardServer.setClusterStatus(clusterStatus);
     shardServer.setQueryChecker(queryChecker);
     shardServer.setConfiguration(configuration);
+    shardServer.setMaxRecordsPerRowFetchRequest(configuration.getInt(BLUR_MAX_RECORDS_PER_ROW_FETCH_REQUEST,
1000));
     shardServer.init();
 
     Iface iface = BlurUtil.recordMethodCallsAndAverageTimes(shardServer, Iface.class, false);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/31150dbc/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java b/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
index ee0ac9b..c56191d 100644
--- a/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
+++ b/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
@@ -122,12 +122,13 @@ public class BlurUtil {
   public static final Comparator<? super PeekableIterator<BlurResult, BlurException>>
HITS_PEEKABLE_ITERATOR_COMPARATOR = new BlurResultPeekableIteratorComparator();
   public static final Comparator<? super BlurResult> HITS_COMPARATOR = new BlurResultComparator();
   public static final Term PRIME_DOC_TERM = new Term(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE);
-  
+
   static class LoggerArgsState {
     public LoggerArgsState(int size) {
       _buffer = new ResetableTMemoryBuffer(size);
       _tjsonProtocol = new TJSONProtocol(_buffer);
     }
+
     TJSONProtocol _tjsonProtocol;
     ResetableTMemoryBuffer _buffer;
     StringBuilder _builder = new StringBuilder();
@@ -726,7 +727,8 @@ public class BlurUtil {
    * @throws IOException
    */
   public static List<Document> fetchDocuments(IndexReader reader, Term term,
-      ResetableDocumentStoredFieldVisitor fieldSelector, Selector selector) throws IOException
{
+      ResetableDocumentStoredFieldVisitor fieldSelector, Selector selector, int maxHeap,
String context)
+      throws IOException {
     IndexSearcher indexSearcher = new IndexSearcher(reader);
     int docFreq = reader.docFreq(term);
     BooleanQuery booleanQueryForFamily = null;
@@ -749,13 +751,22 @@ public class BlurUtil {
     int start = selector.getStartRecord();
     int end = selector.getMaxRecordsToFetch() + start;
 
+    int totalHeap = 0;
+
     for (int i = start; i < end; i++) {
       if (i >= totalHits) {
         break;
       }
+      if (totalHeap >= maxHeap) {
+        LOG.warn("Max heap size exceeded for this request [{0}] max [{1}] for [{2}] and rowid
[{3}]", totalHeap,
+            maxHeap, context, term.text());
+        break;
+      }
       int doc = topDocs.scoreDocs[i].doc;
       indexSearcher.doc(doc, fieldSelector);
       docs.add(fieldSelector.getDocument());
+      int heapSize = fieldSelector.getSize();
+      totalHeap += heapSize;
       fieldSelector.reset();
     }
     return docs;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/31150dbc/blur-core/src/main/java/org/apache/blur/utils/CreateGarbage.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/utils/CreateGarbage.java b/blur-core/src/main/java/org/apache/blur/utils/CreateGarbage.java
new file mode 100644
index 0000000..4f4c6a7
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/utils/CreateGarbage.java
@@ -0,0 +1,37 @@
+package org.apache.blur.utils;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.blur.utils.GCWatcher.Action;
+
+public class CreateGarbage {
+
+  public static void main(String[] args) throws InterruptedException {
+    final Map<String, String> map = new ConcurrentHashMap<String, String>();
+    Action action = new Action() {
+      @Override
+      public void takeAction() throws Exception {
+        map.clear();
+      }
+    };
+    GCWatcher.init(0.75);
+    GCWatcher.registerAction(action);
+    while (true) {
+
+      int count = 0;
+      int max = 100000;
+      for (long i = 0; i < 100000000; i++) {
+        if (count >= max) {
+          Thread.sleep(250);
+          count = 0;
+        }
+        map.put(Long.toString(i), Long.toString(i));
+        count++;
+      }
+      System.out.println(map.size());
+      map.clear();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/31150dbc/blur-core/src/main/java/org/apache/blur/utils/ResetableDocumentStoredFieldVisitor.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/utils/ResetableDocumentStoredFieldVisitor.java
b/blur-core/src/main/java/org/apache/blur/utils/ResetableDocumentStoredFieldVisitor.java
index 24fee3e..d991d35 100644
--- a/blur-core/src/main/java/org/apache/blur/utils/ResetableDocumentStoredFieldVisitor.java
+++ b/blur-core/src/main/java/org/apache/blur/utils/ResetableDocumentStoredFieldVisitor.java
@@ -29,19 +29,41 @@ import org.apache.lucene.document.TextField;
 import org.apache.lucene.index.FieldInfo;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.StoredFieldVisitor;
-
-/** A {@link StoredFieldVisitor} that creates a {@link
- *  Document} containing all stored fields, or only specific
- *  requested fields provided to {@link #DocumentStoredFieldVisitor(Set)}.
- *  <p>
- *  This is used by {@link IndexReader#document(int)} to load a
- *  document.
- *
- * @lucene.experimental */
+import org.apache.lucene.util.RamUsageEstimator;
+
+/**
+ * A {@link StoredFieldVisitor} that creates a {@link Document} containing all
+ * stored fields, or only specific requested fields provided to
+ * {@link #DocumentStoredFieldVisitor(Set)}.
+ * <p>
+ * This is used by {@link IndexReader#document(int)} to load a document.
+ * 
+ * @lucene.experimental
+ */
 
 public class ResetableDocumentStoredFieldVisitor extends StoredFieldVisitor {
+
+  private static final int _emptyDocumentSize;
+  private static final int _integerFieldSize;
+  private static final int _longFieldSize;
+  private static final int _doubleFieldSize;
+  private static final int _floatFieldSize;
+  private static final int _emptyString;
+  private static final int _emptyByteArrayFieldSize;
+
+  static {
+    _emptyDocumentSize = (int) RamUsageEstimator.sizeOf(new Document());
+    _integerFieldSize = (int) RamUsageEstimator.sizeOf(new StoredField("", 0));
+    _longFieldSize = (int) RamUsageEstimator.sizeOf(new StoredField("", 0l));
+    _doubleFieldSize = (int) RamUsageEstimator.sizeOf(new StoredField("", 0.0));
+    _floatFieldSize = (int) RamUsageEstimator.sizeOf(new StoredField("", 0.0f));
+    _emptyByteArrayFieldSize = (int) RamUsageEstimator.sizeOf(new StoredField("", new byte[]
{}));
+    _emptyString = (int) RamUsageEstimator.sizeOf("");
+  }
+
   private Document doc = new Document();
   private final Set<String> fieldsToAdd;
+  private int size = _emptyDocumentSize;
 
   /** Load only fields named in the provided <code>Set&lt;String&gt;</code>.
*/
   public ResetableDocumentStoredFieldVisitor(Set<String> fieldsToAdd) {
@@ -51,7 +73,7 @@ public class ResetableDocumentStoredFieldVisitor extends StoredFieldVisitor
{
   /** Load only fields named in the provided <code>Set&lt;String&gt;</code>.
*/
   public ResetableDocumentStoredFieldVisitor(String... fields) {
     fieldsToAdd = new HashSet<String>(fields.length);
-    for(String field : fields) {
+    for (String field : fields) {
       fieldsToAdd.add(field);
     }
   }
@@ -64,6 +86,8 @@ public class ResetableDocumentStoredFieldVisitor extends StoredFieldVisitor
{
   @Override
   public void binaryField(FieldInfo fieldInfo, byte[] value) throws IOException {
     doc.add(new StoredField(fieldInfo.name, value));
+    addSize(fieldInfo, _emptyByteArrayFieldSize);
+    size += value.length;
   }
 
   @Override
@@ -74,26 +98,39 @@ public class ResetableDocumentStoredFieldVisitor extends StoredFieldVisitor
{
     ft.setOmitNorms(fieldInfo.omitsNorms());
     ft.setIndexOptions(fieldInfo.getIndexOptions());
     doc.add(new Field(fieldInfo.name, value, ft));
+    size += _emptyString * 2;
+    size += fieldInfo.name.length() * 2;
+    size += value.length() * 2;
   }
 
   @Override
   public void intField(FieldInfo fieldInfo, int value) {
     doc.add(new StoredField(fieldInfo.name, value));
+    addSize(fieldInfo, _integerFieldSize);
   }
 
   @Override
   public void longField(FieldInfo fieldInfo, long value) {
     doc.add(new StoredField(fieldInfo.name, value));
+    addSize(fieldInfo, _longFieldSize);
   }
 
   @Override
   public void floatField(FieldInfo fieldInfo, float value) {
     doc.add(new StoredField(fieldInfo.name, value));
+    addSize(fieldInfo, _floatFieldSize);
   }
 
   @Override
   public void doubleField(FieldInfo fieldInfo, double value) {
     doc.add(new StoredField(fieldInfo.name, value));
+    addSize(fieldInfo, _doubleFieldSize);
+  }
+
+  private void addSize(FieldInfo fieldInfo, int fieldSize) {
+    size += fieldSize;
+    size += _emptyString;
+    size += fieldInfo.name.length() * 2;
   }
 
   @Override
@@ -103,16 +140,21 @@ public class ResetableDocumentStoredFieldVisitor extends StoredFieldVisitor
{
 
   /**
    * Retrieve the visited document.
-   * @return Document populated with stored fields. Note that only
-   *         the stored information in the field instances is valid,
-   *         data such as boosts, indexing options, term vector options,
-   *         etc is not set.
+   * 
+   * @return Document populated with stored fields. Note that only the stored
+   *         information in the field instances is valid, data such as boosts,
+   *         indexing options, term vector options, etc is not set.
    */
   public Document getDocument() {
     return doc;
   }
-  
+
+  public int getSize() {
+    return size;
+  }
+
   public void reset() {
     doc = new Document();
+    size = _emptyDocumentSize;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/31150dbc/blur-core/src/test/java/org/apache/blur/utils/BlurUtilsTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/utils/BlurUtilsTest.java b/blur-core/src/test/java/org/apache/blur/utils/BlurUtilsTest.java
index 5346f6d..a8b3ed5 100644
--- a/blur-core/src/test/java/org/apache/blur/utils/BlurUtilsTest.java
+++ b/blur-core/src/test/java/org/apache/blur/utils/BlurUtilsTest.java
@@ -162,7 +162,7 @@ public class BlurUtilsTest {
 	  selector.setColumnFamiliesToFetch(columnFamiliesToFetch);
 	  
 	  ResetableDocumentStoredFieldVisitor resetableDocumentStoredFieldVisitor = new ResetableDocumentStoredFieldVisitor();
-	  List<Document> docs = BlurUtil.fetchDocuments(getReader(), new Term("a","b"), resetableDocumentStoredFieldVisitor,
selector);
+	  List<Document> docs = BlurUtil.fetchDocuments(getReader(), new Term("a","b"), resetableDocumentStoredFieldVisitor,
selector, 10000000, "test-context");
 	  assertEquals(docs.size(),1);
   }
   
@@ -170,7 +170,7 @@ public class BlurUtilsTest {
   public void testFetchDocumentsWithoutFamily() throws CorruptIndexException, LockObtainFailedException,
IOException{
 	  Selector selector = new Selector();
 	  ResetableDocumentStoredFieldVisitor resetableDocumentStoredFieldVisitor = new ResetableDocumentStoredFieldVisitor();
-	  List<Document> docs = BlurUtil.fetchDocuments(getReader(), new Term("a","b"), resetableDocumentStoredFieldVisitor,
selector);
+	  List<Document> docs = BlurUtil.fetchDocuments(getReader(), new Term("a","b"), resetableDocumentStoredFieldVisitor,
selector, 10000000, "test-context");
 	  assertEquals(docs.size(),2);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/31150dbc/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurReducer.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurReducer.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurReducer.java
index 413c0d8..bff087a 100644
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurReducer.java
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurReducer.java
@@ -246,7 +246,7 @@ public class BlurReducer extends Reducer<Text, BlurMutate, Text, BlurMutate>
{
 
   protected void fetchOldRecords() throws IOException {
     List<Document> docs = BlurUtil.fetchDocuments(_reader, _rowIdTerm, new ResetableDocumentStoredFieldVisitor(),
-        new Selector());
+        new Selector(), Integer.MAX_VALUE, "reducer-context");
     for (Document document : docs) {
       String recordId = document.get(RECORD_ID);
       // add them to the new records if the new records do not contain them.

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/31150dbc/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
index 50a819e..9c434e3 100644
--- a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
+++ b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
@@ -53,6 +53,8 @@ public class BlurConstants {
   public static final String BLUR_QUERY_MAX_RECORD_FETCH = "blur.query.max.record.fetch";
   public static final String BLUR_QUERY_MAX_RESULTS_FETCH = "blur.query.max.results.fetch";
   public static final String BLUR_SHARD_FETCHCOUNT = "blur.shard.fetchcount";
+  public static final String BLUR_MAX_HEAP_PER_ROW_FETCH = "blur.max.heap.per.row.fetch";
+  public static final String BLUR_MAX_RECORDS_PER_ROW_FETCH_REQUEST = "blur.max.records.per.row.fetch.request";
 
   public static final String BLUR_SHARD_SERVER_THRIFT_THREAD_COUNT = "blur.shard.server.thrift.thread.count";
   public static final String BLUR_SHARD_CACHE_MAX_TIMETOLIVE = "blur.shard.cache.max.timetolive";


Mime
View raw message