metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject [6/9] incubator-metron git commit: METRON-56 Create unified enrichment topology (merrimanr via cestella) closes apache/incubator-metron#33
Date Tue, 01 Mar 2016 19:30:01 GMT
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/components/KafkaWithZKComponent.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/components/KafkaWithZKComponent.java b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/components/KafkaWithZKComponent.java
new file mode 100644
index 0000000..83ecd42
--- /dev/null
+++ b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/components/KafkaWithZKComponent.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.util.integration.components;
+
+
+import com.google.common.base.Function;
+import kafka.Kafka;
+import kafka.admin.AdminUtils;
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.message.MessageAndOffset;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.*;
+import kafka.zk.EmbeddedZookeeper;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.metron.integration.util.integration.InMemoryComponent;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+
+/**
+ * In-memory component that runs a single Kafka broker for integration tests.
+ * An embedded ZooKeeper is started as well, unless an existing connect string
+ * was supplied through {@link #withExistingZookeeper(String)}.
+ */
+public class KafkaWithZKComponent implements InMemoryComponent {
+
+  /** Name and partition count of a topic that is created when the component starts. */
+  public static class Topic {
+    public int numPartitions;
+    public String name;
+
+    public Topic(String name, int numPartitions) {
+      this.numPartitions = numPartitions;
+      this.name = name;
+    }
+  }
+
+  private transient KafkaServer kafkaServer;
+  private transient EmbeddedZookeeper zkServer;
+  private transient ZkClient zkClient;
+  private transient ConsumerConnector consumer;
+  private String zookeeperConnectString;
+  private int brokerPort = 6667;
+  private List<Topic> topics = Collections.emptyList();
+  private Function<KafkaWithZKComponent, Void> postStartCallback;
+
+  /**
+   * Registers a callback that is invoked with this component at the end of
+   * {@link #start()}. The callback is optional.
+   */
+  public KafkaWithZKComponent withPostStartCallback(Function<KafkaWithZKComponent, Void> f) {
+    postStartCallback = f;
+    return this;
+  }
+
+  /**
+   * Uses an already running ZooKeeper instead of starting an embedded one.
+   *
+   * @param zookeeperConnectString connect string of the existing ZooKeeper
+   */
+  public KafkaWithZKComponent withExistingZookeeper(String zookeeperConnectString) {
+    this.zookeeperConnectString = zookeeperConnectString;
+    return this;
+  }
+
+  /**
+   * Sets the broker port. A value of zero or less asks Kafka's test utilities
+   * to choose a port.
+   */
+  public KafkaWithZKComponent withBrokerPort(int brokerPort) {
+    this.brokerPort = brokerPort > 0 ? brokerPort : TestUtils.choosePort();
+    return this;
+  }
+
+  /** Sets the topics that are created during {@link #start()}. */
+  public KafkaWithZKComponent withTopics(List<Topic> topics) {
+    this.topics = topics;
+    return this;
+  }
+
+  public List<Topic> getTopics() {
+    return topics;
+  }
+
+  public int getBrokerPort() {
+    return brokerPort;
+  }
+
+  /** @return the broker address in {@code host:port} form */
+  public String getBrokerList()  {
+    return "localhost:" + brokerPort;
+  }
+
+  /** Creates a producer with the default settings of this component. */
+  public KafkaProducer<String, byte[]> createProducer()
+  {
+    return createProducer(new HashMap<String, Object>());
+  }
+
+  /**
+   * Creates a producer that targets this component's broker.
+   *
+   * @param properties extra producer settings; they override the defaults set here
+   */
+  public KafkaProducer<String, byte[]> createProducer(Map<String, Object> properties)
+  {
+    Map<String, Object> producerConfig = new HashMap<>();
+    producerConfig.put("bootstrap.servers", getBrokerList());
+    // Keys are declared as String, so they need the String serializer; the
+    // byte-array serializer would fail on any non-null key.
+    producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+    producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+    producerConfig.put("request.required.acks", "-1");
+    producerConfig.put("fetch.message.max.bytes", ""+ 1024*1024*10);
+    producerConfig.put("replica.fetch.max.bytes", "" + 1024*1024*10);
+    producerConfig.put("message.max.bytes", "" + 1024*1024*10);
+    producerConfig.put("message.send.max.retries", "10");
+    if(properties != null) {
+      producerConfig.putAll(properties);
+    }
+    return new KafkaProducer<>(producerConfig);
+  }
+
+  /**
+   * Starts ZooKeeper (when none was supplied) and the broker, creates the
+   * configured topics and finally invokes the post-start callback, if any.
+   */
+  @Override
+  public void start() {
+    // setup Zookeeper
+    if(zookeeperConnectString == null) {
+      String zkConnect = TestZKUtils.zookeeperConnect();
+      zkServer = new EmbeddedZookeeper(zkConnect);
+      zookeeperConnectString = zkServer.connectString();
+    }
+    zkClient = new ZkClient(zookeeperConnectString, 30000, 30000, ZKStringSerializer$.MODULE$);
+
+    // setup Broker
+    Properties props = TestUtils.createBrokerConfig(0, brokerPort, true);
+    // Make sure the broker registers with the same ZooKeeper that zkClient
+    // talks to, which matters when an existing ZooKeeper was supplied.
+    props.setProperty("zookeeper.connect", zookeeperConnectString);
+    KafkaConfig config = new KafkaConfig(props);
+    Time mock = new MockTime();
+    kafkaServer = TestUtils.createServer(config, mock);
+    for(Topic topic : getTopics()) {
+      try {
+        createTopic(topic.name, topic.numPartitions, true);
+      } catch (InterruptedException e) {
+        // Keep the interrupt visible to callers further up the stack.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException("Unable to create topic", e);
+      }
+    }
+    if(postStartCallback != null) {
+      postStartCallback.apply(this);
+    }
+  }
+
+  public String getZookeeperConnect() {
+    return zookeeperConnectString;
+  }
+
+  /**
+   * Shuts down whatever {@link #start()} managed to bring up. Safe to call
+   * after a failed or missing start.
+   */
+  @Override
+  public void stop() {
+    if(kafkaServer != null) {
+      kafkaServer.shutdown();
+    }
+    if(zkClient != null) {
+      zkClient.close();
+    }
+    if(zkServer != null) {
+      zkServer.shutdown();
+    }
+
+  }
+
+  /**
+   * Fetches messages from partition 0 of the topic, starting at offset 0, in
+   * a single fetch request.
+   *
+   * @return the payload bytes of every message returned by that fetch
+   */
+  public List<byte[]> readMessages(String topic) {
+    // Connect to the configured broker port rather than a fixed one.
+    SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", brokerPort, 100000, 64 * 1024, "consumer");
+    try {
+      FetchRequest req = new FetchRequestBuilder()
+              .clientId("consumer")
+              .addFetch(topic, 0, 0, 100000)
+              .build();
+      FetchResponse fetchResponse = simpleConsumer.fetch(req);
+      Iterator<MessageAndOffset> results = fetchResponse.messageSet(topic, 0).iterator();
+      List<byte[]> messages = new ArrayList<>();
+      while(results.hasNext()) {
+        ByteBuffer payload = results.next().message().payload();
+        // remaining() is the number of readable bytes regardless of position
+        byte[] bytes = new byte[payload.remaining()];
+        payload.get(bytes);
+        messages.add(bytes);
+      }
+      return messages;
+    } finally {
+      simpleConsumer.close();
+    }
+  }
+
+  public ConsumerIterator<byte[], byte[]> getStreamIterator(String topic) {
+    return getStreamIterator(topic, "group0", "consumer0");
+  }
+
+  /**
+   * Opens a high-level consumer on the topic and returns the iterator of its
+   * single stream. Release it with {@link #shutdownConsumer()}.
+   */
+  public ConsumerIterator<byte[], byte[]> getStreamIterator(String topic, String group, String consumerName) {
+    // Use the stored connect string: zkServer is null when an existing
+    // ZooKeeper was supplied.
+    Properties consumerProperties = TestUtils.createConsumerProperties(zookeeperConnectString, group, consumerName, -1);
+    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProperties));
+    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
+    topicCountMap.put(topic, 1);
+    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
+    KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
+    return stream.iterator();
+  }
+
+  /** Shuts down the consumer opened by {@code getStreamIterator}, if there is one. */
+  public void shutdownConsumer() {
+    if(consumer != null) {
+      consumer.shutdown();
+    }
+  }
+
+  /** Creates a single-partition topic and waits for its metadata to propagate. */
+  public void createTopic(String name) throws InterruptedException {
+    createTopic(name, 1, true);
+  }
+
+  /** Waits, per partition, until the broker has metadata for the topic. */
+  public void waitUntilMetadataIsPropagated(String topic, int numPartitions) {
+    List<KafkaServer> servers = new ArrayList<>();
+    servers.add(kafkaServer);
+    for(int part = 0;part < numPartitions;++part) {
+      TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, part, 5000);
+    }
+  }
+
+  /**
+   * Creates a topic with replication factor 1.
+   *
+   * @param waitUntilMetadataIsPropagated whether to block until the broker knows every partition
+   */
+  public void createTopic(String name, int numPartitions, boolean waitUntilMetadataIsPropagated) throws InterruptedException {
+    AdminUtils.createTopic(zkClient, name, numPartitions, 1, new Properties());
+    if(waitUntilMetadataIsPropagated) {
+      waitUntilMetadataIsPropagated(name, numPartitions);
+    }
+  }
+
+  /** Sends every message to the topic without a key, using a short-lived producer. */
+  public void writeMessages(String topic, List<byte[]> messages) {
+    KafkaProducer<String, byte[]> kafkaProducer = createProducer();
+    try {
+      for(byte[] message: messages) {
+        kafkaProducer.send(new ProducerRecord<String, byte[]>(topic, message));
+      }
+    } finally {
+      // Release the producer even when a send fails.
+      kafkaProducer.close();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/util/KafkaUtil.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/util/KafkaUtil.java b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/util/KafkaUtil.java
new file mode 100644
index 0000000..bf2ef4f
--- /dev/null
+++ b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/util/KafkaUtil.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.util.integration.util;
+
+
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class KafkaUtil {
+  public static <K,V> void send(Producer<K,V> producer, K key, V value, String topic) {
+    producer.send(new KeyedMessage<>(topic, key,value));
+  }
+
+  public static <K,V> void send(Producer<K,V> producer, Iterable<Map.Entry<K,V>> messages, String topic) {
+    for(Map.Entry<K,V> kv : messages) {
+      send(producer, kv.getKey(), kv.getValue(), topic);
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/KafkaLoader.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/KafkaLoader.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/KafkaLoader.java
new file mode 100644
index 0000000..4f53e5a
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/KafkaLoader.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.utils;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.storm.flux.Flux;
+import storm.kafka.SpoutConfig;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Replays the lines of a sample file into a Kafka topic, one message per
+ * line, pausing between messages.
+ */
+public class KafkaLoader {
+
+  private String brokerUrl;
+  private String topic;
+  private String samplePath;
+  private int delay = 1000;
+  private int iterations = -1;
+  private KafkaProducer<String, String> kafkaProducer;
+
+  /**
+   * @param brokerUrl  value used for the producer's {@code bootstrap.servers}
+   * @param topic      topic the lines are sent to
+   * @param samplePath path of the file whose lines are sent
+   */
+  public KafkaLoader(String brokerUrl, String topic, String samplePath) {
+    this.brokerUrl = brokerUrl;
+    this.topic = topic;
+    this.samplePath = samplePath;
+  }
+
+  /** Sets the pause after each message, in milliseconds. */
+  public KafkaLoader withDelay(int delay) {
+    this.delay = delay;
+    return this;
+  }
+
+  /** Sets how many times the file is replayed; -1 replays it without end. */
+  public KafkaLoader withIterations(int iterations) {
+    this.iterations = iterations;
+    return this;
+  }
+
+  /**
+   * Creates the producer and replays the sample file the configured number of
+   * times. Failures are printed and end the replay; an interrupt ends it as
+   * well and leaves the thread's interrupt flag set.
+   */
+  public void start() {
+    Map<String, Object> producerConfig = new HashMap<>();
+    producerConfig.put("bootstrap.servers", brokerUrl);
+    producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+    producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+    kafkaProducer = new KafkaProducer<>(producerConfig);
+    try {
+      // Count passes locally: decrementing the field would leave it at -1
+      // after a finite run and make the next start() replay forever.
+      for (int pass = 0; iterations == -1 || pass < iterations; pass++) {
+        try (BufferedReader reader = new BufferedReader(new FileReader(samplePath))) {
+          String line;
+          while((line = reader.readLine()) != null) {
+            kafkaProducer.send(new ProducerRecord<String, String>(topic, line));
+            Thread.sleep(delay);
+          }
+        }
+      }
+    } catch (InterruptedException e) {
+      // Stop replaying and keep the interrupt visible to the caller.
+      Thread.currentThread().interrupt();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  /** Closes the producer; does nothing if {@link #start()} was never called. */
+  public void stop() {
+    if (kafkaProducer != null) {
+      kafkaProducer.close();
+    }
+  }
+
+
+  /**
+   * Command line entry point.
+   * Arguments: brokerUrl topic samplePath [delayMillis] [iterations]
+   */
+  public static void main(String[] args) {
+    if (args.length < 3) {
+      System.err.println("Usage: KafkaLoader <brokerUrl> <topic> <samplePath> [delayMillis] [iterations]");
+      System.exit(1);
+    }
+    KafkaLoader kafkaLoader = new KafkaLoader(args[0], args[1], args[2]);
+    if (args.length > 3) kafkaLoader.withDelay(Integer.parseInt(args[3]));
+    if (args.length > 4) kafkaLoader.withIterations(Integer.parseInt(args[4]));
+    try {
+      kafkaLoader.start();
+    } finally {
+      kafkaLoader.stop();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/SourceConfigUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/SourceConfigUtils.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/SourceConfigUtils.java
new file mode 100644
index 0000000..ef8b2e2
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/SourceConfigUtils.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.utils;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.metron.Constants;
+import org.apache.metron.domain.SourceConfig;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+
+/**
+ * Static helpers that write, read and print per-source configuration kept in
+ * ZooKeeper below {@code Constants.ZOOKEEPER_TOPOLOGY_ROOT}.
+ */
+public class SourceConfigUtils {
+
+  /** File name suffix that {@link #main(String[])} strips to obtain the source name. */
+  private static final String CONFIG_FILE_SUFFIX = "-config.json";
+
+  /**
+   * Builds a Curator client for the given ZooKeeper; the client is not
+   * started. It is created with {@code ExponentialBackoffRetry(1000, 3)}.
+   */
+  public static CuratorFramework getClient(String zookeeperUrl) {
+    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+    return CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy);
+  }
+
+  /** Reads the file's bytes and stores them as the configuration of the source. */
+  public static void writeToZookeeperFromFile(String sourceName, String filePath, String zookeeperUrl) throws Exception {
+    writeToZookeeper(sourceName, Files.readAllBytes(Paths.get(filePath)), zookeeperUrl);
+  }
+
+  /**
+   * Stores the configuration of a source, creating the node (and its
+   * parents) when it does not exist yet.
+   */
+  public static void writeToZookeeper(String sourceName, byte[] configData, String zookeeperUrl) throws Exception {
+    String path = configPath(sourceName);
+    CuratorFramework client = getClient(zookeeperUrl);
+    client.start();
+    try {
+      try {
+        client.setData().forPath(path, configData);
+      } catch(KeeperException.NoNodeException e) {
+        client.create().creatingParentsIfNeeded().forPath(path, configData);
+      }
+    } finally {
+      // Release the connection even when the write fails.
+      client.close();
+    }
+  }
+
+  /** Returns the raw configuration bytes stored for the source. */
+  public static byte[] readConfigBytesFromZookeeper(String sourceName, String zookeeperUrl) throws Exception {
+    CuratorFramework client = getClient(zookeeperUrl);
+    client.start();
+    try {
+      return client.getData().forPath(configPath(sourceName));
+    } finally {
+      client.close();
+    }
+  }
+
+  /** Reads the stored bytes for the source and loads them as a {@code SourceConfig}. */
+  public static SourceConfig readConfigFromZookeeper(String sourceName, String zookeeperUrl) throws Exception {
+    byte[] data = readConfigBytesFromZookeeper(sourceName, zookeeperUrl);
+    return SourceConfig.load(new ByteArrayInputStream(data));
+  }
+
+  /** Prints the configuration of every source found below the topology root. */
+  public static void dumpConfigs(String zookeeperUrl) throws Exception {
+    CuratorFramework client = getClient(zookeeperUrl);
+    client.start();
+    try {
+      List<String> children = client.getChildren().forPath(Constants.ZOOKEEPER_TOPOLOGY_ROOT);
+      for(String child: children) {
+        byte[] data = client.getData().forPath(configPath(child));
+        System.out.println("Config for source " + child);
+        // Decode explicitly instead of relying on the platform charset.
+        // NOTE(review): assumes the stored configs are UTF-8 text - confirm.
+        System.out.println(new String(data, "UTF-8"));
+        System.out.println();
+      }
+    } finally {
+      client.close();
+    }
+  }
+
+  /**
+   * Uploads every file of the bundled test config directory to the ZooKeeper
+   * at node1:2181 and then prints what is stored there.
+   */
+  public static void main(String[] args) {
+    try {
+      File root = new File("./metron-streaming/Metron-Common/src/test/resources/config/source/");
+      File[] children = root.listFiles();
+      if (children == null) {
+        // listFiles() returns null when the path is missing or not a directory
+        System.err.println("Cannot list config directory: " + root.getPath());
+        return;
+      }
+      for(File child: children) {
+        writeToZookeeperFromFile(sourceNameOf(child.getName()), child.getPath(), "node1:2181");
+      }
+      SourceConfigUtils.dumpConfigs("node1:2181");
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  /** Builds the ZooKeeper path of a source's configuration node. */
+  private static String configPath(String sourceName) {
+    return Constants.ZOOKEEPER_TOPOLOGY_ROOT + "/" + sourceName;
+  }
+
+  /** Strips the literal config suffix from a file name; other names are returned as is. */
+  private static String sourceNameOf(String fileName) {
+    if (fileName.endsWith(CONFIG_FILE_SUFFIX)) {
+      return fileName.substring(0, fileName.length() - CONFIG_FILE_SUFFIX.length());
+    }
+    return fileName;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/local.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/local.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/local.yaml
deleted file mode 100644
index 7473b01..0000000
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/local.yaml
+++ /dev/null
@@ -1,401 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-name: "asa-local"
-config:
-    topology.workers: 1
-
-components:
-    -   id: "parser"
-        className: "org.apache.metron.parsing.parsers.GrokAsaParser"
-    -   id: "jdbcConfig"
-        className: "org.apache.metron.enrichment.adapters.jdbc.MySqlConfig"
-        properties:
-            -   name: "host"
-                value: "${mysql.ip}"
-            -   name: "port"
-                value: ${mysql.port}
-            -   name: "username"
-                value: "${mysql.username}"
-            -   name: "password"
-                value: "${mysql.password}"
-            -   name: "table"
-                value: "GEO"
-    -   id: "geoEnrichmentAdapter"
-        className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter"
-        configMethods:
-            -   name: "withJdbcConfig"
-                args:
-                    - ref: "jdbcConfig"
-    -   id: "geoEnrichment"
-        className: "org.apache.metron.domain.Enrichment"
-        properties:
-            -   name: "name"
-                value:  "geo"
-            -   name: "fields"
-                value: ["ip_src_addr", "ip_dst_addr"]
-            -   name: "adapter"
-                ref: "geoEnrichmentAdapter"
-    -   id: "hostEnrichmentAdapter"
-        className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
-        constructorArgs:
-            - '${org.apache.metron.enrichment.host.known_hosts}'
-    -   id: "hostEnrichment"
-        className: "org.apache.metron.domain.Enrichment"
-        properties:
-            -   name: "name"
-                value:  "host"
-            -   name: "fields"
-                value: ["ip_src_addr", "ip_dst_addr"]
-            -   name: "adapter"
-                ref: "hostEnrichmentAdapter"
-    -   id: "enrichments"
-        className: "java.util.ArrayList"
-        configMethods:
-            -   name: "add"
-                args:
-                    - ref: "geoEnrichment"
-            -   name: "add"
-                args:
-                    - ref: "hostEnrichment"
-    -   id: "indexAdapter"
-        className: "org.apache.metron.indexing.adapters.ESTimedRotatingAdapter"
-    -   id: "alertsConfig"
-        className: "java.util.HashMap"
-        configMethods:
-            -   name: "put"
-                args: ["whitelist_table_name", "ip_whitelist"]
-            -   name: "put"
-                args: ["blacklist_table_name", "ip_blacklist"]
-            -   name: "put"
-                args: ["quorum", "mon.cluster2.ctolab.hortonworks.com, nn1.cluster2.ctolab.hortonworks.com, nn2.cluster2.ctolab.hortonworks.com"]
-            -   name: "put"
-                args: ["port", "2181"]
-            -   name: "put"
-                args: ["_MAX_CACHE_SIZE_OBJECTS_NUM", "3600"]
-            -   name: "put"
-                args: ["_MAX_TIME_RETAIN_MINUTES", "1000"]
-    -   id: "alertsAdapter"
-        className: "org.apache.metron.alerts.adapters.CIFAlertsAdapter"
-        constructorArgs:
-            - ref: "alertsConfig"
-    -   id: "alertsIdentifier"
-        className: "org.json.simple.JSONObject"
-        configMethods:
-            -   name: "put"
-                args: ["environment", "local"]
-            -   name: "put"
-                args: ["topology", "asa"]
-    -   id: "metricConfig"
-        className: "org.apache.commons.configuration.BaseConfiguration"
-        configMethods:
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.reporter.graphite"
-                    - "${org.apache.metron.metrics.reporter.graphite}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.reporter.console"
-                    - "${org.apache.metron.metrics.reporter.console}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.reporter.jmx"
-                    - "${org.apache.metron.metrics.reporter.jmx}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.graphite.address"
-                    - "${org.apache.metron.metrics.graphite.address}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.graphite.port"
-                    - "${org.apache.metron.metrics.graphite.port}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryParserBolt.acks"
-                    - "${org.apache.metron.metrics.TelemetryParserBolt.acks}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryParserBolt.emits"
-                    - "${org.apache.metron.metrics.TelemetryParserBolt.emits}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryParserBolt.fails"
-                    - "${org.apache.metron.metrics.TelemetryParserBolt.fails}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.GenericEnrichmentBolt.acks"
-                    - "${org.apache.metron.metrics.GenericEnrichmentBolt.acks}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.GenericEnrichmentBolt.emits"
-                    - "${org.apache.metron.metrics.GenericEnrichmentBolt.emits}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.GenericEnrichmentBolt.fails"
-                    - "${org.apache.metron.metrics.GenericEnrichmentBolt.fails}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryIndexingBolt.acks"
-                    - "${org.apache.metron.metrics.TelemetryIndexingBolt.acks}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryIndexingBolt.emits"
-                    - "${org.apache.metron.metrics.TelemetryIndexingBolt.emits}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryIndexingBolt.fails"
-                    - "${org.apache.metron.metrics.TelemetryIndexingBolt.fails}"
-
-spouts:
-    -   id: "testingSpout"
-        className: "org.apache.metron.test.spouts.GenericInternalTestSpout"
-        parallelism: 1
-        configMethods:
-            -   name: "withFilename"
-                args:
-                    - "SampleInput/AsaOutput"
-            -   name: "withRepeating"
-                args:
-                    - true
-
-bolts:
-    -   id: "parserBolt"
-        className: "org.apache.metron.bolt.TelemetryParserBolt"
-        configMethods:
-            -   name: "withMessageParser"
-                args:
-                    - ref: "parser"
-            -   name: "withEnrichments"
-                args:
-                    - ref: "enrichments"
-    -   id: "indexingBolt"
-        className: "org.apache.metron.indexing.TelemetryIndexingBolt"
-        configMethods:
-            -   name: "withIndexIP"
-                args:
-                    - "${es.ip}"
-            -   name: "withIndexPort"
-                args:
-                    - ${es.port}
-            -   name: "withClusterName"
-                args:
-                    - "${es.clustername}"
-            -   name: "withIndexName"
-                args:
-                    - "asa_index"
-            -   name: "withIndexTimestamp"
-                args:
-                    - "yyyy.MM.dd.hh"
-            -   name: "withDocumentName"
-                args:
-                    - "asa_doc"
-            -   name: "withBulk"
-                args:
-                    - 1
-            -   name: "withIndexAdapter"
-                args:
-                    - ref: "indexAdapter"
-            -   name: "withMetricConfiguration"
-                args:
-                    - ref: "metricConfig"
-    -   id: "alertsBolt"
-        className: "org.apache.metron.alerts.TelemetryAlertsBolt"
-        configMethods:
-            -   name: "withIdentifier"
-                args:
-                    - ref: "alertsIdentifier"
-            -   name: "withMaxCacheSize"
-                args: [1000]
-            -   name: "withMaxTimeRetain"
-                args: [3600]
-            -   name: "withAlertsAdapter"
-                args:
-                    - ref: "alertsAdapter"
-            -   name: "withOutputFieldName"
-                args: ["message"]
-            -   name: "withMetricConfiguration"
-                args:
-                    - ref: "metricConfig"
-    -   id: "alertsIndexingBolt"
-        className: "org.apache.metron.indexing.TelemetryIndexingBolt"
-        configMethods:
-            -   name: "withIndexIP"
-                args:
-                    - "${es.ip}"
-            -   name: "withIndexPort"
-                args:
-                    - ${es.port}
-            -   name: "withClusterName"
-                args:
-                    - "${es.clustername}"
-            -   name: "withIndexName"
-                args:
-                    - "alert"
-            -   name: "withIndexTimestamp"
-                args:
-                    - "yyyy.MM.ww"
-            -   name: "withDocumentName"
-                args:
-                    - "asa_alert"
-            -   name: "withBulk"
-                args:
-                    - 1
-            -   name: "withIndexAdapter"
-                args:
-                    - ref: "indexAdapter"
-            -   name: "withMetricConfiguration"
-                args:
-                    - ref: "metricConfig"
-    -   id: "errorIndexingBolt"
-        className: "org.apache.metron.indexing.TelemetryIndexingBolt"
-        configMethods:
-            -   name: "withIndexIP"
-                args:
-                    - "${es.ip}"
-            -   name: "withIndexPort"
-                args:
-                    - ${es.port}
-            -   name: "withClusterName"
-                args:
-                    - "${es.clustername}"
-            -   name: "withIndexName"
-                args:
-                    - "error"
-            -   name: "withIndexTimestamp"
-                args:
-                    - "yyyy.MM"
-            -   name: "withDocumentName"
-                args:
-                    - "asa_error"
-            -   name: "withBulk"
-                args:
-                    - 1
-            -   name: "withIndexAdapter"
-                args:
-                    - ref: "indexAdapter"
-            -   name: "withMetricConfiguration"
-                args:
-                    - ref: "metricConfig"
-    -   id: "geoEnrichmentBolt"
-        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
-        configMethods:
-            -   name: "withEnrichment"
-                args:
-                    - ref: "geoEnrichment"
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-    -   id: "hostEnrichmentBolt"
-        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
-        configMethods:
-            -   name: "withEnrichment"
-                args:
-                    - ref: "hostEnrichment"
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-    -   id: "joinBolt"
-        className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
-        configMethods:
-        -   name: "withEnrichments"
-            args:
-                - ref: "enrichments"
-        -   name: "withMaxCacheSize"
-            args: [10000]
-        -   name: "withMaxTimeRetain"
-            args: [10]
-
-streams:
-    -   name: "spout -> parser"
-        from: "testingSpout"
-        to: "parserBolt"
-        grouping:
-            type: SHUFFLE
-    -   name: "parser -> host"
-        from: "parserBolt"
-        to: "hostEnrichmentBolt"
-        grouping:
-            streamId: "host"
-            type: FIELDS
-            args: ["key"]
-    -   name: "parser -> geo"
-        from: "parserBolt"
-        to: "geoEnrichmentBolt"
-        grouping:
-            streamId: "geo"
-            type: FIELDS
-            args: ["key"]
-    -   name: "parser -> join"
-        from: "parserBolt"
-        to: "joinBolt"
-        grouping:
-            streamId: "message"
-            type: FIELDS
-            args: ["key"]
-    -   name: "geo -> join"
-        from: "geoEnrichmentBolt"
-        to: "joinBolt"
-        grouping:
-            streamId: "geo"
-            type: FIELDS
-            args: ["key"]
-    -   name: "host -> join"
-        from: "hostEnrichmentBolt"
-        to: "joinBolt"
-        grouping:
-            streamId: "host"
-            type: FIELDS
-            args: ["key"]
-    -   name: "join -> alerts"
-        from: "joinBolt"
-        to: "alertsBolt"
-        grouping:
-            streamId: "message"
-            type: FIELDS
-            args: ["key"]
-    -   name: "alerts -> alertsIndexing"
-        from: "alertsBolt"
-        to: "alertsIndexingBolt"
-        grouping:
-            streamId: "message"
-            type: SHUFFLE
-    -   name: "join -> indexing"
-        from: "joinBolt"
-        to: "indexingBolt"
-        grouping:
-            streamId: "message"
-            type: FIELDS
-            args: ["key"]
-    -   name: "parser -> errors"
-        from: "parserBolt"
-        to: "errorIndexingBolt"
-        grouping:
-            streamId: "error"
-            type: SHUFFLE
-    -   name: "indexing -> errors"
-        from: "indexingBolt"
-        to: "errorIndexingBolt"
-        grouping:
-            streamId: "error"
-            type: SHUFFLE
-    -   name: "alerts -> errors"
-        from: "alertsBolt"
-        to: "errorIndexingBolt"
-        grouping:
-            streamId: "error"
-            type: SHUFFLE

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/remote.yaml
index 94694ab..78c68d5 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/remote.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/remote.yaml
@@ -18,146 +18,14 @@ name: "asa"
 config:
     topology.workers: 1
 
+
 components:
     -   id: "parser"
         className: "org.apache.metron.parsing.parsers.GrokAsaParser"
-    -   id: "jdbcConfig"
-        className: "org.apache.metron.enrichment.adapters.jdbc.MySqlConfig"
-        properties:
-            -   name: "host"
-                value: "${mysql.ip}"
-            -   name: "port"
-                value: ${mysql.port}
-            -   name: "username"
-                value: "${mysql.username}"
-            -   name: "password"
-                value: "${mysql.password}"
-            -   name: "table"
-                value: "GEO"
-    -   id: "geoEnrichmentAdapter"
-        className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter"
-        configMethods:
-            -   name: "withJdbcConfig"
-                args:
-                    - ref: "jdbcConfig"
-    -   id: "geoEnrichment"
-        className: "org.apache.metron.domain.Enrichment"
-        properties:
-            -   name: "name"
-                value:  "geo"
-            -   name: "fields"
-                value: ["ip_src_addr", "ip_dst_addr"]
-            -   name: "adapter"
-                ref: "geoEnrichmentAdapter"
-    -   id: "hostEnrichmentAdapter"
-        className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
-        constructorArgs:
-            - '${org.apache.metron.enrichment.host.known_hosts}'
-    -   id: "hostEnrichment"
-        className: "org.apache.metron.domain.Enrichment"
-        properties:
-            -   name: "name"
-                value:  "host"
-            -   name: "fields"
-                value: ["ip_src_addr", "ip_dst_addr"]
-            -   name: "adapter"
-                ref: "hostEnrichmentAdapter"
-    -   id: "enrichments"
-        className: "java.util.ArrayList"
-        configMethods:
-            -   name: "add"
-                args:
-                    - ref: "geoEnrichment"
-            -   name: "add"
-                args:
-                    - ref: "hostEnrichment"
-    -   id: "indexAdapter"
-        className: "org.apache.metron.indexing.adapters.ESTimedRotatingAdapter"
-    -   id: "alertsConfig"
-        className: "java.util.HashMap"
-        configMethods:
-            -   name: "put"
-                args: ["whitelist_table_name", "ip_whitelist"]
-            -   name: "put"
-                args: ["blacklist_table_name", "ip_blacklist"]
-            -   name: "put"
-                args: ["quorum", "mon.cluster2.ctolab.hortonworks.com, nn1.cluster2.ctolab.hortonworks.com, nn2.cluster2.ctolab.hortonworks.com"]
-            -   name: "put"
-                args: ["port", "2181"]
-            -   name: "put"
-                args: ["_MAX_CACHE_SIZE_OBJECTS_NUM", "3600"]
-            -   name: "put"
-                args: ["_MAX_TIME_RETAIN_MINUTES", "1000"]
-    -   id: "alertsAdapter"
-        className: "org.apache.metron.alerts.adapters.CIFAlertsAdapter"
+    -   id: "writer"
+        className: "org.apache.metron.writer.KafkaWriter"
         constructorArgs:
-            - ref: "alertsConfig"
-    -   id: "alertsIdentifier"
-        className: "org.json.simple.JSONObject"
-        configMethods:
-            -   name: "put"
-                args: ["environment", "local"]
-            -   name: "put"
-                args: ["topology", "asa"]
-    -   id: "metricConfig"
-        className: "org.apache.commons.configuration.BaseConfiguration"
-        configMethods:
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.reporter.graphite"
-                    - "${org.apache.metron.metrics.reporter.graphite}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.reporter.console"
-                    - "${org.apache.metron.metrics.reporter.console}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.reporter.jmx"
-                    - "${org.apache.metron.metrics.reporter.jmx}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.graphite.address"
-                    - "${org.apache.metron.metrics.graphite.address}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.graphite.port"
-                    - "${org.apache.metron.metrics.graphite.port}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryParserBolt.acks"
-                    - "${org.apache.metron.metrics.TelemetryParserBolt.acks}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryParserBolt.emits"
-                    - "${org.apache.metron.metrics.TelemetryParserBolt.emits}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryParserBolt.fails"
-                    - "${org.apache.metron.metrics.TelemetryParserBolt.fails}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.GenericEnrichmentBolt.acks"
-                    - "${org.apache.metron.metrics.GenericEnrichmentBolt.acks}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.GenericEnrichmentBolt.emits"
-                    - "${org.apache.metron.metrics.GenericEnrichmentBolt.emits}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.GenericEnrichmentBolt.fails"
-                    - "${org.apache.metron.metrics.GenericEnrichmentBolt.fails}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryIndexingBolt.acks"
-                    - "${org.apache.metron.metrics.TelemetryIndexingBolt.acks}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryIndexingBolt.emits"
-                    - "${org.apache.metron.metrics.TelemetryIndexingBolt.emits}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryIndexingBolt.fails"
-                    - "${org.apache.metron.metrics.TelemetryIndexingBolt.fails}"
+            - "${kafka.broker}"
     -   id: "zkHosts"
         className: "storm.kafka.ZkHosts"
         constructorArgs:
@@ -168,18 +36,30 @@ components:
             # zookeeper hosts
             - ref: "zkHosts"
             # topic name
-            - "${spout.kafka.topic.pcap}"
+            - "${spout.kafka.topic.asa}"
             # zk root
             - ""
             # id
-            - "${spout.kafka.topic.pcap}"
+            - "${spout.kafka.topic.asa}"
         properties:
-            -   name: "forceFromStart"
+            -   name: "ignoreZkOffsets"
                 value: true
             -   name: "startOffsetTime"
                 value: -1
+            -   name: "socketTimeoutMs"
+                value: 1000000
 
 spouts:
+    -   id: "testingSpout"
+        className: "org.apache.metron.test.spouts.GenericInternalTestSpout"
+        parallelism: 1
+        configMethods:
+            -   name: "withFilename"
+                args:
+                    - "SampleInput/YafExampleOutput"
+            -   name: "withRepeating"
+                args:
+                    - true
     -   id: "kafkaSpout"
         className: "storm.kafka.KafkaSpout"
         constructorArgs:
@@ -187,229 +67,16 @@ spouts:
 
 bolts:
     -   id: "parserBolt"
-        className: "org.apache.metron.bolt.TelemetryParserBolt"
-        configMethods:
-            -   name: "withMessageParser"
-                args:
-                    - ref: "parser"
-            -   name: "withEnrichments"
-                args:
-                    - ref: "enrichments"
-    -   id: "indexingBolt"
-        className: "org.apache.metron.indexing.TelemetryIndexingBolt"
-        configMethods:
-            -   name: "withIndexIP"
-                args:
-                    - "${es.ip}"
-            -   name: "withIndexPort"
-                args:
-                    - ${es.port}
-            -   name: "withClusterName"
-                args:
-                    - "${es.clustername}"
-            -   name: "withIndexName"
-                args:
-                    - "asa_index"
-            -   name: "withIndexTimestamp"
-                args:
-                    - "yyyy.MM.dd.hh"
-            -   name: "withDocumentName"
-                args:
-                    - "asa_doc"
-            -   name: "withBulk"
-                args:
-                    - 1
-            -   name: "withIndexAdapter"
-                args:
-                    - ref: "indexAdapter"
-            -   name: "withMetricConfiguration"
-                args:
-                    - ref: "metricConfig"
-    -   id: "alertsBolt"
-        className: "org.apache.metron.alerts.TelemetryAlertsBolt"
-        configMethods:
-            -   name: "withIdentifier"
-                args:
-                    - ref: "alertsIdentifier"
-            -   name: "withMaxCacheSize"
-                args: [1000]
-            -   name: "withMaxTimeRetain"
-                args: [3600]
-            -   name: "withAlertsAdapter"
-                args:
-                    - ref: "alertsAdapter"
-            -   name: "withOutputFieldName"
-                args: ["message"]
-            -   name: "withMetricConfiguration"
-                args:
-                    - ref: "metricConfig"
-    -   id: "alertsIndexingBolt"
-        className: "org.apache.metron.indexing.TelemetryIndexingBolt"
-        configMethods:
-            -   name: "withIndexIP"
-                args:
-                    - "${es.ip}"
-            -   name: "withIndexPort"
-                args:
-                    - ${es.port}
-            -   name: "withClusterName"
-                args:
-                    - "${es.clustername}"
-            -   name: "withIndexName"
-                args:
-                    - "alert"
-            -   name: "withIndexTimestamp"
-                args:
-                    - "yyyy.MM.ww"
-            -   name: "withDocumentName"
-                args:
-                    - "asa_alert"
-            -   name: "withBulk"
-                args:
-                    - 1
-            -   name: "withIndexAdapter"
-                args:
-                    - ref: "indexAdapter"
-            -   name: "withMetricConfiguration"
-                args:
-                    - ref: "metricConfig"
-    -   id: "errorIndexingBolt"
-        className: "org.apache.metron.indexing.TelemetryIndexingBolt"
-        configMethods:
-            -   name: "withIndexIP"
-                args:
-                    - "${es.ip}"
-            -   name: "withIndexPort"
-                args:
-                    - ${es.port}
-            -   name: "withClusterName"
-                args:
-                    - "${es.clustername}"
-            -   name: "withIndexName"
-                args:
-                    - "error"
-            -   name: "withIndexTimestamp"
-                args:
-                    - "yyyy.MM"
-            -   name: "withDocumentName"
-                args:
-                    - "asa_error"
-            -   name: "withBulk"
-                args:
-                    - 1
-            -   name: "withIndexAdapter"
-                args:
-                    - ref: "indexAdapter"
-            -   name: "withMetricConfiguration"
-                args:
-                    - ref: "metricConfig"
-    -   id: "geoEnrichmentBolt"
-        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
-        configMethods:
-            -   name: "withEnrichment"
-                args:
-                    - ref: "geoEnrichment"
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-    -   id: "hostEnrichmentBolt"
-        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
-        configMethods:
-            -   name: "withEnrichment"
-                args:
-                    - ref: "hostEnrichment"
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-    -   id: "joinBolt"
-        className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
-        configMethods:
-        -   name: "withEnrichments"
-            args:
-                - ref: "enrichments"
-        -   name: "withMaxCacheSize"
-            args: [10000]
-        -   name: "withMaxTimeRetain"
-            args: [10]
+        className: "org.apache.metron.bolt.ParserBolt"
+        constructorArgs:
+            - "${kafka.zk}"
+            - "yaf"
+            - ref: "parser"
+            - ref: "writer"
 
 streams:
-    -   name: "spout -> parser"
+    -   name: "spout -> bolt"
         from: "kafkaSpout"
         to: "parserBolt"
         grouping:
             type: SHUFFLE
-    -   name: "parser -> host"
-        from: "parserBolt"
-        to: "hostEnrichmentBolt"
-        grouping:
-            streamId: "host"
-            type: FIELDS
-            args: ["key"]
-    -   name: "parser -> geo"
-        from: "parserBolt"
-        to: "geoEnrichmentBolt"
-        grouping:
-            streamId: "geo"
-            type: FIELDS
-            args: ["key"]
-    -   name: "parser -> join"
-        from: "parserBolt"
-        to: "joinBolt"
-        grouping:
-            streamId: "message"
-            type: FIELDS
-            args: ["key"]
-    -   name: "geo -> join"
-        from: "geoEnrichmentBolt"
-        to: "joinBolt"
-        grouping:
-            streamId: "geo"
-            type: FIELDS
-            args: ["key"]
-    -   name: "host -> join"
-        from: "hostEnrichmentBolt"
-        to: "joinBolt"
-        grouping:
-            streamId: "host"
-            type: FIELDS
-            args: ["key"]
-    -   name: "join -> alerts"
-        from: "joinBolt"
-        to: "alertsBolt"
-        grouping:
-            streamId: "message"
-            type: FIELDS
-            args: ["key"]
-    -   name: "alerts -> alertsIndexing"
-        from: "alertsBolt"
-        to: "alertsIndexingBolt"
-        grouping:
-            streamId: "message"
-            type: SHUFFLE
-    -   name: "join -> indexing"
-        from: "joinBolt"
-        to: "indexingBolt"
-        grouping:
-            streamId: "message"
-            type: FIELDS
-            args: ["key"]
-    -   name: "parser -> errors"
-        from: "parserBolt"
-        to: "errorIndexingBolt"
-        grouping:
-            streamId: "error"
-            type: SHUFFLE
-    -   name: "indexing -> errors"
-        from: "indexingBolt"
-        to: "errorIndexingBolt"
-        grouping:
-            streamId: "error"
-            type: SHUFFLE
-    -   name: "alerts -> errors"
-        from: "alertsBolt"
-        to: "errorIndexingBolt"
-        grouping:
-            streamId: "error"
-            type: SHUFFLE

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/test.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/test.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/test.yaml
new file mode 100644
index 0000000..9114d94
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/test.yaml
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: "asa-test"
+config:
+    topology.workers: 1
+
+
+components:
+    -   id: "parser"
+        className: "org.apache.metron.parsing.parsers.GrokAsaParser"
+    -   id: "writer"
+        className: "org.apache.metron.writer.KafkaWriter"
+        constructorArgs:
+            - "${kafka.broker}"
+    -   id: "zkHosts"
+        className: "storm.kafka.ZkHosts"
+        constructorArgs:
+            - "${kafka.zk}"
+    -   id: "kafkaConfig"
+        className: "storm.kafka.SpoutConfig"
+        constructorArgs:
+            # zookeeper hosts
+            - ref: "zkHosts"
+            # topic name
+            - "${spout.kafka.topic.asa}"
+            # zk root
+            - ""
+            # id
+            - "${spout.kafka.topic.asa}"
+        properties:
+            -   name: "ignoreZkOffsets"
+                value: true
+            -   name: "startOffsetTime"
+                value: -2
+            -   name: "socketTimeoutMs"
+                value: 1000000
+
+spouts:
+    -   id: "testingSpout"
+        className: "org.apache.metron.test.spouts.GenericInternalTestSpout"
+        parallelism: 1
+        configMethods:
+            -   name: "withFilename"
+                args:
+                    - "SampleInput/YafExampleOutput"
+            -   name: "withRepeating"
+                args:
+                    - false
+    -   id: "kafkaSpout"
+        className: "storm.kafka.KafkaSpout"
+        constructorArgs:
+            - ref: "kafkaConfig"
+
+bolts:
+    -   id: "parserBolt"
+        className: "org.apache.metron.bolt.ParserBolt"
+        constructorArgs:
+            - "${kafka.zk}"
+            - "yaf"
+            - ref: "parser"
+            - ref: "writer"
+
+streams:
+    -   name: "spout -> bolt"
+        from: "kafkaSpout"
+        to: "parserBolt"
+        grouping:
+            type: SHUFFLE

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/local.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/local.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/local.yaml
deleted file mode 100644
index 851f9d9..0000000
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/local.yaml
+++ /dev/null
@@ -1,192 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-name: "bro-local"
-config:
-    topology.workers: 1
-
-components:
-    -   id: "broParser"
-        className: "org.apache.metron.parsing.parsers.BasicBroParser"
-    -   id: "genericMessageFilter"
-        className: "org.apache.metron.filters.GenericMessageFilter"
-    -   id: "indexAdapter"
-        className: "org.apache.metron.indexing.adapters.ESTimedRotatingAdapter"
-    -   id: "metricConfig"
-        className: "org.apache.commons.configuration.BaseConfiguration"
-        configMethods:
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.reporter.graphite"
-                    - "${org.apache.metron.metrics.reporter.graphite}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.reporter.console"
-                    - "${org.apache.metron.metrics.reporter.console}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.reporter.jmx"
-                    - "${org.apache.metron.metrics.reporter.jmx}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.graphite.address"
-                    - "${org.apache.metron.metrics.graphite.address}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.graphite.port"
-                    - "${org.apache.metron.metrics.graphite.port}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryParserBolt.acks"
-                    - "${org.apache.metron.metrics.TelemetryParserBolt.acks}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryParserBolt.emits"
-                    - "${org.apache.metron.metrics.TelemetryParserBolt.emits}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryParserBolt.fails"
-                    - "${org.apache.metron.metrics.TelemetryParserBolt.fails}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.GenericEnrichmentBolt.acks"
-                    - "${org.apache.metron.metrics.GenericEnrichmentBolt.acks}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.GenericEnrichmentBolt.emits"
-                    - "${org.apache.metron.metrics.GenericEnrichmentBolt.emits}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.GenericEnrichmentBolt.fails"
-                    - "${org.apache.metron.metrics.GenericEnrichmentBolt.fails}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryIndexingBolt.acks"
-                    - "${org.apache.metron.metrics.TelemetryIndexingBolt.acks}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryIndexingBolt.emits"
-                    - "${org.apache.metron.metrics.TelemetryIndexingBolt.emits}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryIndexingBolt.fails"
-                    - "${org.apache.metron.metrics.TelemetryIndexingBolt.fails}"
-
-spouts:
-    -   id: "testingSpout"
-        className: "org.apache.metron.test.spouts.GenericInternalTestSpout"
-        parallelism: 1
-        configMethods:
-            -   name: "withFilename"
-                args:
-                    - "SampleInput/BroExampleOutput"
-            -   name: "withRepeating"
-                args:
-                    - true
-
-bolts:
-    -   id: "parserBolt"
-        className: "org.apache.metron.bolt.TelemetryParserBolt"
-        configMethods:
-            -   name: "withMessageParser"
-                args:
-                    - ref: "broParser"
-    -   id: "indexingBolt"
-        className: "org.apache.metron.indexing.TelemetryIndexingBolt"
-        configMethods:
-            -   name: "withIndexIP"
-                args:
-                    - "${es.ip}"
-            -   name: "withIndexPort"
-                args:
-                    - ${es.port}
-            -   name: "withClusterName"
-                args:
-                    - "${es.clustername}"
-            -   name: "withIndexName"
-                args:
-                    - "bro_index"
-            -   name: "withIndexTimestamp"
-                args:
-                    - "yyyy.MM.dd.hh"
-            -   name: "withDocumentName"
-                args:
-                    - "bro_doc"
-            -   name: "withBulk"
-                args:
-                    - 1
-            -   name: "withIndexAdapter"
-                args:
-                    - ref: "indexAdapter"
-            -   name: "withMetricConfiguration"
-                args:
-                    - ref: "metricConfig"
-    -   id: "errorIndexingBolt"
-        className: "org.apache.metron.indexing.TelemetryIndexingBolt"
-        configMethods:
-            -   name: "withIndexIP"
-                args:
-                    - "${es.ip}"
-            -   name: "withIndexPort"
-                args:
-                    - ${es.port}
-            -   name: "withClusterName"
-                args:
-                    - "${es.clustername}"
-            -   name: "withIndexName"
-                args:
-                    - "error"
-            -   name: "withIndexTimestamp"
-                args:
-                    - "yyyy.MM"
-            -   name: "withDocumentName"
-                args:
-                    - "bro_error"
-            -   name: "withBulk"
-                args:
-                    - 1
-            -   name: "withIndexAdapter"
-                args:
-                    - ref: "indexAdapter"
-            -   name: "withMetricConfiguration"
-                args:
-                    - ref: "metricConfig"
-
-streams:
-    -   name: "spout -> parser"
-        from: "testingSpout"
-        to: "parserBolt"
-        grouping:
-            type: SHUFFLE
-    -   name: "parser -> indexing"
-        from: "parserBolt"
-        to: "indexingBolt"
-        grouping:
-            streamId: "message"
-            type: FIELDS
-            args: ["key"]
-    -   name: "parser -> errors"
-        from: "parserBolt"
-        to: "errorIndexingBolt"
-        grouping:
-            streamId: "error"
-            type: SHUFFLE
-    -   name: "indexing -> errors"
-        from: "indexingBolt"
-        to: "errorIndexingBolt"
-        grouping:
-            streamId: "error"
-            type: SHUFFLE

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml
index 96d836e..fb594b5 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml
@@ -18,72 +18,14 @@ name: "bro"
 config:
     topology.workers: 1
 
+
 components:
-    -   id: "broParser"
+    -   id: "parser"
         className: "org.apache.metron.parsing.parsers.BasicBroParser"
-    -   id: "genericMessageFilter"
-        className: "org.apache.metron.filters.GenericMessageFilter"
-    -   id: "indexAdapter"
-        className: "org.apache.metron.indexing.adapters.ESTimedRotatingAdapter"
-    -   id: "metricConfig"
-        className: "org.apache.commons.configuration.BaseConfiguration"
-        configMethods:
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.reporter.graphite"
-                    - "${org.apache.metron.metrics.reporter.graphite}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.reporter.console"
-                    - "${org.apache.metron.metrics.reporter.console}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.reporter.jmx"
-                    - "${org.apache.metron.metrics.reporter.jmx}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.graphite.address"
-                    - "${org.apache.metron.metrics.graphite.address}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.graphite.port"
-                    - "${org.apache.metron.metrics.graphite.port}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryParserBolt.acks"
-                    - "${org.apache.metron.metrics.TelemetryParserBolt.acks}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryParserBolt.emits"
-                    - "${org.apache.metron.metrics.TelemetryParserBolt.emits}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryParserBolt.fails"
-                    - "${org.apache.metron.metrics.TelemetryParserBolt.fails}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.GenericEnrichmentBolt.acks"
-                    - "${org.apache.metron.metrics.GenericEnrichmentBolt.acks}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.GenericEnrichmentBolt.emits"
-                    - "${org.apache.metron.metrics.GenericEnrichmentBolt.emits}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.GenericEnrichmentBolt.fails"
-                    - "${org.apache.metron.metrics.GenericEnrichmentBolt.fails}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryIndexingBolt.acks"
-                    - "${org.apache.metron.metrics.TelemetryIndexingBolt.acks}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryIndexingBolt.emits"
-                    - "${org.apache.metron.metrics.TelemetryIndexingBolt.emits}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryIndexingBolt.fails"
-                    - "${org.apache.metron.metrics.TelemetryIndexingBolt.fails}"
+    -   id: "writer"
+        className: "org.apache.metron.writer.KafkaWriter"
+        constructorArgs:
+            - "${kafka.broker}"
     -   id: "zkHosts"
         className: "storm.kafka.ZkHosts"
         constructorArgs:
@@ -100,12 +42,24 @@ components:
             # id
             - "${spout.kafka.topic.bro}"
         properties:
-            -   name: "forceFromStart"
+            -   name: "ignoreZkOffsets"
                 value: true
             -   name: "startOffsetTime"
                 value: -1
+            -   name: "socketTimeoutMs"
+                value: 1000000
 
 spouts:
+    -   id: "testingSpout"
+        className: "org.apache.metron.test.spouts.GenericInternalTestSpout"
+        parallelism: 1
+        configMethods:
+            -   name: "withFilename"
+                args:
+                    - "SampleInput/YafExampleOutput"
+            -   name: "withRepeating"
+                args:
+                    - true
     -   id: "kafkaSpout"
         className: "storm.kafka.KafkaSpout"
         constructorArgs:
@@ -113,94 +67,16 @@ spouts:
 
 bolts:
     -   id: "parserBolt"
-        className: "org.apache.metron.bolt.TelemetryParserBolt"
-        configMethods:
-            -   name: "withMessageParser"
-                args:
-                    - ref: "broParser"
-    -   id: "indexingBolt"
-        className: "org.apache.metron.indexing.TelemetryIndexingBolt"
-        configMethods:
-            -   name: "withIndexIP"
-                args:
-                    - "${es.ip}"
-            -   name: "withIndexPort"
-                args:
-                    - ${es.port}
-            -   name: "withClusterName"
-                args:
-                    - "${es.clustername}"
-            -   name: "withIndexName"
-                args:
-                    - "bro_index"
-            -   name: "withIndexTimestamp"
-                args:
-                    - "yyyy.MM.dd.hh"
-            -   name: "withDocumentName"
-                args:
-                    - "bro_doc"
-            -   name: "withBulk"
-                args:
-                    - 1
-            -   name: "withIndexAdapter"
-                args:
-                    - ref: "indexAdapter"
-            -   name: "withMetricConfiguration"
-                args:
-                    - ref: "metricConfig"
-    -   id: "errorIndexingBolt"
-        className: "org.apache.metron.indexing.TelemetryIndexingBolt"
-        configMethods:
-            -   name: "withIndexIP"
-                args:
-                    - "${es.ip}"
-            -   name: "withIndexPort"
-                args:
-                    - ${es.port}
-            -   name: "withClusterName"
-                args:
-                    - "${es.clustername}"
-            -   name: "withIndexName"
-                args:
-                    - "error"
-            -   name: "withIndexTimestamp"
-                args:
-                    - "yyyy.MM"
-            -   name: "withDocumentName"
-                args:
-                    - "bro_error"
-            -   name: "withBulk"
-                args:
-                    - 1
-            -   name: "withIndexAdapter"
-                args:
-                    - ref: "indexAdapter"
-            -   name: "withMetricConfiguration"
-                args:
-                    - ref: "metricConfig"
+        className: "org.apache.metron.bolt.ParserBolt"  # replaces TelemetryParserBolt and the indexing bolts removed above
+        constructorArgs:
+            - "${kafka.zk}"
+            - "bro"  # sensor type (assumed meaning of this argument; confirm); was "yaf", which does not match this bro topology
+            - ref: "parser"
+            - ref: "writer"  # the KafkaWriter component added by this diff
 
 streams:
-    -   name: "spout -> parser"
+    -   name: "spout -> bolt"
         from: "kafkaSpout"
         to: "parserBolt"
         grouping:
             type: SHUFFLE
-    -   name: "parser -> indexing"
-        from: "parserBolt"
-        to: "indexingBolt"
-        grouping:
-            streamId: "message"
-            type: FIELDS
-            args: ["key"]
-    -   name: "parser -> errors"
-        from: "parserBolt"
-        to: "errorIndexingBolt"
-        grouping:
-            streamId: "error"
-            type: SHUFFLE
-    -   name: "indexing -> errors"
-        from: "indexingBolt"
-        to: "errorIndexingBolt"
-        grouping:
-            streamId: "error"
-            type: SHUFFLE

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/test.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/test.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/test.yaml
new file mode 100644
index 0000000..3bd3eed
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/test.yaml
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: "bro-test"
+config:
+    topology.workers: 1  # Storm setting: number of worker processes for this topology
+
+
+components:
+    -   id: "parser"  # passed to parserBolt by ref
+        className: "org.apache.metron.parsing.parsers.BasicBroParser"
+    -   id: "writer"  # passed to parserBolt by ref
+        className: "org.apache.metron.writer.KafkaWriter"
+        constructorArgs:
+            - "${kafka.broker}"
+    -   id: "zkHosts"
+        className: "storm.kafka.ZkHosts"
+        constructorArgs:
+            - "${kafka.zk}"
+    -   id: "kafkaConfig"  # SpoutConfig handed to kafkaSpout
+        className: "storm.kafka.SpoutConfig"
+        constructorArgs:
+            # zookeeper hosts
+            - ref: "zkHosts"
+            # topic name
+            - "${spout.kafka.topic.bro}"
+            # zk root
+            - ""
+            # id
+            - "${spout.kafka.topic.bro}"
+        properties:
+            -   name: "ignoreZkOffsets"
+                value: true
+            -   name: "startOffsetTime"
+                value: -2  # -2 is Kafka's "earliest offset" constant (-1 is "latest")
+            -   name: "socketTimeoutMs"
+                value: 1000000  # milliseconds, i.e. 1000 seconds
+
+spouts:
+    -   id: "testingSpout"  # NOTE(review): no stream in this file reads from this spout; confirm it is needed
+        className: "org.apache.metron.test.spouts.GenericInternalTestSpout"
+        parallelism: 1
+        configMethods:
+            -   name: "withFilename"
+                args:
+                    - "SampleInput/YafExampleOutput"  # NOTE(review): Yaf sample file inside a bro topology; confirm
+            -   name: "withRepeating"
+                args:
+                    - false
+    -   id: "kafkaSpout"  # source of the single "spout -> bolt" stream
+        className: "storm.kafka.KafkaSpout"
+        constructorArgs:
+            - ref: "kafkaConfig"
+
+bolts:
+    -   id: "parserBolt"  # sole bolt: parses with the "parser" component and hands results to "writer"
+        className: "org.apache.metron.bolt.ParserBolt"
+        constructorArgs:
+            - "${kafka.zk}"  # same ZooKeeper property that zkHosts is built from
+            - "bro"  # sensor type (assumed meaning of this argument; confirm); was "yaf", which does not match BasicBroParser
+            - ref: "parser"
+            - ref: "writer"
+
+streams:
+    -   name: "spout -> bolt"  # the only stream in this topology
+        from: "kafkaSpout"
+        to: "parserBolt"
+        grouping:
+            type: SHUFFLE

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
new file mode 100644
index 0000000..8033374
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
@@ -0,0 +1,331 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: "enrichment"
+config:
+    topology.workers: 1  # Storm setting: number of worker processes for this topology
+
+components:
+# Enrichment: geo and host adapters, each wrapped in an Enrichment and collected in the "enrichments" list
+    -   id: "jdbcConfig"
+        className: "org.apache.metron.enrichment.adapters.jdbc.MySqlConfig"
+        properties:
+            -   name: "host"
+                value: "${mysql.ip}"
+            -   name: "port"
+                value: ${mysql.port}
+            -   name: "username"
+                value: "${mysql.username}"
+            -   name: "password"
+                value: "${mysql.password}"
+            -   name: "table"
+                value: "GEO"
+    -   id: "geoEnrichmentAdapter"
+        className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter"
+        configMethods:
+            -   name: "withJdbcConfig"
+                args:
+                    - ref: "jdbcConfig"
+    -   id: "geoEnrichment"
+        className: "org.apache.metron.domain.Enrichment"
+        constructorArgs:
+            -   "geo"  # same string as the "geo" streamId used in the streams section
+            -   ref: "geoEnrichmentAdapter"
+    -   id: "hostEnrichmentAdapter"
+        className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
+        constructorArgs:
+            - '${org.apache.metron.enrichment.host.known_hosts}'
+    -   id: "hostEnrichment"
+        className: "org.apache.metron.domain.Enrichment"
+        constructorArgs:
+            -   "host"  # same string as the "host" streamId used in the streams section
+            -   ref: "hostEnrichmentAdapter"
+    -   id: "enrichments"  # shared by enrichmentSplitBolt and enrichmentJoinBolt
+        className: "java.util.ArrayList"
+        configMethods:
+            -   name: "add"
+                args:
+                    - ref: "geoEnrichment"
+            -   name: "add"
+                args:
+                    - ref: "hostEnrichment"
+
+# Threat Intel: one "ip" adapter configured from HBase table/column-family properties, collected in "threatIntels"
+    -   id: "ipThreatIntelConfig"
+        className: "org.apache.metron.threatintel.ThreatIntelConfig"
+        configMethods:
+            -   name: "withProviderImpl"
+                args:
+                    - "${hbase.provider.impl}"
+            -   name: "withTrackerHBaseTable"
+                args:
+                    - "${threat.intel.tracker.table}"
+            -   name: "withTrackerHBaseCF"
+                args:
+                    - "${threat.intel.tracker.cf}"
+            -   name: "withHBaseTable"
+                args:
+                    - "${threat.intel.ip.table}"
+            -   name: "withHBaseCF"
+                args:
+                    - "${threat.intel.ip.cf}"
+    -   id: "ipThreatIntelAdapter"
+        className: "org.apache.metron.threatintel.ThreatIntelAdapter"
+        configMethods:
+           -    name: "withConfig"
+                args:
+                    - ref: "ipThreatIntelConfig"
+    -   id: "ipThreatIntelEnrichment"
+        className: "org.apache.metron.domain.Enrichment"
+        constructorArgs:
+          -   "ip"  # same string as the "ip" streamId used in the streams section
+          -   ref: "ipThreatIntelAdapter"
+    -   id: "threatIntels"  # shared by threatIntelSplitBolt and threatIntelJoinBolt
+        className: "java.util.ArrayList"
+        configMethods:
+            -   name: "add"
+                args:
+                    - ref: "ipThreatIntelEnrichment"
+
+# Indexing: Elasticsearch writer handed to indexingBolt via withBulkMessageWriter
+    -   id: "indexWriter"
+        className: "org.apache.metron.writer.ElasticsearchWriter"
+        constructorArgs:
+            - "${es.clustername}"
+            - "${es.ip}"
+            - ${es.port}
+            - "yyyy.MM.dd.hh"  # NOTE(review): if this is a java.text.SimpleDateFormat pattern, "hh" is the 12-hour clock ("HH" is 24-hour); confirm
+
+# Kafka/ZooKeeper: SpoutConfig for the "enrichments" topic, consumed by kafkaSpout
+    -   id: "zkHosts"
+        className: "storm.kafka.ZkHosts"
+        constructorArgs:
+            - "${kafka.zk}"
+    -   id: "kafkaConfig"
+        className: "storm.kafka.SpoutConfig"
+        constructorArgs:
+            # zookeeper hosts
+            - ref: "zkHosts"
+            # topic name
+            - "enrichments"
+            # zk root
+            - ""
+            # id
+            - "enrichments"
+        properties:
+            -   name: "ignoreZkOffsets"
+                value: true
+            -   name: "startOffsetTime"
+                value: -1  # -1 is Kafka's "latest offset" constant (-2 is "earliest")
+
+spouts:
+    -   id: "testingSpout"  # NOTE(review): no stream in this file reads from this spout; confirm it is needed
+        className: "org.apache.metron.test.spouts.GenericInternalTestSpout"
+        parallelism: 1
+        configMethods:
+            -   name: "withFilename"
+                args:
+                    - "SampleInput/YafExampleOutput"
+            -   name: "withRepeating"
+                args:
+                    - true
+    -   id: "kafkaSpout"  # source of the "spout -> enrichmentSplit" stream
+        className: "storm.kafka.KafkaSpout"
+        constructorArgs:
+            - ref: "kafkaConfig"
+bolts:
+# Enrichment bolts: splitter -> one GenericEnrichmentBolt per enrichment (geo, host) -> join
+    -   id: "enrichmentSplitBolt"
+        className: "org.apache.metron.enrichment.bolt.EnrichmentSplitterBolt"
+        constructorArgs:
+            - "${kafka.zk}"
+        configMethods:
+            -   name: "withEnrichments"
+                args:
+                    - ref: "enrichments"
+    -   id: "geoEnrichmentBolt"
+        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+        constructorArgs:
+            - "${kafka.zk}"
+        configMethods:
+            -   name: "withEnrichment"
+                args:
+                    - ref: "geoEnrichment"
+            -   name: "withMaxCacheSize"
+                args: [10000]
+            -   name: "withMaxTimeRetain"
+                args: [10]  # NOTE(review): time unit is not visible in this file; confirm
+    -   id: "hostEnrichmentBolt"
+        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+        constructorArgs:
+            - "${kafka.zk}"
+        configMethods:
+            -   name: "withEnrichment"
+                args:
+                    - ref: "hostEnrichment"
+            -   name: "withMaxCacheSize"
+                args: [10000]
+            -   name: "withMaxTimeRetain"
+                args: [10]
+    -   id: "enrichmentJoinBolt"  # receives the "message", "geo" and "host" streams (see streams section)
+        className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
+        constructorArgs:
+            - "${kafka.zk}"
+        configMethods:
+            -   name: "withEnrichments"
+                args:
+                    - ref: "enrichments"
+            -   name: "withMaxCacheSize"
+                args: [10000]
+            -   name: "withMaxTimeRetain"
+                args: [10]
+
+# Threat intel bolts: same split / enrich / join shape as the enrichment bolts, using the "threatIntels" list
+    -   id: "threatIntelSplitBolt"
+        className: "org.apache.metron.enrichment.bolt.ThreatIntelSplitterBolt"
+        constructorArgs:
+            - "${kafka.zk}"
+        configMethods:
+            -   name: "withEnrichments"
+                args:
+                    - ref: "threatIntels"
+            -   name: "withMessageFieldName"
+                args: ["message"]
+    -   id: "ipThreatIntelBolt"
+        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+        constructorArgs:
+            - "${kafka.zk}"
+        configMethods:
+            -   name: "withEnrichment"
+                args:
+                    - ref: "ipThreatIntelEnrichment"
+            -   name: "withMaxCacheSize"
+                args: [10000]
+            -   name: "withMaxTimeRetain"
+                args: [10]  # NOTE(review): time unit is not visible in this file; confirm
+    -   id: "threatIntelJoinBolt"  # receives the "message" and "ip" streams (see streams section)
+        className: "org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt"
+        constructorArgs:
+            - "${kafka.zk}"
+        configMethods:
+            -   name: "withEnrichments"
+                args:
+                    - ref: "threatIntels"
+            -   name: "withMaxCacheSize"
+                args: [10000]
+            -   name: "withMaxTimeRetain"
+                args: [10]
+# Indexing bolt: last stage, fed by threatIntelJoinBolt; writes through the "indexWriter" component
+    -   id: "indexingBolt"
+        className: "org.apache.metron.bolt.BulkMessageWriterBolt"
+        constructorArgs:
+            - "${kafka.zk}"
+        configMethods:
+            -   name: "withBulkMessageWriter"
+                args:
+                    - ref: "indexWriter"
+
+
+streams:
+# input: the Kafka spout feeds the enrichment splitter (there is no parser bolt in this topology)
+    -   name: "spout -> enrichmentSplit"
+        from: "kafkaSpout"
+        to: "enrichmentSplitBolt"
+        grouping:
+            type: SHUFFLE
+
+# enrichment: the splitter fans out on per-enrichment streams plus "message"; every branch rejoins at enrichmentJoinBolt
+    -   name: "enrichmentSplit -> host"
+        from: "enrichmentSplitBolt"
+        to: "hostEnrichmentBolt"
+        grouping:
+            streamId: "host"
+            type: FIELDS
+            args: ["key"]
+    -   name: "enrichmentSplit -> geo"
+        from: "enrichmentSplitBolt"
+        to: "geoEnrichmentBolt"
+        grouping:
+            streamId: "geo"
+            type: FIELDS
+            args: ["key"]
+    -   name: "splitter -> join"
+        from: "enrichmentSplitBolt"
+        to: "enrichmentJoinBolt"
+        grouping:
+            streamId: "message"
+            type: FIELDS
+            args: ["key"]  # fields grouping: tuples with the same "key" go to the same join task
+    -   name: "geo -> join"
+        from: "geoEnrichmentBolt"
+        to: "enrichmentJoinBolt"
+        grouping:
+            streamId: "geo"
+            type: FIELDS
+            args: ["key"]
+    -   name: "host -> join"
+        from: "hostEnrichmentBolt"
+        to: "enrichmentJoinBolt"
+        grouping:
+            streamId: "host"
+            type: FIELDS
+            args: ["key"]
+
+# threat intel: enrichmentJoinBolt output is split again ("ip" plus "message") and rejoined at threatIntelJoinBolt
+    -   name: "enrichmentJoin -> threatSplit"
+        from: "enrichmentJoinBolt"
+        to: "threatIntelSplitBolt"
+        grouping:
+            streamId: "message"
+            type: FIELDS
+            args: ["key"]
+
+    -   name: "threatSplit -> ip"
+        from: "threatIntelSplitBolt"
+        to: "ipThreatIntelBolt"
+        grouping:
+            streamId: "ip"
+            type: FIELDS
+            args: ["key"]
+
+    -   name: "ip -> join"
+        from: "ipThreatIntelBolt"
+        to: "threatIntelJoinBolt"
+        grouping:
+            streamId: "ip"
+            type: FIELDS
+            args: ["key"]
+    -   name: "threatIntelSplit -> threatIntelJoin"
+        from: "threatIntelSplitBolt"
+        to: "threatIntelJoinBolt"
+        grouping:
+            streamId: "message"
+            type: FIELDS
+            args: ["key"]
+# indexing: joined messages go to indexingBolt; its "error" stream is wired below
+    -   name: "threatIntelJoin -> indexing"
+        from: "threatIntelJoinBolt"
+        to: "indexingBolt"
+        grouping:
+            streamId: "message"
+            type: FIELDS
+            args: ["key"]
+    -   name: "indexingBolt -> errorIndexingBolt"
+        from: "indexingBolt"
+        to: "indexingBolt"  # NOTE(review): points back at indexingBolt; no "errorIndexingBolt" is defined in this file. Confirm the loop is intended
+        grouping:
+            streamId: "error"
+            type: SHUFFLE


Mime
View raw message