incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [1/2] git commit: Performance enhancements: changed the merge scheduler to a single shared instance instead of one per writer.
Date Tue, 26 Feb 2013 04:13:03 GMT
Performance enhancements: changed the merge scheduler to a single shared instance instead of one per writer.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/61fa200f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/61fa200f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/61fa200f

Branch: refs/heads/0.2-dev
Commit: 61fa200f8076422acdf4258386d01ba774d0c8b4
Parents: b2676d1
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Feb 25 23:10:52 2013 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Feb 25 23:10:52 2013 -0500

----------------------------------------------------------------------
 .../indexserver/DistributedIndexServer.java        |    5 +-
 .../apache/blur/manager/writer/BlurNRTIndex.java   |    9 ++
 .../blur/manager/writer/SharedMergeScheduler.java  |   90 +++++++++++++++
 .../blur/manager/writer/TransactionRecorder.java   |    4 +-
 .../java/org/apache/blur/index/IndexWriter.java    |   26 ++++
 .../org/apache/blur/store/hdfs/HdfsDirectory.java  |   19 ++-
 .../java/org/apache/blur/concurrent/Executors.java |    9 ++-
 7 files changed, 151 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/61fa200f/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
index c5098f7..1771960 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
@@ -51,11 +51,11 @@ import org.apache.blur.manager.writer.BlurIndexCloser;
 import org.apache.blur.manager.writer.BlurIndexReader;
 import org.apache.blur.manager.writer.BlurIndexRefresher;
 import org.apache.blur.manager.writer.BlurNRTIndex;
+import org.apache.blur.manager.writer.SharedMergeScheduler;
 import org.apache.blur.metrics.BlurMetrics;
 import org.apache.blur.server.ShardContext;
 import org.apache.blur.server.TableContext;
 import org.apache.blur.server.ZooKeeperTypeManager;
-import org.apache.blur.store.blockcache.BlockDirectory;
 import org.apache.blur.store.blockcache.Cache;
 import org.apache.blur.store.hdfs.BlurLockFactory;
 import org.apache.blur.thrift.generated.TableDescriptor;
@@ -112,6 +112,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
   private BlurIndexWarmup _warmup = new DefaultBlurIndexWarmup();
   private DirectoryReferenceFileGC _gc;
   private WatchChildren _watchOnlineShards;
