incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [1/2] git commit: Adding sequential access detecting.
Date Wed, 18 Mar 2015 02:20:16 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master 98601f6c2 -> e965e329d


Adding sequential access detecting.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/10c8699c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/10c8699c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/10c8699c

Branch: refs/heads/master
Commit: 10c8699c7c613f802f814b148f631608a1c253fe
Parents: 98601f6
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Mar 17 22:01:21 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Mar 17 22:01:21 2015 -0400

----------------------------------------------------------------------
 .../store/blockcache_v2/CacheDirectory.java     |   5 +-
 .../store/blockcache_v2/CacheIndexOutput.java   |   6 +-
 .../apache/blur/store/hdfs/HdfsDirectory.java   |   2 +-
 .../apache/blur/store/hdfs/HdfsIndexInput.java  | 109 +++++++++++++++++++
 .../store/hdfs/HdfsRandomAccessIndexInput.java  |  85 ---------------
 5 files changed, 116 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/10c8699c/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheDirectory.java b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheDirectory.java
index 24b31aa..e2b2303 100644
--- a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheDirectory.java
@@ -84,11 +84,10 @@ public class CacheDirectory extends Directory implements DirectoryDecorator, Las
   }
 
   public IndexOutput createOutput(String name, IOContext context) throws IOException {
-    IndexOutput indexOutput = _internal.createOutput(name, context);
     if (_cache.cacheFileForWriting(this, name, context) || isCachableFile(name)) {
-      return new CacheIndexOutput(this, name, indexOutput, _cache);
+      return new CacheIndexOutput(this, name, _cache, _internal, context);
     }
-    return indexOutput;
+    return _internal.createOutput(name, context);
   }
 
   public void deleteFile(String name) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/10c8699c/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexOutput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexOutput.java b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexOutput.java
