flume-commits mailing list archives

From mpe...@apache.org
Subject git commit: FLUME-2357. HDFS sink should retry closing files that previously had close errors
Date Tue, 29 Apr 2014 00:20:39 GMT
Repository: flume
Updated Branches:
  refs/heads/flume-1.5 964ce67a8 -> e984ffa7e


FLUME-2357. HDFS sink should retry closing files that previously had close errors

(Hari Shreedharan via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/e984ffa7
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/e984ffa7
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/e984ffa7

Branch: refs/heads/flume-1.5
Commit: e984ffa7e86f983953eeaf6fa3f0de3d699dc73b
Parents: 964ce67
Author: Mike Percy <mpercy@cloudera.com>
Authored: Mon Apr 28 17:14:40 2014 -0700
Committer: Mike Percy <mpercy@cloudera.com>
Committed: Mon Apr 28 17:18:04 2014 -0700

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |   7 +
 .../flume/sink/hdfs/AbstractHDFSWriter.java     |  51 ------
 .../apache/flume/sink/hdfs/BucketWriter.java    | 182 ++++++++++++++++---
 .../sink/hdfs/HDFSCompressedDataStream.java     |   3 +-
 .../apache/flume/sink/hdfs/HDFSDataStream.java  |  26 ++-
 .../apache/flume/sink/hdfs/HDFSEventSink.java   |  51 +++++-
 .../flume/sink/hdfs/HDFSSequenceFile.java       |   3 +-
 .../apache/flume/sink/hdfs/MockDataStream.java  |  39 ++++
 .../apache/flume/sink/hdfs/MockFileSystem.java  | 140 ++++++++++++++
 .../hdfs/MockFileSystemCloseRetryWrapper.java   | 142 ---------------
 .../flume/sink/hdfs/MockFsDataOutputStream.java |  61 +++++++
 ...MockFsDataOutputStreamCloseRetryWrapper.java |  73 --------
 .../flume/sink/hdfs/TestBucketWriter.java       | 166 ++++++++++++-----
 .../flume/sink/hdfs/TestHDFSEventSink.java      | 128 +++++++++++++
 .../sink/hdfs/TestUseRawLocalFileSystem.java    |  62 -------
 15 files changed, 721 insertions(+), 413 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/e984ffa7/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 7b918ed..b24f8af 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1597,6 +1597,13 @@ hdfs.roundValue         1             Rounded down to the highest multiple of th
 hdfs.roundUnit          second        The unit of the round down value - ``second``, ``minute`` or ``hour``.
 hdfs.timeZone           Local Time    Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles.
 hdfs.useLocalTimeStamp  false         Use the local time (instead of the timestamp from the event header) while replacing the escape sequences.
+hdfs.closeTries         0             Number of times the sink must try to close a file. If set to 1, this sink will not re-try a failed close
+                                      (due to, for example, NameNode or DataNode failure), and may leave the file in an open state with a .tmp extension.
+                                      If set to 0, the sink will try to close the file until the file is eventually closed
+                                      (there is no limit on the number of times it would try).
+hdfs.retryInterval      180           Time in seconds between consecutive attempts to close a file. Each close call costs multiple RPC round-trips to the Namenode,
+                                      so setting this too low can cause a lot of load on the name node. If set to 0 or less, the sink will not
+                                      attempt to close the file if the first attempt fails, and may leave the file open or with a ".tmp" extension.
 serializer              ``TEXT``      Other possible options include ``avro_event`` or the
                                       fully-qualified class name of an implementation of the
                                       ``EventSerializer.Builder`` interface.

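As an illustrative aside, the two properties documented above are ordinary sink settings; here is a minimal sketch of supplying them through Flume's Context, the same API the tests added by this commit use (class name and values are examples only):

import org.apache.flume.Context;

// Sketch only: programmatic equivalent of setting the two new properties
// documented above. Values are examples; closeTries 0 means "retry until
// the file is eventually closed", 180 is the default retry interval.
public class CloseRetrySettingsExample {
  public static void main(String[] args) {
    Context ctx = new Context();
    ctx.put("hdfs.closeTries", "0");      // unlimited close retries
    ctx.put("hdfs.retryInterval", "180"); // seconds between close attempts
    System.out.println(ctx);
  }
}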
http://git-wip-us.apache.org/repos/asf/flume/blob/e984ffa7/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
index da0466d..043ca6c 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.io.OutputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -226,54 +225,4 @@ public abstract class AbstractHDFSWriter implements HDFSWriter {
     }
     return m;
   }
-
-  /**
-   * This will
-   * @param outputStream
-   * @throws IOException
-   */
-  protected void closeHDFSOutputStream(OutputStream outputStream)
-      throws IOException {
-    try {
-      outputStream.close();
-
-      if (numberOfCloseRetries > 0) {
-        try {
-          Method isFileClosedMethod = getIsFileClosedMethod();
-          int closeAttemptsMade = 0;
-          if (isFileClosedMethod != null) {
-            while (closeAttemptsMade < numberOfCloseRetries.intValue() &&
-                Boolean.FALSE.equals(isFileClosedMethod.invoke(fs, destPath))) {
-              closeAttemptsMade++;
-              logger.debug("Waiting: '" + timeBetweenCloseRetries + "' before retry close");
-              Thread.sleep(timeBetweenCloseRetries);
-              try {
-                outputStream.close();
-              } catch (IOException e) {
-                logger.error("Unable to close HDFS file: '" + destPath + "'");
-              }
-            }
-            if (closeAttemptsMade == numberOfCloseRetries.intValue()) {
-              logger.warn("Failed to close '" + destPath + "' is " +
-                numberOfCloseRetries + " retries, over " + (timeBetweenCloseRetries * numberOfCloseRetries) + " millseconds");
-            }
-          }
-        } catch (Exception e) {
-          logger.error("Failed to close '" + destPath + "' is " +
-              numberOfCloseRetries + " retries, over " + (timeBetweenCloseRetries * numberOfCloseRetries) + " millseconds", e);
-        }
-      }
-    } catch (IOException e) {
-      logger.error("Unable to close HDFS file: '" + destPath + "'");
-    }
-  }
-
-  private Method getIsFileClosedMethod() {
-    try {
-      return fs.getClass().getMethod("isFileClosed", Path.class);
-    } catch (Exception e) {
-      return null;
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/e984ffa7/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index e82d13d..fba3f66 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -19,6 +19,7 @@
 package org.apache.flume.sink.hdfs;
 
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
@@ -29,6 +30,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -63,8 +65,9 @@ class BucketWriter {
    * This lock ensures that only one thread can open a file at a time.
    */
   private static final Integer staticLock = new Integer(1);
+  private Method isClosedMethod = null;
 
-  private final HDFSWriter writer;
+  private HDFSWriter writer;
   private final long rollInterval;
   private final long rollSize;
   private final long rollCount;
@@ -102,11 +105,16 @@ class BucketWriter {
   private final ExecutorService callTimeoutPool;
   private final int maxConsecUnderReplRotations = 30; // make this config'able?
 
+  private boolean mockFsInjected = false;
+
   private Clock clock = new SystemClock();
+  private final long retryInterval;
+  private final int maxCloseTries;
 
   // flag that the bucket writer was closed due to idling and thus shouldn't be
   // reopened. Not ideal, but avoids internals of owners
   protected boolean closed = false;
+  AtomicInteger closeTries = new AtomicInteger(0);
 
   BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize,
     Context context, String filePath, String fileName, String inUsePrefix,
@@ -115,7 +123,8 @@ class BucketWriter {
     ScheduledExecutorService timedRollerPool, UserGroupInformation user,
     SinkCounter sinkCounter, int idleTimeout, WriterCallback onCloseCallback,
     String onCloseCallbackPath, long callTimeout,
-    ExecutorService callTimeoutPool) {
+    ExecutorService callTimeoutPool, long retryInterval,
+    int maxCloseTries) {
     this.rollInterval = rollInterval;
     this.rollSize = rollSize;
     this.rollCount = rollCount;
@@ -138,11 +147,24 @@ class BucketWriter {
     this.callTimeoutPool = callTimeoutPool;
     fileExtensionCounter = new AtomicLong(clock.currentTimeMillis());
 
+    this.retryInterval = retryInterval;
+    this.maxCloseTries = maxCloseTries;
     isOpen = false;
     isUnderReplicated = false;
     this.writer.configure(context);
   }
 
+  @VisibleForTesting
+  void setFileSystem(FileSystem fs) {
+    this.fileSystem = fs;
+    mockFsInjected = true;
+  }
+
+  @VisibleForTesting
+  void setMockStream(HDFSWriter dataWriter) {
+    this.writer = dataWriter;
+  }
+
   /**
    * Allow methods to act as another user (typically used for HDFS Kerberos)
    * @param <T>
@@ -180,6 +202,25 @@ class BucketWriter {
     batchCounter = 0;
   }
 
+  private Method getRefIsClosed() {
+    try {
+      return fileSystem.getClass().getMethod("isFileClosed",
+        Path.class);
+    } catch (Exception e) {
+      LOG.warn("isFileClosed is not available in the " +
+        "version of HDFS being used. Flume will not " +
+        "attempt to close files if the close fails on " +
+        "the first attempt",e);
+      return null;
+    }
+  }
+
+  private Boolean isFileClosed(FileSystem fs,
+    Path tmpFilePath) throws Exception {
+    return (Boolean)(isClosedMethod.invoke(fs,
+      tmpFilePath));
+  }
+
   /**
    * open() is called by append()
    * @throws IOException
@@ -223,12 +264,20 @@ class BucketWriter {
           public Void call() throws Exception {
             if (codeC == null) {
               // Need to get reference to FS using above config before underlying
-              // writer does in order to avoid shutdown hook & IllegalStateExceptions
-              fileSystem = new Path(bucketPath).getFileSystem(config);
+              // writer does in order to avoid shutdown hook &
+              // IllegalStateExceptions
+              if(!mockFsInjected) {
+                fileSystem = new Path(bucketPath).getFileSystem(
+                  config);
+              }
               writer.open(bucketPath);
             } else {
-              // need to get reference to FS before writer does to avoid shutdown hook
-              fileSystem = new Path(bucketPath).getFileSystem(config);
+              // need to get reference to FS before writer does to
+              // avoid shutdown hook
+              if(!mockFsInjected) {
+                fileSystem = new Path(bucketPath).getFileSystem(
+                  config);
+              }
               writer.open(bucketPath, codeC, compType);
             }
             return null;
@@ -243,6 +292,7 @@ class BucketWriter {
         }
       }
     }
+    isClosedMethod = getRefIsClosed();
     sinkCounter.incrementConnectionCreatedCount();
     resetCounters();
 
@@ -280,6 +330,71 @@ class BucketWriter {
   public synchronized void close() throws IOException, InterruptedException {
     close(false);
   }
+
+  private CallRunner<Void> createCloseCallRunner() {
+    return new CallRunner<Void>() {
+      private final HDFSWriter localWriter = writer;
+      @Override
+      public Void call() throws Exception {
+        LOG.info("Close tries incremented");
+        closeTries.incrementAndGet();
+        localWriter.close(); // could block
+        return null;
+      }
+    };
+  }
+
+  private Callable<Void> createScheduledCloseCallable(
+    final CallRunner<Void> closeCallRunner) {
+
+    return new Callable<Void>() {
+      private final String path = bucketPath;
+      private final String finalPath = targetPath;
+      private FileSystem fs = fileSystem;
+      private boolean closeSuccess = false;
+      private Path tmpFilePath = new Path(path);
+      private int closeTries = 1; // one attempt is already done
+      private final CallRunner<Void> closeCall = closeCallRunner;
+
+      @Override
+      public Void call() throws Exception {
+        if (closeTries >= maxCloseTries) {
+          LOG.warn("Unsuccessfully attempted to close " + path + " " +
+            maxCloseTries + " times. File may be open, " +
+            "or may not have been renamed." );
+          return null;
+        }
+        closeTries++;
+        try {
+          if (!closeSuccess) {
+            if (isClosedMethod == null) {
+              LOG.debug("isFileClosed method is not available in " +
+                "the version of HDFS client being used. " +
+                "Not attempting to close file again");
+              return null;
+            }
+            if (!isFileClosed(fs, tmpFilePath)) {
+              callWithTimeout(closeCall);
+            }
+            // It is possible rename failing causes this thread
+            // to get rescheduled. In that case,
+            // don't check with NN if close succeeded as we know
+            // it did. This helps avoid an unnecessary RPC call.
+            closeSuccess = true;
+          }
+          renameBucket(path, finalPath, fs);
+        } catch (Exception e) {
+          LOG.warn("Closing file: " + path + " failed. Will " +
+            "retry again in " + retryInterval + " seconds.", e);
+          timedRollerPool.schedule(this, retryInterval,
+            TimeUnit.SECONDS);
+          return null;
+        }
+        return null;
+      }
+    };
+
+  }
   /**
    * Close the file handle and rename the temp file to the permanent filename.
    * Safe to call multiple times. Logs HDFSWriter.close() exceptions.
@@ -290,21 +405,23 @@ class BucketWriter {
     throws IOException, InterruptedException {
     checkAndThrowInterruptedException();
     flush();
-    LOG.debug("Closing {}", bucketPath);
+    boolean failedToClose = false;
+    LOG.info("Closing {}", bucketPath);
+    CallRunner<Void> closeCallRunner = createCloseCallRunner();
     if (isOpen) {
       try {
-        callWithTimeout(new CallRunner<Void>() {
-          @Override
-          public Void call() throws Exception {
-            writer.close(); // could block
-            return null;
-          }
-        });
+        callWithTimeout(closeCallRunner);
         sinkCounter.incrementConnectionClosedCount();
       } catch (IOException e) {
-        LOG.warn("failed to close() HDFSWriter for file (" + bucketPath +
+        LOG.warn(
+          "failed to close() HDFSWriter for file (" + bucketPath +
             "). Exception follows.", e);
         sinkCounter.incrementConnectionFailedCount();
+        failedToClose = true;
+        final Callable<Void> scheduledClose =
+          createScheduledCloseCallable(closeCallRunner);
+        timedRollerPool.schedule(scheduledClose, retryInterval,
+          TimeUnit.SECONDS);
       }
       isOpen = false;
     } else {
@@ -322,14 +439,15 @@ class BucketWriter {
       idleFuture = null;
     }
 
-    if (bucketPath != null && fileSystem != null) {
-      renameBucket(); // could block or throw IOException
-      fileSystem = null;
+    // Don't rename file if this failed to close
+    if (bucketPath != null && fileSystem != null && !failedToClose) {
+      // could block or throw IOException
+      renameBucket(bucketPath, targetPath, fileSystem);
     }
     if (callCloseCallback) {
       runCloseAction();
       closed = true;
-    }    
+    }
   }
 
   /**
@@ -525,7 +643,18 @@ class BucketWriter {
   /**
    * Rename bucketPath file from .tmp to permanent location.
    */
-  private void renameBucket() throws IOException, InterruptedException {
+  // When this bucket writer is rolled based on rollCount or
+  // rollSize, the same instance is reused for the new file. But if
+  // the previous file was not closed/renamed,
+  // the bucket writer fields no longer point to it and hence need
+  // to be passed in from the thread attempting to close it. Even
+  // when the bucket writer is closed due to close timeout,
+  // this method can get called from the scheduled thread so the
+  // file gets closed later - so an implicit reference to this
+  // bucket writer would still be alive in the Callable instance.
+  private void renameBucket(String bucketPath,
+    String targetPath, final FileSystem fs) throws IOException,
+    InterruptedException {
     if(bucketPath.equals(targetPath)) {
       return;
     }
@@ -533,12 +662,12 @@ class BucketWriter {
     final Path srcPath = new Path(bucketPath);
     final Path dstPath = new Path(targetPath);
 
-    callWithTimeout(new CallRunner<Object>() {
+    callWithTimeout(new CallRunner<Void>() {
       @Override
-      public Object call() throws Exception {
-        if(fileSystem.exists(srcPath)) { // could block
+      public Void call() throws Exception {
+        if (fs.exists(srcPath)) { // could block
           LOG.info("Renaming " + srcPath + " to " + dstPath);
-          fileSystem.rename(srcPath, dstPath); // could block
+          fs.rename(srcPath, dstPath); // could block
         }
         return null;
       }
@@ -600,9 +729,8 @@ class BucketWriter {
     } catch (TimeoutException eT) {
       future.cancel(true);
       sinkCounter.incrementConnectionFailedCount();
-      throw new IOException("Callable timed out after " + callTimeout + " ms" +
-          " on file: " + bucketPath,
-        eT);
+      throw new IOException("Callable timed out after " +
+        callTimeout + " ms" + " on file: " + bucketPath, eT);
     } catch (ExecutionException e1) {
       sinkCounter.incrementConnectionFailedCount();
       Throwable cause = e1.getCause();

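The heart of the change above: when the inline writer.close() fails, BucketWriter hands a Callable to timedRollerPool that re-submits itself every retryInterval seconds, checking isFileClosed (via reflection, where the HDFS client supports it) and retrying close/rename until it succeeds or maxCloseTries is exhausted. A minimal, self-contained sketch of that self-rescheduling pattern, with no Hadoop dependencies and purely illustrative names:

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of the self-rescheduling close retry used above: the Callable
// re-submits itself to the scheduler until the close succeeds or the retry
// budget is exhausted. Names and the simulated close() failure are illustrative.
public class CloseRetrySketch {
  public static void main(String[] args) {
    final ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    final int maxCloseTries = 5;
    final long retryIntervalSeconds = 1;

    Callable<Void> retryClose = new Callable<Void>() {
      private int closeTries = 1; // the first, inline attempt already failed

      @Override
      public Void call() {
        if (closeTries >= maxCloseTries) {
          System.out.println("Giving up; file may remain open");
          pool.shutdown();
          return null;
        }
        closeTries++;
        try {
          attemptClose(closeTries);   // could throw, like writer.close()
          System.out.println("Closed on attempt " + closeTries);
          pool.shutdown();
        } catch (Exception e) {
          System.out.println("Attempt " + closeTries + " failed, rescheduling");
          pool.schedule(this, retryIntervalSeconds, TimeUnit.SECONDS);
        }
        return null;
      }
    };
    pool.schedule(retryClose, retryIntervalSeconds, TimeUnit.SECONDS);
  }

  // Stand-in for HDFSWriter.close(): fails until the third attempt.
  private static void attemptClose(int attempt) throws Exception {
    if (attempt < 3) {
      throw new Exception("simulated close failure");
    }
  }
}

The real implementation additionally remembers once the close has succeeded and then only retries the rename, avoiding an unnecessary NameNode round-trip, as noted in the comment inside createScheduledCloseCallable().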
http://git-wip-us.apache.org/repos/asf/flume/blob/e984ffa7/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
index 5518547..fe857c3 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
@@ -147,8 +147,7 @@ public class HDFSCompressedDataStream extends AbstractHDFSWriter {
     }
     fsOut.flush();
     fsOut.sync();
-    closeHDFSOutputStream(cmpOut);
-
+    cmpOut.close();
     unregisterCurrentStream();
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/e984ffa7/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
index e20d1ee..6fa12eb 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
@@ -19,6 +19,8 @@
 package org.apache.flume.sink.hdfs;
 
 import java.io.IOException;
+
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.serialization.EventSerializer;
@@ -57,11 +59,15 @@ public class HDFSDataStream extends AbstractHDFSWriter {
         + useRawLocalFileSystem);
   }
 
-  @Override
-  public void open(String filePath) throws IOException {
-    Configuration conf = new Configuration();
-    Path dstPath = new Path(filePath);
-    FileSystem hdfs = dstPath.getFileSystem(conf);
+  @VisibleForTesting
+  protected FileSystem getDfs(Configuration conf,
+    Path dstPath) throws IOException{
+    return  dstPath.getFileSystem(conf);
+  }
+
+  protected void doOpen(Configuration conf,
+    Path dstPath, FileSystem hdfs) throws
+    IOException {
     if(useRawLocalFileSystem) {
       if(hdfs instanceof LocalFileSystem) {
         hdfs = ((LocalFileSystem)hdfs).getRaw();
@@ -100,6 +106,14 @@ public class HDFSDataStream extends AbstractHDFSWriter {
   }
 
   @Override
+  public void open(String filePath) throws IOException {
+    Configuration conf = new Configuration();
+    Path dstPath = new Path(filePath);
+    FileSystem hdfs = getDfs(conf, dstPath);
+    doOpen(conf, dstPath, hdfs);
+  }
+
+  @Override
   public void open(String filePath, CompressionCodec codec,
                    CompressionType cType) throws IOException {
     open(filePath);
@@ -123,7 +137,7 @@ public class HDFSDataStream extends AbstractHDFSWriter {
     serializer.beforeClose();
     outStream.flush();
     outStream.sync();
-    closeHDFSOutputStream(outStream);
+    outStream.close();
 
     unregisterCurrentStream();
   }

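The HDFSDataStream change is a refactor for testability: the FileSystem lookup moves into an overridable getDfs() method so the new MockDataStream (added below) can return a mock file system instead of touching HDFS. A generic, hypothetical sketch of that factory-method test seam:

// Illustrative sketch of the test seam introduced above: the production class
// obtains its collaborator through a protected factory method, and a test
// subclass overrides that method to inject a fake. All names are hypothetical;
// in the real change the collaborator is Hadoop's FileSystem and the override
// lives in MockDataStream.
interface Connection { void open(); }

class RealConnection implements Connection {
  private final String path;
  RealConnection(String path) { this.path = path; }
  public void open() { System.out.println("opening real connection to " + path); }
}

class DataStream {
  // The seam: tests subclass and override this instead of touching open().
  protected Connection getConnection(String path) {
    return new RealConnection(path);
  }

  public void open(String path) {
    getConnection(path).open();
  }
}

public class TestSeamSketch {
  public static void main(String[] args) {
    // Test-style subclass that injects a fake through the seam.
    DataStream mocked = new DataStream() {
      @Override
      protected Connection getConnection(String path) {
        return new Connection() {
          public void open() { System.out.println("opening fake connection"); }
        };
      }
    };
    mocked.open("/tmp/ignored");
  }
}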
http://git-wip-us.apache.org/repos/asf/flume/blob/e984ffa7/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index 4ea78c1..4f3b3f0 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -46,6 +46,7 @@ import org.apache.flume.formatter.output.BucketPath;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.AbstractSink;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
@@ -79,6 +80,10 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   private static final long defaultBatchSize = 100;
   private static final String defaultFileType = HDFSWriterFactory.SequenceFileType;
   private static final int defaultMaxOpenFiles = 5000;
+  // Time between close retries, in seconds
+  private static final long defaultRetryInterval = 180;
+  // Retry forever.
+  private static final int defaultTryCount = Integer.MAX_VALUE;
 
   /**
    * Default length of time we wait for blocking BucketWriter calls
@@ -140,7 +145,12 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
 
   private volatile int idleTimeout;
   private Clock clock;
+  private FileSystem mockFs;
+  private HDFSWriter mockWriter;
   private final Object sfWritersLock = new Object();
+  private long retryInterval;
+  private int tryCount;
+
 
   /*
    * Extended Java LinkedHashMap for open file handle LRU queue.
@@ -218,6 +228,21 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     kerbConfPrincipal = context.getString("hdfs.kerberosPrincipal", "");
     kerbKeytab = context.getString("hdfs.kerberosKeytab", "");
     proxyUserName = context.getString("hdfs.proxyUser", "");
+    tryCount = context.getInteger("hdfs.closeTries", defaultTryCount);
+    if(tryCount <= 0) {
+      LOG.warn("Retry count value : " + tryCount + " is not " +
+        "valid. The sink will try to close the file until the file " +
+        "is eventually closed.");
+      tryCount = defaultTryCount;
+    }
+    retryInterval = context.getLong("hdfs.retryInterval",
+      defaultRetryInterval);
+    if(retryInterval <= 0) {
+      LOG.warn("Retry Interval value: " + retryInterval + " is not " +
+        "valid. If the first close of a file fails, " +
+        "it may remain open and will not be renamed.");
+      tryCount = 1;
+    }
 
     Preconditions.checkArgument(batchSize > 0,
         "batchSize must be greater than 0");
@@ -453,11 +478,18 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   private BucketWriter initializeBucketWriter(String realPath,
     String realName, String lookupPath, HDFSWriter hdfsWriter,
     WriterCallback closeCallback) {
-    return new BucketWriter(rollInterval, rollSize, rollCount,
+    BucketWriter bucketWriter = new BucketWriter(rollInterval,
+      rollSize, rollCount,
       batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
       suffix, codeC, compType, hdfsWriter, timedRollerPool,
       proxyTicket, sinkCounter, idleTimeout, closeCallback,
-      lookupPath, callTimeout, callTimeoutPool);
+      lookupPath, callTimeout, callTimeoutPool, retryInterval,
+      tryCount);
+    if(mockFs != null) {
+      bucketWriter.setFileSystem(mockFs);
+      bucketWriter.setMockStream(mockWriter);
+    }
+    return bucketWriter;
   }
 
   @Override
@@ -716,4 +748,19 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   void setBucketClock(Clock clock) {
     BucketPath.setClock(clock);
   }
+
+  @VisibleForTesting
+  void setMockFs(FileSystem mockFs) {
+    this.mockFs = mockFs;
+  }
+
+  @VisibleForTesting
+  void setMockWriter(HDFSWriter writer) {
+    this.mockWriter = writer;
+  }
+
+  @VisibleForTesting
+  int getTryCount() {
+    return tryCount;
+  }
 }

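One subtlety in the configure() logic above: hdfs.closeTries <= 0 is normalized to retry-forever (Integer.MAX_VALUE), while hdfs.retryInterval <= 0 forces a single close attempt by collapsing tryCount to 1. A compact, hypothetical helper that mirrors just that normalization:

// Sketch of the effective close-retry policy derived from the two settings
// above; closeTries <= 0 means retry forever, retryInterval <= 0 means a
// single attempt. The helper class itself is illustrative, not Flume API.
final class CloseRetryPolicy {
  final int tryCount;
  final long retryIntervalSeconds;

  CloseRetryPolicy(int closeTries, long retryInterval) {
    int tries = closeTries <= 0 ? Integer.MAX_VALUE : closeTries;
    if (retryInterval <= 0) {
      tries = 1;          // no interval => only the inline close attempt
      retryInterval = 0;
    }
    this.tryCount = tries;
    this.retryIntervalSeconds = retryInterval;
  }

  public static void main(String[] args) {
    CloseRetryPolicy p = new CloseRetryPolicy(0, 180);
    System.out.println(p.tryCount + " tries, every " + p.retryIntervalSeconds + "s");
  }
}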
http://git-wip-us.apache.org/repos/asf/flume/blob/e984ffa7/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
index 5fe9f1b..2608987 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
@@ -116,8 +116,7 @@ public class HDFSSequenceFile extends AbstractHDFSWriter {
   @Override
   public void close() throws IOException {
     writer.close();
-    closeHDFSOutputStream(outStream);
-
+    outStream.close();
     unregisterCurrentStream();
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/e984ffa7/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockDataStream.java
new file mode 100644
index 0000000..f0c6e7e
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockDataStream.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+class MockDataStream extends HDFSDataStream {
+  private final FileSystem fs;
+
+  MockDataStream(FileSystem fs) {
+    this.fs = fs;
+  }
+  @Override
+  protected FileSystem getDfs(Configuration conf,
+    Path dstPath) throws IOException{
+    return fs;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/e984ffa7/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystem.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystem.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystem.java
new file mode 100644
index 0000000..ca4f852
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystem.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MockFileSystem extends FileSystem {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(MockFileSystem.class);
+
+  FileSystem fs;
+  int numberOfClosesRequired;
+  MockFsDataOutputStream latestOutputStream;
+
+  public MockFileSystem(FileSystem fs,
+    int numberOfClosesRequired) {
+    this.fs = fs;
+    this.numberOfClosesRequired = numberOfClosesRequired;
+  }
+
+  @Override
+  public FSDataOutputStream append(Path arg0, int arg1, Progressable arg2)
+      throws IOException {
+
+    latestOutputStream = new MockFsDataOutputStream(
+      fs.append(arg0, arg1, arg2), numberOfClosesRequired);
+
+    return latestOutputStream;
+  }
+
+  @Override
+  public FSDataOutputStream create(Path arg0) throws IOException {
+    //throw new IOException ("HI there2");
+    latestOutputStream = new MockFsDataOutputStream(
+      fs.create(arg0), numberOfClosesRequired);
+
+    return latestOutputStream;
+  }
+
+  @Override
+  public FSDataOutputStream create(Path arg0, FsPermission arg1,
+    boolean arg2, int arg3, short arg4, long arg5, Progressable arg6)
+    throws IOException {
+    throw new IOException("Not a real file system");
+  }
+
+  @Override
+  @Deprecated
+  public boolean delete(Path arg0) throws IOException {
+    return fs.delete(arg0);
+  }
+
+  @Override
+  public boolean delete(Path arg0, boolean arg1) throws IOException {
+    return fs.delete(arg0, arg1);
+  }
+
+  @Override
+  public FileStatus getFileStatus(Path arg0) throws IOException {
+    return fs.getFileStatus(arg0);
+  }
+
+  @Override
+  public URI getUri() {
+    return fs.getUri();
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    return fs.getWorkingDirectory();
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path arg0) throws IOException {
+    return fs.listStatus(arg0);
+  }
+
+  @Override
+  public boolean mkdirs(Path arg0, FsPermission arg1) throws IOException {
+    // TODO Auto-generated method stub
+    return fs.mkdirs(arg0, arg1);
+  }
+
+  @Override
+  public FSDataInputStream open(Path arg0, int arg1) throws IOException {
+    return fs.open(arg0, arg1);
+  }
+
+  @Override
+  public boolean rename(Path arg0, Path arg1) throws IOException {
+
+    return fs.rename(arg0, arg1);
+  }
+
+  @Override
+  public void setWorkingDirectory(Path arg0) {
+    fs.setWorkingDirectory(arg0);
+
+  }
+
+  public boolean isFileClosed(Path path) {
+
+    logger.info("isFileClosed: '" +
+      latestOutputStream.getCurrentCloseAttempts() + "' , '" +
+      numberOfClosesRequired + "'");
+    return latestOutputStream.getCurrentCloseAttempts() >=
+      numberOfClosesRequired || numberOfClosesRequired == 0;
+  }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/e984ffa7/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystemCloseRetryWrapper.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystemCloseRetryWrapper.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystemCloseRetryWrapper.java
deleted file mode 100644
index b5d89e6..0000000
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystemCloseRetryWrapper.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.sink.hdfs;
-
-import java.io.IOException;
-import java.net.URI;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.Progressable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MockFileSystemCloseRetryWrapper extends FileSystem{
-
-  private static final Logger logger =
-      LoggerFactory.getLogger(MockFileSystemCloseRetryWrapper.class);
-
-  FileSystem fs;
-  int numberOfClosesRequired;
-  boolean throwExceptionsOfFailedClose;
-  MockFsDataOutputStreamCloseRetryWrapper latestOutputStream;
-
-  public MockFileSystemCloseRetryWrapper (FileSystem fs,
-      int numberOfClosesRequired, boolean throwExceptionsOfFailedClose) {
-    this.fs = fs;
-    this.throwExceptionsOfFailedClose = throwExceptionsOfFailedClose;
-    this.numberOfClosesRequired = numberOfClosesRequired;
-  }
-
-  public MockFsDataOutputStreamCloseRetryWrapper getLastMockOutputStream() {
-    return latestOutputStream;
-  }
-
-  @Override
-  public FSDataOutputStream append(Path arg0, int arg1, Progressable arg2)
-      throws IOException {
-
-    latestOutputStream = new MockFsDataOutputStreamCloseRetryWrapper(fs.append(arg0, arg1, arg2), numberOfClosesRequired, throwExceptionsOfFailedClose);
-
-    return latestOutputStream;
-  }
-
-  @Override
-  public FSDataOutputStream create(Path arg0) throws IOException {
-    //throw new IOException ("HI there2");
-    latestOutputStream = new MockFsDataOutputStreamCloseRetryWrapper(fs.create(arg0), numberOfClosesRequired, throwExceptionsOfFailedClose);
-
-    return latestOutputStream;
-  }
-
-  @Override
-  public FSDataOutputStream create(Path arg0, FsPermission arg1, boolean arg2,
-      int arg3, short arg4, long arg5, Progressable arg6) throws IOException {
-    throw new IOException ("Not a real file system");
-    //return new MockFsDataOutputStreamCloseRetryWrapper(fs.create(arg0, arg1, arg2, arg3, arg4, arg5, arg6), numberOfClosesRequired, throwExceptionsOfFailedClose);
-  }
-
-  @Override
-  @Deprecated
-  public boolean delete(Path arg0) throws IOException {
-    return fs.delete(arg0);
-  }
-
-  @Override
-  public boolean delete(Path arg0, boolean arg1) throws IOException {
-    return fs.delete(arg0, arg1);
-  }
-
-  @Override
-  public FileStatus getFileStatus(Path arg0) throws IOException {
-    return fs.getFileStatus(arg0);
-  }
-
-  @Override
-  public URI getUri() {
-    return fs.getUri();
-  }
-
-  @Override
-  public Path getWorkingDirectory() {
-    return fs.getWorkingDirectory();
-  }
-
-  @Override
-  public FileStatus[] listStatus(Path arg0) throws IOException {
-    return fs.listStatus(arg0);
-  }
-
-  @Override
-  public boolean mkdirs(Path arg0, FsPermission arg1) throws IOException {
-    // TODO Auto-generated method stub
-    return fs.mkdirs(arg0, arg1);
-  }
-
-  @Override
-  public FSDataInputStream open(Path arg0, int arg1) throws IOException {
-    return fs.open(arg0, arg1);
-  }
-
-  @Override
-  public boolean rename(Path arg0, Path arg1) throws IOException {
-
-    return fs.rename(arg0, arg1);
-  }
-
-  @Override
-  public void setWorkingDirectory(Path arg0) {
-    fs.setWorkingDirectory(arg0);
-
-  }
-
-  public boolean isFileClosed(Path path) {
-
-    logger.info("isFileClosed: '" + latestOutputStream.getCurrentCloseAttempts() + "' , '" + numberOfClosesRequired + "'");
-
-    return latestOutputStream.getCurrentCloseAttempts() >= numberOfClosesRequired || numberOfClosesRequired == 0;
-  }
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/e984ffa7/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStream.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStream.java
new file mode 100644
index 0000000..5bbacae
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStream.java
@@ -0,0 +1,61 @@
+/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MockFsDataOutputStream extends FSDataOutputStream{
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(MockFsDataOutputStream.class);
+
+  int currentCloseAttempts = 0;
+  int numberOfClosesRequired;
+
+  public MockFsDataOutputStream(FSDataOutputStream wrapMe,
+    int numberOfClosesRequired)
+      throws IOException {
+    super(wrapMe.getWrappedStream(), null);
+
+    this.numberOfClosesRequired = numberOfClosesRequired;
+
+  }
+
+  @Override
+  public void close() throws IOException {
+    currentCloseAttempts++;
+    logger.info(
+      "Attempting to Close: '" + currentCloseAttempts + "' of '" +
+        numberOfClosesRequired + "'");
+    if (currentCloseAttempts >= numberOfClosesRequired ||
+      numberOfClosesRequired == 0) {
+      logger.info("closing file");
+      super.close();
+    } else {
+      throw new IOException("MockIOException");
+    }
+  }
+
+  public int getCurrentCloseAttempts() {
+    return currentCloseAttempts;
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/e984ffa7/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStreamCloseRetryWrapper.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStreamCloseRetryWrapper.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStreamCloseRetryWrapper.java
deleted file mode 100644
index 1d8c140..0000000
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStreamCloseRetryWrapper.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
-+ * Licensed to the Apache Software Foundation (ASF) under one
-+ * or more contributor license agreements.  See the NOTICE file
-+ * distributed with this work for additional information
-+ * regarding copyright ownership.  The ASF licenses this file
-+ * to you under the Apache License, Version 2.0 (the
-+ * "License"); you may not use this file except in compliance
-+ * with the License.  You may obtain a copy of the License at
-+ *
-+ *     http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ * See the License for the specific language governing permissions and
-+ * limitations under the License.
-+ */
-package org.apache.flume.sink.hdfs;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem.Statistics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MockFsDataOutputStreamCloseRetryWrapper extends FSDataOutputStream{
-
-  private static final Logger logger =
-      LoggerFactory.getLogger(MockFsDataOutputStreamCloseRetryWrapper.class);
-
-  int currentCloseAttempts = 0;
-  int numberOfClosesRequired;
-  boolean throwExceptionsOfFailedClose;
-
-  public MockFsDataOutputStreamCloseRetryWrapper(FSDataOutputStream wrapMe,
-      int numberOfClosesRequired, boolean throwExceptionsOfFailedClose)
-      throws IOException {
-    super(wrapMe.getWrappedStream(), null);
-
-    this.numberOfClosesRequired = numberOfClosesRequired;
-    this.throwExceptionsOfFailedClose = throwExceptionsOfFailedClose;
-
-  }
-
-  public MockFsDataOutputStreamCloseRetryWrapper(OutputStream out,
-      Statistics stats) throws IOException {
-    super(out, stats);
-
-  }
-
-  @Override
-  public void close() throws IOException {
-    currentCloseAttempts++;
-    logger.info("Attempting to Close: '" + currentCloseAttempts + "' of '" + numberOfClosesRequired + "'");
-    if (currentCloseAttempts > numberOfClosesRequired || numberOfClosesRequired == 0) {
-      logger.info("closing file");
-      super.close();
-    } else {
-      if (throwExceptionsOfFailedClose) {
-        logger.info("no closed and throwing exception");
-        throw new IOException("MockIOException");
-      } else {
-        logger.info("no closed and doing nothing");
-      }
-    }
-  }
-
-  public int getCurrentCloseAttempts() {
-    return currentCloseAttempts;
-  }
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/e984ffa7/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
index b7cc586..bcb912f 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
@@ -20,6 +20,7 @@ package org.apache.flume.sink.hdfs;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Calendar;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -30,6 +31,9 @@ import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -70,7 +74,7 @@ public class TestBucketWriter {
         "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
         hdfsWriter, timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0,
-        null, null, 30000, Executors.newSingleThreadExecutor());
+        null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0);
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     for (int i = 0; i < 1000; i++) {
@@ -90,11 +94,12 @@ public class TestBucketWriter {
   public void testSizeRoller() throws IOException, InterruptedException {
     int maxBytes = 300;
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
-    BucketWriter bucketWriter = new BucketWriter(0, maxBytes, 0, 0, ctx,
-        "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
-        hdfsWriter, timedRollerPool, null,
-        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-        0, null, null, 30000, Executors.newSingleThreadExecutor());
+    BucketWriter bucketWriter = new BucketWriter(0, maxBytes, 0, 0,
+      ctx, "/tmp", "file", "", ".tmp", null, null,
+      SequenceFile.CompressionType.NONE, hdfsWriter,timedRollerPool,
+      null, new SinkCounter("test-bucket-writer-" +
+      System.currentTimeMillis()),0, null, null, 30000,
+      Executors.newSingleThreadExecutor(), 0, 0);
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     for (int i = 0; i < 1000; i++) {
@@ -126,7 +131,7 @@ public class TestBucketWriter {
       public void run(String filePath) {
         calledBack.set(true);
       }
-    }, null, 30000, Executors.newSingleThreadExecutor());
+    }, null, 30000, Executors.newSingleThreadExecutor(), 0, 0);
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     long startNanos = System.nanoTime();
@@ -143,8 +148,9 @@ public class TestBucketWriter {
     bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
       "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
       hdfsWriter, timedRollerPool, null,
-      new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-      0, null, null, 30000, Executors.newSingleThreadExecutor());
+      new SinkCounter("test-bucket-writer-"
+        + System.currentTimeMillis()), 0, null, null, 30000,
+      Executors.newSingleThreadExecutor(), 0, 0);
     // write one more event (to reopen a new file so we will roll again later)
     bucketWriter.append(e);
 
@@ -221,11 +227,13 @@ public class TestBucketWriter {
     String path = tmpFile.getParent();
     String name = tmpFile.getName();
 
-    BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
-        path, name, "", ".tmp", null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
-        timedRollerPool, null,
-        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-        0, null, null, 30000, Executors.newSingleThreadExecutor());
+    BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0,
+      0, ctx, path, name, "", ".tmp", null, null,
+      SequenceFile.CompressionType.NONE, hdfsWriter,
+      timedRollerPool, null, new SinkCounter("test-bucket-writer-"
+      + System.currentTimeMillis()),
+      0, null, null, 30000, Executors.newSingleThreadExecutor(),
+      0, 0);
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     for (int i = 0; i < NUM_EVENTS - 1; i++) {
@@ -244,11 +252,12 @@ public class TestBucketWriter {
       final String suffix = null;
 
       MockHDFSWriter hdfsWriter = new MockHDFSWriter();
-      BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
-          "/tmp", "file", "", ".tmp", suffix, null, SequenceFile.CompressionType.NONE, hdfsWriter,
-          timedRollerPool, null,
-          new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-          0, null, null, 30000, Executors.newSingleThreadExecutor());
+      BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0,
+        0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null,
+        SequenceFile.CompressionType.NONE, hdfsWriter,
+        timedRollerPool, null, new SinkCounter("test-bucket-writer-"
+        + System.currentTimeMillis()), 0, null, null, 30000,
+        Executors.newSingleThreadExecutor(), 0, 0);
 
       // Need to override system time use for test so we know what to expect
       final long testTime = System.currentTimeMillis();
@@ -270,12 +279,13 @@ public class TestBucketWriter {
         final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
         final String suffix = ".avro";
 
-        MockHDFSWriter hdfsWriter = new MockHDFSWriter();
-        BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
-            "/tmp", "file", "", ".tmp", suffix, null, SequenceFile.CompressionType.NONE, hdfsWriter,
-            timedRollerPool, null,
-            new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-            0, null, null, 30000, Executors.newSingleThreadExecutor());
+      MockHDFSWriter hdfsWriter = new MockHDFSWriter();
+      BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0,
+        0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null,
+        SequenceFile.CompressionType.NONE, hdfsWriter,
+        timedRollerPool, null, new SinkCounter(
+        "test-bucket-writer-" + System.currentTimeMillis()), 0,
+        null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0);
 
         // Need to override system time use for test so we know what to expect
 
@@ -291,7 +301,8 @@ public class TestBucketWriter {
         Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
         bucketWriter.append(e);
 
-        Assert.assertTrue("Incorrect suffix", hdfsWriter.getOpenedFilePath().endsWith(Long.toString(testTime+1) + suffix + ".tmp"));
+        Assert.assertTrue("Incorrect suffix", hdfsWriter.getOpenedFilePath().endsWith(
+          Long.toString(testTime + 1) + suffix + ".tmp"));
     }
 
   @Test
@@ -301,12 +312,14 @@ public class TestBucketWriter {
     final String suffix = ".foo";
 
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
-    BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
-        "/tmp", "file", "", ".tmp", suffix, HDFSEventSink.getCodec("gzip"),
-        SequenceFile.CompressionType.BLOCK, hdfsWriter,
-        timedRollerPool, null,
-        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-        0, null, null, 30000, Executors.newSingleThreadExecutor());
+    BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0,
+      0, ctx, "/tmp", "file", "", ".tmp", suffix,
+      HDFSEventSink.getCodec("gzip"),
+      SequenceFile.CompressionType.BLOCK, hdfsWriter,
+      timedRollerPool, null, new SinkCounter("test-bucket-writer-"
+      + System.currentTimeMillis()), 0, null, null, 30000,
+      Executors.newSingleThreadExecutor(), 0, 0
+    );
 
     // Need to override system time use for test so we know what to expect
     final long testTime = System.currentTimeMillis();
@@ -332,11 +345,12 @@ public class TestBucketWriter {
 
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
     HDFSTextSerializer formatter = new HDFSTextSerializer();
-    BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
-        "/tmp", "file", PREFIX, ".tmp", null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
-        timedRollerPool, null,
-        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-        0, null, null, 30000, Executors.newSingleThreadExecutor());
+    BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0,
+      0, ctx, "/tmp", "file", PREFIX, ".tmp", null, null,
+      SequenceFile.CompressionType.NONE, hdfsWriter,
+      timedRollerPool, null, new SinkCounter(
+        "test-bucket-writer-" + System.currentTimeMillis()), 0,
+      null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0);
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     bucketWriter.append(e);
@@ -351,11 +365,12 @@ public class TestBucketWriter {
 
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
     HDFSTextSerializer serializer = new HDFSTextSerializer();
-    BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
-        "/tmp", "file", "", SUFFIX, null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
-        timedRollerPool, null,
-        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-        0, null, null, 30000, Executors.newSingleThreadExecutor());
+    BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0,
+      0, ctx, "/tmp", "file", "", SUFFIX, null, null,
+      SequenceFile.CompressionType.NONE, hdfsWriter,
+      timedRollerPool, null, new SinkCounter(
+        "test-bucket-writer-" + System.currentTimeMillis()), 0,
+      null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0);
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     bucketWriter.append(e);
@@ -370,16 +385,18 @@ public class TestBucketWriter {
     final AtomicBoolean callbackCalled = new AtomicBoolean(false);
 
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
-    BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
-      "/tmp", "file", "", SUFFIX, null, null, SequenceFile.CompressionType.NONE,
+    BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0,
+      0, ctx, "/tmp", "file", "", SUFFIX, null, null,
+      SequenceFile.CompressionType.NONE,
       hdfsWriter, timedRollerPool, null,
-      new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0,
+      new SinkCounter(
+        "test-bucket-writer-" + System.currentTimeMillis()), 0,
       new HDFSEventSink.WriterCallback() {
       @Override
       public void run(String filePath) {
         callbackCalled.set(true);
       }
-    }, "blah", 30000, Executors.newSingleThreadExecutor());
+    }, "blah", 30000, Executors.newSingleThreadExecutor(), 0, 0);
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     bucketWriter.append(e);
@@ -387,4 +404,61 @@ public class TestBucketWriter {
 
     Assert.assertTrue(callbackCalled.get());
   }
+
+
+
+  @Test
+  public void testSequenceFileCloseRetries() throws Exception {
+    SequenceFileCloseRetryCoreTest(1);
+    SequenceFileCloseRetryCoreTest(5);
+    SequenceFileCloseRetryCoreTest(2);
+
+  }
+
+
+  public void SequenceFileCloseRetryCoreTest(int numberOfClosesRequired) throws Exception {
+    String hdfsPath = "file:///tmp/flume-test."
+      + Calendar.getInstance().getTimeInMillis() + "."
+      + Thread.currentThread().getId();
+
+    Context context = new Context();
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    Path dirPath = new Path(hdfsPath);
+    fs.delete(dirPath, true);
+    fs.mkdirs(dirPath);
+    context.put("hdfs.path", hdfsPath);
+    context.put("hdfs.closeTries",
+      String.valueOf(numberOfClosesRequired));
+    context.put("hdfs.rollCount", "1");
+    context.put("hdfs.retryInterval", "1");
+    context.put("hdfs.callTimeout", Long.toString(1000));
+    MockFileSystem mockFs = new
+      MockFileSystem(fs,
+      numberOfClosesRequired);
+    BucketWriter bucketWriter = new BucketWriter(0, 0, 1, 1, ctx,
+      hdfsPath, hdfsPath, "singleBucket", ".tmp", null, null,
+      null, new MockDataStream(mockFs),
+      timedRollerPool, null,
+      new SinkCounter(
+        "test-bucket-writer-" + System.currentTimeMillis()),
+      0, null, null, 30000, Executors.newSingleThreadExecutor(), 1,
+      numberOfClosesRequired);
+
+    bucketWriter.setFileSystem(mockFs);
+    // At this point, we have already checked whether isFileClosed is
+    // available in this JVM, so let's make it check again.
+    Event event = EventBuilder.withBody("test", Charsets.UTF_8);
+    bucketWriter.append(event);
+    // This is what triggers the close, so a 2nd append is required :/
+    bucketWriter.append(event);
+
+    TimeUnit.SECONDS.sleep(numberOfClosesRequired + 2);
+
+    int expectedNumberOfCloses = numberOfClosesRequired;
+    Assert.assertTrue("Expected " + expectedNumberOfCloses + " " +
+      "but got " + bucketWriter.closeTries.get(),
+      bucketWriter.closeTries.get() ==
+        expectedNumberOfCloses);
+  }
 }
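
For context on what the new SequenceFileCloseRetryCoreTest asserts: a failed
close is re-attempted on a schedule until either the close succeeds or the
configured number of tries is used up, which is why the test sets
hdfs.retryInterval to 1, sleeps for numberOfClosesRequired + 2 seconds, and
then compares bucketWriter.closeTries against numberOfClosesRequired. The
standalone sketch below only illustrates that shape; it is not the
BucketWriter code from this commit, and the names (CloseRetrier, maxTries,
retryIntervalSeconds, CloseableResource) are invented for the example. The
real change threads two new trailing constructor arguments through the
BucketWriter calls shown above.

    import java.io.IOException;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    // Illustrative-only sketch of a "retry the close until it sticks" loop.
    public class CloseRetrier {
      private final ScheduledExecutorService pool =
          Executors.newSingleThreadScheduledExecutor();
      private final AtomicInteger closeTries = new AtomicInteger(0);
      private final int maxTries;
      private final long retryIntervalSeconds;

      public CloseRetrier(int maxTries, long retryIntervalSeconds) {
        this.maxTries = maxTries;
        this.retryIntervalSeconds = retryIntervalSeconds;
      }

      public void close(CloseableResource resource) {
        pool.execute(() -> attemptClose(resource));
      }

      private void attemptClose(CloseableResource resource) {
        closeTries.incrementAndGet();
        try {
          resource.close();
          // Success: the file is closed, no further attempt is scheduled.
        } catch (IOException e) {
          if (closeTries.get() < maxTries) {
            // Failure: schedule another attempt after the retry interval,
            // which is what the test's sleep-then-assert pattern waits out.
            pool.schedule(() -> attemptClose(resource),
                retryIntervalSeconds, TimeUnit.SECONDS);
          }
        }
      }

      public int getCloseTries() {
        return closeTries.get();
      }

      public interface CloseableResource {
        void close() throws IOException;
      }
    }

Scheduling retries on a single scheduled executor mirrors the timedRollerPool
handle the tests pass to BucketWriter, so repeated close attempts for one
writer run one at a time rather than piling up concurrently.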

http://git-wip-us.apache.org/repos/asf/flume/blob/e984ffa7/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
index 87918d1..f29f1f1 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
@@ -24,9 +24,13 @@ import java.io.InputStreamReader;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharsetDecoder;
 import java.util.Calendar;
+import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Maps;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
@@ -43,6 +47,7 @@ import org.apache.flume.SystemClock;
 import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
 import org.apache.flume.event.SimpleEvent;
 import org.apache.flume.lifecycle.LifecycleException;
 import org.apache.hadoop.conf.Configuration;
@@ -1317,4 +1322,127 @@ public class TestHDFSEventSink {
       !fList[1].getName().endsWith(".tmp"));
     fs.close();
   }
+
+  private Context getContextForRetryTests() {
+    Context context = new Context();
+
+    context.put("hdfs.path", testPath + "/%{retryHeader}");
+    context.put("hdfs.filePrefix", "test");
+    context.put("hdfs.batchSize", String.valueOf(100));
+    context.put("hdfs.fileType", "DataStream");
+    context.put("hdfs.serializer", "text");
+    context.put("hdfs.closeTries","3");
+    context.put("hdfs.rollCount", "1");
+    context.put("hdfs.retryInterval", "1");
+    return context;
+  }
+
+  @Test
+  public void testBadConfigurationForRetryIntervalZero() throws
+    Exception {
+    Context context = getContextForRetryTests();
+    context.put("hdfs.retryInterval", "0");
+
+    Configurables.configure(sink, context);
+    Assert.assertEquals(1, sink.getTryCount());
+  }
+
+  @Test
+  public void testBadConfigurationForRetryIntervalNegative() throws
+    Exception {
+    Context context = getContextForRetryTests();
+    context.put("hdfs.retryInterval", "-1");
+
+    Configurables.configure(sink, context);
+    Assert.assertEquals(1, sink.getTryCount());
+  }
+  @Test
+  public void testBadConfigurationForRetryCountZero() throws
+    Exception {
+    Context context = getContextForRetryTests();
+    context.put("hdfs.closeTries" ,"0");
+
+    Configurables.configure(sink, context);
+    Assert.assertEquals(Integer.MAX_VALUE, sink.getTryCount());
+  }
+  @Test
+  public void testBadConfigurationForRetryCountNegative() throws
+    Exception {
+    Context context = getContextForRetryTests();
+    context.put("hdfs.closeTries" ,"-4");
+
+    Configurables.configure(sink, context);
+    Assert.assertEquals(Integer.MAX_VALUE, sink.getTryCount());
+  }
+  @Test
+  public void testRetryClose() throws InterruptedException,
+    LifecycleException,
+    EventDeliveryException, IOException {
+
+    LOG.debug("Starting...");
+    String newPath = testPath + "/retryBucket";
+
+    // clear the test directory
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    Path dirPath = new Path(newPath);
+    fs.delete(dirPath, true);
+    fs.mkdirs(dirPath);
+    MockFileSystem mockFs = new MockFileSystem(fs, 3);
+
+    Context context = getContextForRetryTests();
+    Configurables.configure(sink, context);
+
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, context);
+
+    sink.setChannel(channel);
+    sink.setMockFs(mockFs);
+    HDFSWriter hdfsWriter = new MockDataStream(mockFs);
+    hdfsWriter.configure(context);
+    sink.setMockWriter(hdfsWriter);
+    sink.start();
+
+    // push the event batches into channel
+    for (int i = 0; i < 2; i++) {
+      Transaction txn = channel.getTransaction();
+      txn.begin();
+      Map<String, String> hdr = Maps.newHashMap();
+      hdr.put("retryHeader", "v1");
+
+      channel.put(EventBuilder.withBody("random".getBytes(), hdr));
+      txn.commit();
+      txn.close();
+
+      // execute sink to process the events
+      sink.process();
+    }
+    // push the event batches into channel
+    for (int i = 0; i < 2; i++) {
+      Transaction txn = channel.getTransaction();
+      txn.begin();
+      Map<String, String> hdr = Maps.newHashMap();
+      hdr.put("retryHeader", "v2");
+      channel.put(EventBuilder.withBody("random".getBytes(), hdr));
+      txn.commit();
+      txn.close();
+      // execute sink to process the events
+      sink.process();
+    }
+
+    TimeUnit.SECONDS.sleep(5); // Sleep until all retries are done.
+
+    Collection<BucketWriter> writers = sink.getSfWriters().values();
+
+    int totalCloseAttempts = 0;
+    for (BucketWriter writer : writers) {
+      LOG.info("Close tries = " + writer.closeTries.get());
+      totalCloseAttempts += writer.closeTries.get();
+    }
+    // stop clears the sfWriters map, so we need to compute the
+    // close tries count before stopping the sink.
+    sink.stop();
+    Assert.assertEquals(6, totalCloseAttempts);
+
+  }
 }
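
The four testBadConfiguration* cases above pin down how the sink is expected
to normalize the two new properties: a non-positive hdfs.closeTries maps to
"retry until the file closes" (getTryCount() returns Integer.MAX_VALUE), while
a non-positive hdfs.retryInterval disables retrying and forces a single close
attempt (getTryCount() returns 1). A minimal sketch of that normalization
follows, assuming made-up method names and placeholder defaults for
illustration; the actual fields and defaults inside HDFSEventSink may differ.

    import org.apache.flume.Context;

    public final class CloseRetrySettings {
      private CloseRetrySettings() { }

      // Resolve the effective number of close attempts from a Context,
      // following the rules asserted by the tests above. The default values
      // passed to getInteger/getLong here are placeholders for the example.
      public static int resolveTryCount(Context context) {
        int tryCount = context.getInteger("hdfs.closeTries", 0);
        long retryInterval = context.getLong("hdfs.retryInterval", 180L);
        if (tryCount <= 0) {
          // Zero or negative: keep trying until the close eventually succeeds.
          tryCount = Integer.MAX_VALUE;
        }
        if (retryInterval <= 0) {
          // A non-positive interval makes retrying meaningless: close once.
          tryCount = 1;
        }
        return tryCount;
      }
    }

Resolving both properties up front gives the configuration tests a single
observable value, getTryCount(), to assert against.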

http://git-wip-us.apache.org/repos/asf/flume/blob/e984ffa7/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java
index 4476530..f3e7d10 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java
@@ -19,29 +19,15 @@
 package org.apache.flume.sink.hdfs;
 
 import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.commons.io.FileUtils;
-import org.apache.flume.Clock;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
-import org.apache.flume.instrumentation.SinkCounter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -106,52 +92,4 @@ public class TestUseRawLocalFileSystem {
     Assert.assertTrue(testFile.length() > 0);
   }
 
-  @Test
-  public void testSequenceFileCloseRetries() throws Exception {
-    SequenceFileCloseRetryCoreTest(3, 0, false);
-    SequenceFileCloseRetryCoreTest(3, 1, false);
-    SequenceFileCloseRetryCoreTest(3, 5, false);
-
-    SequenceFileCloseRetryCoreTest(3, 0, true);
-    SequenceFileCloseRetryCoreTest(3, 1, true);
-    SequenceFileCloseRetryCoreTest(3, 5, true);
-
-    SequenceFileCloseRetryCoreTest(3, 2, true);
-    SequenceFileCloseRetryCoreTest(3, 2, true);
-
-    SequenceFileCloseRetryCoreTest(0, 0, true);
-    SequenceFileCloseRetryCoreTest(1, 0, true);
-  }
-
-
-  public void SequenceFileCloseRetryCoreTest(int numberOfCloseRetriesToAttempt, int numberOfClosesRequired, boolean throwExceptionsOfFailedClose) throws Exception {
-    String file = testFile.getCanonicalPath();
-    HDFSSequenceFile stream = new HDFSSequenceFile();
-    context.put("hdfs.useRawLocalFileSystem", "true");
-    context.put("hdfs.closeTries", String.valueOf(numberOfCloseRetriesToAttempt));
-    Configuration conf = new Configuration();
-    Path dstPath = new Path(file);
-    MockFileSystemCloseRetryWrapper mockFs = new MockFileSystemCloseRetryWrapper(dstPath.getFileSystem(conf), numberOfClosesRequired, throwExceptionsOfFailedClose);
-    stream.configure(context);
-    stream.open(dstPath, null, CompressionType.NONE, conf, mockFs);
-    stream.append(event);
-    stream.sync();
-
-    stream.close();
-
-    if (throwExceptionsOfFailedClose) {
-      int expectedNumberOfCloses = 1;
-      Assert.assertTrue("Expected " + expectedNumberOfCloses + " but got " + mockFs.getLastMockOutputStream().getCurrentCloseAttempts() ,  mockFs.getLastMockOutputStream().currentCloseAttempts == expectedNumberOfCloses);
-    } else {
-      int expectedNumberOfCloses = Math.max(Math.min(numberOfClosesRequired, numberOfCloseRetriesToAttempt), 1);
-      Assert.assertTrue("Expected " + expectedNumberOfCloses + " but got " + mockFs.getLastMockOutputStream().getCurrentCloseAttempts() ,  mockFs.getLastMockOutputStream().currentCloseAttempts == expectedNumberOfCloses);
-    }
-
-
-
-
-
-  }
-
-
 }
\ No newline at end of file

