flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jar...@apache.org
Subject git commit: FLUME-2235. idleFuture should be cancelled at the start of append
Date Thu, 07 Nov 2013 22:55:24 GMT
Updated Branches:
  refs/heads/flume-1.5 b17626f72 -> ad612c28b


FLUME-2235. idleFuture should be cancelled at the start of append

(Hari Shreedharan via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ad612c28
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ad612c28
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ad612c28

Branch: refs/heads/flume-1.5
Commit: ad612c28bec3845775e12d6bc51ef724e6f78f06
Parents: b17626f
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Thu Nov 7 14:53:04 2013 -0800
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Thu Nov 7 14:54:09 2013 -0800

----------------------------------------------------------------------
 .../apache/flume/sink/hdfs/BucketWriter.java    | 21 ++++++++++++++++++++
 1 file changed, 21 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/ad612c28/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index 65f4d2c..200d457 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -375,6 +375,27 @@ class BucketWriter {
   public synchronized void append(final Event event)
           throws IOException, InterruptedException {
     checkAndThrowInterruptedException();
+    // If idleFuture is not null, cancel it before we move forward to avoid a
+    // close call in the middle of the append.
+    if(idleFuture != null) {
+      idleFuture.cancel(false);
+      // There is still a small race condition - if the idleFuture is already
+      // running, interrupting it can cause HDFS close operation to throw -
+      // so we cannot interrupt it while running. If the future could not be
+      // cancelled, it is already running - wait for it to finish before
+      // attempting to write.
+      if(!idleFuture.isDone()) {
+        try {
+          idleFuture.get(callTimeout, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException ex) {
+          LOG.warn("Timeout while trying to cancel closing of idle file. Idle" +
+            " file close may have failed", ex);
+        } catch (Exception ex) {
+          LOG.warn("Error while trying to cancel closing of idle file. ", ex);
+        }
+      }
+      idleFuture = null;
+    }
     if (!isOpen) {
       if(idleClosed) {
         throw new IOException("This bucket writer was closed due to idling and this handle
" +


Mime
View raw message