metron-commits mailing list archives

From: ceste...@apache.org
Subject: incubator-metron git commit: METRON-233: Expose Kafka producer and consumer configs for parser topologies. This closes apache/incubator-metron#160
Date: Thu, 23 Jun 2016 13:52:11 GMT
Repository: incubator-metron
Updated Branches:
  refs/heads/master d002d2525 -> a35894e9d


METRON-233: Expose Kafka producer and consumer configs for parser topologies. This closes apache/incubator-metron#160


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/a35894e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/a35894e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/a35894e9

Branch: refs/heads/master
Commit: a35894e9d9d0b51e6a93cf794b61820be7409b63
Parents: d002d25
Author: cstella <cestella@gmail.com>
Authored: Thu Jun 23 09:51:52 2016 -0400
Committer: cstella <cestella@gmail.com>
Committed: Thu Jun 23 09:51:52 2016 -0400

----------------------------------------------------------------------
 .../common/spout/kafka/SpoutConfigFunction.java |  24 ++++
 .../common/spout/kafka/SpoutConfigOptions.java  |  80 +++++++++++++
 .../common/spout/kafka/SpoutConfigTest.java     | 113 +++++++++++++++++++
 metron-platform/metron-parsers/README.md        |  86 ++++++++++----
 .../parsers/topology/ParserTopologyBuilder.java |   5 +-
 .../parsers/topology/ParserTopologyCLI.java     |  48 +++++++-
 .../metron/parsers/writer/KafkaWriter.java      |  26 ++++-
 .../components/ParserTopologyComponent.java     |   1 +
 .../metron/parsers/writer/KafkaWriterTest.java  |  95 ++++++++++++++++
 9 files changed, 451 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a35894e9/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfigFunction.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfigFunction.java
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfigFunction.java
new file mode 100644
index 0000000..e8abd60
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfigFunction.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.spout.kafka;
+
+public interface SpoutConfigFunction {
+  void configure(storm.kafka.SpoutConfig config, Object val);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a35894e9/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfigOptions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfigOptions.java
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfigOptions.java
new file mode 100644
index 0000000..807694e
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfigOptions.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.spout.kafka;
+
+import com.google.common.base.Joiner;
+import org.apache.metron.common.utils.ConversionUtils;
+import storm.kafka.SpoutConfig;
+
+import java.util.EnumMap;
+import java.util.Map;
+
+public enum SpoutConfigOptions implements SpoutConfigFunction {
+  retryDelayMaxMs( (config, val) -> config.retryDelayMaxMs = convertVal(val, Long.class) ),
+  retryDelayMultiplier ( (config, val) -> config.retryDelayMultiplier = convertVal(val, Double.class)),
+  retryInitialDelayMs( (config, val) -> config.retryInitialDelayMs = convertVal(val, Long.class)),
+  stateUpdateIntervalMs( (config, val) -> config.stateUpdateIntervalMs = convertVal(val, Long.class)),
+  bufferSizeBytes( (config, val) -> config.bufferSizeBytes = convertVal(val, Integer.class)),
+  fetchMaxWait( (config, val) -> config.fetchMaxWait = convertVal(val, Integer.class)),
+  fetchSizeBytes( (config, val) -> config.fetchSizeBytes= convertVal(val, Integer.class)),
+  maxOffsetBehind( (config, val) -> config.maxOffsetBehind = convertVal(val, Long.class)),
+  metricsTimeBucketSizeInSecs( (config, val) -> config.metricsTimeBucketSizeInSecs = convertVal(val, Integer.class)),
+  socketTimeoutMs( (config, val) -> config.socketTimeoutMs = convertVal(val, Integer.class)),
+  ;
+
+  SpoutConfigFunction  spoutConfigFunc;
+  SpoutConfigOptions(SpoutConfigFunction spoutConfigFunc) {
+    this.spoutConfigFunc = spoutConfigFunc;
+  }
+
+  @Override
+  public void configure(SpoutConfig config, Object val) {
+    spoutConfigFunc.configure(config, val);
+  }
+
+  public static SpoutConfig configure(SpoutConfig config, EnumMap<SpoutConfigOptions, Object> configs) {
+    if(configs != null) {
+      for(Map.Entry<SpoutConfigOptions, Object> kv : configs.entrySet()) {
+        kv.getKey().configure(config, kv.getValue());
+      }
+    }
+    return config;
+  }
+
+  public static EnumMap<SpoutConfigOptions, Object> coerceMap(Map<String, Object> map) {
+    EnumMap<SpoutConfigOptions, Object> ret = new EnumMap<>(SpoutConfigOptions.class);
+    for(Map.Entry<String, Object> kv : map.entrySet()) {
+      try {
+        ret.put(SpoutConfigOptions.valueOf(kv.getKey()), kv.getValue());
+      }
+      catch(Exception ex) {
+        String possibleOptions = Joiner.on(",").join(SpoutConfigOptions.values());
+        throw new IllegalArgumentException("Configuration keys for spout config must be one of: " + possibleOptions, ex);
+      }
+    }
+    return ret;
+  }
+  private static <EXPECTED_T> EXPECTED_T convertVal(Object val, Class<EXPECTED_T> clazz) {
+    Object ret = ConversionUtils.convert(val, clazz);
+    if(ret == null) {
+      throw new IllegalArgumentException("Unable to convert " + val + " to expected type " + clazz.getCanonicalName());
+    }
+    return clazz.cast(ret);
+  }
+}
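Not part of the commit itself, but a minimal sketch of how the pieces above fit together: a JSON map of overrides is parsed, coerced onto the enum with `coerceMap`, and applied to a `storm.kafka.SpoutConfig` with `configure` (the same pattern `ParserTopologyBuilder` uses further down).  The ZooKeeper host and topic names here are hypothetical.

```
import java.util.EnumMap;
import java.util.Map;

import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.metron.common.spout.kafka.SpoutConfigOptions;
import org.apache.metron.common.utils.JSONUtils;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;

public class SpoutConfigOverrideSketch {
  public static void main(String[] args) throws Exception {
    // JSON map of spout overrides, e.g. the contents of the --extra_kafka_spout_config file.
    String json = "{ \"bufferSizeBytes\" : 2000000, \"retryDelayMaxMs\" : 2000 }";

    // Parse into a String-keyed map, then coerce the keys onto the SpoutConfigOptions enum.
    Map<String, Object> raw =
        JSONUtils.INSTANCE.load(json, new TypeReference<Map<String, Object>>() {});
    EnumMap<SpoutConfigOptions, Object> overrides = SpoutConfigOptions.coerceMap(raw);

    // Build a spout config the way ParserTopologyBuilder does and apply the overrides.
    SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts("node1:2181"), "bro", "", "bro");
    SpoutConfigOptions.configure(spoutConfig, overrides);

    System.out.println(spoutConfig.bufferSizeBytes);  // 2000000
    System.out.println(spoutConfig.retryDelayMaxMs);  // 2000
  }
}
```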

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a35894e9/metron-platform/metron-common/src/test/java/org/apache/metron/common/spout/kafka/SpoutConfigTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/spout/kafka/SpoutConfigTest.java
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/spout/kafka/SpoutConfigTest.java
new file mode 100644
index 0000000..74f99d7
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/spout/kafka/SpoutConfigTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.spout.kafka;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.ImmutableMap;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.utils.JSONUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import storm.kafka.SpoutConfig;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SpoutConfigTest {
+
+  /**
+   {
+    "retryDelayMaxMs" : 1000,
+    "retryDelayMultiplier" : 1.2,
+    "retryInitialDelayMs" : 2000,
+    "stateUpdateIntervalMs" : 3000,
+    "bufferSizeBytes" : 4000,
+    "fetchMaxWait" : 5000,
+    "fetchSizeBytes" : 6000,
+    "maxOffsetBehind" : 7000,
+    "metricsTimeBucketSizeInSecs" : 8000,
+    "socketTimeoutMs" : 9000
+   }
+   */
+  @Multiline
+  public static String config;
+
+  @Test
+  public void testConfigApplication() throws IOException {
+    SpoutConfig spoutConfig = new SpoutConfig(null, null, null, null);
+    Map<String, Object> configMap = JSONUtils.INSTANCE.load(config, new TypeReference<Map<String, Object>>() {
+    });
+    SpoutConfigOptions.configure(spoutConfig, SpoutConfigOptions.coerceMap(configMap));
+    Assert.assertEquals(1000, spoutConfig.retryDelayMaxMs);
+    Assert.assertEquals(1.2, spoutConfig.retryDelayMultiplier, 1e-7);
+    Assert.assertEquals(2000, spoutConfig.retryInitialDelayMs);
+    Assert.assertEquals(3000, spoutConfig.stateUpdateIntervalMs);
+    Assert.assertEquals(4000, spoutConfig.bufferSizeBytes);
+    Assert.assertEquals(5000, spoutConfig.fetchMaxWait);
+    Assert.assertEquals(6000, spoutConfig.fetchSizeBytes);
+    Assert.assertEquals(7000, spoutConfig.maxOffsetBehind);
+    Assert.assertEquals(8000, spoutConfig.metricsTimeBucketSizeInSecs);
+    Assert.assertEquals(9000, spoutConfig.socketTimeoutMs);
+  }
+  /**
+   {
+    "retryDelayMaxMs" : 1000,
+    "retryDelayMultiplier" : 1.2,
+    "retryInitialDelayMs" : 2000,
+    "stateUpdateIntervalMs" : 3000,
+    "bufferSizeBytes" : 4000
+   }
+   */
+  @Multiline
+  public static String incompleteConfig;
+  @Test
+  public void testIncompleteConfigApplication() throws IOException {
+    SpoutConfig spoutConfig = new SpoutConfig(null, null, null, null);
+    Map<String, Object> configMap = JSONUtils.INSTANCE.load(incompleteConfig, new TypeReference<Map<String, Object>>() {
+    });
+    SpoutConfigOptions.configure(spoutConfig, SpoutConfigOptions.coerceMap(configMap));
+    Assert.assertEquals(1000, spoutConfig.retryDelayMaxMs);
+    Assert.assertEquals(1.2, spoutConfig.retryDelayMultiplier, 1e-7);
+    Assert.assertEquals(2000, spoutConfig.retryInitialDelayMs);
+    Assert.assertEquals(3000, spoutConfig.stateUpdateIntervalMs);
+    Assert.assertEquals(4000, spoutConfig.bufferSizeBytes);
+    Assert.assertEquals(10000, spoutConfig.fetchMaxWait); //default
+    Assert.assertEquals(1024*1024, spoutConfig.fetchSizeBytes); //default
+    Assert.assertEquals(Long.MAX_VALUE, spoutConfig.maxOffsetBehind);//default
+    Assert.assertEquals(60, spoutConfig.metricsTimeBucketSizeInSecs); //default
+    Assert.assertEquals(10000, spoutConfig.socketTimeoutMs); //default
+  }
+
+  @Test
+  public void testEmptyConfigApplication() throws IOException {
+    SpoutConfig spoutConfig = new SpoutConfig(null, null, null, null);
+    SpoutConfigOptions.configure(spoutConfig, SpoutConfigOptions.coerceMap(new HashMap<>()));
+    //ensure defaults are used
+    Assert.assertEquals(60*1000, spoutConfig.retryDelayMaxMs);
+    Assert.assertEquals(1.0, spoutConfig.retryDelayMultiplier, 1e-7);
+    Assert.assertEquals(0, spoutConfig.retryInitialDelayMs);
+    Assert.assertEquals(2000, spoutConfig.stateUpdateIntervalMs);
+    Assert.assertEquals(1024*1024, spoutConfig.bufferSizeBytes);
+    Assert.assertEquals(10000, spoutConfig.fetchMaxWait); //default
+    Assert.assertEquals(1024*1024, spoutConfig.fetchSizeBytes); //default
+    Assert.assertEquals(Long.MAX_VALUE, spoutConfig.maxOffsetBehind);//default
+    Assert.assertEquals(60, spoutConfig.metricsTimeBucketSizeInSecs); //default
+    Assert.assertEquals(10000, spoutConfig.socketTimeoutMs); //default
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a35894e9/metron-platform/metron-parsers/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/README.md b/metron-platform/metron-parsers/README.md
index 76adfcc..f94651c 100644
--- a/metron-platform/metron-parsers/README.md
+++ b/metron-platform/metron-parsers/README.md
@@ -237,27 +237,73 @@ The usage for `start_parser_topology.sh` is as follows:
 
 ```
 usage: start_parser_topology.sh
- -e,--extra_options <JSON_FILE>               Extra options in the form of
-                                              a JSON file with a map for
-                                              content.
- -h,--help                                    This screen
- -k,--kafka <BROKER_URL>                      Kafka Broker URL
- -mt,--message_timeout <TIMEOUT_IN_SECS>      Message Timeout in Seconds
- -mtp,--max_task_parallelism <MAX_TASK>       Max task parallelism
- -na,--num_ackers <NUM_ACKERS>                Number of Ackers
- -nw,--num_workers <NUM_WORKERS>              Number of Workers
- -pnt,--parser_num_tasks <PARSER_NUM_TASKS>   Parser Num Tasks
- -pp,--parser_p <PARSER_PARALLELISM_HINT>     Parser Parallelism Hint
- -s,--sensor <SENSOR_TYPE>                    Sensor Type
- -snt,--spout_num_tasks <NUM_TASKS>           Spout Num Tasks
- -sp,--spout_p <SPOUT_PARALLELISM_HINT>       Spout Parallelism Hint
- -t,--test <TEST>                             Run in Test Mode
- -z,--zk <ZK_QUORUM>                          Zookeeper Quroum URL
-                                              (zk1:2181,zk2:2181,...
+ -e,--extra_topology_options <JSON_FILE>        Extra options in the form
+                                                of a JSON file with a map
+                                                for content.
+ -esc,--extra_kafka_spout_config <JSON_FILE>    Extra spout config options
+                                                in the form of a JSON file
+                                                with a map for content.
+                                                Possible keys are:
+                                                retryDelayMaxMs,retryDelay
+                                                Multiplier,retryInitialDel
+                                                ayMs,stateUpdateIntervalMs
+                                                ,bufferSizeBytes,fetchMaxW
+                                                ait,fetchSizeBytes,maxOffs
+                                                etBehind,metricsTimeBucket
+                                                SizeInSecs,socketTimeoutMs
+ -ewnt,--error_writer_num_tasks <NUM_TASKS>     Error Writer Num Tasks
+ -ewp,--error_writer_p <PARALLELISM_HINT>       Error Writer Parallelism
+                                                Hint
+ -h,--help                                      This screen
+ -iwnt,--invalid_writer_num_tasks <NUM_TASKS>   Invalid Writer Num Tasks
+ -iwp,--invalid_writer_p <PARALLELISM_HINT>     Invalid Message Writer
+                                                Parallelism Hint
+ -k,--kafka <BROKER_URL>                        Kafka Broker URL
+ -mt,--message_timeout <TIMEOUT_IN_SECS>        Message Timeout in Seconds
+ -mtp,--max_task_parallelism <MAX_TASK>         Max task parallelism
+ -na,--num_ackers <NUM_ACKERS>                  Number of Ackers
+ -nw,--num_workers <NUM_WORKERS>                Number of Workers
+ -pnt,--parser_num_tasks <NUM_TASKS>            Parser Num Tasks
+ -pp,--parser_p <PARALLELISM_HINT>              Parser Parallelism Hint
+ -s,--sensor <SENSOR_TYPE>                      Sensor Type
+ -snt,--spout_num_tasks <NUM_TASKS>             Spout Num Tasks
+ -sp,--spout_p <SPOUT_PARALLELISM_HINT>         Spout Parallelism Hint
+ -t,--test <TEST>                               Run in Test Mode
+ -z,--zk <ZK_QUORUM>                            Zookeeper Quroum URL
+                                                (zk1:2181,zk2:2181,...
 ```
 
-A small note on the extra options.  These options are intended to be Storm configuration options and will live in
-a JSON file which will be loaded into the Storm config.  For instance, if you wanted to set some storm property on
+# The `--extra_kafka_spout_config` Option
+These options are intended to configure the Storm Kafka Spout more completely.  These options can be
+specified in a JSON file containing a map associating the kafka spout configuration parameter to a value.
+The range of values possible to configure are:
+* retryDelayMaxMs
+* retryDelayMultiplier
+* retryInitialDelayMs
+* stateUpdateIntervalMs
+* bufferSizeBytes
+* fetchMaxWait
+* fetchSizeBytes
+* maxOffsetBehind
+* metricsTimeBucketSizeInSecs
+* socketTimeoutMs
+
+These are described in some detail [here](https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.4/bk_storm-user-guide/content/storm-kafka-api-ref.html).
+
+For instance, creating a JSON file which will set the `bufferSizeBytes` to 2MB and `retryDelayMaxMs` to 2000 would look like
+```
+{
+  "bufferSizeBytes" : 2000000,
+  "retryDelayMaxMs" : 2000
+}
+```
+
+This would be loaded by passing the file as argument to `--extra_kafka_spout_config`
+
+# The `--extra_topology_options` Option
+
+These options are intended to be Storm configuration options and will live in
+a JSON file which will be loaded into the Storm config.  For instance, if you wanted to set a storm property on
 the config called `topology.ticks.tuple.freq.secs` to 1000 and `storm.local.dir` to `/opt/my/path`
 you could create a file called `custom_config.json` containing 
 ```
@@ -266,4 +312,4 @@ you could create a file called `custom_config.json` containing
   "storm.local.dir" : "/opt/my/path"
 }
 ```
-and pass `--extra_options custom_config.json` to `start_parser_topology.sh`.
+and pass `--extra_topology_options custom_config.json` to `start_parser_topology.sh`.
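For reference (not part of the commit), a hypothetical invocation combining both new options might look like the following; the hostnames, sensor name and file paths are made up:

```
start_parser_topology.sh \
  -z node1:2181 \
  -k node1:6667 \
  -s bro \
  -esc ~/spout_config.json \
  -e ~/custom_config.json
```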

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a35894e9/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index d83d260..1ad82c5 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -27,6 +27,7 @@ import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
 import org.apache.metron.common.interfaces.BulkMessageWriter;
 import org.apache.metron.common.interfaces.MessageWriter;
 import org.apache.metron.common.spout.kafka.SpoutConfig;
+import org.apache.metron.common.spout.kafka.SpoutConfigOptions;
 import org.apache.metron.common.utils.ReflectionUtils;
 import org.apache.metron.common.writer.AbstractWriter;
 import org.apache.metron.parsers.bolt.ParserBolt;
@@ -54,7 +55,8 @@ public class ParserTopologyBuilder {
                          int invalidWriterParallelism,
                          int invalidWriterNumTasks,
                          int errorWriterParallelism,
-                         int errorWriterNumTasks
+                         int errorWriterNumTasks,
+                         EnumMap<SpoutConfigOptions, Object> kafkaSpoutConfigOptions
                                      ) throws Exception {
     CuratorFramework client = ConfigurationsUtils.getClient(zookeeperUrl);
     client.start();
@@ -70,6 +72,7 @@ public class ParserTopologyBuilder {
     TopologyBuilder builder = new TopologyBuilder();
     ZkHosts zkHosts = new ZkHosts(zookeeperUrl);
     SpoutConfig spoutConfig = new SpoutConfig(zkHosts, sensorTopic, "", sensorTopic).from(offset);
+    SpoutConfigOptions.configure(spoutConfig, kafkaSpoutConfigOptions);
     KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
     builder.setSpout("kafkaSpout", kafkaSpout, spoutParallelism)
            .setNumTasks(spoutNumTasks);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a35894e9/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
index 0578fa5..7bf56e3 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
@@ -23,12 +23,20 @@ import backtype.storm.LocalCluster;
 import backtype.storm.StormSubmitter;
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.utils.Utils;
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Joiner;
 import org.apache.commons.cli.*;
+import org.apache.commons.io.FileUtils;
 import org.apache.metron.common.spout.kafka.SpoutConfig;
+import org.apache.metron.common.spout.kafka.SpoutConfigOptions;
+import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.parsers.topology.config.Arg;
 import org.apache.metron.parsers.topology.config.ConfigHandlers;
 
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -149,13 +157,25 @@ public class ParserTopologyCLI {
     }, new ConfigHandlers.SetMessageTimeoutHandler()
     )
     ,EXTRA_OPTIONS("e", code -> {
-      Option o = new Option(code, "extra_options", true, "Extra options in the form of a JSON file with a map for content.");
+      Option o = new Option(code, "extra_topology_options", true, "Extra options in the form of a JSON file with a map for content.");
       o.setArgName("JSON_FILE");
       o.setRequired(false);
       o.setType(String.class);
       return o;
     }, new ConfigHandlers.LoadJSONHandler()
     )
+    ,SPOUT_CONFIG("esc", code -> {
+      Option o = new Option(code
+                           , "extra_kafka_spout_config"
+                           , true
+                           , "Extra spout config options in the form of a JSON file with a map for content.  " +
+                             "Possible keys are: " + Joiner.on(",").join(SpoutConfigOptions.values()));
+      o.setArgName("JSON_FILE");
+      o.setRequired(false);
+      o.setType(String.class);
+      return o;
+    }
+    )
     ,TEST("t", code ->
     {
       Option o = new Option("t", "test", true, "Run in Test Mode");
@@ -260,6 +280,10 @@ public class ParserTopologyCLI {
       int errorNumTasks= Integer.parseInt(ParserOptions.ERROR_WRITER_NUM_TASKS.get(cmd, "1"));
      int invalidParallelism = Integer.parseInt(ParserOptions.INVALID_WRITER_PARALLELISM.get(cmd, "1"));
      int invalidNumTasks= Integer.parseInt(ParserOptions.INVALID_WRITER_NUM_TASKS.get(cmd, "1"));
+      EnumMap<SpoutConfigOptions, Object> spoutConfig = new EnumMap<SpoutConfigOptions, Object>(SpoutConfigOptions.class);
+      if(ParserOptions.SPOUT_CONFIG.has(cmd)) {
+        spoutConfig = readSpoutConfig(new File(ParserOptions.SPOUT_CONFIG.get(cmd)));
+      }
       SpoutConfig.Offset offset = cmd.hasOption("t") ? SpoutConfig.Offset.BEGINNING : SpoutConfig.Offset.WHERE_I_LEFT_OFF;
       TopologyBuilder builder = ParserTopologyBuilder.build(zookeeperUrl,
               brokerUrl,
@@ -272,7 +296,8 @@ public class ParserTopologyCLI {
               invalidParallelism,
               invalidNumTasks,
               errorParallelism,
-              errorNumTasks
+              errorNumTasks,
+              spoutConfig
       );
       Config stormConf = ParserOptions.getConfig(cmd);
 
@@ -290,4 +315,23 @@ public class ParserTopologyCLI {
       System.exit(-1);
     }
   }
+  private static EnumMap<SpoutConfigOptions, Object> readSpoutConfig(File inputFile) {
+    String json = null;
+    if (inputFile.exists()) {
+      try {
+        json = FileUtils.readFileToString(inputFile);
+      } catch (IOException e) {
+        throw new IllegalStateException("Unable to process JSON file " + inputFile, e);
+      }
+    }
+    else {
+      throw new IllegalArgumentException("Unable to load JSON file at " + inputFile.getAbsolutePath());
+    }
+    try {
+      return SpoutConfigOptions.coerceMap(JSONUtils.INSTANCE.load(json, new TypeReference<Map<String, Object>>() {
+      }));
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to process JSON.", e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a35894e9/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/writer/KafkaWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/writer/KafkaWriter.java
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/writer/KafkaWriter.java
index f8578c8..6090491 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/writer/KafkaWriter.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/writer/KafkaWriter.java
@@ -40,7 +40,8 @@ public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObj
     ,KEY_SERIALIZER("kafka.keySerializer")
     ,VALUE_SERIALIZER("kafka.valueSerializer")
     ,REQUIRED_ACKS("kafka.requiredAcks")
-    ,TOPIC("kafka.topic");
+    ,TOPIC("kafka.topic")
+    ,PRODUCER_CONFIGS("kafka.producerConfigs");
     ;
     String key;
     Configurations(String key) {
@@ -64,6 +65,7 @@ public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObj
   private String kafkaTopic = Constants.ENRICHMENT_TOPIC;
   private KafkaProducer kafkaProducer;
   private String configPrefix = null;
+  private Map<String, Object> producerConfigs = new HashMap<>();
 
   public KafkaWriter() {}
 
@@ -95,9 +97,15 @@ public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObj
     return this;
   }
 
+  public KafkaWriter withProducerConfigs(Map<String, Object> extraConfigs) {
+    this.producerConfigs = extraConfigs;
+    return this;
+  }
+
   public Optional<String> getConfigPrefix() {
     return Optional.ofNullable(configPrefix);
   }
+
   @Override
   public void configure(String sensorName, WriterConfiguration configuration) {
     Map<String, Object> configMap = configuration.getSensorConfig(sensorName);
@@ -121,16 +129,26 @@ public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObj
     if(topic != null) {
       withTopic(topic);
     }
+    Map<String, Object> producerConfigs = (Map)Configurations.PRODUCER_CONFIGS.get(getConfigPrefix(), configMap);
+    if(producerConfigs != null) {
+      withProducerConfigs(producerConfigs);
+    }
   }
 
-  @Override
-  public void init() {
+  public Map<String, Object> createProducerConfigs() {
     Map<String, Object> producerConfig = new HashMap<>();
     producerConfig.put("bootstrap.servers", brokerUrl);
     producerConfig.put("key.serializer", keySerializer);
     producerConfig.put("value.serializer", valueSerializer);
     producerConfig.put("request.required.acks", requiredAcks);
-    this.kafkaProducer = new KafkaProducer<>(producerConfig);
+    producerConfig.putAll(producerConfigs == null?new HashMap<>():producerConfigs);
+    return producerConfig;
+  }
+
+  @Override
+  public void init() {
+
+    this.kafkaProducer = new KafkaProducer<>(createProducerConfigs());
   }
 
   @SuppressWarnings("unchecked")
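Not part of the commit: a sketch of a sensor parser config that feeds the new `kafka.producerConfigs` key into `KafkaWriter.createProducerConfigs()`, assuming the standard sensor parser config layout (`parserClassName`, `sensorTopic`, `parserConfig`); the producer property names shown are ordinary Kafka producer settings chosen for illustration.

```
{
  "parserClassName": "org.apache.metron.parsers.bro.BasicBroParser",
  "sensorTopic": "bro",
  "parserConfig": {
    "kafka.brokerUrl": "node1:6667",
    "kafka.topic": "enrichments",
    "kafka.producerConfigs": {
      "compression.type": "gzip",
      "batch.size": 32768
    }
  }
}
```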

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a35894e9/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
index ad00deb..f639e02 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
@@ -79,6 +79,7 @@ public class ParserTopologyComponent implements InMemoryComponent {
                                                                    , 1
                                                                    , 1
                                                                    , 1
+                                                                   , null
                                                                    );
       Map<String, Object> stormConf = new HashMap<>();
       stormConf.put(Config.TOPOLOGY_DEBUG, true);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a35894e9/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/writer/KafkaWriterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/writer/KafkaWriterTest.java
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/writer/KafkaWriterTest.java
new file mode 100644
index 0000000..57dc3e2
--- /dev/null
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/writer/KafkaWriterTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.parsers.writer;
+
+import com.google.common.collect.ImmutableMap;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.TestingServer;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class KafkaWriterTest {
+
+
+  public static final String SENSOR_TYPE = "test";
+  public WriterConfiguration createConfiguration(final Map<String, Object> parserConfig) {
+    ParserConfigurations configurations = new ParserConfigurations();
+    configurations.updateSensorParserConfig( SENSOR_TYPE
+                                           , new SensorParserConfig() {{
+                                              setParserConfig(parserConfig);
+                                                                      }}
+                                           );
+    return new ParserWriterConfiguration(configurations);
+  }
+
+  @Test
+  public void testHappyPathGlobalConfig() throws Exception {
+    KafkaWriter writer = new KafkaWriter();
+    WriterConfiguration configuration = createConfiguration(
+            new HashMap<String, Object>() {{
+              put("kafka.brokerUrl" , "localhost:6667");
+              put("kafka.topic" , SENSOR_TYPE);
+              put("kafka.producerConfigs" , ImmutableMap.of("key1", 1, "key2", "value2"));
+            }}
+    );
+
+    writer.configure(SENSOR_TYPE, configuration);
+    Map<String, Object> producerConfigs = writer.createProducerConfigs();
+    Assert.assertEquals(producerConfigs.get("bootstrap.servers"), "localhost:6667");
+    Assert.assertEquals(producerConfigs.get("key.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
+    Assert.assertEquals(producerConfigs.get("value.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
+    Assert.assertEquals(producerConfigs.get("request.required.acks"), 1);
+    Assert.assertEquals(producerConfigs.get("key1"), 1);
+    Assert.assertEquals(producerConfigs.get("key2"), "value2");
+  }
+
+  @Test
+  public void testHappyPathGlobalConfigWithPrefix() throws Exception {
+    KafkaWriter writer = new KafkaWriter();
+    writer.withConfigPrefix("prefix");
+    WriterConfiguration configuration = createConfiguration(
+            new HashMap<String, Object>() {{
+              put("prefix.kafka.brokerUrl" , "localhost:6667");
+              put("prefix.kafka.topic" , SENSOR_TYPE);
+              put("prefix.kafka.producerConfigs" , ImmutableMap.of("key1", 1, "key2", "value2"));
+            }}
+    );
+
+    writer.configure(SENSOR_TYPE, configuration);
+    Map<String, Object> producerConfigs = writer.createProducerConfigs();
+    Assert.assertEquals(producerConfigs.get("bootstrap.servers"), "localhost:6667");
+    Assert.assertEquals(producerConfigs.get("key.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
+    Assert.assertEquals(producerConfigs.get("value.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
+    Assert.assertEquals(producerConfigs.get("request.required.acks"), 1);
+    Assert.assertEquals(producerConfigs.get("key1"), 1);
+    Assert.assertEquals(producerConfigs.get("key2"), "value2");
+  }
+}

