incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [4/6] git commit: Removing the stream reader. Testing seems to add complexity without improving performance.
Date Tue, 17 Mar 2015 13:14:34 GMT
Removing the stream reader. Testing suggests that it adds complexity without improving performance.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/9a694bce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/9a694bce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/9a694bce

Branch: refs/heads/master
Commit: 9a694bce5e0070a04a10958f76e2125149a20628
Parents: 7444b2c
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Mar 17 09:13:12 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Mar 17 09:13:12 2015 -0400

----------------------------------------------------------------------
 .../apache/blur/store/hdfs/HdfsDirectory.java   | 48 ++++++++++++--------
 .../store/hdfs/HdfsRandomAccessIndexInput.java  | 14 +-----
 2 files changed, 30 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9a694bce/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
index 6684376..e5641b3 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
@@ -30,6 +30,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.WeakHashMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.blur.log.Log;
@@ -114,9 +116,11 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
   protected final Map<String, FStat> _fileStatusMap = new ConcurrentHashMap<String,
FStat>();
   protected final Map<String, Boolean> _symlinkMap = new ConcurrentHashMap<String,
Boolean>();
   protected final Map<String, Path> _symlinkPathMap = new ConcurrentHashMap<String,
Path>();
-  protected final Map<Path, StreamPair> _inputMap = new ConcurrentHashMap<Path,
StreamPair>();
+  protected final Map<Path, FSDataInputStream> _inputMap = new ConcurrentHashMap<Path,
FSDataInputStream>();
   protected final boolean _useCache = true;
 
+  private ExecutorService _service;
+
   public HdfsDirectory(Configuration configuration, Path path) throws IOException {
     _fileSystem = path.getFileSystem(configuration);
     _path = _fileSystem.makeQualified(path);
@@ -149,6 +153,7 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
         }
       }
     }
+    _service = Executors.newCachedThreadPool();
   }
 
   private String getRealFileName(String name) {
@@ -211,10 +216,19 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
       @Override
       public void close() throws IOException {
         super.close();
+        outputStream.sync();
         _fileStatusMap.put(name, new FStat(System.currentTimeMillis(), outputStream.getPos()));
-        outputStream.close();
-        openForInput(name, true);
-        openForInput(name, false);
+        _service.submit(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              outputStream.close();
+            } catch (IOException e) {
+              LOG.error("Unknown error while trying to close outputstream [{0}]", outputStream);
+            }
+          }
+        });
+        openForInput(name);
       }
 
       @Override
@@ -240,27 +254,23 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
     if (!fileExists(name)) {
       throw new FileNotFoundException("File [" + name + "] not found.");
     }
-    FSDataInputStream inputRandomAccess = openForInput(name, false);
-    FSDataInputStream inputStreamAccess = openForInput(name, true);
+    FSDataInputStream inputRandomAccess = openForInput(name);
     long fileLength = fileLength(name);
     Path path = getPath(name);
-    HdfsStreamIndexInput streamInput = new HdfsStreamIndexInput(inputStreamAccess, fileLength,
_metricsGroup, path);
-    return new HdfsRandomAccessIndexInput(inputRandomAccess, fileLength, _metricsGroup, path,
streamInput);
+    return new HdfsRandomAccessIndexInput(inputRandomAccess, fileLength, _metricsGroup, path);
   }
 
-  protected synchronized FSDataInputStream openForInput(String name, boolean stream) throws
IOException {
+  protected synchronized FSDataInputStream openForInput(String name) throws IOException {
     Path path = getPath(name);
-    StreamPair streamPair = _inputMap.get(path);
-    if (streamPair != null) {
-      return streamPair.getInputStream(stream);
+    FSDataInputStream input = _inputMap.get(path);
+    if (input != null) {
+      return input;
     }
     Tracer trace = Trace.trace("filesystem - open", Trace.param("path", path));
     try {
       FSDataInputStream randomInputStream = _fileSystem.open(path);
-      FSDataInputStream streamInputStream = _fileSystem.open(path);
-      streamPair = new StreamPair(randomInputStream, streamInputStream);
-      _inputMap.put(path, streamPair);
-      return streamPair.getInputStream(stream);
+      _inputMap.put(path, randomInputStream);
+      return randomInputStream;
     } finally {
       trace.done();
     }
@@ -331,10 +341,10 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
 
   protected void delete(String name) throws IOException {
     Path path = getPathOrSymlink(name);
-    StreamPair streamPair = _inputMap.remove(path);
+    FSDataInputStream inputStream = _inputMap.remove(path);
     Tracer trace = Trace.trace("filesystem - delete", Trace.param("path", path));
-    if (streamPair != null) {
-      streamPair.close();
+    if (inputStream != null) {
+      IOUtils.closeQuietly(inputStream);
     }
     if (_useCache) {
       _symlinkMap.remove(name);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9a694bce/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsRandomAccessIndexInput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsRandomAccessIndexInput.java
b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsRandomAccessIndexInput.java
index 2db383f..dcdd6a4 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsRandomAccessIndexInput.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsRandomAccessIndexInput.java
@@ -31,16 +31,13 @@ public class HdfsRandomAccessIndexInput extends ReusedBufferedIndexInput
{
   private final FSDataInputStream _inputStream;
   private final MetricsGroup _metricsGroup;
   private final Path _path;
-  private final HdfsStreamIndexInput _streamInput;
 
-  public HdfsRandomAccessIndexInput(FSDataInputStream inputStream, long length, MetricsGroup
metricsGroup, Path path,
-      HdfsStreamIndexInput streamInput) throws IOException {
+  public HdfsRandomAccessIndexInput(FSDataInputStream inputStream, long length, MetricsGroup
metricsGroup, Path path) throws IOException {
     super("HdfsRandomAccessIndexInput(" + path.toString() + ")");
     _inputStream = inputStream;
     _length = length;
     _metricsGroup = metricsGroup;
     _path = path;
-    _streamInput = streamInput;
   }
 
   @Override
@@ -78,15 +75,6 @@ public class HdfsRandomAccessIndexInput extends ReusedBufferedIndexInput
{
 
   @Override
   public IndexInput clone() {
-    if (IndexInputMergeUtil.isMergeThread()) {
-      IndexInput clone = _streamInput.clone();
-      try {
-        clone.seek(getFilePointer());
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-      return clone;
-    }
     return (HdfsRandomAccessIndexInput) super.clone();
   }
 


Mime
View raw message