flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From juha...@apache.org
Subject git commit: FLUME-1926. Optionally timeout Avro Sink Rpc Clients to avoid stickiness
Date Thu, 14 Mar 2013 06:02:24 GMT
Updated Branches:
  refs/heads/trunk 082cfb498 -> ce48a126e


FLUME-1926. Optionally timeout Avro Sink Rpc Clients to avoid stickiness

(Hari Shreedharan via Juhani Connolly)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ce48a126
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ce48a126
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ce48a126

Branch: refs/heads/trunk
Commit: ce48a126e8ceebe0fb0eb4048c6b6601688e442d
Parents: 082cfb4
Author: Juhani Connolly <juhani_connolly@cyberagent.co.jp>
Authored: Thu Mar 14 15:01:52 2013 +0900
Committer: Juhani Connolly <juhani_connolly@cyberagent.co.jp>
Committed: Thu Mar 14 15:01:52 2013 +0900

----------------------------------------------------------------------
 .../org/apache/flume/sink/AbstractRpcSink.java     |   40 ++++++++++
 .../java/org/apache/flume/sink/TestAvroSink.java   |   61 +++++++++++++++
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |   25 +++---
 3 files changed, 114 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/ce48a126/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
index f5699e4..892c949 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
@@ -18,8 +18,11 @@
  */
 package org.apache.flume.sink;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
@@ -37,6 +40,9 @@ import org.slf4j.LoggerFactory;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * This sink provides the basic RPC functionality for Flume. This sink takes
@@ -140,6 +146,11 @@ public abstract class AbstractRpcSink extends AbstractSink
   private RpcClient client;
   private Properties clientProps;
   private SinkCounter sinkCounter;
