incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Adding some fixes. Adding a metric for the hdfs kv store cache. Also adding a GC method to the fast directory so that old files may be cleaned up when errors occur.
Date Tue, 06 May 2014 03:26:34 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/apache-blur-0.2 0f7c84841 -> 225d54c9b


Adding some fixes. Adding a metric for the HDFS KV store cache. Also adding a GC method
to the fast directory so that old files may be cleaned up when errors occur.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/225d54c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/225d54c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/225d54c9

Branch: refs/heads/apache-blur-0.2
Commit: 225d54c9bd508e722f6be248929ce6257740fa90
Parents: 0f7c848
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon May 5 23:26:27 2014 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon May 5 23:26:27 2014 -0400

----------------------------------------------------------------------
 .../hdfs_v2/FastHdfsKeyValueDirectory.java      | 55 +++++++++++++++++++-
 .../blur/store/hdfs_v2/HdfsKeyValueStore.java   | 48 ++++++++++++++---
 .../apache/blur/metrics/MetricsConstants.java   |  1 +
 3 files changed, 95 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/225d54c9/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
index 21e3248..c7cb326 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
@@ -18,11 +18,15 @@ package org.apache.blur.store.hdfs_v2;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
@@ -38,6 +42,8 @@ import org.apache.lucene.util.BytesRef;
 
 public class FastHdfsKeyValueDirectory extends Directory implements LastModified {
 
+  private static final long GC_DELAY = TimeUnit.HOURS.toMillis(1);
+
   private static final Log LOG = LogFactory.getLog(FastHdfsKeyValueDirectory.class);
 
   private static final String LASTMOD = "/lastmod";
@@ -48,8 +54,11 @@ public class FastHdfsKeyValueDirectory extends Directory implements LastModified
   private final Map<String, Long> _files = new ConcurrentHashMap<String, Long>();
   private final HdfsKeyValueStore _store;
   private final int _blockSize = 4096;
+  private final Path _path;
+  private long _lastGc;
 
   public FastHdfsKeyValueDirectory(Configuration configuration, Path path) throws IOException
{
+    _path = path;
     _store = new HdfsKeyValueStore(configuration, path);
     BytesRef value = new BytesRef();
     if (_store.get(FILES, value)) {
@@ -63,12 +72,40 @@ public class FastHdfsKeyValueDirectory extends Directory implements LastModified
         if (_store.get(key, value)) {
           _files.put(file, Long.parseLong(value.utf8ToString()));
         } else {
-          _files.put(file, 0L);
+          // _files.put(file, 0L);
           LOG.warn("Missing meta data for file [{0}], setting length to '0'.", file);
         }
       }
     }
     setLockFactory(NoLockFactory.getNoLockFactory());
+    writeFilesNames();
+    gc();
+  }
+
+  public void gc() throws IOException {
+    LOG.info("Running GC over the hdfs kv directory [{0}].", _path);
+    Iterable<Entry<BytesRef, BytesRef>> scan = _store.scan(null);
+    List<BytesRef> toBeDeleted = new ArrayList<BytesRef>();
+    for (Entry<BytesRef, BytesRef> e : scan) {
+      BytesRef bytesRef = e.getKey();
+      if (bytesRef.equals(FILES)) {
+        continue;
+      }
+      String key = bytesRef.utf8ToString();
+      int indexOf = key.indexOf('/');
+      if (indexOf < 0) {
+        LOG.error("Unknown key type in hdfs kv store [" + key + "]");
+      } else {
+        String filename = key.substring(0, indexOf);
+        if (!_files.containsKey(filename)) {
+          toBeDeleted.add(bytesRef);
+        }
+      }
+    }
+    for (BytesRef key : toBeDeleted) {
+      _store.delete(key);
+    }
+    _lastGc = System.currentTimeMillis();
   }
 
   public void writeBlock(String name, long blockId, byte[] b, int offset, int length) throws
