flume-commits mailing list archives

From: br...@apache.org
Subject: git commit: FLUME-1702: HDFSEventSink should write to a hidden file as opposed to a .tmp file
Date: Tue, 18 Dec 2012 20:04:36 GMT
Updated Branches:
  refs/heads/trunk e320842da -> ae62279f8


FLUME-1702: HDFSEventSink should write to a hidden file as opposed to a .tmp file

(Jarek Jarcec Cecho via Brock Noland)
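
A note on the motivation: downstream consumers such as MapReduce jobs can
pick up files that Flume is still writing. Hadoop's FileInputFormat skips
any path whose name starts with "_" or ".", so an in-use prefix of "."
keeps half-written files invisible until they are renamed into place. A
minimal sketch of that hidden-file convention (illustration only, not
Flume code):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    // Mirrors the filter FileInputFormat applies when listing input
    // paths: anything starting with "_" or "." is treated as hidden.
    public class HiddenFileFilter implements PathFilter {
      @Override
      public boolean accept(Path p) {
        String name = p.getName();
        return !name.startsWith("_") && !name.startsWith(".");
      }
    }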


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ae62279f
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ae62279f
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ae62279f

Branch: refs/heads/trunk
Commit: ae62279f8066b1e2728a579ce8384eca36180e9d
Parents: e320842
Author: Brock Noland <brock@apache.org>
Authored: Tue Dec 18 14:04:09 2012 -0600
Committer: Brock Noland <brock@apache.org>
Committed: Tue Dec 18 14:04:09 2012 -0600

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |    2 +
 .../org/apache/flume/sink/hdfs/BucketWriter.java   |   81 +++++++++------
 .../org/apache/flume/sink/hdfs/HDFSEventSink.java  |   38 +++++---
 .../apache/flume/sink/hdfs/TestBucketWriter.java   |   77 +++++++++++---
 .../apache/flume/sink/hdfs/TestHDFSEventSink.java  |   12 ++-
 5 files changed, 145 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/ae62279f/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 9507413..69860f7 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1275,6 +1275,8 @@ Name                    Default       Description
 **hdfs.path**           --            HDFS directory path (eg hdfs://namenode/flume/webdata/)
 hdfs.filePrefix         FlumeData     Name prefixed to files created by Flume in hdfs directory
 hdfs.fileSuffix         --            Suffix to append to file (eg ``.avro`` - *NOTE: period is not automatically added*)
+hdfs.inUsePrefix        --            Prefix that is used for temporary files that Flume actively writes into
+hdfs.inUseSuffix        ``.tmp``      Suffix that is used for temporary files that Flume actively writes into
 hdfs.rollInterval       30            Number of seconds to wait before rolling current file
                                       (0 = never roll based on time interval)
 hdfs.rollSize           1024          File size to trigger roll, in bytes (0: never roll based on file size)
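
For reference, a hypothetical agent configuration exercising the two new
properties (the agent and sink names here are invented for the example);
per the note above, an inUsePrefix of "." keeps in-flight files hidden
from jobs scanning the directory:

    agent.sinks.hdfsSink.type = hdfs
    agent.sinks.hdfsSink.hdfs.path = hdfs://namenode/flume/webdata/
    agent.sinks.hdfsSink.hdfs.filePrefix = FlumeData
    agent.sinks.hdfsSink.hdfs.inUsePrefix = .
    agent.sinks.hdfsSink.hdfs.inUseSuffix = .tmp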

http://git-wip-us.apache.org/repos/asf/flume/blob/ae62279f/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index d0ff6e3..002d48d 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -1,4 +1,3 @@
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -55,7 +54,8 @@ class BucketWriter {
   private static final Logger LOG = LoggerFactory
       .getLogger(BucketWriter.class);
 
-  static final String IN_USE_EXT = ".tmp";
+  private static String DIRECTORY_DELIMITER = System.getProperty("file.separator");
+
   /**
    * This lock ensures that only one thread can open a file at a time.
    */
@@ -69,7 +69,6 @@ class BucketWriter {
   private final long batchSize;
   private final CompressionCodec codeC;
   private final CompressionType compType;
-  private final Context context;
   private final ScheduledExecutorService timedRollerPool;
   private final UserGroupInformation user;
 
@@ -81,15 +80,20 @@ class BucketWriter {
   private FileSystem fileSystem;
 
   private volatile String filePath;
+  private volatile String fileName;
+  private volatile String inUsePrefix;
+  private volatile String inUseSuffix;
   private volatile String fileSuffix;
   private volatile String bucketPath;
+  private volatile String targetPath;
   private volatile long batchCounter;
   private volatile boolean isOpen;
   private volatile ScheduledFuture<Void> timedRollFuture;
   private SinkCounter sinkCounter;
-  private final WriterCallback onIdleCallback;
   private final int idleTimeout;
   private volatile ScheduledFuture<Void> idleFuture;
+  private final WriterCallback onIdleCallback;
+  private final String onIdleCallbackPath;
 
   private Clock clock = new SystemClock();
 
@@ -98,16 +102,20 @@ class BucketWriter {
   protected boolean idleClosed = false;
 
   BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize,
-      Context context, String filePath, String fileSuffix, CompressionCodec codeC,
+      Context context, String filePath, String fileName, String inUsePrefix,
+      String inUseSuffix, String fileSuffix, CompressionCodec codeC,
       CompressionType compType, HDFSWriter writer, FlumeFormatter formatter,
       ScheduledExecutorService timedRollerPool, UserGroupInformation user,
-      SinkCounter sinkCounter, int idleTimeout, WriterCallback onIdleCallback) {
+      SinkCounter sinkCounter, int idleTimeout, WriterCallback onIdleCallback,
+      String onIdleCallbackPath) {
     this.rollInterval = rollInterval;
     this.rollSize = rollSize;
     this.rollCount = rollCount;
     this.batchSize = batchSize;
-    this.context = context;
     this.filePath = filePath;
+    this.fileName = fileName;
+    this.inUsePrefix = inUsePrefix;
+    this.inUseSuffix = inUseSuffix;
     this.fileSuffix = fileSuffix;
     this.codeC = codeC;
     this.compType = compType;
@@ -116,8 +124,9 @@ class BucketWriter {
     this.timedRollerPool = timedRollerPool;
     this.user = user;
     this.sinkCounter = sinkCounter;
-    this.onIdleCallback = onIdleCallback;
     this.idleTimeout = idleTimeout;
+    this.onIdleCallback = onIdleCallback;
+    this.onIdleCallbackPath = onIdleCallbackPath;
 
     fileExtensionCounter = new AtomicLong(clock.currentTimeMillis());
 
@@ -197,26 +206,34 @@ class BucketWriter {
     // which caused deadlocks. See FLUME-1231.
     synchronized (staticLock) {
       checkAndThrowInterruptedException();
+
       try {
         long counter = fileExtensionCounter.incrementAndGet();
+
+        String fullFileName = fileName + "." + counter;
+
+        if (codeC == null && fileSuffix != null && fileSuffix.length() > 0) {
+          fullFileName += fileSuffix;
+        }
+
+        if(codeC != null) {
+          fullFileName += codeC.getDefaultExtension();
+        }
+
+        bucketPath = filePath + DIRECTORY_DELIMITER + inUsePrefix
+          + fullFileName + inUseSuffix;
+        targetPath = filePath + DIRECTORY_DELIMITER + fullFileName;
+
+        LOG.info("Creating " + bucketPath);
         if (codeC == null) {
-          bucketPath = filePath + "." + counter;
-          // FLUME-1645 - add suffix if specified
-          if (fileSuffix != null && fileSuffix.length() > 0) {
-            bucketPath += fileSuffix;
-          }
           // Need to get reference to FS using above config before underlying
           // writer does in order to avoid shutdown hook & IllegalStateExceptions
           fileSystem = new Path(bucketPath).getFileSystem(config);
-          LOG.info("Creating " + bucketPath + IN_USE_EXT);
-          writer.open(bucketPath + IN_USE_EXT, formatter);
+          writer.open(bucketPath, formatter);
         } else {
-          bucketPath = filePath + "." + counter
-              + codeC.getDefaultExtension();
           // need to get reference to FS before writer does to avoid shutdown hook
           fileSystem = new Path(bucketPath).getFileSystem(config);
-          LOG.info("Creating " + bucketPath + IN_USE_EXT);
-          writer.open(bucketPath + IN_USE_EXT, codeC, compType, formatter);
+          writer.open(bucketPath, codeC, compType, formatter);
         }
       } catch (Exception ex) {
         sinkCounter.incrementConnectionFailedCount();
@@ -235,7 +252,7 @@ class BucketWriter {
       Callable<Void> action = new Callable<Void>() {
         public Void call() throws Exception {
           LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
-              bucketPath + IN_USE_EXT, rollInterval);
+              bucketPath, rollInterval);
           try {
             close();
           } catch(Throwable t) {
@@ -273,19 +290,19 @@ class BucketWriter {
    * @throws IOException
    */
   private void doClose() throws IOException {
-    LOG.debug("Closing {}", bucketPath + IN_USE_EXT);
+    LOG.debug("Closing {}", bucketPath);
     if (isOpen) {
       try {
         writer.close(); // could block
         sinkCounter.incrementConnectionClosedCount();
       } catch (IOException e) {
         LOG.warn("failed to close() HDFSWriter for file (" + bucketPath +
-            IN_USE_EXT + "). Exception follows.", e);
+            "). Exception follows.", e);
         sinkCounter.incrementConnectionFailedCount();
       }
       isOpen = false;
     } else {
-      LOG.info("HDFSWriter is already closed: {}", bucketPath + IN_USE_EXT);
+      LOG.info("HDFSWriter is already closed: {}", bucketPath);
     }
 
     // NOTE: timed rolls go through this codepath as well as other roll types
@@ -327,11 +344,11 @@ class BucketWriter {
           Callable<Void> idleAction = new Callable<Void>() {
             public Void call() throws Exception {
               try {
-                LOG.info("Closing idle bucketWriter {}", filePath);
+                LOG.info("Closing idle bucketWriter {}", bucketPath);
                 idleClosed = true;
                 close();
                 if(onIdleCallback != null)
-                  onIdleCallback.run(filePath);
+                  onIdleCallback.run(onIdleCallbackPath);
               } catch(Throwable t) {
                 LOG.error("Unexpected error", t);
               }
@@ -389,13 +406,13 @@ class BucketWriter {
       writer.append(event, formatter); // could block
     } catch (IOException e) {
       LOG.warn("Caught IOException writing to HDFSWriter ({}). Closing file (" +
-          bucketPath + IN_USE_EXT + ") and rethrowing exception.",
+          bucketPath + ") and rethrowing exception.",
           e.getMessage());
       try {
         close();
       } catch (IOException e2) {
         LOG.warn("Caught IOException while closing file (" +
-             bucketPath + IN_USE_EXT + "). Exception follows.", e2);
+             bucketPath + "). Exception follows.", e2);
       }
       throw e;
     }
@@ -433,8 +450,12 @@ class BucketWriter {
    * Rename bucketPath file from .tmp to permanent location.
    */
   private void renameBucket() throws IOException {
-    Path srcPath = new Path(bucketPath + IN_USE_EXT);
-    Path dstPath = new Path(bucketPath);
+    if(bucketPath.equals(targetPath)) {
+      return;
+    }
+
+    Path srcPath = new Path(bucketPath);
+    Path dstPath = new Path(targetPath);
 
     if(fileSystem.exists(srcPath)) { // could block
       LOG.info("Renaming " + srcPath + " to " + dstPath);
@@ -444,7 +465,7 @@ class BucketWriter {
 
   @Override
   public String toString() {
-    return "[ " + this.getClass().getSimpleName() + " filePath = " + filePath +
+    return "[ " + this.getClass().getSimpleName() + " targetPath = " + targetPath +
         ", bucketPath = " + bucketPath + " ]";
   }
 

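To make the new naming scheme concrete, here is a standalone sketch of the
path construction now done in BucketWriter.open() (plain Java, not the
Flume class itself; "/" stands in for DIRECTORY_DELIMITER and the literal
values for the configured ones):

    // bucketPath is the in-use file; renameBucket() later moves it to
    // targetPath, and is now a no-op when the two are already equal.
    public class PathNamingSketch {
      public static void main(String[] args) {
        String filePath = "hdfs://namenode/flume/webdata";
        String fileName = "FlumeData";  // hdfs.filePrefix
        String fileSuffix = ".avro";    // hdfs.fileSuffix (codeC == null case)
        String inUsePrefix = ".";       // hdfs.inUsePrefix
        String inUseSuffix = ".tmp";    // hdfs.inUseSuffix
        long counter = System.currentTimeMillis();

        String fullFileName = fileName + "." + counter + fileSuffix;
        String bucketPath = filePath + "/" + inUsePrefix + fullFileName + inUseSuffix;
        String targetPath = filePath + "/" + fullFileName;

        System.out.println(bucketPath); // .../.FlumeData.<counter>.avro.tmp
        System.out.println(targetPath); // .../FlumeData.<counter>.avro
      }
    }
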
http://git-wip-us.apache.org/repos/asf/flume/blob/ae62279f/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index 64ac2d7..230488e 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -69,15 +69,20 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   private static final Logger LOG = LoggerFactory
       .getLogger(HDFSEventSink.class);
 
+  private static String DIRECTORY_DELIMITER = System.getProperty("file.separator");
+
   private static final long defaultRollInterval = 30;
   private static final long defaultRollSize = 1024;
   private static final long defaultRollCount = 10;
   private static final String defaultFileName = "FlumeData";
   private static final String defaultSuffix = "";
+  private static final String defaultInUsePrefix = "";
+  private static final String defaultInUseSuffix = ".tmp";
   private static final long defaultBatchSize = 100;
   private static final long defaultTxnEventMax = 100;
   private static final String defaultFileType = HDFSWriterFactory.SequenceFileType;
   private static final int defaultMaxOpenFiles = 5000;
+
   /**
    * Default length of time we wait for blocking BucketWriter calls
    * before timing out the operation. Intended to prevent server hangs.
@@ -112,8 +117,11 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   private CompressionCodec codeC;
   private CompressionType compType;
   private String fileType;
-  private String path;
+  private String filePath;
+  private String fileName;
   private String suffix;
+  private String inUsePrefix;
+  private String inUseSuffix;
   private TimeZone timeZone;
   private int maxOpenFiles;
   private String writeFormat;
@@ -181,12 +189,12 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   public void configure(Context context) {
     this.context = context;
 
-    String dirpath = Preconditions.checkNotNull(
+    filePath = Preconditions.checkNotNull(
         context.getString("hdfs.path"), "hdfs.path is required");
-    String fileName = context.getString("hdfs.filePrefix", defaultFileName);
-    // FLUME-1645: add suffix support
+    fileName = context.getString("hdfs.filePrefix", defaultFileName);
     this.suffix = context.getString("hdfs.fileSuffix", defaultSuffix);
-    this.path = dirpath + System.getProperty("file.separator") + fileName;
+    inUsePrefix = context.getString("hdfs.inUsePrefix", defaultInUsePrefix);
+    inUseSuffix = context.getString("hdfs.inUseSuffix", defaultInUseSuffix);
     String tzName = context.getString("hdfs.timeZone");
     timeZone = tzName == null ? null : TimeZone.getTimeZone(tzName);
     rollInterval = context.getLong("hdfs.rollInterval", defaultRollInterval);
@@ -244,7 +252,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
       }
     }
 
-    if (!authenticate(path)) {
+    if (!authenticate()) {
       LOG.error("Failed to authenticate!");
     }
     needRounding = context.getBoolean("hdfs.round", false);
@@ -394,9 +402,13 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
         }
 
         // reconstruct the path name by substituting place holders
-        String realPath = BucketPath.escapeString(path, event.getHeaders(),
+        String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
             timeZone, needRounding, roundUnit, roundValue);
-        BucketWriter bucketWriter = sfWriters.get(realPath);
+        String realName = BucketPath.escapeString(fileName, event.getHeaders(),
+          timeZone, needRounding, roundUnit, roundValue);
+
+        String lookupPath = realPath + DIRECTORY_DELIMITER + realName;
+        BucketWriter bucketWriter = sfWriters.get(lookupPath);
 
         // we haven't seen this file yet, so open it and cache the handle
         if (bucketWriter == null) {
@@ -414,11 +426,11 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
             };
           }
           bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
-              batchSize, context, realPath, suffix, codeC, compType, hdfsWriter,
-              formatter, timedRollerPool, proxyTicket, sinkCounter, idleTimeout,
-              idleCallback);
+              batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
+              suffix, codeC, compType, hdfsWriter, formatter, timedRollerPool,
+              proxyTicket, sinkCounter, idleTimeout, idleCallback, lookupPath);
 
-          sfWriters.put(realPath, bucketWriter);
+          sfWriters.put(lookupPath, bucketWriter);
         }
 
         // track the buckets getting written in this transaction
@@ -523,7 +535,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     super.start();
   }
 
-  private boolean authenticate(String hdfsPath) {
+  private boolean authenticate() {
 
     // logic for kerberos login
     boolean useSecurity = UserGroupInformation.isSecurityEnabled();

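The writer cache in process() changes along with the constructor: it used
to be keyed on the escaped directory path alone, but since the file name
may now carry escape sequences of its own, the key becomes the escaped
path plus the escaped name. A rough sketch of the lookup (Object stands
in for BucketWriter; the rest is illustrative):

    import java.util.HashMap;
    import java.util.Map;

    // Two events that escape to different file names within the same
    // directory now get distinct writers, since the name is in the key.
    public class WriterCacheSketch {
      private final Map<String, Object> sfWriters = new HashMap<String, Object>();

      Object getOrCreate(String realPath, String realName) {
        String lookupPath = realPath + "/" + realName;
        Object writer = sfWriters.get(lookupPath);
        if (writer == null) {
          writer = new Object(); // a new BucketWriter in the real sink
          sfWriters.put(lookupPath, writer);
        }
        return writer;
      }
    }
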
http://git-wip-us.apache.org/repos/asf/flume/blob/ae62279f/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
index b191ef3..5e5d5d1 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
@@ -68,9 +68,10 @@ public class TestBucketWriter {
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
     HDFSTextFormatter formatter = new HDFSTextFormatter();
     BucketWriter bucketWriter = new BucketWriter(0, 0, maxEvents, 0, ctx,
-        "/tmp/file", null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
-        formatter, timedRollerPool, null,
-        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null);
+        "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
+        hdfsWriter, formatter, timedRollerPool, null,
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0,
+        null, null);
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     for (int i = 0; i < 1000; i++) {
@@ -92,10 +93,10 @@ public class TestBucketWriter {
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
     HDFSTextFormatter formatter = new HDFSTextFormatter();
     BucketWriter bucketWriter = new BucketWriter(0, maxBytes, 0, 0, ctx,
-        "/tmp/file", null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
-        formatter, timedRollerPool, null,
+        "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
+        hdfsWriter, formatter, timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-        0, null);
+        0, null, null);
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     for (int i = 0; i < 1000; i++) {
@@ -119,10 +120,10 @@ public class TestBucketWriter {
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
     HDFSTextFormatter formatter = new HDFSTextFormatter();
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
-        "/tmp/file", null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
-        formatter, timedRollerPool, null,
+        "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
+        hdfsWriter, formatter, timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-        0, null);
+        0, null, null);
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     long startNanos = System.nanoTime();
@@ -201,11 +202,14 @@ public class TestBucketWriter {
     HDFSTextFormatter formatter = new HDFSTextFormatter();
     File tmpFile = File.createTempFile("flume", "test");
     tmpFile.deleteOnExit();
+    String path = tmpFile.getParent();
+    String name = tmpFile.getName();
+
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
-        tmpFile.getName(), null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
+        path, name, "", ".tmp", null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
         formatter, timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-        0, null);
+        0, null, null);
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     for (int i = 0; i < NUM_EVENTS - 1; i++) {
@@ -226,10 +230,10 @@ public class TestBucketWriter {
       MockHDFSWriter hdfsWriter = new MockHDFSWriter();
       HDFSTextFormatter formatter = new HDFSTextFormatter();
       BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
-          "/tmp/file", suffix, null, SequenceFile.CompressionType.NONE, hdfsWriter,
+          "/tmp", "file", "", ".tmp", suffix, null, SequenceFile.CompressionType.NONE, hdfsWriter,
           formatter, timedRollerPool, null,
           new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-          0, null);
+          0, null, null);
 
       // Need to override system time use for test so we know what to expect
       final long testTime = System.currentTimeMillis();
@@ -243,7 +247,7 @@ public class TestBucketWriter {
       Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
       bucketWriter.append(e);
 
-      Assert.assertTrue("Incorrect suffix", hdfsWriter.getOpenedFilePath().endsWith(Long.toString(testTime+1) + BucketWriter.IN_USE_EXT));
+      Assert.assertTrue("Incorrect suffix", hdfsWriter.getOpenedFilePath().endsWith(Long.toString(testTime+1) + ".tmp"));
   }
 
     @Test
@@ -254,10 +258,10 @@ public class TestBucketWriter {
         MockHDFSWriter hdfsWriter = new MockHDFSWriter();
         HDFSTextFormatter formatter = new HDFSTextFormatter();
         BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
-            "/tmp/file", suffix, null, SequenceFile.CompressionType.NONE, hdfsWriter,
+            "/tmp", "file", "", ".tmp", suffix, null, SequenceFile.CompressionType.NONE,
hdfsWriter,
             formatter, timedRollerPool, null,
             new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-            0, null);
+            0, null, null);
 
         // Need to override system time use for test so we know what to expect
 
@@ -273,6 +277,45 @@ public class TestBucketWriter {
         Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
         bucketWriter.append(e);
 
-        Assert.assertTrue("Incorrect suffix", hdfsWriter.getOpenedFilePath().endsWith(Long.toString(testTime+1) + suffix + BucketWriter.IN_USE_EXT));
+        Assert.assertTrue("Incorrect suffix", hdfsWriter.getOpenedFilePath().endsWith(Long.toString(testTime+1) + suffix + ".tmp"));
     }
+
+  @Test
+  public void testInUsePrefix() throws IOException, InterruptedException {
+    final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
+    final String PREFIX = "BRNO_IS_CITY_IN_CZECH_REPUBLIC";
+
+    MockHDFSWriter hdfsWriter = new MockHDFSWriter();
+    HDFSTextFormatter formatter = new HDFSTextFormatter();
+    BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
+        "/tmp", "file", PREFIX, ".tmp", null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
+        formatter, timedRollerPool, null,
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
+        0, null, null);
+
+    Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
+    bucketWriter.append(e);
+
+    Assert.assertTrue("Incorrect in use prefix", hdfsWriter.getOpenedFilePath().contains(PREFIX));
+  }
+
+  @Test
+  public void testInUseSuffix() throws IOException, InterruptedException {
+    final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
+    final String SUFFIX = "WELCOME_TO_THE_HELLMOUNTH";
+
+    MockHDFSWriter hdfsWriter = new MockHDFSWriter();
+    HDFSTextFormatter formatter = new HDFSTextFormatter();
+    BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
+        "/tmp", "file", "", SUFFIX, null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
+        formatter, timedRollerPool, null,
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
+        0, null, null);
+
+    Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
+    bucketWriter.append(e);
+
+    Assert.assertTrue("Incorrect in use suffix", hdfsWriter.getOpenedFilePath().contains(SUFFIX));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/ae62279f/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
index d23f09d..1035ac3 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
@@ -33,6 +33,7 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
+import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -222,7 +223,7 @@ public class TestHDFSEventSink {
     Assert.assertEquals(Status.BACKOFF, sink.process());
     sink.stop();
   }
-  
+
   @Test
   public void testKerbFileAccess() throws InterruptedException,
       LifecycleException, EventDeliveryException, IOException {
@@ -246,7 +247,7 @@ public class TestHDFSEventSink {
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
     context.put("hdfs.kerberosPrincipal", kerbConfPrincipal);
-    context.put("hdfs.kerberosKeytab", kerbKeytab);    
+    context.put("hdfs.kerberosKeytab", kerbKeytab);
 
     try {
       Configurables.configure(sink, context);
@@ -261,7 +262,7 @@ public class TestHDFSEventSink {
       UserGroupInformation.setConfiguration(conf);
     }
   }
-  
+
   @Test
   public void testTextAppend() throws InterruptedException, LifecycleException,
       EventDeliveryException, IOException {
@@ -1036,9 +1037,10 @@ public class TestHDFSEventSink {
     sink.stop();
     FileStatus[] dirStat = fs.listStatus(dirPath);
     Path[] fList = FileUtil.stat2Paths(dirStat);
-    Assert.assertEquals(2, fList.length);
+    Assert.assertEquals("Incorrect content of the directory " + StringUtils.join(fList, ","),
+      2, fList.length);
     Assert.assertTrue(!fList[0].getName().endsWith(".tmp") &&
-        !fList[1].getName().endsWith(".tmp"));
+      !fList[1].getName().endsWith(".tmp"));
     fs.close();
   }
 }

