flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject [48/50] [abbrv] git commit: FLUME-1401: Asynchbase sink should be configurable to support timeout
Date Fri, 07 Sep 2012 23:28:53 GMT
FLUME-1401: Asynchbase sink should be configurable to support timeout

(Hari Shreedharan via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/be92478e
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/be92478e
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/be92478e

Branch: refs/heads/cdh-1.2.0+24_intuit
Commit: be92478e464c47ee015b9cf039f1a00a3815ac84
Parents: 534b6af
Author: Brock Noland <brock@apache.org>
Authored: Mon Jul 30 15:02:08 2012 -0500
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Fri Sep 7 13:08:14 2012 -0700

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |   20 ++--
 .../apache/flume/sink/hbase/AsyncHBaseSink.java    |  111 ++++++++-------
 .../hbase/HBaseSinkConfigurationConstants.java     |    3 +
 3 files changed, 75 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/be92478e/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index e2b7b83..40e385f 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -30,7 +30,7 @@ different sources to a centralized data store.
 
 Apache Flume is a top level project at the Apache Software Foundation.
 There are currently two release code lines available, versions 0.9.x and 1.x.
-This documentation applies to the 1.x codeline.  
+This documentation applies to the 1.x codeline.
 Please click here for
 `the Flume 0.9.x User Guide <http://archive.cloudera.com/cdh/3/flume/UserGuide/>`_.
 
@@ -155,7 +155,7 @@ A simple example
 Here, we give an example configuration file, describing a single-node Flume deployment. This configuration lets a user generate events and subsequently logs them to the console.
 
 .. code-block:: properties
-   
+
   # example.conf: A single-node Flume configuration
 
   # Name the components on this agent
@@ -175,7 +175,7 @@ Here, we give an example configuration file, describing a single-node Flume depl
   agent1.channels.channel1.type = memory
   agent1.channels.channel1.capacity = 1000
   agent1.channels.channel1.transactionCapactiy = 100
- 
+
   # Bind the source and sink to the channel
   agent1.sources.source1.channels = channel1
   agent1.sinks.sink1.channel = channel1
@@ -643,7 +643,7 @@ interceptors.*
              of indicating to the application writing the log file that it needs to
              retain the log or that the event hasn't been sent, for some reason. If
              this doesn't make sense, you need only know this: Your application can
-             never guarantee data has been received when using a unidirectional 
+             never guarantee data has been received when using a unidirectional
              asynchronous interface such as ExecSource! As an extension of this
              warning - and to be completely clear - there is absolutely zero guarantee
              of event delivery when using this source. You have been warned.
@@ -1204,17 +1204,19 @@ This sink is still experimental.
 The type is the FQCN: org.apache.flume.sink.hbase.AsyncHBaseSink.
 Required properties are in **bold**.
 
-================  ============================================================  =============================================================================
+================  ============================================================  ====================================================================================
 Property Name     Default                                                       Description
-================  ============================================================  =============================================================================
+================  ============================================================  ====================================================================================
 **channel**       --
 **type**          --                                                            The component type name, needs to be ``org.apache.flume.sink.AsyncHBaseSink``
 **table**         --                                                            The name of the table in Hbase to write to.
 **columnFamily**  --                                                            The column family in Hbase to write to.
 batchSize         100                                                           Number of events to be written per txn.
+timeout           --                                                            The length of time (in milliseconds) the sink waits for acks from hbase for
+                                                                                all events in a transaction. If no timeout is specified, the sink will wait forever.
 serializer        org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
 serializer.*      --                                                            Properties to be passed to the serializer.
-================  ============================================================  =============================================================================
+================  ============================================================  ====================================================================================
 
 Example for agent named **agent_foo**:
 
@@ -1361,8 +1363,8 @@ keep-alive            3                                 Amount of time (in sec)
 write-timeout         3                                 Amount of time (in sec) to wait for a write operation
 ====================  ================================  ========================================================
 
-.. note:: By default the File Channel uses paths for checkpoint and data 
-          directories that are within the user home as specified above. 
+.. note:: By default the File Channel uses paths for checkpoint and data
+          directories that are within the user home as specified above.
           As a result if you have more than one File Channel instances
           active within the agent, only one will be able to lock the
           directories and cause the other channel initialization to fail.

http://git-wip-us.apache.org/repos/asf/flume/blob/be92478e/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
index 6df1f33..1598f26 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
@@ -43,10 +43,12 @@ import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.stumbleupon.async.Callback;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import org.apache.flume.ChannelException;
 import org.apache.flume.instrumentation.SinkCounter;
 
 /**
@@ -74,6 +76,9 @@ import org.apache.flume.instrumentation.SinkCounter;
 * maximum number of events the sink will commit per transaction. The default
 * batch size is 100 events.
 * <p>
+* <tt>timeout: </tt> The length of time in milliseconds the sink waits for
+* callbacks from hbase for all events in a transaction.
+* If no timeout is specified, the sink will wait forever.<p>
 *
 * <strong>Note: </strong> Hbase does not guarantee atomic commits on multiple
 * rows. So if a subset of events in a batch are written to disk by Hbase and
@@ -99,6 +104,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
   private Transaction txn;
   private volatile boolean open = false;
   private SinkCounter sinkCounter;
+  private long timeout;
 
   public AsyncHBaseSink(){
     conf = HBaseConfiguration.create();
@@ -145,35 +151,40 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
 
     Status status = Status.READY;
     Channel channel = getChannel();
-    txn = channel.getTransaction();
-    txn.begin();
     int i = 0;
-    for (; i < batchSize; i++) {
-      Event event = channel.take();
-      if (event == null) {
-        status = Status.BACKOFF;
-        if (i == 0) {
-          sinkCounter.incrementBatchEmptyCount();
+    try {
+      txn = channel.getTransaction();
+      txn.begin();
+      for (; i < batchSize; i++) {
+        Event event = channel.take();
+        if (event == null) {
+          status = Status.BACKOFF;
+          if (i == 0) {
+            sinkCounter.incrementBatchEmptyCount();
+          } else {
+            sinkCounter.incrementBatchUnderflowCount();
+          }
+          break;
         } else {
-          sinkCounter.incrementBatchUnderflowCount();
-        }
-        break;
-      } else {
-        serializer.setEvent(event);
-        List<PutRequest> actions = serializer.getActions();
-        List<AtomicIncrementRequest> increments = serializer.getIncrements();
-        callbacksExpected.addAndGet(actions.size() + increments.size());
+          serializer.setEvent(event);
+          List<PutRequest> actions = serializer.getActions();
+          List<AtomicIncrementRequest> increments = serializer.getIncrements();
+          callbacksExpected.addAndGet(actions.size() + increments.size());
 
-        for (PutRequest action : actions) {
-          client.put(action).addCallbacks(putSuccessCallback, putFailureCallback);
-        }
-        for (AtomicIncrementRequest increment : increments) {
-          client.atomicIncrement(increment).addCallbacks(
-                  incrementSuccessCallback, incrementFailureCallback);
+          for (PutRequest action : actions) {
+            client.put(action).addCallbacks(putSuccessCallback, putFailureCallback);
+          }
+          for (AtomicIncrementRequest increment : increments) {
+            client.atomicIncrement(increment).addCallbacks(
+                    incrementSuccessCallback, incrementFailureCallback);
+          }
         }
       }
+    } catch (Throwable e) {
+      this.handleTransactionFailure(txn);
+      this.checkIfChannelExceptionAndThrow(e);
     }
-    if(i == batchSize) {
+    if (i == batchSize) {
       sinkCounter.incrementBatchCompleteCount();
     }
     sinkCounter.addToEventDrainAttemptCount(i);
@@ -183,14 +194,14 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
       while ((callbacksReceived.get() < callbacksExpected.get())
               && !txnFail.get()) {
         try {
-          condition.await();
-        } catch (InterruptedException ex) {
-          logger.error("Interrupted while waiting for callbacks from HBase.");
-          try {
-            txn.rollback();
-          } finally {
-            txn.close();
+          if(!condition.await(timeout, TimeUnit.MILLISECONDS)){
+            txnFail.set(true);
+            logger.warn("HBase callbacks timed out. "
+                    + "Transaction will be rolled back.");
           }
+        } catch (Exception ex) {
+          logger.error("Exception while waiting for callbacks from HBase.");
+          this.handleTransactionFailure(txn);
           Throwables.propagate(ex);
         }
       }
@@ -215,28 +226,11 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
     } else {
       try{
         txn.commit();
+        txn.close();
         sinkCounter.addToEventDrainSuccessCount(i);
       } catch (Throwable e) {
-        try{
-          txn.rollback();
-        } catch (Exception e2) {
-          logger.error("Exception in rollback. Rollback might not have been" +
-              "successful." , e2);
-        }
-        logger.error("Failed to commit transaction." +
-            "Transaction rolled back.", e);
-        if(e instanceof Error || e instanceof RuntimeException){
-          logger.error("Failed to commit transaction." +
-              "Transaction rolled back.", e);
-          Throwables.propagate(e);
-        } else {
-          logger.error("Failed to commit transaction." +
-              "Transaction rolled back.", e);
-          throw new EventDeliveryException("Failed to commit transaction." +
-              "Transaction rolled back.", e);
-        }
-      } finally {
-        txn.close();
+        this.handleTransactionFailure(txn);
+        this.checkIfChannelExceptionAndThrow(e);
       }
     }
 
@@ -283,6 +277,13 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
     if(sinkCounter == null) {
       sinkCounter = new SinkCounter(this.getName());
     }
+    timeout = context.getLong(HBaseSinkConfigurationConstants.CONFIG_TIMEOUT,
+            HBaseSinkConfigurationConstants.DEFAULT_TIMEOUT);
+    if(timeout <= 0){
+      logger.warn("Timeout should be positive for Hbase sink. "
+              + "Sink will not timeout.");
+      timeout = HBaseSinkConfigurationConstants.DEFAULT_TIMEOUT;
+    }
   }
   @Override
   public void start(){
@@ -419,4 +420,14 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
       return null;
     }
   }
+
+  private void checkIfChannelExceptionAndThrow(Throwable e)
+          throws EventDeliveryException {
+    if (e instanceof ChannelException) {
+      throw new EventDeliveryException("Error in processing transaction.", e);
+    } else if (e instanceof Error || e instanceof RuntimeException) {
+      Throwables.propagate(e);
+    }
+    throw new EventDeliveryException("Error in processing transaction.", e);
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/be92478e/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
index a16cda8..62f7097 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
@@ -44,5 +44,8 @@ public class HBaseSinkConfigurationConstants {
    */
   public static final String CONFIG_SERIALIZER_PREFIX = CONFIG_SERIALIZER + ".";
 
+  public static final String CONFIG_TIMEOUT = "timeout";
+
+  public static final long DEFAULT_TIMEOUT = Long.MAX_VALUE;
 
 }


Mime
View raw message