incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [2/2] git commit: Fixed BLUR-76.
Date Fri, 14 Jun 2013 18:56:46 GMT
Fixed BLUR-76.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/9daa5dc2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/9daa5dc2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/9daa5dc2

Branch: refs/heads/master
Commit: 9daa5dc277e63f757206667897af062fb4d71607
Parents: 7a3735e
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Fri Jun 14 14:56:29 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Fri Jun 14 14:56:29 2013 -0400

----------------------------------------------------------------------
 .../indexserver/DefaultBlurIndexWarmup.java     |  53 +++-
 .../indexserver/DistributedIndexServer.java     |   5 +-
 .../blur/manager/writer/BlurIndexReader.java    |   4 +-
 .../blur/manager/writer/BlurNRTIndex.java       |   6 +-
 .../apache/blur/lucene/warmup/IndexTracer.java  | 141 ++++++++++
 .../blur/lucene/warmup/IndexTracerResult.java   | 218 +++++++++++++++
 .../apache/blur/lucene/warmup/IndexWarmup.java  | 269 +++++++++++++++++++
 .../apache/blur/lucene/warmup/NotSupported.java |  28 ++
 .../blur/lucene/warmup/TraceableDirectory.java  | 127 +++++++++
 .../blur/lucene/warmup/TraceableIndexInput.java |  93 +++++++
 10 files changed, 933 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9daa5dc2/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
index 5f18dc0..cf8d1c8 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
@@ -17,17 +17,24 @@ package org.apache.blur.manager.indexserver;
  * limitations under the License.
  */
 import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
+import org.apache.blur.lucene.warmup.IndexTracerResult;
+import org.apache.blur.lucene.warmup.IndexWarmup;
 import org.apache.blur.manager.indexserver.DistributedIndexServer.ReleaseReader;
-import org.apache.blur.manager.writer.FieldBasedWarmer;
+import org.apache.blur.thrift.generated.ColumnPreCache;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.lucene.index.AtomicReader;
 import org.apache.lucene.index.AtomicReaderContext;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexReaderContext;
