metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject [3/5] incubator-metron git commit: METRON-154: Decouple enrichment and indexing closes apache/incubator-metron#192
Date Wed, 20 Jul 2016 13:14:26 GMT
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-indexing/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/pom.xml b/metron-platform/metron-indexing/pom.xml
new file mode 100644
index 0000000..9760a11
--- /dev/null
+++ b/metron-platform/metron-indexing/pom.xml
@@ -0,0 +1,253 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- 
+  Licensed to the Apache Software 
+	Foundation (ASF) under one or more contributor license agreements. See the 
+	NOTICE file distributed with this work for additional information regarding 
+	copyright ownership. The ASF licenses this file to You under the Apache License, 
+	Version 2.0 (the "License"); you may not use this file except in compliance 
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 
+	Unless required by applicable law or agreed to in writing, software distributed 
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+  the specific language governing permissions and limitations under the License. 
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.metron</groupId>
+        <artifactId>metron-platform</artifactId>
+        <version>0.2.0BETA</version>
+    </parent>
+    <artifactId>metron-indexing</artifactId>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <slf4j.version>1.7.7</slf4j.version>
+        <storm.hdfs.version>0.1.2</storm.hdfs.version>
+        <guava.version>${global_hbase_guava_version}</guava.version>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-common</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>${global_hadoop_version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+            </exclusions>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${global_storm_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-hdfs</artifactId>
+            <version>${global_storm_version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.storm</groupId>
+                    <artifactId>storm-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${global_guava_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${global_hadoop_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>${global_mockito_version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-validator</groupId>
+            <artifactId>commons-validator</artifactId>
+            <version>1.4.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-test-utilities</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-integration-test</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-enrichment</artifactId>
+            <version>${project.parent.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <reporting>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <systemProperties>
+                        <property>
+                            <name>mode</name>
+                            <value>global</value>
+                        </property>
+                    </systemProperties>
+                </configuration>
+            </plugin>
+
+            <!-- Normally, dependency report takes time, skip it -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-project-info-reports-plugin</artifactId>
+                <version>2.7</version>
+
+                <configuration>
+                    <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>emma-maven-plugin</artifactId>
+                <version>1.0-alpha-3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-pmd-plugin</artifactId>
+                <configuration>
+                  <targetJdk>${global_java_version}</targetJdk>
+                </configuration>
+            </plugin>
+        </plugins>
+    </reporting>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.1</version>
+                <configuration>
+                  <source>${global_java_version}</source>
+                  <target>${global_java_version}</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>${global_shade_version}</version>
+                <configuration>
+                    <createDependencyReducedPom>true</createDependencyReducedPom>
+                    <artifactSet>
+                        <excludes>
+                            <exclude>*slf4j*</exclude>
+                        </excludes>
+                    </artifactSet>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <relocations>
+                                <relocation>
+                                    <pattern>com.google.common</pattern>
+                                    <shadedPattern>org.apache.metron.guava</shadedPattern>
+                                </relocation>
+                            </relocations>
+                            <transformers>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+                                    <resource>.yaml</resource>
+                                </transformer>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass></mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptor>src/main/assembly/assembly.xml</descriptor>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
+                        <phase>package</phase> <!-- bind to the packaging phase -->
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-indexing/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/assembly/assembly.xml b/metron-platform/metron-indexing/src/main/assembly/assembly.xml
new file mode 100644
index 0000000..6c9165c
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/assembly/assembly.xml
@@ -0,0 +1,57 @@
+<!--
+  Licensed to the Apache Software
+	Foundation (ASF) under one or more contributor license agreements. See the
+	NOTICE file distributed with this work for additional information regarding
+	copyright ownership. The ASF licenses this file to You under the Apache License,
+	Version 2.0 (the "License"); you may not use this file except in compliance
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+	Unless required by applicable law or agreed to in writing, software distributed
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+  the specific language governing permissions and limitations under the License.
+  -->
+
+<assembly>
+  <id>archive</id>
+  <formats>
+    <format>tar.gz</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${project.basedir}/src/main/config</directory>
+      <outputDirectory>/config</outputDirectory>
+      <useDefaultExcludes>true</useDefaultExcludes>
+      <excludes>
+        <exclude>**/*.formatted</exclude>
+        <exclude>**/*.filtered</exclude>
+      </excludes>
+      <fileMode>0644</fileMode>
+      <lineEnding>unix</lineEnding>
+      <filtered>true</filtered>
+    </fileSet>
+    <fileSet>
+      <directory>${project.basedir}/src/main/scripts</directory>
+      <outputDirectory>/bin</outputDirectory>
+      <useDefaultExcludes>true</useDefaultExcludes>
+      <excludes>
+        <exclude>**/*.formatted</exclude>
+        <exclude>**/*.filtered</exclude>
+      </excludes>
+      <fileMode>0755</fileMode>
+      <lineEnding>unix</lineEnding>
+      <filtered>true</filtered>
+    </fileSet>
+    <fileSet>
+      <directory>${project.basedir}/src/main/flux</directory>
+      <outputDirectory>/flux</outputDirectory>
+      <useDefaultExcludes>true</useDefaultExcludes>
+      <excludes>
+        <exclude>**/*.formatted</exclude>
+        <exclude>**/*.filtered</exclude>
+      </excludes>
+      <fileMode>0644</fileMode>
+      <lineEnding>unix</lineEnding>
+    </fileSet>
+  </fileSets>
+</assembly>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml b/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
new file mode 100644
index 0000000..d1e697b
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
@@ -0,0 +1,152 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: "indexing"
+config:
+    topology.workers: ${indexing.workers}
+    topology.acker.executors: ${indexing.executors}
+
+components:
+
+    -   id: "fileNameFormat"
+        className: "org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat"
+        configMethods:
+            -   name: "withPrefix"
+                args:
+                    - "enrichment-"
+            -   name: "withExtension"
+                args:
+                  - ".json"
+            -   name: "withPath"
+                args:
+                    - "${index.hdfs.output}"
+
+    -   id: "hdfsRotationPolicy"
+        className: "${bolt.hdfs.rotation.policy}"
+        constructorArgs:
+          -  ${bolt.hdfs.rotation.policy.count}
+          - "${bolt.hdfs.rotation.policy.units}"
+#indexing
+    -   id: "hdfsWriter"
+        className: "org.apache.metron.writer.hdfs.HdfsWriter"
+        configMethods:
+            -   name: "withFileNameFormat"
+                args:
+                    - ref: "fileNameFormat"
+            -   name: "withRotationPolicy"
+                args:
+                    - ref: "hdfsRotationPolicy"
+    -   id: "kafkaWriter"
+        className: "org.apache.metron.writer.kafka.KafkaWriter"
+        configMethods:
+            -   name: "withTopic"
+                args:
+                    - "${index.error.topic}"
+            -   name: "withZkQuorum"
+                args:
+                    - "${kafka.zk}"
+
+    -   id: "indexWriter"
+        className: "${writer.class.name}"
+
+#kafka/zookeeper
+    -   id: "zkHosts"
+        className: "storm.kafka.ZkHosts"
+        constructorArgs:
+            - "${kafka.zk}"
+    -   id: "kafkaConfig"
+        className: "org.apache.metron.common.spout.kafka.SpoutConfig"
+        constructorArgs:
+            # zookeeper hosts
+            - ref: "zkHosts"
+            # topic name
+            - "${index.input.topic}"
+            # zk root
+            - ""
+            # id
+            - "indexing"
+        configMethods:
+            -   name: "from"
+                args:
+                    - "${kafka.start}"
+
+spouts:
+    -   id: "kafkaSpout"
+        className: "storm.kafka.KafkaSpout"
+        constructorArgs:
+            - ref: "kafkaConfig"
+bolts:
+
+# Indexing Bolts
+    -   id: "indexingBolt"
+        className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
+        constructorArgs:
+            - "${kafka.zk}"
+        configMethods:
+            -   name: "withBulkMessageWriter"
+                args:
+                    - ref: "indexWriter"
+            -   name: "withMessageGetter"
+                args:
+                    - "RAW"
+    -   id: "hdfsIndexingBolt"
+        className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
+        constructorArgs:
+            - "${kafka.zk}"
+        configMethods:
+            -   name: "withBulkMessageWriter"
+                args:
+                    - ref: "hdfsWriter"
+            -   name: "withMessageGetter"
+                args:
+                    - "RAW"
+
+    -   id: "indexingErrorBolt"
+        className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
+        constructorArgs:
+            - "${kafka.zk}"
+        configMethods:
+            -   name: "withMessageWriter"
+                args:
+                    - ref: "kafkaWriter"
+
+streams:
+
+    -   name: "spout -> indexing"
+        from: "kafkaSpout"
+        to: "indexingBolt"
+        grouping:
+            type: SHUFFLE
+
+    -   name: "spout -> hdfs"
+        from: "kafkaSpout"
+        to: "hdfsIndexingBolt"
+        grouping:
+            type: SHUFFLE
+
+    -   name: "indexingBolt -> errorIndexingBolt"
+        from: "indexingBolt"
+        to: "indexingErrorBolt"
+        grouping:
+            streamId: "error"
+            type: SHUFFLE
+
+    -   name: "hdfsBolt -> errorIndexingBolt"
+        from: "hdfsIndexingBolt"
+        to: "indexingErrorBolt"
+        grouping:
+            streamId: "error"
+            type: SHUFFLE

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
new file mode 100644
index 0000000..2c85308
--- /dev/null
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.indexing.integration;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.apache.metron.TestConstants;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.interfaces.FieldNameConverter;
+import org.apache.metron.common.spout.kafka.SpoutConfig;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.enrichment.integration.components.ConfigUploadComponent;
+import org.apache.metron.integration.BaseIntegrationTest;
+import org.apache.metron.integration.ComponentRunner;
+import org.apache.metron.integration.InMemoryComponent;
+import org.apache.metron.integration.Processor;
+import org.apache.metron.integration.components.FluxTopologyComponent;
+import org.apache.metron.integration.components.KafkaWithZKComponent;
+import org.apache.metron.integration.utils.TestUtils;
+import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+public abstract class IndexingIntegrationTest extends BaseIntegrationTest {
+  protected String hdfsDir = "target/indexingIntegrationTest/hdfs";
+  protected String sampleParsedPath = TestConstants.SAMPLE_DATA_PARSED_PATH + "TestExampleParsed";
+  protected String fluxPath = "../metron-indexing/src/main/flux/indexing/remote.yaml";
+  protected String testSensorType = "test";
+
+
+  public static List<Map<String, Object>> readDocsFromDisk(String hdfsDirStr) throws IOException {
+    List<Map<String, Object>> ret = new ArrayList<>();
+    File hdfsDir = new File(hdfsDirStr);
+    Stack<File> fs = new Stack<>();
+    if (hdfsDir.exists()) {
+      fs.push(hdfsDir);
+      while (!fs.empty()) {
+        File f = fs.pop();
+        if (f.isDirectory()) {
+          for (File child : f.listFiles()) {
+            fs.push(child);
+          }
+        } else {
+          System.out.println("Processed " + f);
+          if (f.getName().startsWith("enrichment") || f.getName().endsWith(".json")) {
+            List<byte[]> data = TestUtils.readSampleData(f.getPath());
+            Iterables.addAll(ret, Iterables.transform(data, new Function<byte[], Map<String, Object>>() {
+              @Nullable
+              @Override
+              public Map<String, Object> apply(@Nullable byte[] bytes) {
+                String s = new String(bytes);
+                try {
+                  return JSONUtils.INSTANCE.load(s, new TypeReference<Map<String, Object>>() {
+                  });
+                } catch (IOException e) {
+                  throw new RuntimeException(e);
+                }
+              }
+            }));
+          }
+        }
+      }
+    }
+    return ret;
+  }
+
+  public static void cleanHdfsDir(String hdfsDirStr) {
+    File hdfsDir = new File(hdfsDirStr);
+    Stack<File> fs = new Stack<>();
+    if (hdfsDir.exists()) {
+      fs.push(hdfsDir);
+      while (!fs.empty()) {
+        File f = fs.pop();
+        if (f.isDirectory()) {
+          for (File child : f.listFiles()) {
+            fs.push(child);
+          }
+        } else {
+          if (f.getName().startsWith("enrichment") || f.getName().endsWith(".json")) {
+            f.delete();
+          }
+        }
+      }
+    }
+  }
+
+  @Test
+  public void test() throws Exception {
+    cleanHdfsDir(hdfsDir);
+    final String dateFormat = "yyyy.MM.dd.HH";
+    final List<byte[]> inputMessages = TestUtils.readSampleData(sampleParsedPath);
+    final Properties topologyProperties = new Properties() {{
+      setProperty("kafka.start", SpoutConfig.Offset.BEGINNING.name());
+      setProperty("indexing.workers", "1");
+      setProperty("indexing.executors", "0");
+      setProperty("index.input.topic", Constants.INDEXING_TOPIC);
+      setProperty("index.error.topic", Constants.INDEXING_ERROR_TOPIC);
+      setProperty("index.date.format", dateFormat);
+      //HDFS settings
+
+      setProperty("bolt.hdfs.rotation.policy", TimedRotationPolicy.class.getCanonicalName());
+      setProperty("bolt.hdfs.rotation.policy.count", "1");
+      setProperty("bolt.hdfs.rotation.policy.units", "DAYS");
+      setProperty("index.hdfs.output", hdfsDir);
+    }};
+    setAdditionalProperties(topologyProperties);
+    final KafkaWithZKComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaWithZKComponent.Topic>() {{
+      add(new KafkaWithZKComponent.Topic(Constants.INDEXING_TOPIC, 1));
+      add(new KafkaWithZKComponent.Topic(Constants.INDEXING_ERROR_TOPIC, 1));
+    }});
+    List<Map<String, Object>> inputDocs = new ArrayList<>();
+    for(byte[] b : inputMessages) {
+      Map<String, Object> m = JSONUtils.INSTANCE.load(new String(b), new TypeReference<Map<String, Object>>() {});
+      inputDocs.add(m);
+
+    }
+    ConfigUploadComponent configUploadComponent = new ConfigUploadComponent()
+            .withTopologyProperties(topologyProperties)
+            .withGlobalConfigsPath(TestConstants.SAMPLE_CONFIG_PATH)
+            .withEnrichmentConfigsPath(TestConstants.SAMPLE_CONFIG_PATH);
+    FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
+            .withTopologyLocation(new File(fluxPath))
+            .withTopologyName("test")
+            .withTopologyProperties(topologyProperties)
+            .build();
+
+
+    ComponentRunner runner = new ComponentRunner.Builder()
+            .withComponent("kafka", kafkaComponent)
+            .withComponent("config", configUploadComponent)
+            .withComponent("storm", fluxComponent)
+            .withComponent("search", getSearchComponent(topologyProperties))
+            .withMillisecondsBetweenAttempts(15000)
+            .withNumRetries(10)
+            .build();
+    runner.start();
+
+    try {
+      fluxComponent.submitTopology();
+
+      kafkaComponent.writeMessages(Constants.INDEXING_TOPIC, inputMessages);
+      List<Map<String, Object>> docs = cleanDocs(runner.process(getProcessor(inputMessages)));
+      Assert.assertEquals(docs.size(), inputMessages.size());
+      //assert that our input docs are equivalent to the output docs, converting the input docs keys based
+      // on the field name converter
+      assertInputDocsMatchOutputs(inputDocs, docs, getFieldNameConverter());
+      assertInputDocsMatchOutputs(inputDocs, readDocsFromDisk(hdfsDir), x -> x);
+    }
+    finally {
+      if(runner != null) {
+        runner.stop();
+      }
+    }
+  }
+
+  public List<Map<String, Object>> cleanDocs(List<Map<String, Object>> docs) {
+    List<Map<String, Object>> ret = new ArrayList<>();
+    for (Map<String, Object> doc : docs) {
+      Map<String, Object> msg = new HashMap<>();
+      for (Map.Entry<String, Object> kv : doc.entrySet()) {
+        //for writers like solr who modify the keys, we want to undo that if we can
+        msg.put(cleanField(kv.getKey()), kv.getValue());
+      }
+      ret.add(msg);
+    }
+    return ret;
+  }
+
+  public void assertInputDocsMatchOutputs( List<Map<String, Object>> inputDocs
+                                         , List<Map<String, Object>> indexDocs
+                                         , FieldNameConverter converter
+                                         )
+  {
+    for(Map<String, Object> indexDoc : indexDocs) {
+      boolean foundMatch = false;
+      for(Map<String, Object> doc : inputDocs) {
+        if(docMatches(indexDoc, doc, converter)) {
+          foundMatch = true;
+          break;
+        }
+      }
+      if(!foundMatch) {
+        System.err.println("Unable to find: ");
+        printMessage(indexDoc);
+        dumpMessages("INPUT DOCS:", inputDocs);
+      }
+      Assert.assertTrue(foundMatch);
+    }
+  }
+
+  private void printMessage(Map<String, Object> doc) {
+    TreeMap<String, Object> d = new TreeMap<>(doc);
+      for(Map.Entry<String, Object> kv : d.entrySet()) {
+        System.err.println("  " + kv.getKey() + " -> " + kv.getValue());
+      }
+  }
+
+  private void dumpMessages(String title, List<Map<String, Object>> docs) {
+    System.err.println(title);
+    int cnt = 0;
+    for(Map<String, Object> doc : docs) {
+      System.err.println("MESSAGE " + cnt++);
+      printMessage(doc);
+    }
+  }
+
+  boolean docMatches(Map<String, Object> indexedDoc, Map<String, Object> inputDoc, FieldNameConverter converter) {
+    String key = "original_string";
+    String indexKey = converter.convert(key);
+    String originalString = inputDoc.get(key).toString();
+    return originalString.equals(indexedDoc.get(indexKey).toString());
+  }
+  public abstract Processor<List<Map<String, Object>>> getProcessor (List <byte[]>inputMessages);
+  public abstract FieldNameConverter getFieldNameConverter();
+  public abstract InMemoryComponent getSearchComponent(final Properties topologyProperties) throws Exception;
+  public abstract void setAdditionalProperties(Properties topologyProperties);
+  public abstract String cleanField(String field);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-integration-test/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/pom.xml b/metron-platform/metron-integration-test/pom.xml
index 130de5e..4aa10cf 100644
--- a/metron-platform/metron-integration-test/pom.xml
+++ b/metron-platform/metron-integration-test/pom.xml
@@ -143,11 +143,6 @@
     </dependency>
     <dependency>
       <groupId>org.apache.metron</groupId>
-      <artifactId>metron-enrichment</artifactId>
-      <version>${project.parent.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.metron</groupId>
       <artifactId>metron-test-utilities</artifactId>
       <version>${project.parent.version}</version>
     </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java
index 1a3247c..5fdae49 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java
@@ -20,7 +20,6 @@ package org.apache.metron.integration;
 import com.google.common.base.Function;
 import org.apache.metron.TestConstants;
 import org.apache.metron.integration.components.KafkaWithZKComponent;
-import org.apache.metron.common.configuration.ConfigurationsUtils;
 
 import javax.annotation.Nullable;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/EnrichmentIntegrationTest.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
deleted file mode 100644
index 483f2b5..0000000
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
+++ /dev/null
@@ -1,489 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.integration;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.base.*;
-
-import com.google.common.collect.Iterables;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.metron.common.Constants;
-import org.apache.metron.TestConstants;
-import org.apache.metron.common.configuration.Configurations;
-import org.apache.metron.common.interfaces.FieldNameConverter;
-import org.apache.metron.common.configuration.EnrichmentConfigurations;
-import org.apache.metron.hbase.TableProvider;
-import org.apache.metron.enrichment.converter.EnrichmentKey;
-import org.apache.metron.enrichment.converter.EnrichmentValue;
-import org.apache.metron.enrichment.converter.EnrichmentHelper;
-import org.apache.metron.integration.components.ConfigUploadComponent;
-import org.apache.metron.integration.utils.TestUtils;
-import org.apache.metron.test.utils.UnitTestHelper;
-import org.apache.metron.integration.components.FluxTopologyComponent;
-import org.apache.metron.integration.components.KafkaWithZKComponent;
-import org.apache.metron.integration.mock.MockGeoAdapter;
-import org.apache.metron.test.mock.MockHTable;
-import org.apache.metron.enrichment.lookup.LookupKV;
-
-import org.apache.metron.integration.utils.SampleUtil;
-import org.apache.metron.common.utils.JSONUtils;
-import org.junit.Assert;
-import org.junit.Test;
-
-import javax.annotation.Nullable;
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.Stack;
-
-public abstract class EnrichmentIntegrationTest extends BaseIntegrationTest {
-
-  private static final String SRC_IP = "ip_src_addr";
-  private static final String DST_IP = "ip_dst_addr";
-  private static final String MALICIOUS_IP_TYPE = "malicious_ip";
-  private static final String PLAYFUL_CLASSIFICATION_TYPE = "playful_classification";
-  private static final Map<String, Object> PLAYFUL_ENRICHMENT = new HashMap<String, Object>() {{
-    put("orientation", "north");
-  }};
-  protected String testSensorType = "test";
-  protected String hdfsDir = "target/enrichmentIntegrationTest/hdfs";
-  protected String fluxPath = "../metron-enrichment/src/main/flux/enrichment/test.yaml";
-  protected String sampleParsedPath = TestConstants.SAMPLE_DATA_PARSED_PATH + "TestExampleParsed";
-  private String sampleIndexedPath = TestConstants.SAMPLE_DATA_INDEXED_PATH + "TestIndexed";
-
-  public static class Provider implements TableProvider, Serializable {
-    MockHTable.Provider  provider = new MockHTable.Provider();
-    @Override
-    public HTableInterface getTable(Configuration config, String tableName) throws IOException {
-      return provider.getTable(config, tableName);
-    }
-  }
-
-  public static void cleanHdfsDir(String hdfsDirStr) {
-    File hdfsDir = new File(hdfsDirStr);
-    Stack<File> fs = new Stack<>();
-    if(hdfsDir.exists()) {
-      fs.push(hdfsDir);
-      while(!fs.empty()) {
-        File f = fs.pop();
-        if (f.isDirectory()) {
-          for(File child : f.listFiles()) {
-            fs.push(child);
-          }
-        }
-        else {
-          if (f.getName().startsWith("enrichment") || f.getName().endsWith(".json")) {
-            f.delete();
-          }
-        }
-      }
-    }
-  }
-
-  public static List<Map<String, Object> > readDocsFromDisk(String hdfsDirStr) throws IOException {
-    List<Map<String, Object>> ret = new ArrayList<>();
-    File hdfsDir = new File(hdfsDirStr);
-    Stack<File> fs = new Stack<>();
-    if(hdfsDir.exists()) {
-      fs.push(hdfsDir);
-      while(!fs.empty()) {
-        File f = fs.pop();
-        if(f.isDirectory()) {
-          for (File child : f.listFiles()) {
-            fs.push(child);
-          }
-        }
-        else {
-          System.out.println("Processed " + f);
-          if (f.getName().startsWith("enrichment") || f.getName().endsWith(".json")) {
-            List<byte[]> data = TestUtils.readSampleData(f.getPath());
-            Iterables.addAll(ret, Iterables.transform(data, new Function<byte[], Map<String, Object>>() {
-              @Nullable
-              @Override
-              public Map<String, Object> apply(@Nullable byte[] bytes) {
-                String s = new String(bytes);
-                try {
-                  return JSONUtils.INSTANCE.load(s, new TypeReference<Map<String, Object>>() {
-                  });
-                } catch (IOException e) {
-                  throw new RuntimeException(e);
-                }
-              }
-            }));
-          }
-        }
-      }
-    }
-    return ret;
-  }
-
-
-  @Test
-  public void test() throws Exception {
-    cleanHdfsDir(hdfsDir);
-    final EnrichmentConfigurations configurations = SampleUtil.getSampleEnrichmentConfigs();
-    final String dateFormat = "yyyy.MM.dd.HH";
-    final List<byte[]> inputMessages = TestUtils.readSampleData(sampleParsedPath);
-    final String cf = "cf";
-    final String trackerHBaseTableName = "tracker";
-    final String threatIntelTableName = "threat_intel";
-    final String enrichmentsTableName = "enrichments";
-    final Properties topologyProperties = new Properties() {{
-      setProperty("org.apache.metron.enrichment.host.known_hosts", "[{\"ip\":\"10.1.128.236\", \"local\":\"YES\", \"type\":\"webserver\", \"asset_value\" : \"important\"},\n" +
-              "{\"ip\":\"10.1.128.237\", \"local\":\"UNKNOWN\", \"type\":\"unknown\", \"asset_value\" : \"important\"},\n" +
-              "{\"ip\":\"10.60.10.254\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"},\n" +
-              "{\"ip\":\"10.0.2.15\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}]");
-      setProperty("hbase.provider.impl","" + Provider.class.getName());
-      setProperty("threat.intel.tracker.table", trackerHBaseTableName);
-      setProperty("threat.intel.tracker.cf", cf);
-      setProperty("threat.intel.simple.hbase.table", threatIntelTableName);
-      setProperty("threat.intel.simple.hbase.cf", cf);
-      setProperty("enrichment.simple.hbase.table", enrichmentsTableName);
-      setProperty("enrichment.simple.hbase.cf", cf);
-      setProperty("es.clustername", "metron");
-      setProperty("es.port", "9300");
-      setProperty("es.ip", "localhost");
-      setProperty("index.date.format", dateFormat);
-      setProperty("index.hdfs.output", hdfsDir);
-    }};
-    setAdditionalProperties(topologyProperties);
-    final KafkaWithZKComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaWithZKComponent.Topic>() {{
-      add(new KafkaWithZKComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
-    }});
-
-    ConfigUploadComponent configUploadComponent = new ConfigUploadComponent()
-            .withTopologyProperties(topologyProperties)
-            .withGlobalConfigsPath(TestConstants.SAMPLE_CONFIG_PATH)
-            .withEnrichmentConfigsPath(TestConstants.SAMPLE_CONFIG_PATH);
-
-    //create MockHBaseTables
-    final MockHTable trackerTable = (MockHTable)MockHTable.Provider.addToCache(trackerHBaseTableName, cf);
-    final MockHTable threatIntelTable = (MockHTable)MockHTable.Provider.addToCache(threatIntelTableName, cf);
-    EnrichmentHelper.INSTANCE.load(threatIntelTable, cf, new ArrayList<LookupKV<EnrichmentKey, EnrichmentValue>>(){{
-      add(new LookupKV<>(new EnrichmentKey(MALICIOUS_IP_TYPE, "10.0.2.3"), new EnrichmentValue(new HashMap<>())));
-    }});
-    final MockHTable enrichmentTable = (MockHTable)MockHTable.Provider.addToCache(enrichmentsTableName, cf);
-    EnrichmentHelper.INSTANCE.load(enrichmentTable, cf, new ArrayList<LookupKV<EnrichmentKey, EnrichmentValue>>(){{
-      add(new LookupKV<>(new EnrichmentKey(PLAYFUL_CLASSIFICATION_TYPE, "10.0.2.3")
-                        , new EnrichmentValue(PLAYFUL_ENRICHMENT )
-                        )
-         );
-    }});
-    FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
-            .withTopologyLocation(new File(fluxPath))
-            .withTopologyName("test")
-            .withTopologyProperties(topologyProperties)
-            .build();
-
-    InMemoryComponent searchComponent = getSearchComponent(topologyProperties);
-
-    UnitTestHelper.verboseLogging();
-    ComponentRunner runner = new ComponentRunner.Builder()
-            .withComponent("kafka", kafkaComponent)
-            .withComponent("config", configUploadComponent)
-            .withComponent("search", searchComponent)
-            .withComponent("storm", fluxComponent)
-            .withMillisecondsBetweenAttempts(15000)
-            .withNumRetries(10)
-            .build();
-    runner.start();
-
-    try {
-      fluxComponent.submitTopology();
-
-      kafkaComponent.writeMessages(Constants.ENRICHMENT_TOPIC, inputMessages);
-      List<Map<String, Object>> docs = runner.process(getProcessor(inputMessages));
-      Assert.assertEquals(inputMessages.size(), docs.size());
-      List<Map<String, Object>> cleanedDocs = cleanDocs(docs);
-      validateAll(cleanedDocs, getFieldNameConverter());
-
-
-      List<Map<String, Object>> docsFromDisk = readDocsFromDisk(hdfsDir);
-      Assert.assertEquals(docsFromDisk.size(), docs.size()) ;
-      Assert.assertEquals(new File(hdfsDir).list().length, 1);
-      Assert.assertEquals(new File(hdfsDir).list()[0], testSensorType);
-      //we want the identity transformation when dealing with docs on disk.
-      validateAll(docsFromDisk, fieldName -> fieldName);
-    }
-    finally {
-      cleanHdfsDir(hdfsDir);
-      runner.stop();
-    }
-  }
-
-  public List<Map<String, Object>> cleanDocs(List<Map<String, Object>> docs) {
-    List<Map<String, Object>> cleanedDocs = new ArrayList<>();
-    for(Map<String, Object> doc: docs) {
-      Map<String, Object> cleanedFields = new HashMap<>();
-      for(String field: doc.keySet()) {
-        cleanedFields.put(cleanField(field), doc.get(field));
-      }
-      cleanedDocs.add(cleanedFields);
-    }
-    return cleanedDocs;
-  }
-
-  public static void validateAll(List<Map<String, Object>> docs, FieldNameConverter fnc) {
-
-    for (Map<String, Object> doc : docs) {
-      baseValidation(doc, fnc);
-      hostEnrichmentValidation(doc, fnc);
-      geoEnrichmentValidation(doc, fnc);
-      threatIntelValidation(doc, fnc);
-      simpleEnrichmentValidation(doc, fnc);
-    }
-  }
-
-  public static void baseValidation(Map<String, Object> jsonDoc, FieldNameConverter fnc) {
-    assertEnrichmentsExists("threatintels.", setOf("hbaseThreatIntel"), jsonDoc.keySet());
-    assertEnrichmentsExists("enrichments.", setOf("geo", "host", "hbaseEnrichment" ), jsonDoc.keySet());
-    for(Map.Entry<String, Object> kv : jsonDoc.entrySet()) {
-      //ensure no values are empty.
-      Assert.assertTrue(kv.getValue().toString().length() > 0);
-    }
-    //ensure we always have a source ip and destination ip
-    Assert.assertNotNull(jsonDoc.get(SRC_IP));
-    Assert.assertNotNull(jsonDoc.get(DST_IP));
-  }
-
-  private static class EvaluationPayload {
-    Map<String, Object> indexedDoc;
-    String key;
-    FieldNameConverter fnc;
-    public EvaluationPayload(Map<String, Object> indexedDoc, String key, FieldNameConverter fnc) {
-      this.indexedDoc = indexedDoc;
-      this.key = key;
-      this.fnc = fnc;
-    }
-  }
-
-  private static enum HostEnrichments implements Predicate<EvaluationPayload>{
-
-    LOCAL_LOCATION(new Predicate<EvaluationPayload>() {
-
-      @Override
-      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
-
-        return evaluationPayload.indexedDoc.getOrDefault(evaluationPayload.fnc.convert("enrichments.host." + evaluationPayload.key + ".known_info.local"),"").equals("YES");
-
-      }
-    })
-
-    ,UNKNOWN_LOCATION(new Predicate<EvaluationPayload>() {
-
-      @Override
-      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
-        return evaluationPayload.indexedDoc.getOrDefault(evaluationPayload.fnc.convert("enrichments.host." + evaluationPayload.key + ".known_info.local"),"").equals("UNKNOWN");
-      }
-    })
-    ,IMPORTANT(new Predicate<EvaluationPayload>() {
-      @Override
-      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
-        return evaluationPayload.indexedDoc.getOrDefault(evaluationPayload.fnc.convert("enrichments.host." + evaluationPayload.key + ".known_info.asset_value"),"").equals("important");
-      }
-    })
-    ,PRINTER_TYPE(new Predicate<EvaluationPayload>() {
-      @Override
-      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
-        return evaluationPayload.indexedDoc.getOrDefault(evaluationPayload.fnc.convert("enrichments.host." + evaluationPayload.key + ".known_info.type"),"").equals("printer");
-      }
-    })
-    ,WEBSERVER_TYPE(new Predicate<EvaluationPayload>() {
-      @Override
-      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
-        return evaluationPayload.indexedDoc.getOrDefault(evaluationPayload.fnc.convert("enrichments.host." + evaluationPayload.key + ".known_info.type"),"").equals("webserver");
-      }
-    })
-    ,UNKNOWN_TYPE(new Predicate<EvaluationPayload>() {
-      @Override
-      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
-        return evaluationPayload.indexedDoc.getOrDefault(evaluationPayload.fnc.convert("enrichments.host." + evaluationPayload.key + ".known_info.type"),"").equals("unknown");
-      }
-    })
-    ;
-
-    Predicate<EvaluationPayload> _predicate;
-    HostEnrichments(Predicate<EvaluationPayload> predicate) {
-      this._predicate = predicate;
-    }
-
-    public boolean apply(EvaluationPayload payload) {
-      return _predicate.apply(payload);
-    }
-
-  }
-
-  private static void assertEnrichmentsExists(String topLevel, Set<String> expectedEnrichments, Set<String> keys) {
-    for(String key : keys) {
-      if(key.startsWith(topLevel)) {
-        String secondLevel = Iterables.get(Splitter.on(".").split(key), 1);
-        String message = "Found an enrichment/threat intel (" + secondLevel + ") that I didn't expect (expected enrichments :"
-                       + Joiner.on(",").join(expectedEnrichments) + "), but it was not there.  If you've created a new"
-                       + " enrichment, then please add a validation method to this unit test.  Otherwise, it's a solid error"
-                       + " and should be investigated.";
-        Assert.assertTrue( message, expectedEnrichments.contains(secondLevel));
-      }
-    }
-  }
-  private static void simpleEnrichmentValidation(Map<String, Object> indexedDoc, FieldNameConverter fnc) {
-    if(indexedDoc.getOrDefault(fnc.convert(SRC_IP),"").equals("10.0.2.3")
-            || indexedDoc.getOrDefault(fnc.convert(DST_IP),"").equals("10.0.2.3")
-            ) {
-      Assert.assertTrue(keyPatternExists(fnc.convert("enrichments.hbaseEnrichment"), indexedDoc));
-      if(indexedDoc.getOrDefault(fnc.convert(SRC_IP),"").equals("10.0.2.3")) {
-        Assert.assertEquals(indexedDoc.get(fnc.convert("enrichments.hbaseEnrichment." + SRC_IP + "." + PLAYFUL_CLASSIFICATION_TYPE+ ".orientation"))
-                , PLAYFUL_ENRICHMENT.get("orientation")
-        );
-      }
-      else if(indexedDoc.getOrDefault(fnc.convert(DST_IP),"").equals("10.0.2.3")) {
-        Assert.assertEquals( indexedDoc.get(fnc.convert("enrichments.hbaseEnrichment." + DST_IP + "." + PLAYFUL_CLASSIFICATION_TYPE + ".orientation"))
-                , PLAYFUL_ENRICHMENT.get("orientation")
-        );
-      }
-    }
-
-  }
-  private static void threatIntelValidation(Map<String, Object> indexedDoc, FieldNameConverter fnc) {
-    if(indexedDoc.getOrDefault(fnc.convert(SRC_IP),"").equals("10.0.2.3")
-    || indexedDoc.getOrDefault(fnc.convert(DST_IP),"").equals("10.0.2.3")
-            ) {
-      //if we have any threat intel messages, we want to tag is_alert to true
-      Assert.assertTrue(keyPatternExists(fnc.convert("threatintels."), indexedDoc));
-      Assert.assertTrue(indexedDoc.containsKey(fnc.convert("threat.triage.level")));
-      Assert.assertEquals(indexedDoc.getOrDefault(fnc.convert("is_alert"),""), "true");
-      Assert.assertEquals((double)indexedDoc.get(fnc.convert("threat.triage.level")), 10d, 1e-7);
-    }
-    else {
-      //For YAF this is the case, but if we do snort later on, this will be invalid.
-      Assert.assertNull(indexedDoc.get(fnc.convert("is_alert")));
-      Assert.assertFalse(keyPatternExists(fnc.convert("threatintels."), indexedDoc));
-    }
-    //ip threat intels
-    if(keyPatternExists(fnc.convert("threatintels.hbaseThreatIntel."), indexedDoc)) {
-      if(indexedDoc.getOrDefault(fnc.convert(SRC_IP),"").equals("10.0.2.3")) {
-        Assert.assertEquals(indexedDoc.get(fnc.convert("threatintels.hbaseThreatIntel." + SRC_IP + "." + MALICIOUS_IP_TYPE)), "alert");
-      }
-      else if(indexedDoc.getOrDefault(fnc.convert(DST_IP),"").equals("10.0.2.3")) {
-        Assert.assertEquals(indexedDoc.get(fnc.convert("threatintels.hbaseThreatIntel." + DST_IP + "." + MALICIOUS_IP_TYPE)), "alert");
-      }
-      else {
-        Assert.fail("There was a threat intels that I did not expect: " + indexedDoc);
-      }
-    }
-
-  }
-
-  private static void geoEnrichmentValidation(Map<String, Object> indexedDoc, FieldNameConverter fnc) {
-    //should have geo enrichment on every message due to mock geo adapter
-    Assert.assertEquals(indexedDoc.get(fnc.convert("enrichments.geo." + DST_IP + ".location_point")), MockGeoAdapter.DEFAULT_LOCATION_POINT);
-    Assert.assertEquals(indexedDoc.get(fnc.convert("enrichments.geo." + SRC_IP +".location_point")), MockGeoAdapter.DEFAULT_LOCATION_POINT);
-    Assert.assertEquals(indexedDoc.get(fnc.convert("enrichments.geo." + DST_IP + ".longitude")), MockGeoAdapter.DEFAULT_LONGITUDE);
-    Assert.assertEquals(indexedDoc.get(fnc.convert("enrichments.geo." + SRC_IP + ".longitude")), MockGeoAdapter.DEFAULT_LONGITUDE);
-    Assert.assertEquals(indexedDoc.get(fnc.convert("enrichments.geo." + DST_IP + ".city")), MockGeoAdapter.DEFAULT_CITY);
-    Assert.assertEquals(indexedDoc.get(fnc.convert("enrichments.geo." + SRC_IP + ".city")), MockGeoAdapter.DEFAULT_CITY);
-    Assert.assertEquals(indexedDoc.get(fnc.convert("enrichments.geo." + DST_IP + ".latitude")), MockGeoAdapter.DEFAULT_LATITUDE);
-    Assert.assertEquals(indexedDoc.get(fnc.convert("enrichments.geo." + SRC_IP + ".latitude")), MockGeoAdapter.DEFAULT_LATITUDE);
-    Assert.assertEquals(indexedDoc.get(fnc.convert("enrichments.geo." + DST_IP + ".country")), MockGeoAdapter.DEFAULT_COUNTRY);
-    Assert.assertEquals(indexedDoc.get(fnc.convert("enrichments.geo." + SRC_IP + ".country")), MockGeoAdapter.DEFAULT_COUNTRY);
-    Assert.assertEquals(indexedDoc.get(fnc.convert("enrichments.geo." + DST_IP + ".dmaCode")), MockGeoAdapter.DEFAULT_DMACODE);
-    Assert.assertEquals(indexedDoc.get(fnc.convert("enrichments.geo." + SRC_IP + ".dmaCode")), MockGeoAdapter.DEFAULT_DMACODE);
-    Assert.assertEquals(indexedDoc.get(fnc.convert("enrichments.geo." + DST_IP + ".postalCode")), MockGeoAdapter.DEFAULT_POSTAL_CODE);
-    Assert.assertEquals(indexedDoc.get(fnc.convert("enrichments.geo." + SRC_IP + ".postalCode")), MockGeoAdapter.DEFAULT_POSTAL_CODE);
-  }
-
-  private static void hostEnrichmentValidation(Map<String, Object> indexedDoc, FieldNameConverter fnc) {
-    boolean enriched = false;
-    //important local printers
-    {
-      Set<String> ips = setOf("10.0.2.15", "10.60.10.254");
-      if (ips.contains(indexedDoc.get(fnc.convert(SRC_IP)))) {
-        //this is a local, important, printer
-        Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
-                ,HostEnrichments.IMPORTANT
-                ,HostEnrichments.PRINTER_TYPE
-                ).apply(new EvaluationPayload(indexedDoc, SRC_IP, fnc))
-        );
-        enriched = true;
-      }
-      if (ips.contains(indexedDoc.get(fnc.convert(DST_IP)))) {
-        Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
-                ,HostEnrichments.IMPORTANT
-                ,HostEnrichments.PRINTER_TYPE
-                ).apply(new EvaluationPayload(indexedDoc, DST_IP, fnc))
-        );
-        enriched = true;
-      }
-    }
-    //important local webservers
-    {
-      Set<String> ips = setOf("10.1.128.236");
-      if (ips.contains(indexedDoc.get(fnc.convert(SRC_IP)))) {
-        //this is a local, important, printer
-        Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
-                ,HostEnrichments.IMPORTANT
-                ,HostEnrichments.WEBSERVER_TYPE
-                ).apply(new EvaluationPayload(indexedDoc, SRC_IP, fnc))
-        );
-        enriched = true;
-      }
-      if (ips.contains(indexedDoc.get(fnc.convert(DST_IP)))) {
-        Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
-                ,HostEnrichments.IMPORTANT
-                ,HostEnrichments.WEBSERVER_TYPE
-                ).apply(new EvaluationPayload(indexedDoc, DST_IP, fnc))
-        );
-        enriched = true;
-      }
-    }
-    if(!enriched) {
-      Assert.assertFalse(keyPatternExists("enrichments.host", indexedDoc));
-    }
-  }
-
-
-  private static boolean keyPatternExists(String pattern, Map<String, Object> indexedObj) {
-    for(String k : indexedObj.keySet()) {
-      if(k.startsWith(pattern)) {
-        return true;
-      }
-    }
-    return false;
-  }
-  private static Set<String> setOf(String... items) {
-    Set<String> ret = new HashSet<>();
-    for(String item : items) {
-      ret.add(item);
-    }
-    return ret;
-  }
-
-  abstract public FieldNameConverter getFieldNameConverter();
-  abstract public InMemoryComponent getSearchComponent(Properties topologyProperties) throws Exception;
-  abstract public Processor<List<Map<String, Object>>> getProcessor(List<byte[]> inputMessages);
-  abstract public void setAdditionalProperties(Properties topologyProperties);
-  abstract public String cleanField(String field);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ConfigUploadComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ConfigUploadComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ConfigUploadComponent.java
deleted file mode 100644
index 7e0f325..0000000
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ConfigUploadComponent.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.integration.components;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.metron.common.configuration.ConfigurationsUtils;
-import org.apache.metron.common.configuration.SensorParserConfig;
-import org.apache.metron.common.utils.JSONUtils;
-import org.apache.metron.integration.InMemoryComponent;
-import org.apache.metron.integration.UnableToStartException;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-public class ConfigUploadComponent implements InMemoryComponent {
-
-  private Properties topologyProperties;
-  private String globalConfigPath;
-  private String parserConfigsPath;
-  private String enrichmentConfigsPath;
-  private Optional<String> globalConfig = Optional.empty();
-  private Map<String, SensorParserConfig> parserSensorConfigs = new HashMap<>();
-  public ConfigUploadComponent withTopologyProperties(Properties topologyProperties) {
-    this.topologyProperties = topologyProperties;
-    return this;
-  }
-
-  public ConfigUploadComponent withGlobalConfigsPath(String globalConfigPath) {
-    this.globalConfigPath = globalConfigPath;
-    return this;
-  }
-
-  public ConfigUploadComponent withParserConfigsPath(String parserConfigsPath) {
-    this.parserConfigsPath = parserConfigsPath;
-    return this;
-  }
-  public ConfigUploadComponent withEnrichmentConfigsPath(String enrichmentConfigsPath) {
-    this.enrichmentConfigsPath = enrichmentConfigsPath;
-    return this;
-  }
-
-  public ConfigUploadComponent withParserSensorConfig(String name, SensorParserConfig config) {
-    parserSensorConfigs.put(name, config);
-    return this;
-  }
-
-  public ConfigUploadComponent withGlobalConfig(String globalConfig) {
-    this.globalConfig = Optional.ofNullable(globalConfig);
-    return this;
-  }
-
-  @Override
-  public void start() throws UnableToStartException {
-    try {
-      if(globalConfigPath != null) {
-        ConfigurationsUtils.uploadConfigsToZookeeper(globalConfigPath
-                , parserConfigsPath
-                , enrichmentConfigsPath
-                , topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY)
-        );
-      }
-      for(Map.Entry<String, SensorParserConfig> kv : parserSensorConfigs.entrySet()) {
-        ConfigurationsUtils.writeSensorParserConfigToZookeeper( kv.getKey()
-                                                              , kv.getValue()
-                                                              , topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY)
-                                                              );
-      }
-      if(globalConfig.isPresent()) {
-        ConfigurationsUtils.writeGlobalConfigToZookeeper(globalConfig.get().getBytes()
-                                                        , topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY)
-                                                        );
-      }
-
-    } catch (Exception e) {
-      throw new UnableToStartException(e.getMessage(), e);
-    }
-  }
-
-  public SensorParserConfig getSensorParserConfig(String sensorType) {
-    SensorParserConfig sensorParserConfig = new SensorParserConfig();
-    CuratorFramework client = ConfigurationsUtils.getClient(topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY));
-    client.start();
-    try {
-      sensorParserConfig = ConfigurationsUtils.readSensorParserConfigFromZookeeper(sensorType, client);
-    } catch (Exception e) {
-      e.printStackTrace();
-    } finally {
-      client.close();
-    }
-    return sensorParserConfig;
-  }
-
-  @Override
-  public void stop() {
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/mock/MockGeoAdapter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/mock/MockGeoAdapter.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/mock/MockGeoAdapter.java
deleted file mode 100644
index 7316112..0000000
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/mock/MockGeoAdapter.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.integration.mock;
-
-import com.google.common.base.Joiner;
-import org.apache.metron.enrichment.bolt.CacheKey;
-import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
-import org.json.simple.JSONObject;
-
-import java.io.Serializable;
-
-public class MockGeoAdapter implements EnrichmentAdapter<CacheKey>,
-        Serializable {
-
-  public static final String DEFAULT_LOC_ID = "1";
-  public static final String DEFAULT_COUNTRY = "test country";
-  public static final String DEFAULT_CITY = "test city";
-  public static final String DEFAULT_POSTAL_CODE = "test postalCode";
-  public static final String DEFAULT_LATITUDE = "test latitude";
-  public static final String DEFAULT_LONGITUDE = "test longitude";
-  public static final String DEFAULT_DMACODE= "test dmaCode";
-  public static final String DEFAULT_LOCATION_POINT= Joiner.on(',').join(DEFAULT_LATITUDE,DEFAULT_LONGITUDE);
-
-  @Override
-  public void logAccess(CacheKey value) {
-
-  }
-
-  public JSONObject enrich(CacheKey cache ) {
-    JSONObject enriched = new JSONObject();
-    enriched.put("locID", DEFAULT_LOC_ID);
-    enriched.put("country", DEFAULT_COUNTRY);
-    enriched.put("city", DEFAULT_CITY);
-    enriched.put("postalCode", DEFAULT_POSTAL_CODE);
-    enriched.put("latitude", DEFAULT_LATITUDE);
-    enriched.put("longitude", DEFAULT_LONGITUDE);
-    enriched.put("dmaCode", DEFAULT_DMACODE);
-    enriched.put("location_point", DEFAULT_LOCATION_POINT);
-    return enriched;
-  }
-
-  public boolean initializeAdapter() {
-    return true;
-  }
-
-  public void cleanup() {
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/mock/MockHBaseConnector.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/mock/MockHBaseConnector.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/mock/MockHBaseConnector.java
deleted file mode 100644
index f40b366..0000000
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/mock/MockHBaseConnector.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.integration.mock;
-
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
-import org.apache.metron.hbase.Connector;
-import org.apache.metron.hbase.TupleTableConfig;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class MockHBaseConnector extends Connector {
-    static List<Put> puts = Collections.synchronizedList(new ArrayList<Put>());
-    public MockHBaseConnector(TupleTableConfig conf, String _quorum, String _port) throws IOException {
-        super(conf, _quorum, _port);
-    }
-
-    @Override
-    public void put(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
-        puts.add(put);
-    }
-
-    @Override
-    public void close() {
-
-    }
-    public static void clear() {
-        puts.clear();
-    }
-    public static List<Put> getPuts() {
-        return puts;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/mock/MockTableProvider.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/mock/MockTableProvider.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/mock/MockTableProvider.java
deleted file mode 100644
index becadce..0000000
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/mock/MockTableProvider.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.integration.mock;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.metron.hbase.TableProvider;
-import org.apache.metron.test.mock.MockHTable;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-public class MockTableProvider implements TableProvider, Serializable {
-  static MockHTable.Provider provider = new MockHTable.Provider();
-  @Override
-  public HTableInterface getTable(Configuration config, String tableName) throws IOException {
-    return provider.getTable(config, tableName);
-  }
-  public static void addTable(String tableName, String... cf) {
-    provider.addToCache(tableName, cf);
-  }
-  public static MockHTable getTable(String tableName) {
-    try {
-      return (MockHTable) provider.getTable(null, tableName);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to get table: " + tableName);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/SampleUtil.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/SampleUtil.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/SampleUtil.java
deleted file mode 100644
index 8e293c0..0000000
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/SampleUtil.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.integration.utils;
-
-import org.apache.metron.TestConstants;
-import org.apache.metron.common.configuration.Configurations;
-import org.apache.metron.common.configuration.ConfigurationsUtils;
-import org.apache.metron.common.configuration.EnrichmentConfigurations;
-import org.apache.metron.common.configuration.ParserConfigurations;
-
-import java.io.IOException;
-import java.util.Map;
-
-public class SampleUtil {
-
-  public static Configurations getSampleConfigs() throws IOException {
-    Configurations configurations = new Configurations();
-    configurations.updateGlobalConfig(ConfigurationsUtils.readGlobalConfigFromFile(TestConstants.SAMPLE_CONFIG_PATH));
-    return configurations;
-  }
-
-  public static ParserConfigurations getSampleParserConfigs() throws IOException {
-    ParserConfigurations configurations = new ParserConfigurations();
-    configurations.updateGlobalConfig(ConfigurationsUtils.readGlobalConfigFromFile(TestConstants.SAMPLE_CONFIG_PATH));
-    Map<String, byte[]> sensorParserConfigs = ConfigurationsUtils.readSensorParserConfigsFromFile(TestConstants.PARSER_CONFIGS_PATH);
-    for(String sensorType: sensorParserConfigs.keySet()) {
-      configurations.updateSensorParserConfig(sensorType, sensorParserConfigs.get(sensorType));
-    }
-    return configurations;
-  }
-
-  public static EnrichmentConfigurations getSampleEnrichmentConfigs() throws IOException {
-    EnrichmentConfigurations configurations = new EnrichmentConfigurations();
-    configurations.updateGlobalConfig(ConfigurationsUtils.readGlobalConfigFromFile(TestConstants.SAMPLE_CONFIG_PATH));
-    Map<String, byte[]> sensorEnrichmentConfigs = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(TestConstants.SAMPLE_CONFIG_PATH);
-    for(String sensorType: sensorEnrichmentConfigs.keySet()) {
-      configurations.updateSensorEnrichmentConfig(sensorType, sensorEnrichmentConfigs.get(sensorType));
-    }
-    return configurations;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/pom.xml b/metron-platform/metron-parsers/pom.xml
index c801a7f..47f7bdd 100644
--- a/metron-platform/metron-parsers/pom.xml
+++ b/metron-platform/metron-parsers/pom.xml
@@ -33,8 +33,20 @@
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
+            <artifactId>metron-writer</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-enrichment</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
             <artifactId>metron-enrichment</artifactId>
             <version>${project.parent.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
@@ -108,12 +120,12 @@
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
-            <version>2.2.3</version>
+            <version>${global_jackson_version}</version>
         </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-annotations</artifactId>
-            <version>2.2.3</version>
+            <version>${global_jackson_version}</version>
         </dependency>
         <dependency>
             <groupId>io.thekraken</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/main/flux/asa/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/flux/asa/remote.yaml b/metron-platform/metron-parsers/src/main/flux/asa/remote.yaml
index 052728e..828e1a5 100644
--- a/metron-platform/metron-parsers/src/main/flux/asa/remote.yaml
+++ b/metron-platform/metron-parsers/src/main/flux/asa/remote.yaml
@@ -23,7 +23,7 @@ components:
     -   id: "parser"
         className: "org.apache.metron.parsers.asa.GrokAsaParser"
     -   id: "writer"
-        className: "org.apache.metron.parsers.writer.KafkaWriter"
+        className: "org.apache.metron.writer.kafka.KafkaWriter"
         constructorArgs:
             - "${kafka.broker}"
     -   id: "zkHosts"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/main/flux/asa/test.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/flux/asa/test.yaml b/metron-platform/metron-parsers/src/main/flux/asa/test.yaml
index c816b45..4694fff 100644
--- a/metron-platform/metron-parsers/src/main/flux/asa/test.yaml
+++ b/metron-platform/metron-parsers/src/main/flux/asa/test.yaml
@@ -23,7 +23,7 @@ components:
     -   id: "parser"
         className: "org.apache.metron.parsers.asa.GrokAsaParser"
     -   id: "writer"
-        className: "org.apache.metron.parsers.writer.KafkaWriter"
+        className: "org.apache.metron.writer.kafka.KafkaWriter"
         constructorArgs:
             - "${kafka.broker}"
     -   id: "zkHosts"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/main/flux/fireeye/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/flux/fireeye/remote.yaml b/metron-platform/metron-parsers/src/main/flux/fireeye/remote.yaml
index a745d38..dc1ff6b 100644
--- a/metron-platform/metron-parsers/src/main/flux/fireeye/remote.yaml
+++ b/metron-platform/metron-parsers/src/main/flux/fireeye/remote.yaml
@@ -22,7 +22,7 @@ components:
     -   id: "parser"
         className: "org.apache.metron.parsers.fireeye.BasicFireEyeParser"
     -   id: "writer"
-        className: "org.apache.metron.parsers.writer.KafkaWriter"
+        className: "org.apache.metron.writer.kafka.KafkaWriter"
         constructorArgs:
             - "${kafka.broker}"
     -   id: "zkHosts"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/main/flux/fireeye/test.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/flux/fireeye/test.yaml b/metron-platform/metron-parsers/src/main/flux/fireeye/test.yaml
index 9f4c06f..f676b2a 100644
--- a/metron-platform/metron-parsers/src/main/flux/fireeye/test.yaml
+++ b/metron-platform/metron-parsers/src/main/flux/fireeye/test.yaml
@@ -22,7 +22,7 @@ components:
     -   id: "parser"
         className: "org.apache.metron.parsers.fireeye.BasicFireEyeParser"
     -   id: "writer"
-        className: "org.apache.metron.parsers.writer.KafkaWriter"
+        className: "org.apache.metron.writer.kafka.KafkaWriter"
         constructorArgs:
             - "${kafka.broker}"
     -   id: "zkHosts"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/main/flux/ise/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/flux/ise/remote.yaml b/metron-platform/metron-parsers/src/main/flux/ise/remote.yaml
index 53ed5fc..49c4c19 100644
--- a/metron-platform/metron-parsers/src/main/flux/ise/remote.yaml
+++ b/metron-platform/metron-parsers/src/main/flux/ise/remote.yaml
@@ -22,7 +22,7 @@ components:
     -   id: "parser"
         className: "org.apache.metron.parsers.ise.BasicIseParser"
     -   id: "writer"
-        className: "org.apache.metron.parsers.writer.KafkaWriter"
+        className: "org.apache.metron.writer.kafka.KafkaWriter"
         constructorArgs:
             - "${kafka.broker}"
     -   id: "zkHosts"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/main/flux/ise/test.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/flux/ise/test.yaml b/metron-platform/metron-parsers/src/main/flux/ise/test.yaml
index 2e88594..8b292e0 100644
--- a/metron-platform/metron-parsers/src/main/flux/ise/test.yaml
+++ b/metron-platform/metron-parsers/src/main/flux/ise/test.yaml
@@ -22,7 +22,7 @@ components:
     -   id: "parser"
         className: "org.apache.metron.parsers.ise.BasicIseParser"
     -   id: "writer"
-        className: "org.apache.metron.parsers.writer.KafkaWriter"
+        className: "org.apache.metron.writer.kafka.KafkaWriter"
         constructorArgs:
             - "${kafka.broker}"
     -   id: "zkHosts"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/main/flux/lancope/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/flux/lancope/remote.yaml b/metron-platform/metron-parsers/src/main/flux/lancope/remote.yaml
index 4fe7c3b..f60e86a 100644
--- a/metron-platform/metron-parsers/src/main/flux/lancope/remote.yaml
+++ b/metron-platform/metron-parsers/src/main/flux/lancope/remote.yaml
@@ -22,7 +22,7 @@ components:
     -   id: "parser"
         className: "org.apache.metron.parsers.lancope.BasicLancopeParser"
     -   id: "writer"
-        className: "org.apache.metron.parsers.writer.KafkaWriter"
+        className: "org.apache.metron.writer.kafka.KafkaWriter"
         constructorArgs:
             - "${kafka.broker}"
     -   id: "zkHosts"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/main/flux/lancope/test.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/flux/lancope/test.yaml b/metron-platform/metron-parsers/src/main/flux/lancope/test.yaml
index eb8a1ef..f6ade8e 100644
--- a/metron-platform/metron-parsers/src/main/flux/lancope/test.yaml
+++ b/metron-platform/metron-parsers/src/main/flux/lancope/test.yaml
@@ -22,7 +22,7 @@ components:
     -   id: "parser"
         className: "org.apache.metron.parsers.lancope.BasicLancopeParser"
     -   id: "writer"
-        className: "org.apache.metron.parsers.writer.KafkaWriter"
+        className: "org.apache.metron.writer.kafka.KafkaWriter"
         constructorArgs:
             - "${kafka.broker}"
     -   id: "zkHosts"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/main/flux/paloalto/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/flux/paloalto/remote.yaml b/metron-platform/metron-parsers/src/main/flux/paloalto/remote.yaml
index 4287fce..74ea3ad 100644
--- a/metron-platform/metron-parsers/src/main/flux/paloalto/remote.yaml
+++ b/metron-platform/metron-parsers/src/main/flux/paloalto/remote.yaml
@@ -22,7 +22,7 @@ components:
     -   id: "parser"
         className: "org.apache.metron.parsers.paloalto.BasicPaloAltoFirewallParser"
     -   id: "writer"
-        className: "org.apache.metron.parsers.writer.KafkaWriter"
+        className: "org.apache.metron.writer.kafka.KafkaWriter"
         constructorArgs:
             - "${kafka.broker}"
     -   id: "zkHosts"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/main/flux/paloalto/test.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/flux/paloalto/test.yaml b/metron-platform/metron-parsers/src/main/flux/paloalto/test.yaml
index cef5dc5..160b0df 100644
--- a/metron-platform/metron-parsers/src/main/flux/paloalto/test.yaml
+++ b/metron-platform/metron-parsers/src/main/flux/paloalto/test.yaml
@@ -22,7 +22,7 @@ components:
     -   id: "parser"
         className: "org.apache.metron.parsers.paloalto.BasicPaloAltoFirewallParser"
     -   id: "writer"
-        className: "org.apache.metron.parsers.writer.KafkaWriter"
+        className: "org.apache.metron.writer.kafka.KafkaWriter"
         constructorArgs:
             - "${kafka.broker}"
     -   id: "zkHosts"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/main/flux/sourcefire/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/flux/sourcefire/remote.yaml b/metron-platform/metron-parsers/src/main/flux/sourcefire/remote.yaml
index 312dce0..a79bfde 100644
--- a/metron-platform/metron-parsers/src/main/flux/sourcefire/remote.yaml
+++ b/metron-platform/metron-parsers/src/main/flux/sourcefire/remote.yaml
@@ -22,7 +22,7 @@ components:
     -   id: "parser"
         className: "org.apache.metron.parsers.sourcefire.BasicSourcefireParser"
     -   id: "writer"
-        className: "org.apache.metron.parsers.writer.KafkaWriter"
+        className: "org.apache.metron.writer.kafka.KafkaWriter"
         constructorArgs:
             - "${kafka.broker}"
     -   id: "zkHosts"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/main/flux/sourcefire/test.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/flux/sourcefire/test.yaml b/metron-platform/metron-parsers/src/main/flux/sourcefire/test.yaml
index a0a00d0..5a6516d 100644
--- a/metron-platform/metron-parsers/src/main/flux/sourcefire/test.yaml
+++ b/metron-platform/metron-parsers/src/main/flux/sourcefire/test.yaml
@@ -22,7 +22,7 @@ components:
     -   id: "parser"
         className: "org.apache.metron.parsers.sourcefire.BasicSourcefireParser"
     -   id: "writer"
-        className: "org.apache.metron.parsers.writer.KafkaWriter"
+        className: "org.apache.metron.writer.kafka.KafkaWriter"
         constructorArgs:
             - "${kafka.broker}"
     -   id: "zkHosts"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/main/flux/websphere/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/flux/websphere/remote.yaml b/metron-platform/metron-parsers/src/main/flux/websphere/remote.yaml
index 7d1c103..9f925dd 100644
--- a/metron-platform/metron-parsers/src/main/flux/websphere/remote.yaml
+++ b/metron-platform/metron-parsers/src/main/flux/websphere/remote.yaml
@@ -32,7 +32,7 @@ components:
                 args:
                     - "yyyy MMM dd HH:mm:ss"
     -   id: "writer"
-        className: "org.apache.metron.parsers.writer.KafkaWriter"
+        className: "org.apache.metron.writer.kafka.KafkaWriter"
         constructorArgs:
             - "${kafka.broker}"
     -   id: "zkHosts"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/main/flux/websphere/test.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/flux/websphere/test.yaml b/metron-platform/metron-parsers/src/main/flux/websphere/test.yaml
index 045fc0e..20ac9ca 100644
--- a/metron-platform/metron-parsers/src/main/flux/websphere/test.yaml
+++ b/metron-platform/metron-parsers/src/main/flux/websphere/test.yaml
@@ -32,7 +32,7 @@ components:
                 args:
                     - "yyyy MMM dd HH:mm:ss"
     -   id: "writer"
-        className: "org.apache.metron.parsers.writer.KafkaWriter"
+        className: "org.apache.metron.writer.kafka.KafkaWriter"
         constructorArgs:
             - "${kafka.broker}"
     -   id: "zkHosts"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index aebb8d7..6c40768 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -26,29 +26,19 @@ import backtype.storm.tuple.Values;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.bolt.ConfiguredParserBolt;
 import org.apache.metron.common.configuration.FieldValidator;
-import org.apache.metron.common.configuration.ParserConfigurations;
-import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
-import org.apache.metron.common.configuration.writer.SingleBatchConfigurationFacade;
-import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.apache.metron.common.interfaces.BulkMessageWriter;
-import org.apache.metron.common.writer.BulkWriterComponent;
-import org.apache.metron.common.writer.WriterToBulkWriter;
 import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.parsers.filters.Filters;
 import org.apache.metron.common.configuration.FieldTransformer;
-import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.parsers.filters.GenericMessageFilter;
 import org.apache.metron.common.utils.ErrorUtils;
 import org.apache.metron.parsers.interfaces.MessageFilter;
 import org.apache.metron.parsers.interfaces.MessageParser;
-import org.apache.metron.common.interfaces.MessageWriter;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.*;
-import java.util.function.Function;
 
 public class ParserBolt extends ConfiguredParserBolt implements Serializable {
 



Mime
View raw message