+  private int cxnResetInterval;
+  private final int DEFAULT_CXN_RESET_INTERVAL = 0;
+  private final ScheduledExecutorService cxnResetExecutor = Executors
+    .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
+      .setNameFormat("Rpc Sink Reset Thread").build());
 
   @Override
   public void configure(Context context) {
@@ -162,6 +173,13 @@ public abstract class AbstractRpcSink extends AbstractSink
     if (sinkCounter == null) {
       sinkCounter = new SinkCounter(getName());
     }
+    cxnResetInterval = context.getInteger("reset-connection-interval",
+      DEFAULT_CXN_RESET_INTERVAL);
+    if(cxnResetInterval == DEFAULT_CXN_RESET_INTERVAL) {
+      logger.info("Connection reset is set to " + String.valueOf
+        (DEFAULT_CXN_RESET_INTERVAL) +". Will not reset connection to next " +
+        "hop");
+    }
   }
 
   /**
@@ -189,6 +207,14 @@ public abstract class AbstractRpcSink extends AbstractSink
         Preconditions.checkNotNull(client, "Rpc Client could not be " +
           "initialized. " + getName() + " could not be started");
         sinkCounter.incrementConnectionCreatedCount();
+        if (cxnResetInterval > 0) {
+          cxnResetExecutor.schedule(new Runnable() {
+            @Override
+            public void run() {
+              destroyConnection();
+            }
+          }, cxnResetInterval, TimeUnit.SECONDS);
+        }
       } catch (Exception ex) {
         sinkCounter.incrementConnectionFailedCount();
         if (ex instanceof FlumeException) {
@@ -266,6 +292,15 @@ public abstract class AbstractRpcSink extends AbstractSink
     logger.info("Rpc sink {} stopping...", getName());
 
     destroyConnection();
+    cxnResetExecutor.shutdown();
+    try {
+      if (cxnResetExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+        cxnResetExecutor.shutdownNow();
+      }
+    } catch (Exception ex) {
+      logger.error("Interrupted while waiting for connection reset executor " +
+        "to shut down");
+    }
     sinkCounter.stop();
     super.stop();
 
@@ -338,4 +373,9 @@ public abstract class AbstractRpcSink extends AbstractSink
 
     return status;
   }
+
+  @VisibleForTesting
+  RpcClient getUnderlyingClient() {
+    return client;
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/ce48a126/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
index 3b1c8db..ac47ee9 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
@@ -43,6 +43,7 @@ import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Sink;
 import org.apache.flume.Transaction;
 import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.api.RpcClient;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.channel.ReplicatingChannelSelector;
 import org.apache.flume.conf.Configurables;
@@ -264,6 +265,66 @@ public class TestAvroSink {
     server.close();
   }
 
+  @Test
+  public void testReset() throws Exception {
+
+    setUp();
+    Server server = createServer(new MockAvroServer());
+
+    server.start();
+
+    Context context = new Context();
+
+    context.put("hostname", hostname);
+    context.put("port", String.valueOf(port));
+    context.put("batch-size", String.valueOf(2));
+    context.put("connect-timeout", String.valueOf(2000L));
+    context.put("request-timeout", String.valueOf(3000L));
+    context.put("reset-connection-interval", String.valueOf("5"));
+
+    sink.setChannel(channel);
+    Configurables.configure(sink, context);
+    sink.start();
+    RpcClient firstClient = sink.getUnderlyingClient();
+    Thread.sleep(6000);
+    // Make sure they are not the same object, connection should be reset
+    Assert.assertFalse(firstClient == sink.getUnderlyingClient());
+    sink.stop();
+
+    context.put("hostname", hostname);
+    context.put("port", String.valueOf(port));
+    context.put("batch-size", String.valueOf(2));
+    context.put("connect-timeout", String.valueOf(2000L));
+    context.put("request-timeout", String.valueOf(3000L));
+    context.put("reset-connection-interval", String.valueOf("0"));
+
+    sink.setChannel(channel);
+    Configurables.configure(sink, context);
+    sink.start();
+    firstClient = sink.getUnderlyingClient();
+    Thread.sleep(6000);
+    // Make sure they are the same object, since connection should not be reset
+    Assert.assertTrue(firstClient == sink.getUnderlyingClient());
+    sink.stop();
+
+    context.clear();
+    context.put("hostname", hostname);
+    context.put("port", String.valueOf(port));
+    context.put("batch-size", String.valueOf(2));
+    context.put("connect-timeout", String.valueOf(2000L));
+    context.put("request-timeout", String.valueOf(3000L));
+
+    sink.setChannel(channel);
+    Configurables.configure(sink, context);
+    sink.start();
+    firstClient = sink.getUnderlyingClient();
+    Thread.sleep(6000);
+    // Make sure they are the same object, since connection should not be reset
+    Assert.assertTrue(firstClient == sink.getUnderlyingClient());
+    sink.stop();
+    server.close();
+  }
+
   private Server createServer(AvroSourceProtocol protocol)
       throws IllegalAccessException, InstantiationException {
 

http://git-wip-us.apache.org/repos/asf/flume/blob/ce48a126/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index d72c965..b0dcbfc 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1453,18 +1453,19 @@ hostname / port pair. The events are taken from the configured Channel
in
 batches of the configured batch size.
 Required properties are in **bold**.
 
-===================   =======  ==============================================
-Property Name         Default  Description
-===================   =======  ==============================================
-**channel**           --
-**type**              --       The component type name, needs to be ``avro``.
-**hostname**          --       The hostname or IP address to bind to.
-**port**              --       The port # to listen on.
-batch-size            100      number of event to batch together for send.
-connect-timeout       20000    Amount of time (ms) to allow for the first (handshake) request.
-request-timeout       20000    Amount of time (ms) to allow for requests after the first.
-compression-type      none     This can be "none" or "deflate".  The compression-type must
match the compression-type of matching AvroSource
-compression-level     6	       The level of compression to compress event. 0 = no compression
and 1-9 is compression.  The higher the number the more compression
+==========================   =======  ==============================================
+Property Name                Default  Description
+==========================   =======  ==============================================
+**channel**                  --
+**type**                     --       The component type name, needs to be ``avro``.
+**hostname**                 --       The hostname or IP address to bind to.
+**port**                     --       The port # to listen on.
+batch-size                   100      number of event to batch together for send.
+connect-timeout              20000    Amount of time (ms) to allow for the first (handshake)
request.
+request-timeout              20000    Amount of time (ms) to allow for requests after the
first.
+reset-connection-interval    none     Amount of time (s) before the connection to the next
hop is reset. This will force the Avro Sink to reconnect to the next hop. This will allow
the sink to connect to hosts behind a hardware load-balancer when new hosts are added without
having to restart the agent.
+compression-type             none     This can be "none" or "deflate".  The compression-type
must match the compression-type of matching AvroSource
+compression-level            6	      The level of compression to compress event. 0 = no compression
and 1-9 is compression.  The higher the number the more compression
 ===================   =======  ==============================================
 
 Example for agent named a1:


Mime
View raw message