flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject git commit: FLUME-1660. Close "idle" hdfs handles.
Date Mon, 19 Nov 2012 08:18:14 GMT
Updated Branches:
  refs/heads/trunk 881180b7e -> d7747cfac


FLUME-1660. Close "idle" hdfs handles.

(Juhani Connolly via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d7747cfa
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d7747cfa
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d7747cfa

Branch: refs/heads/trunk
Commit: d7747cfac8f64704fb39924c365a8ac343244b40
Parents: 881180b
Author: Mike Percy <mpercy@apache.org>
Authored: Mon Nov 19 00:15:55 2012 -0800
Committer: Mike Percy <mpercy@apache.org>
Committed: Mon Nov 19 00:15:55 2012 -0800

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |    2 +
 .../org/apache/flume/sink/hdfs/BucketWriter.java   |   44 ++++++++++-
 .../org/apache/flume/sink/hdfs/HDFSEventSink.java  |   19 ++++-
 .../apache/flume/sink/hdfs/TestBucketWriter.java   |   17 +++--
 .../apache/flume/sink/hdfs/TestHDFSEventSink.java  |   65 +++++++++++++++
 5 files changed, 139 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/d7747cfa/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index bcb679e..b4a8868 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1216,6 +1216,8 @@ hdfs.rollInterval       30            Number of seconds to wait before rolling c
 hdfs.rollSize           1024          File size to trigger roll, in bytes (0: never roll based on file size)
 hdfs.rollCount          10            Number of events written to file before it rolled
                                       (0 = never roll based on number of events)
+hdfs.idleTimeout        0             Timeout after which inactive files get closed
+                                      (0 = disable automatic closing of idle files)
 hdfs.batchSize          100           number of events written to file before it is flushed to HDFS
 hdfs.codeC              --            Compression codec. one of following : gzip, bzip2, lzo, snappy
 hdfs.fileType           SequenceFile  File format: currently ``SequenceFile``, ``DataStream`` or ``CompressedStream``

http://git-wip-us.apache.org/repos/asf/flume/blob/d7747cfa/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index 9f2c763..58ebe49 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -33,6 +33,7 @@ import org.apache.flume.Event;
 import org.apache.flume.SystemClock;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.FlumeFormatter;
+import org.apache.flume.sink.hdfs.HDFSEventSink.WriterCallback;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -86,14 +87,21 @@ class BucketWriter {
   private volatile boolean isOpen;
   private volatile ScheduledFuture<Void> timedRollFuture;
   private SinkCounter sinkCounter;
+  private final WriterCallback onIdleCallback;
+  private final int idleTimeout;
+  private volatile ScheduledFuture<Void> idleFuture;
 
   private Clock clock = new SystemClock();
 
+  // flag that the bucket writer was closed due to idling and thus shouldn't be
+  // reopened. Not ideal, but avoids internals of owners
+  protected boolean idleClosed = false;
+
   BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize,
       Context context, String filePath, String fileSuffix, CompressionCodec codeC,
       CompressionType compType, HDFSWriter writer, FlumeFormatter formatter,
       ScheduledExecutorService timedRollerPool, UserGroupInformation user,
-      SinkCounter sinkCounter) {
+      SinkCounter sinkCounter, int idleTimeout, WriterCallback onIdleCallback) {
     this.rollInterval = rollInterval;
     this.rollSize = rollSize;
     this.rollCount = rollCount;
@@ -108,6 +116,8 @@ class BucketWriter {
     this.timedRollerPool = timedRollerPool;
     this.user = user;
     this.sinkCounter = sinkCounter;
+    this.onIdleCallback = onIdleCallback;
+    this.idleTimeout = idleTimeout;
 
     fileExtensionCounter = new AtomicLong(clock.currentTimeMillis());
 
@@ -279,6 +289,11 @@ class BucketWriter {
       timedRollFuture = null;
     }
 
+    if(idleFuture != null && !idleFuture.isDone()) {
+      idleFuture.cancel(false);
+      idleFuture = null;
+    }
+
     if (bucketPath != null && fileSystem != null) {
       renameBucket(); // could block or throw IOException
       fileSystem = null;
@@ -296,6 +311,29 @@ class BucketWriter {
           return null;
         }
       });
+
+      if(idleTimeout > 0) {
+        // if the future exists and couldn't be cancelled, that would mean it has already run
+        // or been cancelled
+        if(idleFuture == null || idleFuture.cancel(false)) {
+          Callable<Void> idleAction = new Callable<Void>() {
+            public Void call() throws Exception {
+              try {
+                LOG.info("Closing idle bucketWriter {}", filePath);
+                idleClosed = true;
+                close();
+                if(onIdleCallback != null)
+                  onIdleCallback.run(filePath);
+              } catch(Throwable t) {
+                LOG.error("Unexpected error", t);
+              }
+              return null;
+            }
+          };
+          idleFuture = timedRollerPool.schedule(idleAction, idleTimeout,
+              TimeUnit.SECONDS);
+        }
+      }
     }
   }
 
