flume-commits mailing list archives

From arv...@apache.org
Subject svn commit: r1347216 - in /incubator/flume/trunk: flume-ng-doc/sphinx/ flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/ flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/
Date Thu, 07 Jun 2012 00:28:30 GMT
Author: arvind
Date: Thu Jun  7 00:28:30 2012
New Revision: 1347216

URL: http://svn.apache.org/viewvc?rev=1347216&view=rev
Log:
FLUME-1238. Support active rolling of files created by HDFS Event Sink.

(Mike Percy via Arvind Prabhakar)

Modified:
    incubator/flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst
    incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
    incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
    incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java
    incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java

Modified: incubator/flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst?rev=1347216&r1=1347215&r2=1347216&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst (original)
+++ incubator/flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst Thu Jun  7 00:28:30 2012
@@ -812,7 +812,8 @@ hdfs.maxOpenFiles       5000
 hdfs.writeFormat        --            "Text" or "Writable"
 hdfs.appendTimeout      1000
 hdfs.callTimeout        5000
-hdfs.threadsPoolSize    10
+hdfs.threadsPoolSize    10            Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)
+hdfs.rollTimerPoolSize  1             Number of threads per HDFS sink for scheduling timed file rolling
 hdfs.kerberosPrincipal  --            Kerberos user principal for accessing secure HDFS
 hdfs.kerberosKeytab     --            Kerberos keytab for accessing secure HDFS
 serializer              ``TEXT``      Other possible options include ``AVRO_EVENT`` or the
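
For reference, a minimal sketch of how the two pool-size properties documented above might be set on an HDFS sink together with a time-based roll. The agent and sink names (a1, k1) and the path are illustrative, not part of this commit:

  a1.sinks.k1.type = hdfs
  a1.sinks.k1.hdfs.path = /flume/events
  # roll the current file 30 seconds after it is opened (timed/active roll)
  a1.sinks.k1.hdfs.rollInterval = 30
  # threads per HDFS sink for HDFS IO ops (open, write, etc.)
  a1.sinks.k1.hdfs.threadsPoolSize = 10
  # threads per HDFS sink for scheduling timed file rolling
  a1.sinks.k1.hdfs.rollTimerPoolSize = 1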

Modified: incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java?rev=1347216&r1=1347215&r2=1347216&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java (original)
+++ incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java Thu Jun  7 00:28:30 2012
@@ -19,8 +19,12 @@
 
 package org.apache.flume.sink.hdfs;
 
-
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -30,13 +34,14 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Internal API intended for HDFSSink use.
  * This class does file rolling and handles file formats and serialization.
- * The methods in this class are NOT THREAD SAFE.
+ * Only the public methods in this class are thread safe.
  */
 class BucketWriter {
 
@@ -48,59 +53,111 @@ class BucketWriter {
    * This lock ensures that only one thread can open a file at a time.
    */
   private static final Integer staticLock = new Integer(1);
-  private HDFSWriter writer;
-  private FlumeFormatter formatter;
+
+  private final HDFSWriter writer;
+  private final FlumeFormatter formatter;
+  private final long rollInterval;
+  private final long rollSize;
+  private final long rollCount;
+  private final long batchSize;
+  private final CompressionCodec codeC;
+  private final CompressionType compType;
+  private final Context context;
+  private final ScheduledExecutorService timedRollerPool;
+  private final UserGroupInformation user;
+
+  private final AtomicLong fileExtensionCounter;
+
   private long eventCounter;
   private long processSize;
-  private long lastRollTime;
-  private long rollInterval;
-  private long rollSize;
-  private long rollCount;
-  private long batchSize;
-  private CompressionCodec codeC;
-  private CompressionType compType;
+
   private FileSystem fileSystem;
-  private Context context;
 
   private volatile String filePath;
   private volatile String bucketPath;
   private volatile long batchCounter;
   private volatile boolean isOpen;
+  private volatile ScheduledFuture<Void> timedRollFuture;
 
-  private final AtomicLong fileExtensionCounter;
+  BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize,
+      Context context, String filePath, CompressionCodec codeC,
+      CompressionType compType, HDFSWriter writer, FlumeFormatter formatter,
+      ScheduledExecutorService timedRollerPool, UserGroupInformation user) {
+    this.rollInterval = rollInterval;
+    this.rollSize = rollSize;
+    this.rollCount = rollCount;
+    this.batchSize = batchSize;
+    this.context = context;
+    this.filePath = filePath;
+    this.codeC = codeC;
+    this.compType = compType;
+    this.writer = writer;
+    this.formatter = formatter;
+    this.timedRollerPool = timedRollerPool;
+    this.user = user;
+
+    fileExtensionCounter = new AtomicLong(System.currentTimeMillis());
+
+    isOpen = false;
+    writer.configure(context);
+  }
+
+  /**
+   * Allow methods to act as another user (typically used for HDFS Kerberos)
+   * @param <T>
+   * @param action
+   * @return
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private <T> T runPrivileged(final PrivilegedExceptionAction<T> action)
+      throws IOException, InterruptedException {
+
+    if (user != null) {
+      return user.doAs(action);
+    } else {
+      try {
+        return action.run();
+      } catch (IOException ex) {
+        throw ex;
+      } catch (InterruptedException ex) {
+        throw ex;
+      } catch (RuntimeException ex) {
+        throw ex;
+      } catch (Exception ex) {
+        throw new RuntimeException("Unexpected exception.", ex);
+      }
+    }
+  }
 
-  // clear the class counters
+  /**
+   * Clear the class counters
+   */
   private void resetCounters() {
     eventCounter = 0;
     processSize = 0;
-    lastRollTime = System.currentTimeMillis();
     batchCounter = 0;
   }
 
-  BucketWriter(long rollInt, long rollSz, long rollCnt, long bSize,
-      Context ctx, String fPath, CompressionCodec codec, CompressionType cType,
-      HDFSWriter hWriter, FlumeFormatter fmt) {
-    rollInterval = rollInt;
-    rollSize = rollSz;
-    rollCount = rollCnt;
-    batchSize = bSize;
-    context = ctx;
-    filePath = fPath;
-    codeC = codec;
-    compType = cType;
-    writer = hWriter;
-    formatter = fmt;
-    isOpen = false;
-
-    fileExtensionCounter = new AtomicLong(System.currentTimeMillis());
-    writer.configure(context);
+  /**
+   * open() is called by append()
+   * @throws IOException
+   */
+  private void open() throws IOException, InterruptedException {
+    runPrivileged(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        doOpen();
+        return null;
+      }
+    });
   }
 
   /**
-   * open() is called by append()
+   * doOpen() must only be called by open()
    * @throws IOException
    */
-  private void open() throws IOException {
+  private void doOpen() throws IOException {
     if ((filePath == null) || (writer == null) || (formatter == null)) {
       throw new IOException("Invalid file settings");
     }
@@ -134,6 +191,22 @@ class BucketWriter {
     }
 
     resetCounters();
+
+    // if time-based rolling is enabled, schedule the roll
+    if (rollInterval > 0) {
+      Callable<Void> action = new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
+              bucketPath + IN_USE_EXT, rollInterval);
+          close();
+          return null;
+        }
+      };
+      timedRollFuture = timedRollerPool.schedule(action, rollInterval,
+          TimeUnit.SECONDS);
+    }
+
     isOpen = true;
   }
 
@@ -142,7 +215,21 @@ class BucketWriter {
    * Safe to call multiple times. Logs HDFSWriter.close() exceptions.
    * @throws IOException On failure to rename if temp file exists.
    */
-  public synchronized void close() throws IOException {
+  public synchronized void close() throws IOException, InterruptedException {
+    runPrivileged(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        doClose();
+        return null;
+      }
+    });
+  }
+
+  /**
+   * doClose() must only be called by close()
+   * @throws IOException
+   */
+  private void doClose() throws IOException {
     LOG.debug("Closing {}", bucketPath + IN_USE_EXT);
     if (isOpen) {
       try {
@@ -155,6 +242,13 @@ class BucketWriter {
     } else {
       LOG.info("HDFSWriter is already closed: {}", bucketPath + IN_USE_EXT);
     }
+
+    // NOTE: timed rolls go through this codepath as well as other roll types
+    if (timedRollFuture != null && !timedRollFuture.isDone()) {
+      timedRollFuture.cancel(false); // do not cancel myself if running!
+      timedRollFuture = null;
+    }
+
     if (bucketPath != null && fileSystem != null) {
       renameBucket(); // could block or throw IOException
       fileSystem = null;
@@ -164,7 +258,21 @@ class BucketWriter {
   /**
    * flush the data
    */
-  public synchronized void flush() throws IOException {
+  public synchronized void flush() throws IOException, InterruptedException {
+    runPrivileged(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        doFlush();
+        return null;
+      }
+    });
+  }
+
+  /**
+   * doFlush() must only be called by flush()
+   * @throws IOException
+   */
+  private void doFlush() throws IOException {
     writer.sync(); // could block
     batchCounter = 0;
   }
@@ -174,11 +282,11 @@ class BucketWriter {
    * batching / flushing. <br />
    * If the write fails, the file is implicitly closed and then the IOException
    * is rethrown. <br />
-   * We rotate before append, and not after, so that the lastRollTime counter
-   * that is reset by the open() call approximately reflects when the first
-   * event was written to it.
+   * We rotate before append, and not after, so that the active file rolling
+   * mechanism will never roll an empty file. This also ensures that the file
+   * creation time reflects when the first event was written.
    */
-  public synchronized void append(Event event) throws IOException {
+  public synchronized void append(Event event) throws IOException, InterruptedException {
     if (!isOpen) {
       open();
     }
@@ -221,12 +329,6 @@ class BucketWriter {
   private boolean shouldRotate() {
     boolean doRotate = false;
 
-    long elapsed = (System.currentTimeMillis() - lastRollTime) / 1000L;
-    if ((rollInterval > 0) && (rollInterval <= elapsed)) {
-      LOG.debug("rolling: rollTime: {}, elapsed: {}", rollInterval, elapsed);
-      doRotate = true;
-    }
-
     if ((rollCount > 0) && (rollCount <= eventCounter)) {
       LOG.debug("rolling: rollCount: {}, events: {}", rollCount, eventCounter);
       doRotate = true;
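
In BucketWriter, the timed-roll mechanism follows a schedule-on-open / cancel-on-close pattern: doOpen() schedules a Callable on the shared timedRollerPool that closes the bucket after rollInterval seconds, and doClose() cancels any pending roll so a file is never rolled twice (the old lastRollTime check in shouldRotate() is removed). Kerberos access is handled separately by wrapping the public entry points in runPrivileged(), which delegates to UserGroupInformation.doAs() when a user is configured. A self-contained sketch of the scheduling pattern, with illustrative class and method names rather than the committed code:

  import java.util.concurrent.Callable;
  import java.util.concurrent.Executors;
  import java.util.concurrent.ScheduledExecutorService;
  import java.util.concurrent.ScheduledFuture;
  import java.util.concurrent.TimeUnit;

  // Sketch: schedule a close when a file is opened, cancel the schedule when it closes.
  class TimedRollSketch {
    private final ScheduledExecutorService timedRollerPool =
        Executors.newSingleThreadScheduledExecutor();
    private final long rollInterval; // seconds; <= 0 disables timed rolling
    private volatile ScheduledFuture<Void> timedRollFuture;

    TimedRollSketch(long rollInterval) {
      this.rollInterval = rollInterval;
    }

    synchronized void open() {
      // ... open the underlying HDFS file here ...
      if (rollInterval > 0) {
        Callable<Void> rollAction = new Callable<Void>() {
          @Override
          public Void call() throws Exception {
            close(); // timed roll; the next append() reopens a fresh file
            return null;
          }
        };
        timedRollFuture = timedRollerPool.schedule(rollAction, rollInterval,
            TimeUnit.SECONDS);
      }
    }

    synchronized void close() {
      // ... close and rename the underlying HDFS file here ...
      if (timedRollFuture != null && !timedRollFuture.isDone()) {
        timedRollFuture.cancel(false); // never interrupt the roll task if it is the caller
        timedRollFuture = null;
      }
    }
  }

Running the roll as a pool task rather than checking elapsed time inside append() is what makes the rolling "active": an idle file is still closed on schedule even if no further events arrive.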

Modified: incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java?rev=1347216&r1=1347215&r2=1347216&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java (original)
+++ incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java Thu Jun  7 00:28:30 2012
@@ -56,6 +56,8 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.ScheduledExecutorService;
 
 public class HDFSEventSink extends AbstractSink implements Configurable {
   private static final Logger LOG = LoggerFactory
@@ -82,6 +84,7 @@ public class HDFSEventSink extends Abstr
    * case we create a new file and move on.
    */
   private static final int defaultThreadPoolSize = 10;
+  private static final int defaultRollTimerPoolSize = 1;
 
   /**
    * Singleton credential manager that manages static credentials for the
@@ -91,7 +94,7 @@ public class HDFSEventSink extends Abstr
       = new AtomicReference<KerberosUser>();
 
   private final HDFSWriterFactory writerFactory;
-  private final WriterLinkedHashMap sfWriters;
+  private WriterLinkedHashMap sfWriters;
 
   private long rollInterval;
   private long rollSize;
@@ -99,13 +102,15 @@ public class HDFSEventSink extends Abstr
   private long txnEventMax;
   private long batchSize;
   private int threadsPoolSize;
+  private int rollTimerPoolSize;
   private CompressionCodec codeC;
   private CompressionType compType;
   private String fileType;
   private String path;
   private int maxOpenFiles;
   private String writeFormat;
-  private ExecutorService executor;
+  private ExecutorService callTimeoutPool;
+  private ScheduledExecutorService timedRollerPool;
 
   private String kerbConfPrincipal;
   private String kerbKeytab;
@@ -120,25 +125,31 @@ public class HDFSEventSink extends Abstr
   private Context context;
 
   /*
-   * Extended Java LinkedHashMap for open file handle LRU queue We want to clear
-   * the oldest file handle if there are too many open ones
+   * Extended Java LinkedHashMap for open file handle LRU queue.
+   * We want to clear the oldest file handle if there are too many open ones.
    */
-  private class WriterLinkedHashMap extends LinkedHashMap<String, BucketWriter> {
-    private static final long serialVersionUID = 1L;
+  private static class WriterLinkedHashMap
+      extends LinkedHashMap<String, BucketWriter> {
+
+    private final int maxOpenFiles;
+
+    public WriterLinkedHashMap(int maxOpenFiles) {
+      super(16, 0.75f, true); // stock initial capacity/load, access ordering
+      this.maxOpenFiles = maxOpenFiles;
+    }
 
     @Override
     protected boolean removeEldestEntry(Entry<String, BucketWriter> eldest) {
-      /*
-       * FIXME: We probably shouldn't shared state this way. Make this class
-       * private static and explicitly expose maxOpenFiles.
-       */
-      if (super.size() > maxOpenFiles) {
+      if (size() > maxOpenFiles) {
         // If we have more that max open files, then close the last one and
         // return true
         try {
           eldest.getValue().close();
-        } catch (IOException eI) {
-          LOG.warn(eldest.getKey().toString(), eI);
+        } catch (IOException e) {
+          LOG.warn(eldest.getKey().toString(), e);
+        } catch (InterruptedException e) {
+          LOG.warn(eldest.getKey().toString(), e);
+          Thread.currentThread().interrupt();
         }
         return true;
       } else {
@@ -147,43 +158,12 @@ public class HDFSEventSink extends Abstr
     }
   }
 
-  /**
-   * Helper class to wrap authentication calls.
-   * @param <T> generally should be {@link Void}
-   */
-  private static abstract class ProxyCallable<T> implements Callable<T> {
-    private UserGroupInformation proxyTicket;
-
-    public ProxyCallable(UserGroupInformation proxyTicket) {
-      this.proxyTicket = proxyTicket;
-    }
-
-    @Override
-    public T call() throws Exception {
-      if (proxyTicket == null) {
-        return doCall();
-      } else {
-        return proxyTicket.doAs(new PrivilegedExceptionAction<T>() {
-
-          @Override
-          public T run() throws Exception {
-            return doCall();
-          }
-        });
-      }
-    }
-
-    abstract public T doCall() throws Exception;
-  }
-
-
   public HDFSEventSink() {
     this(new HDFSWriterFactory());
   }
 
   public HDFSEventSink(HDFSWriterFactory writerFactory) {
     this.writerFactory = writerFactory;
-    this.sfWriters = new WriterLinkedHashMap();
   }
 
     // read configuration and setup thresholds
@@ -206,7 +186,10 @@ public class HDFSEventSink extends Abstr
     maxOpenFiles = context.getInteger("hdfs.maxOpenFiles", defaultMaxOpenFiles);
     writeFormat = context.getString("hdfs.writeFormat");
     callTimeout = context.getLong("hdfs.callTimeout", defaultCallTimeout);
-    threadsPoolSize = context.getInteger("hdfs.threadsPoolSize", defaultThreadPoolSize);
+    threadsPoolSize = context.getInteger("hdfs.threadsPoolSize",
+        defaultThreadPoolSize);
+    rollTimerPoolSize = context.getInteger("hdfs.rollTimerPoolSize",
+        defaultRollTimerPoolSize);
     kerbConfPrincipal = context.getString("hdfs.kerberosPrincipal", "");
     kerbKeytab = context.getString("hdfs.kerberosKeytab", "");
     proxyUserName = context.getString("hdfs.proxyUser", "");
@@ -323,33 +306,14 @@ public class HDFSEventSink extends Abstr
   /**
    * Execute the callable on a separate thread and wait for the completion
    * for the specified amount of time in milliseconds. In case of timeout
-   * or any other error, log error and return null.
-   */
-  private static <T> T callWithTimeoutLogError(final ExecutorService executor,
-      long timeout, String name, final Callable<T> callable) {
-    try {
-      return callWithTimeout(executor, timeout, callable);
-    } catch (Exception e) {
-      LOG.error(name + "; called " + callable, e);
-      if(e instanceof InterruptedException) {
-        Thread.currentThread().interrupt();
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Execute the callable on a separate thread and wait for the completion
-   * for the specified amount of time in milliseconds. In case of timeout
    * cancel the callable and throw an IOException
    */
-  private static <T> T callWithTimeout(final ExecutorService executor,
-      long timeout, final Callable<T> callable)
+  private <T> T callWithTimeout(Callable<T> callable)
       throws IOException, InterruptedException {
-    Future<T> future = executor.submit(callable);
+    Future<T> future = callTimeoutPool.submit(callable);
     try {
-      if (timeout > 0) {
-        return future.get(timeout, TimeUnit.MILLISECONDS);
+      if (callTimeout > 0) {
+        return future.get(callTimeout, TimeUnit.MILLISECONDS);
       } else {
         return future.get();
       }
@@ -405,14 +369,13 @@ public class HDFSEventSink extends Abstr
 
         // we haven't seen this file yet, so open it and cache the handle
         if (bucketWriter == null) {
-
           HDFSWriter hdfsWriter = writerFactory.getWriter(fileType);
           FlumeFormatter formatter = HDFSFormatterFactory
               .getFormatter(writeFormat);
 
           bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
               batchSize, context, realPath, codeC, compType, hdfsWriter,
-              formatter);
+              formatter, timedRollerPool, proxyTicket);
 
           sfWriters.put(realPath, bucketWriter);
         }
@@ -423,32 +386,14 @@ public class HDFSEventSink extends Abstr
         }
 
         // Write the data to HDFS
-        final BucketWriter callableWriter = bucketWriter;
-        final Event callableEvent = event;
-        callWithTimeout(executor, callTimeout,
-            new ProxyCallable<Void>(proxyTicket) {
-          @Override
-          public Void doCall() throws Exception {
-            callableWriter.append(callableEvent);
-            return null;
-          }
-        });
+        append(bucketWriter, event);
       }
 
       // flush all pending buckets before committing the transaction
-      for (BucketWriter writer : writers) {
-        if (writer.isBatchComplete()) {
-          continue;
-        }
-        final BucketWriter callableWriter = writer;
-        callWithTimeout(executor, callTimeout,
-            new ProxyCallable<Void>(proxyTicket) {
-          @Override
-          public Void doCall() throws Exception {
-            callableWriter.flush();
-            return null;
-          }
-        });
+      for (BucketWriter bucketWriter : writers) {
+        if (!bucketWriter.isBatchComplete()) {
+          flush(bucketWriter);
+        }
       }
 
       transaction.commit();
@@ -477,36 +422,54 @@ public class HDFSEventSink extends Abstr
   public void stop() {
     // do not constrain close() calls with a timeout
     for (Entry<String, BucketWriter> entry : sfWriters.entrySet()) {
-      LOG.info("Closing " + entry.getKey());
-      final BucketWriter callableWriter = entry.getValue();
-      callWithTimeoutLogError(executor, callTimeout, "close on " +
-          entry.getKey(), new ProxyCallable<Void>(proxyTicket) {
+      LOG.info("Closing {}", entry.getKey());
 
-        @Override
-        public Void doCall() throws Exception {
-          callableWriter.close();
-          return null;
+      final BucketWriter callableWriter = entry.getValue();
+      try {
+        close(entry.getValue());
+      } catch (Exception ex) {
+        LOG.warn("Exception while closing " + entry.getKey() + ". " +
+            "Exception follows.", ex);
+        if (ex instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
         }
-      });
+      }
     }
 
-    sfWriters.clear();
-    executor.shutdown();
-    try {
-      while (executor.isTerminated() == false) {
-        executor.awaitTermination(Math.max(defaultCallTimeout, callTimeout),
-            TimeUnit.MILLISECONDS);
+    // shut down all our thread pools
+    ExecutorService toShutdown[] = { callTimeoutPool, timedRollerPool };
+    for (ExecutorService execService : toShutdown) {
+      execService.shutdown();
+      try {
+        while (execService.isTerminated() == false) {
+          execService.awaitTermination(
+              Math.max(defaultCallTimeout, callTimeout), TimeUnit.MILLISECONDS);
+        }
+      } catch (InterruptedException ex) {
+        LOG.warn("shutdown interrupted on " + execService, ex);
       }
-    } catch (InterruptedException ex) {
-      LOG.warn("shutdown interrupted", ex);
     }
-    executor = null;
+
+    callTimeoutPool = null;
+    timedRollerPool = null;
+
+    sfWriters.clear();
+    sfWriters = null;
+
     super.stop();
   }
 
   @Override
   public void start() {
-    executor = Executors.newFixedThreadPool(threadsPoolSize);
+    String timeoutName = "hdfs-" + getName() + "-call-runner-%d";
+    callTimeoutPool = Executors.newFixedThreadPool(threadsPoolSize,
+        new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
+
+    String rollerName = "hdfs-" + getName() + "-roll-timer-%d";
+    timedRollerPool = Executors.newScheduledThreadPool(rollTimerPoolSize,
+        new ThreadFactoryBuilder().setNameFormat(rollerName).build());
+
+    this.sfWriters = new WriterLinkedHashMap(maxOpenFiles);
     super.start();
   }
 
@@ -695,4 +658,49 @@ public class HDFSEventSink extends Abstr
         " }";
   }
 
+  /**
+   * Append to bucket writer with timeout enforced
+   */
+  private void append(final BucketWriter bucketWriter, final Event event)
+      throws IOException, InterruptedException {
+
+    // Write the data to HDFS
+    callWithTimeout(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        bucketWriter.append(event);
+        return null;
+      }
+    });
+  }
+
+  /**
+   * Flush bucket writer with timeout enforced
+   */
+  private void flush(final BucketWriter bucketWriter)
+      throws IOException, InterruptedException {
+
+    callWithTimeout(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        bucketWriter.flush();
+        return null;
+      }
+    });
+  }
+
+  /**
+   * Close bucket writer with timeout enforced
+   */
+  private void close(final BucketWriter bucketWriter)
+      throws IOException, InterruptedException {
+
+    callWithTimeout(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        bucketWriter.close();
+        return null;
+      }
+    });
+  }
 }
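
On the sink side, the ProxyCallable wrapper and the per-call executor/timeout arguments are replaced by plain Callables passed to a single callWithTimeout() helper bound to the sink's callTimeoutPool and callTimeout, with impersonation pushed down into BucketWriter. A minimal sketch of that timeout pattern, assuming a callTimeout in milliseconds; class and field names are illustrative, and the exception handling below (which falls outside the shown hunk) is an assumption:

  import java.io.IOException;
  import java.util.concurrent.Callable;
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import java.util.concurrent.Future;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.TimeoutException;

  // Sketch: run a blocking HDFS operation on a worker pool and bound its runtime.
  class CallTimeoutSketch {
    private final ExecutorService callTimeoutPool = Executors.newFixedThreadPool(10);
    private final long callTimeout; // milliseconds; <= 0 waits indefinitely

    CallTimeoutSketch(long callTimeout) {
      this.callTimeout = callTimeout;
    }

    <T> T callWithTimeout(Callable<T> callable)
        throws IOException, InterruptedException {
      Future<T> future = callTimeoutPool.submit(callable);
      try {
        if (callTimeout > 0) {
          return future.get(callTimeout, TimeUnit.MILLISECONDS);
        }
        return future.get();
      } catch (TimeoutException ex) {
        future.cancel(true); // stop waiting on a hung HDFS call
        throw new IOException("Call timed out after " + callTimeout + " ms", ex);
      } catch (ExecutionException ex) {
        Throwable cause = ex.getCause();
        if (cause instanceof IOException) {
          throw (IOException) cause; // surface the underlying HDFS failure
        }
        throw new IOException("Unexpected error in HDFS call", cause);
      }
    }
  }

append(), flush(), and close() on a BucketWriter then become one-line Callables handed to this helper, as the three new private wrappers at the end of HDFSEventSink show.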

Modified: incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java?rev=1347216&r1=1347215&r2=1347216&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java (original)
+++ incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java Thu Jun  7 00:28:30 2012
@@ -28,6 +28,7 @@ import org.apache.hadoop.io.compress.Com
 public class MockHDFSWriter implements HDFSWriter {
 
   private int filesOpened = 0;
+  private int filesClosed = 0;
   private int bytesWritten = 0;
   private int eventsWritten = 0;
 
@@ -35,6 +36,10 @@ public class MockHDFSWriter implements H
     return filesOpened;
   }
 
+  public int getFilesClosed() {
+    return filesClosed;
+  }
+
   public int getBytesWritten() {
     return bytesWritten;
   }
@@ -45,6 +50,7 @@ public class MockHDFSWriter implements H
 
   public void clear() {
     filesOpened = 0;
+    filesClosed = 0;
     bytesWritten = 0;
     eventsWritten = 0;
   }
@@ -77,7 +83,7 @@ public class MockHDFSWriter implements H
 
   @Override
   public void close() throws IOException {
-    // does nothing
+    filesClosed++;
   }
 
 }

Modified: incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java?rev=1347216&r1=1347215&r2=1347216&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java (original)
+++ incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java Thu Jun  7 00:28:30 2012
@@ -20,12 +20,16 @@ package org.apache.flume.sink.hdfs;
 
 import com.google.common.base.Charsets;
 import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
 import org.apache.hadoop.io.SequenceFile;
+import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,14 +40,28 @@ public class TestBucketWriter {
       LoggerFactory.getLogger(TestBucketWriter.class);
   private Context ctx = new Context();
 
+  private static ScheduledExecutorService timedRollerPool;
+
+  @BeforeClass
+  public static void setup() {
+    timedRollerPool = Executors.newSingleThreadScheduledExecutor();
+  }
+
+  @AfterClass
+  public static void teardown() throws InterruptedException {
+    timedRollerPool.shutdown();
+    timedRollerPool.awaitTermination(2, TimeUnit.SECONDS);
+    timedRollerPool.shutdownNow();
+  }
+
   @Test
-  public void testEventCountingRoller() throws IOException {
+  public void testEventCountingRoller() throws IOException, InterruptedException {
     int maxEvents = 100;
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
     HDFSTextFormatter formatter = new HDFSTextFormatter();
     BucketWriter bucketWriter = new BucketWriter(0, 0, maxEvents, 0, ctx,
         "/tmp/file", null, SequenceFile.CompressionType.NONE, hdfsWriter,
-        formatter);
+        formatter, timedRollerPool, null);
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     for (int i = 0; i < 1000; i++) {
@@ -60,13 +78,13 @@ public class TestBucketWriter {
   }
 
   @Test
-  public void testSizeRoller() throws IOException {
+  public void testSizeRoller() throws IOException, InterruptedException {
     int maxBytes = 300;
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
     HDFSTextFormatter formatter = new HDFSTextFormatter();
     BucketWriter bucketWriter = new BucketWriter(0, maxBytes, 0, 0, ctx,
         "/tmp/file", null, SequenceFile.CompressionType.NONE, hdfsWriter,
-        formatter);
+        formatter, timedRollerPool, null);
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     for (int i = 0; i < 1000; i++) {
@@ -84,22 +102,27 @@ public class TestBucketWriter {
 
   @Test
   public void testIntervalRoller() throws IOException, InterruptedException {
-    int rollInterval = 2; // seconds
+    final int ROLL_INTERVAL = 1; // seconds
+    final int NUM_EVENTS = 10;
+
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
     HDFSTextFormatter formatter = new HDFSTextFormatter();
-    BucketWriter bucketWriter = new BucketWriter(rollInterval, 0, 0, 0, ctx,
+    BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
         "/tmp/file", null, SequenceFile.CompressionType.NONE, hdfsWriter,
-        formatter);
+        formatter, timedRollerPool, null);
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     long startNanos = System.nanoTime();
-    for (int i = 0; i < 1000; i++) {
+    for (int i = 0; i < NUM_EVENTS - 1; i++) {
       bucketWriter.append(e);
-      if (i % 100 == 0) {
-        Thread.sleep(500L);
-      }
     }
 
+    // sleep to force a roll... wait 2x interval just to be sure
+    Thread.sleep(2 * ROLL_INTERVAL * 1000L);
+
+    // write one more event (to reopen a new file so we will roll again later)
+    bucketWriter.append(e);
+
     long elapsedMillis = TimeUnit.MILLISECONDS.convert(
         System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
     long elapsedSeconds = elapsedMillis / 1000L;
@@ -108,11 +131,22 @@ public class TestBucketWriter {
     logger.info("Number of events written: {}", hdfsWriter.getEventsWritten());
     logger.info("Number of bytes written: {}", hdfsWriter.getBytesWritten());
     logger.info("Number of files opened: {}", hdfsWriter.getFilesOpened());
+    logger.info("Number of files closed: {}", hdfsWriter.getFilesClosed());
 
-    Assert.assertEquals("events written", 1000, hdfsWriter.getEventsWritten());
-    Assert.assertEquals("bytes written", 3000, hdfsWriter.getBytesWritten());
-    Assert.assertEquals("files opened", elapsedSeconds/2 + 1,
-        hdfsWriter.getFilesOpened());
+    Assert.assertEquals("events written", NUM_EVENTS,
+        hdfsWriter.getEventsWritten());
+    Assert.assertEquals("bytes written", e.getBody().length * NUM_EVENTS,
+        hdfsWriter.getBytesWritten());
+    Assert.assertEquals("files opened", 2, hdfsWriter.getFilesOpened());
+
+    // before auto-roll
+    Assert.assertEquals("files closed", 1, hdfsWriter.getFilesClosed());
+
+    logger.info("Waiting for roll...");
+    Thread.sleep(2 * ROLL_INTERVAL * 1000L);
+
+    logger.info("Number of files closed: {}", hdfsWriter.getFilesClosed());
+    Assert.assertEquals("files closed", 2, hdfsWriter.getFilesClosed());
   }
 
 }


