metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmiklav...@apache.org
Subject [12/15] metron git commit: METRON-1657 Parser aggregation in storm (justinleet) closes apache/metron#1099
Date Fri, 20 Jul 2018 15:40:40 GMT
METRON-1657 Parser aggregation in storm (justinleet) closes apache/metron#1099


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/f4345383
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/f4345383
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/f4345383

Branch: refs/heads/feature/METRON-1554-pcap-query-panel
Commit: f43453830469b7fad14b59c95ad8122f320a4b05
Parents: 3a2675e
Author: justinleet <justinjleet@gmail.com>
Authored: Wed Jul 18 08:54:43 2018 -0400
Committer: leet <leet@apache.org>
Committed: Wed Jul 18 08:54:43 2018 -0400

----------------------------------------------------------------------
 .../service/impl/StormAdminServiceImpl.java     |  23 +-
 .../rest/service/impl/StormCLIWrapper.java      |  25 +-
 .../common/bolt/ConfiguredParserBolt.java       |  10 +-
 .../configuration/SensorParserConfig.java       |   6 +
 .../apache/metron/common/error/MetronError.java |  64 ++--
 .../common/bolt/ConfiguredParserBoltTest.java   |   2 +-
 .../metron/common/error/MetronErrorTest.java    |  16 +-
 metron-platform/metron-parsers/README.md        |  25 +-
 .../apache/metron/parsers/bolt/ParserBolt.java  | 241 ++++++++------
 .../apache/metron/parsers/bolt/WriterBolt.java  |   3 +-
 .../parsers/topology/ParserComponents.java      |  67 ++++
 .../parsers/topology/ParserTopologyBuilder.java | 200 ++++++++----
 .../parsers/topology/ParserTopologyCLI.java     | 264 ++++++++++-----
 .../parsers/topology/config/ValueSupplier.java  |   3 +-
 .../metron/parsers/bolt/ParserBoltTest.java     | 182 +++++++----
 .../metron/parsers/bolt/WriterBoltTest.java     |  30 +-
 .../parsers/integration/ParserDriver.java       |  50 ++-
 .../components/ParserTopologyComponent.java     |  58 ++--
 .../parsers/topology/ParserTopologyCLITest.java | 318 +++++++++++++------
 ...pleHbaseEnrichmentWriterIntegrationTest.java |   3 +-
 .../integration/WriterBoltIntegrationTest.java  | 150 ++++++++-
 .../metron/writer/BulkWriterComponent.java      |   3 +-
 .../metron/writer/BulkWriterComponentTest.java  |  34 +-
 use-cases/parser_chaining/README.md             |  14 +
 .../aggregated_parser_chaining_flow.svg         |  14 +
 .../aggregated_parser_chaining_flow.xml         |  14 +
 26 files changed, 1259 insertions(+), 560 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java
