flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From br...@apache.org
Subject git commit: FLUME-1748: HDFS Sink should check if the thread is interrupted before performing any HDFS operations
Date Sun, 02 Dec 2012 22:40:44 GMT
Updated Branches:
  refs/heads/trunk 97ed09e6f -> aa549c4f2


FLUME-1748: HDFS Sink should check if the thread is interrupted before performing any HDFS
operations

(Hari Shreedharan via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/aa549c4f
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/aa549c4f
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/aa549c4f

Branch: refs/heads/trunk
Commit: aa549c4f27db848cb8900533fd0f16562d971aa2
Parents: 97ed09e
Author: Brock Noland <brock@apache.org>
Authored: Sun Dec 2 16:39:59 2012 -0600
Committer: Brock Noland <brock@apache.org>
Committed: Sun Dec 2 16:39:59 2012 -0600

----------------------------------------------------------------------
 .../org/apache/flume/sink/hdfs/BucketWriter.java   |   31 ++++++++++++++-
 1 files changed, 29 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/aa549c4f/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index 58ebe49..d0ff6e3 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -165,6 +165,7 @@ class BucketWriter {
   /**
    * open() is called by append()
    * @throws IOException
+   * @throws InterruptedException
    */
   private void open() throws IOException, InterruptedException {
     runPrivileged(new PrivilegedExceptionAction<Void>() {
@@ -178,8 +179,9 @@ class BucketWriter {
   /**
    * doOpen() must only be called by open()
    * @throws IOException
+   * @throws InterruptedException
    */
-  private void doOpen() throws IOException {
+  private void doOpen() throws IOException, InterruptedException {
     if ((filePath == null) || (writer == null) || (formatter == null)) {
       throw new IOException("Invalid file settings");
     }
@@ -194,6 +196,7 @@ class BucketWriter {
     // NOTE: tried synchronizing on the underlying Kerberos principal previously
     // which caused deadlocks. See FLUME-1231.
     synchronized (staticLock) {
+      checkAndThrowInterruptedException();
       try {
         long counter = fileExtensionCounter.incrementAndGet();
         if (codeC == null) {
@@ -252,8 +255,10 @@ class BucketWriter {
    * Close the file handle and rename the temp file to the permanent filename.
    * Safe to call multiple times. Logs HDFSWriter.close() exceptions.
    * @throws IOException On failure to rename if temp file exists.
+   * @throws InterruptedException
    */
   public synchronized void close() throws IOException, InterruptedException {
+    checkAndThrowInterruptedException();
     flush();
     runPrivileged(new PrivilegedExceptionAction<Void>() {
       public Void run() throws Exception {
@@ -302,8 +307,11 @@ class BucketWriter {
 
   /**
    * flush the data
+   * @throws IOException
+   * @throws InterruptedException
    */
   public synchronized void flush() throws IOException, InterruptedException {
+    checkAndThrowInterruptedException();
     if (!isBatchComplete()) {
       runPrivileged(new PrivilegedExceptionAction<Void>() {
         public Void run() throws Exception {
@@ -354,8 +362,13 @@ class BucketWriter {
    * We rotate before append, and not after, so that the active file rolling
    * mechanism will never roll an empty file. This also ensures that the file
    * creation time reflects when the first event was written.
+   *
+   * @throws IOException
+   * @throws InterruptedException
    */
-  public synchronized void append(Event event) throws IOException, InterruptedException {
+  public synchronized void append(Event event)
+          throws IOException, InterruptedException {
+    checkAndThrowInterruptedException();
     if (!isOpen) {
       if(idleClosed) {
         throw new IOException("This bucket writer was closed due to idling and this handle
" +
@@ -442,4 +455,18 @@ class BucketWriter {
   void setClock(Clock clock) {
       this.clock = clock;
   }
+
+  /**
+   * This method if the current thread has been interrupted and throws an
+   * exception.
+   * @throws InterruptedException
+   */
+  private static void checkAndThrowInterruptedException()
+          throws InterruptedException {
+    if (Thread.currentThread().interrupted()) {
+      throw new InterruptedException("Timed out before HDFS call was made. "
+              + "Your hdfs.callTimeout might be set too low or HDFS calls are "
+              + "taking too long.");
+    }
+  }
 }


Mime
View raw message