flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject git commit: FLUME-2044. HDFS Sink impersonation fails after the first file.
Date Wed, 15 May 2013 05:33:37 GMT
Updated Branches:
  refs/heads/trunk e9719a889 -> 34621d7ea


FLUME-2044. HDFS Sink impersonation fails after the first file.

(Hari Shreedharan via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/34621d7e
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/34621d7e
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/34621d7e

Branch: refs/heads/trunk
Commit: 34621d7ea81b3472ffac4bf03c2cece5a86da30f
Parents: e9719a8
Author: Mike Percy <mpercy@apache.org>
Authored: Tue May 14 22:32:30 2013 -0700
Committer: Mike Percy <mpercy@apache.org>
Committed: Tue May 14 22:32:30 2013 -0700

----------------------------------------------------------------------
 .../org/apache/flume/sink/hdfs/BucketWriter.java   |   68 ++++++---------
 1 files changed, 28 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/34621d7e/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index 0897c97..af65167 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -187,20 +187,6 @@ class BucketWriter {
    * @throws InterruptedException
    */
   private void open() throws IOException, InterruptedException {
-    runPrivileged(new PrivilegedExceptionAction<Void>() {
-      public Void run() throws Exception {
-        doOpen();
-        return null;
-      }
-    });
-  }
-
-  /**
-   * doOpen() must only be called by open()
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  private void doOpen() throws IOException, InterruptedException {
     if ((filePath == null) || (writer == null)) {
       throw new IOException("Invalid file settings");
     }
@@ -233,7 +219,7 @@ class BucketWriter {
         targetPath = filePath + DIRECTORY_DELIMITER + fullFileName;
 
         LOG.info("Creating " + bucketPath);
-        callWithTimeout(new Callable<Void>() {
+        callWithTimeout(new CallRunner<Void>() {
           @Override
           public Void call() throws Exception {
             if (codeC == null) {
@@ -291,23 +277,10 @@ class BucketWriter {
   public synchronized void close() throws IOException, InterruptedException {
     checkAndThrowInterruptedException();
     flush();
-    runPrivileged(new PrivilegedExceptionAction<Void>() {
-      public Void run() throws Exception {
-        doClose();
-        return null;
-      }
-    });
-  }
-
-  /**
-   * doClose() must only be called by close()
-   * @throws IOException
-   */
-  private void doClose() throws IOException, InterruptedException {
     LOG.debug("Closing {}", bucketPath);
     if (isOpen) {
       try {
-        callWithTimeout(new Callable<Void>() {
+        callWithTimeout(new CallRunner<Void>() {
           @Override
           public Void call() throws Exception {
             writer.close(); // could block
@@ -345,12 +318,7 @@ class BucketWriter {
   public synchronized void flush() throws IOException, InterruptedException {
     checkAndThrowInterruptedException();
     if (!isBatchComplete()) {
-      runPrivileged(new PrivilegedExceptionAction<Void>() {
-        public Void run() throws Exception {
-          doFlush();
-          return null;
-        }
-      });
+      doFlush();
 
       if(idleTimeout > 0) {
         // if the future exists and couldn't be cancelled, that would mean it has already run
@@ -384,7 +352,7 @@ class BucketWriter {
    * @throws IOException
    */
   private void doFlush() throws IOException, InterruptedException {
-    callWithTimeout(new Callable<Void>() {
+    callWithTimeout(new CallRunner<Void>() {
       @Override
       public Void call() throws Exception {
         writer.sync(); // could block
@@ -447,7 +415,7 @@ class BucketWriter {
     // write the event
     try {
       sinkCounter.incrementEventDrainAttemptCount();
-      callWithTimeout(new Callable<Void>() {
+      callWithTimeout(new CallRunner<Void>() {
         @Override
         public Void call() throws Exception {
           writer.append(event); // could block
@@ -514,7 +482,7 @@ class BucketWriter {
     final Path srcPath = new Path(bucketPath);
     final Path dstPath = new Path(targetPath);
 
-    callWithTimeout(new Callable<Object>() {
+    callWithTimeout(new CallRunner<Object>() {
       @Override
       public Object call() throws Exception {
         if(fileSystem.exists(srcPath)) { // could block
@@ -559,9 +527,19 @@ class BucketWriter {
    * for the specified amount of time in milliseconds. In case of timeout
    * cancel the callable and throw an IOException
    */
-  private <T> T callWithTimeout(Callable<T> callable)
+  private <T> T callWithTimeout(final CallRunner<T> callRunner)
     throws IOException, InterruptedException {
-    Future<T> future = callTimeoutPool.submit(callable);
+    Future<T> future = callTimeoutPool.submit(new Callable<T>() {
+      @Override
+      public T call() throws Exception {
+        return runPrivileged(new PrivilegedExceptionAction<T>() {
+          @Override
+          public T run() throws Exception {
+            return callRunner.call();
+          }
+        });
+      }
+    });
     try {
       if (callTimeout > 0) {
         return future.get(callTimeout, TimeUnit.MILLISECONDS);
@@ -597,4 +575,14 @@ class BucketWriter {
     }
   }
 
+  /**
+   * Simple interface whose <tt>call</tt> method is called by
+   * {#callWithTimeout} in a new thread inside a
+   * {@linkplain java.security.PrivilegedExceptionAction#run()} call.
+   * @param <T>
+   */
+  private interface CallRunner<T> {
+    T call() throws Exception;
+  }
+
 }


Mime
View raw message