incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [2/2] git commit: Adding a way to optimze the BlurIndexSimpleWriter.
Date Tue, 31 Dec 2013 03:39:03 GMT
Adding a way to optimze the BlurIndexSimpleWriter.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/4f276ffa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/4f276ffa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/4f276ffa

Branch: refs/heads/apache-blur-0.2
Commit: 4f276ffa392accf50d1c226387c7aede81f42adc
Parents: b0bccbb
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Dec 30 22:38:57 2013 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Dec 30 22:38:57 2013 -0500

----------------------------------------------------------------------
 .../manager/writer/BlurIndexSimpleWriter.java   | 27 +++++++++-
 .../manager/writer/SharedMergeScheduler.java    | 57 +++++++++-----------
 2 files changed, 50 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4f276ffa/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
index f27ba44..25b9117 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
@@ -72,6 +72,8 @@ public class BlurIndexSimpleWriter extends BlurIndex {
   private final ReadWriteLock _lock = new ReentrantReadWriteLock();
   private final Lock _readLock = _lock.readLock();
 
+  private Thread _optimizeThread;
+
   public BlurIndexSimpleWriter(ShardContext shardContext, Directory directory, SharedMergeScheduler
mergeScheduler,
       DirectoryReferenceFileGC gc, final ExecutorService searchExecutor, BlurIndexCloser
indexCloser,
       BlurIndexRefresher refresher, BlurIndexWarmup indexWarmup) throws IOException {
@@ -222,8 +224,29 @@ public class BlurIndexSimpleWriter extends BlurIndex {
   }
 
   @Override
-  public void optimize(int numberOfSegmentsPerShard) throws IOException {
-    throw new RuntimeException("not impl");
+  public synchronized void optimize(final int numberOfSegmentsPerShard) throws IOException
{
+    final String table = _tableContext.getTable();
+    final String shard = _shardContext.getShard();
+    if (_optimizeThread != null || _optimizeThread.isAlive()) {
+      LOG.info("Already running an optimize on table [{0}] shard [{1}]", table, shard);
+      return;
+    }
+    _optimizeThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          waitUntilNotNull(_writer);
+          BlurIndexWriter writer = _writer.get();
+          writer.forceMerge(numberOfSegmentsPerShard, true);
+          writer.commit();
+        } catch (Exception e) {
+          LOG.error("Unknown error during optimize on table [{0}] shard [{1}]", e, table,
shard);
+        }
+      }
+    });
+    _optimizeThread.setDaemon(true);
+    _optimizeThread.setName("Optimize table [" + table + "] shard [" + shard + "]");
+    _optimizeThread.start();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4f276ffa/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
b/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
index 5036f37..2314a1f 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
@@ -60,17 +60,13 @@ public class SharedMergeScheduler implements Runnable, Closeable {
   }
 
   private void mergeIndexWriter(IndexWriter writer) {
-    synchronized (_writers) {
-      if (!_writers.contains(writer)) {
-        LOG.debug("Adding writer to merge [{0}]", writer);
-        _writers.add(writer);
-      }
-    }
+    LOG.debug("Adding writer to merge [{0}]", writer);
+    _writers.add(writer);
   }
 
   private void removeWriter(IndexWriter writer) {
-    synchronized (_writers) {
-      _writers.remove(writer);
+    while (_writers.remove(writer)) {
+      // keep looping until all the references are gone
     }
   }
 
@@ -102,17 +98,13 @@ public class SharedMergeScheduler implements Runnable, Closeable {
   public void run() {
     while (_running.get()) {
       try {
-        IndexWriter writer;
-        synchronized (_writers) {
-          writer = _writers.poll();
-        }
+        IndexWriter writer = _writers.poll();
         if (writer == null) {
           synchronized (this) {
             wait(ONE_SECOND);
           }
-        } else if (performMergeWriter(writer)) {
-          // there seems to be more merges to do
-          mergeIndexWriter(writer);
+        } else {
+          performMergeWriter(writer);
         }
       } catch (InterruptedException e) {
         LOG.debug("Merging interrupted, exiting.");
@@ -123,24 +115,25 @@ public class SharedMergeScheduler implements Runnable, Closeable {
     }
   }
 
-  private boolean performMergeWriter(IndexWriter writer) throws IOException {
-    MergePolicy.OneMerge merge = writer.getNextMerge();
-    if (merge == null) {
-      LOG.debug("No merges to run for [{0}]", writer);
-      return false;
-    }
-    long s = System.currentTimeMillis();
-    writer.merge(merge);
-    long e = System.currentTimeMillis();
-    double time = (e - s) / 1000.0;
-    double rate = (merge.totalBytesSize() / 1024 / 1024) / time;
-    if (time > 10) {
-      LOG.info("Merge took [{0} s] to complete at rate of [{1} MB/s]", time, rate);
-    } else {
-      LOG.debug("Merge took [{0} s] to complete at rate of [{1} MB/s]", time, rate);
+  private void performMergeWriter(IndexWriter writer) throws IOException {
+    while (true) {
+      MergePolicy.OneMerge merge = writer.getNextMerge();
+      if (merge == null) {
+        LOG.debug("No merges to run for [{0}]", writer);
+        return;
+      }
+      long s = System.nanoTime();
+      writer.merge(merge);
+      long e = System.nanoTime();
+      double time = (e - s) / 1000000000.0;
+      double rate = (merge.totalBytesSize() / 1000 / 1000) / time;
+      if (time > 10) {
+        LOG.info("Merge took [{0} s] to complete at rate of [{1} MB/s]", time, rate);
+      } else {
+        LOG.debug("Merge took [{0} s] to complete at rate of [{1} MB/s]", time, rate);
+      }
+      _throughputBytes.mark(merge.totalBytesSize());
     }
-    _throughputBytes.mark(merge.totalBytesSize());
-    return true;
   }
 
 }


Mime
View raw message