+  private SharedMergeScheduler _mergeScheduler;
 
   public static interface ReleaseReader {
     void release() throws IOException;
@@ -124,6 +125,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
     _closer.init();
     _gc = new DirectoryReferenceFileGC();
     _gc.init();
+    _mergeScheduler = new SharedMergeScheduler();
     setupFlushCacheTimer();
     String lockPath = BlurUtil.lockForSafeMode(_zookeeper, getNodeName(), _cluster);
     try {
@@ -476,6 +478,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
       writer.setDirectory(dir);
       writer.setCloser(_closer);
       writer.setGc(_gc);
+      writer.setMergeScheduler(_mergeScheduler);
       writer.init();
       index = writer;
     }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/61fa200f/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
index b00a688..f43d802 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
@@ -69,6 +69,11 @@ public class BlurNRTIndex extends BlurIndex {
   private DirectoryReferenceFileGC _gc;
   private TableContext tableContext;
   private ShardContext shardContext;
+  private SharedMergeScheduler mergeScheduler;
+
+  public void setMergeScheduler(SharedMergeScheduler mergeScheduler) {
+    this.mergeScheduler = mergeScheduler;
+  }
 
   // created
   private TransactionRecorder _recorder;
@@ -86,6 +91,7 @@ public class BlurNRTIndex extends BlurIndex {
     conf.setCodec(new AppendingCodec());
     TieredMergePolicy mergePolicy = (TieredMergePolicy) conf.getMergePolicy();
     mergePolicy.setUseCompoundFile(false);
+    conf.setMergeScheduler(mergeScheduler);
     DirectoryReferenceCounter referenceCounter = new DirectoryReferenceCounter(_directory, _gc);
     _writer = new IndexWriter(referenceCounter, conf);
     _recorder = new TransactionRecorder();
@@ -199,8 +205,11 @@ public class BlurNRTIndex extends BlurIndex {
   private void swap() {
     IndexSearcher searcher = _nrtManager.acquire();
     IndexReader indexReader = searcher.getIndexReader();
+    int numberOfLeaves = indexReader.leaves().size();
     IndexReader oldIndexReader = _indexRef.getAndSet(indexReader);
+    int oldNumberOfLeaves = oldIndexReader.leaves().size();
     _closer.close(oldIndexReader);
+    LOG.debug("Old index version had [{1}] leaves new version has [{2}] leaves, for directory [{0}].", indexReader, oldNumberOfLeaves, numberOfLeaves);
   }
 
   public void setDirectory(Directory directory) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/61fa200f/src/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
new file mode 100644
index 0000000..b0e2b38
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
@@ -0,0 +1,90 @@
+package org.apache.blur.manager.writer;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.blur.concurrent.Executors;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.MergePolicy;
+import org.apache.lucene.index.MergeScheduler;
+
+public class SharedMergeScheduler extends MergeScheduler implements Runnable {
+
+  private static final Log LOG = LogFactory.getLog(SharedMergeScheduler.class);
+
+  private static final long ONE_SECOND = 1000;
+
+  private BlockingQueue<IndexWriter> _writers = new LinkedBlockingQueue<IndexWriter>();
+  private AtomicBoolean _running = new AtomicBoolean(true);
+  private ExecutorService service;
+
+  public SharedMergeScheduler() {
+    int threads = 3;
+    service = Executors.newThreadPool("sharedMergeScheduler", threads, false);
+    for (int i = 0; i < threads; i++) {
+      service.submit(this);
+    }
+  }
+
+  @Override
+  public void merge(IndexWriter writer) throws IOException {
+    synchronized (_writers) {
+      if (!_writers.contains(writer)) {
+        LOG.debug("Adding writer to merge [{0}]", writer);
+        _writers.add(writer);
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    _running.set(false);
+    service.shutdownNow();
+  }
+
+  @Override
+  public void run() {
+    while (_running.get()) {
+      try {
+        IndexWriter writer;
+        synchronized (_writers) {
+          writer = _writers.poll();
+        }
+        if (writer == null) {
+          synchronized (this) {
+            wait(ONE_SECOND);
+          }
+        } else if (mergeWriter(writer)) {
+          // there seems to be more merges to do
+          merge(writer);
+        }
+      } catch (InterruptedException e) {
+        LOG.info("Merging interrupted, exiting.");
+        return;
+      } catch (IOException e) {
+        LOG.error("Unknown IOException", e);
+      }
+    }
+  }
+
+  private boolean mergeWriter(IndexWriter writer) throws IOException {
+    MergePolicy.OneMerge merge = writer.getNextMerge();
+    if (merge == null) {
+      LOG.debug("No merges to run for [{0}]", writer);
+      return false;
+    }
+    long s = System.currentTimeMillis();
+    writer.merge(merge);
+    long e = System.currentTimeMillis();
+    double time = (e - s) / 1000.0;
+    double rate = (merge.totalBytesSize() / 1024 / 1024) / time;
+    LOG.info("Merge took [{0} s] to complete at rate of [{1} MB/s]", time, rate);
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/61fa200f/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
index f05fbe8..fbfcade 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
@@ -284,10 +284,10 @@ public class TransactionRecorder {
       long s = System.nanoTime();
       writer.commit();
       long m = System.nanoTime();
-      LOG.info("Commit took [{0}] for [{1}]", (m - s) / 1000000.0, writer);
+      LOG.info("Commit took [{0} ms] for [{1}]", (m - s) / 1000000.0, writer);
       rollLog();
       long e = System.nanoTime();
-      LOG.info("Log roller took [{0}] for [{1}]", (e - m) / 1000000.0, writer);
+      LOG.info("Log roller took [{0} ms] for [{1}]", (e - m) / 1000000.0, writer);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/61fa200f/src/blur-store/src/main/java/org/apache/blur/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/index/IndexWriter.java b/src/blur-store/src/main/java/org/apache/blur/index/IndexWriter.java
index 2842514..4080932 100644
--- a/src/blur-store/src/main/java/org/apache/blur/index/IndexWriter.java
+++ b/src/blur-store/src/main/java/org/apache/blur/index/IndexWriter.java
@@ -60,4 +60,30 @@ public class IndexWriter extends org.apache.lucene.index.IndexWriter {
     }
   }
 
+  /**
+   * This seems a little iffy, but basically only the writer instance itself can
+   * equal itself.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (super.equals(obj)) {
+      return true;
+    } else if (obj == null) {
+      return false;
+    } else if (obj == this) {
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return super.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return "IndexWriter with directory [" + getDirectory() + "]";
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/61fa200f/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java b/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
index 366e1c7..1c398f1 100644
--- a/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
+++ b/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
@@ -57,13 +57,13 @@ public class HdfsDirectory extends Directory {
       @Override
       public void run() {
         while (true) {
-          LOG.info("Delete Counter [" + deleteCounter + "]");
-          LOG.info("Exists Counter [" + existsCounter + "]");
-          LOG.info("File Status Counter [" + fileStatusCounter + "]");
-          LOG.info("Rename Counter [" + renameCounter + "]");
-          LOG.info("List Counter [" + listCounter + "]");
-          LOG.info("Create Counter [" + createCounter + "]");
-          LOG.info("IsFile Counter [" + isFileCounter + "]");
+          LOG.debug("Delete Counter [" + deleteCounter + "]");
+          LOG.debug("Exists Counter [" + existsCounter + "]");
+          LOG.debug("File Status Counter [" + fileStatusCounter + "]");
+          LOG.debug("Rename Counter [" + renameCounter + "]");
+          LOG.debug("List Counter [" + listCounter + "]");
+          LOG.debug("Create Counter [" + createCounter + "]");
+          LOG.debug("IsFile Counter [" + isFileCounter + "]");
           try {
             Thread.sleep(5000);
           } catch (InterruptedException e) {
@@ -86,6 +86,11 @@ public class HdfsDirectory extends Directory {
     setLockFactory(NoLockFactory.getNoLockFactory());
   }
 
+  @Override
+  public String toString() {
+    return "HdfsDirectory path=[" + path + "]";
+  }
+
   public static class HdfsIndexInput extends ReusedBufferedIndexInput {
 
     private final long len;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/61fa200f/src/blur-util/src/main/java/org/apache/blur/concurrent/Executors.java
----------------------------------------------------------------------
diff --git a/src/blur-util/src/main/java/org/apache/blur/concurrent/Executors.java b/src/blur-util/src/main/java/org/apache/blur/concurrent/Executors.java
index 47bba75..f6b8fb2 100644
--- a/src/blur-util/src/main/java/org/apache/blur/concurrent/Executors.java
+++ b/src/blur-util/src/main/java/org/apache/blur/concurrent/Executors.java
@@ -26,9 +26,16 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class Executors {
 
   public static ExecutorService newThreadPool(String prefix, int threadCount) {
+    return newThreadPool(prefix, threadCount, true);
+  }
+
+  public static ExecutorService newThreadPool(String prefix, int threadCount, boolean watch) {
     ThreadPoolExecutor executorService = new ThreadPoolExecutor(threadCount, threadCount, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new BlurThreadFactory(prefix));
     executorService.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
-    return ThreadWatcher.instance().watch(executorService);
+    if (watch) {
+      return ThreadWatcher.instance().watch(executorService);
+    }
+    return executorService;
   }
 
   public static ExecutorService newSingleThreadExecutor(String prefix) {


Mime
View raw message