flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From juha...@apache.org
Subject git commit: FLUME-1930: Inflights should clean up executors on close
Date Thu, 14 Mar 2013 06:57:50 GMT
Updated Branches:
  refs/heads/flume-1.4 3dc9069a3 -> 8fad6b880


FLUME-1930: Inflights should clean up executors on close

(Hari Shreedharan via Juhani Connolly)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/8fad6b88
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/8fad6b88
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/8fad6b88

Branch: refs/heads/flume-1.4
Commit: 8fad6b880aafb5191b120144d560f20e9ca6cf42
Parents: 3dc9069
Author: Juhani Connolly <juhani_connolly@cyberagent.co.jp>
Authored: Thu Mar 14 15:57:23 2013 +0900
Committer: Juhani Connolly <juhani_connolly@cyberagent.co.jp>
Committed: Thu Mar 14 15:57:23 2013 +0900

----------------------------------------------------------------------
 .../org/apache/flume/channel/file/FileChannel.java |    8 ++++-
 .../apache/flume/channel/file/FlumeEventQueue.java |   28 ++++++--------
 .../java/org/apache/flume/channel/file/Log.java    |    2 +-
 .../org/apache/flume/channel/file/TestLog.java     |    2 +-
 4 files changed, 21 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/8fad6b88/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
index d98209b..ff42d19 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
@@ -27,6 +27,7 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
@@ -380,7 +381,12 @@ public class FileChannel extends BasicChannelSemantics {
   void close() {
     if(open) {
       open = false;
-      log.close();
+      try {
+        log.close();
+      } catch (Exception e) {
+        LOG.error("Error while trying to close the log.", e);
+        Throwables.propagate(e);
+      }
       log = null;
       queueRemaining = null;
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/8fad6b88/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
index 72d9425..1ed9547 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
@@ -30,10 +30,12 @@ import java.util.Collection;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -322,9 +324,11 @@ final class FlumeEventQueue {
     return backingStore.getCapacity();
   }
 
-  synchronized void close() {
+  synchronized void close() throws IOException {
     try {
       backingStore.close();
+      inflightPuts.close();
+      inflightTakes.close();
     } catch (IOException e) {
       LOG.warn("Error closing backing store", e);
     }
@@ -442,21 +446,9 @@ final class FlumeEventQueue {
         }
         byte[] checksum = digest.digest(buffer.array());
         file.write(checksum);
-        future = Executors.newSingleThreadExecutor().submit(
-                new Runnable() {
-                  @Override
-                  public void run() {
-                    try {
-                      buffer.position(0);
-                      fileChannel.write(buffer);
-                      fileChannel.force(true);
-                    } catch (IOException ex) {
-                      LOG.error("Error while writing inflight events to "
-                              + "inflights file: "
-                              + inflightEventsFile.getName());
-                    }
-                  }
-                });
+        buffer.position(0);
+        fileChannel.write(buffer);
+        fileChannel.force(true);
         syncRequired = false;
       } catch (IOException ex) {
         LOG.error("Error while writing checkpoint to disk.", ex);
@@ -527,5 +519,9 @@ final class FlumeEventQueue {
     public Collection<Long> getInFlightPointers() {
       return inflightEvents.values();
     }
+
+    public void close() throws IOException {
+      file.close();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/8fad6b88/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index 7da8c49..6ffc824 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -677,7 +677,7 @@ class Log {
    * Synchronization not required since this method gets the write lock,
    * so checkpoint and this method cannot run at the same time.
    */
-  void close() {
+  void close() throws IOException{
     lockExclusive();
     try {
       open = false;

http://git-wip-us.apache.org/repos/asf/flume/blob/8fad6b88/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
index 6751714..54978f8 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
@@ -60,7 +60,7 @@ public class TestLog {
     log.replay();
   }
   @After
-  public void cleanup() {
+  public void cleanup() throws Exception{
     if(log != null) {
       log.close();
     }


Mime
View raw message