metron-commits mailing list archives

From: ceste...@apache.org
Subject: incubator-metron git commit: METRON-294 Refactor parser topology builder (nickwallen via cestella) closes apache/incubator-metron#185
Date: Fri, 15 Jul 2016 17:12:08 GMT
Repository: incubator-metron
Updated Branches:
  refs/heads/master e50c0f377 -> 1c568bf04


METRON-294 Refactor parser topology builder (nickwallen via cestella) closes apache/incubator-metron#185


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/1c568bf0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/1c568bf0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/1c568bf0

Branch: refs/heads/master
Commit: 1c568bf046a85fee1dc170a96bfe98ca79fb085f
Parents: e50c0f3
Author: nickwallen <nick@nickallen.org>
Authored: Fri Jul 15 13:11:59 2016 -0400
Committer: cstella <cestella@gmail.com>
Committed: Fri Jul 15 13:11:59 2016 -0400

----------------------------------------------------------------------
 .../parsers/topology/ParserTopologyBuilder.java | 271 +++++++++++++------
 1 file changed, 187 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1c568bf0/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index 1ad82c5..d9004d1 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -40,106 +40,209 @@ import storm.kafka.KafkaSpout;
 import storm.kafka.ZkHosts;
 
 import java.util.EnumMap;
-import java.util.Map;
 
+/**
+ * Builds a Storm topology that parses telemetry data received from a sensor.
+ */
 public class ParserTopologyBuilder {
 
+  /**
+   * Builds a Storm topology that parses telemetry data received from an external sensor.
+   *
+   * @param zookeeperUrl             Zookeeper URL
+   * @param brokerUrl                Kafka Broker URL
+   * @param sensorType               Type of sensor
+   * @param offset                   Kafka topic offset where the topology will start; BEGINNING, END, WHERE_I_LEFT_OFF
+   * @param spoutParallelism         Parallelism hint for the spout
+   * @param spoutNumTasks            Number of tasks for the spout
+   * @param parserParallelism        Parallelism hint for the parser bolt
+   * @param parserNumTasks           Number of tasks for the parser bolt
+   * @param invalidWriterParallelism Parallelism hint for the bolt that handles invalid data
+   * @param invalidWriterNumTasks    Number of tasks for the bolt that handles invalid data
+   * @param errorWriterParallelism   Parallelism hint for the bolt that handles errors
+   * @param errorWriterNumTasks      Number of tasks for the bolt that handles errors
+   * @param kafkaSpoutConfig         Configuration options for the kafka spout
+   * @return A Storm topology that parses telemetry data received from an external sensor
+   * @throws Exception
+   */
   public static TopologyBuilder build(String zookeeperUrl,
-                         String brokerUrl,
-                         String sensorType,
-                         SpoutConfig.Offset offset,
-                         int spoutParallelism,
-                         int spoutNumTasks,
-                         int parserParallelism,
-                         int parserNumTasks,
-                         int invalidWriterParallelism,
-                         int invalidWriterNumTasks,
-                         int errorWriterParallelism,
-                         int errorWriterNumTasks,
-                         EnumMap<SpoutConfigOptions, Object> kafkaSpoutConfigOptions
-                                     ) throws Exception {
-    CuratorFramework client = ConfigurationsUtils.getClient(zookeeperUrl);
-    client.start();
-    ParserConfigurations configurations = new ParserConfigurations();
-    ConfigurationsUtils.updateParserConfigsFromZookeeper(configurations, client);
-    SensorParserConfig sensorParserConfig = configurations.getSensorParserConfig(sensorType);
-    if(sensorParserConfig == null) {
-      throw new IllegalStateException("Cannot find the parser configuration in zookeeper for " + sensorType + "." +
-              "  Please check that it exists in zookeeper by using the zk_load_configs.sh -m DUMP command.");
-    }
-    client.close();
-    String sensorTopic = sensorParserConfig.getSensorTopic() != null ? sensorParserConfig.getSensorTopic() : sensorType;
+                                      String brokerUrl,
+                                      String sensorType,
+                                      SpoutConfig.Offset offset,
+                                      int spoutParallelism,
+                                      int spoutNumTasks,
+                                      int parserParallelism,
+                                      int parserNumTasks,
+                                      int invalidWriterParallelism,
+                                      int invalidWriterNumTasks,
+                                      int errorWriterParallelism,
+                                      int errorWriterNumTasks,
+                                      EnumMap<SpoutConfigOptions, Object> kafkaSpoutConfig
+  ) throws Exception {
+
+    // fetch configuration from zookeeper
+    ParserConfigurations configs = new ParserConfigurations();
+    SensorParserConfig parserConfig = getSensorParserConfig(zookeeperUrl, sensorType, configs);
+
+    // create the spout
     TopologyBuilder builder = new TopologyBuilder();
-    ZkHosts zkHosts = new ZkHosts(zookeeperUrl);
-    SpoutConfig spoutConfig = new SpoutConfig(zkHosts, sensorTopic, "", sensorTopic).from(offset);
-    SpoutConfigOptions.configure(spoutConfig, kafkaSpoutConfigOptions);
-    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
+    KafkaSpout kafkaSpout = createKafkaSpout(zookeeperUrl, sensorType, offset, kafkaSpoutConfig, parserConfig);
     builder.setSpout("kafkaSpout", kafkaSpout, spoutParallelism)
-           .setNumTasks(spoutNumTasks);
-    MessageParser<JSONObject> parser = ReflectionUtils.createInstance(sensorParserConfig.getParserClassName());
-    parser.configure(sensorParserConfig.getParserConfig());
-
-    ParserBolt parserBolt = new ParserBolt(zookeeperUrl
-                                          , sensorType
-                                          , parser
-                                          ,getHandler( sensorType
-                                                     , configurations
-                                                     , sensorParserConfig.getWriterClassName() == null
-                                                     ? new KafkaWriter(brokerUrl).withTopic(Constants.ENRICHMENT_TOPIC)
-                                                     :ReflectionUtils.createInstance(sensorParserConfig.getWriterClassName())
-                                                     )
-                                          );
-
-    WriterBolt errorBolt = new WriterBolt(getHandler( sensorType
-                                                    , configurations
-                                                    , sensorParserConfig.getErrorWriterClassName() == null
-                                                    ? new KafkaWriter(brokerUrl).withTopic(Constants.DEFAULT_PARSER_ERROR_TOPIC)
-                                                                                .withConfigPrefix("error")
-                                                    :ReflectionUtils.createInstance(sensorParserConfig.getWriterClassName())
-                                                    )
-                                         , configurations
-                                         , sensorType
-                                         );
-    WriterBolt invalidBolt = new WriterBolt(getHandler( sensorType
-                                                    , configurations
-                                                    , sensorParserConfig.getErrorWriterClassName() == null
-                                                    ? new KafkaWriter(brokerUrl).withTopic(Constants.DEFAULT_PARSER_INVALID_TOPIC)
-                                                                                .withConfigPrefix("invalid")
-                                                    :ReflectionUtils.createInstance(sensorParserConfig.getWriterClassName())
-                                                    )
-                                         , configurations
-                                         , sensorType
-                                         );
+            .setNumTasks(spoutNumTasks);
 
+    // create the parser bolt
+    ParserBolt parserBolt = createParserBolt(zookeeperUrl, brokerUrl, sensorType, configs, parserConfig);
     builder.setBolt("parserBolt", parserBolt, parserParallelism)
-           .setNumTasks(parserNumTasks)
-           .shuffleGrouping("kafkaSpout");
-    if(errorWriterNumTasks > 0) {
+            .setNumTasks(parserNumTasks)
+            .shuffleGrouping("kafkaSpout");
+
+    // create the error bolt, if needed
+    if (errorWriterNumTasks > 0) {
+      WriterBolt errorBolt = createErrorBolt(brokerUrl, sensorType, configs, parserConfig);
       builder.setBolt("errorMessageWriter", errorBolt, errorWriterParallelism)
               .setNumTasks(errorWriterNumTasks)
-              .shuffleGrouping("parserBolt", Constants.ERROR_STREAM)
-      ;
+              .shuffleGrouping("parserBolt", Constants.ERROR_STREAM);
     }
-    if(invalidWriterNumTasks > 0) {
+
+    // create the invalid bolt, if needed
+    if (invalidWriterNumTasks > 0) {
+      WriterBolt invalidBolt = createInvalidBolt(brokerUrl, sensorType, configs, parserConfig);
       builder.setBolt("invalidMessageWriter", invalidBolt, invalidWriterParallelism)
               .setNumTasks(invalidWriterNumTasks)
-              .shuffleGrouping("parserBolt", Constants.INVALID_STREAM)
-      ;
+              .shuffleGrouping("parserBolt", Constants.INVALID_STREAM);
     }
+
     return builder;
   }
 
-  private static WriterHandler getHandler(String sensorType, ParserConfigurations configurations, AbstractWriter writer) {
-    writer.configure(sensorType, new ParserWriterConfiguration(configurations));
-    if(writer instanceof BulkMessageWriter) {
-      return new WriterHandler((BulkMessageWriter<JSONObject>)writer);
-    }
-    else if(writer instanceof MessageWriter) {
-      return new WriterHandler((MessageWriter<JSONObject>)writer);
+  /**
+   * Create a spout that consumes tuples from a Kafka topic.
+   *
+   * @param zookeeperUrl            Zookeeper URL
+   * @param sensorType              Type of sensor
+   * @param offset                  Kafka topic offset where the topology will start; BEGINNING, END, WHERE_I_LEFT_OFF
+   * @param kafkaSpoutConfigOptions Configuration options for the kafka spout
+   * @param parserConfig            Configuration for the parser
+   * @return
+   */
+  private static KafkaSpout createKafkaSpout(String zookeeperUrl, String sensorType, SpoutConfig.Offset offset, EnumMap<SpoutConfigOptions, Object> kafkaSpoutConfigOptions, SensorParserConfig parserConfig) {
+
+    String inputTopic = parserConfig.getSensorTopic() != null ? parserConfig.getSensorTopic() : sensorType;
+    SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zookeeperUrl), inputTopic, "", inputTopic).from(offset);
+    SpoutConfigOptions.configure(spoutConfig, kafkaSpoutConfigOptions);
+    return new KafkaSpout(spoutConfig);
+  }
+
+  /**
+   * Create a bolt that parses input from a sensor.
+   *
+   * @param zookeeperUrl Zookeeper URL
+   * @param brokerUrl    Kafka Broker URL
+   * @param sensorType   Type of sensor that is being consumed.
+   * @param configs
+   * @param parserConfig
+   * @return A Storm bolt that parses input from a sensor
+   */
+  private static ParserBolt createParserBolt(String zookeeperUrl, String brokerUrl, String sensorType, ParserConfigurations configs, SensorParserConfig parserConfig) {
+
+    // create message parser
+    MessageParser<JSONObject> parser = ReflectionUtils.createInstance(parserConfig.getParserClassName());
+    parser.configure(parserConfig.getParserConfig());
+
+    // create writer - if not configured uses a sensible default
+    AbstractWriter writer = parserConfig.getWriterClassName() == null ?
+            new KafkaWriter(brokerUrl).withTopic(Constants.ENRICHMENT_TOPIC) :
+            ReflectionUtils.createInstance(parserConfig.getWriterClassName());
+    writer.configure(sensorType, new ParserWriterConfiguration(configs));
+
+    // create a writer handler
+    WriterHandler writerHandler = createWriterHandler(writer);
+
+    return new ParserBolt(zookeeperUrl, sensorType, parser, writerHandler);
+  }
+
+  /**
+   * Create a bolt that handles invalid messages.
+   *
+   * @param brokerUrl    The Kafka Broker URL
+   * @param sensorType   Type of sensor that is being consumed.
+   * @param configs
+   * @param parserConfig
+   * @return A Storm bolt that handles invalid messages.
+   */
+  private static WriterBolt createInvalidBolt(String brokerUrl, String sensorType, ParserConfigurations configs, SensorParserConfig parserConfig) {
+
+    // create writer - if not configured uses a sensible default
+    AbstractWriter writer = parserConfig.getErrorWriterClassName() == null
+            ? new KafkaWriter(brokerUrl).withTopic(Constants.DEFAULT_PARSER_INVALID_TOPIC).withConfigPrefix("invalid")
+            : ReflectionUtils.createInstance(parserConfig.getWriterClassName());
+    writer.configure(sensorType, new ParserWriterConfiguration(configs));
+
+    // create a writer handler
+    WriterHandler writerHandler = createWriterHandler(writer);
+
+    return new WriterBolt(writerHandler, configs, sensorType);
+  }
+
+  /**
+   * Create a bolt that handles error messages.
+   *
+   * @param brokerUrl    Kafka Broker URL
+   * @param sensorType   Type of sensor that is being consumed.
+   * @param configs
+   * @param parserConfig
+   * @return A Storm bolt that handles error messages.
+   */
+  private static WriterBolt createErrorBolt(String brokerUrl, String sensorType, ParserConfigurations configs, SensorParserConfig parserConfig) {
+
+    // create writer - if not configured uses a sensible default
+    AbstractWriter writer = parserConfig.getErrorWriterClassName() == null
+            ? new KafkaWriter(brokerUrl).withTopic(Constants.DEFAULT_PARSER_ERROR_TOPIC).withConfigPrefix("error")
+            : ReflectionUtils.createInstance(parserConfig.getWriterClassName());
+    writer.configure(sensorType, new ParserWriterConfiguration(configs));
+
+    // create a writer handler
+    WriterHandler writerHandler = createWriterHandler(writer);
+
+    return new WriterBolt(writerHandler, configs, sensorType);
+  }
+
+  /**
+   * Fetch the parser configuration from Zookeeper.
+   *
+   * @param zookeeperUrl Zookeeper URL
+   * @param sensorType   Type of sensor
+   * @param configs
+   * @return
+   * @throws Exception
+   */
+  private static SensorParserConfig getSensorParserConfig(String zookeeperUrl, String sensorType, ParserConfigurations configs) throws Exception {
+    CuratorFramework client = ConfigurationsUtils.getClient(zookeeperUrl);
+    client.start();
+    ConfigurationsUtils.updateParserConfigsFromZookeeper(configs, client);
+    SensorParserConfig parserConfig = configs.getSensorParserConfig(sensorType);
+    if (parserConfig == null) {
+      throw new IllegalStateException("Cannot find the parser configuration in zookeeper for " + sensorType + "." +
+              "  Please check that it exists in zookeeper by using the 'zk_load_configs.sh -m DUMP' command.");
     }
-    else {
+    client.close();
+    return parserConfig;
+  }
+
+  /**
+   * Creates a WriterHandler
+   *
+   * @param writer The writer.
+   * @return A WriterHandler
+   */
+  private static WriterHandler createWriterHandler(AbstractWriter writer) {
+
+    if (writer instanceof BulkMessageWriter) {
+      return new WriterHandler((BulkMessageWriter<JSONObject>) writer);
+    } else if (writer instanceof MessageWriter) {
+      return new WriterHandler((MessageWriter<JSONObject>) writer);
+    } else {
+      throw new IllegalStateException("Unable to create parser bolt: writer must be a MessageWriter or a BulkMessageWriter");
     }
   }
-
 }
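
----------------------------------------------------------------------

For reference, below is a minimal usage sketch of the refactored ParserTopologyBuilder.build(...) entry point shown in this diff. It is not part of the commit: the host names, sensor name, topology name, and parallelism values are illustrative only, and the package names for SpoutConfig/SpoutConfigOptions (assumed to be in org.apache.metron.common.spout.kafka) and for the pre-1.0 backtype.storm classes are assumptions consistent with the storm.kafka spout used in this file.

// Usage sketch only (not part of this commit). Package names below are
// assumptions; see the note above.
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import org.apache.metron.common.spout.kafka.SpoutConfig;
import org.apache.metron.common.spout.kafka.SpoutConfigOptions;
import org.apache.metron.parsers.topology.ParserTopologyBuilder;

import java.util.EnumMap;

public class SubmitParserTopology {
  public static void main(String[] args) throws Exception {
    // No Kafka spout overrides; populate this EnumMap to tune the spout.
    EnumMap<SpoutConfigOptions, Object> spoutOptions =
        new EnumMap<>(SpoutConfigOptions.class);

    // Hypothetical hosts and sensor; parallelism hints and task counts of 1
    // keep the example small. Parameter order follows the new build() javadoc.
    TopologyBuilder builder = ParserTopologyBuilder.build(
        "node1:2181",            // zookeeperUrl
        "node1:6667",            // brokerUrl
        "bro",                   // sensorType
        SpoutConfig.Offset.END,  // offset to start consuming from
        1, 1,                    // spout parallelism / tasks
        1, 1,                    // parser bolt parallelism / tasks
        1, 1,                    // invalid writer parallelism / tasks
        1, 1,                    // error writer parallelism / tasks
        spoutOptions);

    StormSubmitter.submitTopology("bro_parser", new Config(), builder.createTopology());
  }
}

As the diff shows, the error and invalid writer bolts are only wired into the topology when their task counts are greater than zero, so passing 0 for those counts omits them entirely.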

