incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cr...@apache.org
Subject [10/50] [abbrv] git commit: Adding part of a deep paging implementation, this will not help anything until higher level caching is in place to make use of the low level skipping.
Date Sun, 18 May 2014 21:41:45 GMT
Adding part of a deep paging implementation; this will not help anything until higher-level
caching is in place to make use of the low-level skipping.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/b615571e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/b615571e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/b615571e

Branch: refs/heads/console-v2
Commit: b615571e8b30e902a34e93d658b1e6a64913cf37
Parents: 4a4ccd5
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sun Apr 6 15:20:50 2014 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sun Apr 6 15:20:50 2014 -0400

----------------------------------------------------------------------
 .../org/apache/blur/manager/IndexManager.java   |  14 +-
 .../results/BlurResultIterableSearcher.java     |   5 +-
 .../blur/thrift/ThriftBlurShardServer.java      |   7 +-
 .../apache/blur/manager/IndexManagerTest.java   |   3 +-
 .../blur/lucene/search/DeepPagingCache.java     | 279 +++++++++++++++++++
 .../blur/lucene/search/IterablePaging.java      |  78 ++++--
 .../lucene/search/TestingPagingCollector.java   |  44 ++-
 .../apache/blur/metrics/MetricsConstants.java   |   1 +
 .../org/apache/blur/utils/BlurConstants.java    |   1 +
 .../src/main/resources/blur-default.properties  |   3 +
 10 files changed, 403 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b615571e/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
index 4f5c456..6891884 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
@@ -51,6 +51,7 @@ import org.apache.blur.index.ExitableReader;
 import org.apache.blur.index.ExitableReader.ExitingReaderException;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
+import org.apache.blur.lucene.search.DeepPagingCache;
 import org.apache.blur.lucene.search.FacetExecutor;
 import org.apache.blur.lucene.search.FacetQuery;
 import org.apache.blur.lucene.search.StopExecutionCollector.StopExecutionCollectorException;