+import org.apache.lucene.index.SegmentReader;
 
 public class DefaultBlurIndexWarmup extends BlurIndexWarmup {
 
@@ -38,15 +45,49 @@ public class DefaultBlurIndexWarmup extends BlurIndexWarmup {
       AtomicBoolean isClosed, ReleaseReader releaseReader) throws IOException {
     LOG.info("Running warmup for reader [{0}]", reader);
     try {
-      FieldBasedWarmer warmer = new FieldBasedWarmer(table);
-      for (IndexReaderContext context : reader.getContext().leaves()) {
-        AtomicReaderContext atomicReaderContext = (AtomicReaderContext) context;
-        AtomicReader atomicReader = atomicReaderContext.reader();
-        warmer.warm(atomicReader);
+      int maxSampleSize = 1000;
+      IndexWarmup indexWarmup = new IndexWarmup(isClosed, maxSampleSize);
+      String context = table.getName() + "/" + shard;
+      Map<String, List<IndexTracerResult>> sampleIndex = indexWarmup.sampleIndex(reader, context);
+      ColumnPreCache columnPreCache = table.getColumnPreCache();
+      if (columnPreCache != null) {
+        warm(reader, columnPreCache.preCacheCols, indexWarmup, sampleIndex, context, isClosed);
+      } else {
+        warm(reader, getFields(reader), indexWarmup, sampleIndex, context, isClosed);
       }
     } finally {
       releaseReader.release();
     }
   }
 
+  private Iterable<String> getFields(IndexReader reader) throws IOException {
+    Set<String> fields = new TreeSet<String>();
+    for (IndexReaderContext ctext : reader.getContext().leaves()) {
+      AtomicReaderContext atomicReaderContext = (AtomicReaderContext) ctext;
+      AtomicReader atomicReader = atomicReaderContext.reader();
+      if (atomicReader instanceof SegmentReader) {
+        for (String f : atomicReader.fields()) {
+          fields.add(f);
+        }
+      }
+    }
+    return fields;
+  }
+
+  private void warm(IndexReader reader, Iterable<String> preCacheCols, IndexWarmup indexWarmup,
+      Map<String, List<IndexTracerResult>> sampleIndex, String context, AtomicBoolean isClosed) {
+    for (String field : preCacheCols) {
+      try {
+        indexWarmup.warm(reader, sampleIndex, field, context);
+      } catch (IOException e) {
+        LOG.error("Context [{0}] unknown error trying to warmup the [{1}] field", e, context, field);
+      }
+      if (isClosed.get()) {
+        LOG.info("Context [{0}] index closed", context);
+        return;
+      }
+    }
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9daa5dc2/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
index 58393f8..215d1e6 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
@@ -51,6 +51,7 @@ import org.apache.blur.log.LogFactory;
 import org.apache.blur.lucene.search.FairSimilarity;
 import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
 import org.apache.blur.lucene.store.refcounter.IndexInputCloser;
+import org.apache.blur.lucene.warmup.TraceableDirectory;
 import org.apache.blur.manager.BlurFilterCache;
 import org.apache.blur.manager.clusterstatus.ClusterStatus;
 import org.apache.blur.manager.clusterstatus.ZookeeperPathConstants;
@@ -463,10 +464,10 @@ public class DistributedIndexServer extends AbstractIndexServer {
     } else {
       dir = directory;
     }
-
+    
     BlurIndex index;
     if (_clusterStatus.isReadOnly(true, _cluster, table)) {
-      BlurIndexReader reader = new BlurIndexReader(shardContext, directory, _refresher, _indexCloser);
+      BlurIndexReader reader = new BlurIndexReader(shardContext, dir, _refresher, _indexCloser);
       index = reader;
     } else {
       BlurNRTIndex writer = new BlurNRTIndex(shardContext, _mergeScheduler, _closer, dir, _gc, _searchExecutor);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9daa5dc2/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
index f31fa38..bc4c2ee 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.blur.index.IndexWriter;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
+import org.apache.blur.lucene.warmup.TraceableDirectory;
 import org.apache.blur.server.IndexSearcherClosable;
 import org.apache.blur.server.ShardContext;
 import org.apache.blur.server.TableContext;
@@ -50,7 +51,8 @@ public class BlurIndexReader extends BlurIndex {
   public BlurIndexReader(ShardContext shardContext, Directory directory, BlurIndexRefresher refresher,
       BlurIndexCloser closer) throws IOException {
     _tableContext = shardContext.getTableContext();
-    _directory = directory;
+    // This directory allows for warm up by adding tracing ability.
+    _directory = new TraceableDirectory(directory);
     _shardContext = shardContext;
     _refresher = refresher;
     _closer = closer;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9daa5dc2/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
index 43310fb..62d2274 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
@@ -34,6 +34,7 @@ import org.apache.blur.log.LogFactory;
 import org.apache.blur.lucene.store.refcounter.DirectoryReferenceCounter;
 import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
 import org.apache.blur.lucene.store.refcounter.IndexInputCloser;
+import org.apache.blur.lucene.warmup.TraceableDirectory;
 import org.apache.blur.server.IndexSearcherClosable;
 import org.apache.blur.server.IndexSearcherClosableNRT;
 import org.apache.blur.server.ShardContext;
@@ -90,8 +91,9 @@ public class BlurNRTIndex extends BlurIndex {
     conf.setMergeScheduler(mergeScheduler);
 
     DirectoryReferenceCounter referenceCounter = new DirectoryReferenceCounter(directory, gc, closer);
-
-    _writer = new IndexWriter(referenceCounter, conf);
+    // This directory allows for warm up by adding tracing ability.
+    TraceableDirectory dir = new TraceableDirectory(referenceCounter);
+    _writer = new IndexWriter(dir, conf);
     _recorder = new TransactionRecorder(shardContext);
     _recorder.replay(_writer);
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9daa5dc2/blur-store/src/main/java/org/apache/blur/lucene/warmup/IndexTracer.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/lucene/warmup/IndexTracer.java b/blur-store/src/main/java/org/apache/blur/lucene/warmup/IndexTracer.java
new file mode 100644
index 0000000..84ce8bb
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/lucene/warmup/IndexTracer.java
@@ -0,0 +1,141 @@
+package org.apache.blur.lucene.warmup;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.blur.lucene.warmup.IndexTracerResult.FILE_TYPE;
+import org.apache.lucene.index.DocsAndPositionsEnum;
+import org.apache.lucene.index.DocsEnum;
+import org.apache.lucene.index.SegmentReader;
+import org.apache.lucene.index.Terms;
+import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.util.Bits;
+
+public class IndexTracer {
+
+  private boolean _hasOffsets;
+  private boolean _hasPayloads;
+  private boolean _hasPositions;
+  private SegmentReader _segmentReader;
+  private Bits _liveDocs;
+  private IndexTracerResult _result;
+  private TraceableDirectory _traceableDirectory;
+  private final int _maxSampleSize;
+
+  public IndexTracer(TraceableDirectory dir, int maxSampleSize) {
+    _traceableDirectory = dir;
+    _maxSampleSize = maxSampleSize;
+  }
+
+  public void initTrace(SegmentReader segmentReader, String field, boolean hasPositions, boolean hasPayloads,
+      boolean hasOffsets) {
+    _segmentReader = segmentReader;
+    _hasPositions = hasPositions;
+    _hasPayloads = hasPayloads;
+    _hasOffsets = hasOffsets;
+    _liveDocs = _segmentReader.getLiveDocs();
+    _result = new IndexTracerResult(segmentReader.getSegmentName(), field);
+  }
+
+  public void trace(String name, long filePointer) {
+    int index = name.lastIndexOf('.');
+    String ext = name.substring(index);
+    if (ext.endsWith(".tim")) {
+      if (!_result.isFilePositionCaptured(FILE_TYPE.TIM)) {
+        _result.setPosition(filePointer, name, FILE_TYPE.TIM);
+      }
+    } else if (ext.endsWith(".doc")) {
+      if (!_result.isFilePositionCaptured(FILE_TYPE.DOC)) {
+        _result.setPosition(filePointer, name, FILE_TYPE.DOC);
+      }
+    } else if (ext.endsWith(".pos")) {
+      if (!_result.isFilePositionCaptured(FILE_TYPE.POS)) {
+        _result.setPosition(filePointer, name, FILE_TYPE.POS);
+      }
+    } else {
+      throw new RuntimeException("Not Implemented");
+    }
+  }
+
+  public IndexTracerResult runTrace(Terms terms) throws IOException {
+    IndexWarmup.enableRunTrace();
+    _traceableDirectory.setIndexTracer(this);
+    _traceableDirectory.setTrace(true);
+    try {
+      TermsEnum termsEnum = terms.iterator(null);
+      int sampleCount = 0;
+      while (termsEnum.next() != null) {
+        if (_hasPositions || _hasOffsets || _hasPayloads) {
+          DocsAndPositionsEnum docsAndPositions = termsEnum.docsAndPositions(_liveDocs, null);
+          int nextDoc;
+          do {
+            nextDoc = docsAndPositions.nextDoc();
+            int freq = docsAndPositions.freq();
+            for (int i = 0; i < freq; i++) {
+              docsAndPositions.nextPosition();
+              if (_hasPayloads) {
+                docsAndPositions.getPayload();
+              }
+              if (traceComplete()) {
+                return getResult();
+              }
+            }
+          } while (nextDoc != DocsEnum.NO_MORE_DOCS);
+        } else {
+          DocsEnum docsEnum = termsEnum.docs(_liveDocs, null);
+          int nextDoc;
+          do {
+            nextDoc = docsEnum.nextDoc();
+            if (traceComplete()) {
+              return getResult();
+            }
+          } while (nextDoc != DocsEnum.NO_MORE_DOCS);
+        }
+        sampleCount++;
+        if (sampleCount >= _maxSampleSize) {
+          break;
+        }
+      }
+      return getResult();
+    } finally {
+      _traceableDirectory.setTrace(false);
+      IndexWarmup.disableRunTrace();
+    }
+  }
+
+  private boolean traceComplete() {
+    if (!_result.isFilePositionCaptured(FILE_TYPE.TIM)) {
+      return false;
+    }
+    if (!_result.isFilePositionCaptured(FILE_TYPE.DOC)) {
+      return false;
+    }
+    if (_hasPositions && !_result.isFilePositionCaptured(FILE_TYPE.POS)) {
+      return false;
+    }
+    if (_hasPayloads && !_result.isFilePositionCaptured(FILE_TYPE.PAY)) {
+      return false;
+    }
+    return true;
+  }
+
+  IndexTracerResult getResult() {
+    return _result;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9daa5dc2/blur-store/src/main/java/org/apache/blur/lucene/warmup/IndexTracerResult.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/lucene/warmup/IndexTracerResult.java b/blur-store/src/main/java/org/apache/blur/lucene/warmup/IndexTracerResult.java
new file mode 100644
index 0000000..0853f5b
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/lucene/warmup/IndexTracerResult.java
@@ -0,0 +1,218 @@
+package org.apache.blur.lucene.warmup;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+
+public class IndexTracerResult {
+
+  public enum FILE_TYPE {
+    TIM, DOC, POS, PAY
+  }
+
+  private boolean _timCaptured;
+  private long _timPosition;
+  private String _timFileName;
+
+  private boolean _docCaptured;
+  private long _docPosition;
+  private String _docFileName;
+
+  private boolean _posCaptured;
+  private long _posPosition;
+  private String _posFileName;
+
+  private boolean _payCaptured;
+  private long _payPosition;
+  private String _payFileName;
+
+  private String _segmentName;
+  private String _field;
+
+  IndexTracerResult() {
+
+  }
+
+  public static IndexTracerResult read(IndexInput input) throws IOException {
+
+    IndexTracerResult result = new IndexTracerResult();
+
+    result._field = input.readString();
+    result._segmentName = input.readString();
+
+    result._timCaptured = readBoolean(input);
+    if (result._timCaptured) {
+      result._timPosition = input.readVLong();
+      result._timFileName = input.readString();
+    }
+
+    result._docCaptured = readBoolean(input);
+    if (result._docCaptured) {
+      result._docPosition = input.readVLong();
+      result._docFileName = input.readString();
+    }
+
+    result._posCaptured = readBoolean(input);
+    if (result._posCaptured) {
+      result._posPosition = input.readVLong();
+      result._posFileName = input.readString();
+    }
+
+    result._payCaptured = readBoolean(input);
+    if (result._payCaptured) {
+      result._payPosition = input.readVLong();
+      result._payFileName = input.readString();
+    }
+    return result;
+  }
+
+  public void write(IndexOutput output) throws IOException {
+    output.writeString(_field);
+    output.writeString(_segmentName);
+
+    writeBoolean(output, _timCaptured);
+    if (_timCaptured) {
+      output.writeVLong(_timPosition);
+      output.writeString(_timFileName);
+    }
+
+    writeBoolean(output, _docCaptured);
+    if (_docCaptured) {
+      output.writeVLong(_docPosition);
+      output.writeString(_docFileName);
+    }
+
+    writeBoolean(output, _posCaptured);
+    if (_posCaptured) {
+      output.writeVLong(_posPosition);
+      output.writeString(_posFileName);
+    }
+
+    writeBoolean(output, _payCaptured);
+    if (_payCaptured) {
+      output.writeVLong(_payPosition);
+      output.writeString(_payFileName);
+    }
+  }
+
+  private static boolean readBoolean(IndexInput input) throws IOException {
+    return input.readVInt() == 1;
+  }
+
+  private static void writeBoolean(IndexOutput output, boolean b) throws IOException {
+    output.writeVInt(b ? 1 : 0);
+  }
+
+  public IndexTracerResult(String segmentName, String field) {
+    _segmentName = segmentName;
+    _field = field;
+  }
+
+  public void setPosition(long position, String fileName, FILE_TYPE type) {
+    switch (type) {
+    case TIM:
+      _timPosition = position;
+      _timCaptured = true;
+      _timFileName = fileName;
+      break;
+    case DOC:
+      _docPosition = position;
+      _docCaptured = true;
+      _docFileName = fileName;
+      break;
+    case PAY:
+      _payPosition = position;
+      _payCaptured = true;
+      _payFileName = fileName;
+      break;
+    case POS:
+      _posPosition = position;
+      _posCaptured = true;
+      _posFileName = fileName;
+      break;
+    default:
+      throw new NotSupported(type);
+    }
+
+  }
+
+  public long getPosition(FILE_TYPE type) {
+    switch (type) {
+    case TIM:
+      return _timPosition;
+    case DOC:
+      return _docPosition;
+    case PAY:
+      return _payPosition;
+    case POS:
+      return _posPosition;
+    default:
+      throw new NotSupported(type);
+    }
+  }
+
+  public boolean isFilePositionCaptured(FILE_TYPE type) {
+    switch (type) {
+    case TIM:
+      return _timCaptured;
+    case DOC:
+      return _docCaptured;
+    case PAY:
+      return _payCaptured;
+    case POS:
+      return _posCaptured;
+    default:
+      throw new NotSupported(type);
+    }
+  }
+
+  public String getFileName(FILE_TYPE type) {
+    switch (type) {
+    case TIM:
+      return _timFileName;
+    case DOC:
+      return _docFileName;
+    case PAY:
+      return _payFileName;
+    case POS:
+      return _posFileName;
+    default:
+      throw new NotSupported(type);
+    }
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
+  public String getField() {
+    return _field;
+  }
+
+  @Override
+  public String toString() {
+    return "IndexTracerResult [_docCaptured=" + _docCaptured + ", _docPosition=" + _docPosition + ", _docFileName="
+        + _docFileName + ", _posCaptured=" + _posCaptured + ", _posPosition=" + _posPosition + ", _posFileName="
+        + _posFileName + ", _payCaptured=" + _payCaptured + ", _payPosition=" + _payPosition + ", _payFileName="
+        + _payFileName + ", _timCaptured=" + _timCaptured + ", _timPosition=" + _timPosition + ", _timFileName="
+        + _timFileName + ", _segmentName=" + _segmentName + ", _field=" + _field + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9daa5dc2/blur-store/src/main/java/org/apache/blur/lucene/warmup/IndexWarmup.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/lucene/warmup/IndexWarmup.java b/blur-store/src/main/java/org/apache/blur/lucene/warmup/IndexWarmup.java
new file mode 100644
index 0000000..c616c8e
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/lucene/warmup/IndexWarmup.java
@@ -0,0 +1,269 @@
+package org.apache.blur.lucene.warmup;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.lucene.warmup.IndexTracerResult.FILE_TYPE;
+import org.apache.lucene.index.AtomicReader;
+import org.apache.lucene.index.AtomicReaderContext;
+import org.apache.lucene.index.Fields;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexReaderContext;
+import org.apache.lucene.index.SegmentReader;
+import org.apache.lucene.index.Terms;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+
+/**
+ * IndexWarmup is used to pre-read portions of the index by field. Usage:<br/>
+ * <br/>
+ * int maxSampleSize = 1000; <br/>
+ * Directory dir = FSDirectory.open(new File("/path/index"));<br/>
+ * dir = new TraceableDirectory(dir);<br/>
+ * DirectoryReader reader = DirectoryReader.open(dir);<br/>
+ * <br/>
+ * IndexWarmup indexWarmup = new IndexWarmup(new AtomicBoolean());<br/>
+ * Map&lt;String, List&lt;IndexTracerResult&gt;&gt; sampleIndex =
+ * indexWarmup.sampleIndex(reader, "");<br/>
+ * indexWarmup.warm(reader, sampleIndex, "uuid", null);<br/>
+ * indexWarmup.warm(reader, sampleIndex, "test", "test");<br/>
+ * indexWarmup.warm(reader, sampleIndex, "nothing", null);<br/>
+ * indexWarmup.warm(reader, sampleIndex, "id2", "tst");<br/>
+ */
+public class IndexWarmup {
+
+  private static final Log LOG = LogFactory.getLog(IndexWarmup.class);
+
+  private static final String SAMPLE_EXT = ".sample";
+  private static final long _5_SECONDS = TimeUnit.SECONDS.toNanos(5);
+
+  private final AtomicBoolean _isClosed;
+  private final int _maxSampleSize;
+
+  public IndexWarmup(AtomicBoolean isClosed, int maxSampleSize) {
+    _isClosed = isClosed;
+    _maxSampleSize = maxSampleSize;
+  }
+
+  private static ThreadLocal<Boolean> runTrace = new ThreadLocal<Boolean>() {
+    @Override
+    protected Boolean initialValue() {
+      return Boolean.FALSE;
+    }
+  };
+
+  public static boolean isRunTrace() {
+    return runTrace.get();
+  }
+
+  public static void enableRunTrace() {
+    runTrace.set(Boolean.TRUE);
+  }
+
+  public static void disableRunTrace() {
+    runTrace.set(Boolean.FALSE);
+  }
+
+  public void warm(IndexReader reader, Map<String, List<IndexTracerResult>> sampleIndex, String fieldName,
+      String context) throws IOException {
+    for (Entry<String, List<IndexTracerResult>> segment : sampleIndex.entrySet()) {
+      warm(reader, segment.getValue(), fieldName, context);
+    }
+  }
+
+  public void warm(IndexReader reader, List<IndexTracerResult> traces, String fieldName, String context)
+      throws IOException {
+    int index = find(traces, fieldName);
+    if (index < 0) {
+      // not found
+      return;
+    }
+    IndexTracerResult trace = traces.get(index);
+    for (FILE_TYPE type : FILE_TYPE.values()) {
+      if (trace.isFilePositionCaptured(type)) {
+        long startingPosition = trace.getPosition(type);
+        long endingPosition = Long.MAX_VALUE;
+        int nextIndex = findNextSetTrace(traces, index + 1, type);
+        if (nextIndex >= 0) {
+          IndexTracerResult next = traces.get(nextIndex);
+          endingPosition = next.getPosition(type);
+        }
+        String fileName = trace.getFileName(type);
+        String segmentName = trace.getSegmentName();
+        warm(reader, segmentName, fileName, fieldName, startingPosition, endingPosition, context);
+      }
+    }
+  }
+
+  private void warm(IndexReader reader, String segmentName, String fileName, String fieldName, long startingPosition,
+      long endingPosition, String context) throws IOException {
+    Directory dir = getDirectory(reader, segmentName, context);
+    if (dir == null) {
+      LOG.info("Context [{0}] cannot find segment [{1}]", context, segmentName);
+      return;
+    }
+    if (endingPosition == Long.MAX_VALUE) {
+      endingPosition = dir.fileLength(fileName) - 1;
+    }
+    if (_isClosed.get()) {
+      LOG.info("Context [{0}] index closed", context);
+      return;
+    }
+    long length = endingPosition - startingPosition;
+    final long totalLength = length;
+    LOG.info("Context [{3}] warming field [{0}] in file [{1}] has length [{2}]", fieldName, fileName, length, context);
+    IndexInput input = dir.openInput(fileName, IOContext.READ);
+    input.seek(startingPosition);
+    byte[] buf = new byte[8192];
+    long start = System.nanoTime();
+    while (length > 0) {
+      long now = System.nanoTime();
+      if (start + _5_SECONDS < now) {
+        double complete = (((double) totalLength - (double) length) / (double) totalLength) * 100.0;
+        LOG.info("Context [{3}] warming field [{0}] in file [{1}] is [{2}%] complete", fieldName, fileName, complete,
+            context);
+        start = System.nanoTime();
+        if (_isClosed.get()) {
+          LOG.info("Context [{0}] index closed", context);
+          return;
+        }
+      }
+      int len = (int) Math.min(length, buf.length);
+      input.readBytes(buf, 0, len);
+      length -= len;
+    }
+  }
+
+  private Directory getDirectory(IndexReader reader, String segmentName, String context) {
+    for (IndexReaderContext ctext : reader.getContext().leaves()) {
+      if (_isClosed.get()) {
+        LOG.info("Context [{0}] index closed", context);
+        return null;
+      }
+      AtomicReaderContext atomicReaderContext = (AtomicReaderContext) ctext;
+      AtomicReader atomicReader = atomicReaderContext.reader();
+      if (atomicReader instanceof SegmentReader) {
+        SegmentReader segmentReader = (SegmentReader) atomicReader;
+        if (segmentReader.getSegmentName().equals(segmentName)) {
+          return segmentReader.directory();
+        }
+      }
+    }
+    return null;
+  }
+
+  private int findNextSetTrace(List<IndexTracerResult> traces, int startingIndex, FILE_TYPE type) {
+    int size = traces.size();
+    for (int i = startingIndex; i < size; i++) {
+      IndexTracerResult trace = traces.get(i);
+      if (trace.isFilePositionCaptured(type)) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  private int find(List<IndexTracerResult> traces, String fieldName) {
+    int index = 0;
+    for (IndexTracerResult trace : traces) {
+      if (trace.getField().equals(fieldName)) {
+        return index;
+      }
+      index++;
+    }
+    return -1;
+  }
+
+  public Map<String, List<IndexTracerResult>> sampleIndex(IndexReader reader, String context) throws IOException {
+    Map<String, List<IndexTracerResult>> results = new HashMap<String, List<IndexTracerResult>>();
+    LOOP: for (IndexReaderContext ctext : reader.getContext().leaves()) {
+      if (_isClosed.get()) {
+        LOG.info("Context [{0}] index closed", context);
+        return null;
+      }
+      AtomicReaderContext atomicReaderContext = (AtomicReaderContext) ctext;
+      AtomicReader atomicReader = atomicReaderContext.reader();
+      if (atomicReader instanceof SegmentReader) {
+        SegmentReader segmentReader = (SegmentReader) atomicReader;
+        Directory directory = segmentReader.directory();
+        if (!(directory instanceof TraceableDirectory)) {
+          LOG.info("Context [{1}] cannot warmup directory [{0}] needs to be a TraceableDirectory.", directory, context);
+          continue LOOP;
+        }
+        IndexTracer tracer = new IndexTracer((TraceableDirectory) directory, _maxSampleSize);
+        String fileName = segmentReader.getSegmentName() + SAMPLE_EXT;
+        List<IndexTracerResult> segmentTraces = new ArrayList<IndexTracerResult>();
+        if (directory.fileExists(fileName)) {
+          IndexInput input = directory.openInput(fileName, IOContext.READONCE);
+          segmentTraces = read(input);
+          input.close();
+        } else {
+          Fields fields = atomicReader.fields();
+          for (String field : fields) {
+            LOG.info("Context [{1}] sampling field [{0}].", field, context);
+            Terms terms = fields.terms(field);
+            boolean hasOffsets = terms.hasOffsets();
+            boolean hasPayloads = terms.hasPayloads();
+            boolean hasPositions = terms.hasPositions();
+
+            tracer.initTrace(segmentReader, field, hasPositions, hasPayloads, hasOffsets);
+            IndexTracerResult result = tracer.runTrace(terms);
+            segmentTraces.add(result);
+          }
+          if (_isClosed.get()) {
+            LOG.info("Context [{0}] index closed", context);
+            return null;
+          }
+          IndexOutput output = directory.createOutput(fileName, IOContext.DEFAULT);
+          write(segmentTraces, output);
+          output.close();
+        }
+        results.put(segmentReader.getSegmentName(), segmentTraces);
+      }
+    }
+    return results;
+  }
+
+  private List<IndexTracerResult> read(IndexInput input) throws IOException {
+    int count = input.readVInt();
+    List<IndexTracerResult> results = new ArrayList<IndexTracerResult>(count);
+    for (int i = 0; i < count; i++) {
+      results.add(IndexTracerResult.read(input));
+    }
+    return results;
+  }
+
+  private void write(List<IndexTracerResult> segmentTraces, IndexOutput output) throws IOException {
+    output.writeVInt(segmentTraces.size());
+    for (IndexTracerResult r : segmentTraces) {
+      r.write(output);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9daa5dc2/blur-store/src/main/java/org/apache/blur/lucene/warmup/NotSupported.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/lucene/warmup/NotSupported.java b/blur-store/src/main/java/org/apache/blur/lucene/warmup/NotSupported.java
new file mode 100644
index 0000000..a9e2f0e
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/lucene/warmup/NotSupported.java
@@ -0,0 +1,28 @@
+package org.apache.blur.lucene.warmup;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.blur.lucene.warmup.IndexTracerResult.FILE_TYPE;
+
+public class NotSupported extends RuntimeException {
+
+  private static final long serialVersionUID = 5988131795588013735L;
+
+  public NotSupported(FILE_TYPE type) {
+    super("FILE_TYPE of [" + type.name() + "] not supported");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9daa5dc2/blur-store/src/main/java/org/apache/blur/lucene/warmup/TraceableDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/lucene/warmup/TraceableDirectory.java b/blur-store/src/main/java/org/apache/blur/lucene/warmup/TraceableDirectory.java
new file mode 100644
index 0000000..638d61d
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/lucene/warmup/TraceableDirectory.java
@@ -0,0 +1,127 @@
+package org.apache.blur.lucene.warmup;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.blur.store.hdfs.DirectoryDecorator;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.LockFactory;
+
+public class TraceableDirectory extends Directory implements DirectoryDecorator {
+
+  private final Directory _dir;
+  private boolean _trace = false;
+  private IndexTracer _indexTracer;
+
+  public TraceableDirectory(Directory dir) {
+    _dir = dir;
+  }
+
+  public void trace(String name, long filePointer) {
+    if (_indexTracer != null) {
+      _indexTracer.trace(name, filePointer);
+    }
+  }
+
+  public IndexInput openInput(String name, IOContext context) throws IOException {
+    return new TraceableIndexInput(this, name, context, _dir.openInput(name, context));
+  }
+
+  public String[] listAll() throws IOException {
+    return _dir.listAll();
+  }
+
+  public boolean fileExists(String name) throws IOException {
+    return _dir.fileExists(name);
+  }
+
+  public void deleteFile(String name) throws IOException {
+    _dir.deleteFile(name);
+  }
+
+  public long fileLength(String name) throws IOException {
+    return _dir.fileLength(name);
+  }
+
+  public IndexOutput createOutput(String name, IOContext context) throws IOException {
+    return _dir.createOutput(name, context);
+  }
+
+  public void sync(Collection<String> names) throws IOException {
+    _dir.sync(names);
+  }
+
+  public void close() throws IOException {
+    _dir.close();
+  }
+
+  public boolean isTrace() {
+    return _trace;
+  }
+
+  public void setTrace(boolean trace) {
+    _trace = trace;
+  }
+
+  public void setIndexTracer(IndexTracer indexTracer) {
+    _indexTracer = indexTracer;
+  }
+
+  public Lock makeLock(String name) {
+    return _dir.makeLock(name);
+  }
+
+  public void clearLock(String name) throws IOException {
+    _dir.clearLock(name);
+  }
+
+  public void setLockFactory(LockFactory lockFactory) throws IOException {
+    _dir.setLockFactory(lockFactory);
+  }
+
+  public LockFactory getLockFactory() {
+    return _dir.getLockFactory();
+  }
+
+  public String getLockID() {
+    return _dir.getLockID();
+  }
+
+  public String toString() {
+    return _dir.toString();
+  }
+
+  public void copy(Directory to, String src, String dest, IOContext context) throws IOException
{
+    _dir.copy(to, src, dest, context);
+  }
+
+  public IndexInputSlicer createSlicer(String name, IOContext context) throws IOException
{
+    return _dir.createSlicer(name, context);
+  }
+
+  @Override
+  public Directory getOriginalDirectory() {
+    return _dir;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9daa5dc2/blur-store/src/main/java/org/apache/blur/lucene/warmup/TraceableIndexInput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/lucene/warmup/TraceableIndexInput.java b/blur-store/src/main/java/org/apache/blur/lucene/warmup/TraceableIndexInput.java
new file mode 100644
index 0000000..a62062d
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/lucene/warmup/TraceableIndexInput.java
@@ -0,0 +1,93 @@
+package org.apache.blur.lucene.warmup;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+
+/**
+ * An {@link IndexInput} wrapper that delegates all operations to another
+ * input and, before each read, may report the wrapped input's current file
+ * pointer to its {@link TraceableDirectory}.
+ * <p>
+ * Whether an instance reports at all is decided from
+ * {@link TraceableDirectory#isTrace()} when the instance is constructed or
+ * cloned; each read additionally checks {@code IndexWarmup.isRunTrace()}.
+ */
+public class TraceableIndexInput extends IndexInput {
+
+  private final TraceableDirectory _traceableDirectory;
+  private final String _name;
+  private IndexInput _input;
+  private boolean _traceOn;
+
+  /**
+   * @param traceableDirectory
+   *          the directory that receives trace events and supplies the trace
+   *          flag.
+   * @param name
+   *          the file name, used as the resource description and reported
+   *          with every trace event.
+   * @param context
+   *          not used by this class.
+   * @param input
+   *          the input that all operations are delegated to.
+   */
+  public TraceableIndexInput(TraceableDirectory traceableDirectory, String name, IOContext context, IndexInput input) {
+    super(name);
+    _name = name;
+    _input = input;
+    _traceableDirectory = traceableDirectory;
+    _traceOn = traceableDirectory.isTrace();
+  }
+
+  @Override
+  public byte readByte() throws IOException {
+    trace();
+    return _input.readByte();
+  }
+
+  @Override
+  public void readBytes(byte[] b, int offset, int len) throws IOException {
+    trace();
+    _input.readBytes(b, offset, len);
+  }
+
+  @Override
+  public void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException {
+    trace();
+    _input.readBytes(b, offset, len, useBuffer);
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    _input.seek(pos);
+  }
+
+  @Override
+  public long getFilePointer() {
+    return _input.getFilePointer();
+  }
+
+  @Override
+  public long length() {
+    return _input.length();
+  }
+
+  @Override
+  public void close() throws IOException {
+    _input.close();
+  }
+
+  /**
+   * Clones this input together with the wrapped input. The copy takes its
+   * trace flag from the directory's current {@code isTrace()} value rather
+   * than from this instance.
+   */
+  @Override
+  public IndexInput clone() {
+    TraceableIndexInput copy = (TraceableIndexInput) super.clone();
+    copy._input = _input.clone();
+    copy._traceOn = _traceableDirectory.isTrace();
+    return copy;
+  }
+
+  /**
+   * Reports the wrapped input's current file pointer to the directory when
+   * this instance has tracing on and {@code IndexWarmup.isRunTrace()} is
+   * true. Called before the read is delegated, so the reported position is
+   * where the read starts.
+   */
+  private void trace() {
+    if (_traceOn && IndexWarmup.isRunTrace()) {
+      _traceableDirectory.trace(_name, _input.getFilePointer());
+    }
+  }
+
+}


Mime
View raw message