incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Fixed BLUR-153
Date Wed, 26 Jun 2013 20:44:36 GMT
Updated Branches:
  refs/heads/master 745397cec -> 506e4ce36


Fixed BLUR-153


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/506e4ce3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/506e4ce3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/506e4ce3

Branch: refs/heads/master
Commit: 506e4ce36c6e8b96b19bbcdf4025f277fa768164
Parents: 745397c
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Wed Jun 26 16:44:10 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Wed Jun 26 16:44:10 2013 -0400

----------------------------------------------------------------------
 .../manager/indexserver/BlurIndexWarmup.java    |   8 +-
 .../indexserver/DefaultBlurIndexWarmup.java     |   8 +-
 .../indexserver/DistributedIndexServer.java     |   2 +-
 .../blur/thrift/ThriftBlurShardServer.java      |  29 ++--
 .../apache/blur/lucene/warmup/IndexWarmup.java  |  36 +++--
 .../blur/lucene/warmup/ThrottledIndexInput.java | 133 +++++++++++++++++++
 .../org/apache/blur/utils/BlurConstants.java    |   1 +
 .../src/main/resources/blur-default.properties  |   1 +
 8 files changed, 196 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/506e4ce3/blur-core/src/main/java/org/apache/blur/manager/indexserver/BlurIndexWarmup.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/BlurIndexWarmup.java b/blur-core/src/main/java/org/apache/blur/manager/indexserver/BlurIndexWarmup.java