@@ -166,12 +167,14 @@ public class IndexManager {
 
   private final int _threadCount;
   private final int _mutateThreadCount;
+  private final DeepPagingCache _deepPagingCache;
 
   public static AtomicBoolean DEBUG_RUN_SLOW = new AtomicBoolean(false);
 
   public IndexManager(IndexServer indexServer, ClusterStatus clusterStatus, BlurFilterCache
filterCache,
       int maxHeapPerRowFetch, int fetchCount, int threadCount, int mutateThreadCount, long
statusCleanupTimerDelay,
-      int facetThreadCount) {
+      int facetThreadCount, DeepPagingCache deepPagingCache) {
+    _deepPagingCache = deepPagingCache;
     _indexServer = indexServer;
     _clusterStatus = clusterStatus;
     _filterCache = filterCache;
@@ -515,7 +518,7 @@ public class IndexManager {
       Sort sort = getSort(blurQuery, fieldManager);
       call = new SimpleQueryParallelCall(running, table, status, facetedQuery, blurQuery.selector,
           _queriesInternalMeter, shardServerContext, runSlow, _fetchCount, _maxHeapPerRowFetch,
-          context.getSimilarity(), context, sort);
+          context.getSimilarity(), context, sort, _deepPagingCache);
       trace.done();
       MergerBlurResultIterable merger = new MergerBlurResultIterable(blurQuery);
       BlurResultIterable merge = ForkJoin.execute(_executor, blurIndexes.entrySet(), call,
new Cancel() {
@@ -1168,10 +1171,12 @@ public class IndexManager {
     private final Similarity _similarity;
     private final TableContext _context;
     private final Sort _sort;
+    private final DeepPagingCache _deepPagingCache;
 
     public SimpleQueryParallelCall(AtomicBoolean running, String table, QueryStatus status,
Query query,
         Selector selector, Meter queriesInternalMeter, ShardServerContext shardServerContext,
boolean runSlow,
-        int fetchCount, int maxHeapPerRowFetch, Similarity similarity, TableContext context,
Sort sort) {
+        int fetchCount, int maxHeapPerRowFetch, Similarity similarity, TableContext context,
Sort sort,
+        DeepPagingCache deepPagingCache) {
       _running = running;
       _table = table;
       _status = status;
@@ -1185,6 +1190,7 @@ public class IndexManager {
       _similarity = similarity;
       _context = context;
       _sort = sort;
+      _deepPagingCache = deepPagingCache;
     }
 
     @Override
@@ -1221,7 +1227,7 @@ public class IndexManager {
         // context is null.
         trace2 = Trace.trace("query initial search");
         return new BlurResultIterableSearcher(_running, rewrite, _table, shard, searcher,
_selector,
-            _shardServerContext == null, _runSlow, _fetchCount, _maxHeapPerRowFetch, _context,
_sort);
+            _shardServerContext == null, _runSlow, _fetchCount, _maxHeapPerRowFetch, _context,
_sort, _deepPagingCache);
       } catch (BlurException e) {
         switch (_status.getQueryStatus().getState()) {
         case INTERRUPTED:

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b615571e/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableSearcher.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableSearcher.java
b/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableSearcher.java
index c0ba88c..000a9cf 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableSearcher.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableSearcher.java
@@ -21,6 +21,7 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.blur.lucene.search.DeepPagingCache;
 import org.apache.blur.lucene.search.IterablePaging;
 import org.apache.blur.lucene.search.IterablePaging.ProgressRef;
 import org.apache.blur.lucene.search.IterablePaging.TotalHitsRef;
@@ -58,7 +59,7 @@ public class BlurResultIterableSearcher implements BlurResultIterable {
 
   public BlurResultIterableSearcher(AtomicBoolean running, Query query, String table, String
shard,
       IndexSearcherClosable searcher, Selector selector, boolean closeSearcher, boolean runSlow,
int fetchCount,
-      int maxHeapPerRowFetch, TableContext context, Sort sort) throws BlurException {
+      int maxHeapPerRowFetch, TableContext context, Sort sort, DeepPagingCache deepPagingCache)
throws BlurException {
     _sort = sort;
     _running = running;
     _query = query;
@@ -68,7 +69,7 @@ public class BlurResultIterableSearcher implements BlurResultIterable {
     _runSlow = runSlow;
     _fetchCount = fetchCount;
     _iterablePaging = new IterablePaging(_running, _searcher, _query, _fetchCount, _totalHitsRef,
_progressRef,
-        _runSlow, _sort);
+        _runSlow, _sort, deepPagingCache);
     _iteratorConverter = new IteratorConverter<ScoreDoc, BlurResult, BlurException>(_iterablePaging.iterator(),
         new Converter<ScoreDoc, BlurResult, BlurException>() {
           @Override

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b615571e/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index 2c9fc92..975f5c8 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -32,6 +32,7 @@ import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BIND_ADDRESS;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BIND_PORT;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BLOCK_CACHE_TOTAL_SIZE;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BLOCK_CACHE_VERSION;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_DEEP_PAGING_CACHE_SIZE;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_FETCHCOUNT;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_FILTER_CACHE_CLASS;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_HOSTNAME;
@@ -62,6 +63,7 @@ import org.apache.blur.gui.HttpJettyServer;
 import org.apache.blur.gui.JSONReporterServlet;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
+import org.apache.blur.lucene.search.DeepPagingCache;
 import org.apache.blur.manager.BlurFilterCache;
 import org.apache.blur.manager.BlurQueryChecker;
 import org.apache.blur.manager.DefaultBlurFilterCache;
@@ -230,9 +232,12 @@ public class ThriftBlurShardServer extends ThriftServer {
     int mutateThreadCount = configuration.getInt(BLUR_INDEXMANAGER_MUTATE_THREAD_COUNT, 32);
     int facetThreadCount = configuration.getInt(BLUR_INDEXMANAGER_FACET_THREAD_COUNT, 16);
     long statusCleanupTimerDelay = TimeUnit.SECONDS.toMillis(10);
+    int cacheSize = configuration.getInt(BLUR_SHARD_DEEP_PAGING_CACHE_SIZE, 1000);
+    DeepPagingCache deepPagingCache = new DeepPagingCache(cacheSize);
 
     final IndexManager indexManager = new IndexManager(indexServer, clusterStatus, filterCache,
maxHeapPerRowFetch,
-        fetchCount, indexManagerThreadCount, mutateThreadCount, statusCleanupTimerDelay,
facetThreadCount);
+        fetchCount, indexManagerThreadCount, mutateThreadCount, statusCleanupTimerDelay,
facetThreadCount,
+        deepPagingCache);
 
     final BlurShardServer shardServer = new BlurShardServer();
     shardServer.setIndexServer(indexServer);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b615571e/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java b/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
index 99377dd..5eb437b 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
@@ -47,6 +47,7 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLongArray;
 
 import org.apache.blur.BlurConfiguration;
+import org.apache.blur.lucene.search.DeepPagingCache;
 import org.apache.blur.manager.clusterstatus.ClusterStatus;
 import org.apache.blur.manager.indexserver.LocalIndexServer;
 import org.apache.blur.manager.results.BlurResultIterable;
@@ -119,7 +120,7 @@ public class IndexManagerTest {
     BlurFilterCache filterCache = new DefaultBlurFilterCache(new BlurConfiguration());
     long statusCleanupTimerDelay = 1000;
     indexManager = new IndexManager(server, getClusterStatus(tableDescriptor), filterCache,
10000000, 100, 1, 1,
-        statusCleanupTimerDelay, 0);
+        statusCleanupTimerDelay, 0, new DeepPagingCache());
     setupData();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b615571e/blur-query/src/main/java/org/apache/blur/lucene/search/DeepPagingCache.java
----------------------------------------------------------------------
diff --git a/blur-query/src/main/java/org/apache/blur/lucene/search/DeepPagingCache.java b/blur-query/src/main/java/org/apache/blur/lucene/search/DeepPagingCache.java
new file mode 100644
index 0000000..5c3fb4f
--- /dev/null
+++ b/blur-query/src/main/java/org/apache/blur/lucene/search/DeepPagingCache.java
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.lucene.search;
+
+import static org.apache.blur.metrics.MetricsConstants.DEEP_PAGING_CACHE;
+import static org.apache.blur.metrics.MetricsConstants.EVICTION;
+import static org.apache.blur.metrics.MetricsConstants.HIT;
+import static org.apache.blur.metrics.MetricsConstants.MISS;
+import static org.apache.blur.metrics.MetricsConstants.ORG_APACHE_BLUR;
+import static org.apache.blur.metrics.MetricsConstants.SIZE;
+
+import java.lang.ref.WeakReference;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.Sort;
+
+import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+import com.googlecode.concurrentlinkedhashmap.EvictionListener;
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+
+/**
+ * Cache of paging checkpoints that lets a search resume near a deep result
+ * position instead of re-running every page before it.
+ * <p>
+ * A checkpoint ({@link DeepPageContainer}) pairs a hit position with the
+ * {@link ScoreDoc} a search can resume after. Checkpoints are keyed by a
+ * {@link DeepPageKey} (query, sort and index key) plus the position, and are
+ * stored twice:
+ * <ul>
+ * <li>in a size-bounded LRU map that decides eviction and answers
+ * exact-position lookups, and</li>
+ * <li>in an ordered skip-list map used to find the nearest checkpoint at or
+ * before a requested position.</li>
+ * </ul>
+ * Evicting an entry from the LRU map also removes it from the ordered map.
+ * <p>
+ * Hit/miss/eviction meters and a size gauge are registered under fixed names in
+ * the global {@link Metrics} registry. NOTE(review): the registry appears to hand
+ * back an already-registered metric of the same name, so with several instances
+ * in one JVM the size gauge presumably reports only the first one; confirm.
+ */
+public class DeepPagingCache {
+
+  private final static Log LOG = LogFactory.getLog(DeepPagingCache.class);
+
+  /** Default maximum number of cached checkpoints. */
+  private static final long DEFAULT_MAX = 1024;
+
+  /** Ordered view (key hash, then position); used for "nearest at or before" lookups. */
+  private final ConcurrentNavigableMap<DeepPageKeyPlusPosition, DeepPageContainer> _positionCache;
+  /** Bounded LRU view; owns eviction and answers exact-position lookups. */
+  private final ConcurrentLinkedHashMap<DeepPageKeyPlusPosition, DeepPageContainer> _lruCache;
+  private final Meter _hits;
+  private final Meter _misses;
+  private final Meter _evictions;
+
+  /** Creates a cache bounded to {@code DEFAULT_MAX} (1024) checkpoints. */
+  public DeepPagingCache() {
+    this(DEFAULT_MAX);
+  }
+
+  /**
+   * Creates a cache bounded to {@code maxEntriesForDeepPaging} checkpoints.
+   *
+   * @param maxEntriesForDeepPaging maximum weighted capacity of the LRU map
+   */
+  public DeepPagingCache(long maxEntriesForDeepPaging) {
+    _hits = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, DEEP_PAGING_CACHE, HIT), HIT, TimeUnit.SECONDS);
+    _misses = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, DEEP_PAGING_CACHE, MISS), MISS, TimeUnit.SECONDS);
+    _evictions = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, DEEP_PAGING_CACHE, EVICTION), EVICTION,
+        TimeUnit.SECONDS);
+    // Create the ordered map first: the LRU eviction listener below writes to it,
+    // so it must already be assigned before the LRU map can evict anything.
+    _positionCache = new ConcurrentSkipListMap<DeepPageKeyPlusPosition, DeepPageContainer>();
+    _lruCache = new ConcurrentLinkedHashMap.Builder<DeepPageKeyPlusPosition, DeepPageContainer>()
+        .maximumWeightedCapacity(maxEntriesForDeepPaging)
+        .listener(new EvictionListener<DeepPageKeyPlusPosition, DeepPageContainer>() {
+          @Override
+          public void onEviction(DeepPageKeyPlusPosition key, DeepPageContainer value) {
+            // Keep the ordered view in step with the LRU view.
+            _positionCache.remove(key);
+            _evictions.mark();
+          }
+        }).build();
+    Metrics.newGauge(new MetricName(ORG_APACHE_BLUR, DEEP_PAGING_CACHE, SIZE), new Gauge<Long>() {
+      @Override
+      public Long value() {
+        return _lruCache.weightedSize();
+      }
+    });
+  }
+
+  /**
+   * Stores a checkpoint for {@code key} at {@code deepPageContainer.position}, replacing any
+   * checkpoint already stored for the same key and position.
+   *
+   * @param key the query/sort/index the checkpoint belongs to
+   * @param deepPageContainer the checkpoint; its {@code position} must already be set
+   */
+  public void add(DeepPageKey key, DeepPageContainer deepPageContainer) {
+    LOG.debug("Adding DeepPageContainer [{0}] for key [{1}]", deepPageContainer, key);
+    DeepPageKeyPlusPosition k = new DeepPageKeyPlusPosition(deepPageContainer.position, key);
+    _lruCache.put(k, deepPageContainer);
+    _positionCache.put(k, deepPageContainer);
+  }
+
+  /**
+   * Finds a checkpoint to resume from when skipping to {@code skipTo}. It returns the checkpoint
+   * stored exactly at {@code skipTo} if present. Otherwise it returns the nearest stored checkpoint
+   * below {@code skipTo}, if that checkpoint belongs to {@code key}.
+   *
+   * @param key the query/sort/index being paged
+   * @param skipTo the hit position the caller wants to reach
+   * @return the checkpoint, or {@code null} if no usable checkpoint was found
+   */
+  public DeepPageContainer lookup(DeepPageKey key, int skipTo) {
+    LOG.debug("Looking up DeepPageContainer for skipTo [{0}] with key hashcode [{1}] and key [{2}]", skipTo,
+        key.hashCode(), key);
+    DeepPageKeyPlusPosition k = new DeepPageKeyPlusPosition(skipTo, key);
+    DeepPageContainer deepPageContainer = _lruCache.get(k);
+    if (deepPageContainer != null) {
+      _hits.mark();
+      return deepPageContainer;
+    }
+
+    // Greatest stored (key, position) at or before (key, skipTo) in the ordered map.
+    Entry<DeepPageKeyPlusPosition, DeepPageContainer> floorEntry = _positionCache.floorEntry(k);
+    if (floorEntry != null && key.equals(floorEntry.getKey()._deepPageKey)) {
+      // The ordered map compares keys by hash only, so a distinct key with a colliding hash can
+      // have overwritten this slot's value. The LRU map matches on equals(), so read the
+      // checkpoint from there. It is null if the entry was just evicted, which counts as a miss.
+      DeepPageContainer nearest = _lruCache.get(floorEntry.getKey());
+      if (nearest != null) {
+        _hits.mark();
+        return nearest;
+      }
+    }
+    _misses.mark();
+    return null;
+  }
+
+  /** Overflow-safe three-way int comparison (same contract as Java 7's Integer.compare). */
+  private static int compareInts(int a, int b) {
+    return a < b ? -1 : (a == b ? 0 : 1);
+  }
+
+  // NOTE(review): not referenced anywhere in this commit (the cache uses
+  // DeepPageKeyPlusPosition); candidate for removal.
+  static class DeepPageKeyWithPosition {
+    final DeepPageKey _deepPageKey;
+    final int _position;
+
+    DeepPageKeyWithPosition(DeepPageKey deepPageKey, int position) {
+      _deepPageKey = deepPageKey;
+      _position = position;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((_deepPageKey == null) ? 0 : _deepPageKey.hashCode());
+      result = prime * result + _position;
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj)
+        return true;
+      if (obj == null)
+        return false;
+      if (getClass() != obj.getClass())
+        return false;
+      DeepPageKeyWithPosition other = (DeepPageKeyWithPosition) obj;
+      if (_deepPageKey == null) {
+        if (other._deepPageKey != null)
+          return false;
+      } else if (!_deepPageKey.equals(other._deepPageKey))
+        return false;
+      if (_position != other._position)
+        return false;
+      return true;
+    }
+
+  }
+
+  /**
+   * Map key combining a {@link DeepPageKey} with a hit position. It is ordered by key first and
+   * position second, so the checkpoints of one key sort together in position order (barring
+   * hash collisions between distinct keys).
+   */
+  static class DeepPageKeyPlusPosition implements Comparable<DeepPageKeyPlusPosition> {
+
+    final int _position;
+    final DeepPageKey _deepPageKey;
+
+    DeepPageKeyPlusPosition(int position, DeepPageKey deepPageKey) {
+      _position = position;
+      _deepPageKey = deepPageKey;
+    }
+
+    @Override
+    public int compareTo(DeepPageKeyPlusPosition o) {
+      int compareTo = _deepPageKey.compareTo(o._deepPageKey);
+      if (compareTo == 0) {
+        return compareInts(_position, o._position);
+      }
+      return compareTo;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((_deepPageKey == null) ? 0 : _deepPageKey.hashCode());
+      result = prime * result + _position;
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj)
+        return true;
+      if (obj == null)
+        return false;
+      if (getClass() != obj.getClass())
+        return false;
+      DeepPageKeyPlusPosition other = (DeepPageKeyPlusPosition) obj;
+      if (_deepPageKey == null) {
+        if (other._deepPageKey != null)
+          return false;
+      } else if (!_deepPageKey.equals(other._deepPageKey))
+        return false;
+      if (_position != other._position)
+        return false;
+      return true;
+    }
+
+  }
+
+  /**
+   * Identifies a paged search by query, sort and an index key. The index key is held through a
+   * {@link WeakReference}, so this key does not keep it strongly reachable.
+   * <p>
+   * {@link #compareTo} orders by {@link #hashCode()} only. It is consistent with
+   * {@link #equals} except when distinct keys collide on hash.
+   */
+  public static class DeepPageKey implements Comparable<DeepPageKey> {
+
+    final WeakReference<Object> _indexKey;
+    // Hash of the index key captured up front. The weak referent can be cleared at any time, but
+    // hashCode()/compareTo() must not change while this key sits in the hash map or skip list.
+    final int _indexKeyHash;
+    final Query _query;
+    final Sort _sort;
+
+    public DeepPageKey(Query q, Sort s, Object o) {
+      _indexKey = new WeakReference<Object>(o);
+      _indexKeyHash = (o == null) ? 0 : o.hashCode();
+      _sort = s;
+      _query = q;
+    }
+
+    @Override
+    public int compareTo(DeepPageKey o) {
+      // Three-way compare instead of subtraction, which can overflow and break transitivity.
+      return compareInts(hashCode(), o.hashCode());
+    }
+
+    private Object getIndexKey() {
+      return _indexKey.get();
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + _indexKeyHash;
+      result = prime * result + ((_query == null) ? 0 : _query.hashCode());
+      result = prime * result + ((_sort == null) ? 0 : _sort.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj)
+        return true;
+      if (obj == null)
+        return false;
+      if (getClass() != obj.getClass())
+        return false;
+      DeepPageKey other = (DeepPageKey) obj;
+      if (_indexKeyHash != other._indexKeyHash)
+        return false;
+      // Read each referent once: a second get() could observe a GC-cleared (null) value.
+      Object indexKey = getIndexKey();
+      Object otherIndexKey = other.getIndexKey();
+      if (indexKey == null) {
+        if (otherIndexKey != null)
+          return false;
+      } else if (!indexKey.equals(otherIndexKey))
+        return false;
+      if (_query == null) {
+        if (other._query != null)
+          return false;
+      } else if (!_query.equals(other._query))
+        return false;
+      if (_sort == null) {
+        if (other._sort != null)
+          return false;
+      } else if (!_sort.equals(other._sort))
+        return false;
+      return true;
+    }
+
+    @Override
+    public String toString() {
+      return "DeepPageKey [_indexKey=" + getIndexKey() + ", _query=" + _query + ", _sort=" + _sort + "]";
+    }
+
+  }
+
+  /**
+   * A paging checkpoint. {@code scoreDoc} is the hit a search resumes after, and
+   * {@code position} is the hit position that resumption corresponds to.
+   */
+  public static class DeepPageContainer {
+    int position;
+    ScoreDoc scoreDoc;
+
+    @Override
+    public String toString() {
+      return "DeepPageContainer [position=" + position + ", scoreDoc=" + scoreDoc + "]";
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b615571e/blur-query/src/main/java/org/apache/blur/lucene/search/IterablePaging.java
----------------------------------------------------------------------
diff --git a/blur-query/src/main/java/org/apache/blur/lucene/search/IterablePaging.java b/blur-query/src/main/java/org/apache/blur/lucene/search/IterablePaging.java
index 3fd7d00..1c638fc 100644
--- a/blur-query/src/main/java/org/apache/blur/lucene/search/IterablePaging.java
+++ b/blur-query/src/main/java/org/apache/blur/lucene/search/IterablePaging.java
@@ -22,6 +22,10 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.blur.index.ExitableReader.ExitingReaderException;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.lucene.search.DeepPagingCache.DeepPageContainer;
+import org.apache.blur.lucene.search.DeepPagingCache.DeepPageKey;
 import org.apache.blur.lucene.search.StopExecutionCollector.StopExecutionCollectorException;
 import org.apache.blur.thrift.BException;
 import org.apache.blur.thrift.generated.BlurException;
@@ -44,12 +48,18 @@ import org.apache.lucene.search.TopScoreDocCollector;
  */
 public class IterablePaging implements BlurIterable<ScoreDoc, BlurException> {
 
+  private static final Log LOG = LogFactory.getLog(IterablePaging.class);
+
+  private static final boolean DISABLED = true;
+
+  private final DeepPagingCache _deepPagingCache;
   private final IndexSearcher _searcher;
   private final Query _query;
   private final AtomicBoolean _running;
   private final int _numHitsToCollect;
   private final boolean _runSlow;
   private final Sort _sort;
+  private final DeepPageKey _key;
 
   private TotalHitsRef _totalHitsRef;
   private ProgressRef _progressRef;
@@ -57,7 +67,9 @@ public class IterablePaging implements BlurIterable<ScoreDoc, BlurException>
{
   private int gather = -1;
 
   public IterablePaging(AtomicBoolean running, IndexSearcher searcher, Query query, int numHitsToCollect,
-      TotalHitsRef totalHitsRef, ProgressRef progressRef, boolean runSlow, Sort sort) throws
BlurException {
+      TotalHitsRef totalHitsRef, ProgressRef progressRef, boolean runSlow, Sort sort, DeepPagingCache
deepPagingCache)
+      throws BlurException {
+    _deepPagingCache = deepPagingCache;
     _running = running;
     _sort = sort;
     try {
@@ -70,6 +82,7 @@ public class IterablePaging implements BlurIterable<ScoreDoc, BlurException>
{
     _totalHitsRef = totalHitsRef == null ? new TotalHitsRef() : totalHitsRef;
     _progressRef = progressRef == null ? new ProgressRef() : progressRef;
     _runSlow = runSlow;
+    _key = new DeepPageKey(_query, _sort, _searcher.getIndexReader().getCombinedCoreAndDeletesKey());
   }
 
   public static class TotalHitsRef {
@@ -171,7 +184,32 @@ public class IterablePaging implements BlurIterable<ScoreDoc, BlurException>
{
    */
   @Override
   public BlurIterator<ScoreDoc, BlurException> iterator() throws BlurException {
-    return skipHits(new PagingIterator());
+    PagingIterator iterator = new PagingIterator();
+    // Start from the nearest cached checkpoint at or before skipTo if there is one;
+    // otherwise start from the first hit (position 0, nothing to search "after").
+    DeepPageContainer deepPageContainer = getDeepPageContainer(skipTo);
+    if (deepPageContainer == null) {
+      deepPageContainer = new DeepPageContainer();
+    }
+    iterator.after = deepPageContainer.scoreDoc;
+    iterator.counter = deepPageContainer.position;
+    iterator.search();
+    _progressRef.skipTo.set(skipTo);
+    int hitsToEat = skipTo - deepPageContainer.position;
+    if (hitsToEat != 0) {
+      // Debug, not warn: while the cache is DISABLED (and on any ordinary cache miss) this
+      // fires for every request with skipTo > 0, which is normal operation.
+      LOG.debug("Skipping [{0}], Key [{1}] was missing, having to execute extra searches.", hitsToEat, _key);
+    }
+    for (int i = deepPageContainer.position; i < skipTo && iterator.hasNext(); i++) {
+      // eats the hits, and moves the iterator to the desired skip to position.
+      _progressRef.currentHitPosition.set(i);
+      iterator.next();
+    }
+    return iterator;
+  }
+
+  /**
+   * Returns the cached checkpoint to start from for {@code skipTo}, or {@code null} when the deep
+   * paging cache is disabled or has no usable entry.
+   */
+  private DeepPageContainer getDeepPageContainer(int skipTo) {
+    if (DISABLED) {
+      return null;
+    }
+    return _deepPagingCache.lookup(_key, skipTo);
   }
 
   class PagingIterator implements BlurIterator<ScoreDoc, BlurException> {
@@ -180,11 +218,7 @@ public class IterablePaging implements BlurIterable<ScoreDoc, BlurException>
{
     private int counter = 0;
     private int offset = 0;
     private int endPosition = gather == -1 ? Integer.MAX_VALUE : skipTo + gather;
-    private ScoreDoc lastScoreDoc;
-
-    PagingIterator() throws BlurException {
-      search();
-    }
+    ScoreDoc after;
 
     void search() throws BlurException {
       long s = System.currentTimeMillis();
@@ -192,10 +226,9 @@ public class IterablePaging implements BlurIterable<ScoreDoc, BlurException>
{
       try {
         TopDocsCollector<?> collector;
         if (_sort == null) {
-          collector = TopScoreDocCollector.create(_numHitsToCollect, lastScoreDoc, true);
+          collector = TopScoreDocCollector.create(_numHitsToCollect, after, true);
         } else {
-          collector = TopFieldCollector.create(_sort, _numHitsToCollect, (FieldDoc) lastScoreDoc,
true, true, false,
-              true);
+          collector = TopFieldCollector.create(_sort, _numHitsToCollect, (FieldDoc) after,
true, true, false, true);
         }
         Collector col = new StopExecutionCollector(collector, _running);
         if (_runSlow) {
@@ -213,14 +246,25 @@ public class IterablePaging implements BlurIterable<ScoreDoc, BlurException>
{
         throw new BException("Unknown error during search call", e);
       }
       if (scoreDocs.length > 0) {
-        lastScoreDoc = scoreDocs[scoreDocs.length - 1];
+        after = scoreDocs[scoreDocs.length - 1];
+        addLastScoreDoc();
       } else {
-        lastScoreDoc = null;
+        after = null;
       }
       long e = System.currentTimeMillis();
       _progressRef.queryTime.addAndGet(e - s);
     }
 
+    /**
+     * Records a paging checkpoint for the page just fetched. The checkpoint pairs the page's
+     * last hit ({@code after}) with position {@code counter + scoreDocs.length}. This is a
+     * no-op while the deep paging cache is DISABLED.
+     */
+    private void addLastScoreDoc() {
+      if (!DISABLED) {
+        DeepPageContainer checkpoint = new DeepPageContainer();
+        checkpoint.position = counter + scoreDocs.length;
+        checkpoint.scoreDoc = after;
+        _deepPagingCache.add(_key, checkpoint);
+      }
+    }
+
     @Override
     public boolean hasNext() {
       return counter < _totalHitsRef.totalHits() && counter < endPosition ?
true : false;
@@ -242,14 +286,4 @@ public class IterablePaging implements BlurIterable<ScoreDoc, BlurException>
{
     }
   }
 
-  private BlurIterator<ScoreDoc, BlurException> skipHits(PagingIterator iterator) throws
BlurException {
-    _progressRef.skipTo.set(skipTo);
-    for (int i = 0; i < skipTo && iterator.hasNext(); i++) {
-      // eats the hits, and moves the iterator to the desired skip to position.
-      _progressRef.currentHitPosition.set(i);
-      iterator.next();
-    }
-    return iterator;
-  }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b615571e/blur-query/src/test/java/org/apache/blur/lucene/search/TestingPagingCollector.java
----------------------------------------------------------------------
diff --git a/blur-query/src/test/java/org/apache/blur/lucene/search/TestingPagingCollector.java
b/blur-query/src/test/java/org/apache/blur/lucene/search/TestingPagingCollector.java
index 1efccf1..ccdec7e 100644
--- a/blur-query/src/test/java/org/apache/blur/lucene/search/TestingPagingCollector.java
+++ b/blur-query/src/test/java/org/apache/blur/lucene/search/TestingPagingCollector.java
@@ -39,6 +39,7 @@ import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.Sort;
 import org.apache.lucene.search.SortField;
@@ -63,7 +64,8 @@ public class TestingPagingCollector {
     ProgressRef progressRef = new ProgressRef();
 
     TermQuery query = new TermQuery(new Term("f1", "value"));
-    IterablePaging paging = new IterablePaging(new AtomicBoolean(true), searcher, query,
100, null, null, false, null);
+    IterablePaging paging = new IterablePaging(new AtomicBoolean(true), searcher, query,
100, null, null, false, null,
+        new DeepPagingCache());
     IterablePaging itPaging = paging.skipTo(90).gather(20).totalHits(totalHitsRef).progress(progressRef);
     BlurIterator<ScoreDoc, BlurException> iterator = itPaging.iterator();
     int position = 90;
@@ -83,6 +85,43 @@ public class TestingPagingCollector {
   }
 
   @Test
+  public void testSimpleSearchPagingThroughAll() throws Exception {
+    int length = 13245;
+    IndexReader reader = getReaderFlatScore(length);
+    IndexSearcher searcher = new IndexSearcher(reader);
+
+    TotalHitsRef totalHitsRef = new TotalHitsRef();
+    ProgressRef progressRef = new ProgressRef();
+
+    long start = System.nanoTime();
+    int position = 0;
+    DeepPagingCache deepPagingCache = new DeepPagingCache(10);
+    // Page through every hit 100 at a time. Each page opens a fresh IterablePaging that
+    // skips to the current position, as a client paging through results would.
+    // (Per-hit stdout logging was dropped: it printed one line for each of the ~13k hits.)
+    OUTER: while (true) {
+      MatchAllDocsQuery query = new MatchAllDocsQuery();
+      IterablePaging paging = new IterablePaging(new AtomicBoolean(true), searcher, query, 100, null, null, false,
+          null, deepPagingCache);
+      IterablePaging itPaging = paging.skipTo(position).totalHits(totalHitsRef).progress(progressRef);
+      BlurIterator<ScoreDoc, BlurException> iterator = itPaging.iterator();
+
+      while (iterator.hasNext()) {
+        iterator.next();
+        assertEquals(position, progressRef.currentHitPosition());
+        position++;
+        if (position % 100 == 0) {
+          continue OUTER;
+        }
+      }
+      break OUTER;
+    }
+    long end = System.nanoTime();
+    assertEquals(length, position);
+    System.out.println("Took [" + (end - start) / 1000000.0 + " ms]");
+  }
+
+  @Test
   public void testSimpleSearchPagingWithSorting() throws Exception {
     IndexReader reader = getReaderFlatScore(13245);
     IndexSearcher searcher = new IndexSearcher(reader);
@@ -93,7 +132,8 @@ public class TestingPagingCollector {
     printHeapSize();
     TermQuery query = new TermQuery(new Term("f1", "value"));
     Sort sort = new Sort(new SortField("index", Type.INT, true));
-    IterablePaging paging = new IterablePaging(new AtomicBoolean(true), searcher, query,
100, null, null, false, sort);
+    IterablePaging paging = new IterablePaging(new AtomicBoolean(true), searcher, query,
100, null, null, false, sort,
+        new DeepPagingCache());
     IterablePaging itPaging = paging.skipTo(90).gather(20).totalHits(totalHitsRef).progress(progressRef);
     BlurIterator<ScoreDoc, BlurException> iterator = itPaging.iterator();
     int position = 90;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b615571e/blur-util/src/main/java/org/apache/blur/metrics/MetricsConstants.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/metrics/MetricsConstants.java b/blur-util/src/main/java/org/apache/blur/metrics/MetricsConstants.java
index b99b304..166b4cf 100644
--- a/blur-util/src/main/java/org/apache/blur/metrics/MetricsConstants.java
+++ b/blur-util/src/main/java/org/apache/blur/metrics/MetricsConstants.java
@@ -45,6 +45,7 @@ public class MetricsConstants {
   public static final String REMOVAL = "Removal";
   public static final String MISS = "Miss";
   public static final String CACHE = "Cache";
+  public static final String DEEP_PAGING_CACHE = "DeepPagingCache";
   public static final String CACHE_POOL = "CachePool";
   public static final String JVM = "JVM";
   public static final String HEAP_USED = "Heap Used";

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b615571e/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
index bf5a94d..6936265 100644
--- a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
+++ b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
@@ -98,6 +98,7 @@ public class BlurConstants {
   public static final String BLUR_SHARD_THRIFT_ACCEPT_QUEUE_SIZE_PER_THREAD = "blur.shard.thrift.accept.queue.size.per.thread";
   public static final String BLUR_SHARD_DISTRIBUTED_LAYOUT_FACTORY_CLASS = "blur.shard.distributed.layout.factory.class";
   public static final String BLUR_SHARD_WARMUP_DISABLED = "blur.shard.warmup.disabled";
+  public static final String BLUR_SHARD_DEEP_PAGING_CACHE_SIZE = "blur.shard.deep.paging.cache.size";
 
   public static final String BLUR_SHARD_BLOCK_CACHE_V2_READ_CACHE_EXT = "blur.shard.block.cache.v2.read.cache.ext";
   public static final String BLUR_SHARD_BLOCK_CACHE_V2_READ_NOCACHE_EXT = "blur.shard.block.cache.v2.read.nocache.ext";

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b615571e/blur-util/src/main/resources/blur-default.properties
----------------------------------------------------------------------
diff --git a/blur-util/src/main/resources/blur-default.properties b/blur-util/src/main/resources/blur-default.properties
index 28453e9..023858a 100644
--- a/blur-util/src/main/resources/blur-default.properties
+++ b/blur-util/src/main/resources/blur-default.properties
@@ -199,6 +199,9 @@ blur.shard.queue.max.queue.batch.size=100
 # The maximum number of RowMutations that can exist in the inmemory block queue any point in time.  NOTE: This is PER SHARD.
 blur.shard.queue.max.inmemory.length=100
 
+# The number of deep paging cache entries kept in memory for faster deep paging.
+blur.shard.deep.paging.cache.size=1000
+
 
 ### Controller Server Configuration
 


Mime
View raw message