incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Fixed BLUR-152.
Date Fri, 28 Jun 2013 18:53:24 GMT
Updated Branches:
  refs/heads/master 7ef345f8f -> 89a1cc105


Fixed BLUR-152.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/89a1cc10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/89a1cc10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/89a1cc10

Branch: refs/heads/master
Commit: 89a1cc105718843e6dc42d3482056ffebe51335e
Parents: 7ef345f
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Fri Jun 28 14:52:59 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Fri Jun 28 14:52:59 2013 -0400

----------------------------------------------------------------------
 .../manager/indexserver/BlurIndexWarmup.java    | 33 +++-----------------
 .../indexserver/DefaultBlurIndexWarmup.java     | 22 ++++++++++---
 .../indexserver/DistributedIndexServer.java     |  6 +++-
 3 files changed, 28 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89a1cc10/blur-core/src/main/java/org/apache/blur/manager/indexserver/BlurIndexWarmup.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/BlurIndexWarmup.java
b/blur-core/src/main/java/org/apache/blur/manager/indexserver/BlurIndexWarmup.java
index bbc46f4..95bd19a 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/indexserver/BlurIndexWarmup.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/BlurIndexWarmup.java
@@ -18,15 +18,14 @@ package org.apache.blur.manager.indexserver;
  */
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.blur.manager.indexserver.DistributedIndexServer.ReleaseReader;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.lucene.index.IndexReader;
 
-
 public abstract class BlurIndexWarmup {
-  
-  
+
   protected long _warmupBandwidthThrottleBytesPerSec;
 
   public BlurIndexWarmup(long warmupBandwidthThrottleBytesPerSec) {
@@ -38,40 +37,18 @@ public abstract class BlurIndexWarmup {
    * ReleaseReader even if an exception occurs.
    * 
    * @param table
-   *          the table name.
-   * @param shard
-   *          the shard name.
-   * @param reader
-   *          thread reader inself.
-   * @param isClosed
-   *          to check if the shard has been migrated to another node.
-   * @param releaseReader
-   *          to release the handle on the reader.
-   * @throws IOException
-   * 
-   */
-  public void warmBlurIndex(String table, String shard, IndexReader reader, AtomicBoolean
isClosed, ReleaseReader releaseReader) throws IOException {
-
-  }
-
-  /**
-   * Once the reader has be warmed up, release() must be called on the
-   * ReleaseReader even if an exception occurs.
-   * 
-   * @param table
    *          the table descriptor.
    * @param shard
    *          the shard name.
    * @param reader
-   *          thread reader inself.
+   *          thread reader itself.
    * @param isClosed
    *          to check if the shard has been migrated to another node.
    * @param releaseReader
    *          to release the handle on the reader.
    * @throws IOException
    */
-  public void warmBlurIndex(TableDescriptor table, String shard, IndexReader reader, AtomicBoolean
isClosed, ReleaseReader releaseReader) throws IOException {
-    warmBlurIndex(table.name, shard, reader, isClosed, releaseReader);
-  }
+  public abstract void warmBlurIndex(TableDescriptor table, String shard, IndexReader reader,
AtomicBoolean isClosed,
+      ReleaseReader releaseReader, AtomicLong pause) throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89a1cc10/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
index c0c90cf..38bb500 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
@@ -22,7 +22,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
@@ -48,7 +50,7 @@ public class DefaultBlurIndexWarmup extends BlurIndexWarmup {
 
   @Override
   public void warmBlurIndex(final TableDescriptor table, final String shard, IndexReader
reader,
-      AtomicBoolean isClosed, ReleaseReader releaseReader) throws IOException {
+      AtomicBoolean isClosed, ReleaseReader releaseReader, AtomicLong pauseWarmup) throws
IOException {
     LOG.info("Running warmup for reader [{0}]", reader);
     try {
       if (reader instanceof FilterDirectoryReader) {
@@ -60,9 +62,9 @@ public class DefaultBlurIndexWarmup extends BlurIndexWarmup {
       Map<String, List<IndexTracerResult>> sampleIndex = indexWarmup.sampleIndex(reader,
context);
       ColumnPreCache columnPreCache = table.getColumnPreCache();
       if (columnPreCache != null) {
-        warm(reader, columnPreCache.preCacheCols, indexWarmup, sampleIndex, context, isClosed);
+        warm(reader, columnPreCache.preCacheCols, indexWarmup, sampleIndex, context, isClosed,
pauseWarmup);
       } else {
-        warm(reader, getFields(reader), indexWarmup, sampleIndex, context, isClosed);
+        warm(reader, getFields(reader), indexWarmup, sampleIndex, context, isClosed, pauseWarmup);
       }
     } finally {
       releaseReader.release();
@@ -95,8 +97,9 @@ public class DefaultBlurIndexWarmup extends BlurIndexWarmup {
   }
 
   private void warm(IndexReader reader, Iterable<String> preCacheCols, IndexWarmup
indexWarmup,
-      Map<String, List<IndexTracerResult>> sampleIndex, String context, AtomicBoolean
isClosed) {
+      Map<String, List<IndexTracerResult>> sampleIndex, String context, AtomicBoolean
isClosed, AtomicLong pauseWarmup) {
     for (String field : preCacheCols) {
+      maybePause(pauseWarmup);
       try {
         indexWarmup.warm(reader, sampleIndex, field, context);
       } catch (IOException e) {
@@ -107,7 +110,18 @@ public class DefaultBlurIndexWarmup extends BlurIndexWarmup {
         return;
       }
     }
+  }
 
+  private void maybePause(AtomicLong pauseWarmup) {
+    while (pauseWarmup.get() > 0) {
+      synchronized (this) {
+        try {
+          this.wait(TimeUnit.SECONDS.toMillis(5));
+        } catch (InterruptedException e) {
+          return;
+        }
+      }
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89a1cc10/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
index 5be0218..aeca8e1 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
@@ -138,6 +138,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
   private int _internalSearchThreads;
   private ExecutorService _warmupExecutor;
   private int _warmupThreads;
+  private final AtomicLong _pauseWarmup = new AtomicLong();
 
   public static interface ReleaseReader {
     void release() throws IOException;
@@ -500,7 +501,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
               // this will allow for closing of index
               searcher.close();
             }
-          });
+          }, _pauseWarmup);
         } catch (Exception e) {
           LOG.error("Unknown error while trying to warmup index [" + index + "]", e);
         }
@@ -516,6 +517,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
       final String shard = s;
       BlurIndex blurIndex = tableIndexes.get(shard);
       if (blurIndex == null) {
+        _pauseWarmup.incrementAndGet();
         LOG.info("Opening missing shard [{0}] from table [{1}]", shard, table);
         Future<BlurIndex> submit = _openerService.submit(new Callable<BlurIndex>()
{
           @Override
@@ -531,6 +533,8 @@ public class DistributedIndexServer extends AbstractIndexServer {
             } catch (Throwable t) {
               _shardStateManager.openingError(table, shard);
               throw new RuntimeException(t);
+            } finally {
+              _pauseWarmup.decrementAndGet();
             }
           }
         });


Mime
View raw message