incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject [7/8] git commit: Adding multiple ways to read data from HDFS; from testing, almost every way is better than case '0', which is how it was implemented in Blur.
Date Wed, 28 Aug 2013 18:16:53 GMT
Adding multiple ways to read data from HDFS; from testing, almost every way is better than
case '0', which is how it was implemented in Blur.
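
Not part of the commit, but a minimal sketch of the difference being measured: the old case '0' path serializes every reader on a shared, locked seek-then-read, while the HDFS positional reads used by the new cases carry the offset in the call and need no external lock. The class and method names here are hypothetical.

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataInputStream;

    class ReadStrategySketch {

      // case '0': all readers contend on the shared stream's monitor.
      static void lockedSeekRead(FSDataInputStream in, long pos, byte[] b, int off, int len)
          throws IOException {
        synchronized (in) {
          in.seek(pos);
          in.readFully(b, off, len);
        }
      }

      // Positional readFully (case 2 in the diff below): the offset travels
      // with the call, so concurrent readers do not block on a shared seek.
      static void positionalRead(FSDataInputStream in, long pos, byte[] b, int off, int len)
          throws IOException {
        in.readFully(pos, b, off, len);
      }
    }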


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/b778013c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/b778013c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/b778013c

Branch: refs/heads/master
Commit: b778013c146bb2ed727d0d28f77572551671736a
Parents: 267d8ea
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Wed Aug 28 14:14:57 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Wed Aug 28 14:14:57 2013 -0400

----------------------------------------------------------------------
 .../apache/blur/store/hdfs/HdfsDirectory.java   | 84 ++++++++++++++++++--
 1 file changed, 77 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b778013c/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
index b030bee..6da74e0 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
@@ -27,6 +27,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.WeakHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
@@ -54,6 +55,41 @@ public class HdfsDirectory extends Directory {
 
   private static final Log LOG = LogFactory.getLog(HdfsDirectory.class);
 
+  public static AtomicInteger fetchImpl = new AtomicInteger(1);
+
+//  static {
+//    Thread thread = new Thread(new Runnable() {
+//      @Override
+//      public void run() {
+//        while (true) {
+//          File file = new File("/tmp/fetch.impl");
+//          if (file.exists()) {
+//            try {
+//              BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
+//              String line = reader.readLine();
+//              String trim = line.trim();
+//              int i = Integer.parseInt(trim);
+//              if (i != fetchImpl.get()) {
+//                LOG.info("Changing fetch impl [" + i + "]");
+//                fetchImpl.set(i);
+//              }
+//              reader.close();
+//            } catch (Exception e) {
+//              LOG.error("Unknown error", e);
+//            }
+//          }
+//          try {
+//            Thread.sleep(5000);
+//          } catch (InterruptedException e) {
+//            return;
+//          }
+//        }
+//      }
+//    });
+//    thread.setDaemon(true);
+//    thread.start();
+//  }
+
   private final Path path;
   private final FileSystem fileSystem;
   private final MetricsGroup metricsGroup;
@@ -114,6 +150,7 @@ public class HdfsDirectory extends Directory {
     private FSDataInputStream inputStream;
     private boolean isClone;
     private final MetricsGroup metricsGroup;
+    private int _readVersion;
 
     public HdfsIndexInput(FileSystem fileSystem, Path filePath, MetricsGroup metricsGroup) throws IOException {
       super(filePath.toString());
@@ -121,6 +158,7 @@ public class HdfsDirectory extends Directory {
       FileStatus fileStatus = fileSystem.getFileStatus(filePath);
       len = fileStatus.getLen();
       this.metricsGroup = metricsGroup;
+      _readVersion = fetchImpl.get();
     }
 
     @Override
@@ -135,14 +173,45 @@ public class HdfsDirectory extends Directory {
 
     @Override
     protected void readInternal(byte[] b, int offset, int length) throws IOException {
-      synchronized (inputStream) {
-        long start = System.nanoTime();
-        inputStream.seek(getFilePointer());
-        inputStream.readFully(b, offset, length);
-        long end = System.nanoTime();
-        metricsGroup.readAccess.update((end - start) / 1000);
-        metricsGroup.readThroughput.mark(length);
+      long start = System.nanoTime();
+      long filePointer = getFilePointer();
+      switch (_readVersion) {
+      case 0:
+        synchronized (inputStream) {
+          inputStream.seek(getFilePointer());
+          inputStream.readFully(b, offset, length);
+        }
+        break;
+      case 1:
+        while (length > 0) {
+          int amount;
+          synchronized (inputStream) {
+            inputStream.seek(filePointer);
+            amount = inputStream.read(b, offset, length);
+          }
+          length -= amount;
+          offset += amount;
+          filePointer += amount;
+        }
+        break;
+      case 2:
+        inputStream.readFully(filePointer, b, offset, length);
+        break;
+      case 3:
+        while (length > 0) {
+          int amount;
+          amount = inputStream.read(filePointer, b, offset, length);
+          length -= amount;
+          offset += amount;
+          filePointer += amount;
+        }
+        break;
+      default:
+        break;
       }
+      long end = System.nanoTime();
+      metricsGroup.readAccess.update((end - start) / 1000);
+      metricsGroup.readThroughput.mark(length);
     }
 
     @Override
@@ -156,6 +225,7 @@ public class HdfsDirectory extends Directory {
     public ReusedBufferedIndexInput clone() {
       HdfsIndexInput clone = (HdfsIndexInput) super.clone();
       clone.isClone = true;
+      clone._readVersion = fetchImpl.get();
       return clone;
     }
   }
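
A caveat worth noting for the looped cases (1 and 3) above: FSDataInputStream.read can return fewer bytes than requested and returns -1 at end of stream, so a defensive loop would check the return value before adjusting the counters. A hedged sketch, not part of the commit; the helper name is made up.

    import java.io.EOFException;
    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataInputStream;

    class PositionalReadLoop {
      // Loop over positional reads, guarding against short reads and the
      // -1 returned once end of stream is reached.
      static void readFullyAt(FSDataInputStream in, long pos, byte[] b, int off, int len)
          throws IOException {
        while (len > 0) {
          int n = in.read(pos, b, off, len);
          if (n < 0) {
            throw new EOFException("End of stream with " + len + " bytes still to read");
          }
          pos += n;
          off += n;
          len -= n;
        }
      }
    }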


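Because fetchImpl is a public static AtomicInteger and each HdfsIndexInput captures it when constructed or cloned, a benchmark can switch strategies between runs. A rough usage sketch; the surrounding setup is assumed and not part of this commit.

    import org.apache.blur.store.hdfs.HdfsDirectory;

    public class FetchImplSwitch {
      public static void main(String[] args) {
        // Index inputs opened or cloned after this point pick up the new value.
        HdfsDirectory.fetchImpl.set(2); // 2 = positional readFully
        // ... open the HdfsDirectory and run the read workload here ...
      }
    }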