IOException {
@@ -118,13 +155,16 @@ public class FastHdfsKeyValueDirectory extends Directory implements
LastModified
 
   @Override
   public boolean fileExists(String name) throws IOException {
-    return _files.containsKey(name);
+    boolean containsKey = _files.containsKey(name);
+    LOG.debug("FileExists [{0}] [{1}].", name, containsKey);
+    return containsKey;
   }
 
   @Override
   public void deleteFile(String name) throws IOException {
     Long length = _files.remove(name);
     if (length != null) {
+      LOG.debug("Removing file [{0}] with length [{1}].", name, length);
       long blocks = length / _blockSize;
       _store.delete(new BytesRef(name + LENGTH));
       _store.delete(new BytesRef(name + LASTMOD));
@@ -144,7 +184,18 @@ public class FastHdfsKeyValueDirectory extends Directory implements LastModified
 
   @Override
   public void sync(Collection<String> names) throws IOException {
+    writeFilesNames();
     _store.sync();
+    if (shouldPerformGC()) {
+      gc();
+    }
+  }
+
+  private boolean shouldPerformGC() {
+    if (_lastGc + GC_DELAY < System.currentTimeMillis()) {
+      return true;
+    }
+    return false;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/225d54c9/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
index b89387f..71fd10e 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
@@ -16,6 +16,10 @@
  */
 package org.apache.blur.store.hdfs_v2;
 
+import static org.apache.blur.metrics.MetricsConstants.HDFS_KV;
+import static org.apache.blur.metrics.MetricsConstants.ORG_APACHE_BLUR;
+import static org.apache.blur.metrics.MetricsConstants.SIZE;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.FilterInputStream;
@@ -56,6 +60,10 @@ import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.lucene.util.BytesRef;
 
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+
 public class HdfsKeyValueStore implements Store {
 
   private static final String BLUR_KEY_VALUE = "blur_key_value";
@@ -147,6 +155,7 @@ public class HdfsKeyValueStore implements Store {
   private final WriteLock _writeLock;
   private final ReadLock _readLock;
   private final Thread _daemon;
+  private final AtomicLong _size = new AtomicLong();
 
   private final AtomicBoolean _running = new AtomicBoolean(true);
   private final AtomicLong _lastWrite = new AtomicLong();
@@ -176,6 +185,12 @@ public class HdfsKeyValueStore implements Store {
     openWriter();
     _daemon = startDaemon();
     cleanupOldFiles();
+    Metrics.newGauge(new MetricName(ORG_APACHE_BLUR, HDFS_KV, SIZE, path.getParent().toString()),
new Gauge<Long>() {
+      @Override
+      public Long value() {
+        return _size.get();
+      }
+    });
   }
 
   private Thread startDaemon() {
@@ -296,7 +311,12 @@ public class HdfsKeyValueStore implements Store {
     try {
       Operation op = getPutOperation(OperationType.PUT, key, value);
       Path path = write(op);
-      _pointers.put(BytesRef.deepCopyOf(key), new Value(BytesRef.deepCopyOf(value), path));
+      BytesRef deepCopyOf = BytesRef.deepCopyOf(value);
+      _size.addAndGet(deepCopyOf.bytes.length);
+      Value old = _pointers.put(BytesRef.deepCopyOf(key), new Value(deepCopyOf, path));
+      if (old != null) {
+        _size.addAndGet(-old._bytesRef.bytes.length);
+      }
       if (!path.equals(_outputPath)) {
         cleanupOldFiles();
       }
@@ -325,11 +345,16 @@ public class HdfsKeyValueStore implements Store {
   }
 
   private void rollFile() throws IOException {
+    logSize();
     LOG.info("Rolling file [" + _outputPath + "]");
     _output.close();
     openWriter();
   }
 
+  private void logSize() {
+
+  }
+
   public void cleanupOldFiles() throws IOException {
     SortedSet<FileStatus> fileStatusSet = getSortedSet(_path);
     if (fileStatusSet == null || fileStatusSet.size() < 1) {
@@ -394,7 +419,10 @@ public class HdfsKeyValueStore implements Store {
     try {
       Operation op = getDeleteOperation(OperationType.DELETE, key);
       write(op);
-      _pointers.remove(key);
+      Value old = _pointers.remove(key);
+      if (old != null) {
+        _size.addAndGet(-old._bytesRef.bytes.length);
+      }
     } catch (RemoteException e) {
       throw new IOException("Another HDFS KeyStore has likely taken ownership of this key
value store.", e);
     } catch (LeaseExpiredException e) {
@@ -424,6 +452,7 @@ public class HdfsKeyValueStore implements Store {
   }
 
   private void openWriter() throws IOException {
+    logSize();
     long nextSegment = _currentFileCounter.incrementAndGet();
     String name = buffer(nextSegment);
     _outputPath = new Path(_path, name);
@@ -497,17 +526,22 @@ public class HdfsKeyValueStore implements Store {
   }
 
   private void loadIndex(Path path, Operation operation) {
+    Value old;
     switch (operation.type) {
     case PUT:
-      _pointers.put(BytesRef.deepCopyOf(getKey(operation.key)), new Value(BytesRef.deepCopyOf(getKey(operation.value)),
-          path));
-      return;
+      BytesRef deepCopyOf = BytesRef.deepCopyOf(getKey(operation.value));
+      _size.addAndGet(deepCopyOf.bytes.length);
+      old = _pointers.put(BytesRef.deepCopyOf(getKey(operation.key)), new Value(deepCopyOf,
path));
+      break;
     case DELETE:
-      _pointers.remove(getKey(operation.key));
-      return;
+      old = _pointers.remove(getKey(operation.key));
+      break;
     default:
       throw new RuntimeException("Not supported [" + operation.type + "]");
     }
+    if (old != null) {
+      _size.addAndGet(-old._bytesRef.bytes.length);
+    }
   }
 
   private BytesRef getKey(BytesWritable key) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/225d54c9/blur-util/src/main/java/org/apache/blur/metrics/MetricsConstants.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/metrics/MetricsConstants.java b/blur-util/src/main/java/org/apache/blur/metrics/MetricsConstants.java
index 166b4cf..e0609c2 100644
--- a/blur-util/src/main/java/org/apache/blur/metrics/MetricsConstants.java
+++ b/blur-util/src/main/java/org/apache/blur/metrics/MetricsConstants.java
@@ -45,6 +45,7 @@ public class MetricsConstants {
   public static final String REMOVAL = "Removal";
   public static final String MISS = "Miss";
   public static final String CACHE = "Cache";
+  public static final String HDFS_KV = "HDFS-KV";
   public static final String DEEP_PAGING_CACHE = "DeepPagingCache";
   public static final String CACHE_POOL = "CachePool";
   public static final String JVM = "JVM";


Mime
View raw message