flume-commits mailing list archives

From hshreedha...@apache.org
Subject flume git commit: FLUME-2586. HDFS Sink must try to rename files even if close fails.
Date Wed, 28 Jan 2015 21:05:16 GMT
Repository: flume
Updated Branches:
  refs/heads/trunk 82631f811 -> 0d6eccad2


FLUME-2586. HDFS Sink must try to rename files even if close fails.

(Johny Rufus via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/0d6eccad
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/0d6eccad
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/0d6eccad

Branch: refs/heads/trunk
Commit: 0d6eccad2781884fe0f000f74b8f964cbdd7971f
Parents: 82631f8
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Wed Jan 28 13:02:50 2015 -0800
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Wed Jan 28 13:04:58 2015 -0800

----------------------------------------------------------------------
 .../apache/flume/sink/hdfs/BucketWriter.java    | 64 ++++++++------------
 .../apache/flume/sink/hdfs/MockFileSystem.java  | 44 ++++++++------
 .../flume/sink/hdfs/MockFsDataOutputStream.java | 20 ++----
 .../flume/sink/hdfs/TestBucketWriter.java       | 32 +++++-----
 .../flume/sink/hdfs/TestHDFSEventSink.java      | 18 ++++--
 5 files changed, 83 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
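For readers skimming the diff, the behavioral change in BucketWriter comes down to this: a failure in writer.close() no longer suppresses the rename, and it is the rename, not the close, that gets rescheduled on the roller pool, bounded by the existing maxCloseTries constructor argument (now stored as maxRenameTries and set via hdfs.closeTries in the tests). Below is a minimal, self-contained sketch of that bounded retry pattern; the class and method names (RenameRetrySketch, attemptRename, closeWriter) are illustrative stand-ins, not the actual Flume API.

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch of the bounded rename-retry pattern in this patch;
// not the real Flume classes.
public class RenameRetrySketch {

  private final ScheduledExecutorService pool =
      Executors.newSingleThreadScheduledExecutor();
  private final AtomicInteger renameTries = new AtomicInteger(0);
  private final int maxRenameTries = 3;      // Flume: hdfs.closeTries
  private final long retryIntervalSecs = 1;  // Flume: hdfs.retryInterval

  // Stand-in for fs.rename(); here it fails until the third attempt.
  private boolean attemptRename() {
    return renameTries.incrementAndGet() >= 3;
  }

  // Stand-in for writer.close(); here it always fails, e.g. a dead DataNode.
  private void closeWriter() {
    throw new RuntimeException("simulated close failure");
  }

  public void close() {
    try {
      closeWriter();
    } catch (RuntimeException e) {
      // Key point of FLUME-2586: a failed close no longer skips the rename.
      System.out.println("close failed, still attempting rename: " + e.getMessage());
    }
    if (attemptRename()) {
      pool.shutdown();
    } else {
      scheduleRenameRetry();
    }
  }

  private void scheduleRenameRetry() {
    pool.schedule(new Callable<Void>() {
      private int tries = 1; // one attempt was already made in close()

      @Override
      public Void call() {
        if (tries >= maxRenameTries) {
          System.out.println("giving up after " + maxRenameTries + " rename attempts");
          pool.shutdown();
          return null;
        }
        tries++;
        if (attemptRename()) {
          System.out.println("rename succeeded on attempt " + renameTries.get());
          pool.shutdown();
        } else {
          pool.schedule(this, retryIntervalSecs, TimeUnit.SECONDS); // retry later
        }
        return null;
      }
    }, retryIntervalSecs, TimeUnit.SECONDS);
  }

  public static void main(String[] args) {
    new RenameRetrySketch().close();
  }
}

In the real BucketWriter the rename goes through renameBucket() under a call timeout, and each attempt increments the renameTries counter that the tests below assert on.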


http://git-wip-us.apache.org/repos/asf/flume/blob/0d6eccad/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index f9e39ac..62f4eee 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -109,12 +109,12 @@ class BucketWriter {
 
   private Clock clock = new SystemClock();
   private final long retryInterval;
-  private final int maxCloseTries;
+  private final int maxRenameTries;
 
   // flag that the bucket writer was closed due to idling and thus shouldn't be
   // reopened. Not ideal, but avoids internals of owners
   protected boolean closed = false;
-  AtomicInteger closeTries = new AtomicInteger(0);
+  AtomicInteger renameTries = new AtomicInteger(0);
 
   BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize,
     Context context, String filePath, String fileName, String inUsePrefix,
@@ -148,7 +148,7 @@ class BucketWriter {
     fileExtensionCounter = new AtomicLong(clock.currentTimeMillis());
 
     this.retryInterval = retryInterval;
-    this.maxCloseTries = maxCloseTries;
+    this.maxRenameTries = maxCloseTries;
     isOpen = false;
     isUnderReplicated = false;
     this.writer.configure(context);
@@ -336,55 +336,32 @@ class BucketWriter {
       private final HDFSWriter localWriter = writer;
       @Override
       public Void call() throws Exception {
-        LOG.info("Close tries incremented");
-        closeTries.incrementAndGet();
         localWriter.close(); // could block
         return null;
       }
     };
   }
 
-  private Callable<Void> createScheduledCloseCallable(
-    final CallRunner<Void> closeCallRunner) {
+  private Callable<Void> createScheduledRenameCallable() {
 
     return new Callable<Void>() {
       private final String path = bucketPath;
       private final String finalPath = targetPath;
       private FileSystem fs = fileSystem;
-      private boolean closeSuccess = false;
-      private Path tmpFilePath = new Path(path);
-      private int closeTries = 1; // one attempt is already done
-      private final CallRunner<Void> closeCall = closeCallRunner;
+      private int renameTries = 1; // one attempt is already done
 
       @Override
       public Void call() throws Exception {
-        if (closeTries >= maxCloseTries) {
-          LOG.warn("Unsuccessfully attempted to close " + path + " " +
-            maxCloseTries + " times. File may be open, " +
-            "or may not have been renamed." );
+        if (renameTries >= maxRenameTries) {
+          LOG.warn("Unsuccessfully attempted to rename " + path + " " +
+            maxRenameTries + " times. File may still be open.");
           return null;
         }
-        closeTries++;
+        renameTries++;
         try {
-          if (!closeSuccess) {
-            if (isClosedMethod == null) {
-              LOG.debug("isFileClosed method is not available in " +
-                "the version of HDFS client being used. " +
-                "Not attempting to close file again");
-              return null;
-            }
-            if (!isFileClosed(fs, tmpFilePath)) {
-              callWithTimeout(closeCall);
-            }
-            // It is possible rename failing causes this thread
-            // to get rescheduled. In that case,
-            // don't check with NN if close succeeded as we know
-            // it did. This helps avoid an unnecessary RPC call.
-            closeSuccess = true;
-          }
           renameBucket(path, finalPath, fs);
         } catch (Exception e) {
-          LOG.warn("Closing file: " + path + " failed. Will " +
+          LOG.warn("Renaming file: " + path + " failed. Will " +
             "retry again in " + retryInterval + " seconds.", e);
           timedRollerPool.schedule(this, retryInterval,
             TimeUnit.SECONDS);
@@ -422,10 +399,6 @@ class BucketWriter {
             "). Exception follows.", e);
         sinkCounter.incrementConnectionFailedCount();
         failedToClose = true;
-        final Callable<Void> scheduledClose =
-          createScheduledCloseCallable(closeCallRunner);
-        timedRollerPool.schedule(scheduledClose, retryInterval,
-          TimeUnit.SECONDS);
       }
       isOpen = false;
     } else {
@@ -443,10 +416,20 @@ class BucketWriter {
       idleFuture = null;
     }
 
-    // Don't rename file if this failed to close
-    if (bucketPath != null && fileSystem != null && !failedToClose) {
+    if (bucketPath != null && fileSystem != null) {
       // could block or throw IOException
-      renameBucket(bucketPath, targetPath, fileSystem);
+      try {
+        renameBucket(bucketPath, targetPath, fileSystem);
+      } catch(Exception e) {
+        LOG.warn(
+          "failed to rename() file (" + bucketPath +
+          "). Exception follows.", e);
+        sinkCounter.incrementConnectionFailedCount();
+        final Callable<Void> scheduledRename =
+                createScheduledRenameCallable();
+        timedRollerPool.schedule(scheduledRename, retryInterval,
+                TimeUnit.SECONDS);
+      }
     }
     if (callCloseCallback) {
       runCloseAction();
@@ -671,6 +654,7 @@ class BucketWriter {
       public Void call() throws Exception {
         if (fs.exists(srcPath)) { // could block
           LOG.info("Renaming " + srcPath + " to " + dstPath);
+          renameTries.incrementAndGet();
           fs.rename(srcPath, dstPath); // could block
         }
         return null;

http://git-wip-us.apache.org/repos/asf/flume/blob/0d6eccad/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystem.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystem.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystem.java
index ca4f852..4443335 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystem.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystem.java
@@ -37,13 +37,22 @@ public class MockFileSystem extends FileSystem {
       LoggerFactory.getLogger(MockFileSystem.class);
 
   FileSystem fs;
-  int numberOfClosesRequired;
+  int numberOfRetriesRequired;
   MockFsDataOutputStream latestOutputStream;
+  int currentRenameAttempts;
+  boolean closeSucceed = true;
 
   public MockFileSystem(FileSystem fs,
-    int numberOfClosesRequired) {
+    int numberOfRetriesRequired) {
     this.fs = fs;
-    this.numberOfClosesRequired = numberOfClosesRequired;
+    this.numberOfRetriesRequired = numberOfRetriesRequired;
+  }
+
+  public MockFileSystem(FileSystem fs,
+                        int numberOfRetriesRequired, boolean closeSucceed) {
+    this.fs = fs;
+    this.numberOfRetriesRequired = numberOfRetriesRequired;
+    this.closeSucceed = closeSucceed;
   }
 
   @Override
@@ -51,7 +60,7 @@ public class MockFileSystem extends FileSystem {
       throws IOException {
 
     latestOutputStream = new MockFsDataOutputStream(
-      fs.append(arg0, arg1, arg2), numberOfClosesRequired);
+      fs.append(arg0, arg1, arg2), closeSucceed);
 
     return latestOutputStream;
   }
@@ -60,7 +69,7 @@ public class MockFileSystem extends FileSystem {
   public FSDataOutputStream create(Path arg0) throws IOException {
     //throw new IOException ("HI there2");
     latestOutputStream = new MockFsDataOutputStream(
-      fs.create(arg0), numberOfClosesRequired);
+      fs.create(arg0), closeSucceed);
 
     return latestOutputStream;
   }
@@ -116,8 +125,17 @@ public class MockFileSystem extends FileSystem {
 
   @Override
   public boolean rename(Path arg0, Path arg1) throws IOException {
-
-    return fs.rename(arg0, arg1);
+    currentRenameAttempts++;
+    logger.info(
+      "Attempting to Rename: '" + currentRenameAttempts + "' of '" +
+      numberOfRetriesRequired + "'");
+    if (currentRenameAttempts >= numberOfRetriesRequired ||
+      numberOfRetriesRequired == 0) {
+      logger.info("Renaming file");
+      return fs.rename(arg0, arg1);
+    } else {
+      throw new IOException("MockIOException");
+    }
   }
 
   @Override
@@ -125,16 +143,4 @@ public class MockFileSystem extends FileSystem {
     fs.setWorkingDirectory(arg0);
 
   }
-
-  public boolean isFileClosed(Path path) {
-
-    logger.info("isFileClosed: '" +
-      latestOutputStream.getCurrentCloseAttempts() + "' , '" +
-      numberOfClosesRequired + "'");
-    return latestOutputStream.getCurrentCloseAttempts() >=
-      numberOfClosesRequired || numberOfClosesRequired == 0;
-  }
-
-
-
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/0d6eccad/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStream.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStream.java
index 5bbacae..35b034e 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStream.java
@@ -28,34 +28,24 @@ public class MockFsDataOutputStream extends FSDataOutputStream{
   private static final Logger logger =
       LoggerFactory.getLogger(MockFsDataOutputStream.class);
 
-  int currentCloseAttempts = 0;
-  int numberOfClosesRequired;
+  boolean closeSucceed;
 
   public MockFsDataOutputStream(FSDataOutputStream wrapMe,
-    int numberOfClosesRequired)
+    boolean closeSucceed)
       throws IOException {
     super(wrapMe.getWrappedStream(), null);
-
-    this.numberOfClosesRequired = numberOfClosesRequired;
-
+    this.closeSucceed = closeSucceed;
   }
 
   @Override
   public void close() throws IOException {
-    currentCloseAttempts++;
     logger.info(
-      "Attempting to Close: '" + currentCloseAttempts + "' of '" +
-        numberOfClosesRequired + "'");
-    if (currentCloseAttempts >= numberOfClosesRequired ||
-      numberOfClosesRequired == 0) {
+      "Close Succeeded - " + closeSucceed);
+    if (closeSucceed) {
       logger.info("closing file");
       super.close();
     } else {
       throw new IOException("MockIOException");
     }
   }
-
-  public int getCurrentCloseAttempts() {
-    return currentCloseAttempts;
-  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/0d6eccad/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
index bcb912f..7c74b16 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
@@ -408,15 +408,18 @@ public class TestBucketWriter {
 
 
   @Test
-  public void testSequenceFileCloseRetries() throws Exception {
-    SequenceFileCloseRetryCoreTest(1);
-    SequenceFileCloseRetryCoreTest(5);
-    SequenceFileCloseRetryCoreTest(2);
+  public void testSequenceFileRenameRetries() throws Exception {
+    SequenceFileRenameRetryCoreTest(1, true);
+    SequenceFileRenameRetryCoreTest(5, true);
+    SequenceFileRenameRetryCoreTest(2, true);
 
-  }
+    SequenceFileRenameRetryCoreTest(1, false);
+    SequenceFileRenameRetryCoreTest(5, false);
+    SequenceFileRenameRetryCoreTest(2, false);
 
+  }
 
-  public void SequenceFileCloseRetryCoreTest(int numberOfClosesRequired) throws Exception {
+  public void SequenceFileRenameRetryCoreTest(int numberOfRetriesRequired, boolean closeSucceed) throws Exception {
     String hdfsPath = "file:///tmp/flume-test."
       + Calendar.getInstance().getTimeInMillis() + "."
       + Thread.currentThread().getId();
@@ -429,13 +432,13 @@ public class TestBucketWriter {
     fs.mkdirs(dirPath);
     context.put("hdfs.path", hdfsPath);
     context.put("hdfs.closeTries",
-      String.valueOf(numberOfClosesRequired));
+      String.valueOf(numberOfRetriesRequired));
     context.put("hdfs.rollCount", "1");
     context.put("hdfs.retryInterval", "1");
     context.put("hdfs.callTimeout", Long.toString(1000));
     MockFileSystem mockFs = new
       MockFileSystem(fs,
-      numberOfClosesRequired);
+      numberOfRetriesRequired, closeSucceed);
     BucketWriter bucketWriter = new BucketWriter(0, 0, 1, 1, ctx,
       hdfsPath, hdfsPath, "singleBucket", ".tmp", null, null,
       null, new MockDataStream(mockFs),
@@ -443,7 +446,7 @@ public class TestBucketWriter {
       new SinkCounter(
         "test-bucket-writer-" + System.currentTimeMillis()),
       0, null, null, 30000, Executors.newSingleThreadExecutor(), 1,
-      numberOfClosesRequired);
+      numberOfRetriesRequired);
 
     bucketWriter.setFileSystem(mockFs);
     // At this point, we checked if isFileClosed is available in
@@ -453,12 +456,11 @@ public class TestBucketWriter {
     // This is what triggers the close, so a 2nd append is required :/
     bucketWriter.append(event);
 
-    TimeUnit.SECONDS.sleep(numberOfClosesRequired + 2);
+    TimeUnit.SECONDS.sleep(numberOfRetriesRequired + 2);
 
-    int expectedNumberOfCloses = numberOfClosesRequired;
-    Assert.assertTrue("Expected " + expectedNumberOfCloses + " " +
-      "but got " + bucketWriter.closeTries.get(),
-      bucketWriter.closeTries.get() ==
-        expectedNumberOfCloses);
+    Assert.assertTrue("Expected " + numberOfRetriesRequired + " " +
+      "but got " + bucketWriter.renameTries.get(),
+      bucketWriter.renameTries.get() ==
+        numberOfRetriesRequired);
   }
 }
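As the test setup above shows, the retry behavior is driven by ordinary sink settings rather than a new knob: hdfs.closeTries now caps the number of rename attempts and hdfs.retryInterval (in seconds, per the scheduling code in BucketWriter) spaces them out. A minimal sketch of supplying those keys programmatically, the same way the test does; the values are arbitrary examples, not recommendations.

import org.apache.flume.Context;

public class RetryConfigExample {
  public static void main(String[] args) {
    // Example values only; the keys mirror those used in TestBucketWriter above.
    Context context = new Context();
    context.put("hdfs.closeTries", "3");      // after this patch, also caps rename retries
    context.put("hdfs.retryInterval", "1");   // seconds between retry attempts
    context.put("hdfs.callTimeout", "10000"); // per-call timeout in milliseconds
    System.out.println(context);
  }
}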

http://git-wip-us.apache.org/repos/asf/flume/blob/0d6eccad/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
index f29f1f1..1b7a364 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
@@ -1375,10 +1375,16 @@ public class TestHDFSEventSink {
     Assert.assertEquals(Integer.MAX_VALUE, sink.getTryCount());
   }
   @Test
-  public void testRetryClose() throws InterruptedException,
+  public void testRetryRename() throws InterruptedException,
     LifecycleException,
     EventDeliveryException, IOException {
+    testRetryRename(true);
+    testRetryRename(false);
+  }
 
+  private void testRetryRename(boolean closeSucceed) throws InterruptedException,
+          LifecycleException,
+          EventDeliveryException, IOException {
     LOG.debug("Starting...");
     String newPath = testPath + "/retryBucket";
 
@@ -1388,7 +1394,7 @@ public class TestHDFSEventSink {
     Path dirPath = new Path(newPath);
     fs.delete(dirPath, true);
     fs.mkdirs(dirPath);
-    MockFileSystem mockFs = new MockFileSystem(fs, 3);
+    MockFileSystem mockFs = new MockFileSystem(fs, 6, closeSucceed);
 
     Context context = getContextForRetryTests();
     Configurables.configure(sink, context);
@@ -1434,15 +1440,15 @@ public class TestHDFSEventSink {
 
     Collection<BucketWriter> writers = sink.getSfWriters().values();
 
-    int totalCloseAttempts = 0;
+    int totalRenameAttempts = 0;
     for(BucketWriter writer: writers) {
-      LOG.info("Close tries = "+ writer.closeTries.get());
-      totalCloseAttempts += writer.closeTries.get();
+      LOG.info("Rename tries = "+ writer.renameTries.get());
+      totalRenameAttempts += writer.renameTries.get();
     }
     // stop clears the sfWriters map, so we need to compute the
     // close tries count before stopping the sink.
     sink.stop();
-    Assert.assertEquals(6, totalCloseAttempts);
+    Assert.assertEquals(6, totalRenameAttempts);
 
   }
 }

