flume-commits mailing list archives

From br...@apache.org
Subject [7/24] git commit: FLUME-1645. Add hdfs.fileSuffix property to HDFSEventSink.
Date Tue, 30 Oct 2012 22:59:40 GMT
FLUME-1645. Add hdfs.fileSuffix property to HDFSEventSink.

(Steve Hoffman via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/cb6a35ce
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/cb6a35ce
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/cb6a35ce

Branch: refs/heads/FLUME-1502
Commit: cb6a35ce620f2e300546a001f4562ae881aa9045
Parents: 6d083df
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Fri Oct 26 18:10:27 2012 -0400
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Fri Oct 26 18:10:27 2012 -0400

----------------------------------------------------------------------
 .../src/main/java/org/apache/flume/Clock.java      |   28 ++++++
 .../main/java/org/apache/flume/SystemClock.java    |   30 ++++++
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |    1 +
 .../org/apache/flume/sink/hdfs/BucketWriter.java   |   24 +++--
 .../org/apache/flume/sink/hdfs/HDFSEventSink.java  |   11 +-
 .../org/apache/flume/sink/hdfs/MockHDFSWriter.java |   13 ++-
 .../apache/flume/sink/hdfs/TestBucketWriter.java   |   76 +++++++++++++--
 7 files changed, 154 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/cb6a35ce/flume-ng-core/src/main/java/org/apache/flume/Clock.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/Clock.java b/flume-ng-core/src/main/java/org/apache/flume/Clock.java
new file mode 100644
index 0000000..fc719bc
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/Clock.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume;
+
+/**
+ * Facade for System.currentTimeMillis for Testing
+ */
+public interface Clock {
+
+    public long currentTimeMillis();
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/cb6a35ce/flume-ng-core/src/main/java/org/apache/flume/SystemClock.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/SystemClock.java b/flume-ng-core/src/main/java/org/apache/flume/SystemClock.java
new file mode 100644
index 0000000..f176807
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/SystemClock.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume;
+
+/**
+ * Default implementation of Clock which uses System
+ */
+public class SystemClock implements Clock {
+
+    public long currentTimeMillis() {
+        return System.currentTimeMillis();
+    }
+
+}
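
The two classes above create a small test seam: production code asks a Clock for the current time, and a test substitutes a fixed value. A minimal sketch of that substitution (the class name and timestamp are illustrative, not from the commit):

    import org.apache.flume.Clock;

    public class FixedClockExample {
      public static void main(String[] args) {
        // Arbitrary fixed timestamp, so repeated calls are deterministic.
        final long fixedTime = 1351634567890L;
        Clock testClock = new Clock() {
          public long currentTimeMillis() {
            return fixedTime; // unlike System.currentTimeMillis(), never changes
          }
        };
        System.out.println(testClock.currentTimeMillis()); // always 1351634567890
      }
    }

The new tests in TestBucketWriter below inject such a clock through BucketWriter.setClock().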

http://git-wip-us.apache.org/repos/asf/flume/blob/cb6a35ce/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index d24868d..0c0951b 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1066,6 +1066,7 @@ Name                    Default       Description
 **type**                --            The component type name, needs to be ``hdfs``
 **hdfs.path**           --            HDFS directory path (eg hdfs://namenode/flume/webdata/)
 hdfs.filePrefix         FlumeData     Name prefixed to files created by Flume in hdfs directory
+hdfs.fileSuffix         --            Suffix to append to file (eg ``.avro`` - *NOTE: period is not automatically added*)
 hdfs.rollInterval       30            Number of seconds to wait before rolling current file
                                       (0 = never roll based on time interval)
 hdfs.rollSize           1024          File size to trigger roll, in bytes (0: never roll based on file size)
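
Since the property is read through the sink's Context, here is a hedged sketch (class name and values are illustrative, not from the commit) of exercising hdfs.fileSuffix programmatically, mirroring what an agent configuration file would set:

    import org.apache.flume.Context;
    import org.apache.flume.sink.hdfs.HDFSEventSink;

    public class SuffixConfigExample {
      public static void main(String[] args) {
        Context ctx = new Context();
        ctx.put("hdfs.path", "hdfs://namenode/flume/webdata/");
        ctx.put("hdfs.filePrefix", "FlumeData");
        // The period is not added automatically, so include it in the suffix.
        ctx.put("hdfs.fileSuffix", ".avro");

        HDFSEventSink sink = new HDFSEventSink();
        sink.configure(ctx); // reads hdfs.fileSuffix, as the diff below shows
      }
    }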

http://git-wip-us.apache.org/repos/asf/flume/blob/cb6a35ce/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index bce8e11..9f2c763 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -27,8 +27,10 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.flume.Clock;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
+import org.apache.flume.SystemClock;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.FlumeFormatter;
 import org.apache.hadoop.conf.Configuration;
@@ -52,7 +54,7 @@ class BucketWriter {
   private static final Logger LOG = LoggerFactory
       .getLogger(BucketWriter.class);
 
-  private static final String IN_USE_EXT = ".tmp";
+  static final String IN_USE_EXT = ".tmp";
   /**
    * This lock ensures that only one thread can open a file at a time.
    */
@@ -78,14 +80,17 @@ class BucketWriter {
   private FileSystem fileSystem;
 
   private volatile String filePath;
+  private volatile String fileSuffix;
   private volatile String bucketPath;
   private volatile long batchCounter;
   private volatile boolean isOpen;
   private volatile ScheduledFuture<Void> timedRollFuture;
   private SinkCounter sinkCounter;
 
+  private Clock clock = new SystemClock();
+
   BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize,
-      Context context, String filePath, CompressionCodec codeC,
+      Context context, String filePath, String fileSuffix, CompressionCodec codeC,
       CompressionType compType, HDFSWriter writer, FlumeFormatter formatter,
       ScheduledExecutorService timedRollerPool, UserGroupInformation user,
       SinkCounter sinkCounter) {
@@ -95,6 +100,7 @@ class BucketWriter {
     this.batchSize = batchSize;
     this.context = context;
     this.filePath = filePath;
+    this.fileSuffix = fileSuffix;
     this.codeC = codeC;
     this.compType = compType;
     this.writer = writer;
@@ -103,7 +109,7 @@ class BucketWriter {
     this.user = user;
     this.sinkCounter = sinkCounter;
 
-    fileExtensionCounter = new AtomicLong(System.currentTimeMillis());
+    fileExtensionCounter = new AtomicLong(clock.currentTimeMillis());
 
     isOpen = false;
     writer.configure(context);
@@ -152,7 +158,6 @@ class BucketWriter {
    */
   private void open() throws IOException, InterruptedException {
     runPrivileged(new PrivilegedExceptionAction<Void>() {
-      @Override
       public Void run() throws Exception {
         doOpen();
         return null;
@@ -183,6 +188,10 @@ class BucketWriter {
         long counter = fileExtensionCounter.incrementAndGet();
         if (codeC == null) {
           bucketPath = filePath + "." + counter;
+          // FLUME-1645 - add suffix if specified
+          if (fileSuffix != null && fileSuffix.length() > 0) {
+            bucketPath += fileSuffix;
+          }
           // Need to get reference to FS using above config before underlying
           // writer does in order to avoid shutdown hook & IllegalStateExceptions
           fileSystem = new Path(bucketPath).getFileSystem(config);
@@ -211,7 +220,6 @@ class BucketWriter {
     // if time-based rolling is enabled, schedule the roll
     if (rollInterval > 0) {
       Callable<Void> action = new Callable<Void>() {
-        @Override
         public Void call() throws Exception {
           LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
               bucketPath + IN_USE_EXT, rollInterval);
@@ -238,7 +246,6 @@ class BucketWriter {
   public synchronized void close() throws IOException, InterruptedException {
     flush();
     runPrivileged(new PrivilegedExceptionAction<Void>() {
-      @Override
       public Void run() throws Exception {
         doClose();
         return null;
@@ -284,7 +291,6 @@ class BucketWriter {
   public synchronized void flush() throws IOException, InterruptedException {
     if (!isBatchComplete()) {
       runPrivileged(new PrivilegedExceptionAction<Void>() {
-        @Override
         public Void run() throws Exception {
           doFlush();
           return null;
@@ -390,4 +396,8 @@ class BucketWriter {
   private boolean isBatchComplete() {
     return (batchCounter == 0);
   }
+
+  void setClock(Clock clock) {
+      this.clock = clock;
+  }
 }
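
For orientation, a standalone sketch (path, timestamp, and suffix are illustrative) of the naming logic the hunks above introduce: the counter is seeded from the injectable Clock, the optional suffix is appended only on the uncompressed path (codeC == null), and open files carry the in-use extension:

    public class BucketPathSketch {
      public static void main(String[] args) {
        long counter = 1351634567890L + 1; // seeded from Clock, incremented per file
        String fileSuffix = ".avro";       // null or empty means no suffix
        String bucketPath = "/flume/webdata/FlumeData" + "." + counter;
        if (fileSuffix != null && fileSuffix.length() > 0) {
          bucketPath += fileSuffix;
        }
        // Prints /flume/webdata/FlumeData.1351634567891.avro.tmp
        System.out.println(bucketPath + ".tmp"); // ".tmp" is IN_USE_EXT while open
      }
    }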

http://git-wip-us.apache.org/repos/asf/flume/blob/cb6a35ce/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index a6d624b..e369604 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -69,6 +69,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   private static final long defaultRollSize = 1024;
   private static final long defaultRollCount = 10;
   private static final String defaultFileName = "FlumeData";
+  private static final String defaultSuffix = "";
   private static final long defaultBatchSize = 100;
   private static final long defaultTxnEventMax = 100;
   private static final String defaultFileType = HDFSWriterFactory.SequenceFileType;
@@ -108,6 +109,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   private CompressionType compType;
   private String fileType;
   private String path;
+  private String suffix;
   private TimeZone timeZone;
   private int maxOpenFiles;
   private String writeFormat;
@@ -170,13 +172,14 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   }
 
     // read configuration and setup thresholds
-  @Override
   public void configure(Context context) {
     this.context = context;
 
     String dirpath = Preconditions.checkNotNull(
         context.getString("hdfs.path"), "hdfs.path is required");
     String fileName = context.getString("hdfs.filePrefix", defaultFileName);
+    // FLUME-1645: add suffix support
+    this.suffix = context.getString("hdfs.fileSuffix", defaultSuffix);
     this.path = dirpath + System.getProperty("file.separator") + fileName;
     String tzName = context.getString("hdfs.timeZone");
     timeZone = tzName == null ? null : TimeZone.getTimeZone(tzName);
@@ -370,7 +373,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
    * HDFS. <br/>
    * This method is not thread safe.
    */
-  @Override
   public Status process() throws EventDeliveryException {
     Channel channel = getChannel();
     Transaction transaction = channel.getTransaction();
@@ -396,7 +398,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
               .getFormatter(writeFormat);
 
           bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
-              batchSize, context, realPath, codeC, compType, hdfsWriter,
+              batchSize, context, realPath, suffix, codeC, compType, hdfsWriter,
               formatter, timedRollerPool, proxyTicket, sinkCounter);
 
           sfWriters.put(realPath, bucketWriter);
@@ -706,7 +708,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
 
     // Write the data to HDFS
     callWithTimeout(new Callable<Void>() {
-      @Override
       public Void call() throws Exception {
         bucketWriter.append(event);
         return null;
@@ -721,7 +722,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
       throws IOException, InterruptedException {
 
     callWithTimeout(new Callable<Void>() {
-      @Override
       public Void call() throws Exception {
         bucketWriter.flush();
         return null;
@@ -736,7 +736,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
       throws IOException, InterruptedException {
 
     callWithTimeout(new Callable<Void>() {
-      @Override
       public Void call() throws Exception {
         bucketWriter.close();
         return null;

http://git-wip-us.apache.org/repos/asf/flume/blob/cb6a35ce/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java
index 24a7cbf..0b7910a 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java
@@ -31,6 +31,7 @@ public class MockHDFSWriter implements HDFSWriter {
   private int filesClosed = 0;
   private int bytesWritten = 0;
   private int eventsWritten = 0;
+  private String filePath = null;
 
   public int getFilesOpened() {
     return filesOpened;
@@ -48,6 +49,10 @@ public class MockHDFSWriter implements HDFSWriter {
     return eventsWritten;
   }
 
+  public String getOpenedFilePath() {
+    return filePath;
+  }
+
   public void clear() {
     filesOpened = 0;
     filesClosed = 0;
@@ -55,33 +60,29 @@ public class MockHDFSWriter implements HDFSWriter {
     eventsWritten = 0;
   }
 
-  @Override
   public void configure(Context context) {
     // no-op
   }
 
-  @Override
   public void open(String filePath, FlumeFormatter fmt) throws IOException {
+    this.filePath = filePath;
     filesOpened++;
   }
 
-  @Override
   public void open(String filePath, CompressionCodec codec, CompressionType cType, FlumeFormatter fmt) throws IOException {
+    this.filePath = filePath;
     filesOpened++;
   }
 
-  @Override
   public void append(Event e, FlumeFormatter fmt) throws IOException {
     eventsWritten++;
     bytesWritten += e.getBody().length;
   }
 
-  @Override
   public void sync() throws IOException {
     // does nothing
   }
 
-  @Override
   public void close() throws IOException {
     filesClosed++;
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/cb6a35ce/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
index 60f1830..6a8072e 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
@@ -23,6 +23,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.flume.Clock;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
@@ -67,7 +68,7 @@ public class TestBucketWriter {
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
     HDFSTextFormatter formatter = new HDFSTextFormatter();
     BucketWriter bucketWriter = new BucketWriter(0, 0, maxEvents, 0, ctx,
-        "/tmp/file", null, SequenceFile.CompressionType.NONE, hdfsWriter,
+        "/tmp/file", null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
         formatter, timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()));
 
@@ -91,7 +92,7 @@ public class TestBucketWriter {
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
     HDFSTextFormatter formatter = new HDFSTextFormatter();
     BucketWriter bucketWriter = new BucketWriter(0, maxBytes, 0, 0, ctx,
-        "/tmp/file", null, SequenceFile.CompressionType.NONE, hdfsWriter,
+        "/tmp/file", null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
         formatter, timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()));
 
@@ -117,7 +118,7 @@ public class TestBucketWriter {
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
     HDFSTextFormatter formatter = new HDFSTextFormatter();
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
-        "/tmp/file", null, SequenceFile.CompressionType.NONE, hdfsWriter,
+        "/tmp/file", null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
         formatter, timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()));
 
@@ -166,30 +167,30 @@ public class TestBucketWriter {
 
     HDFSWriter hdfsWriter = new HDFSWriter() {
       private volatile boolean open = false;
-      @Override
+
       public void configure(Context context) {
 
       }
-      @Override
+
       public void sync() throws IOException {
         if(!open) {
           throw new IOException("closed");
         }
       }
-      @Override
+
       public void open(String filePath, CompressionCodec codec,
           CompressionType cType, FlumeFormatter fmt) throws IOException {
         open = true;
       }
-      @Override
+
       public void open(String filePath, FlumeFormatter fmt) throws IOException {
         open = true;
       }
-      @Override
+
       public void close() throws IOException {
         open = false;
       }
-      @Override
+
       public void append(Event e, FlumeFormatter fmt) throws IOException {
         // we just re-open in append if closed
         open = true;
@@ -199,7 +200,7 @@ public class TestBucketWriter {
     File tmpFile = File.createTempFile("flume", "test");
     tmpFile.deleteOnExit();
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
-        tmpFile.getName(), null, SequenceFile.CompressionType.NONE, hdfsWriter,
+        tmpFile.getName(), null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
         formatter, timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()));
 
@@ -214,4 +215,59 @@ public class TestBucketWriter {
     bucketWriter.flush(); // throws closed exception
   }
 
+  @Test
+  public void testFileSuffixNotGiven() throws IOException, InterruptedException {
+      final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
+      final String suffix = null;
+
+      MockHDFSWriter hdfsWriter = new MockHDFSWriter();
+      HDFSTextFormatter formatter = new HDFSTextFormatter();
+      BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
+          "/tmp/file", suffix, null, SequenceFile.CompressionType.NONE, hdfsWriter,
+          formatter, timedRollerPool, null,
+          new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()));
+
+      // Need to override system time use for test so we know what to expect
+      final long testTime = System.currentTimeMillis();
+      Clock testClock = new Clock() {
+          public long currentTimeMillis() {
+              return testTime;
+          }
+      };
+      bucketWriter.setClock(testClock);
+
+      Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
+      bucketWriter.append(e);
+
+      Assert.assertTrue("Incorrect suffix", hdfsWriter.getOpenedFilePath().endsWith(Long.toString(testTime+1) + BucketWriter.IN_USE_EXT));
+  }
+
+    @Test
+    public void testFileSuffixGiven() throws IOException, InterruptedException {
+        final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
+        final String suffix = ".avro";
+
+        MockHDFSWriter hdfsWriter = new MockHDFSWriter();
+        HDFSTextFormatter formatter = new HDFSTextFormatter();
+        BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
+            "/tmp/file", suffix, null, SequenceFile.CompressionType.NONE, hdfsWriter,
+            formatter, timedRollerPool, null,
+            new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()));
+
+        // Need to override system time use for test so we know what to expect
+
+        final long testTime = System.currentTimeMillis();
+
+        Clock testClock = new Clock() {
+            public long currentTimeMillis() {
+                return testTime;
+            }
+        };
+        bucketWriter.setClock(testClock);
+
+        Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
+        bucketWriter.append(e);
+
+        Assert.assertTrue("Incorrect suffix", hdfsWriter.getOpenedFilePath().endsWith(Long.toString(testTime+1) + suffix + BucketWriter.IN_USE_EXT));
+    }
 }

