flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject git commit: FLUME-2154. Reducing duplicate events caused by reset-connection-interval
Date Fri, 09 Aug 2013 06:46:34 GMT
Updated Branches:
  refs/heads/trunk 5d49eeb73 -> 99db32ccd


FLUME-2154. Reducing duplicate events caused by reset-connection-interval

(Juhani Connolly via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/99db32cc
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/99db32cc
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/99db32cc

Branch: refs/heads/trunk
Commit: 99db32ccd163daf9d7685f0e8485941701e1133d
Parents: 5d49eeb
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Thu Aug 8 23:44:59 2013 -0700
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Thu Aug 8 23:45:57 2013 -0700

----------------------------------------------------------------------
 .../org/apache/flume/sink/AbstractRpcSink.java  | 37 ++++++++++++--------
 .../org/apache/flume/sink/TestAvroSink.java     |  6 ++++
 2 files changed, 29 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/99db32cc/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
index b3208fc..5146834 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
@@ -43,6 +43,7 @@ import java.util.Properties;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -149,11 +150,11 @@ public abstract class AbstractRpcSink extends AbstractSink
   private Properties clientProps;
   private SinkCounter sinkCounter;
   private int cxnResetInterval;
+  private AtomicBoolean resetConnectionFlag;
   private final int DEFAULT_CXN_RESET_INTERVAL = 0;
   private final ScheduledExecutorService cxnResetExecutor = Executors
     .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
       .setNameFormat("Rpc Sink Reset Thread").build());
-  private final Lock resetLock = new ReentrantLock();
 
   @Override
   public void configure(Context context) {
@@ -206,6 +207,7 @@ public abstract class AbstractRpcSink extends AbstractSink
           "port: {}",
           new Object[] { getName(), hostname, port });
       try {
+        resetConnectionFlag = new AtomicBoolean(false);
         client = initializeRpcClient(clientProps);
         Preconditions.checkNotNull(client, "Rpc Client could not be " +
           "initialized. " + getName() + " could not be started");
@@ -214,17 +216,7 @@ public abstract class AbstractRpcSink extends AbstractSink
           cxnResetExecutor.schedule(new Runnable() {
             @Override
             public void run() {
-              resetLock.lock();
-              try {
-                destroyConnection();
-                createConnection();
-              } catch (Throwable throwable) {
-                //Don't rethrow, else this runnable won't get scheduled again.
-                logger.error("Error while trying to expire connection",
-                  throwable);
-              } finally {
-                resetLock.unlock();
-              }
+              resetConnectionFlag.set(true);
             }
           }, cxnResetInterval, TimeUnit.SECONDS);
         }
@@ -241,6 +233,17 @@ public abstract class AbstractRpcSink extends AbstractSink
 
   }
 
+  private void resetConnection() {
+      try {
+        destroyConnection();
+        createConnection();
+      } catch (Throwable throwable) {
+        //Don't rethrow, else this runnable won't get scheduled again.
+        logger.error("Error while trying to expire connection",
+          throwable);
+      }
+  }
+
   private void destroyConnection() {
     if (client != null) {
       logger.debug("Rpc sink {} closing Rpc client: {}", getName(), client);
@@ -332,7 +335,14 @@ public abstract class AbstractRpcSink extends AbstractSink
     Channel channel = getChannel();
     Transaction transaction = channel.getTransaction();
 
-    resetLock.lock();
+    if(resetConnectionFlag.get()) {
+      resetConnection();
+      // if the time to reset is long and the timeout is short
+      // this may cancel the next reset request
+      // this should however not be an issue
+      resetConnectionFlag.set(false);
+    }
+
     try {
       transaction.begin();
 
@@ -382,7 +392,6 @@ public abstract class AbstractRpcSink extends AbstractSink
         throw new EventDeliveryException("Failed to send events", t);
       }
     } finally {
-      resetLock.unlock();
       transaction.close();
     }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/99db32cc/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
index 8760c25..757a536 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
@@ -300,6 +300,12 @@ public class TestAvroSink {
     sink.start();
     RpcClient firstClient = sink.getUnderlyingClient();
     Thread.sleep(6000);
+    Transaction t = channel.getTransaction();
+    t.begin();
+    channel.put(EventBuilder.withBody("This is a test", Charset.defaultCharset()));
+    t.commit();
+    t.close();
+    sink.process();
     // Make sure they are not the same object, connection should be reset
     Assert.assertFalse(firstClient == sink.getUnderlyingClient());
     sink.stop();


Mime
View raw message