incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [2/2] git commit: Adding metrics for hdfs data locality.
Date Fri, 01 May 2015 12:55:57 GMT
Adding metrics for hdfs data locality.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/32ef7b39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/32ef7b39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/32ef7b39

Branch: refs/heads/master
Commit: 32ef7b394f9fec72c73ffc4a4b1ce3a711d52105
Parents: 68ca06a
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Fri May 1 08:55:48 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Fri May 1 08:55:48 2015 -0400

----------------------------------------------------------------------
 .../apache/blur/store/hdfs/DirectoryUtil.java   |   2 +-
 .../store/hdfs/FSDataInputRandomAccess.java     |   6 ++
 .../apache/blur/store/hdfs/HdfsDirectory.java   | 106 ++++++++++++++++---
 .../apache/blur/store/hdfs/MetricsGroup.java    |   8 +-
 4 files changed, 107 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/32ef7b39/blur-store/src/main/java/org/apache/blur/store/hdfs/DirectoryUtil.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/DirectoryUtil.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/DirectoryUtil.java
index 394c3d9..e3b1a01 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/DirectoryUtil.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/DirectoryUtil.java
@@ -41,7 +41,7 @@ public class DirectoryUtil {
           configuration, getFastDirectoryPath(hdfsDirPath));
       return new JoinDirectory(dir, shortTermStorage);
     } else {
-      LOG.info("Using regular HDFS directory.");
+      LOG.info("Using regular HDFS directory for [{0}].", hdfsDirPath);
       return dir;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/32ef7b39/blur-store/src/main/java/org/apache/blur/store/hdfs/FSDataInputRandomAccess.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/FSDataInputRandomAccess.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/FSDataInputRandomAccess.java
index 673a713..ed23861 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/FSDataInputRandomAccess.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/FSDataInputRandomAccess.java
@@ -19,8 +19,14 @@ package org.apache.blur.store.hdfs;
 import java.io.Closeable;
 import java.io.IOException;
 
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Random-access read interface over an open file, as used by HdfsDirectory.
+ */
 public interface FSDataInputRandomAccess extends Closeable {
 
   int read(long filePointer, byte[] b, int offset, int length) throws IOException;
 
+  /**
+   * Returns the path of the underlying file. HdfsDirectory passes it to
+   * FileSystem#getFileBlockLocations when reporting block locality.
+   */
+  Path getPath();
+
+  /**
+   * Returns the length of the underlying file. HdfsDirectory's implementation
+   * returns the length supplied when the input was opened; it is used as the
+   * range for the block-location lookup.
+   */
+  long length();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/32ef7b39/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
index 930e2a2..34e3df9 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
@@ -25,6 +25,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.lang.ref.WeakReference;
+import java.net.InetAddress;
 import java.net.URI;
 import java.util.Arrays;
 import java.util.Collection;
@@ -41,6 +42,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.blur.BlurConfiguration;
 import org.apache.blur.log.Log;
@@ -51,6 +53,7 @@ import org.apache.blur.trace.Trace;
 import org.apache.blur.trace.Tracer;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -66,6 +69,7 @@ import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.NoLockFactory;
 
 import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Counter;
 import com.yammer.metrics.core.Histogram;
 import com.yammer.metrics.core.Meter;
 import com.yammer.metrics.core.MetricName;
@@ -144,6 +148,8 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
   protected final boolean _useCache = true;
   protected final boolean _asyncClosing;
   protected final SequentialReadControl _sequentialReadControl;
+  protected final String _hostname;
+  protected final TimerTask _reportOnBlockLocality;
 
   public HdfsDirectory(Configuration configuration, Path path) throws IOException {
     this(configuration, path, new SequentialReadControl(new BlurConfiguration()));
@@ -181,6 +187,12 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
       }
       _metricsGroup = metricsGroup;
     }
+
+    _hostname = InetAddress.getLocalHost().getHostName();
+    LOG.info("Using hostname [{0}] for data locality checks directory [{1}].", _hostname,
_path);
+    _reportOnBlockLocality = reportOnBlockLocality();
+    TIMER.schedule(_reportOnBlockLocality, TimeUnit.SECONDS.toMillis(30), TimeUnit.SECONDS.toMillis(30));
+
     if (_useCache) {
       if (filesToExpose == null) {
         FileStatus[] listStatus = _fileSystem.listStatus(_path);
@@ -203,6 +215,61 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
     }
   }
 
+  /**
+   * Creates the timer task that periodically publishes this directory's HDFS
+   * block-locality counts. Only the change since the previous run is applied,
+   * because the counters may be shared with other directories that use the
+   * same metrics scope. Cancelling the task (as close() does) withdraws this
+   * directory's last published contribution, so a closed directory no longer
+   * inflates the counters.
+   */
+  protected TimerTask reportOnBlockLocality() {
+    final Counter totalHdfsBlock = _metricsGroup.totalHdfsBlock;
+    final Counter localHdfsBlock = _metricsGroup.localHdfsBlock;
+    return new TimerTask() {
+      // Counts most recently applied to the counters; guarded by this task's monitor.
+      private long _prevTotal;
+      private long _prevLocal;
+      private boolean _cancelled;
+
+      @Override
+      public void run() {
+        try {
+          // The HDFS lookups run outside the monitor so cancel() is never
+          // blocked behind a slow report.
+          long[] counts = runReport();
+          publish(counts[0], counts[1]);
+        } catch (Exception e) {
+          LOG.error("Unknown error.", e);
+        }
+      }
+
+      @Override
+      public boolean cancel() {
+        boolean result = super.cancel();
+        synchronized (this) {
+          _cancelled = true;
+          totalHdfsBlock.dec(_prevTotal);
+          localHdfsBlock.dec(_prevLocal);
+          _prevTotal = 0;
+          _prevLocal = 0;
+        }
+        return result;
+      }
+
+      /** Applies the delta between the new counts and the last published ones. */
+      private synchronized void publish(long total, long local) {
+        if (_cancelled) {
+          // A report was in flight when the task was cancelled; do not re-add
+          // counts that cancel() has already withdrawn.
+          return;
+        }
+        totalHdfsBlock.inc(total - _prevTotal);
+        localHdfsBlock.inc(local - _prevLocal);
+        _prevTotal = total;
+        _prevLocal = local;
+      }
+    };
+  }
+
+  /**
+   * Counts the HDFS blocks backing every input currently open in this
+   * directory, and how many of those blocks have a replica on this host.
+   *
+   * @return a two-element array: {total blocks, local blocks}
+   * @throws IOException if block locations cannot be fetched
+   */
+  protected long[] runReport() throws IOException {
+    long total = 0;
+    long local = 0;
+    for (FSDataInputRandomAccess inputRandomAccess : _inputMap.values()) {
+      Path path = inputRandomAccess.getPath();
+      BlockLocation[] blockLocations;
+      try {
+        blockLocations = _fileSystem.getFileBlockLocations(path, 0, inputRandomAccess.length());
+      } catch (FileNotFoundException e) {
+        // The file was removed while its input was still cached; skip it
+        // rather than abandoning the report for every other file.
+        LOG.debug("File [{0}] no longer exists, skipping block locality check.", path);
+        continue;
+      }
+      for (BlockLocation blockLocation : blockLocations) {
+        if (isLocal(blockLocation)) {
+          local++;
+        }
+        total++;
+      }
+    }
+    return new long[] { total, local };
+  }
+
+  /**
+   * Returns true when one of the hosts holding a replica of the given block
+   * equals this server's hostname.
+   */
+  private boolean isLocal(BlockLocation blockLocation) throws IOException {
+    String[] replicaHosts = blockLocation.getHosts();
+    for (int i = 0; i < replicaHosts.length; i++) {
+      if (replicaHosts[i].equals(_hostname)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   private void addToCache(FileStatus fileStatus) throws IOException {
     if (!fileStatus.isDir()) {
       Path p = fileStatus.getPath();
@@ -272,6 +339,8 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
     MetricName readStreamThroughputName = new MetricName(ORG_APACHE_BLUR, HDFS, "Read Stream
Throughput", scope);
     MetricName readSeekName = new MetricName(ORG_APACHE_BLUR, HDFS, "Read Stream Seeks",
scope);
     MetricName writeThroughputName = new MetricName(ORG_APACHE_BLUR, HDFS, "Write Throughput",
scope);
+    MetricName totalHdfsBlocks = new MetricName(ORG_APACHE_BLUR, HDFS, "Hdfs Blocks Total",
scope);
+    MetricName localHdfsBlocks = new MetricName(ORG_APACHE_BLUR, HDFS, "Hdfs Blocks Local",
scope);
 
     Histogram readRandomAccess = Metrics.newHistogram(readRandomAccessName);
     Histogram readStreamAccess = Metrics.newHistogram(readStreamAccessName);
@@ -280,8 +349,11 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
     Meter readStreamThroughput = Metrics.newMeter(readStreamThroughputName, "Read Stream
Bytes", TimeUnit.SECONDS);
     Meter readStreamSeek = Metrics.newMeter(readSeekName, "Read Stream Seeks", TimeUnit.SECONDS);
     Meter writeThroughput = Metrics.newMeter(writeThroughputName, "Write Bytes", TimeUnit.SECONDS);
+    Counter totalHdfsBlock = Metrics.newCounter(totalHdfsBlocks);
+    Counter localHdfsBlock = Metrics.newCounter(localHdfsBlocks);
+
     return new MetricsGroup(readRandomAccess, readStreamAccess, writeAccess, readRandomThroughput,
-        readStreamThroughput, readStreamSeek, writeThroughput);
+        readStreamThroughput, readStreamSeek, writeThroughput, totalHdfsBlock, localHdfsBlock);
   }
 
   @Override
@@ -318,8 +390,9 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
       @Override
       public void close() throws IOException {
         super.close();
+        long length = outputStream.getPos();
         if (_useCache) {
-          _fileStatusMap.put(name, new FStat(System.currentTimeMillis(), outputStream.getPos()));
+          _fileStatusMap.put(name, new FStat(System.currentTimeMillis(), length));
         }
         // This exists because HDFS is so slow to close files. There are
         // built-in sleeps during the close call.
@@ -329,7 +402,7 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
         } else {
           outputStream.close();
         }
-        openForInput(name);
+        openForInput(name, length);
       }
 
       @Override
@@ -349,25 +422,20 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
     }
   }
 
-  private Path getCopyOutputTmpPath(String name) throws IOException {
-    long fileModified = getFileModified(name);
-    return new Path(_path, name + "~" + fileModified + "_" + System.currentTimeMillis() +
TMP);
-  }
-
+  /**
+   * Opens {@code name} for reading. The file length is looked up before the
+   * random-access input is obtained so it can be passed to openForInput,
+   * which stores it on the input for block-locality reporting.
+   *
+   * @throws FileNotFoundException if the file does not exist in this directory
+   */
   @Override
   public IndexInput openInput(String name, IOContext context) throws IOException {
     LOG.debug("openInput [{0}] [{1}] [{2}]", name, context, getPath());
     if (!fileExists(name)) {
       throw new FileNotFoundException("File [" + name + "] not found.");
     }
-    FSDataInputRandomAccess inputRandomAccess = openForInput(name);
     long fileLength = fileLength(name);
+    FSDataInputRandomAccess inputRandomAccess = openForInput(name, fileLength);
     HdfsIndexInput input = new HdfsIndexInput(this, inputRandomAccess, fileLength, _metricsGroup,
name,
         _sequentialReadControl.clone());
     return input;
   }
 
-  protected synchronized FSDataInputRandomAccess openForInput(String name) throws IOException
{
+  protected synchronized FSDataInputRandomAccess openForInput(String name, long length) throws
IOException {
     final Path path = getPath(name);
     FSDataInputRandomAccess input = _inputMap.get(path);
     if (input != null) {
@@ -376,7 +444,7 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
     Tracer trace = Trace.trace("filesystem - open", Trace.param("path", path));
     try {
       final FSDataInputStream inputStream = _fileSystem.open(path);
-      FSDataInputRandomAccess randomInputStream = toFSDataInputRandomAccess(path, inputStream);
+      FSDataInputRandomAccess randomInputStream = toFSDataInputRandomAccess(path, inputStream,
length);
       _inputMap.put(name, randomInputStream);
       return randomInputStream;
     } finally {
@@ -384,7 +452,8 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
     }
   }
 
-  private FSDataInputRandomAccess toFSDataInputRandomAccess(final Path path, final FSDataInputStream
inputStream) {
+  protected FSDataInputRandomAccess toFSDataInputRandomAccess(final Path path, final FSDataInputStream
inputStream,
+      final long length) {
     FSDataInputRandomAccess randomInputStream = new FSDataInputRandomAccess() {
 
       @Override
@@ -402,6 +471,16 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
         return path.toString();
       }
 
+      @Override
+      public Path getPath() {
+        return path;
+      }
+
+      @Override
+      public long length() {
+        return length;
+      }
+
     };
     return randomInputStream;
   }
@@ -531,7 +610,8 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
 
+  /**
+   * Stops the periodic HDFS block-locality report scheduled for this
+   * directory.
+   */
   @Override
   public void close() throws IOException {
-
+    _reportOnBlockLocality.cancel();
+    // purge() removes cancelled tasks from the timer's queue, so the cancelled
+    // task (and the directory it references) can be garbage collected.
+    TIMER.purge();
   }
 
   public Path getPath() {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/32ef7b39/blur-store/src/main/java/org/apache/blur/store/hdfs/MetricsGroup.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/MetricsGroup.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/MetricsGroup.java
index 9cd6ca6..30af7fd 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/MetricsGroup.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/MetricsGroup.java
@@ -17,6 +17,7 @@
 
 package org.apache.blur.store.hdfs;
 
+import com.yammer.metrics.core.Counter;
 import com.yammer.metrics.core.Histogram;
 import com.yammer.metrics.core.Meter;
 
@@ -28,9 +29,12 @@ public class MetricsGroup {
   final Meter readStreamThroughput;
   final Meter readStreamSeek;
   final Meter writeThroughput;
+  final Counter totalHdfsBlock;
+  final Counter localHdfsBlock;
 
   MetricsGroup(Histogram readRandomAccess, Histogram readStreamAccess, Histogram writeAccess,
-      Meter readRandomThroughput, Meter readStreamThroughput, Meter readStreamSeek, Meter
writeThroughput) {
+      Meter readRandomThroughput, Meter readStreamThroughput, Meter readStreamSeek, Meter
writeThroughput,
+      Counter totalHdfsBlock, Counter localHdfsBlock) {
     this.readRandomAccess = readRandomAccess;
     this.readStreamAccess = readStreamAccess;
     this.writeAccess = writeAccess;
@@ -38,5 +42,7 @@ public class MetricsGroup {
     this.readStreamThroughput = readStreamThroughput;
     this.writeThroughput = writeThroughput;
     this.readStreamSeek = readStreamSeek;
+    this.totalHdfsBlock = totalHdfsBlock;
+    this.localHdfsBlock = localHdfsBlock;
   }
 }
\ No newline at end of file


Mime
View raw message