incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [2/3] git commit: Working the field warmer.
Date Sun, 28 Apr 2013 23:49:40 GMT
Working the field warmer.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/d6f8d535
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/d6f8d535
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/d6f8d535

Branch: refs/heads/0.1.5
Commit: d6f8d535c8bfc9dcbd9b8f79e794b43e87201a88
Parents: f8f33ae
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sat Apr 27 17:33:15 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sat Apr 27 17:33:15 2013 -0400

----------------------------------------------------------------------
 .../indexserver/DefaultBlurIndexWarmup.java        |   68 +++----------
 .../indexserver/DistributedIndexServer.java        |   23 +----
 .../apache/blur/manager/writer/BlurNRTIndex.java   |    1 +
 .../blur/manager/writer/FieldBasedWarmer.java      |   78 +++++++++++++++
 4 files changed, 97 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d6f8d535/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
index 25e76e3..b7a626c 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
@@ -22,65 +22,31 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.manager.indexserver.DistributedIndexServer.ReleaseReader;
+import org.apache.blur.manager.writer.FieldBasedWarmer;
 import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.lucene.index.AtomicReader;
+import org.apache.lucene.index.AtomicReaderContext;
 import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexReaderContext;
 
 public class DefaultBlurIndexWarmup extends BlurIndexWarmup {
 
   private static final Log LOG = LogFactory.getLog(DefaultBlurIndexWarmup.class);
 
   @Override
-  public void warmBlurIndex(final TableDescriptor table, final String shard, IndexReader reader, AtomicBoolean isClosed, ReleaseReader releaseReader) throws IOException {
-    LOG.warn("Warm up NOT supported yet.");
-    // Once the reader warm-up has been re-implemented, this code will change
-    // accordingly.
-
-    // try {
-    // ColumnPreCache columnPreCache = table.columnPreCache;
-    // List<String> preCacheCols = null;
-    // if (columnPreCache != null) {
-    // preCacheCols = columnPreCache.preCacheCols;
-    // }
-    // if (preCacheCols == null) {
-    // LOG.info("No pre cache defined, precache all fields.");
-    // FieldInfos fieldInfos = ReaderUtil.getMergedFieldInfos(reader);
-    // preCacheCols = new ArrayList<String>();
-    // for (FieldInfo fieldInfo : fieldInfos) {
-    // if (fieldInfo.isIndexed) {
-    // preCacheCols.add(fieldInfo.name);
-    // }
-    // }
-    // preCacheCols.remove(BlurConstants.ROW_ID);
-    // preCacheCols.remove(BlurConstants.RECORD_ID);
-    // preCacheCols.remove(BlurConstants.PRIME_DOC);
-    // preCacheCols.remove(BlurConstants.SUPER);
-    // }
-    //
-    // WarmUpByFieldBounds warmUpByFieldBounds = new WarmUpByFieldBounds();
-    // WarmUpByFieldBoundsStatus status = new WarmUpByFieldBoundsStatus() {
-    // @Override
-    // public void complete(String name, Term start, Term end, long
-    // startPosition, long endPosition, long totalBytesRead, long nanoTime,
-    // AtomicBoolean isClosed) {
-    // double bytesPerNano = totalBytesRead / (double) nanoTime;
-    // double mBytesPerNano = bytesPerNano / 1024 / 1024;
-    // double mBytesPerSecond = mBytesPerNano * 1000000000.0;
-    // if (totalBytesRead > 0) {
-    // LOG.info("Precached field [{0}] in table [{1}] shard [{2}] file [{3}], [{4}] bytes cached at [{5} MB/s]",
-    // start.field(), table.name, shard, name, totalBytesRead,
-    // mBytesPerSecond);
-    // }
-    // }
-    // };
-    // if (preCacheCols != null) {
-    // for (String field : preCacheCols) {
-    // warmUpByFieldBounds.warmUpByField(isClosed, new Term(field), reader,
-    // status);
-    // }
-    // }
-    // } finally {
-    // releaseReader.release();
-    // }
+  public void warmBlurIndex(final TableDescriptor table, final String shard, IndexReader reader,
+      AtomicBoolean isClosed, ReleaseReader releaseReader) throws IOException {
+    LOG.info("Runngin warmup for reader [{0}]", reader);
+    try {
+      FieldBasedWarmer warmer = new FieldBasedWarmer(table);
+      for (IndexReaderContext context : reader.getContext().leaves()) {
+        AtomicReaderContext atomicReaderContext = (AtomicReaderContext) context;
+        AtomicReader atomicReader = atomicReaderContext.reader();
+        warmer.warm(atomicReader);
+      }
+    } finally {
+      releaseReader.release();
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d6f8d535/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
index 8e13047..2ed0c43 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
@@ -81,8 +81,6 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.search.similarities.Similarity;
 import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.IOContext;
-import org.apache.lucene.store.IndexInput;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.Ids;
@@ -94,7 +92,6 @@ import com.yammer.metrics.core.MetricName;
 
 public class DistributedIndexServer extends AbstractIndexServer {
 
-  private static final String LOGS = "logs";
   private static final Log LOG = LogFactory.getLog(DistributedIndexServer.class);
   private static final long _delay = TimeUnit.SECONDS.toMillis(5);
 
@@ -185,6 +182,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
             _prevOnlineShards = onlineShards;
             _layoutManagers.clear();
             _layoutCache.clear();
+            LOG.info("--------------------CALL--------------------");
             LOG.info("Online shard servers changed, clearing layout managers and cache.");
             if (oldOnlineShards == null) {
               oldOnlineShards = new ArrayList<String>();
@@ -554,7 +552,6 @@ public class DistributedIndexServer extends AbstractIndexServer {
   private BlurIndex warmUp(BlurIndex index, TableDescriptor table, String shard) throws IOException {
     final IndexSearcherClosable searcher = index.getIndexReader();
     IndexReader reader = searcher.getIndexReader();
-    warmUpAllSegments(searcher);
     _warmup.warmBlurIndex(table, shard, reader, index.isClosed(), new ReleaseReader() {
       @Override
       public void release() throws IOException {
@@ -562,27 +559,9 @@ public class DistributedIndexServer extends AbstractIndexServer {
         searcher.close();
       }
     });
-
     return index;
   }
 
-  private void warmUpAllSegments(IndexSearcherClosable searcher) throws IOException {
-    LOG.warn("Warm up, stupid impl");
-    Directory directory = searcher.getDirectory();
-    String[] listAll = directory.listAll();
-    byte[] buf = new byte[8192];
-    for (String file : listAll) {
-      LOG.info("Warning up [{0}]", file);
-      IndexInput input = directory.openInput(file, IOContext.READ);
-      long length = input.length();
-      for (long i = 0; i < length; i += buf.length) {
-        int len = (int) Math.min(buf.length, length - i);
-        input.readBytes(buf, 0, len);
-      }
-      input.close();
-    }
-  }
-
   private synchronized Map<String, BlurIndex> openMissingShards(final String table, Set<String> shardsToServe,
       final Map<String, BlurIndex> tableIndexes) {
     Map<String, Future<BlurIndex>> opening = new HashMap<String, Future<BlurIndex>>();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d6f8d535/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
index 6dbe939..c6fabd5 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
@@ -76,6 +76,7 @@ public class BlurNRTIndex extends BlurIndex {
     conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5));
     conf.setSimilarity(_tableContext.getSimilarity());
     conf.setIndexDeletionPolicy(_tableContext.getIndexDeletionPolicy());
+    conf.setMergedSegmentWarmer(new FieldBasedWarmer(shardContext));
 
     TieredMergePolicy mergePolicy = (TieredMergePolicy) conf.getMergePolicy();
     mergePolicy.setUseCompoundFile(false);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d6f8d535/src/blur-core/src/main/java/org/apache/blur/manager/writer/FieldBasedWarmer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/FieldBasedWarmer.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/FieldBasedWarmer.java
new file mode 100644
index 0000000..043c180
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/FieldBasedWarmer.java
@@ -0,0 +1,78 @@
+package org.apache.blur.manager.writer;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.server.ShardContext;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.thrift.generated.ColumnPreCache;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.lucene.index.AtomicReader;
+import org.apache.lucene.index.Fields;
+import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
+
+public class FieldBasedWarmer extends IndexReaderWarmer {
+
+  private static final Log LOG = LogFactory.getLog(FieldBasedWarmer.class);
+
+  private List<String> preCacheCols;
+
+  public FieldBasedWarmer(ShardContext shardContext) {
+    this(shardContext.getTableContext());
+  }
+
+  public FieldBasedWarmer(TableContext tableContext) {
+    this(tableContext.getDescriptor());
+  }
+
+  public FieldBasedWarmer(TableDescriptor tableDescriptor) {
+    this(tableDescriptor.getColumnPreCache());
+  }
+
+  public FieldBasedWarmer(ColumnPreCache columnPreCache) {
+    this(columnPreCache == null ? null : columnPreCache.getPreCacheCols());
+  }
+
+  public FieldBasedWarmer(List<String> preCacheCols) {
+    this.preCacheCols = preCacheCols;
+  }
+
+  @Override
+  public void warm(AtomicReader reader) throws IOException {
+    if (preCacheCols != null) {
+      warm(reader, preCacheCols);
+    } else {
+      Fields fields = reader.fields();
+      warm(reader, fields);
+    }
+  }
+
+  private void warm(AtomicReader reader, Iterable<String> fieldNames) throws IOException {
+//    for (String field : fieldNames) {
+//      LOG.debug("Warming field [{0}] in reader [{1}]", field, reader);
+//      
+//      AtomicReaderContext context = reader.getContext();
+//      context.
+//      
+//      Fields fields = reader.fields();
+//      Terms terms = fields.terms(field);
+//      TermsEnum termsEnum = terms.iterator(null);
+//      BytesRef ref = null;
+//      Term term = new Term(field);
+//      while ((ref = termsEnum.next()) != null) {
+//        term.set(field, ref);
+//        DocsAndPositionsEnum termPositionsEnum = reader.termPositionsEnum(term);
+//        if (termPositionsEnum != null) {
+//          while (termPositionsEnum.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
+//            int freq = termPositionsEnum.freq();
+//            for (int i = 0; i < freq; i++) {
+//              termPositionsEnum.nextPosition();
+//            }
+//          }
+//        }
+//      }
+//    }
+  }
+}


Mime
View raw message