flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject flume git commit: FLUME-2373. Support TBinaryProtocol in Thrift RPC.
Date Thu, 18 Dec 2014 20:14:51 GMT
Repository: flume
Updated Branches:
  refs/heads/flume-1.6 c829d84b0 -> 49101731c


FLUME-2373. Support TBinaryProtocol in Thrift RPC.

(Stefan Krawczyk via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/49101731
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/49101731
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/49101731

Branch: refs/heads/flume-1.6
Commit: 49101731c35ed5674b668993dc00a7635c2eee20
Parents: c829d84
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Thu Dec 18 12:13:19 2014 -0800
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Thu Dec 18 12:14:37 2014 -0800

----------------------------------------------------------------------
 .../java/org/apache/flume/sink/ThriftSink.java  |  4 +--
 .../org/apache/flume/source/ThriftSource.java   | 29 ++++++++++++++++--
 .../org/apache/flume/sink/TestThriftSink.java   | 11 ++++---
 .../org/apache/flume/api/ThriftRpcClient.java   | 32 ++++++++++++++++++--
 .../apache/flume/api/TestThriftRpcClient.java   | 13 ++++----
 .../apache/flume/api/ThriftTestingSource.java   | 15 +++++++--
 6 files changed, 85 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/49101731/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java
index 48a9775..baa60d0 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java
@@ -18,11 +18,11 @@
  */
 package org.apache.flume.sink;
 
+import java.util.Properties;
+
 import org.apache.flume.api.RpcClient;
 import org.apache.flume.api.RpcClientConfigurationConstants;
 import org.apache.flume.api.RpcClientFactory;
-
-import java.util.Properties;
 /**
  * <p>
  * A {@link org.apache.flume.Sink} implementation that can send events to an RPC server (such as

http://git-wip-us.apache.org/repos/asf/flume/blob/49101731/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
index c3881b4..551fe13 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
@@ -34,6 +34,7 @@ import org.apache.flume.thrift.ThriftSourceProtocol;
 import org.apache.flume.thrift.ThriftFlumeEvent;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.server.TNonblockingServer;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.transport.TFastFramedTransport;
@@ -70,6 +71,13 @@ public class ThriftSource extends AbstractSource implements Configurable,
    * Config param for the port to listen on.
    */
   public static final String CONFIG_PORT = "port";
+  /**
+   * Config param for the thrift protocol to use.
+   */
+  public static final String CONFIG_PROTOCOL = "protocol";
+  public static final String BINARY_PROTOCOL = "binary";
+  public static final String COMPACT_PROTOCOL = "compact";
+  
   private Integer port;
   private String bindAddress;
   private int maxThreads = 0;
@@ -77,6 +85,7 @@ public class ThriftSource extends AbstractSource implements Configurable,
   private TServer server;
   private TServerTransport serverTransport;
   private ExecutorService servingExecutor;
+  private String protocol;
 
   @Override
   public void configure(Context context) {
@@ -98,6 +107,17 @@ public class ThriftSource extends AbstractSource implements Configurable,
     if (sourceCounter == null) {
       sourceCounter = new SourceCounter(getName());
     }
+    
+    protocol = context.getString(CONFIG_PROTOCOL);
+    if (protocol == null) {
+      // default is to use the compact protocol.
+      protocol = COMPACT_PROTOCOL;
+    } 
+    Preconditions.checkArgument(
+        (protocol.equalsIgnoreCase(BINARY_PROTOCOL) ||
+            protocol.equalsIgnoreCase(COMPACT_PROTOCOL)),
+        "binary or compact are the only valid Thrift protocol types to " +
+        "choose from.");
   }
 
   @Override
@@ -167,8 +187,13 @@ public class ThriftSource extends AbstractSource implements Configurable,
     }
 
     try {
-
-      args.protocolFactory(new TCompactProtocol.Factory());
+      if (protocol.equals(BINARY_PROTOCOL)) {
+        logger.info("Using TBinaryProtocol");
+        args.protocolFactory(new TBinaryProtocol.Factory());
+      } else {
+        logger.info("Using TCompactProtocol");
+        args.protocolFactory(new TCompactProtocol.Factory());
+      }
       args.inputTransportFactory(new TFastFramedTransport.Factory());
       args.outputTransportFactory(new TFastFramedTransport.Factory());
       args.processor(new ThriftSourceProtocol

http://git-wip-us.apache.org/repos/asf/flume/blob/49101731/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java
index 5f70d1b..fccaede 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java
@@ -19,15 +19,18 @@
 package org.apache.flume.sink;
 
 import com.google.common.base.Charsets;
+
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Sink;
 import org.apache.flume.Transaction;
+import org.apache.flume.api.ThriftRpcClient;
 import org.apache.flume.api.ThriftTestingSource;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
+import org.apache.flume.source.ThriftSource;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -58,7 +61,7 @@ public class TestThriftSink {
     context.put("port", String.valueOf(port));
     context.put("batch-size", String.valueOf(2));
     context.put("request-timeout", String.valueOf(2000L));
-
+    context.put(ThriftRpcClient.CONFIG_PROTOCOL, ThriftRpcClient.COMPACT_PROTOCOL);
     sink.setChannel(channel);
 
     Configurables.configure(sink, context);
@@ -77,7 +80,7 @@ public class TestThriftSink {
 
     Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
     src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(),
-      port);
+      port, ThriftRpcClient.COMPACT_PROTOCOL);
 
     channel.start();
     sink.start();
@@ -108,7 +111,7 @@ public class TestThriftSink {
   public void testTimeout() throws Exception {
     AtomicLong delay = new AtomicLong();
     src = new ThriftTestingSource(ThriftTestingSource.HandlerType.ALTERNATE
-      .name(), port);
+      .name(), port, ThriftRpcClient.COMPACT_PROTOCOL);
     src.setDelay(delay);
     delay.set(2500);
 
@@ -182,7 +185,7 @@ public class TestThriftSink {
     }
 
     src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(),
-      port);
+      port, ThriftRpcClient.COMPACT_PROTOCOL);
 
     for (int i = 0; i < 5; i++) {
       Sink.Status status = sink.process();

http://git-wip-us.apache.org/repos/asf/flume/blob/49101731/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
index cf45ab9..6382a0e 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
@@ -24,6 +24,7 @@ import org.apache.flume.FlumeException;
 import org.apache.flume.thrift.Status;
 import org.apache.flume.thrift.ThriftFlumeEvent;
 import org.apache.flume.thrift.ThriftSourceProtocol;
+import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.transport.TFastFramedTransport;
 import org.apache.thrift.transport.TSocket;
@@ -57,6 +58,13 @@ public class ThriftRpcClient extends AbstractRpcClient {
   private static final Logger LOGGER =
     LoggerFactory.getLogger(ThriftRpcClient.class);
 
+  /**
+   * Config param for the thrift protocol to use.
+   */
+  public static final String CONFIG_PROTOCOL = "protocol";
+  public static final String BINARY_PROTOCOL = "binary";
+  public static final String COMPACT_PROTOCOL = "compact";
+  
   private int batchSize;
   private long requestTimeout;
   private final Lock stateLock;
@@ -68,6 +76,7 @@ public class ThriftRpcClient extends AbstractRpcClient {
   private final AtomicLong threadCounter;
   private int connectionPoolSize;
   private final Random random = new Random();
+  private String protocol;
 
   public ThriftRpcClient() {
     stateLock = new ReentrantLock(true);
@@ -267,6 +276,18 @@ public class ThriftRpcClient extends AbstractRpcClient {
       HostInfo host = HostInfo.getHostInfoList(properties).get(0);
       hostname = host.getHostName();
       port = host.getPortNumber();
+      protocol = properties.getProperty(CONFIG_PROTOCOL);
+      if (protocol == null) {
+        // default is to use the compact protocol.
+        protocol = COMPACT_PROTOCOL;
+      }
+      // check in case that garbage was put in.
+      if (!(protocol.equalsIgnoreCase(BINARY_PROTOCOL) ||
+          protocol.equalsIgnoreCase(COMPACT_PROTOCOL))) {
+        LOGGER.warn("'binary' or 'compact' are the only valid Thrift protocol types to "
+            + "choose from. Defaulting to 'compact'.");
+        protocol = COMPACT_PROTOCOL;
+      }
       batchSize = Integer.parseInt(properties.getProperty(
         RpcClientConfigurationConstants.CONFIG_BATCH_SIZE,
         RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE.toString()));
@@ -322,8 +343,15 @@ public class ThriftRpcClient extends AbstractRpcClient {
     public ClientWrapper() throws Exception{
       transport = new TFastFramedTransport(new TSocket(hostname, port));
       transport.open();
-      client = new ThriftSourceProtocol.Client(new TCompactProtocol
-        (transport));
+      if (protocol.equals(BINARY_PROTOCOL)) {
+        LOGGER.info("Using TBinaryProtocol");
+        client = new ThriftSourceProtocol.Client(new TBinaryProtocol
+            (transport));
+      } else {
+        LOGGER.info("Using TCompactProtocol");
+        client = new ThriftSourceProtocol.Client(new TCompactProtocol
+            (transport));
+      }
       // Not a great hash code, but since this class is immutable and there
       // is at most one instance of the components of this class,
       // this works fine [If the objects are equal, hash code is the same]

http://git-wip-us.apache.org/repos/asf/flume/blob/49101731/flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java
index 88eb5e7..a8baaa8 100644
--- a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java
@@ -56,6 +56,7 @@ public class TestThriftRpcClient {
     props.setProperty(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE, "10");
     props.setProperty(RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,
       "2000");
+    props.setProperty(ThriftRpcClient.CONFIG_PROTOCOL, ThriftRpcClient.COMPACT_PROTOCOL);
   }
 
   @After
@@ -103,7 +104,7 @@ public class TestThriftRpcClient {
   @Test
   public void testOK() throws Exception {
     src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(),
-      port);
+      port, ThriftRpcClient.COMPACT_PROTOCOL);
     client = (ThriftRpcClient) RpcClientFactory.getInstance(props);
     insertEvents(client, 10); //10 events
     insertAsBatch(client, 10, 25); //16 events
@@ -121,7 +122,7 @@ public class TestThriftRpcClient {
   @Test
   public void testSlow() throws Exception {
     src = new ThriftTestingSource(ThriftTestingSource.HandlerType.SLOW.name(),
-      port);
+      port, ThriftRpcClient.COMPACT_PROTOCOL);
     client = (ThriftRpcClient) RpcClientFactory.getInstance(props);
     insertEvents(client, 2); //2 events
     insertAsBatch(client, 2, 25); //24 events (3 batches)
@@ -139,7 +140,7 @@ public class TestThriftRpcClient {
   @Test(expected = EventDeliveryException.class)
   public void testFail() throws Exception {
     src = new ThriftTestingSource(ThriftTestingSource.HandlerType.FAIL.name(),
-      port);
+      port, ThriftRpcClient.COMPACT_PROTOCOL);
     client = (ThriftRpcClient) RpcClientFactory.getInstance(props);
     insertEvents(client, 2); //2 events
     Assert.fail("Expected EventDeliveryException to be thrown.");
@@ -149,7 +150,7 @@ public class TestThriftRpcClient {
   public void testError() throws Throwable {
     try {
       src = new ThriftTestingSource(ThriftTestingSource.HandlerType.ERROR
-        .name(), port);
+        .name(), port, ThriftRpcClient.COMPACT_PROTOCOL);
       client = (ThriftRpcClient) RpcClientFactory.getThriftInstance("0.0.0" +
         ".0", port);
       insertEvents(client, 2); //2 events
@@ -163,7 +164,7 @@ public class TestThriftRpcClient {
   public void testTimeout() throws Throwable {
     try {
       src = new ThriftTestingSource(ThriftTestingSource.HandlerType.TIMEOUT
-        .name(), port);
+        .name(), port, ThriftRpcClient.COMPACT_PROTOCOL);
       client = (ThriftRpcClient) RpcClientFactory.getThriftInstance(props);
       insertEvents(client, 2); //2 events
     } catch (EventDeliveryException ex) {
@@ -174,7 +175,7 @@ public class TestThriftRpcClient {
   @Test
   public void testMultipleThreads() throws Throwable {
     src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(),
-      port);
+      port, ThriftRpcClient.COMPACT_PROTOCOL);
     client = (ThriftRpcClient) RpcClientFactory.getThriftInstance("0.0.0" +
       ".0", port, 10);
     int threadCount = 100;

http://git-wip-us.apache.org/repos/asf/flume/blob/49101731/flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java b/flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java
index cde7269..63d2fc3 100644
--- a/flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java
@@ -25,7 +25,11 @@ import org.apache.flume.thrift.Status;
 import org.apache.flume.thrift.ThriftFlumeEvent;
 import org.apache.flume.thrift.ThriftSourceProtocol;
 import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TBinaryProtocol.Factory;
 import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.server.THsHaServer;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.transport.TNonblockingServerSocket;
@@ -180,7 +184,7 @@ public class ThriftTestingSource {
     }
   }
 
-  public ThriftTestingSource(String handlerName, int port) throws Exception {
+  public ThriftTestingSource(String handlerName, int port, String protocol) throws Exception {
     TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(new
       InetSocketAddress("0.0.0.0", port));
     ThriftSourceProtocol.Iface handler = null;
@@ -197,11 +201,16 @@ public class ThriftTestingSource {
     } else if (handlerName.equals(HandlerType.ALTERNATE.name())) {
       handler = new ThriftAlternateHandler();
     }
-
+    TProtocolFactory transportProtocolFactory = null;
+    if (protocol != null && protocol == ThriftRpcClient.BINARY_PROTOCOL) {
+      transportProtocolFactory = new TBinaryProtocol.Factory();
+    } else {
+      transportProtocolFactory = new TCompactProtocol.Factory();
+    }
     server = new THsHaServer(new THsHaServer.Args
       (serverTransport).processor(
       new ThriftSourceProtocol.Processor(handler)).protocolFactory(
-      new TCompactProtocol.Factory()));
+          transportProtocolFactory));
     Executors.newSingleThreadExecutor().submit(new Runnable() {
       @Override
       public void run() {


Mime
View raw message