index 8265cc6..bbc46f4 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/indexserver/BlurIndexWarmup.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/BlurIndexWarmup.java
@@ -25,6 +25,13 @@ import org.apache.lucene.index.IndexReader;
 
 
 public abstract class BlurIndexWarmup {
+  
+  
+  protected long _warmupBandwidthThrottleBytesPerSec;
+
+  public BlurIndexWarmup(long warmupBandwidthThrottleBytesPerSec) {
+    _warmupBandwidthThrottleBytesPerSec = warmupBandwidthThrottleBytesPerSec;
+  }
 
   /**
    * Once the reader has be warmed up, release() must be called on the
@@ -42,7 +49,6 @@ public abstract class BlurIndexWarmup {
    *          to release the handle on the reader.
    * @throws IOException
    * 
-   * @deprecated
    */
   public void warmBlurIndex(String table, String shard, IndexReader reader, AtomicBoolean
isClosed, ReleaseReader releaseReader) throws IOException {
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/506e4ce3/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
index 2373a6f..c0c90cf 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
@@ -40,6 +40,10 @@ import org.apache.lucene.index.SegmentReader;
 
 public class DefaultBlurIndexWarmup extends BlurIndexWarmup {
 
+  public DefaultBlurIndexWarmup(long warmupBandwidthThrottleBytesPerSec) {
+    super(warmupBandwidthThrottleBytesPerSec);
+  }
+
   private static final Log LOG = LogFactory.getLog(DefaultBlurIndexWarmup.class);
 
   @Override
@@ -51,7 +55,7 @@ public class DefaultBlurIndexWarmup extends BlurIndexWarmup {
         reader = getBase((FilterDirectoryReader) reader);
       }
       int maxSampleSize = 1000;
-      IndexWarmup indexWarmup = new IndexWarmup(isClosed, maxSampleSize);
+      IndexWarmup indexWarmup = new IndexWarmup(isClosed, maxSampleSize, _warmupBandwidthThrottleBytesPerSec);
       String context = table.getName() + "/" + shard;
       Map<String, List<IndexTracerResult>> sampleIndex = indexWarmup.sampleIndex(reader,
context);
       ColumnPreCache columnPreCache = table.getColumnPreCache();
@@ -71,7 +75,7 @@ public class DefaultBlurIndexWarmup extends BlurIndexWarmup {
       field.setAccessible(true);
       return (IndexReader) field.get(reader);
     } catch (Exception e) {
-      LOG.error("Unknown error trying to get base reader from [{0}]",e,reader);
+      LOG.error("Unknown error trying to get base reader from [{0}]", e, reader);
       return reader;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/506e4ce3/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
index 46033e5..5be0218 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
@@ -121,7 +121,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
   private BlurFilterCache _filterCache;
   private AtomicBoolean _running = new AtomicBoolean();
   private long _safeModeDelay;
-  private BlurIndexWarmup _warmup = new DefaultBlurIndexWarmup();
+  private BlurIndexWarmup _warmup = new DefaultBlurIndexWarmup(1000000);
   private DirectoryReferenceFileGC _gc;
   private WatchChildren _watchOnlineShards;
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/506e4ce3/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index 1d89ea2..66a11a2 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -16,7 +16,7 @@ package org.apache.blur.thrift;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import static org.apache.blur.utils.BlurConstants.BLUR_CLUSTER;
+import static org.apache.blur.utils.BlurConstants.*;
 import static org.apache.blur.utils.BlurConstants.BLUR_CLUSTER_NAME;
 import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_BIND_PORT;
 import static org.apache.blur.utils.BlurConstants.BLUR_GUI_CONTROLLER_PORT;
@@ -169,7 +169,7 @@ public class ThriftBlurShardServer extends ThriftServer {
     BlurQueryChecker queryChecker = new BlurQueryChecker(configuration);
 
     int sessionTimeout = configuration.getInt(BLUR_ZOOKEEPER_TIMEOUT, BLUR_ZOOKEEPER_TIMEOUT_DEFAULT);
-    
+
     final ZooKeeper zooKeeper = ZkUtils.newZooKeeper(zkConnectionStr, sessionTimeout);
 
     BlurUtil.setupZookeeper(zooKeeper, configuration.get(BLUR_CLUSTER_NAME));
@@ -228,7 +228,7 @@ public class ThriftBlurShardServer extends ThriftServer {
     int threadCount = configuration.getInt(BLUR_SHARD_SERVER_THRIFT_THREAD_COUNT, 32);
 
     ShardServerEventHandler eventHandler = new ShardServerEventHandler();
-    
+
     final ThriftBlurShardServer server = new ThriftBlurShardServer();
     server.setNodeName(nodeName);
     server.setBindAddress(bindAddress);
@@ -297,14 +297,23 @@ public class ThriftBlurShardServer extends ThriftServer {
 
   private static BlurIndexWarmup getIndexWarmup(BlurConfiguration configuration) {
     String _blurFilterCacheClass = configuration.get(BLUR_SHARD_INDEX_WARMUP_CLASS);
-    if (_blurFilterCacheClass != null) {
-      try {
-        Class<?> clazz = Class.forName(_blurFilterCacheClass);
-        return (BlurIndexWarmup) clazz.newInstance();
-      } catch (Exception e) {
-        throw new RuntimeException(e);
+    if (_blurFilterCacheClass != null && _blurFilterCacheClass.isEmpty()) {
+      if (!_blurFilterCacheClass.equals("org.apache.blur.manager.indexserver.DefaultBlurIndexWarmup"))
{
+        try {
+          Class<?> clazz = Class.forName(_blurFilterCacheClass);
+          return (BlurIndexWarmup) clazz.newInstance();
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
       }
     }
-    return new DefaultBlurIndexWarmup();
+    long totalThrottle = configuration.getLong(BLUR_SHARD_INDEX_WARMUP_THROTTLE, 30000000);
+    int totalThreadCount = configuration.getInt(BLUR_SHARD_WARMUP_THREAD_COUNT, 30000000);
+    long warmupBandwidthThrottleBytesPerSec = totalThrottle / totalThreadCount;
+    if (warmupBandwidthThrottleBytesPerSec <= 0) {
+      LOG.warn("Invalid values of either [{0} = {1}] or [{2} = {3}], needs to be greater
then 0",
+          BLUR_SHARD_INDEX_WARMUP_THROTTLE, totalThrottle, BLUR_SHARD_WARMUP_THREAD_COUNT,
totalThreadCount);
+    }
+    return new DefaultBlurIndexWarmup(warmupBandwidthThrottleBytesPerSec);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/506e4ce3/blur-store/src/main/java/org/apache/blur/lucene/warmup/IndexWarmup.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/lucene/warmup/IndexWarmup.java b/blur-store/src/main/java/org/apache/blur/lucene/warmup/IndexWarmup.java
index 6e9d0ae..10a962c 100644
--- a/blur-store/src/main/java/org/apache/blur/lucene/warmup/IndexWarmup.java
+++ b/blur-store/src/main/java/org/apache/blur/lucene/warmup/IndexWarmup.java
@@ -58,17 +58,24 @@ import org.apache.lucene.store.IndexOutput;
  */
 public class IndexWarmup {
 
+  private static final String SAMPLE_PREFIX = "sample";
   private static final Log LOG = LogFactory.getLog(IndexWarmup.class);
-
-  private static final String SAMPLE_EXT = ".sample";
+  private static final String SAMPLE_EXT = ".spl";
   private static final long _5_SECONDS = TimeUnit.SECONDS.toNanos(5);
+  private static final int DEFAULT_THROTTLE = 2000000;
 
   private final AtomicBoolean _isClosed;
   private final int _maxSampleSize;
+  private final long _maxBytesPerSec;
 
-  public IndexWarmup(AtomicBoolean isClosed, int maxSampleSize) {
+  public IndexWarmup(AtomicBoolean isClosed, int maxSampleSize, long maxBytesPerSec) {
     _isClosed = isClosed;
     _maxSampleSize = maxSampleSize;
+    _maxBytesPerSec = maxBytesPerSec;
+  }
+
+  public IndexWarmup(AtomicBoolean isClosed, int maxSampleSize) {
+    this(isClosed, maxSampleSize, DEFAULT_THROTTLE);
   }
 
   private static ThreadLocal<Boolean> runTrace = new ThreadLocal<Boolean>() {
@@ -137,18 +144,21 @@ public class IndexWarmup {
     }
     long length = endingPosition - startingPosition;
     final long totalLength = length;
-    LOG.info("Context [{3}] warming field [{0}] in file [{1}] has length [{2}]", fieldName,
fileName, length, context);
-    IndexInput input = dir.openInput(fileName, IOContext.READ);
+    IndexInput input = new ThrottledIndexInput(dir.openInput(fileName, IOContext.READ), _maxBytesPerSec);
     input.seek(startingPosition);
     byte[] buf = new byte[8192];
     long start = System.nanoTime();
+    long bytesReadPerPass = 0;
     while (length > 0) {
       long now = System.nanoTime();
       if (start + _5_SECONDS < now) {
+        double seconds = (now - start) / 1000000000.0;
+        double rateMbPerSec = (bytesReadPerPass / seconds) / 1000 / 1000;
         double complete = (((double) totalLength - (double) length) / (double) totalLength)
* 100.0;
-        LOG.info("Context [{3}] warming field [{0}] in file [{1}] is [{2}%] complete", fieldName,
fileName, complete,
-            context);
+        LOG.info("Context [{3}] warming field [{0}] in file [{1}] is [{2}%] complete at rate
of [{4} MB/s]", fieldName,
+            fileName, complete, context, rateMbPerSec);
         start = System.nanoTime();
+        bytesReadPerPass = 0;
         if (_isClosed.get()) {
           LOG.info("Context [{0}] index closed", context);
           return;
@@ -157,7 +167,17 @@ public class IndexWarmup {
       int len = (int) Math.min(length, buf.length);
       input.readBytes(buf, 0, len);
       length -= len;
+      bytesReadPerPass += len;
+    }
+    long now = System.nanoTime();
+    double seconds = (now - start) / 1000000000.0;
+    if (seconds < 1) {
+      seconds = 1;
     }
+    double rateMbPerSec = (bytesReadPerPass / seconds) / 1000 / 1000;
+    LOG.info("Context [{3}] warming field [{0}] in file [{1}] is [{2}%] complete at rate
of [{4} MB/s]", fieldName,
+        fileName, 100, context, rateMbPerSec);
+    input.clone();
   }
 
   private Directory getDirectory(IndexReader reader, String segmentName, String context)
{
@@ -237,7 +257,7 @@ public class IndexWarmup {
         return results;
       }
       IndexTracer tracer = new IndexTracer((TraceableDirectory) directory, _maxSampleSize);
-      String fileName = segmentReader.getSegmentName() + SAMPLE_EXT;
+      String fileName = SAMPLE_PREFIX + segmentReader.getSegmentName() + SAMPLE_EXT;
       List<IndexTracerResult> segmentTraces = new ArrayList<IndexTracerResult>();
       if (directory.fileExists(fileName)) {
         IndexInput input = directory.openInput(fileName, IOContext.READONCE);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/506e4ce3/blur-store/src/main/java/org/apache/blur/lucene/warmup/ThrottledIndexInput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/lucene/warmup/ThrottledIndexInput.java b/blur-store/src/main/java/org/apache/blur/lucene/warmup/ThrottledIndexInput.java
new file mode 100644
index 0000000..5e612a1
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/lucene/warmup/ThrottledIndexInput.java
@@ -0,0 +1,133 @@
+package org.apache.blur.lucene.warmup;
+
+import java.io.IOException;
+
+import org.apache.lucene.store.IndexInput;
+
+/**
+ * Bandwidth-throttling wrapper around another {@link IndexInput}, derived from
+ * the org.apache.hadoop.tools.util.ThrottledInputStream class found in Hadoop.
+ * <p>
+ * Before every readByte/readBytes call, the average rate since construction
+ * (bytes read / elapsed whole seconds) is compared with the configured maximum,
+ * and the thread sleeps in 50 ms steps until it no longer exceeds it. A single
+ * read may exceed the limit briefly, but the long-run average tends towards it.
+ * <p>
+ * NOTE(review): clone() is not overridden; if the inherited clone is shallow,
+ * clones share the wrapped input and counters with this instance - confirm.
+ */
+public class ThrottledIndexInput extends IndexInput {
+
+  private static final long SLEEP_DURATION_MS = 50; // length of one throttle() sleep
+  // Wrapped input, rate limit (bytes/sec), and System.nanoTime() at construction.
+  private final IndexInput _rawStream;
+  private final double _maxBytesPerSec;
+  private final long _startTime = System.nanoTime();
+  // Running totals since construction, exposed through the getters below.
+  private long _bytesRead = 0;
+  private long _totalSleepTime = 0;
+  /** Wraps {@code rawStream}, capping its average read rate at {@code maxBytesPerSec}. */
+  public ThrottledIndexInput(IndexInput rawStream, long maxBytesPerSec) {
+    super(rawStream.toString());
+    _rawStream = rawStream;
+    _maxBytesPerSec = maxBytesPerSec;
+  }
+
+  /** Delegates to the wrapped input. */
+  @Override
+  public long getFilePointer() {
+    return _rawStream.getFilePointer();
+  }
+
+  /** Delegates to the wrapped input; seeks are neither throttled nor counted as reads. */
+  @Override
+  public void seek(long pos) throws IOException {
+    _rawStream.seek(pos);
+  }
+
+  /** Delegates to the wrapped input. */
+  @Override
+  public long length() {
+    return _rawStream.length();
+  }
+
+  /** Throttles, then delegates; the byte is counted even if the read throws. */
+  @Override
+  public byte readByte() throws IOException {
+    throttle();
+    try {
+      return _rawStream.readByte();
+    } finally {
+      _bytesRead++;
+    }
+  }
+
+  /** Throttles once for the whole request, then delegates; {@code len} is counted even if the read throws. */
+  @Override
+  public void readBytes(byte[] b, int offset, int len) throws IOException {
+    throttle();
+    try {
+      _rawStream.readBytes(b, offset, len);
+    } finally {
+      _bytesRead += len;
+    }
+  }
+
+  /** Closes the wrapped input. */
+  @Override
+  public void close() throws IOException {
+    _rawStream.close();
+  }
+  /** Sleeps in SLEEP_DURATION_MS steps while getBytesPerSec() exceeds the limit. */
+  private void throttle() throws IOException {
+    while (getBytesPerSec() > _maxBytesPerSec) {
+      try {
+        Thread.sleep(SLEEP_DURATION_MS);
+        _totalSleepTime += SLEEP_DURATION_MS;
+      } catch (InterruptedException e) { // NOTE(review): interrupt flag is not re-set before rethrowing
+        throw new IOException("Thread aborted", e);
+      }
+    }
+  }
+
+  /**
+   * Returns the number of bytes requested through readByte/readBytes since creation.
+   * 
+   * @return The number of bytes.
+   */
+  public long getTotalBytesRead() {
+    return _bytesRead;
+  }
+
+  /**
+   * Returns the average read rate since creation: bytes read divided by the elapsed
+   * time truncated to whole seconds, or the raw byte count during the first second.
+   * 
+   * @return Read rate, in bytes/sec.
+   */
+  public double getBytesPerSec() {
+    double elapsed = (System.nanoTime() - _startTime) / 1000000000; // long division: whole seconds
+    if (elapsed == 0) {
+      return _bytesRead;
+    } else {
+      return _bytesRead / elapsed;
+    }
+  }
+
+  /**
+   * Returns the total time spent sleeping in throttle().
+   * 
+   * @return Number of milliseconds, counted as 50 per sleep (not measured).
+   */
+  public long getTotalSleepTime() {
+    return _totalSleepTime;
+  }
+
+  /** Describes the throttling state: bytes read, limit, current rate and sleep time. */
+  @Override
+  public String toString() {
+    return "ThrottledIndexInput{" + "bytesRead=" + _bytesRead + ", maxBytesPerSec=" + _maxBytesPerSec
+        + ", bytesPerSec=" + getBytesPerSec() + ", totalSleepTime=" + _totalSleepTime + '}';
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/506e4ce3/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
index de2bcf8..5af88e7 100644
--- a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
+++ b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
@@ -61,6 +61,7 @@ public class BlurConstants {
   public static final String BLUR_SHARD_DATA_FETCH_THREAD_COUNT = "blur.shard.data.fetch.thread.count";
   public static final String BLUR_SHARD_INTERNAL_SEARCH_THREAD_COUNT = "blur.shard.internal.search.thread.count";
   public static final String BLUR_SHARD_WARMUP_THREAD_COUNT = "blur.shard.warmup.thread.count";
+  public static final String BLUR_SHARD_INDEX_WARMUP_THROTTLE = "blur.shard.index.warmup.throttle";
   public static final String BLUR_MAX_CLAUSE_COUNT = "blur.max.clause.count";
   public static final String BLUR_SHARD_CACHE_MAX_QUERYCACHE_ELEMENTS = "blur.shard.cache.max.querycache.elements";
   public static final String BLUR_SHARD_OPENER_THREAD_COUNT = "blur.shard.opener.thread.count";

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/506e4ce3/blur-util/src/main/resources/blur-default.properties
----------------------------------------------------------------------
diff --git a/blur-util/src/main/resources/blur-default.properties b/blur-util/src/main/resources/blur-default.properties
index 9187a05..39b3d7c 100644
--- a/blur-util/src/main/resources/blur-default.properties
+++ b/blur-util/src/main/resources/blur-default.properties
@@ -23,6 +23,7 @@ blur.shard.cache.max.querycache.elements=128
 blur.shard.cache.max.timetolive=60000
 blur.shard.filter.cache.class=org.apache.blur.manager.DefaultBlurFilterCache
 blur.shard.index.warmup.class=org.apache.blur.manager.indexserver.DefaultBlurIndexWarmup
+blur.shard.index.warmup.throttle=30000000
 blur.shard.blockcache.direct.memory.allocation=true
 blur.shard.blockcache.slab.count=-1
 blur.shard.buffercache.1024=8192


Mime
View raw message