incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [2/2] incubator-blur git commit: First phase of adding a streaming hdfs reads for better merge performance.
Date Wed, 12 Nov 2014 14:57:07 GMT
First phase of adding streaming HDFS reads for better merge performance.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/1e545080
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/1e545080
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/1e545080

Branch: refs/heads/master
Commit: 1e54508072aded9901864a55744abbd54b713471
Parents: 21512a6
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Wed Nov 12 09:56:51 2014 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Wed Nov 12 09:56:51 2014 -0500

----------------------------------------------------------------------
 .../apache/blur/store/hdfs/HdfsDirectory.java   |  20 ++-
 .../apache/blur/store/hdfs/HdfsIndexInput.java  |  85 ---------
 .../store/hdfs/HdfsRandomAccessIndexInput.java  |  85 +++++++++
 .../blur/store/hdfs/HdfsStreamIndexInput.java   | 178 +++++++++++++++++++
 .../apache/blur/store/hdfs/MetricsGroup.java    |  17 +-
 5 files changed, 289 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1e545080/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
index e770313..d9c616e 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
@@ -135,16 +135,23 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
   }
 
   private MetricsGroup createNewMetricsGroup(String scope) {
-    MetricName readAccessName = new MetricName(ORG_APACHE_BLUR, HDFS, "Read Latency in \u00B5s",
scope);
+    MetricName readRandomAccessName = new MetricName(ORG_APACHE_BLUR, HDFS, "Read Random
Latency in \u00B5s", scope);
+    MetricName readStreamAccessName = new MetricName(ORG_APACHE_BLUR, HDFS, "Read Stream
Latency in \u00B5s", scope);
     MetricName writeAcccessName = new MetricName(ORG_APACHE_BLUR, HDFS, "Write Latency in
\u00B5s", scope);
-    MetricName readThroughputName = new MetricName(ORG_APACHE_BLUR, HDFS, "Read Throughput",
scope);
+    MetricName readRandomThroughputName = new MetricName(ORG_APACHE_BLUR, HDFS, "Read Random
Throughput", scope);
+    MetricName readStreamThroughputName = new MetricName(ORG_APACHE_BLUR, HDFS, "Read Stream
Throughput", scope);
+    MetricName readSeekName = new MetricName(ORG_APACHE_BLUR, HDFS, "Read Stream Seeks",
scope);
     MetricName writeThroughputName = new MetricName(ORG_APACHE_BLUR, HDFS, "Write Throughput",
scope);
 
-    Histogram readAccess = Metrics.newHistogram(readAccessName);
+    Histogram readRandomAccess = Metrics.newHistogram(readRandomAccessName);
+    Histogram readStreamAccess = Metrics.newHistogram(readStreamAccessName);
     Histogram writeAccess = Metrics.newHistogram(writeAcccessName);
-    Meter readThroughput = Metrics.newMeter(readThroughputName, "Read Bytes", TimeUnit.SECONDS);
+    Meter readRandomThroughput = Metrics.newMeter(readRandomThroughputName, "Read Random
Bytes", TimeUnit.SECONDS);
+    Meter readStreamThroughput = Metrics.newMeter(readStreamThroughputName, "Read Stream
Bytes", TimeUnit.SECONDS);
+    Meter readStreamSeek = Metrics.newMeter(readSeekName, "Read Stream Seeks", TimeUnit.SECONDS);
     Meter writeThroughput = Metrics.newMeter(writeThroughputName, "Write Bytes", TimeUnit.SECONDS);
-    return new MetricsGroup(readAccess, writeAccess, readThroughput, writeThroughput);
+    return new MetricsGroup(readRandomAccess, readStreamAccess, writeAccess, readRandomThroughput,
+        readStreamThroughput, readStreamSeek, writeThroughput);
   }
 
   @Override
@@ -209,7 +216,8 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
     }
     FSDataInputStream inputStream = openForInput(name);
     long fileLength = fileLength(name);
-    return new HdfsIndexInput(name, inputStream, fileLength, _metricsGroup, getPath(name));
+    
+    return new HdfsRandomAccessIndexInput(name, inputStream, fileLength, _metricsGroup, getPath(name));
   }
 
   protected synchronized FSDataInputStream openForInput(String name) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1e545080/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
deleted file mode 100644
index 0331afd..0000000
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.store.hdfs;
-
-import java.io.IOException;
-
-import org.apache.blur.store.buffer.ReusedBufferedIndexInput;
-import org.apache.blur.trace.Trace;
-import org.apache.blur.trace.Tracer;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.Path;
-
-public class HdfsIndexInput extends ReusedBufferedIndexInput {
-
-  private final long _length;
-  private FSDataInputStream _inputStream;
-  private final MetricsGroup _metricsGroup;
-  private final Path _path;
-
-  public HdfsIndexInput(String name, FSDataInputStream inputStream, long length, MetricsGroup
metricsGroup, Path path)
-      throws IOException {
-    super("HdfsIndexInput(" + path.toString() + ")");
-    _inputStream = inputStream;
-    _length = length;
-    _metricsGroup = metricsGroup;
-    _path = path;
-  }
-
-  @Override
-  public long length() {
-    return _length;
-  }
-
-  @Override
-  protected void seekInternal(long pos) throws IOException {
-
-  }
-
-  @Override
-  protected void readInternal(byte[] b, int offset, int length) throws IOException {
-    Tracer trace = Trace.trace("filesystem - read", Trace.param("file", _path),
-        Trace.param("location", getFilePointer()), Trace.param("length", length));
-    try {
-      long start = System.nanoTime();
-      long filePointer = getFilePointer();
-      while (length > 0) {
-        int amount;
-        amount = _inputStream.read(filePointer, b, offset, length);
-        length -= amount;
-        offset += amount;
-        filePointer += amount;
-      }
-      long end = System.nanoTime();
-      _metricsGroup.readAccess.update((end - start) / 1000);
-      _metricsGroup.readThroughput.mark(length);
-    } finally {
-      trace.done();
-    }
-  }
-
-  @Override
-  public ReusedBufferedIndexInput clone() {
-    HdfsIndexInput clone = (HdfsIndexInput) super.clone();
-    return clone;
-  }
-
-  @Override
-  protected void closeInternal() throws IOException {
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1e545080/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsRandomAccessIndexInput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsRandomAccessIndexInput.java
b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsRandomAccessIndexInput.java
new file mode 100644
index 0000000..2bfca7e
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsRandomAccessIndexInput.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.hdfs;
+
+import java.io.IOException;
+
+import org.apache.blur.store.buffer.ReusedBufferedIndexInput;
+import org.apache.blur.trace.Trace;
+import org.apache.blur.trace.Tracer;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+
+public class HdfsRandomAccessIndexInput extends ReusedBufferedIndexInput {
+
+  private final long _length;
+  private FSDataInputStream _inputStream;
+  private final MetricsGroup _metricsGroup;
+  private final Path _path;
+
+  public HdfsRandomAccessIndexInput(String name, FSDataInputStream inputStream, long length,
MetricsGroup metricsGroup,
+      Path path) throws IOException {
+    super("HdfsRandomAccessIndexInput(" + path.toString() + ")");
+    _inputStream = inputStream;
+    _length = length;
+    _metricsGroup = metricsGroup;
+    _path = path;
+  }
+
+  @Override
+  public long length() {
+    return _length;
+  }
+
+  @Override
+  protected void seekInternal(long pos) throws IOException {
+
+  }
+
+  @Override
+  protected void readInternal(byte[] b, int offset, int length) throws IOException {
+    Tracer trace = Trace.trace("filesystem - read", Trace.param("file", _path),
+        Trace.param("location", getFilePointer()), Trace.param("length", length));
+    try {
+      long start = System.nanoTime();
+      long filePointer = getFilePointer();
+      int olen = length;
+      while (length > 0) {
+        int amount;
+        amount = _inputStream.read(filePointer, b, offset, length);
+        length -= amount;
+        offset += amount;
+        filePointer += amount;
+      }
+      long end = System.nanoTime();
+      _metricsGroup.readRandomAccess.update((end - start) / 1000);
+      _metricsGroup.readRandomThroughput.mark(olen);
+    } finally {
+      trace.done();
+    }
+  }
+
+  @Override
+  public ReusedBufferedIndexInput clone() {
+    return (HdfsRandomAccessIndexInput) super.clone();
+  }
+
+  @Override
+  protected void closeInternal() throws IOException {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1e545080/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsStreamIndexInput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsStreamIndexInput.java
b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsStreamIndexInput.java
new file mode 100644
index 0000000..a23fc8d
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsStreamIndexInput.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.hdfs;
+
+import java.io.IOException;
+
+import org.apache.blur.utils.BlurConstants;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.store.IndexInput;
+
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+
+public class HdfsStreamIndexInput extends IndexInput {
+
+  private final long _length;
+  private final FSDataInputStream _inputStream;
+  private final MetricsGroup _metricsGroup;
+  private final Histogram _readStreamAccess;
+  private final Meter _readStreamThroughput;
+  private final Meter _readStreamSeek;
+
+  private long _postion;
+
+  public HdfsStreamIndexInput(String name, FSDataInputStream inputStream, long length, MetricsGroup
metricsGroup,
+      Path path) throws IOException {
+    super("HdfsStreamIndexInput(" + path.toString() + ")");
+    _inputStream = inputStream;
+    _length = length;
+    _metricsGroup = metricsGroup;
+    _postion = _inputStream.getPos();
+    _readStreamSeek = _metricsGroup.readStreamSeek;
+    _readStreamAccess = _metricsGroup.readStreamAccess;
+    _readStreamThroughput = _metricsGroup.readStreamThroughput;
+  }
+
+  private void checkPosition() throws IOException {
+    long pos = _inputStream.getPos();
+    if (pos != _postion) {
+      _inputStream.seek(pos);
+      _readStreamSeek.mark();
+    }
+  }
+
+  public static boolean isMergeThread() {
+    String name = Thread.currentThread().getName();
+    if (name.startsWith(BlurConstants.SHARED_MERGE_SCHEDULER_PREFIX)) {
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public IndexInput clone() {
+    return super.clone();
+  }
+
+  @Override
+  public void close() throws IOException {
+
+  }
+
+  @Override
+  public long getFilePointer() {
+    try {
+      return _inputStream.getPos();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    _postion = pos;
+  }
+
+  @Override
+  public long length() {
+    return _length;
+  }
+
+  @Override
+  public byte readByte() throws IOException {
+    long start = System.nanoTime();
+    synchronized (_inputStream) {
+      checkPosition();
+      try {
+        return _inputStream.readByte();
+      } finally {
+        _postion++;
+        addMetric(start, 1);
+      }
+    }
+  }
+
+  private void addMetric(long start, int length) {
+    long end = System.nanoTime();
+    _readStreamAccess.update((end - start) / 1000);
+    _readStreamThroughput.mark(length);
+  }
+
+  @Override
+  public void readBytes(byte[] b, int offset, int len) throws IOException {
+    long start = System.nanoTime();
+    synchronized (_inputStream) {
+      checkPosition();
+      try {
+        _inputStream.read(b, offset, len);
+      } finally {
+        _postion += len;
+        addMetric(start, len);
+      }
+    }
+  }
+
+  @Override
+  public void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException
{
+    readBytes(b, offset, len);
+  }
+
+  @Override
+  public short readShort() throws IOException {
+    long start = System.nanoTime();
+    synchronized (_inputStream) {
+      checkPosition();
+      try {
+        return _inputStream.readShort();
+      } finally {
+        _postion += 2;
+        addMetric(start, 2);
+      }
+    }
+  }
+
+  @Override
+  public int readInt() throws IOException {
+    long start = System.nanoTime();
+    synchronized (_inputStream) {
+      checkPosition();
+      try {
+        return _inputStream.readInt();
+      } finally {
+        _postion += 4;
+        addMetric(start, 4);
+      }
+    }
+  }
+
+  @Override
+  public long readLong() throws IOException {
+    long start = System.nanoTime();
+    synchronized (_inputStream) {
+      checkPosition();
+      try {
+        return _inputStream.readLong();
+      } finally {
+        _postion += 8;
+        addMetric(start, 8);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1e545080/blur-store/src/main/java/org/apache/blur/store/hdfs/MetricsGroup.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/MetricsGroup.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/MetricsGroup.java
index 1088504..9cd6ca6 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/MetricsGroup.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/MetricsGroup.java
@@ -21,15 +21,22 @@ import com.yammer.metrics.core.Histogram;
 import com.yammer.metrics.core.Meter;
 
 public class MetricsGroup {
-  final Histogram readAccess;
+  final Histogram readRandomAccess;
+  final Histogram readStreamAccess;
   final Histogram writeAccess;
+  final Meter readRandomThroughput;
+  final Meter readStreamThroughput;
+  final Meter readStreamSeek;
   final Meter writeThroughput;
-  final Meter readThroughput;
 
-  MetricsGroup(Histogram readAccess, Histogram writeAccess, Meter readThroughput, Meter writeThroughput)
{
-    this.readAccess = readAccess;
+  MetricsGroup(Histogram readRandomAccess, Histogram readStreamAccess, Histogram writeAccess,
+      Meter readRandomThroughput, Meter readStreamThroughput, Meter readStreamSeek, Meter
writeThroughput) {
+    this.readRandomAccess = readRandomAccess;
+    this.readStreamAccess = readStreamAccess;
     this.writeAccess = writeAccess;
-    this.readThroughput = readThroughput;
+    this.readRandomThroughput = readRandomThroughput;
+    this.readStreamThroughput = readStreamThroughput;
     this.writeThroughput = writeThroughput;
+    this.readStreamSeek = readStreamSeek;
   }
 }
\ No newline at end of file


Mime
View raw message