incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject git commit: Various small fixes and adding automatic sequential read detection in the hdfs index input.
Date Tue, 24 Mar 2015 13:45:45 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master 74c6928d2 -> 4302faea0


Various small fixes and adding automatic sequential read detection in the hdfs index input.
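
The heart of the change is a new SequentialReadControl object, created once per shard server
and cloned for every index input so that each input keeps its own read-pattern counters. A
minimal sketch of the new wiring, assuming the classes in the diff below (the table path is a
hypothetical placeholder; BlurConfiguration() loads blur-default.properties and throws
IOException if it cannot):

    import java.io.IOException;

    import org.apache.blur.BlurConfiguration;
    import org.apache.blur.store.hdfs.HdfsDirectory;
    import org.apache.blur.store.hdfs.SequentialReadControl;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class SequentialReadWiring {
      public static void main(String[] args) throws IOException {
        // Thresholds come from the Blur configuration (see blur-default.properties below).
        SequentialReadControl control = new SequentialReadControl(new BlurConfiguration());
        // The directory hands a clone of the control to every index input it opens.
        HdfsDirectory dir = new HdfsDirectory(new Configuration(), new Path("/blur/tables/t1/shard-0"), control);
      }
    }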


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/4302faea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/4302faea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/4302faea

Branch: refs/heads/master
Commit: 4302faea0122bd9379e2a564f912c3b7185975f6
Parents: 74c6928
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Mar 24 09:45:33 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Mar 24 09:45:33 2015 -0400

----------------------------------------------------------------------
 .../indexserver/DistributedIndexServer.java     |  10 +-
 .../manager/writer/SharedMergeScheduler.java    | 114 +++++++++++++++--
 .../apache/blur/server/cache/ThriftCache.java   |   2 +-
 .../java/org/apache/blur/thrift/TableAdmin.java |   2 +-
 .../blur/thrift/ThriftBlurShardServer.java      |   4 +-
 .../blur/store/blockcache_v2/BaseCache.java     |   5 +-
 .../apache/blur/store/hdfs/HdfsDirectory.java   |  47 +++++--
 .../apache/blur/store/hdfs/HdfsIndexInput.java  | 121 ++++++++++---------
 .../blur/store/hdfs/SequentialReadControl.java  |  92 ++++++++++++++
 .../org/apache/blur/utils/BlurConstants.java    |   5 +
 .../src/main/resources/blur-default.properties  |  12 ++
 11 files changed, 332 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4302faea/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
index efeb051..2f9ccba 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
@@ -53,6 +53,7 @@ import org.apache.blur.server.cache.ThriftCache;
 import org.apache.blur.store.BlockCacheDirectoryFactory;
 import org.apache.blur.store.hdfs.BlurLockFactory;
 import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.store.hdfs.SequentialReadControl;
 import org.apache.blur.store.hdfs_v2.FastHdfsKeyValueDirectory;
 import org.apache.blur.store.hdfs_v2.JoinDirectory;
 import org.apache.blur.thrift.generated.ShardState;
@@ -119,15 +120,17 @@ public class DistributedIndexServer extends AbstractDistributedIndexServer {
   private final Timer _indexImporterTimer;
   private final Timer _indexBulkTimer;
   private final ThriftCache _thriftCache;
+  private final SequentialReadControl _sequentialReadControl;
 
  public DistributedIndexServer(Configuration configuration, ZooKeeper zookeeper, ClusterStatus clusterStatus,
       BlurFilterCache filterCache, BlockCacheDirectoryFactory blockCacheDirectoryFactory,
      DistributedLayoutFactory distributedLayoutFactory, String cluster, String nodeName, long safeModeDelay,
       int shardOpenerThreadCount, int maxMergeThreads, int internalSearchThreads,
       int minimumNumberOfNodesBeforeExitingSafeMode, Timer hdfsKeyValueTimer, Timer indexImporterTimer,
-      long smallMergeThreshold, Timer indexBulkTimer, ThriftCache thriftCache) throws KeeperException,
-      InterruptedException {
+      long smallMergeThreshold, Timer indexBulkTimer, ThriftCache thriftCache,
+      SequentialReadControl sequentialReadControl) throws KeeperException, InterruptedException {
     super(clusterStatus, configuration, nodeName, cluster);
+    _sequentialReadControl = sequentialReadControl;
     _indexImporterTimer = indexImporterTimer;
     _indexBulkTimer = indexBulkTimer;
     _hdfsKeyValueTimer = hdfsKeyValueTimer;
@@ -503,8 +506,7 @@ public class DistributedIndexServer extends AbstractDistributedIndexServer {
     Path hdfsDirPath = new Path(tablePath, shard);
 
    BlurLockFactory lockFactory = new BlurLockFactory(_configuration, hdfsDirPath, _nodeName, BlurUtil.getPid());
-
-    HdfsDirectory longTermStorage = new HdfsDirectory(_configuration, hdfsDirPath);
+    HdfsDirectory longTermStorage = new HdfsDirectory(_configuration, hdfsDirPath, _sequentialReadControl);
     longTermStorage.setLockFactory(lockFactory);
 
     Directory directory;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4302faea/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java b/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
index bc97dbc..d5e78f3 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
@@ -23,8 +23,12 @@ import static org.apache.blur.utils.BlurConstants.SHARED_MERGE_SCHEDULER_PREFIX;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.util.Iterator;
+import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -37,6 +41,9 @@ import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.MergePolicy;
 import org.apache.lucene.index.MergePolicy.OneMerge;
 import org.apache.lucene.index.MergeScheduler;
+import org.apache.lucene.index.SegmentInfo;
+import org.apache.lucene.index.SegmentInfoPerCommit;
+import org.apache.lucene.store.Directory;
 
 import com.yammer.metrics.Metrics;
 import com.yammer.metrics.core.Gauge;
@@ -88,13 +95,16 @@ public class SharedMergeScheduler implements Closeable {
     }
 
     public void merge() throws IOException {
-      long s = System.nanoTime();
-      _writer.merge(_merge);
-      long e = System.nanoTime();
-      double time = (e - s) / 1000000000.0;
-      double rate = (_size / 1000.0 / 1000.0) / time;
-      LOG.info("Merge took [{0} s] to complete at rate of [{1} MB/s], input bytes [{2}],
segments merged {3}", time,
-          rate, _size, _merge.segments);
+      MergeStatus mergeStatus = new MergeStatus(_merge, _writer.getDirectory(), _size, _merge.segments);
+      // Trace.setupTrace(BlurConstants.SHARED_MERGE_SCHEDULER_PREFIX + "-" +
+      // System.nanoTime());
+      startWatching(mergeStatus);
+      try {
+        _writer.merge(_merge);
+      } finally {
+        stopWatching(mergeStatus);
+        // Trace.tearDownTrace();
+      }
       _throughputBytes.mark(_size);
     }
 
@@ -107,10 +117,63 @@ public class SharedMergeScheduler implements Closeable {
     }
   }
 
+  static class MergeStatus {
+
+    final String _id;
+    final Directory _directory;
+    final long _start;
+    final long _size;
+    final OneMerge _oneMerge;
+    final List<SegmentInfoPerCommit> _segments;
+
+    MergeStatus(OneMerge oneMerge, Directory directory, long size, List<SegmentInfoPerCommit> segments) {
+      _id = UUID.randomUUID().toString();
+      _directory = directory;
+      _start = System.nanoTime();
+      _size = size;
+      _oneMerge = oneMerge;
+      _segments = segments;
+    }
+
+    void finalReport() throws IOException {
+      long e = System.nanoTime();
+      double time = (e - _start) / 1000000000.0;
+      double rate = (_size / 1000.0 / 1000.0) / time;
+      SegmentInfo segmentInfo = getSegmentInfo(_oneMerge);
+      long segmentSize = getSegmentSize(segmentInfo, _directory);
+      LOG.info(
+          "Merge took [{0} s] to complete at rate of [{1} MB/s], input bytes [{2}], output
bytes [{4}], segments merged {3}",
+          time, rate, _size, _segments, segmentSize);
+    }
+
+    void report() throws IOException {
+      long e = System.nanoTime();
+      double time = (e - _start) / 1000000000.0;
+      SegmentInfo segmentInfo = getSegmentInfo(_oneMerge);
+      long segmentSize = getSegmentSize(segmentInfo, _directory);
+      double rate = (segmentSize / 1000.0 / 1000.0) / time;
+      LOG.info(
+          "Merge running for [{0} s] at rate of [{1} MB/s], input bytes [{2}], output bytes
[{4}], segments being merged {3}",
+          time, rate, _size, _segments, segmentSize);
+    }
+
+  }
+
   public SharedMergeScheduler(int threads) {
     this(threads, 128 * 1000 * 1000);
   }
 
+  private static ConcurrentMap<String, MergeStatus> _mergeStatusMap = new ConcurrentHashMap<String, MergeStatus>();
+
+  protected static void stopWatching(MergeStatus mergeStatus) throws IOException {
+    MergeStatus status = _mergeStatusMap.remove(mergeStatus._id);
+    status.finalReport();
+  }
+
+  protected static void startWatching(MergeStatus mergeStatus) {
+    _mergeStatusMap.put(mergeStatus._id, mergeStatus);
+  }
+
   public SharedMergeScheduler(int threads, long smallMergeThreshold) {
     MetricName mergeSmallQueueDepth = new MetricName(ORG_APACHE_BLUR, LUCENE, SMALL_QUEUE_DEPTH);
     MetricName mergeSmallQueueDepthInBytes = new MetricName(ORG_APACHE_BLUR, LUCENE, SMALL_QUEUE_DEPTH_IN_BYTES);
@@ -250,4 +313,41 @@ public class SharedMergeScheduler implements Closeable {
     _largeMergeService.shutdownNow();
   }
 
+  protected static long getSegmentSize(SegmentInfo newSegmentInfo, Directory directory) throws IOException {
+    if (newSegmentInfo == null) {
+      return -1L;
+    }
+    String prefix = newSegmentInfo.name;
+    long total = 0;
+    for (String name : directory.listAll()) {
+      if (name.startsWith(prefix)) {
+        total += directory.fileLength(name);
+      }
+    }
+    return total;
+  }
+
+  protected static SegmentInfo getSegmentInfo(OneMerge oneMerge) {
+    Object segmentInfoPerCommit = getFieldObject(oneMerge, "info");
+    if (segmentInfoPerCommit == null) {
+      return null;
+    }
+    return (SegmentInfo) getFieldObject(segmentInfoPerCommit, "info");
+  }
+
+  protected static Object getFieldObject(Object o, String fieldName) {
+    try {
+      Field field = o.getClass().getDeclaredField(fieldName);
+      field.setAccessible(true);
+      return field.get(o);
+    } catch (NoSuchFieldException e) {
+      return null;
+    } catch (SecurityException e) {
+      return null;
+    } catch (IllegalArgumentException e) {
+      return null;
+    } catch (IllegalAccessException e) {
+      return null;
+    }
+  }
 }
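
The hunks above register each in-flight merge in _mergeStatusMap via startWatching() and log a
final summary in stopWatching(), but the periodic caller of MergeStatus.report() is not part of
these hunks. A watcher along the following lines (a hypothetical sketch, not code from this
commit; the interval and thread name are illustrative) would produce the "Merge running for
..." progress lines:

    // Hypothetical progress reporter for the in-flight merges tracked above.
    Thread reporter = new Thread(new Runnable() {
      @Override
      public void run() {
        while (!Thread.currentThread().isInterrupted()) {
          for (MergeStatus status : _mergeStatusMap.values()) {
            try {
              status.report(); // logs elapsed time, MB/s rate, and current output segment size
            } catch (IOException e) {
              LOG.error("Unknown error while reporting merge status.", e);
            }
          }
          try {
            TimeUnit.SECONDS.sleep(10);
          } catch (InterruptedException e) {
            return;
          }
        }
      }
    }, "merge-status-reporter");
    reporter.setDaemon(true);
    reporter.start();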

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4302faea/blur-core/src/main/java/org/apache/blur/server/cache/ThriftCache.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/cache/ThriftCache.java b/blur-core/src/main/java/org/apache/blur/server/cache/ThriftCache.java
index ca7af7b..a1beb03 100644
--- a/blur-core/src/main/java/org/apache/blur/server/cache/ThriftCache.java
+++ b/blur-core/src/main/java/org/apache/blur/server/cache/ThriftCache.java
@@ -214,7 +214,7 @@ public class ThriftCache {
       return null;
     }
     if (!(attributes instanceof TreeMap)) {
-      attributes = new TreeMap<String, String>();
+      attributes = new TreeMap<String, String>(attributes);
     }
     Map<String, String> internalInstance = _attributeKeys.get(attributes);
     if (internalInstance == null) {
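
The one-line ThriftCache change above fixes a dropped-attributes bug: when the incoming map was
not already a TreeMap, the old code replaced it with an empty TreeMap, silently discarding
every entry; the new code copies the entries over. A quick illustration (values hypothetical):

    Map<String, String> attributes = new HashMap<String, String>();
    attributes.put("user", "alice");
    // before: attributes = new TreeMap<String, String>();           -> {}           (entries lost)
    // after:  attributes = new TreeMap<String, String>(attributes); -> {user=alice} (entries kept, keys sorted)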

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4302faea/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java b/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
index 2dd7f53..f2c29ba 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
@@ -625,7 +625,7 @@ public abstract class TableAdmin implements Iface {
     }
     org.apache.log4j.Level current = logger.getLevel();
     org.apache.log4j.Level newLevel = getLevel(level);
-    LOG.info("Changing Logger [{0}] from logging level [{1}] to [{2}]", logger, current,
newLevel);
+    LOG.info("Changing Logger [{0}] from logging level [{1}] to [{2}]", logger.getName(),
current, newLevel);
     logger.setLevel(newLevel);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4302faea/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index ddbd1d1..8a6fd55 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -101,6 +101,7 @@ import org.apache.blur.store.BlockCacheDirectoryFactory;
 import org.apache.blur.store.BlockCacheDirectoryFactoryV1;
 import org.apache.blur.store.BlockCacheDirectoryFactoryV2;
 import org.apache.blur.store.buffer.BufferStore;
+import org.apache.blur.store.hdfs.SequentialReadControl;
 import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TJSONProtocol;
 import org.apache.blur.thirdparty.thrift_0_9_0.server.TServlet;
 import org.apache.blur.thirdparty.thrift_0_9_0.transport.TServerTransport;
@@ -223,10 +224,11 @@ public class ThriftBlurShardServer extends ThriftServer {
     final Timer indexImporterTimer = new Timer("IndexImporter", true);
     final Timer indexBulkTimer = new Timer("BulkIndex", true);
    long smallMergeThreshold = configuration.getLong(BLUR_SHARD_SMALL_MERGE_THRESHOLD, 128 * 1000 * 1000);
+    SequentialReadControl sequentialReadControl = new SequentialReadControl(configuration);
    final DistributedIndexServer indexServer = new DistributedIndexServer(config, zooKeeper, clusterStatus,
        filterCache, blockCacheDirectoryFactory, distributedLayoutFactory, cluster, nodeName, safeModeDelay,
        shardOpenerThreadCount, maxMergeThreads, internalSearchThreads, minimumNumberOfNodesBeforeExitingSafeMode,
-        hdfsKeyValueTimer, indexImporterTimer, smallMergeThreshold, indexBulkTimer, thriftCache);
+        hdfsKeyValueTimer, indexImporterTimer, smallMergeThreshold, indexBulkTimer, thriftCache, sequentialReadControl);
 
     BooleanQuery.setMaxClauseCount(configuration.getInt(BLUR_MAX_CLAUSE_COUNT, 1024));
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4302faea/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/BaseCache.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/BaseCache.java b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/BaseCache.java
index b532759..861482c 100644
--- a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/BaseCache.java
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/BaseCache.java
@@ -71,7 +71,7 @@ public class BaseCache extends Cache implements Closeable {
     }
   }
 
-  class BaseCacheWeigher implements Weigher<CacheValue> {
+  protected static class BaseCacheWeigher implements Weigher<CacheValue> {
     @Override
     public int weightOf(CacheValue value) {
       try {
@@ -267,6 +267,7 @@ public class BaseCache extends Cache implements Closeable {
     CacheValue cacheValue = _cacheMap.get(key);
     if (cacheValue == null) {
       _misses.mark();
+//      System.out.println("Loud Miss [" + fileName + "] Key [" + key + "]");
     } else {
       _hits.mark();
     }
@@ -278,6 +279,8 @@ public class BaseCache extends Cache implements Closeable {
     CacheValue cacheValue = _cacheMap.getQuietly(key);
     if (cacheValue != null) {
       _hits.mark();
+    } else {
+//      System.out.println("Quiet Miss [" + fileName + "] Key [" + key + "]");
     }
     return cacheValue;
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4302faea/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
index 9396638..c4959ae 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
@@ -39,6 +39,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.blur.BlurConfiguration;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.store.blockcache.LastModified;
@@ -79,7 +80,7 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
 
   private static final Timer TIMER;
   private static final BlockingQueue<Closeable> CLOSING_QUEUE = new LinkedBlockingQueue<Closeable>();
-  private static final BlockingQueue<SequentialRef> SEQUENTIAL_CLOSING_QUEUE = new LinkedBlockingQueue<SequentialRef>();
+  private static final BlockingQueue<WeakRef> WEAK_CLOSING_QUEUE = new LinkedBlockingQueue<WeakRef>();
 
   static class FStat {
     FStat(FileStatus fileStatus) {
@@ -133,10 +134,18 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
  protected final Map<Path, FSDataInputStream> _inputMap = new ConcurrentHashMap<Path, FSDataInputStream>();
   protected final boolean _useCache = true;
   protected final boolean _asyncClosing;
+  protected final Path _localCachePath = new Path("/tmp/cache");
+  protected final SequentialReadControl _sequentialReadControl;
 
   public HdfsDirectory(Configuration configuration, Path path) throws IOException {
+    this(configuration, path, new SequentialReadControl(new BlurConfiguration()));
+  }
+
+  public HdfsDirectory(Configuration configuration, Path path, SequentialReadControl sequentialReadControl)
+      throws IOException {
     _fileSystem = path.getFileSystem(configuration);
     _path = _fileSystem.makeQualified(path);
+    _sequentialReadControl = sequentialReadControl;
     if (_path.toUri().getScheme().equals("hdfs")) {
       _asyncClosing = true;
     } else {
@@ -177,12 +186,12 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
     return new TimerTask() {
       @Override
       public void run() {
-        Iterator<SequentialRef> iterator = SEQUENTIAL_CLOSING_QUEUE.iterator();
+        Iterator<WeakRef> iterator = WEAK_CLOSING_QUEUE.iterator();
         while (iterator.hasNext()) {
-          SequentialRef sequentialRef = iterator.next();
-          if (sequentialRef.isClosable()) {
+          WeakRef weakRef = iterator.next();
+          if (weakRef.isClosable()) {
             iterator.remove();
-            CLOSING_QUEUE.add(sequentialRef._inputStream);
+            CLOSING_QUEUE.add(weakRef._inputStream);
           }
         }
       }
@@ -301,9 +310,9 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
     FSDataInputStream inputRandomAccess = openForInput(name);
     long fileLength = fileLength(name);
     Path path = getPath(name);
-    boolean sequentialReadAllowed = name.endsWith(".fdt");
-    // boolean sequentialReadAllowed = true;
-    return new HdfsIndexInput(this, inputRandomAccess, fileLength, _metricsGroup, path, sequentialReadAllowed);
+    HdfsIndexInput input = new HdfsIndexInput(this, inputRandomAccess, fileLength, _metricsGroup, path,
+        _sequentialReadControl.clone());
+    return input;
   }
 
   protected synchronized FSDataInputStream openForInput(String name) throws IOException {
@@ -570,17 +579,21 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
   }
 
  protected FSDataInputStream openForSequentialInput(Path p, Object key) throws IOException {
-    FSDataInputStream input = _fileSystem.open(p);
-    SEQUENTIAL_CLOSING_QUEUE.add(new SequentialRef(input, key));
+    return openInputStream(_fileSystem, p, key);
+  }
+
+  protected FSDataInputStream openInputStream(FileSystem fileSystem, Path p, Object key) throws IOException {
+    FSDataInputStream input = fileSystem.open(p);
+    WEAK_CLOSING_QUEUE.add(new WeakRef(input, key));
     return input;
   }
 
-  static class SequentialRef {
+  static class WeakRef {
 
     final FSDataInputStream _inputStream;
     final WeakReference<Object> _ref;
 
-    SequentialRef(FSDataInputStream input, Object key) {
+    WeakRef(FSDataInputStream input, Object key) {
       _inputStream = input;
       _ref = new WeakReference<Object>(key);
     }
@@ -591,4 +604,14 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
 
   }
 
+  protected boolean isAlreadyExistsLocally(FileSystem localFileSystem, Path localFile, long length) throws IOException {
+    if (localFileSystem.exists(localFile)) {
+      FileStatus fileStatus = localFileSystem.getFileStatus(localFile);
+      if (fileStatus.getLen() == length) {
+        return true;
+      }
+    }
+    return false;
+  }
+
 }
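
The renamed WeakRef ties an open FSDataInputStream to the lifetime of the index input that
requested it: the timer task above scans WEAK_CLOSING_QUEUE and moves a stream to CLOSING_QUEUE
once isClosable() reports the owner is gone. The isClosable() body falls outside these hunks;
presumably it checks the weak reference, along these lines (an assumption, not shown in the
diff):

    // Assumed implementation: the stream becomes closable once the index input
    // passed in as "key" has been garbage collected.
    boolean isClosable() {
      return _ref.get() == null;
    }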

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4302faea/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
index bd9fb60..bcbd26f 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
@@ -18,6 +18,8 @@ package org.apache.blur.store.hdfs;
 
 import java.io.IOException;
 
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
 import org.apache.blur.store.buffer.ReusedBufferedIndexInput;
 import org.apache.blur.trace.Trace;
 import org.apache.blur.trace.Tracer;
@@ -27,28 +29,28 @@ import org.apache.lucene.store.IndexInput;
 
 public class HdfsIndexInput extends ReusedBufferedIndexInput {
 
+  private static final Log LOG = LogFactory.getLog(HdfsIndexInput.class);
+
   private final long _length;
   private final FSDataInputStream _inputStream;
   private final MetricsGroup _metricsGroup;
   private final Path _path;
   private final HdfsDirectory _dir;
-  private final boolean _sequentialReadAllowed;
+
+  private SequentialReadControl _sequentialReadControl;
 
   private long _prevFilePointer;
-  private long _sequentialReadDetectorCounter;
-  private long _sequentialReadThreshold = 50;
-  private boolean _sequentialRead;
   private FSDataInputStream _sequentialInputStream;
 
  public HdfsIndexInput(HdfsDirectory dir, FSDataInputStream inputStream, long length, MetricsGroup metricsGroup,
-      Path path, boolean sequentialReadAllowed) throws IOException {
+      Path path, SequentialReadControl sequentialReadControl) throws IOException {
     super("HdfsIndexInput(" + path.toString() + ")");
+    _sequentialReadControl = sequentialReadControl;
     _dir = dir;
     _inputStream = inputStream;
     _length = length;
     _metricsGroup = metricsGroup;
     _path = path;
-    _sequentialReadAllowed = sequentialReadAllowed;
   }
 
   @Override
@@ -63,70 +65,79 @@ public class HdfsIndexInput extends ReusedBufferedIndexInput {
 
   @Override
   protected void readInternal(byte[] b, int offset, int length) throws IOException {
-    Tracer trace = Trace.trace("filesystem - read", Trace.param("file", _path),
-        Trace.param("location", getFilePointer()), Trace.param("length", length));
-    try {
-      long start = System.nanoTime();
-      long filePointer = getFilePointer();
-      if (!_sequentialReadAllowed) {
-        randomAccessRead(b, offset, length, start, filePointer);
-        return;
-      }
-      if (filePointer == _prevFilePointer) {
-        _sequentialReadDetectorCounter++;
-      } else {
-        if (_sequentialRead) {
-          // System.out.println("Sequential Read OFF clone [" + _isClone + "] ["
-          // + _path + "] count ["
-          // + (_sequentialReadDetectorCounter - _sequentialReadThreshold) +
-          // "]");
+    long start = System.nanoTime();
+    long filePointer = getFilePointer();
+    if (!_sequentialReadControl.isSequentialReadAllowed()) {
+      randomAccessRead(b, offset, length, start, filePointer);
+      return;
+    }
+    if (filePointer == _prevFilePointer) {
+      _sequentialReadControl.incrReadDetector();
+    } else {
+      if (_sequentialReadControl.isEnabled()) {
+        // System.out.println("Sequential Read OFF clone [" + _isClone + "] ["
+        // + _path + "] count ["
+        // + (_sequentialReadDetectorCounter - _sequentialReadThreshold) +
+        // "]");
+
+        if (_sequentialReadControl.shouldSkipInput(filePointer, _prevFilePointer)) {
+          _sequentialInputStream.skip(filePointer - _prevFilePointer);
+        } else {
+        LOG.debug("Current Pos [{0}] Prev Pos [{1}] Diff [{2}]", filePointer, _prevFilePointer, filePointer
+              - _prevFilePointer);
         }
-        _sequentialReadDetectorCounter = 0;
-        _sequentialRead = false;
       }
-      if (_sequentialReadDetectorCounter > _sequentialReadThreshold && !_sequentialRead) {
-        // System.out.println("Sequential Read ON clone [" + _isClone + "] [" +
-        // _path + "]");
-        _sequentialRead = true;
-        if (_sequentialInputStream == null) {
-          _sequentialInputStream = _dir.openForSequentialInput(_path, this);
-        }
+    }
+    if (_sequentialReadControl.switchToSequentialRead()) {
+
+      _sequentialReadControl.setEnabled(true);
+      if (_sequentialInputStream == null) {
+        Tracer trace = Trace.trace("filesystem - read - openForSequentialInput", Trace.param("file", _path),
+            Trace.param("location", getFilePointer()));
+        _sequentialInputStream = _dir.openForSequentialInput(_path, this);
+        trace.done();
       }
-      if (_sequentialRead) {
-        long pos = _sequentialInputStream.getPos();
-        if (pos != filePointer) {
-          _sequentialInputStream.seek(filePointer);
-        }
-        _sequentialInputStream.readFully(b, offset, length);
-        // @TODO add metrics back
-      } else {
-        filePointer = randomAccessRead(b, offset, length, start, filePointer);
+    }
+    if (_sequentialReadControl.isEnabled()) {
+      long pos = _sequentialInputStream.getPos();
+      if (pos != filePointer) {
+        _sequentialInputStream.seek(filePointer);
       }
-      _prevFilePointer = filePointer;
-    } finally {
-      trace.done();
+      _sequentialInputStream.readFully(b, offset, length);
+      filePointer = _sequentialInputStream.getPos();
+      // @TODO add metrics back
+    } else {
+      filePointer = randomAccessRead(b, offset, length, start, filePointer);
     }
+    _prevFilePointer = filePointer;
   }
 
  private long randomAccessRead(byte[] b, int offset, int length, long start, long filePointer) throws IOException {
-    int olen = length;
-    while (length > 0) {
-      int amount;
-      amount = _inputStream.read(filePointer, b, offset, length);
-      length -= amount;
-      offset += amount;
-      filePointer += amount;
+    Tracer trace = Trace.trace("filesystem - read - randomAccessRead", Trace.param("file", _path),
+        Trace.param("location", getFilePointer()), Trace.param("length", length));
+    try {
+      int olen = length;
+      while (length > 0) {
+        int amount;
+        amount = _inputStream.read(filePointer, b, offset, length);
+        length -= amount;
+        offset += amount;
+        filePointer += amount;
+      }
+      long end = System.nanoTime();
+      _metricsGroup.readRandomAccess.update((end - start) / 1000);
+      _metricsGroup.readRandomThroughput.mark(olen);
+      return filePointer;
+    } finally {
+      trace.done();
     }
-    long end = System.nanoTime();
-    _metricsGroup.readRandomAccess.update((end - start) / 1000);
-    _metricsGroup.readRandomThroughput.mark(olen);
-    return filePointer;
   }
 
   @Override
   public IndexInput clone() {
     HdfsIndexInput clone = (HdfsIndexInput) super.clone();
     clone._sequentialInputStream = null;
+    clone._sequentialReadControl = _sequentialReadControl.clone();
     return clone;
   }
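
To make the detection contract concrete: _prevFilePointer holds the position where the previous
read finished, so filePointer == _prevFilePointer means a read starts exactly where the last one
ended. A small sketch against the SequentialReadControl API (defined in the next file), assuming
the non-merge defaults added in blur-default.properties below (25 reads to switch, 32768-byte
skip limit):

    SequentialReadControl control = new SequentialReadControl(new BlurConfiguration());
    for (int i = 0; i < 26; i++) {
      control.incrReadDetector();                 // 26 reads, each starting where the last ended
    }
    assert control.switchToSequentialRead();      // counter (26) > threshold (25): open a sequential stream
    control.setEnabled(true);                     // what readInternal() does after the switch
    assert control.shouldSkipInput(40000, 20000); // 20000-byte forward gap <= 32768: skip() and keep streaming
    assert !control.shouldSkipInput(99999, 0);    // gap too large: fall back to a seek on the stream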
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4302faea/blur-store/src/main/java/org/apache/blur/store/hdfs/SequentialReadControl.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/SequentialReadControl.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/SequentialReadControl.java
new file mode 100644
index 0000000..dcf821e
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/SequentialReadControl.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.hdfs;
+
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_DEFAULT_READ_SEQUENTIAL_SKIP_THRESHOLD;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_DEFAULT_READ_SEQUENTIAL_THRESHOLD;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_MERGE_READ_SEQUENTIAL_SKIP_THRESHOLD;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_MERGE_READ_SEQUENTIAL_THRESHOLD;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.utils.BlurConstants;
+
+public class SequentialReadControl implements Cloneable {
+
+  private final BlurConfiguration _configuration;
+
+  private long _sequentialReadThreshold;
+  private long _sequentialReadSkipThreshold;
+  private int _sequentialReadDetectorCounter = 0;
+  private boolean _sequentialReadAllowed = true;
+  private boolean _sequentialRead;
+
+  public SequentialReadControl(BlurConfiguration configuration) {
+    _configuration = configuration;
+    setup(_configuration, this);
+  }
+
+  @Override
+  public SequentialReadControl clone() {
+    try {
+      SequentialReadControl control = (SequentialReadControl) super.clone();
+      setup(_configuration, control);
+      return control;
+    } catch (CloneNotSupportedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static void setup(BlurConfiguration configuration, SequentialReadControl control) {
+    if (Thread.currentThread().getName().startsWith(BlurConstants.SHARED_MERGE_SCHEDULER_PREFIX)) {
+      control._sequentialReadThreshold = configuration.getLong(BLUR_SHARD_MERGE_READ_SEQUENTIAL_THRESHOLD, 5L);
+      control._sequentialReadSkipThreshold = configuration.getLong(BLUR_SHARD_MERGE_READ_SEQUENTIAL_SKIP_THRESHOLD,
+          128L * 1024L);
+    } else {
+      control._sequentialReadThreshold = configuration.getLong(BLUR_SHARD_DEFAULT_READ_SEQUENTIAL_THRESHOLD, 25L);
+      control._sequentialReadSkipThreshold = configuration.getLong(BLUR_SHARD_DEFAULT_READ_SEQUENTIAL_SKIP_THRESHOLD,
+          32L * 1024L);
+    }
+  }
+
+  public boolean isSequentialReadAllowed() {
+    return _sequentialReadAllowed;
+  }
+
+  public void incrReadDetector() {
+    _sequentialReadDetectorCounter++;
+  }
+
+  public boolean switchToSequentialRead() {
+    if (_sequentialReadDetectorCounter > _sequentialReadThreshold && !_sequentialRead) {
+      return true;
+    }
+    return false;
+  }
+
+  public boolean shouldSkipInput(long filePointer, long prevFilePointer) {
+    return filePointer - prevFilePointer <= _sequentialReadSkipThreshold;
+  }
+
+  public boolean isEnabled() {
+    return _sequentialRead;
+  }
+
+  public void setEnabled(boolean sequentialRead) {
+    _sequentialRead = sequentialRead;
+  }
+
+}
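
Note the thread-name dispatch in setup(): index inputs cloned on a merge thread (names carry
SHARED_MERGE_SCHEDULER_PREFIX) switch to sequential reads after only 5 reads and skip gaps up
to 128 KB, while other threads get the conservative 25-read / 32 KB defaults. Because clone()
re-runs setup(), the decision is made on the thread doing the cloning, as this hypothetical
illustration shows (exception handling elided):

    final SequentialReadControl control = new SequentialReadControl(new BlurConfiguration());
    Runnable task = new Runnable() {
      @Override
      public void run() {
        // Cloned on a merge-named thread -> merge thresholds (5 reads, 131072-byte skip).
        SequentialReadControl mergeControl = control.clone();
      }
    };
    new Thread(task, BlurConstants.SHARED_MERGE_SCHEDULER_PREFIX + "-demo").start();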

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4302faea/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
index 6dbe515..ba22d7e 100644
--- a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
+++ b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
@@ -114,6 +114,11 @@ public class BlurConstants {
   public static final String BLUR_SHARD_SMALL_MERGE_THRESHOLD = "blur.shard.small.merge.threshold";
   public static final String BLUR_SHARD_REQUEST_CACHE_SIZE = "blur.shard.request.cache.size";
   public static final String BLUR_GC_BACK_PRESSURE_HEAP_RATIO = "blur.gc.back.pressure.heap.ratio";
+  
+  public static final String BLUR_SHARD_DEFAULT_READ_SEQUENTIAL_SKIP_THRESHOLD = "blur.shard.default.read.sequential.skip.threshold";
+  public static final String BLUR_SHARD_DEFAULT_READ_SEQUENTIAL_THRESHOLD = "blur.shard.default.read.sequential.threshold";
+  public static final String BLUR_SHARD_MERGE_READ_SEQUENTIAL_SKIP_THRESHOLD = "blur.shard.merge.read.sequential.skip.threshold";
+  public static final String BLUR_SHARD_MERGE_READ_SEQUENTIAL_THRESHOLD = "blur.shard.merge.read.sequential.threshold";
 
   public static final String BLUR_FIELDTYPE = "blur.fieldtype.";
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4302faea/blur-util/src/main/resources/blur-default.properties
----------------------------------------------------------------------
diff --git a/blur-util/src/main/resources/blur-default.properties b/blur-util/src/main/resources/blur-default.properties
index 5b2c599..aa9ab7b 100644
--- a/blur-util/src/main/resources/blur-default.properties
+++ b/blur-util/src/main/resources/blur-default.properties
@@ -213,6 +213,18 @@ blur.indexmanager.mutate.thread.count=8
 # The number of thread used for parallel faceting in the index manager
 blur.indexmanager.facet.thread.count=8
 
+# The number of reads during normal read operations of an index input in sequence that will trigger a sequential read switch.
+blur.shard.default.read.sequential.threshold=25
+
+# The number of bytes during normal read operations that a sequential read of an input stream will skip instead of seeking, which would break the sequential read.
+blur.shard.default.read.sequential.skip.threshold=32768
+
+# The number of reads during a merge of an index input in sequence that will trigger a sequential read switch.
+blur.shard.merge.read.sequential.threshold=5
+
+# The number of bytes during a merge that a sequential read of an input stream will skip instead of seeking, which would break the sequential read.
+blur.shard.merge.read.sequential.skip.threshold=131072
+
 # The default index deletion policy class that manages removing old segments that are no longer referenced.
 blur.shard.index.deletion.policy.maxage=org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy
 

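These four properties are the tuning knobs for the new behavior. Site-specific overrides would
go in blur-site.properties as usual; for example (values purely illustrative, not
recommendations):

    # Switch merges to sequential reads sooner and absorb larger gaps.
    blur.shard.merge.read.sequential.threshold=3
    blur.shard.merge.read.sequential.skip.threshold=262144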
