flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject git commit: FLUME-2320. Fixed Deadlock in DatasetSink
Date Thu, 27 Feb 2014 19:07:23 GMT
Repository: flume
Updated Branches:
  refs/heads/flume-1.5 3f5db796f -> 2fc69f54b


FLUME-2320. Fixed Deadlock in DatasetSink

(Ryan Blue via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/2fc69f54
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/2fc69f54
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/2fc69f54

Branch: refs/heads/flume-1.5
Commit: 2fc69f54b45588a40fca06942d4530eb17ce51a0
Parents: 3f5db79
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Thu Feb 27 11:06:25 2014 -0800
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Thu Feb 27 11:06:25 2014 -0800

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |   5 +-
 .../org/apache/flume/sink/kite/DatasetSink.java | 128 ++++++-------------
 .../flume/sink/kite/DatasetSinkConstants.java   |   5 -
 .../apache/flume/sink/kite/TestDatasetSink.java |   2 -
 4 files changed, 44 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/2fc69f54/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 8390cd2..cd43634 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2038,7 +2038,10 @@ may be found (``hdfs:/...`` URIs are supported). This is compatible
with the
 Log4jAppender flume client and the spooling directory source's Avro
 deserializer using ``deserializer.schemaType = LITERAL``.
 
-Note: The ``flume.avro.schema.hash`` header is **not supported**.
+Note 1: The ``flume.avro.schema.hash`` header is **not supported**.
+Note 2: In some cases, file rolling may occur slightly after the roll interval
+has been exceeded. However, this delay will not exceed 5 seconds. In most
+cases, the delay is neglegible.
 
 =====================  =======  ===========================================================
 Property Name          Default  Description

http://git-wip-us.apache.org/repos/asf/flume/blob/2fc69f54/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
index 9a00fb1..1ee0a1f 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
@@ -18,13 +18,13 @@
 
 package org.apache.flume.sink.kite;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
@@ -32,10 +32,6 @@ import java.net.URL;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
 import org.apache.avro.Schema;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DecoderFactory;
@@ -70,11 +66,6 @@ public class DatasetSink extends AbstractSink implements Configurable {
 
   static Configuration conf = new Configuration();
 
-  /**
-   * Lock used to protect access to the current writer
-   */
-  private final ReentrantLock writerLock = new ReentrantLock(true);
-
   private String repositoryURI = null;
   private String datasetName = null;
   private long batchSize = DatasetSinkConstants.DEFAULT_BATCH_SIZE;
@@ -83,8 +74,8 @@ public class DatasetSink extends AbstractSink implements Configurable {
   private SinkCounter counter = null;
 
   // for rolling files at a given interval
-  private ScheduledExecutorService rollTimer;
-  private int rollInterval = DatasetSinkConstants.DEFAULT_ROLL_INTERVAL;
+  private int rollIntervalS = DatasetSinkConstants.DEFAULT_ROLL_INTERVAL;
+  private long lastRolledMs = 0l;
 
   // for working with avro serialized records
   private Object datum = null;
@@ -156,7 +147,7 @@ public class DatasetSink extends AbstractSink implements Configurable
{
     this.batchSize = context.getLong(
         DatasetSinkConstants.CONFIG_KITE_BATCH_SIZE,
         DatasetSinkConstants.DEFAULT_BATCH_SIZE);
-    this.rollInterval = context.getInteger(
+    this.rollIntervalS = context.getInteger(
         DatasetSinkConstants.CONFIG_KITE_ROLL_INTERVAL,
         DatasetSinkConstants.DEFAULT_ROLL_INTERVAL);
 
@@ -166,67 +157,30 @@ public class DatasetSink extends AbstractSink implements Configurable
{
   @Override
   public synchronized void start() {
     this.writer = openWriter(targetDataset);
-    if (rollInterval > 0) {
-      this.rollTimer = Executors.newSingleThreadScheduledExecutor(
-          new ThreadFactoryBuilder()
-              .setNameFormat(getName() + "-timed-roll-thread")
-              .build());
-      rollTimer.scheduleWithFixedDelay(new Runnable() {
-        @Override
-        public void run() {
-          roll();
-        }
-      }, rollInterval, rollInterval, TimeUnit.SECONDS);
-    }
+    this.lastRolledMs = System.currentTimeMillis();
     counter.start();
     // signal that this sink is ready to process
     LOG.info("Started DatasetSink " + getName());
     super.start();
   }
 
-  void roll() {
-    // if the writer is null, nothing to do
-    if (writer == null) {
-      return;
-    }
-
-    // no need to open/close while the lock is held, just replace the reference
-    DatasetWriter toClose = null;
-    DatasetWriter newWriter = openWriter(targetDataset);
-
-    writerLock.lock();
-    try {
-      toClose = writer;
-      this.writer = newWriter;
-    } finally {
-      writerLock.unlock();
-    }
-
-    LOG.info("Rolled writer for dataset: " + datasetName);
-    toClose.close();
+  /**
+   * Causes the sink to roll at the next {@link #process()} call.
+   */
+  @VisibleForTesting
+  public void roll() {
+    this.lastRolledMs = 0l;
   }
 
   @Override
   public synchronized void stop() {
     counter.stop();
-    if (rollTimer != null) {
-      rollTimer.shutdown();
-      try {
-        while (!rollTimer.isTerminated()) {
-          rollTimer.awaitTermination(
-              DatasetSinkConstants.DEFAULT_TERMINATION_INTERVAL,
-              TimeUnit.MILLISECONDS);
-        }
-      } catch (InterruptedException ex) {
-        LOG.warn("Interrupted while waiting for shutdown: " + rollTimer);
-        Thread.interrupted();
-      }
-    }
 
     if (writer != null) {
       // any write problems invalidate the writer, which is immediately closed
       writer.close();
       this.writer = null;
+      this.lastRolledMs = System.currentTimeMillis();
     }
 
     // signal that this sink has stopped
@@ -241,37 +195,41 @@ public class DatasetSink extends AbstractSink implements Configurable
{
           "Cannot recover after previous failure");
     }
 
+    // handle file rolling
+    if ((System.currentTimeMillis() - lastRolledMs) / 1000 > rollIntervalS) {
+      // close the current writer and get a new one
+      writer.close();
+      this.writer = openWriter(targetDataset);
+      this.lastRolledMs = System.currentTimeMillis();
+      LOG.info("Rolled writer for dataset: " + datasetName);
+    }
+
     Channel channel = getChannel();
     Transaction transaction = null;
     try {
       long processedEvents = 0;
 
-      // coarse locking to avoid waiting within the loop
-      writerLock.lock();
       transaction = channel.getTransaction();
       transaction.begin();
-      try {
-        for (; processedEvents < batchSize; processedEvents += 1) {
-          Event event = channel.take();
-          if (event == null) {
-            // no events available in the channel
-            break;
-          }
+      for (; processedEvents < batchSize; processedEvents += 1) {
+        Event event = channel.take();
+        if (event == null) {
+          // no events available in the channel
+          break;
+        }
 
-          this.datum = deserialize(event, datum);
+        this.datum = deserialize(event, datum);
 
-          // writeEncoded would be an optimization in some cases, but HBase
-          // will not support it and partitioned Datasets need to get partition
-          // info from the entity Object. We may be able to avoid the
-          // serialization round-trip otherwise.
-          writer.write(datum);
-        }
-        // TODO: Add option to sync, depends on CDK-203
-        writer.flush();
-      } finally {
-        writerLock.unlock();
+        // writeEncoded would be an optimization in some cases, but HBase
+        // will not support it and partitioned Datasets need to get partition
+        // info from the entity Object. We may be able to avoid the
+        // serialization round-trip otherwise.
+        writer.write(datum);
       }
 
+      // TODO: Add option to sync, depends on CDK-203
+      writer.flush();
+
       // commit after data has been written and flushed
       transaction.commit();
 
@@ -300,16 +258,10 @@ public class DatasetSink extends AbstractSink implements Configurable
{
         }
       }
 
-      // remove the writer's reference and close it
-      DatasetWriter toClose = null;
-      writerLock.lock();
-      try {
-        toClose = writer;
-        this.writer = null;
-      } finally {
-        writerLock.unlock();
-      }
-      toClose.close();
+      // close the writer and remove the its reference
+      writer.close();
+      this.writer = null;
+      this.lastRolledMs = System.currentTimeMillis();
 
       // handle the exception
       Throwables.propagateIfInstanceOf(th, Error.class);

http://git-wip-us.apache.org/repos/asf/flume/blob/2fc69f54/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
index 5087352..13c776e 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
@@ -42,11 +42,6 @@ public class DatasetSinkConstants {
   public static int DEFAULT_ROLL_INTERVAL = 30; // seconds
 
   /**
-   * Interval to wait for thread termination
-   */
-  public static final int DEFAULT_TERMINATION_INTERVAL = 10000; // milliseconds
-
-  /**
    * Headers with avro schema information is expected.
    */
   public static final String AVRO_SCHEMA_LITERAL_HEADER =

http://git-wip-us.apache.org/repos/asf/flume/blob/2fc69f54/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
index bd0e1dc..ac275db 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
@@ -28,12 +28,10 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.net.URI;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.Callable;
 import javax.annotation.Nullable;
 import org.apache.avro.Schema;


Mime
View raw message