incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [5/7] git commit: Changing the behavior of the hdfs kv store to share a single timer instead of each instance creating a separate thread for background operations.
Date Tue, 28 Oct 2014 01:59:32 GMT
Changing the behavior of the hdfs kv store to share a single timer instead of each instance
creating a separate thread for background operations.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/afb9abdd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/afb9abdd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/afb9abdd

Branch: refs/heads/master
Commit: afb9abdd7cc2e8f25b16f543af79cc07ec3f6d4e
Parents: 9831e30
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Oct 27 21:50:14 2014 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Oct 27 21:50:26 2014 -0400

----------------------------------------------------------------------
 .../blur/store/hdfs_v2/HdfsKeyValueStore.java   | 86 +++++++++-----------
 1 file changed, 40 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/afb9abdd/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
index ba68c33..a4108d8 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
@@ -34,11 +34,12 @@ import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -76,6 +77,7 @@ public class HdfsKeyValueStore implements Store {
   private static final long MAX_OPEN_FOR_WRITING = TimeUnit.MINUTES.toMillis(1);
   private static final long DAEMON_POLL_TIME = TimeUnit.SECONDS.toMillis(5);
   private static final int VERSION_LENGTH = 4;
+  private static final Timer HDFS_KEY_VALUE_TIMER;
 
   static {
     try {
@@ -83,6 +85,12 @@ public class HdfsKeyValueStore implements Store {
     } catch (UnsupportedEncodingException e) {
       throw new RuntimeException(e);
     }
+    HDFS_KEY_VALUE_TIMER = new Timer("HDFS KV Store", true);
+  }
+  
+  public static void stopCleanupTimer() {
+    HDFS_KEY_VALUE_TIMER.cancel();
+    HDFS_KEY_VALUE_TIMER.purge();
   }
 
   static enum OperationType {
@@ -134,7 +142,7 @@ public class HdfsKeyValueStore implements Store {
     public int compare(BytesRef b1, BytesRef b2) {
       return WritableComparator.compareBytes(b1.bytes, b1.offset, b1.length, b2.bytes, b2.offset,
b2.length);
     }
-  };;
+  };
 
   static class Value {
     Value(BytesRef bytesRef, Path path) {
@@ -155,10 +163,8 @@ public class HdfsKeyValueStore implements Store {
   private final AtomicLong _currentFileCounter = new AtomicLong();
   private final WriteLock _writeLock;
   private final ReadLock _readLock;
-  private final Thread _daemon;
   private final AtomicLong _size = new AtomicLong();
 
-  private final AtomicBoolean _running = new AtomicBoolean(true);
   private final AtomicLong _lastWrite = new AtomicLong();
   private FSDataOutputStream _output;
   private Path _outputPath;
@@ -185,7 +191,7 @@ public class HdfsKeyValueStore implements Store {
     }
     removeAnyTruncatedFiles();
     loadIndexes();
-    _daemon = startDaemon();
+    startDaemon();
     Metrics.newGauge(new MetricName(ORG_APACHE_BLUR, HDFS_KV, SIZE, path.getParent().toString()),
new Gauge<Long>() {
       @Override
       public Long value() {
@@ -209,54 +215,44 @@ public class HdfsKeyValueStore implements Store {
     }
   }
 
-  private Thread startDaemon() {
-    Thread thread = new Thread(new Runnable() {
+  private void startDaemon() {
+    _writeLock.lock();
+    try {
+      try {
+        cleanupOldFiles();
+      } catch (IOException e) {
+        LOG.error("Unknown error while trying to clean up old files.", e);
+      }
+    } finally {
+      _writeLock.unlock();
+    }
+    HDFS_KEY_VALUE_TIMER.schedule(new TimerTask() {
       @Override
       public void run() {
         _writeLock.lock();
         try {
-          try {
-            cleanupOldFiles();
-          } catch (IOException e) {
-            LOG.error("Unknown error while trying to clean up old files.", e);
+          if (_output != null && _lastWrite.get() + MAX_OPEN_FOR_WRITING < System.currentTimeMillis())
{
+            // Close writer
+            LOG.info("Closing KV log due to inactivity [{0}].", _path);
+            try {
+              _output.close();
+            } catch (IOException e) {
+              LOG.error("Unknown error while trying to close output file.", e);
+            } finally {
+              _output = null;
+            }
+            try {
+              cleanupOldFiles();
+            } catch (IOException e) {
+              LOG.error("Unknown error while trying to clean up old files.", e);
+            }
           }
         } finally {
           _writeLock.unlock();
         }
-        while (_running.get()) {
-          _writeLock.lock();
-          try {
-            if (_output != null && _lastWrite.get() + MAX_OPEN_FOR_WRITING < System.currentTimeMillis())
{
-              // Close writer
-              LOG.info("Closing KV log due to inactivity [{0}].", _path);
-              try {
-                _output.close();
-              } catch (IOException e) {
-                LOG.error("Unknown error while trying to close output file.", e);
-              } finally {
-                _output = null;
-              }
-              try {
-                cleanupOldFiles();
-              } catch (IOException e) {
-                LOG.error("Unknown error while trying to clean up old files.", e);
-              }
-            }
-          } finally {
-            _writeLock.unlock();
-          }
-          try {
-            Thread.sleep(DAEMON_POLL_TIME);
-          } catch (InterruptedException e) {
-            return;
-          }
-        }
       }
-    });
-    thread.setName("HDFS KV Store [" + _path + "]");
-    thread.setDaemon(true);
-    thread.start();
-    return thread;
+    }, DAEMON_POLL_TIME, DAEMON_POLL_TIME);
+
   }
 
   @Override
@@ -461,8 +457,6 @@ public class HdfsKeyValueStore implements Store {
   public void close() throws IOException {
     if (_isClosed) {
       _writeLock.lock();
-      _running.set(false);
-      _daemon.interrupt();
       try {
         syncInternal();
         if (_output != null) {


Mime
View raw message