flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject git commit: FLUME-1756. Avro client should be able to use load balancing RPC
Date Thu, 20 Dec 2012 01:50:09 GMT
Updated Branches:
  refs/heads/trunk efd6d26c1 -> 88980d06b


FLUME-1756. Avro client should be able to use load balancing RPC

(Mike Percy via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/88980d06
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/88980d06
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/88980d06

Branch: refs/heads/trunk
Commit: 88980d06b0b5b8af632055af104a4a6e03b32a62
Parents: efd6d26
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Wed Dec 19 17:49:02 2012 -0800
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Wed Dec 19 17:49:02 2012 -0800

----------------------------------------------------------------------
 bin/flume-ng                                       |   15 +++--
 .../apache/flume/client/avro/AvroCLIClient.java    |   50 +++++++++++----
 2 files changed, 47 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/88980d06/bin/flume-ng
----------------------------------------------------------------------
diff --git a/bin/flume-ng b/bin/flume-ng
index 4fd1503..ee86c95 100755
--- a/bin/flume-ng
+++ b/bin/flume-ng
@@ -190,12 +190,15 @@ agent options:
   --help,-h             display help text
 
 avro-client options:
-  --dirname <dir>       directory to stream to avro source
-  --host,-H <host>      hostname to which events will be sent (required)
-  --port,-p <port>      port of the avro source (required)
-  --filename,-F <file>  text file to stream to avro source [default: std input]
-  --headerFile,-R <file> headerFile containing headers as key/value pairs on each new line
-  --help,-h             display help text
+  --rpcProps,-P <file>   RPC client properties file with server connection params
+  --host,-H <host>       hostname to which events will be sent
+  --port,-p <port>       port of the avro source
+  --dirname <dir>        directory to stream to avro source
+  --filename,-F <file>   text file to stream to avro source (default: std input)
+  --headerFile,-R <file> File containing event headers as key/value pairs on each new line
+  --help,-h              display help text
+
+  Either --rpcProps or both --host and --port must be specified.
 
 Note that if <conf> directory is specified, then it is always included first
 in the classpath.

http://git-wip-us.apache.org/repos/asf/flume/blob/88980d06/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
index f00ce24..da23a75 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.GnuParser;
@@ -59,6 +60,7 @@ public class AvroCLIClient {
   private String hostname;
   private int port;
   private String fileName;
+  private String rpcClientPropsFile;
   private String dirName;
   private Map<String, String> headers = new HashMap<String, String>();
   private int sent;
@@ -73,7 +75,7 @@ public class AvroCLIClient {
     } catch (ParseException e) {
       logger.error("Unable to parse command line options - {}", e.getMessage());
     } catch (IOException e) {
-      logger.error("Unable to send data to Flume. Exception follows.",  e);
+      logger.error("Unable to send data to Flume. Exception follows.", e);
     } catch (FlumeException e) {
       logger.error("Unable to open connection to Flume. Exception follows.", e);
     } catch (EventDeliveryException e) {
@@ -120,7 +122,10 @@ public class AvroCLIClient {
   private boolean parseCommandLine(String[] args) throws ParseException {
     Options options = new Options();
 
-    options.addOption("p", "port", true, "port of the avro source")
+    options
+        .addOption("P", "rpcProps", true, "RPC client properties file with " +
+            "server connection params")
+        .addOption("p", "port", true, "port of the avro source")
         .addOption("H", "host", true, "hostname of the avro source")
         .addOption("F", "filename", true, "file to stream to avro source")
         .addOption(null, "dirname", true, "directory to stream to avro source")
@@ -144,19 +149,34 @@ public class AvroCLIClient {
           "--filename and --dirname options cannot be used simultaneously");
     }
 
-    if (!commandLine.hasOption("port")) {
-      throw new ParseException(
-          "You must specify a port to connect to with --port");
+    if (!commandLine.hasOption("port") && !commandLine.hasOption("host") &&
+        !commandLine.hasOption("rpcProps")) {
+      throw new ParseException("Either --rpcProps or both --host and --port " +
+          "must be specified.");
     }
 
-    port = Integer.parseInt(commandLine.getOptionValue("port"));
+    if (commandLine.hasOption("rpcProps")) {
+      rpcClientPropsFile = commandLine.getOptionValue("rpcProps");
+      Preconditions.checkNotNull(rpcClientPropsFile, "RPC client properties " +
+          "file must be specified after --rpcProps argument.");
+      Preconditions.checkArgument(new File(rpcClientPropsFile).exists(),
+          "RPC client properties file %s does not exist!", rpcClientPropsFile);
+    }
 
-    if (!commandLine.hasOption("host")) {
-      throw new ParseException(
-          "You must specify a hostname to connect to with --host");
+    if (rpcClientPropsFile == null) {
+      if (!commandLine.hasOption("port")) {
+        throw new ParseException(
+            "You must specify a port to connect to with --port");
+      }
+      port = Integer.parseInt(commandLine.getOptionValue("port"));
+
+      if (!commandLine.hasOption("host")) {
+        throw new ParseException(
+            "You must specify a hostname to connect to with --host");
+      }
+      hostname = commandLine.getOptionValue("host");
     }
 
-    hostname = commandLine.getOptionValue("host");
     fileName = commandLine.getOptionValue("filename");
     dirName = commandLine.getOptionValue("dirname");
 
@@ -172,8 +192,14 @@ public class AvroCLIClient {
 
     EventReader reader = null;
 
-    RpcClient rpcClient = RpcClientFactory.getDefaultInstance(hostname, port,
-        BATCH_SIZE);
+    RpcClient rpcClient;
+    if (rpcClientPropsFile != null) {
+      rpcClient = RpcClientFactory.getInstance(new File(rpcClientPropsFile));
+    } else {
+      rpcClient = RpcClientFactory.getDefaultInstance(hostname, port,
+          BATCH_SIZE);
+    }
+
     try {
       if (fileName != null) {
         reader = new SimpleTextLineEventReader(new FileReader(new File(fileName)));


Mime
View raw message