index 40b01f1..4569a23 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java
@@ -50,17 +50,28 @@ public class StormAdminServiceImpl implements StormAdminService {
         TopologyResponse topologyResponse = new TopologyResponse();
         if (globalConfigService.get() == null) {
             topologyResponse.setErrorMessage(TopologyStatusCode.GLOBAL_CONFIG_MISSING.toString());
-        } else if (sensorParserConfigService.findOne(name) == null) {
-            topologyResponse.setErrorMessage(TopologyStatusCode.SENSOR_PARSER_CONFIG_MISSING.toString());
-        } else {
-            topologyResponse = createResponse(stormCLIClientWrapper.startParserTopology(name), TopologyStatusCode.STARTED, TopologyStatusCode.START_ERROR);
+            return topologyResponse;
         }
-        return topologyResponse;
+
+        String[] sensorTypes = name.split(",");
+        for (String sensorType : sensorTypes) {
+            if (sensorParserConfigService.findOne(sensorType.trim()) == null) {
+                topologyResponse
+                    .setErrorMessage(TopologyStatusCode.SENSOR_PARSER_CONFIG_MISSING.toString());
+                return topologyResponse;
+            }
+        }
+
+        return createResponse(
+            stormCLIClientWrapper.startParserTopology(name),
+                TopologyStatusCode.STARTED,
+                TopologyStatusCode.START_ERROR
+        );
     }
 
     @Override
     public TopologyResponse stopParserTopology(String name, boolean stopNow) throws RestException {
-        return createResponse(stormCLIClientWrapper.stopParserTopology(name, stopNow), TopologyStatusCode.STOPPED, TopologyStatusCode.STOP_ERROR);
+        return createResponse(stormCLIClientWrapper.stopParserTopology(name.replaceAll(",", "__"), stopNow), TopologyStatusCode.STOPPED, TopologyStatusCode.STOP_ERROR);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
index 26049dd..86e3512 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
@@ -17,14 +17,8 @@
  */
 package org.apache.metron.rest.service.impl;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.metron.common.utils.KafkaUtils;
-import org.apache.metron.rest.MetronRestConstants;
-import org.apache.metron.rest.RestException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.core.env.Environment;
+import static java.util.stream.Collectors.toList;
+import static org.apache.metron.rest.MetronRestConstants.ENRICHMENT_TOPOLOGY_NAME;
 
 import java.io.BufferedReader;
 import java.io.IOException;
@@ -34,9 +28,14 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
-import static java.util.stream.Collectors.toList;
-import static org.apache.metron.rest.MetronRestConstants.ENRICHMENT_TOPOLOGY_NAME;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.metron.common.utils.KafkaUtils;
+import org.apache.metron.rest.MetronRestConstants;
+import org.apache.metron.rest.RestException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
 
 public class StormCLIWrapper {
 
@@ -99,13 +98,13 @@ public class StormCLIWrapper {
     return exitValue;
   }
 
-  protected String[] getParserStartCommand(String name) {
+  protected String[] getParserStartCommand(String names) {
     List<String> command = new ArrayList<>();
     command.add( environment.getProperty(MetronRestConstants.PARSER_SCRIPT_PATH_SPRING_PROPERTY));
 
     // sensor type
     command.add( "-s");
-    command.add( name);
+    command.add( names);
 
     // zookeeper
     command.add( "-z");

http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
index 1cb4e2e..14ce50b 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
@@ -28,18 +28,12 @@ public abstract class ConfiguredParserBolt extends ConfiguredBolt<ParserConfigur
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   protected final ParserConfigurations configurations = new ParserConfigurations();
-  private String sensorType;
-  public ConfiguredParserBolt(String zookeeperUrl, String sensorType) {
+  public ConfiguredParserBolt(String zookeeperUrl) {
     super(zookeeperUrl, "PARSERS");
-    this.sensorType = sensorType;
   }
 
-  protected SensorParserConfig getSensorParserConfig() {
+  protected SensorParserConfig getSensorParserConfig(String sensorType) {
     return getConfigurations().getSensorParserConfig(sensorType);
   }
 
-  public String getSensorType() {
-    return sensorType;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
index 4cc4e61..d3eca42 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
@@ -125,6 +125,7 @@ public class SensorParserConfig implements Serializable {
 
   /**
    * The parallelism of the Kafka spout.
+   * If multiple sensors are specified, each sensor will use its own configured value.
    *
    * <p>This property can be overridden on the CLI.
    */
@@ -132,6 +133,7 @@ public class SensorParserConfig implements Serializable {
 
   /**
    * The number of tasks for the Kafka spout.
+   * If multiple sensors are specified, each sensor will use its own configured value.
    *
    * <p>This property can be overridden on the CLI.
    */
@@ -139,6 +141,7 @@ public class SensorParserConfig implements Serializable {
 
   /**
    * The parallelism of the parser bolt.
+   * If multiple sensors are defined, the last one's config will win.
    *
    * <p>This property can be overridden on the CLI.
    */
@@ -146,6 +149,7 @@ public class SensorParserConfig implements Serializable {
 
   /**
    * The number of tasks for the parser bolt.
+   * If multiple sensors are defined, the last one's config will win.
    *
    * <p>This property can be overridden on the CLI.
    */
@@ -174,6 +178,7 @@ public class SensorParserConfig implements Serializable {
 
   /**
    * The Kafka security protocol.
+   * If multiple sensors are defined, any non PLAINTEXT configuration will be used.
    *
    * <p>This property can be overridden on the CLI.  This property can also be overridden by the spout config.
    */
@@ -199,6 +204,7 @@ public class SensorParserConfig implements Serializable {
 
   /**
    * Configures the cache that backs stellar field transformations.
+   * If there are multiple sensors, the configs are merged, and the last non-empty config wins.
    *
    * <ul>
    *   <li>stellar.cache.maxSize - The maximum number of elements in the cache.

http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java
index bc02c5c..0493be6 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java
@@ -17,26 +17,31 @@
  */
 package org.apache.metron.common.error;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.metron.common.Constants.ERROR_TYPE;
+import static org.apache.metron.common.Constants.ErrorFields;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.Constants.ErrorType;
 import org.apache.metron.common.utils.HashUtils;
+import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.*;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.metron.common.Constants.ERROR_TYPE;
-import static org.apache.metron.common.Constants.ErrorFields;
-
 public class MetronError {
 
   private String message;
   private Throwable throwable;
-  private String sensorType = ERROR_TYPE;
+  private Set<String> sensorTypes = Collections.singleton(ERROR_TYPE);
   private ErrorType errorType = ErrorType.DEFAULT_ERROR;
   private Set<String> errorFields;
   private List<Object> rawMessages;
@@ -51,8 +56,8 @@ public class MetronError {
     return this;
   }
 
-  public MetronError withSensorType(String sensorType) {
-    this.sensorType = sensorType;
+  public MetronError withSensorType(Set<String> sensorTypes) {
+    this.sensorTypes = sensorTypes;
     return this;
   }
 
@@ -91,7 +96,12 @@ public class MetronError {
     JSONObject errorMessage = new JSONObject();
     errorMessage.put(Constants.GUID, UUID.randomUUID().toString());
     errorMessage.put(Constants.SENSOR_TYPE, "error");
-    errorMessage.put(ErrorFields.FAILED_SENSOR_TYPE.getName(), sensorType);
+    if (sensorTypes.size() == 1) {
+      errorMessage.put(ErrorFields.FAILED_SENSOR_TYPE.getName(), sensorTypes.iterator().next());
+    } else {
+      errorMessage
+          .put(ErrorFields.FAILED_SENSOR_TYPE.getName(), new JSONArray().addAll(sensorTypes));
+    }
     errorMessage.put(ErrorFields.ERROR_TYPE.getName(), errorType.getType());
 
     addMessageString(errorMessage);
@@ -184,34 +194,42 @@ public class MetronError {
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
 
     MetronError that = (MetronError) o;
 
-    if (message != null ? !message.equals(that.message) : that.message != null)
+    if (message != null ? !message.equals(that.message) : that.message != null) {
       return false;
-    if (throwable != null ? !throwable.equals(that.throwable) : that.throwable != null)
+    }
+    if (getThrowable() != null ? !getThrowable().equals(that.getThrowable())
+        : that.getThrowable() != null) {
       return false;
-    if (sensorType != null ? !sensorType.equals(that.sensorType) : that.sensorType != null)
+    }
+    if (sensorTypes != null ? !sensorTypes.equals(that.sensorTypes) : that.sensorTypes != null) {
       return false;
-    if (errorType != null ? !errorType.equals(that.errorType) : that.errorType != null)
+    }
+    if (errorType != that.errorType) {
       return false;
-    if (errorFields != null ? !errorFields.equals(that.errorFields) : that.errorFields != null)
+    }
+    if (errorFields != null ? !errorFields.equals(that.errorFields) : that.errorFields != null) {
       return false;
+    }
     return rawMessages != null ? rawMessages.equals(that.rawMessages) : that.rawMessages == null;
-
   }
 
   @Override
   public int hashCode() {
     int result = message != null ? message.hashCode() : 0;
-    result = 31 * result + (throwable != null ? throwable.hashCode() : 0);
-    result = 31 * result + (sensorType != null ? sensorType.hashCode() : 0);
+    result = 31 * result + (getThrowable() != null ? getThrowable().hashCode() : 0);
+    result = 31 * result + (sensorTypes != null ? sensorTypes.hashCode() : 0);
     result = 31 * result + (errorType != null ? errorType.hashCode() : 0);
     result = 31 * result + (errorFields != null ? errorFields.hashCode() : 0);
     result = 31 * result + (rawMessages != null ? rawMessages.hashCode() : 0);
     return result;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
index 27b0469..3deba78 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
@@ -46,7 +46,7 @@ public class ConfiguredParserBoltTest extends BaseConfiguredBoltTest {
   public static class StandAloneConfiguredParserBolt extends ConfiguredParserBolt {
 
     public StandAloneConfiguredParserBolt(String zookeeperUrl) {
-      super(zookeeperUrl, null);
+      super(zookeeperUrl);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-common/src/test/java/org/apache/metron/common/error/MetronErrorTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/error/MetronErrorTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/error/MetronErrorTest.java
index e7390de..177a232 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/error/MetronErrorTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/error/MetronErrorTest.java
@@ -17,20 +17,20 @@
  */
 package org.apache.metron.common.error;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import com.google.common.collect.Sets;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collections;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.metron.common.Constants;
 import org.json.simple.JSONObject;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Arrays;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 public class MetronErrorTest {
 
   private JSONObject message1 = new JSONObject();
@@ -47,7 +47,7 @@ public class MetronErrorTest {
     MetronError error = new MetronError()
             .withMessage("test message")
             .withErrorType(Constants.ErrorType.PARSER_ERROR)
-            .withSensorType("sensorType");
+            .withSensorType(Collections.singleton("sensorType"));
 
     JSONObject errorJSON = error.getJSONObject();
     assertEquals("test message", errorJSON.get(Constants.ErrorFields.MESSAGE.getName()));

http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-parsers/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/README.md b/metron-platform/metron-parsers/README.md
index 43bcc4a..0e428e3 100644
--- a/metron-platform/metron-parsers/README.md
+++ b/metron-platform/metron-parsers/README.md
@@ -82,6 +82,13 @@ topology in kafka.  Errors are collected with the context of the error
 (e.g. stacktrace) and original message causing the error and sent to an
 `error` queue.  Invalid messages as determined by global validation
 functions are also treated as errors and sent to an `error` queue. 
+
+Multiple sensors can be aggregated into a single Storm topology. When this is done, there will be
+multiple Kafka spouts, but only a single parser bolt which will handle delegating to the correct 
+parser as needed. There are some constraints around this, in particular regarding some configuration.
+Additionally, all sensors must flow to the same error topic. The Kafka topic is retrieved from the input Tuple itself.
+
+A worked example of this can be found in the [Parser Chaining use case](../../use-cases/parser_chaining/README.md#aggregated-parsers-with-parser-chaining).
  
 ## Message Format
 
@@ -101,7 +108,7 @@ Where appropriate there is also a standardization around the 5-tuple JSON fields
 * timestamp (epoch)
 * original_string: A human friendly string representation of the message
 
-The timestamp and original_string fields are madatory. The remaining standard fields are optional.  If any of the optional fields are not applicable then the field should be left out of the JSON.
+The timestamp and original_string fields are mandatory. The remaining standard fields are optional.  If any of the optional fields are not applicable then the field should be left out of the JSON.
 
 So putting it all together a typical Metron message with all 5-tuple fields present would look like the following:
 
@@ -138,6 +145,8 @@ the following fields:
 * `raw_message_bytes` : The raw message bytes
 * `error_hash` : A hash of the error message
 
+When aggregating multiple sensors, all sensors must be using the same error topic.
+
 ## Parser Configuration
 
 The configuration for the various parser topologies is defined by JSON
@@ -170,16 +179,16 @@ then it is assumed to be a regex and will match any topic matching the pattern (
     parameter `topology.message.timeout.secs`.  Ignored if batchSize is `1`, since this disables batching.
   * The kafka writer can be configured within the parser config as well.  (This is all configured a priori, but this is convenient for overriding the settings).  See [here](../metron-writer/README.md#kafka-writer)
 * `fieldTransformations` : An array of complex objects representing the transformations to be done on the message generated from the parser before writing out to the kafka topic.
-* `spoutParallelism` : The kafka spout parallelism (default to `1`).  This can be overridden on the command line.
-* `spoutNumTasks` : The number of tasks for the spout (default to `1`). This can be overridden on the command line.
-* `parserParallelism` : The parser bolt parallelism (default to `1`). This can be overridden on the command line.
-* `parserNumTasks` : The number of tasks for the parser bolt (default to `1`). This can be overridden on the command line.
+* `spoutParallelism` : The kafka spout parallelism (default to `1`).  This can be overridden on the command line, and if there are multiple sensors should be in a comma separated list in the same order as the sensors.
+* `spoutNumTasks` : The number of tasks for the spout (default to `1`). This can be overridden on the command line, and if there are multiple sensors should be in a comma separated list in the same order as the sensors.
+* `parserParallelism` : The parser bolt parallelism (default to `1`). If there are multiple sensors, the last one's configuration will be used. This can be overridden on the command line.
+* `parserNumTasks` : The number of tasks for the parser bolt (default to `1`). If there are multiple sensors, the last one's configuration will be used. This can be overridden on the command line.
 * `errorWriterParallelism` : The error writer bolt parallelism (default to `1`). This can be overridden on the command line.
 * `errorWriterNumTasks` : The number of tasks for the error writer bolt (default to `1`). This can be overridden on the command line.
 * `numWorkers` : The number of workers to use in the topology (default is the storm default of `1`).
 * `numAckers` : The number of acker executors to use in the topology (default is the storm default of `1`).
-* `spoutConfig` : A map representing a custom spout config (this is a map). This can be overridden on the command line.
-* `securityProtocol` : The security protocol to use for reading from kafka (this is a string).  This can be overridden on the command line and also specified in the spout config via the `security.protocol` key.  If both are specified, then they are merged and the CLI will take precedence.
+* `spoutConfig` : A map representing a custom spout config (this is a map). If there are multiple sensors, the configs will be merged with the last specified taking precedence. This can be overridden on the command line.
+* `securityProtocol` : The security protocol to use for reading from kafka (this is a string).  This can be overridden on the command line and also specified in the spout config via the `security.protocol` key.  If both are specified, then they are merged and the CLI will take precedence. If multiple sensors are used, any non "PLAINTEXT" value will be used.
 * `stormConfig` : The storm config to use (this is a map).  This can be overridden on the command line.  If both are specified, they are merged with CLI properties taking precedence.
 * `cacheConfig` : Cache config for stellar field transformations.   This configures a least frequently used cache.  This is a map with the following keys.  If not explicitly configured (the default), then no cache will be used.
   * `stellar.cache.maxSize` - The maximum number of elements in the cache. Default is to not use a cache.
@@ -598,6 +607,8 @@ and pass `--extra_topology_options custom_config.json` to `start_parser_topology
 Default installed Metron is untuned for production deployment.  There
 are a few knobs to tune to get the most out of your system.
 
+When using aggregated parsers, it's highly recommended to aggregate parsers with similar velocity and parser complexity together.
+
 # Notes on Adding a New Sensor
 In order to allow for meta alerts to be queried alongside regular alerts in Elasticsearch 2.x,
 it is necessary to add an additional field to the templates and mapping for existing sensors.

http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index f68c670..213d02c 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -19,7 +19,6 @@ package org.apache.metron.parsers.bolt;
 
 
 import com.github.benmanes.caffeine.cache.Cache;
-import java.io.IOException;
 import java.io.Serializable;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
@@ -27,6 +26,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
@@ -42,15 +42,17 @@ import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.error.MetronError;
 import org.apache.metron.common.message.MessageGetStrategy;
 import org.apache.metron.common.message.MessageGetters;
+import org.apache.metron.common.message.metadata.RawMessage;
 import org.apache.metron.common.message.metadata.RawMessageUtil;
 import org.apache.metron.common.utils.ErrorUtils;
 import org.apache.metron.parsers.filters.Filters;
 import org.apache.metron.parsers.interfaces.MessageFilter;
 import org.apache.metron.parsers.interfaces.MessageParser;
-import org.apache.metron.common.message.metadata.RawMessage;
+import org.apache.metron.parsers.topology.ParserComponents;
 import org.apache.metron.stellar.common.CachingStellarProcessor;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.StellarFunctions;
+import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder.FieldsConfiguration;
 import org.apache.metron.writer.WriterToBulkWriter;
 import org.apache.metron.writer.bolt.BatchTimeoutHelper;
 import org.apache.storm.Config;
@@ -69,10 +71,9 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private OutputCollector collector;
-  private MessageParser<JSONObject> parser;
-  //default filter is noop, so pass everything through.
-  private MessageFilter<JSONObject> filter;
-  private WriterHandler writer;
+  private Map<String, ParserComponents> sensorToComponentMap;
+  private Map<String, String> topicToSensorMap = new HashMap<>();
+
   private Context stellarContext;
   private transient MessageGetStrategy messageGetStrategy;
   private transient Cache<CachingStellarProcessor.Key, Object> cache;
@@ -81,20 +82,21 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
   private int batchTimeoutDivisor = 1;
 
   public ParserBolt( String zookeeperUrl
-                   , String sensorType
-                   , MessageParser<JSONObject> parser
-                   , WriterHandler writer
-  )
-  {
-    super(zookeeperUrl, sensorType);
-    this.writer = writer;
-    this.parser = parser;
-  }
+                   , Map<String, ParserComponents> sensorToComponentMap
+  ) {
+    super(zookeeperUrl);
+    this.sensorToComponentMap = sensorToComponentMap;
 
-
-  public ParserBolt withMessageFilter(MessageFilter<JSONObject> filter) {
-    this.filter = filter;
-    return this;
+    // Ensure that all sensors are either bulk sensors or not bulk sensors.  Can't mix and match.
+    Boolean handleAcks = null;
+    for (Map.Entry<String, ParserComponents> entry : sensorToComponentMap.entrySet()) {
+      boolean writerHandleAck = entry.getValue().getWriter().handleAck();
+      if (handleAcks == null) {
+        handleAcks = writerHandleAck;
+      } else if (!handleAcks.equals(writerHandleAck)) {
+        throw new IllegalArgumentException("All writers must match when calling handleAck()");
+      }
+    }
   }
 
   /**
@@ -137,8 +139,8 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
     return defaultBatchTimeout;
   }
 
-  public MessageParser<JSONObject> getParser() {
-    return parser;
+  public Map<String, ParserComponents> getSensorToComponentMap() {
+    return sensorToComponentMap;
   }
 
   /**
@@ -153,14 +155,15 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
     // to get the valid WriterConfiguration.  But don't store any non-serializable objects,
     // else Storm will throw a runtime error.
     Function<WriterConfiguration, WriterConfiguration> configurationXform;
-    if(writer.isWriterToBulkWriter()) {
+    WriterHandler writer = sensorToComponentMap.entrySet().iterator().next().getValue().getWriter();
+    if (writer.isWriterToBulkWriter()) {
       configurationXform = WriterToBulkWriter.TRANSFORMATION;
-    }
-    else {
+    } else {
       configurationXform = x -> x;
     }
     WriterConfiguration writerconf = configurationXform
-        .apply(getConfigurationStrategy().createWriterConfig(writer.getBulkMessageWriter(), getConfigurations()));
+        .apply(getConfigurationStrategy()
+            .createWriterConfig(writer.getBulkMessageWriter(), getConfigurations()));
 
     BatchTimeoutHelper timeoutHelper = new BatchTimeoutHelper(writerconf::getAllConfiguredTimeouts, batchTimeoutDivisor);
     this.requestedTickFreqSecs = timeoutHelper.getRecommendedTickInterval();
@@ -182,40 +185,61 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
     super.prepare(stormConf, context, collector);
     messageGetStrategy = MessageGetters.DEFAULT_BYTES_FROM_POSITION.get();
     this.collector = collector;
-    if(getSensorParserConfig() != null) {
-      cache = CachingStellarProcessor.createCache(getSensorParserConfig().getCacheConfig());
-    }
-    initializeStellar();
-    if(getSensorParserConfig() != null && filter == null) {
-      getSensorParserConfig().getParserConfig().putIfAbsent("stellarContext", stellarContext);
-      if (!StringUtils.isEmpty(getSensorParserConfig().getFilterClassName())) {
-        filter = Filters.get(getSensorParserConfig().getFilterClassName()
-                , getSensorParserConfig().getParserConfig()
-        );
+
+    // Build the Stellar cache
+    Map<String, Object> cacheConfig = new HashMap<>();
+    for (Map.Entry<String, ParserComponents> entry: sensorToComponentMap.entrySet()) {
+      String sensor = entry.getKey();
+      SensorParserConfig config = getSensorParserConfig(sensor);
+
+      if (config != null) {
+        cacheConfig.putAll(config.getCacheConfig());
       }
     }
+    cache = CachingStellarProcessor.createCache(cacheConfig);
 
-    parser.init();
+    // Need to prep all sensors
+    for (Map.Entry<String, ParserComponents> entry: sensorToComponentMap.entrySet()) {
+      String sensor = entry.getKey();
+      MessageParser<JSONObject> parser = entry.getValue().getMessageParser();
 
-    writer.init(stormConf, context, collector, getConfigurations());
-    if (defaultBatchTimeout == 0) {
-      //This means getComponentConfiguration was never called to initialize defaultBatchTimeout,
-      //probably because we are in a unit test scenario.  So calculate it here.
-      WriterConfiguration writerConfig = getConfigurationStrategy()
-          .createWriterConfig(writer.getBulkMessageWriter(), getConfigurations());
-      BatchTimeoutHelper timeoutHelper = new BatchTimeoutHelper(writerConfig::getAllConfiguredTimeouts, batchTimeoutDivisor);
-      defaultBatchTimeout = timeoutHelper.getDefaultBatchTimeout();
-    }
-    writer.setDefaultBatchTimeout(defaultBatchTimeout);
+      initializeStellar();
+      if (getSensorParserConfig(sensor) != null && sensorToComponentMap.get(sensor).getFilter() == null) {
+        getSensorParserConfig(sensor).getParserConfig().putIfAbsent("stellarContext", stellarContext);
+        if (!StringUtils.isEmpty(getSensorParserConfig(sensor).getFilterClassName())) {
+          MessageFilter<JSONObject> filter = Filters.get(
+              getSensorParserConfig(sensor).getFilterClassName(),
+              getSensorParserConfig(sensor).getParserConfig()
+          );
+          getSensorToComponentMap().get(sensor).setFilter(filter);
+        }
+      }
 
-    SensorParserConfig config = getSensorParserConfig();
-    if(config != null) {
-      config.init();
-    }
-    else {
-      throw new IllegalStateException("Unable to retrieve a parser config for " + getSensorType());
+      parser.init();
+
+      SensorParserConfig config = getSensorParserConfig(sensor);
+      if (config != null) {
+        config.init();
+        topicToSensorMap.put(config.getSensorTopic(), sensor);
+      } else {
+        throw new IllegalStateException(
+            "Unable to retrieve a parser config for " + sensor);
+      }
+      parser.configure(config.getParserConfig());
+
+      WriterHandler writer = sensorToComponentMap.get(sensor).getWriter();
+      writer.init(stormConf, context, collector, getConfigurations());
+      if (defaultBatchTimeout == 0) {
+        //This means getComponentConfiguration was never called to initialize defaultBatchTimeout,
+        //probably because we are in a unit test scenario.  So calculate it here.
+        WriterConfiguration writerConfig = getConfigurationStrategy()
+            .createWriterConfig(writer.getBulkMessageWriter(), getConfigurations());
+        BatchTimeoutHelper timeoutHelper = new BatchTimeoutHelper(
+            writerConfig::getAllConfiguredTimeouts, batchTimeoutDivisor);
+        defaultBatchTimeout = timeoutHelper.getDefaultBatchTimeout();
+      }
+      writer.setDefaultBatchTimeout(defaultBatchTimeout);
     }
-    parser.configure(config.getParserConfig());
   }
 
   protected void initializeStellar() {
@@ -237,7 +261,9 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
   public void execute(Tuple tuple) {
     if (TupleUtils.isTick(tuple)) {
       try {
-        writer.flush(getConfigurations(), messageGetStrategy);
+        for (Entry<String, ParserComponents> entry : sensorToComponentMap.entrySet()) {
+          entry.getValue().getWriter().flush(getConfigurations(), messageGetStrategy);
+        }
       } catch (Exception e) {
         throw new RuntimeException(
             "This should have been caught in the writerHandler.  If you see this, file a JIRA", e);
@@ -246,69 +272,101 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
       }
       return;
     }
-    SensorParserConfig sensorParserConfig = getSensorParserConfig();
+
     byte[] originalMessage = (byte[]) messageGetStrategy.get(tuple);
     try {
-      //we want to ack the tuple in the situation where we have are not doing a bulk write
-      //otherwise we want to defer to the writerComponent who will ack on bulk commit.
-      boolean ackTuple = !writer.handleAck();
+      SensorParserConfig sensorParserConfig;
+      MessageParser<JSONObject> parser;
+      String sensor;
+      Map<String, Object> metadata;
+      if (sensorToComponentMap.size() == 1) {
+        // There's only one parser, so grab info directly
+        Entry<String, ParserComponents> sensorParser = sensorToComponentMap.entrySet().iterator()
+            .next();
+        sensor = sensorParser.getKey();
+        parser = sensorParser.getValue().getMessageParser();
+        sensorParserConfig = getSensorParserConfig(sensor);
+      } else {
+        // There are multiple parsers, so pull the topic from the Tuple and look up the sensor
+        String topic = tuple.getStringByField(FieldsConfiguration.TOPIC.getFieldName());
+        sensor = topicToSensorMap.get(topic);
+        parser = sensorToComponentMap.get(sensor).getMessageParser();
+        sensorParserConfig = getSensorParserConfig(sensor);
+      }
+
+      List<FieldValidator> fieldValidations = getConfigurations().getFieldValidations();
+      boolean ackTuple = false;
       int numWritten = 0;
-      if(sensorParserConfig != null) {
+      if (sensorParserConfig != null) {
         RawMessage rawMessage = RawMessageUtil.INSTANCE.getRawMessage( sensorParserConfig.getRawMessageStrategy()
-                                                                   , tuple
-                                                                   , originalMessage
-                                                                   , sensorParserConfig.getReadMetadata()
-                                                                   , sensorParserConfig.getRawMessageStrategyConfig()
-                                                                   );
-        Map<String, Object> metadata = rawMessage.getMetadata();
-        List<FieldValidator> fieldValidations = getConfigurations().getFieldValidations();
+            , tuple
+            , originalMessage
+            , sensorParserConfig.getReadMetadata()
+            , sensorParserConfig.getRawMessageStrategyConfig()
+        );
+        metadata = rawMessage.getMetadata();
 
         Optional<List<JSONObject>> messages = parser.parseOptional(rawMessage.getMessage());
         for (JSONObject message : messages.orElse(Collections.emptyList())) {
-          sensorParserConfig.getRawMessageStrategy().mergeMetadata( message
-                                                                  , metadata
-                                                                  , sensorParserConfig.getMergeMetadata()
-                                                                  , sensorParserConfig.getRawMessageStrategyConfig()
-                                                                  );
-          message.put(Constants.SENSOR_TYPE, getSensorType());
+          //we want to ack the tuple in the situation where we are not doing a bulk write
+          //otherwise we want to defer to the writerComponent who will ack on bulk commit.
+          WriterHandler writer = sensorToComponentMap.get(sensor).getWriter();
+          ackTuple = !writer.handleAck();
+
+          sensorParserConfig.getRawMessageStrategy().mergeMetadata(
+              message,
+              metadata,
+              sensorParserConfig.getMergeMetadata(),
+              sensorParserConfig.getRawMessageStrategyConfig()
+          );
+          message.put(Constants.SENSOR_TYPE, sensor);
+
           for (FieldTransformer handler : sensorParserConfig.getFieldTransformations()) {
             if (handler != null) {
-              if(!sensorParserConfig.getMergeMetadata()) {
+              if (!sensorParserConfig.getMergeMetadata()) {
                 //if we haven't merged metadata, then we need to pass them along as configuration params.
-                handler.transformAndUpdate(message, stellarContext, sensorParserConfig.getParserConfig(), metadata);
-              }
-              else {
-                handler.transformAndUpdate(message, stellarContext, sensorParserConfig.getParserConfig());
+                handler.transformAndUpdate(
+                    message,
+                    stellarContext,
+                    sensorParserConfig.getParserConfig(),
+                    metadata
+                );
+              } else {
+                handler.transformAndUpdate(
+                    message,
+                    stellarContext,
+                    sensorParserConfig.getParserConfig()
+                );
               }
             }
           }
-          if(!message.containsKey(Constants.GUID)) {
+          if (!message.containsKey(Constants.GUID)) {
             message.put(Constants.GUID, UUID.randomUUID().toString());
           }
 
+          MessageFilter<JSONObject> filter = sensorToComponentMap.get(sensor).getFilter();
           if (filter == null || filter.emitTuple(message, stellarContext)) {
             boolean isInvalid = !parser.validate(message);
             List<FieldValidator> failedValidators = null;
-            if(!isInvalid) {
+            if (!isInvalid) {
               failedValidators = getFailedValidators(message, fieldValidations);
               isInvalid = !failedValidators.isEmpty();
             }
-            if( isInvalid) {
+            if (isInvalid) {
               MetronError error = new MetronError()
-                      .withErrorType(Constants.ErrorType.PARSER_INVALID)
-                      .withSensorType(getSensorType())
-                      .addRawMessage(message);
-              Set<String> errorFields = failedValidators == null?null:failedValidators.stream()
-                      .flatMap(fieldValidator -> fieldValidator.getInput().stream())
-                      .collect(Collectors.toSet());
+                  .withErrorType(Constants.ErrorType.PARSER_INVALID)
+                  .withSensorType(Collections.singleton(sensor))
+                  .addRawMessage(message);
+              Set<String> errorFields = failedValidators == null ? null : failedValidators.stream()
+                  .flatMap(fieldValidator -> fieldValidator.getInput().stream())
+                  .collect(Collectors.toSet());
               if (errorFields != null && !errorFields.isEmpty()) {
                 error.withErrorFields(errorFields);
               }
               ErrorUtils.handleError(collector, error);
-            }
-            else {
+            } else {
               numWritten++;
-              writer.write(getSensorType(), tuple, message, getConfigurations(), messageGetStrategy);
+              writer.write(sensor, tuple, message, getConfigurations(), messageGetStrategy);
             }
           }
         }
@@ -316,9 +374,10 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
       //if we are supposed to ack the tuple OR if we've never passed this tuple to the bulk writer
       //(meaning that none of the messages are valid either globally or locally)
       //then we want to handle the ack ourselves.
-      if(ackTuple || numWritten == 0) {
+      if (ackTuple || numWritten == 0) {
         collector.ack(tuple);
       }
+
     } catch (Throwable ex) {
       handleError(originalMessage, tuple, ex, collector);
     }
@@ -328,7 +387,7 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
     MetronError error = new MetronError()
             .withErrorType(Constants.ErrorType.PARSER_ERROR)
             .withThrowable(ex)
-            .withSensorType(getSensorType())
+            .withSensorType(sensorToComponentMap.keySet())
             .addRawMessage(originalMessage);
     ErrorUtils.handleError(collector, error);
     collector.ack(tuple);

http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
index ef93ba2..fdfceda 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
@@ -18,6 +18,7 @@
 
 package org.apache.metron.parsers.bolt;
 
+import java.util.Collections;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.error.MetronError;
@@ -84,7 +85,7 @@ public class WriterBolt extends BaseRichBolt {
       MetronError error = new MetronError()
               .withErrorType(errorType)
               .withThrowable(e)
-              .withSensorType(sensorType)
+              .withSensorType(Collections.singleton(sensorType))
               .addRawMessage(message);
       ErrorUtils.handleError(collector, error);
       collector.ack(tuple);

http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java
new file mode 100644
index 0000000..32d56b9
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers.topology;
+
+import java.io.Serializable;
+import org.apache.metron.parsers.bolt.WriterHandler;
+import org.apache.metron.parsers.interfaces.MessageFilter;
+import org.apache.metron.parsers.interfaces.MessageParser;
+import org.json.simple.JSONObject;
+
+public class ParserComponents implements Serializable {
+  private static final long serialVersionUID = 7880346740026374665L;
+
+  private MessageParser<JSONObject> messageParser;
+  private MessageFilter<JSONObject> filter;
+  private WriterHandler writer;
+
+  public ParserComponents(
+      MessageParser<JSONObject> messageParser,
+      MessageFilter<JSONObject> filter,
+      WriterHandler writer) {
+    this.messageParser = messageParser;
+    this.filter = filter;
+    this.writer = writer;
+  }
+
+  public MessageParser<JSONObject> getMessageParser() {
+    return messageParser;
+  }
+
+  public MessageFilter<JSONObject> getFilter() {
+    return filter;
+  }
+
+  public WriterHandler getWriter() {
+    return writer;
+  }
+
+  public void setMessageParser(
+      MessageParser<JSONObject> messageParser) {
+    this.messageParser = messageParser;
+  }
+
+  public void setFilter(
+      MessageFilter<JSONObject> filter) {
+    this.filter = filter;
+  }
+
+  public void setWriter(WriterHandler writer) {
+    this.writer = writer;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index 5b3e0d5..d20e1a5 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -17,35 +17,45 @@
  */
 package org.apache.metron.parsers.topology;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.metron.common.utils.KafkaUtils;
-import org.apache.metron.parsers.topology.config.ValueSupplier;
-import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;
-import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
-import org.apache.metron.storm.kafka.flux.StormKafkaSpout;
-import org.apache.storm.Config;
-import org.apache.storm.kafka.spout.KafkaSpout;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig;
-import org.apache.storm.topology.TopologyBuilder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
+import org.apache.metron.common.utils.KafkaUtils;
+import org.apache.metron.common.utils.ReflectionUtils;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.MessageWriter;
-import org.apache.metron.common.utils.ReflectionUtils;
 import org.apache.metron.parsers.bolt.ParserBolt;
 import org.apache.metron.parsers.bolt.WriterBolt;
 import org.apache.metron.parsers.bolt.WriterHandler;
+import org.apache.metron.parsers.filters.Filters;
+import org.apache.metron.parsers.interfaces.MessageFilter;
 import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.parsers.topology.config.ValueSupplier;
+import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;
+import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
+import org.apache.metron.storm.kafka.flux.StormKafkaSpout;
 import org.apache.metron.writer.AbstractWriter;
 import org.apache.metron.writer.kafka.KafkaWriter;
+import org.apache.storm.Config;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
 import org.json.simple.JSONObject;
 
-import java.util.*;
-
 /**
  * Builds a Storm topology that parses telemetry data received from a sensor.
  */
@@ -75,7 +85,7 @@ public class ParserTopologyBuilder {
    *
    * @param zookeeperUrl             Zookeeper URL
    * @param brokerUrl                Kafka Broker URL
-   * @param sensorType               Type of sensor
+   * @param sensorTypes              Types of sensors
    * @param spoutParallelismSupplier         Supplier for the parallelism hint for the spout
    * @param spoutNumTasksSupplier            Supplier for the number of tasks for the spout
    * @param parserParallelismSupplier        Supplier for the parallelism hint for the parser bolt
@@ -91,14 +101,14 @@ public class ParserTopologyBuilder {
    */
   public static ParserTopology build(String zookeeperUrl,
                                       Optional<String> brokerUrl,
-                                      String sensorType,
-                                      ValueSupplier<Integer> spoutParallelismSupplier,
-                                      ValueSupplier<Integer> spoutNumTasksSupplier,
+                                      List<String> sensorTypes,
+                                      ValueSupplier<List> spoutParallelismSupplier,
+                                      ValueSupplier<List> spoutNumTasksSupplier,
                                       ValueSupplier<Integer> parserParallelismSupplier,
                                       ValueSupplier<Integer> parserNumTasksSupplier,
                                       ValueSupplier<Integer> errorWriterParallelismSupplier,
                                       ValueSupplier<Integer> errorWriterNumTasksSupplier,
-                                      ValueSupplier<Map> kafkaSpoutConfigSupplier,
+                                      ValueSupplier<List> kafkaSpoutConfigSupplier,
                                       ValueSupplier<String> securityProtocolSupplier,
                                       ValueSupplier<String> outputTopicSupplier,
                                       ValueSupplier<String> errorTopicSupplier,
@@ -107,40 +117,72 @@ public class ParserTopologyBuilder {
 
     // fetch configuration from zookeeper
     ParserConfigurations configs = new ParserConfigurations();
-    SensorParserConfig parserConfig = getSensorParserConfig(zookeeperUrl, sensorType, configs);
-    int spoutParallelism = spoutParallelismSupplier.get(parserConfig, Integer.class);
-    int spoutNumTasks = spoutNumTasksSupplier.get(parserConfig, Integer.class);
-    int parserParallelism = parserParallelismSupplier.get(parserConfig, Integer.class);
-    int parserNumTasks = parserNumTasksSupplier.get(parserConfig, Integer.class);
-    int errorWriterParallelism = errorWriterParallelismSupplier.get(parserConfig, Integer.class);
-    int errorWriterNumTasks = errorWriterNumTasksSupplier.get(parserConfig, Integer.class);
-    String outputTopic = outputTopicSupplier.get(parserConfig, String.class);
-
-    Map<String, Object> kafkaSpoutConfig = kafkaSpoutConfigSupplier.get(parserConfig, Map.class);
-    Optional<String> securityProtocol = Optional.ofNullable(securityProtocolSupplier.get(parserConfig, String.class));
+    Map<String, SensorParserConfig> sensorToParserConfigs = getSensorParserConfig(zookeeperUrl, sensorTypes, configs);
+    Collection<SensorParserConfig> parserConfigs = sensorToParserConfigs.values();
+
+    @SuppressWarnings("unchecked")
+    List<Integer> spoutParallelism = (List<Integer>) spoutParallelismSupplier.get(parserConfigs, List.class);
+    @SuppressWarnings("unchecked")
+    List<Integer> spoutNumTasks = (List<Integer>) spoutNumTasksSupplier.get(parserConfigs, List.class);
+    int parserParallelism = parserParallelismSupplier.get(parserConfigs, Integer.class);
+    int parserNumTasks = parserNumTasksSupplier.get(parserConfigs, Integer.class);
+    int errorWriterParallelism = errorWriterParallelismSupplier.get(parserConfigs, Integer.class);
+    int errorWriterNumTasks = errorWriterNumTasksSupplier.get(parserConfigs, Integer.class);
+    String outputTopic = outputTopicSupplier.get(parserConfigs, String.class);
+
+    List<Map<String, Object>> kafkaSpoutConfig = kafkaSpoutConfigSupplier.get(parserConfigs, List.class);
+    Optional<String> securityProtocol = Optional.ofNullable(securityProtocolSupplier.get(parserConfigs, String.class));
 
     // create the spout
     TopologyBuilder builder = new TopologyBuilder();
-    KafkaSpout kafkaSpout = createKafkaSpout(zookeeperUrl, sensorType, securityProtocol, Optional.ofNullable(kafkaSpoutConfig), parserConfig);
-    builder.setSpout("kafkaSpout", kafkaSpout, spoutParallelism)
-            .setNumTasks(spoutNumTasks);
+    int i = 0;
+    List<String> spoutIds = new ArrayList<>();
+    for (Entry<String, SensorParserConfig> entry: sensorToParserConfigs.entrySet()) {
+      KafkaSpout kafkaSpout = createKafkaSpout(zookeeperUrl, entry.getKey(), securityProtocol,
+          Optional.ofNullable(kafkaSpoutConfig.get(i)), entry.getValue());
+      String spoutId = sensorToParserConfigs.size() > 1 ? "kafkaSpout-" + entry.getKey() : "kafkaSpout";
+      builder.setSpout(spoutId, kafkaSpout, spoutParallelism.get(i))
+          .setNumTasks(spoutNumTasks.get(i));
+      spoutIds.add(spoutId);
+      ++i;
+    }
 
     // create the parser bolt
-    ParserBolt parserBolt = createParserBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig, Optional.of(outputTopic));
-    builder.setBolt("parserBolt", parserBolt, parserParallelism)
-            .setNumTasks(parserNumTasks)
-            .localOrShuffleGrouping("kafkaSpout");
+    ParserBolt parserBolt = createParserBolt(
+        zookeeperUrl,
+        brokerUrl,
+        sensorToParserConfigs,
+        securityProtocol,
+        configs,
+        Optional.ofNullable(outputTopic)
+    );
+
+    BoltDeclarer boltDeclarer = builder
+        .setBolt("parserBolt", parserBolt, parserParallelism)
+        .setNumTasks(parserNumTasks);
+
+    for (String spoutId : spoutIds) {
+      boltDeclarer.localOrShuffleGrouping(spoutId);
+    }
 
     // create the error bolt, if needed
     if (errorWriterNumTasks > 0) {
-      String errorTopic = errorTopicSupplier.get(parserConfig, String.class);
-      WriterBolt errorBolt = createErrorBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig, errorTopic);
+      String errorTopic = errorTopicSupplier.get(parserConfigs, String.class);
+      WriterBolt errorBolt = createErrorBolt(
+          zookeeperUrl,
+          brokerUrl,
+          sensorTypes.get(0),
+          securityProtocol,
+          configs,
+          parserConfigs.iterator().next(),
+          errorTopic
+      );
       builder.setBolt("errorMessageWriter", errorBolt, errorWriterParallelism)
               .setNumTasks(errorWriterNumTasks)
               .localOrShuffleGrouping("parserBolt", Constants.ERROR_STREAM);
     }
 
-    return new ParserTopology(builder, stormConfigSupplier.get(parserConfig, Config.class));
+    return new ParserTopology(builder, stormConfigSupplier.get(parserConfigs, Config.class));
   }
 
   /**
@@ -216,42 +258,62 @@ public class ParserTopologyBuilder {
    *
    * @param zookeeperUrl Zookeeper URL
    * @param brokerUrl    Kafka Broker URL
-   * @param sensorType   Type of sensor that is being consumed.
+   * @param sensorTypeToParserConfig Map of sensor type to its parser configuration
    * @param configs
-   * @param parserConfig
    * @return A Storm bolt that parses input from a sensor
    */
   private static ParserBolt createParserBolt( String zookeeperUrl,
                                               Optional<String> brokerUrl,
-                                              String sensorType,
+                                              Map<String, SensorParserConfig> sensorTypeToParserConfig,
                                               Optional<String> securityProtocol,
                                               ParserConfigurations configs,
-                                              SensorParserConfig parserConfig,
                                               Optional<String> outputTopic) {
 
-    // create message parser
-    MessageParser<JSONObject> parser = ReflectionUtils.createInstance(parserConfig.getParserClassName());
-    parser.configure(parserConfig.getParserConfig());
+    Map<String, ParserComponents> parserBoltConfigs = new HashMap<>();
+    for( Entry<String, SensorParserConfig> entry : sensorTypeToParserConfig.entrySet()) {
+      String sensorType = entry.getKey();
+      SensorParserConfig parserConfig = entry.getValue();
+      // create message parser
+      MessageParser<JSONObject> parser = ReflectionUtils
+          .createInstance(parserConfig.getParserClassName());
+      parser.configure(parserConfig.getParserConfig());
+
+      // create message filter
+      MessageFilter<JSONObject> filter = null;
+      if (!StringUtils.isEmpty(parserConfig.getFilterClassName())) {
+        filter = Filters.get(
+            parserConfig.getFilterClassName(),
+            parserConfig.getParserConfig()
+        );
+      }
 
-    // create a writer
-    AbstractWriter writer;
-    if(parserConfig.getWriterClassName() == null) {
+      // create a writer
+      AbstractWriter writer;
+      if (parserConfig.getWriterClassName() == null) {
 
-      // if not configured, use a sensible default
-      writer = createKafkaWriter(brokerUrl, zookeeperUrl, securityProtocol)
-              .withTopic(outputTopic.orElse(Constants.ENRICHMENT_TOPIC));
+        // if not configured, use a sensible default
+        writer = createKafkaWriter(brokerUrl, zookeeperUrl, securityProtocol)
+            .withTopic(outputTopic.orElse(Constants.ENRICHMENT_TOPIC));
 
-    } else {
-      writer = ReflectionUtils.createInstance(parserConfig.getWriterClassName());
-    }
+      } else {
+        writer = ReflectionUtils.createInstance(parserConfig.getWriterClassName());
+      }
 
-    // configure it
-    writer.configure(sensorType, new ParserWriterConfiguration(configs));
+      // configure it
+      writer.configure(sensorType, new ParserWriterConfiguration(configs));
 
-    // create a writer handler
-    WriterHandler writerHandler = createWriterHandler(writer);
+      // create a writer handler
+      WriterHandler writerHandler = createWriterHandler(writer);
+
+      ParserComponents components = new ParserComponents(
+         parser,
+         filter,
+         writerHandler
+      );
+      parserBoltConfigs.put(sensorType, components);
+    }
 
-    return new ParserBolt(zookeeperUrl, sensorType, parser, writerHandler);
+    return new ParserBolt(zookeeperUrl, parserBoltConfigs);
   }
 
   /**
@@ -304,22 +366,26 @@ public class ParserTopologyBuilder {
    * Fetch the parser configuration from Zookeeper.
    *
    * @param zookeeperUrl Zookeeper URL
-   * @param sensorType   Type of sensor
+   * @param sensorTypes Types of sensor
    * @param configs
    * @return
    * @throws Exception
    */
-  private static SensorParserConfig getSensorParserConfig(String zookeeperUrl, String sensorType, ParserConfigurations configs) throws Exception {
+  private static Map<String, SensorParserConfig> getSensorParserConfig(String zookeeperUrl, List<String> sensorTypes, ParserConfigurations configs) throws Exception {
+    Map<String, SensorParserConfig> parserConfigs = new HashMap<>();
     try(CuratorFramework client = ConfigurationsUtils.getClient(zookeeperUrl)) {
       client.start();
       ConfigurationsUtils.updateParserConfigsFromZookeeper(configs, client);
-      SensorParserConfig parserConfig = configs.getSensorParserConfig(sensorType);
-      if (parserConfig == null) {
-        throw new IllegalStateException("Cannot find the parser configuration in zookeeper for " + sensorType + "." +
-                "  Please check that it exists in zookeeper by using the 'zk_load_configs.sh -m DUMP' command.");
+      for (String sensorType : sensorTypes) {
+        SensorParserConfig parserConfig = configs.getSensorParserConfig(sensorType);
+        if (parserConfig == null) {
+          throw new IllegalStateException("Cannot find the parser configuration in zookeeper for " + sensorType + "." +
+                  "  Please check that it exists in zookeeper by using the 'zk_load_configs.sh -m DUMP' command.");
+        }
+        parserConfigs.put(sensorType, parserConfig);
       }
-      return parserConfig;
     }
+    return parserConfigs;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
index f60ff44..eb39f89 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
@@ -17,27 +17,44 @@
  */
 package org.apache.metron.parsers.topology;
 
+import com.google.common.base.Joiner;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.io.FileUtils;
+import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.utils.KafkaUtils;
+import org.apache.metron.parsers.topology.config.Arg;
+import org.apache.metron.parsers.topology.config.ConfigHandlers;
 import org.apache.metron.parsers.topology.config.ValueSupplier;
 import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.utils.Utils;
-import com.google.common.base.Joiner;
-import org.apache.commons.cli.*;
-import org.apache.commons.io.FileUtils;
-import org.apache.metron.common.utils.JSONUtils;
-import org.apache.metron.parsers.topology.config.Arg;
-import org.apache.metron.parsers.topology.config.ConfigHandlers;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.*;
-import java.util.function.Function;
 
 public class ParserTopologyCLI {
 
+  private static final String STORM_JOB_SEPARATOR = "__";
+
   public enum ParserOptions {
     HELP("h", code -> {
       Option o = new Option(code, "help", false, "This screen");
@@ -45,7 +62,7 @@ public class ParserTopologyCLI {
       return o;
     }),
     ZK_QUORUM("z", code -> {
-      Option o = new Option(code, "zk", true, "Zookeeper Quroum URL (zk1:2181,zk2:2181,...");
+      Option o = new Option(code, "zk", true, "Zookeeper Quorum URL (zk1:2181,zk2:2181,...");
       o.setArgName("ZK_QUORUM");
       o.setRequired(true);
       return o;
@@ -56,14 +73,14 @@ public class ParserTopologyCLI {
       o.setRequired(false);
       return o;
     }),
-    SENSOR_TYPE("s", code -> {
-      Option o = new Option(code, "sensor", true, "Sensor Type");
-      o.setArgName("SENSOR_TYPE");
+    SENSOR_TYPES("s", code -> {
+      Option o = new Option(code, "sensor", true, "Sensor Types as comma-separated list");
+      o.setArgName("SENSOR_TYPES");
       o.setRequired(true);
       return o;
     }),
     SPOUT_PARALLELISM("sp", code -> {
-      Option o = new Option(code, "spout_p", true, "Spout Parallelism Hint");
+      Option o = new Option(code, "spout_p", true, "Spout Parallelism Hint. If multiple sensors are specified, this should be a comma separated list in the same order.");
       o.setArgName("SPOUT_PARALLELISM_HINT");
       o.setRequired(false);
       o.setType(Number.class);
@@ -91,7 +108,7 @@ public class ParserTopologyCLI {
       return o;
     }),
     SPOUT_NUM_TASKS("snt", code -> {
-      Option o = new Option(code, "spout_num_tasks", true, "Spout Num Tasks");
+      Option o = new Option(code, "spout_num_tasks", true, "Spout Num Tasks. If multiple sensors are specified, this should be a comma separated list in the same order.");
       o.setArgName("NUM_TASKS");
       o.setRequired(false);
       o.setType(Number.class);
@@ -307,7 +324,9 @@ public class ParserTopologyCLI {
   public ParserTopologyBuilder.ParserTopology createParserTopology(final CommandLine cmd) throws Exception {
     String zookeeperUrl = ParserOptions.ZK_QUORUM.get(cmd);
     Optional<String> brokerUrl = ParserOptions.BROKER_URL.has(cmd)?Optional.of(ParserOptions.BROKER_URL.get(cmd)):Optional.empty();
-    String sensorType= ParserOptions.SENSOR_TYPE.get(cmd);
+    String sensorTypeRaw= ParserOptions.SENSOR_TYPES.get(cmd);
+    List<String> sensorTypes = Arrays.stream(sensorTypeRaw.split(",")).map(String::trim).collect(
+        Collectors.toList());
 
     /*
      * It bears mentioning why we're creating this ValueSupplier indirection here.
@@ -328,116 +347,191 @@ public class ParserTopologyCLI {
      */
 
     // kafka spout parallelism
-    ValueSupplier<Integer> spoutParallelism = (parserConfig, clazz) -> {
+    ValueSupplier<List> spoutParallelism = (parserConfigs, clazz) -> {
       if(ParserOptions.SPOUT_PARALLELISM.has(cmd)) {
-        return Integer.parseInt(ParserOptions.SPOUT_PARALLELISM.get(cmd, "1"));
+        // Handle the case where there's only one and we can default reasonably
+        if( parserConfigs.size() == 1) {
+          return Collections.singletonList(Integer.parseInt(ParserOptions.SPOUT_PARALLELISM.get(cmd, "1")));
+        }
+
+        // Handle the case where multiple spout parallelisms are explicitly passed.
+        String parallelismRaw = ParserOptions.SPOUT_PARALLELISM.get(cmd, "1");
+        List<String> parallelisms = Arrays.stream(parallelismRaw.split(",")).map(String::trim).collect(
+            Collectors.toList());
+        if (parallelisms.size() != parserConfigs.size()) {
+          throw new IllegalArgumentException("Spout parallelism should match number of sensors 1:1");
+        }
+        List<Integer> spoutParallelisms = new ArrayList<>();
+        for (String s : parallelisms) {
+          spoutParallelisms.add(Integer.parseInt(s));
+        }
+        return spoutParallelisms;
+      }
+
+      List<Integer> spoutParallelisms = new ArrayList<>();
+      for (SensorParserConfig parserConfig : parserConfigs) {
+        spoutParallelisms.add(parserConfig.getSpoutParallelism());
       }
-      return Optional.ofNullable(parserConfig.getSpoutParallelism()).orElse(1);
+      return spoutParallelisms;
     };
 
     // kafka spout number of tasks
-    ValueSupplier<Integer> spoutNumTasks = (parserConfig, clazz) -> {
+    ValueSupplier<List> spoutNumTasks = (parserConfigs, clazz) -> {
       if(ParserOptions.SPOUT_NUM_TASKS.has(cmd)) {
-        return Integer.parseInt(ParserOptions.SPOUT_NUM_TASKS.get(cmd, "1"));
+        // Handle the case where there's only one and we can default reasonably
+        if( parserConfigs.size() == 1) {
+          return Collections.singletonList(Integer.parseInt(ParserOptions.SPOUT_NUM_TASKS.get(cmd, "1")));
+        }
+
+        // Handle the case where multiple spout num tasks values are explicitly passed.
+        String numTasksRaw = ParserOptions.SPOUT_NUM_TASKS.get(cmd, "1");
+        List<String> numTasks = Arrays.stream(numTasksRaw.split(",")).map(String::trim).collect(
+            Collectors.toList());
+        if (numTasks.size() != parserConfigs.size()) {
+          throw new IllegalArgumentException("Spout num tasks should match number of sensors 1:1");
+        }
+        List<Integer> spoutTasksList = new ArrayList<>();
+        for (String s : numTasks) {
+          spoutTasksList.add(Integer.parseInt(s));
+        }
+        return spoutTasksList;
+      }
+
+      List<Integer> numTasks = new ArrayList<>();
+      for (SensorParserConfig parserConfig : parserConfigs) {
+        numTasks.add(parserConfig.getSpoutNumTasks());
       }
-      return Optional.ofNullable(parserConfig.getSpoutNumTasks()).orElse(1);
+      return numTasks;
     };
 
     // parser bolt parallelism
-    ValueSupplier<Integer> parserParallelism = (parserConfig, clazz) -> {
+    ValueSupplier<Integer> parserParallelism = (parserConfigs, clazz) -> {
       if(ParserOptions.PARSER_PARALLELISM.has(cmd)) {
         return Integer.parseInt(ParserOptions.PARSER_PARALLELISM.get(cmd, "1"));
       }
-      return Optional.ofNullable(parserConfig.getParserParallelism()).orElse(1);
+      int retValue = 1;
+      for (SensorParserConfig config : parserConfigs) {
+        Integer configValue = config.getParserParallelism();
+        retValue = configValue == null ? retValue : configValue;
+      }
+      return retValue;
     };
 
     // parser bolt number of tasks
-    ValueSupplier<Integer> parserNumTasks = (parserConfig, clazz) -> {
+    ValueSupplier<Integer> parserNumTasks = (parserConfigs, clazz) -> {
       if(ParserOptions.PARSER_NUM_TASKS.has(cmd)) {
         return Integer.parseInt(ParserOptions.PARSER_NUM_TASKS.get(cmd, "1"));
       }
-      return Optional.ofNullable(parserConfig.getParserNumTasks()).orElse(1);
+      int retValue = 1;
+      for (SensorParserConfig config : parserConfigs) {
+        Integer configValue = config.getParserNumTasks();
+        retValue = configValue == null ? retValue : configValue;
+      }
+      return retValue;
     };
 
     // error bolt parallelism
-    ValueSupplier<Integer> errorParallelism = (parserConfig, clazz) -> {
+    ValueSupplier<Integer> errorParallelism = (parserConfigs, clazz) -> {
       if(ParserOptions.ERROR_WRITER_PARALLELISM.has(cmd)) {
         return Integer.parseInt(ParserOptions.ERROR_WRITER_PARALLELISM.get(cmd, "1"));
       }
-      return Optional.ofNullable(parserConfig.getErrorWriterParallelism()).orElse(1);
+      int retValue = 1;
+      for (SensorParserConfig config : parserConfigs) {
+        Integer configValue = config.getErrorWriterParallelism();
+        retValue = configValue == null ? retValue : configValue;
+      }
+      return retValue;
     };
 
     // error bolt number of tasks
-    ValueSupplier<Integer> errorNumTasks = (parserConfig, clazz) -> {
+    ValueSupplier<Integer> errorNumTasks = (parserConfigs, clazz) -> {
       if(ParserOptions.ERROR_WRITER_NUM_TASKS.has(cmd)) {
         return Integer.parseInt(ParserOptions.ERROR_WRITER_NUM_TASKS.get(cmd, "1"));
       }
-      return Optional.ofNullable(parserConfig.getErrorWriterNumTasks()).orElse(1);
+      int retValue = 1;
+      for (SensorParserConfig config : parserConfigs) {
+        Integer configValue = config.getErrorWriterNumTasks();
+        retValue = configValue == null ? retValue : configValue;
+      }
+      return retValue;
     };
 
     // kafka spout config
-    ValueSupplier<Map> spoutConfig = (parserConfig, clazz) -> {
+    ValueSupplier<List> spoutConfig = (parserConfigs, clazz) -> {
       if(ParserOptions.SPOUT_CONFIG.has(cmd)) {
-        return readJSONMapFromFile(new File(ParserOptions.SPOUT_CONFIG.get(cmd)));
+        return Collections.singletonList(readJSONMapFromFile(new File(ParserOptions.SPOUT_CONFIG.get(cmd))));
+      }
+      List<Map<String, Object>> retValue = new ArrayList<>();
+      for (SensorParserConfig config : parserConfigs) {
+        retValue.add(config.getSpoutConfig());
       }
-      return Optional.ofNullable(parserConfig.getSpoutConfig()).orElse(new HashMap<>());
+      return retValue;
     };
 
     // security protocol
-    ValueSupplier<String> securityProtocol = (parserConfig, clazz) -> {
+    ValueSupplier<String> securityProtocol = (parserConfigs, clazz) -> {
       Optional<String> sp = Optional.empty();
       if (ParserOptions.SECURITY_PROTOCOL.has(cmd)) {
         sp = Optional.of(ParserOptions.SECURITY_PROTOCOL.get(cmd));
       }
+      // Need to adjust to handle list of spoutConfigs. Any non-plaintext wins
       if (!sp.isPresent()) {
-        sp = getSecurityProtocol(sp, spoutConfig.get(parserConfig, Map.class));
+        sp = getSecurityProtocol(sp, spoutConfig.get(parserConfigs, List.class));
+      }
+      // Need to look through parserConfigs for any non-plaintext
+      String parserConfigSp = SecurityProtocol.PLAINTEXT.name;
+      for (SensorParserConfig config : parserConfigs) {
+        String configSp = config.getSecurityProtocol();
+        if (!SecurityProtocol.PLAINTEXT.name.equals(configSp)) {
+          // We have a winner
+          parserConfigSp = configSp;
+        }
       }
-      return sp.orElse(Optional.ofNullable(parserConfig.getSecurityProtocol()).orElse(null));
+
+      return sp.orElse(Optional.ofNullable(parserConfigSp).orElse(null));
     };
 
     // storm configuration
-    ValueSupplier<Config> stormConf = (parserConfig, clazz) -> {
-      Map<String, Object> c = parserConfig.getStormConfig();
+    ValueSupplier<Config> stormConf = (parserConfigs, clazz) -> {
+      // Last one wins
       Config finalConfig = new Config();
-      if(c != null && !c.isEmpty()) {
-        finalConfig.putAll(c);
-      }
-      if(parserConfig.getNumAckers() != null) {
-        Config.setNumAckers(finalConfig, parserConfig.getNumAckers());
-      }
-      if(parserConfig.getNumWorkers() != null) {
-        Config.setNumWorkers(finalConfig, parserConfig.getNumWorkers());
+      for (SensorParserConfig parserConfig : parserConfigs) {
+        Map<String, Object> c = parserConfig.getStormConfig();
+        if (c != null && !c.isEmpty()) {
+          finalConfig.putAll(c);
+        }
+        if (parserConfig.getNumAckers() != null) {
+          Config.setNumAckers(finalConfig, parserConfig.getNumAckers());
+        }
+        if (parserConfig.getNumWorkers() != null) {
+          Config.setNumWorkers(finalConfig, parserConfig.getNumWorkers());
+        }
       }
       return ParserOptions.getConfig(cmd, finalConfig).orElse(finalConfig);
     };
 
     // output topic
-    ValueSupplier<String> outputTopic = (parserConfig, clazz) -> {
-      String topic;
+    ValueSupplier<String> outputTopic = (parserConfigs, clazz) -> {
+      String topic = null;
 
       if(ParserOptions.OUTPUT_TOPIC.has(cmd)) {
         topic = ParserOptions.OUTPUT_TOPIC.get(cmd);
-
-      } else if(parserConfig.getOutputTopic() != null) {
-        topic = parserConfig.getOutputTopic();
-
-      } else {
-        topic = Constants.ENRICHMENT_TOPIC;
       }
 
       return topic;
     };
 
-    // error topic
-    ValueSupplier<String> errorTopic = (parserConfig, clazz) -> {
-      String topic;
-
-      if(parserConfig.getErrorTopic() != null) {
-        topic = parserConfig.getErrorTopic();
-
-      } else {
-        // topic will to set to the 'parser.error.topic' setting in globals when the error bolt is created
-        topic = null;
+    // Error topic will throw an exception if the topics aren't all the same.
+    ValueSupplier<String> errorTopic = (parserConfigs, clazz) -> {
+      // if left null, the topic will be set to the 'parser.error.topic' setting in globals when the error bolt is created
+      String topic = null;
+      for (SensorParserConfig parserConfig : parserConfigs) {
+        String currentTopic = parserConfig.getErrorTopic();
+        if(topic != null && !topic.equals(currentTopic)) {
+          throw new IllegalArgumentException(
+              "Parser Aggregation specified with differing error topics");
+        }
+        topic = currentTopic;
       }
 
       return topic;
@@ -446,7 +540,7 @@ public class ParserTopologyCLI {
     return getParserTopology(
             zookeeperUrl,
             brokerUrl,
-            sensorType,
+            sensorTypes,
             spoutParallelism,
             spoutNumTasks,
             parserParallelism,
@@ -462,14 +556,14 @@ public class ParserTopologyCLI {
 
   protected ParserTopologyBuilder.ParserTopology getParserTopology( String zookeeperUrl,
                                                                     Optional<String> brokerUrl,
-                                                                    String sensorType,
-                                                                    ValueSupplier<Integer> spoutParallelism,
-                                                                    ValueSupplier<Integer> spoutNumTasks,
+                                                                    List<String> sensorTypes,
+                                                                    ValueSupplier<List> spoutParallelism,
+                                                                    ValueSupplier<List> spoutNumTasks,
                                                                     ValueSupplier<Integer> parserParallelism,
                                                                     ValueSupplier<Integer> parserNumTasks,
                                                                     ValueSupplier<Integer> errorParallelism,
                                                                     ValueSupplier<Integer> errorNumTasks,
-                                                                    ValueSupplier<Map> spoutConfig,
+                                                                    ValueSupplier<List> spoutConfig,
                                                                     ValueSupplier<String> securityProtocol,
                                                                     ValueSupplier<Config> stormConf,
                                                                     ValueSupplier<String> outputTopic,
@@ -477,7 +571,7 @@ public class ParserTopologyCLI {
     return ParserTopologyBuilder.build(
             zookeeperUrl,
             brokerUrl,
-            sensorType,
+            sensorTypes,
             spoutParallelism,
             spoutNumTasks,
             parserParallelism,
@@ -505,15 +599,15 @@ public class ParserTopologyCLI {
       }
       ParserTopologyCLI cli = new ParserTopologyCLI();
       ParserTopologyBuilder.ParserTopology topology = cli.createParserTopology(cmd);
-      String sensorType= ParserOptions.SENSOR_TYPE.get(cmd);
+      String sensorTypes = ParserOptions.SENSOR_TYPES.get(cmd);
       if (ParserOptions.TEST.has(cmd)) {
         topology.getTopologyConfig().put(Config.TOPOLOGY_DEBUG, true);
         LocalCluster cluster = new LocalCluster();
-        cluster.submitTopology(sensorType, topology.getTopologyConfig(), topology.getBuilder().createTopology());
+        cluster.submitTopology(sensorTypes.replaceAll(",", STORM_JOB_SEPARATOR), topology.getTopologyConfig(), topology.getBuilder().createTopology());
         Utils.sleep(300000);
         cluster.shutdown();
       } else {
-        StormSubmitter.submitTopology(sensorType, topology.getTopologyConfig(), topology.getBuilder().createTopology());
+        StormSubmitter.submitTopology(sensorTypes.replaceAll(",", STORM_JOB_SEPARATOR), topology.getTopologyConfig(), topology.getBuilder().createTopology());
       }
     } catch (Exception e) {
       e.printStackTrace();
@@ -521,15 +615,27 @@ public class ParserTopologyCLI {
     }
   }
 
-  private static Optional<String> getSecurityProtocol(Optional<String> protocol, Map<String, Object> spoutConfig) {
+  private static Optional<String> getSecurityProtocol(Optional<String> protocol, List<Map<String, Object>> spoutConfig) {
     Optional<String> ret = protocol;
-    if(ret.isPresent() && protocol.get().equalsIgnoreCase("PLAINTEXT")) {
+    if(ret.isPresent() && protocol.get().equalsIgnoreCase(SecurityProtocol.PLAINTEXT.name)) {
       ret = Optional.empty();
     }
     if(!ret.isPresent()) {
-      ret = Optional.ofNullable((String) spoutConfig.get("security.protocol"));
+      // Need to look through spoutConfig for any non-plaintext
+      String spoutConfigSp = null;
+      for (Map<String, Object> config: spoutConfig) {
+        String configSp = (String) config.get(KafkaUtils.SECURITY_PROTOCOL);
+        if (configSp != null && !SecurityProtocol.PLAINTEXT.name.equals(configSp)) {
+          // We have a winner
+          spoutConfigSp = configSp;
+        } else if (configSp != null) {
+          // Use something explicitly defined.
+          spoutConfigSp = configSp;
+        }
+      }
+      ret = Optional.ofNullable(spoutConfigSp);
     }
-    if(ret.isPresent() && ret.get().equalsIgnoreCase("PLAINTEXT")) {
+    if(ret.isPresent() && ret.get().equalsIgnoreCase(SecurityProtocol.PLAINTEXT.name)) {
       ret = Optional.empty();
     }
     return ret;

http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/config/ValueSupplier.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/config/ValueSupplier.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/config/ValueSupplier.java
index 0ede0f8..98aca7b 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/config/ValueSupplier.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/config/ValueSupplier.java
@@ -18,6 +18,7 @@
 
 package org.apache.metron.parsers.topology.config;
 
+import java.util.Collection;
 import org.apache.metron.common.configuration.SensorParserConfig;
 
 
@@ -26,5 +27,5 @@ import org.apache.metron.common.configuration.SensorParserConfig;
  * @param <T>
  */
 public interface ValueSupplier<T> {
-  T get(SensorParserConfig config, Class<T> clazz);
+  T get(Collection<SensorParserConfig> config, Class<T> clazz);
 }


Mime
View raw message