incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject [2/2] git commit: Adding automatic sequential read and automatic resource management.
Date Wed, 18 Mar 2015 12:58:31 GMT
Adding automatic sequential read and automatic resource management.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/cfd95261
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/cfd95261
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/cfd95261

Branch: refs/heads/master
Commit: cfd95261d2dbb3c662eddaf4b0aeb5723c4a8a19
Parents: 6b37202
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Wed Mar 18 08:57:08 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Wed Mar 18 08:57:08 2015 -0400

----------------------------------------------------------------------
 .../apache/blur/store/hdfs/HdfsDirectory.java   | 91 ++++++++++++++++----
 .../apache/blur/store/hdfs/HdfsIndexInput.java  | 62 +++++++++----
 2 files changed, 120 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
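
This change replaces the per-directory ExecutorService with two shared static mechanisms: a daemon Timer that drains a queue of Closeables every few seconds, and a weak-reference sweep that retires per-reader sequential streams once their owners are garbage collected. As a rough stand-alone illustration of the closing-queue half of the pattern (class and method names below are hypothetical, not part of the commit):

import java.io.Closeable;
import java.io.IOException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-alone sketch of the closing-queue pattern.
public class ClosingQueueSketch {

  private static final BlockingQueue<Closeable> CLOSING_QUEUE = new LinkedBlockingQueue<Closeable>();

  // Daemon timer so the JVM can still exit; mirrors "HdfsDirectory-Timer".
  private static final Timer TIMER = new Timer("ClosingQueueSketch-Timer", true);

  static {
    TIMER.schedule(new TimerTask() {
      @Override
      public void run() {
        // Drain everything queued since the last run; poll() never blocks.
        Closeable closeable;
        while ((closeable = CLOSING_QUEUE.poll()) != null) {
          try {
            closeable.close();
          } catch (IOException e) {
            // Log and keep draining; one failing stream must not stall the rest.
          }
        }
      }
    }, TimeUnit.SECONDS.toMillis(3), TimeUnit.SECONDS.toMillis(3));
  }

  // Callers hand streams off instead of closing inline or spawning a task per close.
  public static void closeLater(Closeable closeable) {
    CLOSING_QUEUE.add(closeable);
  }
}
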


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/cfd95261/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
index d6b506b..e1e74a4 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
@@ -20,18 +20,23 @@ package org.apache.blur.store.hdfs;
 import static org.apache.blur.metrics.MetricsConstants.HDFS;
 import static org.apache.blur.metrics.MetricsConstants.ORG_APACHE_BLUR;
 
+import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.lang.ref.WeakReference;
 import java.net.URI;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.WeakHashMap;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.blur.log.Log;
@@ -72,6 +77,10 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
    */
 protected static Map<URI, MetricsGroup> _metricsGroupMap = new WeakHashMap<URI, MetricsGroup>();
 
+  private static final Timer TIMER;
+  private static final BlockingQueue<Closeable> CLOSING_QUEUE = new LinkedBlockingQueue<Closeable>();
+  private static final BlockingQueue<SequentialRef> SEQUENTIAL_CLOSING_QUEUE = new LinkedBlockingQueue<SequentialRef>();
+
   static class FStat {
     FStat(FileStatus fileStatus) {
       this(fileStatus.getModificationTime(), fileStatus.getLen());
@@ -107,7 +116,12 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
       }
       return _random;
     }
+  }
 
+  static {
+    TIMER = new Timer("HdfsDirectory-Timer", true);
+    TIMER.schedule(getClosingQueueTimerTask(), TimeUnit.SECONDS.toMillis(3), TimeUnit.SECONDS.toMillis(3));
+    TIMER.schedule(getSequentialRefClosingQueueTimerTask(), TimeUnit.SECONDS.toMillis(3), TimeUnit.SECONDS.toMillis(3));
   }
 
   protected final Path _path;
@@ -119,10 +133,9 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
 protected final Map<Path, FSDataInputStream> _inputMap = new ConcurrentHashMap<Path, FSDataInputStream>();
   protected final boolean _useCache = true;
 
-  private ExecutorService _service;
-
   public HdfsDirectory(Configuration configuration, Path path) throws IOException {
     _fileSystem = path.getFileSystem(configuration);
+
     _path = _fileSystem.makeQualified(path);
     _fileSystem.mkdirs(path);
     setLockFactory(NoLockFactory.getNoLockFactory());
@@ -153,7 +166,38 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
         }
       }
     }
-    _service = Executors.newCachedThreadPool();
+  }
+
+  private static TimerTask getSequentialRefClosingQueueTimerTask() {
+    return new TimerTask() {
+      @Override
+      public void run() {
+        Iterator<SequentialRef> iterator = SEQUENTIAL_CLOSING_QUEUE.iterator();
+        while (iterator.hasNext()) {
+          SequentialRef sequentialRef = iterator.next();
+          if (sequentialRef.isClosable()) {
+            iterator.remove();
+            CLOSING_QUEUE.add(sequentialRef._inputStream);
+          }
+        }
+      }
+    };
+  }
+
+  private static TimerTask getClosingQueueTimerTask() {
+    return new TimerTask() {
+      @Override
+      public void run() {
+        while (true) {
+          Closeable closeable = CLOSING_QUEUE.poll();
+          if (closeable == null) {
+            return;
+          }
+          LOG.debug("Closing [{0}]", closeable);
+          org.apache.hadoop.io.IOUtils.cleanup(LOG, closeable);
+        }
+      }
+    };
   }
 
   private String getRealFileName(String name) {
@@ -218,16 +262,7 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
         super.close();
         outputStream.sync();
         _fileStatusMap.put(name, new FStat(System.currentTimeMillis(), outputStream.getPos()));
-        _service.submit(new Runnable() {
-          @Override
-          public void run() {
-            try {
-              outputStream.close();
-            } catch (IOException e) {
-              LOG.error("Unknown error while trying to close outputstream [{0}]", outputStream);
-            }
-          }
-        });
+        CLOSING_QUEUE.add(outputStream);
         openForInput(name);
       }
 
@@ -257,7 +292,9 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
     FSDataInputStream inputRandomAccess = openForInput(name);
     long fileLength = fileLength(name);
     Path path = getPath(name);
-    return new HdfsIndexInput(inputRandomAccess, fileLength, _metricsGroup, path);
+    boolean sequentialReadAllowed = name.endsWith(".fdt");
+    // boolean sequentialReadAllowed = true;
+    return new HdfsIndexInput(this, inputRandomAccess, fileLength, _metricsGroup, path, sequentialReadAllowed);
   }
 
   protected synchronized FSDataInputStream openForInput(String name) throws IOException {
@@ -523,4 +560,26 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
     return this;
   }
 
+  protected FSDataInputStream openForSequentialInput(Path p, Object key) throws IOException {
+    FSDataInputStream input = _fileSystem.open(p);
+    SEQUENTIAL_CLOSING_QUEUE.add(new SequentialRef(input, key));
+    return input;
+  }
+
+  static class SequentialRef {
+
+    final FSDataInputStream _inputStream;
+    final WeakReference<Object> _ref;
+
+    SequentialRef(FSDataInputStream input, Object key) {
+      _inputStream = input;
+      _ref = new WeakReference<Object>(key);
+    }
+
+    boolean isClosable() {
+      return _ref.get() == null;
+    }
+
+  }
+
 }
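
The SequentialRef class above ties the lifetime of a sequential stream to its consumer through a WeakReference: once the key object that requested the stream becomes unreachable, isClosable() turns true and the periodic sweep moves the stream onto the closing queue. A condensed, self-contained sketch of that idea (names hypothetical, queues simplified to non-blocking ones):

import java.io.Closeable;
import java.lang.ref.WeakReference;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical illustration of weak-reference-driven cleanup, as in SequentialRef.
public class WeakCloseSketch {

  static final class Tracked {
    final Closeable resource;
    final WeakReference<Object> owner;

    Tracked(Closeable resource, Object owner) {
      this.resource = resource;
      this.owner = new WeakReference<Object>(owner);
    }

    boolean isClosable() {
      // Once the owner has been garbage collected, nothing can use the resource.
      return owner.get() == null;
    }
  }

  private final Queue<Tracked> tracked = new ConcurrentLinkedQueue<Tracked>();
  private final Queue<Closeable> closingQueue = new ConcurrentLinkedQueue<Closeable>();

  void register(Closeable resource, Object owner) {
    tracked.add(new Tracked(resource, owner));
  }

  // Called periodically (e.g. from a TimerTask) to sweep streams whose owners died.
  void sweep() {
    Iterator<Tracked> it = tracked.iterator();
    while (it.hasNext()) {
      Tracked t = it.next();
      if (t.isClosable()) {
        it.remove();
        closingQueue.add(t.resource);
      }
    }
  }
}
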

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/cfd95261/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
index 45a54a2..bd9fb60 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
@@ -31,20 +31,24 @@ public class HdfsIndexInput extends ReusedBufferedIndexInput {
   private final FSDataInputStream _inputStream;
   private final MetricsGroup _metricsGroup;
   private final Path _path;
+  private final HdfsDirectory _dir;
+  private final boolean _sequentialReadAllowed;
 
   private long _prevFilePointer;
   private long _sequentialReadDetectorCounter;
   private long _sequentialReadThreshold = 50;
   private boolean _sequentialRead;
-  private boolean _isClone;
+  private FSDataInputStream _sequentialInputStream;
 
-  public HdfsIndexInput(FSDataInputStream inputStream, long length, MetricsGroup metricsGroup, Path path)
-      throws IOException {
+  public HdfsIndexInput(HdfsDirectory dir, FSDataInputStream inputStream, long length, MetricsGroup metricsGroup,
+      Path path, boolean sequentialReadAllowed) throws IOException {
     super("HdfsIndexInput(" + path.toString() + ")");
+    _dir = dir;
     _inputStream = inputStream;
     _length = length;
     _metricsGroup = metricsGroup;
     _path = path;
+    _sequentialReadAllowed = sequentialReadAllowed;
   }
 
   @Override
@@ -64,41 +68,65 @@ public class HdfsIndexInput extends ReusedBufferedIndexInput {
     try {
       long start = System.nanoTime();
       long filePointer = getFilePointer();
+      if (!_sequentialReadAllowed) {
+        randomAccessRead(b, offset, length, start, filePointer);
+        return;
+      }
       if (filePointer == _prevFilePointer) {
         _sequentialReadDetectorCounter++;
       } else {
         if (_sequentialRead) {
-//          System.out.println("Sequential Read OFF clone [" + _isClone + "] [" + _path + "] count ["
-//              + (_sequentialReadDetectorCounter - _sequentialReadThreshold) + "]");
+          // System.out.println("Sequential Read OFF clone [" + _isClone + "] ["
+          // + _path + "] count ["
+          // + (_sequentialReadDetectorCounter - _sequentialReadThreshold) +
+          // "]");
         }
         _sequentialReadDetectorCounter = 0;
         _sequentialRead = false;
       }
       if (_sequentialReadDetectorCounter > _sequentialReadThreshold && !_sequentialRead) {
-//        System.out.println("Sequential Read ON clone [" + _isClone + "] [" + _path + "]");
+        // System.out.println("Sequential Read ON clone [" + _isClone + "] [" +
+        // _path + "]");
         _sequentialRead = true;
+        if (_sequentialInputStream == null) {
+          _sequentialInputStream = _dir.openForSequentialInput(_path, this);
+        }
       }
-      int olen = length;
-      while (length > 0) {
-        int amount;
-        amount = _inputStream.read(filePointer, b, offset, length);
-        length -= amount;
-        offset += amount;
-        filePointer += amount;
+      if (_sequentialRead) {
+        long pos = _sequentialInputStream.getPos();
+        if (pos != filePointer) {
+          _sequentialInputStream.seek(filePointer);
+        }
+        _sequentialInputStream.readFully(b, offset, length);
+        // @TODO add metrics back
+      } else {
+        filePointer = randomAccessRead(b, offset, length, start, filePointer);
       }
-      long end = System.nanoTime();
-      _metricsGroup.readRandomAccess.update((end - start) / 1000);
-      _metricsGroup.readRandomThroughput.mark(olen);
       _prevFilePointer = filePointer;
     } finally {
       trace.done();
     }
   }
 
+  private long randomAccessRead(byte[] b, int offset, int length, long start, long filePointer) throws IOException {
+    int olen = length;
+    while (length > 0) {
+      int amount;
+      amount = _inputStream.read(filePointer, b, offset, length);
+      length -= amount;
+      offset += amount;
+      filePointer += amount;
+    }
+    long end = System.nanoTime();
+    _metricsGroup.readRandomAccess.update((end - start) / 1000);
+    _metricsGroup.readRandomThroughput.mark(olen);
+    return filePointer;
+  }
+
   @Override
   public IndexInput clone() {
     HdfsIndexInput clone = (HdfsIndexInput) super.clone();
-    clone._isClone = true;
+    clone._sequentialInputStream = null;
     return clone;
   }
 

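On the read path, HdfsIndexInput now watches for sequential access: each readInternal call compares its starting file pointer with where the previous read left off, counts consecutive matches, and past a threshold (50 reads here) switches from positioned random-access reads to a dedicated streaming handle obtained via openForSequentialInput. The detector reduces to roughly the following (a hypothetical distillation, not the committed class):

// Hypothetical distillation of the sequential-read detector in HdfsIndexInput.
public class SequentialReadDetector {

  private static final long THRESHOLD = 50; // reads, matching _sequentialReadThreshold

  private long prevEndPosition = -1;
  private long consecutiveSequentialReads;
  private boolean sequential;

  /** Feed each read's start position and length; returns true once the pattern looks sequential. */
  public boolean onRead(long position, int length) {
    if (position == prevEndPosition) {
      consecutiveSequentialReads++;
    } else {
      // Any seek breaks the streak and drops back to random-access mode.
      consecutiveSequentialReads = 0;
      sequential = false;
    }
    if (consecutiveSequentialReads > THRESHOLD) {
      sequential = true;
    }
    prevEndPosition = position + length;
    return sequential;
  }
}
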