@@ -319,6 +357,10 @@ class BucketWriter {
    */
   public synchronized void append(Event event) throws IOException, InterruptedException {
     if (!isOpen) {
+      if(idleClosed) {
+        throw new IOException("This bucket writer was closed due to idling and this handle " +
+            "is thus no longer valid");
+      }
       open();
     }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/d7747cfa/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index e369604..64ac2d7 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -62,6 +62,10 @@ import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class HDFSEventSink extends AbstractSink implements Configurable {
+  public interface WriterCallback {
+    public void run(String filePath);
+  }
+
   private static final Logger LOG = LoggerFactory
       .getLogger(HDFSEventSink.class);
 
@@ -129,6 +133,8 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   private Context context;
   private SinkCounter sinkCounter;
 
+  private volatile int idleTimeout;
+
   /*
    * Extended Java LinkedHashMap for open file handle LRU queue.
    * We want to clear the oldest file handle if there are too many open ones.
@@ -187,6 +193,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     rollSize = context.getLong("hdfs.rollSize", defaultRollSize);
     rollCount = context.getLong("hdfs.rollCount", defaultRollCount);
     batchSize = context.getLong("hdfs.batchSize", defaultBatchSize);
+    idleTimeout = context.getInteger("hdfs.idleTimeout", 0);
     String codecName = context.getString("hdfs.codeC");
     fileType = context.getString("hdfs.fileType", defaultFileType);
     maxOpenFiles = context.getInteger("hdfs.maxOpenFiles", defaultMaxOpenFiles);
@@ -397,9 +404,19 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
           FlumeFormatter formatter = HDFSFormatterFactory
               .getFormatter(writeFormat);
 
+          WriterCallback idleCallback = null;
+          if(idleTimeout != 0) {
+            idleCallback = new WriterCallback() {
+              @Override
+              public void run(String bucketPath) {
+                sfWriters.remove(bucketPath);
+              }
+            };
+          }
           bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
               batchSize, context, realPath, suffix, codeC, compType, hdfsWriter,
-              formatter, timedRollerPool, proxyTicket, sinkCounter);
+              formatter, timedRollerPool, proxyTicket, sinkCounter, idleTimeout,
+              idleCallback);
 
           sfWriters.put(realPath, bucketWriter);
         }

http://git-wip-us.apache.org/repos/asf/flume/blob/d7747cfa/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
index 6a8072e..b191ef3 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
@@ -70,7 +70,7 @@ public class TestBucketWriter {
     BucketWriter bucketWriter = new BucketWriter(0, 0, maxEvents, 0, ctx,
         "/tmp/file", null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
         formatter, timedRollerPool, null,
-        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()));
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null);
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     for (int i = 0; i < 1000; i++) {
@@ -94,7 +94,8 @@ public class TestBucketWriter {
     BucketWriter bucketWriter = new BucketWriter(0, maxBytes, 0, 0, ctx,
         "/tmp/file", null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
         formatter, timedRollerPool, null,
-        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()));
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
+        0, null);
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     for (int i = 0; i < 1000; i++) {
@@ -120,7 +121,8 @@ public class TestBucketWriter {
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
         "/tmp/file", null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
         formatter, timedRollerPool, null,
-        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()));
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
+        0, null);
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     long startNanos = System.nanoTime();
@@ -202,7 +204,8 @@ public class TestBucketWriter {
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
         tmpFile.getName(), null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
         formatter, timedRollerPool, null,
-        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()));
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
+        0, null);
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     for (int i = 0; i < NUM_EVENTS - 1; i++) {
@@ -225,7 +228,8 @@ public class TestBucketWriter {
       BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
           "/tmp/file", suffix, null, SequenceFile.CompressionType.NONE, hdfsWriter,
           formatter, timedRollerPool, null,
-          new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()));
+          new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
+          0, null);
 
       // Need to override system time use for test so we know what to expect
       final long testTime = System.currentTimeMillis();
@@ -252,7 +256,8 @@ public class TestBucketWriter {
         BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
             "/tmp/file", suffix, null, SequenceFile.CompressionType.NONE, hdfsWriter,
             formatter, timedRollerPool, null,
-            new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()));
+            new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
+            0, null);
 
         // Need to override system time use for test so we know what to expect
 

http://git-wip-us.apache.org/repos/asf/flume/blob/d7747cfa/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
index fee4c8b..f93cfca 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
@@ -979,4 +979,69 @@ public class TestHDFSEventSink {
     LOG.debug("Starting...");
     slowAppendTestHelper(0);
   }
+
+  @Test
+  public void testCloseOnIdle() throws IOException, EventDeliveryException, InterruptedException {
+    String hdfsPath = testPath + "/idleClose";
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    Path dirPath = new Path(hdfsPath);
+    fs.delete(dirPath, true);
+    fs.mkdirs(dirPath);
+    sink = new HDFSEventSink(new HDFSWriterFactory());
+    Context context = new Context();
+    context.put("hdfs.path", hdfsPath);
+    context.put("hdfs.rollCount", "0");
+    context.put("hdfs.rollSize", "0");
+    context.put("hdfs.rollInterval", "0");
+    context.put("hdfs.batchSize", "2");
+    context.put("hdfs.idleTimeout", "1");
+    Configurables.configure(sink, context);
+
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, context);
+
+    sink.setChannel(channel);
+    sink.start();
+
+    Transaction txn = channel.getTransaction();
+    txn.begin();
+    for(int i=0; i < 10; i++) {
+      Event event = new SimpleEvent();
+      event.setBody(("test event " + i).getBytes());
+      channel.put(event);
+    }
+    txn.commit();
+    txn.close();
+
+    sink.process();
+    sink.process();
+    Thread.sleep(1001);
+    // previous file should have timed out now
+    // this can throw an IOException(from the bucketWriter having idleClosed)
+    // this is not an issue as the sink will retry and get a fresh bucketWriter
+    // so long as the onIdleClose handler properly removes bucket writers that
+    // were closed due to idling
+    sink.process();
+    sink.process();
+    Thread.sleep(500); // shouldn't be enough for a timeout to occur
+    sink.process();
+    sink.process();
+
+    FileStatus[] dirStat = fs.listStatus(dirPath);
+    Path[] fList = FileUtil.stat2Paths(dirStat);
+    Assert.assertEquals(2, fList.length);
+    // one should be tmp and the other not
+    Assert.assertTrue(fList[0].getName().endsWith(".tmp") ^
+        fList[1].getName().endsWith(".tmp"));
+
+    sink.stop();
+    dirStat = fs.listStatus(dirPath);
+    fList = FileUtil.stat2Paths(dirStat);
+    Assert.assertEquals(2, fList.length);
+    Assert.assertTrue(!fList[0].getName().endsWith(".tmp") &&
+        !fList[1].getName().endsWith(".tmp"));
+    fs.close();
+  }
 }


Mime
View raw message