incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject [2/3] git commit: Adding a daemon thread to the hdfs key value store so that the open writing file can be closed after there is no write activity for one minute. It will reopen when needed.
Date Tue, 06 May 2014 02:07:39 GMT
Adding a daemon thread to the hdfs key value store so that the open writing file can be closed
after there is no write activity for one minute.  It will reopen when needed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/9f77203b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/9f77203b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/9f77203b

Branch: refs/heads/apache-blur-0.2
Commit: 9f77203b096600b1c051be0dbb51ff3352380585
Parents: 654ef10
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon May 5 22:05:33 2014 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon May 5 22:05:33 2014 -0400

----------------------------------------------------------------------
 .../manager/writer/BlurIndexSimpleWriter.java   |  3 +
 .../manager/writer/MutationQueueProcessor.java  |  9 +++
 .../blur/store/hdfs_v2/HdfsKeyValueStore.java   | 72 +++++++++++++++++++-
 3 files changed, 82 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
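For readers skimming the diff below, the gist of the change is an idle-close pattern: a daemon thread wakes up every few seconds, takes the store's write lock, and closes the open output file once no write has happened within MAX_OPEN_FOR_WRITING; the next put/delete/sync reopens it via ensureOpenForWriting. What follows is a minimal standalone sketch of that pattern, not the committed code; the class name IdleCloser, the Closeable stand-in for FSDataOutputStream, and the recordWrite hook are illustrative assumptions.

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative sketch of the idle-close daemon; names are hypothetical stand-ins.
public class IdleCloser implements Closeable {

  private static final long MAX_OPEN_FOR_WRITING = TimeUnit.MINUTES.toMillis(1);
  private static final long DAEMON_POLL_TIME = TimeUnit.SECONDS.toMillis(5);

  private final ReentrantReadWriteLock.WriteLock _writeLock = new ReentrantReadWriteLock().writeLock();
  private final AtomicBoolean _running = new AtomicBoolean(true);
  private final AtomicLong _lastWrite = new AtomicLong(System.currentTimeMillis());
  private final Thread _daemon;
  private Closeable _output; // stands in for the store's FSDataOutputStream

  public IdleCloser(Closeable output) {
    _output = output;
    _daemon = new Thread(new Runnable() {
      @Override
      public void run() {
        while (_running.get()) {
          _writeLock.lock();
          try {
            // Close the writer once it has been idle longer than the allowed window.
            if (_output != null && _lastWrite.get() + MAX_OPEN_FOR_WRITING < System.currentTimeMillis()) {
              try {
                _output.close();
              } catch (IOException e) {
                e.printStackTrace();
              } finally {
                _output = null; // a later write has to reopen it
              }
            }
          } finally {
            _writeLock.unlock();
          }
          try {
            Thread.sleep(DAEMON_POLL_TIME);
          } catch (InterruptedException e) {
            return; // close() interrupts the daemon
          }
        }
      }
    });
    _daemon.setDaemon(true);
    _daemon.start();
  }

  // Callers record each write so the daemon knows the stream is still active.
  public void recordWrite() {
    _lastWrite.set(System.currentTimeMillis());
  }

  @Override
  public void close() throws IOException {
    _running.set(false);
    _daemon.interrupt();
    _writeLock.lock();
    try {
      if (_output != null) {
        _output.close();
      }
    } finally {
      _writeLock.unlock();
    }
  }
}
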


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9f77203b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
index 91806c1..166ba28 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
@@ -370,6 +370,9 @@ public class BlurIndexSimpleWriter extends BlurIndex {
       for (RowMutation mutation : mutations) {
         _queue.put(mutation);
       }
+      synchronized (_queue) {
+        _queue.notifyAll();
+      }
     } catch (InterruptedException e) {
       throw new IOException(e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9f77203b/blur-core/src/main/java/org/apache/blur/manager/writer/MutationQueueProcessor.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/MutationQueueProcessor.java b/blur-core/src/main/java/org/apache/blur/manager/writer/MutationQueueProcessor.java
index 4b1b701..f76f375 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/MutationQueueProcessor.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/MutationQueueProcessor.java
@@ -152,6 +152,15 @@ public class MutationQueueProcessor implements Runnable, Closeable {
           _didMutates = false;
         }
         lst.clear();
+        if (!_didMutates) {
+          synchronized (_queue) {
+            try {
+              _queue.wait();
+            } catch (InterruptedException e) {
+              throw new IOException(e);
+            }
+          }
+        }
       }
     }
 

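The two hunks above form a pair: BlurIndexSimpleWriter notifies on the queue monitor after enqueueing mutations, and MutationQueueProcessor waits on the same monitor when a pass produced no mutates, so the processor thread parks instead of spinning. Below is a minimal standalone sketch of that wait/notify parking; the class and method names are hypothetical, and the sketch uses the guarded-wait idiom of re-checking the queue inside the lock.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical producer/consumer pair parking on the queue monitor; not the committed classes.
public class QueueParkingSketch {

  private final BlockingQueue<String> _queue = new LinkedBlockingQueue<String>();

  // Producer side (compare BlurIndexSimpleWriter): enqueue, then wake any parked consumer.
  public void enqueue(String mutation) throws InterruptedException {
    _queue.put(mutation);
    synchronized (_queue) {
      _queue.notifyAll();
    }
  }

  // Consumer side (compare MutationQueueProcessor): drain what is queued, then park until notified.
  public void processLoop() throws InterruptedException {
    while (true) {
      String item;
      while ((item = _queue.poll()) != null) {
        System.out.println("processing " + item);
      }
      synchronized (_queue) {
        // Guarded wait: re-check emptiness inside the lock so a notify that
        // raced in between poll() and wait() is not missed.
        while (_queue.isEmpty()) {
          _queue.wait();
        }
      }
    }
  }
}
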
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9f77203b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
index 7d6d6ea..b89387f 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
@@ -33,6 +33,8 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -63,6 +65,8 @@ public class HdfsKeyValueStore implements Store {
   private static final Log LOG = LogFactory.getLog(HdfsKeyValueStore.class);
   private static final byte[] MAGIC;
   private static final int VERSION = 1;
+  private static final long MAX_OPEN_FOR_WRITING = TimeUnit.MINUTES.toMillis(1);
+  private static final long DAEMON_POLL_TIME = TimeUnit.SECONDS.toMillis(5);
 
   static {
     try {
@@ -142,6 +146,10 @@ public class HdfsKeyValueStore implements Store {
   private final AtomicLong _currentFileCounter = new AtomicLong();
   private final WriteLock _writeLock;
   private final ReadLock _readLock;
+  private final Thread _daemon;
+
+  private final AtomicBoolean _running = new AtomicBoolean(true);
+  private final AtomicLong _lastWrite = new AtomicLong();
   private FSDataOutputStream _output;
   private Path _outputPath;
   private final long _maxAmountAllowedPerFile;
@@ -166,12 +174,55 @@ public class HdfsKeyValueStore implements Store {
     }
     loadIndexes();
     openWriter();
+    _daemon = startDaemon();
+    cleanupOldFiles();
+  }
+
+  private Thread startDaemon() {
+    Thread thread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (_running.get()) {
+          _writeLock.lock();
+          try {
+            if (_output != null && _lastWrite.get() + MAX_OPEN_FOR_WRITING < System.currentTimeMillis()) {
+              // Close writer
+              LOG.info("Closing KV log due to inactivity [{0}].", _path);
+              try {
+                _output.close();
+              } catch (IOException e) {
+                LOG.error("Unknown error while trying to close output file.", e);
+              } finally {
+                _output = null;
+              }
+              try {
+                cleanupOldFiles();
+              } catch (IOException e) {
+                LOG.error("Unknown error while trying to clean up old files.", e);
+              }
+            }
+          } finally {
+            _writeLock.unlock();
+          }
+          try {
+            Thread.sleep(DAEMON_POLL_TIME);
+          } catch (InterruptedException e) {
+            return;
+          }
+        }
+      }
+    });
+    thread.setName("HDFS KV Store [" + _path + "]");
+    thread.setDaemon(true);
+    thread.start();
+    return thread;
   }
 
   @Override
   public void sync() throws IOException {
     ensureOpen();
     _writeLock.lock();
+    ensureOpenForWriting();
     try {
       syncInternal();
     } catch (RemoteException e) {
@@ -241,6 +292,7 @@ public class HdfsKeyValueStore implements Store {
       return;
     }
     _writeLock.lock();
+    ensureOpenForWriting();
     try {
       Operation op = getPutOperation(OperationType.PUT, key, value);
       Path path = write(op);
@@ -257,6 +309,12 @@ public class HdfsKeyValueStore implements Store {
     }
   }
 
+  private void ensureOpenForWriting() throws IOException {
+    if (_output == null) {
+      openWriter();
+    }
+  }
+
   private Path write(Operation op) throws IOException {
     op.write(_output);
     Path p = _outputPath;
@@ -274,6 +332,9 @@ public class HdfsKeyValueStore implements Store {
 
   public void cleanupOldFiles() throws IOException {
     SortedSet<FileStatus> fileStatusSet = getSortedSet(_path);
+    if (fileStatusSet == null || fileStatusSet.size() < 1) {
+      return;
+    }
     Path newestGen = fileStatusSet.last().getPath();
     if (!newestGen.equals(_outputPath)) {
       throw new IOException("No longer the owner of [" + _path + "]");
@@ -329,6 +390,7 @@ public class HdfsKeyValueStore implements Store {
   public void delete(BytesRef key) throws IOException {
     ensureOpen();
     _writeLock.lock();
+    ensureOpenForWriting();
     try {
       Operation op = getDeleteOperation(OperationType.DELETE, key);
       write(op);
@@ -346,9 +408,13 @@ public class HdfsKeyValueStore implements Store {
   public void close() throws IOException {
     if (_isClosed) {
       _writeLock.lock();
+      _running.set(false);
+      _daemon.interrupt();
       try {
         syncInternal();
-        _output.close();
+        if (_output != null) {
+          _output.close();
+        }
         _fileSystem.close();
         _isClosed = true;
       } finally {
@@ -361,10 +427,11 @@ public class HdfsKeyValueStore implements Store {
     long nextSegment = _currentFileCounter.incrementAndGet();
     String name = buffer(nextSegment);
     _outputPath = new Path(_path, name);
+    LOG.info("Opening for writing [{0}].", _outputPath);
     _output = _fileSystem.create(_outputPath, false);
     _output.write(MAGIC);
     _output.writeInt(VERSION);
-    _output.sync();
+    syncInternal();
   }
 
   private String buffer(long number) {
@@ -397,6 +464,7 @@ public class HdfsKeyValueStore implements Store {
   private void syncInternal() throws IOException {
     _output.flush();
     _output.sync();
+    _lastWrite.set(System.currentTimeMillis());
   }
 
   private void loadIndex(Path path) throws IOException {

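The "reopen when needed" half of the commit is the ensureOpenForWriting guard that sync, put and delete now call while holding the write lock. A minimal sketch of that lazy-reopen shape follows; the names and the placeholder bodies of openWriter and the write path are illustrative assumptions, and the guard sits inside the try block here so the lock is always released.

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in showing the lazy-reopen guard on a write path; not the committed class.
public class LazyReopenSketch {

  private final ReentrantReadWriteLock.WriteLock _writeLock = new ReentrantReadWriteLock().writeLock();
  private OutputStream _output; // becomes null once the idle daemon closes the writer

  public void put(byte[] key, byte[] value) throws IOException {
    _writeLock.lock();
    try {
      ensureOpenForWriting(); // reopen transparently if the stream was closed for inactivity
      _output.write(key);
      _output.write(value);
    } finally {
      _writeLock.unlock();
    }
  }

  private void ensureOpenForWriting() throws IOException {
    if (_output == null) {
      openWriter();
    }
  }

  private void openWriter() throws IOException {
    // Placeholder for FileSystem.create(...) in the real store.
    _output = new java.io.ByteArrayOutputStream();
  }
}
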
