flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject git commit: FLUME-2238. Provide option to configure worker threads in NettyAvroRpcClient
Date Tue, 10 Dec 2013 00:36:44 GMT
Updated Branches:
  refs/heads/flume-1.5 e371d565b -> 209169bb5


FLUME-2238. Provide option to configure worker threads in NettyAvroRpcClient

(Cameron Gandevia via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/209169bb
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/209169bb
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/209169bb

Branch: refs/heads/flume-1.5
Commit: 209169bb561f65d58a7140d952c34163d31ab825
Parents: e371d56
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Mon Dec 9 16:35:33 2013 -0800
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Mon Dec 9 16:35:33 2013 -0800

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          | 35 ++++----
 .../apache/flume/api/NettyAvroRpcClient.java    | 86 +++++++++++++++-----
 .../api/RpcClientConfigurationConstants.java    |  6 ++
 .../flume/api/TestNettyAvroRpcClient.java       | 79 ++++++++++++++++--
 4 files changed, 162 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/209169bb/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 8687cb7..0737c44 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1638,25 +1638,26 @@ hostname / port pair. The events are taken from the configured Channel
in
 batches of the configured batch size.
 Required properties are in **bold**.
 
-==========================   =======  ==============================================
+==========================   =====================================================  ===========================================================================================
 Property Name                Default  Description
-==========================   =======  ==============================================
+==========================   =====================================================  ===========================================================================================
 **channel**                  --
-**type**                     --       The component type name, needs to be ``avro``.
-**hostname**                 --       The hostname or IP address to bind to.
-**port**                     --       The port # to listen on.
-batch-size                   100      number of event to batch together for send.
-connect-timeout              20000    Amount of time (ms) to allow for the first (handshake)
request.
-request-timeout              20000    Amount of time (ms) to allow for requests after the
first.
-reset-connection-interval    none     Amount of time (s) before the connection to the next
hop is reset. This will force the Avro Sink to reconnect to the next hop. This will allow
the sink to connect to hosts behind a hardware load-balancer when news hosts are added without
having to restart the agent.
-compression-type             none     This can be "none" or "deflate".  The compression-type
must match the compression-type of matching AvroSource
-compression-level            6        The level of compression to compress event. 0 = no
compression and 1-9 is compression.  The higher the number the more compression
-ssl                          false    Set to true to enable SSL for this AvroSink. When configuring
SSL, you can optionally set a "truststore", "truststore-password", "truststore-type", and
specify whether to "trust-all-certs".
-trust-all-certs              false    If this is set to true, SSL server certificates for
remote servers (Avro Sources) will not be checked. This should NOT be used in production because
it makes it easier for an attacker to execute a man-in-the-middle attack and "listen in" on
the encrypted connection.
-truststore                   --       The path to a custom Java truststore file. Flume uses
the certificate authority information in this file to determine whether the remote Avro Source's
SSL authentication credentials should be trusted. If not specified, the default Java JSSE
certificate authority files (typically "jssecacerts" or "cacerts" in the Oracle JRE) will
be used.
-truststore-password          --       The password for the specified truststore.
-truststore-type              JKS      The type of the Java truststore. This can be "JKS"
or other supported Java truststore type.
-==========================   =======  ==============================================
+**type**                     --                                                     The component
type name, needs to be ``avro``.
+**hostname**                 --                                                     The hostname
or IP address to bind to.
+**port**                     --                                                     The port
# to listen on.
+batch-size                   100                                                    number
of event to batch together for send.
+connect-timeout              20000                                                  Amount
of time (ms) to allow for the first (handshake) request.
+request-timeout              20000                                                  Amount
of time (ms) to allow for requests after the first.
+reset-connection-interval    none                                                   Amount
of time (s) before the connection to the next hop is reset. This will force the Avro Sink
to reconnect to the next hop. This will allow the sink to connect to hosts behind a hardware
load-balancer when news hosts are added without having to restart the agent.
+compression-type             none                                                   This
can be "none" or "deflate".  The compression-type must match the compression-type of matching
AvroSource
+compression-level            6                                                      The level
of compression to compress event. 0 = no compression and 1-9 is compression.  The higher the
number the more compression
+ssl                          false                                                  Set to
true to enable SSL for this AvroSink. When configuring SSL, you can optionally set a "truststore",
"truststore-password", "truststore-type", and specify whether to "trust-all-certs".
+trust-all-certs              false                                                  If this
is set to true, SSL server certificates for remote servers (Avro Sources) will not be checked.
This should NOT be used in production because it makes it easier for an attacker to execute
a man-in-the-middle attack and "listen in" on the encrypted connection.
+truststore                   --                                                     The path
to a custom Java truststore file. Flume uses the certificate authority information in this
file to determine whether the remote Avro Source's SSL authentication credentials should be
trusted. If not specified, the default Java JSSE certificate authority files (typically "jssecacerts"
or "cacerts" in the Oracle JRE) will be used.
+truststore-password          --                                                     The password
for the specified truststore.
+truststore-type              JKS                                                    The type
of the Java truststore. This can be "JKS" or other supported Java truststore type.
+maxIoWorkers                 2 * the number of available processors in the machine  The maximum
number of I/O worker threads. This is configured on the NettyAvroRpcClient NioClientSocketChannelFactory.
+==========================   =====================================================  ===========================================================================================
 
 Example for agent named a1:
 

http://git-wip-us.apache.org/repos/asf/flume/blob/209169bb/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
index 9aabdd4..a2eb264 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
@@ -45,6 +45,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
+
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.TrustManager;
@@ -55,6 +56,7 @@ import org.apache.avro.ipc.CallFuture;
 import org.apache.avro.ipc.NettyTransceiver;
 import org.apache.avro.ipc.Transceiver;
 import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
@@ -99,6 +101,7 @@ implements RpcClient {
       .getLogger(NettyAvroRpcClient.class);
   private boolean enableDeflateCompression;
   private int compressionLevel;
+  private int maxIoWorkers;
 
   /**
    * This constructor is intended to be called from {@link RpcClientFactory}.
@@ -128,20 +131,34 @@ implements RpcClient {
 
     try {
 
+      ExecutorService bossExecutor =
+        Executors.newCachedThreadPool(new TransceiverThreadFactory(
+          "Avro " + NettyTransceiver.class.getSimpleName() + " Boss"));
+      ExecutorService workerExecutor =
+        Executors.newCachedThreadPool(new TransceiverThreadFactory(
+          "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker"));
+
       if (enableDeflateCompression || enableSsl) {
-        socketChannelFactory = new SSLCompressionChannelFactory(
-            Executors.newCachedThreadPool(new TransceiverThreadFactory(
-                "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")),
-            Executors.newCachedThreadPool(new TransceiverThreadFactory(
-                "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker")),
-            enableDeflateCompression, enableSsl, trustAllCerts, compressionLevel,
-            truststore, truststorePassword, truststoreType);
+        if (maxIoWorkers >= 1) {
+          socketChannelFactory = new SSLCompressionChannelFactory(
+            bossExecutor, workerExecutor,
+            enableDeflateCompression, enableSsl, trustAllCerts,
+            compressionLevel, truststore, truststorePassword, truststoreType,
+            maxIoWorkers);
+        } else {
+          socketChannelFactory = new SSLCompressionChannelFactory(
+            bossExecutor, workerExecutor,
+            enableDeflateCompression, enableSsl, trustAllCerts,
+            compressionLevel, truststore, truststorePassword, truststoreType);
+        }
       } else {
-        socketChannelFactory = new NioClientSocketChannelFactory(
-            Executors.newCachedThreadPool(new TransceiverThreadFactory(
-                "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")),
-            Executors.newCachedThreadPool(new TransceiverThreadFactory(
-                "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker")));
+        if (maxIoWorkers >= 1) {
+          socketChannelFactory = new NioClientSocketChannelFactory(
+              bossExecutor, workerExecutor, maxIoWorkers);
+        } else {
+          socketChannelFactory = new NioClientSocketChannelFactory(
+              bossExecutor, workerExecutor);
+        }
       }
 
       transceiver = new NettyTransceiver(this.address,
@@ -587,6 +604,23 @@ implements RpcClient {
     truststoreType = properties.getProperty(
         RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_TYPE, "JKS");
 
+    String maxIoWorkersStr = properties.getProperty(
+      RpcClientConfigurationConstants.MAX_IO_WORKERS);
+    if (!StringUtils.isEmpty(maxIoWorkersStr)) {
+      try {
+        maxIoWorkers = Integer.parseInt(maxIoWorkersStr);
+      } catch (NumberFormatException ex) {
+        logger.warn ("Invalid maxIOWorkers:" + maxIoWorkersStr + " Using " +
+          "default maxIOWorkers.");
+        maxIoWorkers = -1;
+      }
+    }
+
+    if (maxIoWorkers < 1) {
+      logger.warn("Using default maxIOWorkers");
+      maxIoWorkers = -1;
+    }
+
     this.connect();
   }
 
@@ -628,13 +662,13 @@ implements RpcClient {
    */
   private static class SSLCompressionChannelFactory extends NioClientSocketChannelFactory
{
 
-    private boolean enableCompression;
-    private int compressionLevel;
-    private boolean enableSsl;
-    private boolean trustAllCerts;
-    private String truststore;
-    private String truststorePassword;
-    private String truststoreType;
+    private final boolean enableCompression;
+    private final int compressionLevel;
+    private final boolean enableSsl;
+    private final boolean trustAllCerts;
+    private final String truststore;
+    private final String truststorePassword;
+    private final String truststoreType;
 
     public SSLCompressionChannelFactory(Executor bossExecutor, Executor workerExecutor,
         boolean enableCompression, boolean enableSsl, boolean trustAllCerts,
@@ -650,6 +684,20 @@ implements RpcClient {
       this.truststoreType = truststoreType;
     }
 
+    public SSLCompressionChannelFactory(Executor bossExecutor, Executor workerExecutor,
+        boolean enableCompression, boolean enableSsl, boolean trustAllCerts,
+        int compressionLevel, String truststore, String truststorePassword,
+        String truststoreType, int maxIOWorkers) {
+      super(bossExecutor, workerExecutor, maxIOWorkers);
+      this.enableCompression = enableCompression;
+      this.enableSsl = enableSsl;
+      this.compressionLevel = compressionLevel;
+      this.trustAllCerts = trustAllCerts;
+      this.truststore = truststore;
+      this.truststorePassword = truststorePassword;
+      this.truststoreType = truststoreType;
+    }
+
     @Override
     public SocketChannel newChannel(ChannelPipeline pipeline) {
       TrustManager[] managers;

http://git-wip-us.apache.org/repos/asf/flume/blob/209169bb/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
index 7aa70cb..136c504 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
@@ -144,6 +144,12 @@ public final class RpcClientConfigurationConstants {
   public static final String CONFIG_TRUSTSTORE_PASSWORD = "truststore-password";
   public static final String CONFIG_TRUSTSTORE_TYPE = "truststore-type";
 
+  /**
+   * Configuration constants for the NettyAvroRpcClient
+   * NioClientSocketChannelFactory
+   */
+  public static final String MAX_IO_WORKERS = "maxIoWorkers";
+
   private RpcClientConfigurationConstants() {
     // disable explicit object creation
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/209169bb/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
index bfb1fa6..cf4f415 100644
--- a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
@@ -18,28 +18,22 @@
  */
 package org.apache.flume.api;
 
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
-import org.junit.Test;
-
 import org.apache.avro.ipc.Server;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
-import org.apache.flume.event.EventBuilder;
-
 import org.apache.flume.api.RpcTestUtils.FailedAvroHandler;
 import org.apache.flume.api.RpcTestUtils.OKAvroHandler;
 import org.apache.flume.api.RpcTestUtils.ThrowingAvroHandler;
 import org.apache.flume.api.RpcTestUtils.UnknownAvroHandler;
+import org.apache.flume.event.EventBuilder;
 import org.junit.Assert;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -333,4 +327,73 @@ public class TestNettyAvroRpcClient {
     RpcTestUtils.handlerBatchAppendTest(new ThrowingAvroHandler());
     logger.error("Throwing: I should never have gotten here!");
   }
+
+  /**
+   * configure the NettyAvroRpcClient with a non-default
+   * NioClientSocketChannelFactory number of io worker threads
+   *
+   * @throws FlumeException
+   * @throws EventDeliveryException
+   */
+  @Test
+  public void testAppendWithMaxIOWorkers() throws FlumeException, EventDeliveryException
{
+    NettyAvroRpcClient client = null;
+    Server server = RpcTestUtils.startServer(new OKAvroHandler());
+    Properties props = new Properties();
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "localhost");
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + "localhost",
localhost
+        + ":" + server.getPort());
+    props.setProperty(RpcClientConfigurationConstants.MAX_IO_WORKERS, Integer.toString(2));
+    try {
+      client = new NettyAvroRpcClient();
+      client.configure(props);
+      for (int i = 0; i < 5; i++) {
+        client.append(EventBuilder.withBody("evt:" + i, Charset.forName("UTF8")));
+      }
+    } finally {
+      RpcTestUtils.stopServer(server);
+      if (client != null) {
+        client.close();
+      }
+    }
+  }
+
+  /**
+   * Simple request with compression on the server and client with compression
+   * level 0
+   *
+   * configure the NettyAvroRpcClient with a non-default
+   * NioClientSocketChannelFactory number of io worker threads
+   *
+   * Compression level 0 = no compression
+   *
+   * @throws FlumeException
+   * @throws EventDeliveryException
+   */
+  @Test
+  public void testAppendWithMaxIOWorkersSimpleCompressionLevel0() throws FlumeException,
+      EventDeliveryException {
+    NettyAvroRpcClient client = null;
+    Server server = RpcTestUtils.startServer(new OKAvroHandler(), 0, true);
+    Properties props = new Properties();
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "localhost");
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + "localhost",
localhost
+        + ":" + server.getPort());
+    props.setProperty(RpcClientConfigurationConstants.MAX_IO_WORKERS, Integer.toString(2));
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_TYPE, "deflate");
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_LEVEL, "" + 0);
+
+    try {
+      client = new NettyAvroRpcClient();
+      client.configure(props);
+      for (int i = 0; i < 5; i++) {
+        client.append(EventBuilder.withBody("evt:" + i, Charset.forName("UTF8")));
+      }
+    } finally {
+      RpcTestUtils.stopServer(server);
+      if (client != null) {
+        client.close();
+      }
+    }
+  }
 }


Mime
View raw message