From: brock@apache.org
To: commits@flume.apache.org
Reply-To: dev@flume.apache.org
Subject: git commit: FLUME-1702: HDFSEventSink should write to a hidden file as opposed to a .tmp file
Date: Tue, 18 Dec 2012 20:04:36 +0000 (UTC)

Updated Branches:
  refs/heads/trunk e320842da -> ae62279f8

FLUME-1702: HDFSEventSink should write to a hidden file as opposed to a .tmp file

(Jarek Jarcec Cecho via Brock Noland)

Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ae62279f
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ae62279f
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ae62279f

Branch: refs/heads/trunk
Commit: ae62279f8066b1e2728a579ce8384eca36180e9d
Parents: e320842
Author: Brock Noland
Authored: Tue Dec 18 14:04:09 2012 -0600
Committer: Brock Noland
Committed: Tue Dec 18 14:04:09 2012 -0600

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |    2 +
 .../org/apache/flume/sink/hdfs/BucketWriter.java   |   81 +++++++++------
 .../org/apache/flume/sink/hdfs/HDFSEventSink.java  |   38 +++++---
 .../apache/flume/sink/hdfs/TestBucketWriter.java   |   77 +++++++++++---
 .../apache/flume/sink/hdfs/TestHDFSEventSink.java  |   12 ++-
 5 files changed, 145 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/ae62279f/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 9507413..69860f7 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1275,6 +1275,8 @@ Name                 Default      Description
 **hdfs.path**        --           HDFS directory path (eg hdfs://namenode/flume/webdata/)
 hdfs.filePrefix      FlumeData    Name prefixed to files created by Flume in hdfs directory
 hdfs.fileSuffix      --           Suffix to append to file (eg ``.avro`` - *NOTE: period is not automatically added*)
+hdfs.inUsePrefix     --           Prefix that is used for temporal files that flume actively writes into
+hdfs.inUseSuffix     ``.tmp``     Suffix that is used for temporal files that flume actively writes into
 hdfs.rollInterval    30           Number of seconds to wait before rolling current file (0 = never roll based on time interval)
 hdfs.rollSize        1024         File size to trigger roll, in bytes (0: never roll based on file size)
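The two new properties only change the name a file carries while Flume is still writing to it; the final name is still driven by hdfs.filePrefix and hdfs.fileSuffix. A minimal sketch of the naming scheme with example values (illustration only, not code from this commit): setting hdfs.inUsePrefix to "." makes the in-progress file hidden, so tools such as Hadoop's FileInputFormat, which skips names starting with "." or "_", no longer pick up half-written data.

    // How the in-use and final names relate under the new options (example values).
    String dir          = "hdfs://namenode/flume/webdata";     // hdfs.path
    String fullFileName = "FlumeData" + "." + 1355860849042L;  // hdfs.filePrefix + "." + counter
    String inUsePrefix  = ".";                                 // hypothetical hdfs.inUsePrefix
    String inUseSuffix  = ".tmp";                              // default hdfs.inUseSuffix

    String bucketPath = dir + "/" + inUsePrefix + fullFileName + inUseSuffix; // written while open
    String targetPath = dir + "/" + fullFileName;                             // renamed to on close
    // bucketPath -> hdfs://namenode/flume/webdata/.FlumeData.1355860849042.tmp
    // targetPath -> hdfs://namenode/flume/webdata/FlumeData.1355860849042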

http://git-wip-us.apache.org/repos/asf/flume/blob/ae62279f/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index d0ff6e3..002d48d 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -1,4 +1,3 @@
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -55,7 +54,8 @@ class BucketWriter {
   private static final Logger LOG = LoggerFactory
       .getLogger(BucketWriter.class);

-  static final String IN_USE_EXT = ".tmp";
+  private static String DIRECTORY_DELIMITER = System.getProperty("file.separator");
+
   /**
    * This lock ensures that only one thread can open a file at a time.
    */
@@ -69,7 +69,6 @@ class BucketWriter {
   private final long batchSize;
   private final CompressionCodec codeC;
   private final CompressionType compType;
-  private final Context context;
   private final ScheduledExecutorService timedRollerPool;
   private final UserGroupInformation user;

@@ -81,15 +80,20 @@ class BucketWriter {
   private FileSystem fileSystem;

   private volatile String filePath;
+  private volatile String fileName;
+  private volatile String inUsePrefix;
+  private volatile String inUseSuffix;
   private volatile String fileSuffix;
   private volatile String bucketPath;
+  private volatile String targetPath;
   private volatile long batchCounter;
   private volatile boolean isOpen;
   private volatile ScheduledFuture timedRollFuture;
   private SinkCounter sinkCounter;
-  private final WriterCallback onIdleCallback;
   private final int idleTimeout;
   private volatile ScheduledFuture idleFuture;
+  private final WriterCallback onIdleCallback;
+  private final String onIdleCallbackPath;

   private Clock clock = new SystemClock();

@@ -98,16 +102,20 @@ class BucketWriter {
   protected boolean idleClosed = false;

   BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize,
-      Context context, String filePath, String fileSuffix, CompressionCodec codeC,
+      Context context, String filePath, String fileName, String inUsePrefix,
+      String inUseSuffix, String fileSuffix, CompressionCodec codeC,
       CompressionType compType, HDFSWriter writer, FlumeFormatter formatter,
       ScheduledExecutorService timedRollerPool, UserGroupInformation user,
-      SinkCounter sinkCounter, int idleTimeout, WriterCallback onIdleCallback) {
+      SinkCounter sinkCounter, int idleTimeout, WriterCallback onIdleCallback,
+      String onIdleCallbackPath) {
     this.rollInterval = rollInterval;
     this.rollSize = rollSize;
     this.rollCount = rollCount;
     this.batchSize = batchSize;
-    this.context = context;
     this.filePath = filePath;
+    this.fileName = fileName;
+    this.inUsePrefix = inUsePrefix;
+    this.inUseSuffix = inUseSuffix;
     this.fileSuffix = fileSuffix;
     this.codeC = codeC;
     this.compType = compType;
@@ -116,8 +124,9 @@ class BucketWriter {
     this.timedRollerPool = timedRollerPool;
     this.user = user;
     this.sinkCounter = sinkCounter;
-    this.onIdleCallback = onIdleCallback;
     this.idleTimeout = idleTimeout;
+    this.onIdleCallback = onIdleCallback;
+    this.onIdleCallbackPath = onIdleCallbackPath;

     fileExtensionCounter = new AtomicLong(clock.currentTimeMillis());
@@ -197,26 +206,34 @@ BucketWriter {
     // which caused deadlocks. See FLUME-1231.
     synchronized (staticLock) {
       checkAndThrowInterruptedException();
+
       try {
         long counter = fileExtensionCounter.incrementAndGet();
+
+        String fullFileName = fileName + "." + counter;
+
+        if (codeC == null && fileSuffix != null && fileSuffix.length() > 0) {
+          fullFileName += fileSuffix;
+        }
+
+        if(codeC != null) {
+          fullFileName += codeC.getDefaultExtension();
+        }
+
+        bucketPath = filePath + DIRECTORY_DELIMITER + inUsePrefix +
+          fullFileName + inUseSuffix;
+        targetPath = filePath + DIRECTORY_DELIMITER + fullFileName;
+
+        LOG.info("Creating " + bucketPath);
         if (codeC == null) {
-          bucketPath = filePath + "." + counter;
-          // FLUME-1645 - add suffix if specified
-          if (fileSuffix != null && fileSuffix.length() > 0) {
-            bucketPath += fileSuffix;
-          }
           // Need to get reference to FS using above config before underlying
           // writer does in order to avoid shutdown hook & IllegalStateExceptions
           fileSystem = new Path(bucketPath).getFileSystem(config);
-          LOG.info("Creating " + bucketPath + IN_USE_EXT);
-          writer.open(bucketPath + IN_USE_EXT, formatter);
+          writer.open(bucketPath, formatter);
         } else {
-          bucketPath = filePath + "." + counter
-              + codeC.getDefaultExtension();
           // need to get reference to FS before writer does to avoid shutdown hook
           fileSystem = new Path(bucketPath).getFileSystem(config);
-          LOG.info("Creating " + bucketPath + IN_USE_EXT);
-          writer.open(bucketPath + IN_USE_EXT, codeC, compType, formatter);
+          writer.open(bucketPath, codeC, compType, formatter);
         }
       } catch (Exception ex) {
         sinkCounter.incrementConnectionFailedCount();
@@ -235,7 +252,7 @@ BucketWriter {
       Callable action = new Callable() {
         public Void call() throws Exception {
           LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
-              bucketPath + IN_USE_EXT, rollInterval);
+              bucketPath, rollInterval);
           try {
             close();
           } catch(Throwable t) {
@@ -273,19 +290,19 @@ BucketWriter {
    * @throws IOException
    */
   private void doClose() throws IOException {
-    LOG.debug("Closing {}", bucketPath + IN_USE_EXT);
+    LOG.debug("Closing {}", bucketPath);
     if (isOpen) {
       try {
         writer.close(); // could block
         sinkCounter.incrementConnectionClosedCount();
       } catch (IOException e) {
         LOG.warn("failed to close() HDFSWriter for file (" + bucketPath +
-          IN_USE_EXT + "). Exception follows.", e);
+          "). Exception follows.", e);
         sinkCounter.incrementConnectionFailedCount();
       }
       isOpen = false;
     } else {
-      LOG.info("HDFSWriter is already closed: {}", bucketPath + IN_USE_EXT);
+      LOG.info("HDFSWriter is already closed: {}", bucketPath);
     }

     // NOTE: timed rolls go through this codepath as well as other roll types
@@ -327,11 +344,11 @@ BucketWriter {
       Callable idleAction = new Callable() {
         public Void call() throws Exception {
           try {
-            LOG.info("Closing idle bucketWriter {}", filePath);
+            LOG.info("Closing idle bucketWriter {}", bucketPath);
             idleClosed = true;
             close();
             if(onIdleCallback != null)
-              onIdleCallback.run(filePath);
+              onIdleCallback.run(onIdleCallbackPath);
           } catch(Throwable t) {
             LOG.error("Unexpected error", t);
           }
Closing file (" + - bucketPath + IN_USE_EXT + ") and rethrowing exception.", + bucketPath + ") and rethrowing exception.", e.getMessage()); try { close(); } catch (IOException e2) { LOG.warn("Caught IOException while closing file (" + - bucketPath + IN_USE_EXT + "). Exception follows.", e2); + bucketPath + "). Exception follows.", e2); } throw e; } @@ -433,8 +450,12 @@ class BucketWriter { * Rename bucketPath file from .tmp to permanent location. */ private void renameBucket() throws IOException { - Path srcPath = new Path(bucketPath + IN_USE_EXT); - Path dstPath = new Path(bucketPath); + if(bucketPath.equals(targetPath)) { + return; + } + + Path srcPath = new Path(bucketPath); + Path dstPath = new Path(targetPath); if(fileSystem.exists(srcPath)) { // could block LOG.info("Renaming " + srcPath + " to " + dstPath); @@ -444,7 +465,7 @@ class BucketWriter { @Override public String toString() { - return "[ " + this.getClass().getSimpleName() + " filePath = " + filePath + + return "[ " + this.getClass().getSimpleName() + " targetPath = " + targetPath + ", bucketPath = " + bucketPath + " ]"; } http://git-wip-us.apache.org/repos/asf/flume/blob/ae62279f/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java index 64ac2d7..230488e 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java @@ -69,15 +69,20 @@ public class HDFSEventSink extends AbstractSink implements Configurable { private static final Logger LOG = LoggerFactory .getLogger(HDFSEventSink.class); + private static String DIRECTORY_DELIMITER = System.getProperty("file.separator"); + private static final long defaultRollInterval = 30; private static final long defaultRollSize = 1024; private static final long defaultRollCount = 10; private static final String defaultFileName = "FlumeData"; private static final String defaultSuffix = ""; + private static final String defaultInUsePrefix = ""; + private static final String defaultInUseSuffix = ".tmp"; private static final long defaultBatchSize = 100; private static final long defaultTxnEventMax = 100; private static final String defaultFileType = HDFSWriterFactory.SequenceFileType; private static final int defaultMaxOpenFiles = 5000; + /** * Default length of time we wait for blocking BucketWriter calls * before timing out the operation. Intended to prevent server hangs. 

http://git-wip-us.apache.org/repos/asf/flume/blob/ae62279f/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index 64ac2d7..230488e 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -69,15 +69,20 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   private static final Logger LOG = LoggerFactory
       .getLogger(HDFSEventSink.class);

+  private static String DIRECTORY_DELIMITER = System.getProperty("file.separator");
+
   private static final long defaultRollInterval = 30;
   private static final long defaultRollSize = 1024;
   private static final long defaultRollCount = 10;
   private static final String defaultFileName = "FlumeData";
   private static final String defaultSuffix = "";
+  private static final String defaultInUsePrefix = "";
+  private static final String defaultInUseSuffix = ".tmp";
   private static final long defaultBatchSize = 100;
   private static final long defaultTxnEventMax = 100;
   private static final String defaultFileType = HDFSWriterFactory.SequenceFileType;
   private static final int defaultMaxOpenFiles = 5000;
+
   /**
    * Default length of time we wait for blocking BucketWriter calls
    * before timing out the operation. Intended to prevent server hangs.
@@ -112,8 +117,11 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   private CompressionCodec codeC;
   private CompressionType compType;
   private String fileType;
-  private String path;
+  private String filePath;
+  private String fileName;
   private String suffix;
+  private String inUsePrefix;
+  private String inUseSuffix;
   private TimeZone timeZone;
   private int maxOpenFiles;
   private String writeFormat;
@@ -181,12 +189,12 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   public void configure(Context context) {
     this.context = context;

-    String dirpath = Preconditions.checkNotNull(
+    filePath = Preconditions.checkNotNull(
         context.getString("hdfs.path"), "hdfs.path is required");
-    String fileName = context.getString("hdfs.filePrefix", defaultFileName);
-    // FLUME-1645: add suffix support
+    fileName = context.getString("hdfs.filePrefix", defaultFileName);
     this.suffix = context.getString("hdfs.fileSuffix", defaultSuffix);
-    this.path = dirpath + System.getProperty("file.separator") + fileName;
+    inUsePrefix = context.getString("hdfs.inUsePrefix", defaultInUsePrefix);
+    inUseSuffix = context.getString("hdfs.inUseSuffix", defaultInUseSuffix);
     String tzName = context.getString("hdfs.timeZone");
     timeZone = tzName == null ? null : TimeZone.getTimeZone(tzName);
     rollInterval = context.getLong("hdfs.rollInterval", defaultRollInterval);
@@ -244,7 +252,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
       }
     }

-    if (!authenticate(path)) {
+    if (!authenticate()) {
       LOG.error("Failed to authenticate!");
     }
     needRounding = context.getBoolean("hdfs.round", false);
@@ -394,9 +402,13 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
         }

         // reconstruct the path name by substituting place holders
-        String realPath = BucketPath.escapeString(path, event.getHeaders(),
+        String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
             timeZone, needRounding, roundUnit, roundValue);
-        BucketWriter bucketWriter = sfWriters.get(realPath);
+        String realName = BucketPath.escapeString(fileName, event.getHeaders(),
+            timeZone, needRounding, roundUnit, roundValue);
+
+        String lookupPath = realPath + DIRECTORY_DELIMITER + realName;
+        BucketWriter bucketWriter = sfWriters.get(lookupPath);

         // we haven't seen this file yet, so open it and cache the handle
         if (bucketWriter == null) {
@@ -414,11 +426,11 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
             };
           }
           bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
-              batchSize, context, realPath, suffix, codeC, compType, hdfsWriter,
-              formatter, timedRollerPool, proxyTicket, sinkCounter, idleTimeout,
-              idleCallback);
+              batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
+              suffix, codeC, compType, hdfsWriter, formatter, timedRollerPool,
+              proxyTicket, sinkCounter, idleTimeout, idleCallback, lookupPath);

-          sfWriters.put(realPath, bucketWriter);
+          sfWriters.put(lookupPath, bucketWriter);
         }

         // track the buckets getting written in this transaction
@@ -523,7 +535,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     super.start();
   }

-  private boolean authenticate(String hdfsPath) {
+  private boolean authenticate() {
     // logic for kerberos login
     boolean useSecurity = UserGroupInformation.isSecurityEnabled();
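A note on the sink changes just above: hdfs.path and hdfs.filePrefix are now escaped separately, because BucketWriter needs the directory and the bare file name individually to build the in-use and final names, and the writer cache is keyed on the combination. A rough sketch of the key derivation with a made-up header value (illustration only; assumes org.apache.flume.formatter.output.BucketPath and rounding disabled):

    // Sketch of how process() derives the sfWriters cache key; values are illustrative.
    Map<String, String> headers = new HashMap<String, String>();
    headers.put("host", "web01");

    String realPath = BucketPath.escapeString("/flume/events/%{host}", headers,
        null, false, Calendar.SECOND, 1);   // escaped hdfs.path
    String realName = BucketPath.escapeString("%{host}-log", headers,
        null, false, Calendar.SECOND, 1);   // escaped hdfs.filePrefix
    String lookupPath = realPath + System.getProperty("file.separator") + realName;
    // lookupPath -> /flume/events/web01/web01-log, the key under which the
    // corresponding BucketWriter is cached and later looked up.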

http://git-wip-us.apache.org/repos/asf/flume/blob/ae62279f/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
index b191ef3..5e5d5d1 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
@@ -68,9 +68,10 @@ public class TestBucketWriter {
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
     HDFSTextFormatter formatter = new HDFSTextFormatter();
     BucketWriter bucketWriter = new BucketWriter(0, 0, maxEvents, 0, ctx,
-        "/tmp/file", null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
-        formatter, timedRollerPool, null,
-        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null);
+        "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
+        hdfsWriter, formatter, timedRollerPool, null,
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0,
+        null, null);

     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     for (int i = 0; i < 1000; i++) {
@@ -92,10 +93,10 @@ public class TestBucketWriter {
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
     HDFSTextFormatter formatter = new HDFSTextFormatter();
     BucketWriter bucketWriter = new BucketWriter(0, maxBytes, 0, 0, ctx,
-        "/tmp/file", null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
-        formatter, timedRollerPool, null,
+        "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
+        hdfsWriter, formatter, timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-        0, null);
+        0, null, null);

     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     for (int i = 0; i < 1000; i++) {
@@ -119,10 +120,10 @@ public class TestBucketWriter {
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
     HDFSTextFormatter formatter = new HDFSTextFormatter();
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
-        "/tmp/file", null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
-        formatter, timedRollerPool, null,
+        "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
+        hdfsWriter, formatter, timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-        0, null);
+        0, null, null);

     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     long startNanos = System.nanoTime();
@@ -201,11 +202,14 @@ public class TestBucketWriter {
     HDFSTextFormatter formatter = new HDFSTextFormatter();
     File tmpFile = File.createTempFile("flume", "test");
     tmpFile.deleteOnExit();
+    String path = tmpFile.getParent();
+    String name = tmpFile.getName();
+
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
-        tmpFile.getName(), null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
+        path, name, "", ".tmp", null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
         formatter, timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-        0, null);
+        0, null, null);

     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     for (int i = 0; i < NUM_EVENTS - 1; i++) {
@@ -226,10 +230,10 @@ public class TestBucketWriter {
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
     HDFSTextFormatter formatter = new HDFSTextFormatter();
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
-        "/tmp/file", suffix, null, SequenceFile.CompressionType.NONE, hdfsWriter,
+        "/tmp", "file", "", ".tmp", suffix, null, SequenceFile.CompressionType.NONE, hdfsWriter,
         formatter, timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-        0, null);
+        0, null, null);

     // Need to override system time use for test so we know what to expect
     final long testTime = System.currentTimeMillis();
@@ -243,7 +247,7 @@ public class TestBucketWriter {
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     bucketWriter.append(e);

-    Assert.assertTrue("Incorrect suffix", hdfsWriter.getOpenedFilePath().endsWith(Long.toString(testTime+1) + BucketWriter.IN_USE_EXT));
+    Assert.assertTrue("Incorrect suffix", hdfsWriter.getOpenedFilePath().endsWith(Long.toString(testTime+1) + ".tmp"));
   }

   @Test
@@ -254,10 +258,10 @@ public class TestBucketWriter {
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
     HDFSTextFormatter formatter = new HDFSTextFormatter();
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
-        "/tmp/file", suffix, null, SequenceFile.CompressionType.NONE, hdfsWriter,
+        "/tmp", "file", "", ".tmp", suffix, null, SequenceFile.CompressionType.NONE, hdfsWriter,
         formatter, timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-        0, null);
+        0, null, null);

     // Need to override system time use for test so we know what to expect
@@ -273,6 +277,45 @@ public class TestBucketWriter {
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     bucketWriter.append(e);

-    Assert.assertTrue("Incorrect suffix", hdfsWriter.getOpenedFilePath().endsWith(Long.toString(testTime+1) + suffix + BucketWriter.IN_USE_EXT));
+    Assert.assertTrue("Incorrect suffix", hdfsWriter.getOpenedFilePath().endsWith(Long.toString(testTime+1) + suffix + ".tmp"));
   }
+
+  @Test
+  public void testInUsePrefix() throws IOException, InterruptedException {
+    final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
+    final String PREFIX = "BRNO_IS_CITY_IN_CZECH_REPUBLIC";
+
+    MockHDFSWriter hdfsWriter = new MockHDFSWriter();
+    HDFSTextFormatter formatter = new HDFSTextFormatter();
+    BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
+        "/tmp", "file", PREFIX, ".tmp", null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
+        formatter, timedRollerPool, null,
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
+        0, null, null);
+
+    Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
+    bucketWriter.append(e);
+
+    Assert.assertTrue("Incorrect in use prefix", hdfsWriter.getOpenedFilePath().contains(PREFIX));
+  }
+
+  @Test
+  public void testInUseSuffix() throws IOException, InterruptedException {
+    final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
+    final String SUFFIX = "WELCOME_TO_THE_HELLMOUNTH";
+
+    MockHDFSWriter hdfsWriter = new MockHDFSWriter();
+    HDFSTextFormatter formatter = new HDFSTextFormatter();
+    BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
+        "/tmp", "file", "", SUFFIX, null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
+        formatter, timedRollerPool, null,
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
+        0, null, null);
+
+    Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
+    bucketWriter.append(e);
+
+    Assert.assertTrue("Incorrect in use suffix", hdfsWriter.getOpenedFilePath().contains(SUFFIX));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/ae62279f/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
index d23f09d..1035ac3 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
@@ -33,6 +33,7 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
+import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -222,7 +223,7 @@ public class TestHDFSEventSink {
     Assert.assertEquals(Status.BACKOFF, sink.process());
     sink.stop();
   }
-  
+
   @Test
   public void testKerbFileAccess() throws InterruptedException,
       LifecycleException, EventDeliveryException, IOException {
@@ -246,7 +247,7 @@ public class TestHDFSEventSink {
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
     context.put("hdfs.kerberosPrincipal", kerbConfPrincipal);
-    context.put("hdfs.kerberosKeytab", kerbKeytab); 
+    context.put("hdfs.kerberosKeytab", kerbKeytab);

     try {
       Configurables.configure(sink, context);
@@ -261,7 +262,7 @@ public class TestHDFSEventSink {
       UserGroupInformation.setConfiguration(conf);
     }
   }
-  
+
   @Test
   public void testTextAppend() throws InterruptedException, LifecycleException,
       EventDeliveryException, IOException {
@@ -1036,9 +1037,10 @@ public class TestHDFSEventSink {
     sink.stop();
     FileStatus[] dirStat = fs.listStatus(dirPath);
     Path[] fList = FileUtil.stat2Paths(dirStat);
-    Assert.assertEquals(2, fList.length);
+    Assert.assertEquals("Incorrect content of the directory " + StringUtils.join(fList, ","),
+        2, fList.length);
     Assert.assertTrue(!fList[0].getName().endsWith(".tmp") &&
-      !fList[1].getName().endsWith(".tmp"));
+        !fList[1].getName().endsWith(".tmp"));
     fs.close();
   }
 }
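A natural follow-on test, shown here only as a sketch and not part of this commit, would pin down the hidden-file behaviour that motivated FLUME-1702: pass "." as the in-use prefix and assert on the opened file's name itself. It reuses the fixtures visible in the tests above (ctx, timedRollerPool, MockHDFSWriter, HDFSTextFormatter) and assumes org.apache.hadoop.fs.Path is imported:

      @Test
      public void testHiddenInUseFile() throws IOException, InterruptedException {
        final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test

        MockHDFSWriter hdfsWriter = new MockHDFSWriter();
        HDFSTextFormatter formatter = new HDFSTextFormatter();
        // "." as the in-use prefix: the in-progress file name starts with a dot.
        BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
            "/tmp", "file", ".", ".tmp", null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
            formatter, timedRollerPool, null,
            new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
            0, null, null);

        bucketWriter.append(EventBuilder.withBody("foo", Charsets.UTF_8));

        String openedName = new Path(hdfsWriter.getOpenedFilePath()).getName();
        Assert.assertTrue("In-use file should be hidden", openedName.startsWith("."));
      }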