index 13d3aac..93e1e80 100644
--- a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexOutput.java
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexOutput.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 
 import org.apache.blur.store.buffer.BufferStore;
 import org.apache.blur.store.buffer.Store;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexOutput;
 
 public class CacheIndexOutput extends IndexOutput {
@@ -39,7 +41,7 @@ public class CacheIndexOutput extends IndexOutput {
   private byte[] _buffer;
   private int _bufferPosition;
 
-  public CacheIndexOutput(CacheDirectory directory, String fileName, IndexOutput indexOutput, Cache cache)
+  public CacheIndexOutput(CacheDirectory directory, String fileName, Cache cache, Directory dir, IOContext context)
       throws IOException {
     _cache = cache;
     _directory = directory;
@@ -47,7 +49,7 @@ public class CacheIndexOutput extends IndexOutput {
     _fileBufferSize = _cache.getFileBufferSize(_directory, _fileName);
     _cacheBlockSize = _cache.getCacheBlockSize(_directory, _fileName);
     _fileId = _cache.getFileId(_directory, _fileName);
-    _indexOutput = indexOutput;
+    _indexOutput = dir.createOutput(fileName, context);
     _store = BufferStore.instance(_cacheBlockSize);
     _buffer = _store.takeBuffer(_cacheBlockSize);
     _shouldBeQuiet = _cache.shouldBeQuiet(directory, fileName);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/10c8699c/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
index e5641b3..d6b506b 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
@@ -257,7 +257,7 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
     FSDataInputStream inputRandomAccess = openForInput(name);
     long fileLength = fileLength(name);
     Path path = getPath(name);
-    return new HdfsRandomAccessIndexInput(inputRandomAccess, fileLength, _metricsGroup, path);
+    return new HdfsIndexInput(inputRandomAccess, fileLength, _metricsGroup, path);
   }
 
   protected synchronized FSDataInputStream openForInput(String name) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/10c8699c/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
new file mode 100644
index 0000000..45a54a2
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.hdfs;
+
+import java.io.IOException;
+
+import org.apache.blur.store.buffer.ReusedBufferedIndexInput;
+import org.apache.blur.trace.Trace;
+import org.apache.blur.trace.Tracer;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.store.IndexInput;
+
+public class HdfsIndexInput extends ReusedBufferedIndexInput {
+
+  private final long _length;
+  private final FSDataInputStream _inputStream;
+  private final MetricsGroup _metricsGroup;
+  private final Path _path;
+
+  private long _prevFilePointer;
+  private long _sequentialReadDetectorCounter;
+  private long _sequentialReadThreshold = 50;
+  private boolean _sequentialRead;
+  private boolean _isClone;
+
+  public HdfsIndexInput(FSDataInputStream inputStream, long length, MetricsGroup metricsGroup, Path path)
+      throws IOException {
+    super("HdfsIndexInput(" + path.toString() + ")");
+    _inputStream = inputStream;
+    _length = length;
+    _metricsGroup = metricsGroup;
+    _path = path;
+  }
+
+  @Override
+  public long length() {
+    return _length;
+  }
+
+  @Override
+  protected void seekInternal(long pos) throws IOException {
+
+  }
+
+  @Override
+  protected void readInternal(byte[] b, int offset, int length) throws IOException {
+    Tracer trace = Trace.trace("filesystem - read", Trace.param("file", _path),
+        Trace.param("location", getFilePointer()), Trace.param("length", length));
+    try {
+      long start = System.nanoTime();
+      long filePointer = getFilePointer();
+      if (filePointer == _prevFilePointer) {
+        _sequentialReadDetectorCounter++;
+      } else {
+        if (_sequentialRead) {
+//          System.out.println("Sequential Read OFF clone [" + _isClone + "] [" + _path + "] count ["
+//              + (_sequentialReadDetectorCounter - _sequentialReadThreshold) + "]");
+        }
+        _sequentialReadDetectorCounter = 0;
+        _sequentialRead = false;
+      }
+      if (_sequentialReadDetectorCounter > _sequentialReadThreshold && !_sequentialRead) {
+//        System.out.println("Sequential Read ON clone [" + _isClone + "] [" + _path + "]");
+        _sequentialRead = true;
+      }
+      int olen = length;
+      while (length > 0) {
+        int amount;
+        amount = _inputStream.read(filePointer, b, offset, length);
+        length -= amount;
+        offset += amount;
+        filePointer += amount;
+      }
+      long end = System.nanoTime();
+      _metricsGroup.readRandomAccess.update((end - start) / 1000);
+      _metricsGroup.readRandomThroughput.mark(olen);
+      _prevFilePointer = filePointer;
+    } finally {
+      trace.done();
+    }
+  }
+
+  @Override
+  public IndexInput clone() {
+    HdfsIndexInput clone = (HdfsIndexInput) super.clone();
+    clone._isClone = true;
+    return clone;
+  }
+
+  @Override
+  protected void closeInternal() throws IOException {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/10c8699c/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsRandomAccessIndexInput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsRandomAccessIndexInput.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsRandomAccessIndexInput.java
deleted file mode 100644
index dcdd6a4..0000000
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsRandomAccessIndexInput.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.store.hdfs;
-
-import java.io.IOException;
-
-import org.apache.blur.store.buffer.ReusedBufferedIndexInput;
-import org.apache.blur.trace.Trace;
-import org.apache.blur.trace.Tracer;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.lucene.store.IndexInput;
-
-public class HdfsRandomAccessIndexInput extends ReusedBufferedIndexInput {
-
-  private final long _length;
-  private final FSDataInputStream _inputStream;
-  private final MetricsGroup _metricsGroup;
-  private final Path _path;
-
-  public HdfsRandomAccessIndexInput(FSDataInputStream inputStream, long length, MetricsGroup metricsGroup, Path path) throws IOException {
-    super("HdfsRandomAccessIndexInput(" + path.toString() + ")");
-    _inputStream = inputStream;
-    _length = length;
-    _metricsGroup = metricsGroup;
-    _path = path;
-  }
-
-  @Override
-  public long length() {
-    return _length;
-  }
-
-  @Override
-  protected void seekInternal(long pos) throws IOException {
-
-  }
-
-  @Override
-  protected void readInternal(byte[] b, int offset, int length) throws IOException {
-    Tracer trace = Trace.trace("filesystem - read", Trace.param("file", _path),
-        Trace.param("location", getFilePointer()), Trace.param("length", length));
-    try {
-      long start = System.nanoTime();
-      long filePointer = getFilePointer();
-      int olen = length;
-      while (length > 0) {
-        int amount;
-        amount = _inputStream.read(filePointer, b, offset, length);
-        length -= amount;
-        offset += amount;
-        filePointer += amount;
-      }
-      long end = System.nanoTime();
-      _metricsGroup.readRandomAccess.update((end - start) / 1000);
-      _metricsGroup.readRandomThroughput.mark(olen);
-    } finally {
-      trace.done();
-    }
-  }
-
-  @Override
-  public IndexInput clone() {
-    return (HdfsRandomAccessIndexInput) super.clone();
-  }
-
-  @Override
-  protected void closeInternal() throws IOException {
-
-  }
-}


Mime
View raw message