metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject [01/15] incubator-metron git commit: METRON-142 Simplify Parser configuration (merrimanr via cestella) closes apache/incubator-metron#120
Date Mon, 16 May 2016 18:07:42 GMT
Repository: incubator-metron
Updated Branches:
  refs/heads/master 00f8588d8 -> df8d682e8


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
new file mode 100644
index 0000000..68ee36b
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers.topology;
+
+import backtype.storm.topology.TopologyBuilder;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.spout.kafka.SpoutConfig;
+import org.apache.metron.common.utils.ReflectionUtils;
+import org.apache.metron.parsers.bolt.ParserBolt;
+import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.parsers.writer.KafkaWriter;
+import org.json.simple.JSONObject;
+import storm.kafka.KafkaSpout;
+import storm.kafka.ZkHosts;
+
+public class ParserTopologyBuilder {
+
+  public static TopologyBuilder build(String zookeeperUrl,
+                         String brokerUrl,
+                         String sensorType,
+                         SpoutConfig.Offset offset,
+                         int spoutParallelism,
+                         int parserParallelism) throws Exception {
+    CuratorFramework client = ConfigurationsUtils.getClient(zookeeperUrl);
+    client.start();
+    SensorParserConfig sensorParserConfig = ConfigurationsUtils.readSensorParserConfigFromZookeeper(sensorType, client);
+    client.close();
+    String sensorTopic = sensorParserConfig.getSensorTopic() != null ? sensorParserConfig.getSensorTopic() : sensorType;
+    TopologyBuilder builder = new TopologyBuilder();
+    ZkHosts zkHosts = new ZkHosts(zookeeperUrl);
+    SpoutConfig spoutConfig = new SpoutConfig(zkHosts, sensorTopic, "", sensorTopic).from(offset);
+    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
+    builder.setSpout("kafkaSpout", kafkaSpout, spoutParallelism);
+    MessageParser<JSONObject> parser = ReflectionUtils.createInstance(sensorParserConfig.getParserClassName());
+    parser.configure(sensorParserConfig.getParserConfig());
+    KafkaWriter writer = new KafkaWriter(brokerUrl);
+    ParserBolt parserBolt = new ParserBolt(zookeeperUrl, sensorType, parser, writer);
+    builder.setBolt("parserBolt", parserBolt, parserParallelism).shuffleGrouping("kafkaSpout");
+    return builder;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
new file mode 100644
index 0000000..b26cbc3
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers.topology;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.utils.Utils;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.metron.common.spout.kafka.SpoutConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ParserTopologyCLI {
+
+  public static void main(String[] args) {
+    Options options = new Options();
+    {
+      Option o = new Option("h", "help", false, "This screen");
+      o.setRequired(false);
+      options.addOption(o);
+    }
+    {
+      Option o = new Option("z", "zk", true, "Zookeeper Quroum URL (zk1:2181,zk2:2181,...");
+      o.setArgName("ZK_QUORUM");
+      o.setRequired(true);
+      options.addOption(o);
+    }
+    {
+      Option o = new Option("k", "kafka", true, "Kafka Broker URL");
+      o.setArgName("BROKER_URL");
+      o.setRequired(true);
+      options.addOption(o);
+    }
+    {
+      Option o = new Option("s", "sensor", true, "Sensor Type");
+      o.setArgName("SENSOR_TYPE");
+      o.setRequired(true);
+      options.addOption(o);
+    }
+    {
+      Option o = new Option("sp", "spout_p", true, "Spout Parallelism");
+      o.setArgName("SPOUT_PARALLELISM");
+      o.setRequired(false);
+      o.setType(Number.class);
+      options.addOption(o);
+    }
+    {
+      Option o = new Option("pp", "parser_p", true, "Parser Parallelism");
+      o.setArgName("PARSER_PARALLELISM");
+      o.setRequired(false);
+      o.setType(Number.class);
+      options.addOption(o);
+    }
+    {
+      Option o = new Option("t", "test", true, "Run in Test Mode");
+      o.setArgName("TEST");
+      o.setRequired(false);
+      options.addOption(o);
+    }
+    try {
+      CommandLineParser parser = new PosixParser();
+      CommandLine cmd = null;
+      try {
+        cmd = parser.parse(options, args);
+      } catch (ParseException pe) {
+        pe.printStackTrace();
+        final HelpFormatter usageFormatter = new HelpFormatter();
+        usageFormatter.printHelp("ParserTopologyCLI", null, options, null, true);
+        System.exit(-1);
+      }
+      if (cmd.hasOption("h")) {
+        final HelpFormatter usageFormatter = new HelpFormatter();
+        usageFormatter.printHelp("ParserTopologyCLI", null, options, null, true);
+        System.exit(0);
+      }
+      String zookeeperUrl = cmd.getOptionValue("z");
+      String brokerUrl = cmd.getOptionValue("k");
+      String sensoryType = cmd.getOptionValue("s");
+      int spoutParallelism = Integer.parseInt(cmd.getOptionValue("sp", "1"));
+      int parserParallelism = Integer.parseInt(cmd.getOptionValue("pp", "1"));
+      SpoutConfig.Offset offset = cmd.hasOption("t") ? SpoutConfig.Offset.BEGINNING : SpoutConfig.Offset.WHERE_I_LEFT_OFF;
+      TopologyBuilder builder = ParserTopologyBuilder.build(zookeeperUrl,
+              brokerUrl,
+              sensoryType,
+              offset,
+              spoutParallelism,
+              parserParallelism);
+      if (cmd.hasOption("t")) {
+        Map<String, Object> stormConf = new HashMap<>();
+        stormConf.put(Config.TOPOLOGY_DEBUG, true);
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology(sensoryType, stormConf, builder.createTopology());
+        Utils.sleep(300000);
+        cluster.shutdown();
+      } else {
+        StormSubmitter.submitTopology(sensoryType, new HashMap<>(), builder.createTopology());
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      System.exit(-1);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/websphere/GrokWebSphereParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/websphere/GrokWebSphereParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/websphere/GrokWebSphereParser.java
index 254b97a..178719b 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/websphere/GrokWebSphereParser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/websphere/GrokWebSphereParser.java
@@ -29,10 +29,6 @@ public class GrokWebSphereParser extends GrokParser {
 
 	private static final long serialVersionUID = 4860439408055777358L;
 
-	public GrokWebSphereParser(String grokHdfsPath, String patternLabel) {
-		super(grokHdfsPath, patternLabel);
-	}
-
 	@Override
 	protected long formatTimestamp(Object value) {
 		long epochTimestamp = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-parsers/src/main/scripts/start_parser_topology.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/scripts/start_parser_topology.sh b/metron-platform/metron-parsers/src/main/scripts/start_parser_topology.sh
index c5a5629..05fb9f7 100755
--- a/metron-platform/metron-parsers/src/main/scripts/start_parser_topology.sh
+++ b/metron-platform/metron-parsers/src/main/scripts/start_parser_topology.sh
@@ -19,4 +19,4 @@
 METRON_VERSION=${project.version}
 METRON_HOME=/usr/metron/$METRON_VERSION
 TOPOLOGY_JAR=metron-parsers-$METRON_VERSION.jar
-storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/flux/$1/remote.yaml --filter $METRON_HOME/config/parsers.properties
+storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.metron.parsers.topology.ParserTopologyCLI "$@"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java
index 7abcc87..b1deb0e 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java
@@ -27,10 +27,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 public abstract class GrokParserTest {
 
@@ -44,17 +41,14 @@ public abstract class GrokParserTest {
 
   @Test
   public void test() throws IOException, ParseException {
-    String metronHdfsHome = "";
-    GrokParser grokParser = new GrokParser(getGrokPath(), getGrokPatternLabel());
-    String[] timeFields = getTimeFields();
-    if (timeFields != null) {
-      grokParser.withTimeFields(getTimeFields());
-    }
-    String dateFormat = getDateFormat();
-    if (dateFormat != null) {
-      grokParser.withDateFormat(getDateFormat());
-    }
-    grokParser.withTimestampField(getTimestampField());
+    Map<String, Object> parserConfig = new HashMap<>();
+    parserConfig.put("grokPath", getGrokPath());
+    parserConfig.put("patternLabel", getGrokPatternLabel());
+    parserConfig.put("timestampField", getTimestampField());
+    parserConfig.put("dateFormat", getDateFormat());
+    parserConfig.put("timeFields", getTimeFields());
+    GrokParser grokParser = new GrokParser();
+    grokParser.configure(parserConfig);
     grokParser.init();
     byte[] rawMessage = getRawMessage().getBytes();
     List<JSONObject> parsedList = grokParser.parse(rawMessage);
@@ -91,7 +85,7 @@ public abstract class GrokParserTest {
   public abstract String getExpectedParsedString();
   public abstract String getGrokPath();
   public abstract String getGrokPatternLabel();
-  public abstract String[] getTimeFields();
+  public abstract List<String> getTimeFields();
   public abstract String getDateFormat();
   public abstract String getTimestampField();
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SampleGrokParserTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SampleGrokParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SampleGrokParserTest.java
index 6f489a4..fe11c3f 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SampleGrokParserTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SampleGrokParserTest.java
@@ -19,6 +19,9 @@ package org.apache.metron.parsers;
 
 import org.adrianwalker.multilinestring.Multiline;
 
+import java.util.ArrayList;
+import java.util.List;
+
 public class SampleGrokParserTest extends GrokParserTest {
 
   /**
@@ -61,15 +64,17 @@ public class SampleGrokParserTest extends GrokParserTest {
   }
 
   public String getGrokPath() {
-    return "../metron-integration-test/src/main/resources/sample/patterns/test";
+    return "../metron-integration-test/src/main/sample/patterns/test";
   }
 
   public String getGrokPatternLabel() {
     return "YAF_DELIMITED";
   }
 
-  public String[] getTimeFields() {
-    return new String[]{"end_time"};
+  public List<String> getTimeFields() {
+    return new ArrayList<String>() {{
+      add("end_time");
+    }};
   }
 
   public String getDateFormat() {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SquidParserTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SquidParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SquidParserTest.java
index 1218595..d97025e 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SquidParserTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SquidParserTest.java
@@ -19,6 +19,9 @@ package org.apache.metron.parsers;
 
 import org.adrianwalker.multilinestring.Multiline;
 
+import java.util.ArrayList;
+import java.util.List;
+
 public class SquidParserTest extends GrokParserTest {
 
   @Override
@@ -59,8 +62,8 @@ public class SquidParserTest extends GrokParserTest {
   }
 
   @Override
-  public String[] getTimeFields() {
-    return new String[0];
+  public List<String> getTimeFields() {
+    return new ArrayList<>();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/YafParserTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/YafParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/YafParserTest.java
index 9e887f5..1c1da4d 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/YafParserTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/YafParserTest.java
@@ -19,6 +19,9 @@ package org.apache.metron.parsers;
 
 import org.adrianwalker.multilinestring.Multiline;
 
+import java.util.ArrayList;
+import java.util.List;
+
 public class YafParserTest extends GrokParserTest {
 
   @Override
@@ -74,8 +77,11 @@ public class YafParserTest extends GrokParserTest {
   }
 
   @Override
-  public String[] getTimeFields() {
-    return new String[]{"start_time", "end_time"};
+  public List<String> getTimeFields() {
+    return new ArrayList<String>() {{
+      add("start_time");
+      add("end_time");
+    }};
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/BluecoatIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/BluecoatIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/BluecoatIntegrationTest.java
index 8b6207c..e732761 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/BluecoatIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/BluecoatIntegrationTest.java
@@ -17,32 +17,21 @@
  */
 package org.apache.metron.parsers.integration;
 
-import org.apache.metron.TestConstants;
+import org.apache.metron.parsers.integration.validation.SampleDataValidation;
 
-public class BluecoatIntegrationTest extends ParserIntegrationTest {
-
-  @Override
-  public String getFluxPath() {
-    return "./src/main/flux/bluecoat/test.yaml";
-  }
-
-  @Override
-  public String getSampleInputPath() {
-    return TestConstants.SAMPLE_DATA_INPUT_PATH + "BluecoatSyslog.txt";
-  }
-
-  @Override
-  public String getSampleParsedPath() {
-    return TestConstants.SAMPLE_DATA_PARSED_PATH + "BluecoatParsed";
-  }
+import java.util.ArrayList;
+import java.util.List;
 
+public class BluecoatIntegrationTest extends ParserIntegrationTest {
   @Override
-  public String getSensorType() {
+  String getSensorType() {
     return "bluecoat";
   }
 
   @Override
-  public String getFluxTopicProperty() {
-    return "spout.kafka.topic.yaf";
+  List<ParserValidation> getValidations() {
+    return new ArrayList<ParserValidation>() {{
+      add(new SampleDataValidation());
+    }};
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/BroIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/BroIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/BroIntegrationTest.java
new file mode 100644
index 0000000..c9cfe0a
--- /dev/null
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/BroIntegrationTest.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers.integration;
+
+import org.apache.metron.parsers.integration.validation.SampleDataValidation;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class BroIntegrationTest extends ParserIntegrationTest {
+  @Override
+  String getSensorType() {
+    return "bro";
+  }
+
+  @Override
+  List<ParserValidation> getValidations() {
+    return new ArrayList<ParserValidation>() {{
+      add(new SampleDataValidation());
+    }};
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
index e0b9432..116e262 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
@@ -17,63 +17,61 @@
  */
 package org.apache.metron.parsers.integration;
 
+import junit.framework.Assert;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.metron.TestConstants;
 import org.apache.metron.common.Constants;
 import org.apache.metron.integration.BaseIntegrationTest;
-import org.apache.metron.integration.utils.TestUtils;
-import org.apache.metron.test.utils.UnitTestHelper;
 import org.apache.metron.integration.ComponentRunner;
 import org.apache.metron.integration.Processor;
 import org.apache.metron.integration.ReadinessState;
-import org.apache.metron.integration.components.FluxTopologyComponent;
+import org.apache.metron.integration.components.ConfigUploadComponent;
 import org.apache.metron.integration.components.KafkaWithZKComponent;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.junit.Assert;
+import org.apache.metron.integration.utils.TestUtils;
+import org.apache.metron.parsers.integration.components.ParserTopologyComponent;
+import org.apache.metron.parsers.integration.validation.SampleDataValidation;
+import org.apache.metron.test.TestDataType;
+import org.apache.metron.test.utils.SampleDataUtils;
+import org.apache.metron.test.utils.UnitTestHelper;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 
 public abstract class ParserIntegrationTest extends BaseIntegrationTest {
 
-  public abstract String getFluxPath();
-  public abstract String getSampleInputPath();
-  public abstract String getSampleParsedPath();
-  public abstract String getSensorType();
-  public abstract String getFluxTopicProperty();
-
   @Test
   public void test() throws Exception {
-
-    final String kafkaTopic = getSensorType();
-
-    final List<byte[]> inputMessages = TestUtils.readSampleData(getSampleInputPath());
+    final String sensorType = getSensorType();
+    final List<byte[]> inputMessages = TestUtils.readSampleData(SampleDataUtils.getSampleDataPath(sensorType, TestDataType.RAW));
 
     final Properties topologyProperties = new Properties();
     final KafkaWithZKComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaWithZKComponent.Topic>() {{
-      add(new KafkaWithZKComponent.Topic(kafkaTopic, 1));
+      add(new KafkaWithZKComponent.Topic(sensorType, 1));
     }});
-
     topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList());
-    FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
-            .withTopologyLocation(new File(getFluxPath()))
-            .withTopologyName("test")
+
+    ConfigUploadComponent configUploadComponent = new ConfigUploadComponent()
             .withTopologyProperties(topologyProperties)
-            .build();
+            .withGlobalConfigsPath(TestConstants.SAMPLE_CONFIG_PATH)
+            .withParserConfigsPath(TestConstants.PARSER_CONFIGS_PATH);
+
+    ParserTopologyComponent parserTopologyComponent = new ParserTopologyComponent.Builder()
+            .withSensorType(sensorType)
+            .withTopologyProperties(topologyProperties)
+            .withBrokerUrl(kafkaComponent.getBrokerList()).build();
 
     UnitTestHelper.verboseLogging();
     ComponentRunner runner = new ComponentRunner.Builder()
             .withComponent("kafka", kafkaComponent)
-            .withComponent("storm", fluxComponent)
+            .withComponent("config", configUploadComponent)
+            .withComponent("storm", parserTopologyComponent)
             .withMillisecondsBetweenAttempts(5000)
             .withNumRetries(10)
             .build();
     runner.start();
-    fluxComponent.submitTopology();
-    kafkaComponent.writeMessages(kafkaTopic, inputMessages);
+    kafkaComponent.writeMessages(sensorType, inputMessages);
     List<byte[]> outputMessages =
             runner.process(new Processor<List<byte[]>>() {
               List<byte[]> messages = null;
@@ -93,43 +91,29 @@ public abstract class ParserIntegrationTest extends BaseIntegrationTest {
                 return messages;
               }
             });
-    List<byte[]> sampleParsedMessages = TestUtils.readSampleData(getSampleParsedPath());
-    Assert.assertEquals(sampleParsedMessages.size(), outputMessages.size());
-    for (int i = 0; i < outputMessages.size(); i++) {
-      String sampleParsedMessage = new String(sampleParsedMessages.get(i));
-      String outputMessage = new String(outputMessages.get(i));
-      try {
-        assertJSONEqual(sampleParsedMessage, outputMessage);
-      } catch (Throwable t) {
-        System.out.println("expected: " + sampleParsedMessage);
-        System.out.println("actual: " + outputMessage);
-        throw t;
+    List<ParserValidation> validations = getValidations();
+    if (validations == null || validations.isEmpty()) {
+      System.out.println("No validations configured for sensorType " + sensorType + ".  Dumping parsed messages");
+      System.out.println();
+      dumpParsedMessages(outputMessages);
+      System.out.println();
+      Assert.fail();
+    } else {
+      for (ParserValidation validation : validations) {
+        System.out.println("Running " + validation.getName() + " on sensorType " + sensorType);
+        validation.validate(sensorType, outputMessages);
       }
     }
     runner.stop();
-
   }
 
-  public static void assertJSONEqual(String doc1, String doc2) throws IOException {
-    ObjectMapper mapper = new ObjectMapper();
-    Map m1 = mapper.readValue(doc1, Map.class);
-    Map m2 = mapper.readValue(doc2, Map.class);
-    for(Object k : m1.keySet()) {
-      Object v1 = m1.get(k);
-      Object v2 = m2.get(k);
-
-      if(v2 == null) {
-        Assert.fail("Unable to find key: " + k + " in output");
-      }
-      if(k.equals("timestamp")) {
-        //TODO: Take the ?!?@ timestamps out of the reference file.
-        Assert.assertEquals(v1.toString().length(), v2.toString().length());
-      }
-      else if(!v2.equals(v1)) {
-        Assert.assertEquals("value mismatch for " + k ,v1, v2);
-      }
+  public void dumpParsedMessages(List<byte[]> outputMessages) {
+    for (byte[] outputMessage : outputMessages) {
+      System.out.println(new String(outputMessage));
     }
-    Assert.assertEquals(m1.size(), m2.size());
   }
 
+  abstract String getSensorType();
+  abstract List<ParserValidation> getValidations();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserValidation.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserValidation.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserValidation.java
new file mode 100644
index 0000000..a92fd32
--- /dev/null
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserValidation.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers.integration;
+
+import java.util.List;
+
+public interface ParserValidation {
+
+  String getName();
+  void validate(String sensorType, List<byte[]> actualMessages) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/SnortIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/SnortIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/SnortIntegrationTest.java
index 983f7e3..244dd89 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/SnortIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/SnortIntegrationTest.java
@@ -17,32 +17,21 @@
  */
 package org.apache.metron.parsers.integration;
 
-import org.apache.metron.TestConstants;
+import org.apache.metron.parsers.integration.validation.SampleDataValidation;
 
-public class SnortIntegrationTest extends ParserIntegrationTest {
-
-  @Override
-  public String getFluxPath() {
-    return "./src/main/flux/snort/test.yaml";
-  }
-
-  @Override
-  public String getSampleInputPath() {
-    return TestConstants.SAMPLE_DATA_INPUT_PATH + "SnortOutput";
-  }
-
-  @Override
-  public String getSampleParsedPath() {
-    return TestConstants.SAMPLE_DATA_PARSED_PATH + "SnortParsed";
-  }
+import java.util.ArrayList;
+import java.util.List;
 
+public class SnortIntegrationTest extends ParserIntegrationTest {
   @Override
-  public String getSensorType() {
+  String getSensorType() {
     return "snort";
   }
 
   @Override
-  public String getFluxTopicProperty() {
-    return "spout.kafka.topic.snort";
+  List<ParserValidation> getValidations() {
+    return new ArrayList<ParserValidation>() {{
+      add(new SampleDataValidation());
+    }};
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/SquidIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/SquidIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/SquidIntegrationTest.java
index 023278d..9c03e81 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/SquidIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/SquidIntegrationTest.java
@@ -17,33 +17,21 @@
  */
 package org.apache.metron.parsers.integration;
 
-import org.apache.metron.TestConstants;
-import org.json.simple.JSONObject;
+import org.apache.metron.parsers.integration.validation.SampleDataValidation;
 
-public class SquidIntegrationTest extends ParserIntegrationTest {
-
-  @Override
-  public String getFluxPath() {
-    return "./src/main/flux/squid/test.yaml";
-  }
-
-  @Override
-  public String getSampleInputPath() {
-    return TestConstants.SAMPLE_DATA_INPUT_PATH + "SquidExampleOutput";
-  }
-
-  @Override
-  public String getSampleParsedPath() {
-    return TestConstants.SAMPLE_DATA_PARSED_PATH + "SquidExampleParsed";
-  }
+import java.util.ArrayList;
+import java.util.List;
 
+public class SquidIntegrationTest extends ParserIntegrationTest {
   @Override
-  public String getSensorType() {
+  String getSensorType() {
     return "squid";
   }
 
   @Override
-  public String getFluxTopicProperty() {
-    return "spout.kafka.topic.yaf";
+  List<ParserValidation> getValidations() {
+    return new ArrayList<ParserValidation>() {{
+      add(new SampleDataValidation());
+    }};
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/WebSphereIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/WebSphereIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/WebSphereIntegrationTest.java
index 04761b2..5dc0ac6 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/WebSphereIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/WebSphereIntegrationTest.java
@@ -18,33 +18,23 @@
 
 package org.apache.metron.parsers.integration;
 
-import org.apache.metron.TestConstants;
+import org.apache.metron.parsers.integration.validation.SampleDataValidation;
 
-public class WebSphereIntegrationTest extends ParserIntegrationTest {
-	
-	  @Override
-	  public String getFluxPath() {
-	    return "./src/main/flux/websphere/test.yaml";
-	  }
-
-	  @Override
-	  public String getSampleInputPath() {
-	    return TestConstants.SAMPLE_DATA_INPUT_PATH + "WebsphereOutput.txt";
-	  }
+import java.util.ArrayList;
+import java.util.List;
 
-	  @Override
-	  public String getSampleParsedPath() {
-	    return TestConstants.SAMPLE_DATA_PARSED_PATH + "WebsphereParsed";
-	  }
-
-	  @Override
-	  public String getSensorType() {
-	    return "websphere";
-	  }
+public class WebSphereIntegrationTest extends ParserIntegrationTest {
 
-	  @Override
-	  public String getFluxTopicProperty() {
-	    return "spout.kafka.topic.websphere";
-	  }
+	@Override
+	public String getSensorType() {
+		return "websphere";
+	}
+
+	@Override
+	List<ParserValidation> getValidations() {
+		return new ArrayList<ParserValidation>() {{
+			add(new SampleDataValidation());
+		}};
+	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/YafIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/YafIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/YafIntegrationTest.java
index 67fe2d6..9930c2c 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/YafIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/YafIntegrationTest.java
@@ -17,32 +17,21 @@
  */
 package org.apache.metron.parsers.integration;
 
-import org.apache.metron.TestConstants;
+import org.apache.metron.parsers.integration.validation.SampleDataValidation;
 
-public class YafIntegrationTest extends ParserIntegrationTest {
-
-  @Override
-  public String getFluxPath() {
-    return "./src/main/flux/yaf/test.yaml";
-  }
-
-  @Override
-  public String getSampleInputPath() {
-    return TestConstants.SAMPLE_DATA_INPUT_PATH + "YafExampleOutput";
-  }
-
-  @Override
-  public String getSampleParsedPath() {
-    return TestConstants.SAMPLE_DATA_PARSED_PATH + "YafExampleParsed";
-  }
+import java.util.ArrayList;
+import java.util.List;
 
+public class YafIntegrationTest extends ParserIntegrationTest {
   @Override
-  public String getSensorType() {
+  String getSensorType() {
     return "yaf";
   }
 
   @Override
-  public String getFluxTopicProperty() {
-    return "spout.kafka.topic.yaf";
+  List<ParserValidation> getValidations() {
+    return new ArrayList<ParserValidation>() {{
+      add(new SampleDataValidation());
+    }};
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
new file mode 100644
index 0000000..a5c6659
--- /dev/null
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers.integration.components;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.topology.TopologyBuilder;
+import org.apache.metron.common.spout.kafka.SpoutConfig;
+import org.apache.metron.integration.InMemoryComponent;
+import org.apache.metron.integration.UnableToStartException;
+import org.apache.metron.parsers.topology.ParserTopologyBuilder;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+public class ParserTopologyComponent implements InMemoryComponent {
+
+  private Properties topologyProperties;
+  private String brokerUrl;
+  private String sensorType;
+  private LocalCluster stormCluster;
+
+  public static class Builder {
+    Properties topologyProperties;
+    String brokerUrl;
+    String sensorType;
+    public Builder withTopologyProperties(Properties topologyProperties) {
+      this.topologyProperties = topologyProperties;
+      return this;
+    }
+    public Builder withBrokerUrl(String brokerUrl) {
+      this.brokerUrl = brokerUrl;
+      return this;
+    }
+    public Builder withSensorType(String sensorType) {
+      this.sensorType = sensorType;
+      return this;
+    }
+
+    public ParserTopologyComponent build() {
+      return new ParserTopologyComponent(topologyProperties, brokerUrl, sensorType);
+    }
+  }
+
+  public ParserTopologyComponent(Properties topologyProperties, String brokerUrl, String sensorType) {
+    this.topologyProperties = topologyProperties;
+    this.brokerUrl = brokerUrl;
+    this.sensorType = sensorType;
+  }
+
+  @Override
+  public void start() throws UnableToStartException {
+    try {
+      TopologyBuilder topologyBuilder = ParserTopologyBuilder.build(topologyProperties.getProperty("kafka.zk"), brokerUrl, sensorType, SpoutConfig.Offset.BEGINNING, 1, 1);
+      Map<String, Object> stormConf = new HashMap<>();
+      stormConf.put(Config.TOPOLOGY_DEBUG, true);
+      stormCluster = new LocalCluster();
+      stormCluster.submitTopology(sensorType, stormConf, topologyBuilder.createTopology());
+    } catch (Exception e) {
+      throw new UnableToStartException("Unable to start parser topology for sensorType: " + sensorType, e);
+    }
+  }
+
+  @Override
+  public void stop() {
+    stormCluster.shutdown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/validation/SampleDataValidation.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/validation/SampleDataValidation.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/validation/SampleDataValidation.java
new file mode 100644
index 0000000..9ea9b71
--- /dev/null
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/validation/SampleDataValidation.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers.integration.validation;
+
+import org.apache.metron.integration.utils.TestUtils;
+import org.apache.metron.parsers.integration.ParserValidation;
+import org.apache.metron.test.TestDataType;
+import org.apache.metron.test.utils.SampleDataUtils;
+import org.apache.metron.test.utils.ValidationUtils;
+import org.junit.Assert;
+
+import java.util.List;
+
+public class SampleDataValidation implements ParserValidation {
+
+  @Override
+  public String getName() {
+    return "Sample Data Validation";
+  }
+
+  @Override
+  public void validate(String sensorType, List<byte[]> actualMessages) throws Exception {
+    List<byte[]> expectedMessages = TestUtils.readSampleData(SampleDataUtils.getSampleDataPath(sensorType, TestDataType.PARSED));
+    Assert.assertEquals(expectedMessages.size(), actualMessages.size());
+    for (int i = 0; i < actualMessages.size(); i++) {
+      String expectedMessage = new String(expectedMessages.get(i));
+      String actualMessage = new String(actualMessages.get(i));
+      try {
+        ValidationUtils.assertJSONEqual(expectedMessage, actualMessage);
+      } catch (Throwable t) {
+        System.out.println("expected: " + expectedMessage);
+        System.out.println("actual: " + actualMessage);
+        throw t;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java
index b90c906..a4789cd 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java
@@ -19,23 +19,34 @@
 package org.apache.metron.parsers.websphere;
 
 import static org.junit.Assert.assertEquals;
+
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+
 import org.json.simple.JSONObject;
+import org.junit.Before;
 import org.junit.Test;
 
 public class GrokWebSphereParserTest {
 
-	private final String grokPath = "../metron-parsers/src/main/resources/patterns/websphere";
-	private final String grokLabel = "WEBSPHERE";
-	private final String dateFormat = "yyyy MMM dd HH:mm:ss";
-	private final String timestampField = "timestamp_string";
+	private Map<String, Object> parserConfig;
+
+	@Before
+	public void setup() {
+		parserConfig = new HashMap<>();
+		parserConfig.put("grokPath", "../metron-parsers/src/main/resources/patterns/websphere");
+		parserConfig.put("patternLabel", "WEBSPHERE");
+		parserConfig.put("timestampField", "timestamp_string");
+		parserConfig.put("dateFormat", "yyyy MMM dd HH:mm:ss");
+	}
 	
 	@Test
 	public void testParseLoginLine() throws Exception {
 		
 		//Set up parser, parse message
-		GrokWebSphereParser parser = new GrokWebSphereParser(grokPath, grokLabel);
-		parser.withDateFormat(dateFormat).withTimestampField(timestampField);
+		GrokWebSphereParser parser = new GrokWebSphereParser();
+		parser.configure(parserConfig);
 		String testString = "<133>Apr 15 17:47:28 ABCXML1413 [rojOut][0x81000033][auth][notice] user(rick007): "
 				+ "[120.43.200.6]: User logged into 'cohlOut'.";
 		List<JSONObject> result = parser.parse(testString.getBytes());
@@ -58,8 +69,8 @@ public class GrokWebSphereParserTest {
 	public void tetsParseLogoutLine() throws Exception {
 		
 		//Set up parser, parse message
-		GrokWebSphereParser parser = new GrokWebSphereParser(grokPath, grokLabel);
-		parser.withDateFormat(dateFormat).withTimestampField(timestampField);
+		GrokWebSphereParser parser = new GrokWebSphereParser();
+		parser.configure(parserConfig);
 		String testString = "<134>Apr 15 18:02:27 PHIXML3RWD [0x81000019][auth][info] [14.122.2.201]: "
 				+ "User 'hjpotter' logged out from 'default'.";
 		List<JSONObject> result = parser.parse(testString.getBytes());
@@ -81,8 +92,8 @@ public class GrokWebSphereParserTest {
 	public void tetsParseRBMLine() throws Exception {
 		
 		//Set up parser, parse message
-		GrokWebSphereParser parser = new GrokWebSphereParser(grokPath, grokLabel);
-		parser.withDateFormat(dateFormat).withTimestampField(timestampField);
+		GrokWebSphereParser parser = new GrokWebSphereParser();
+		parser.configure(parserConfig);
 		String testString = "<131>Apr 15 17:36:35 ROBXML3QRS [0x80800018][auth][error] rbm(RBM-Settings): "
 				+ "trans(3502888135)[request] gtid(3502888135): RBM: Resource access denied.";
 		List<JSONObject> result = parser.parse(testString.getBytes());
@@ -103,8 +114,8 @@ public class GrokWebSphereParserTest {
 	public void tetsParseOtherLine() throws Exception {
 		
 		//Set up parser, parse message
-		GrokWebSphereParser parser = new GrokWebSphereParser(grokPath, grokLabel);
-		parser.withDateFormat(dateFormat).withTimestampField(timestampField);
+		GrokWebSphereParser parser = new GrokWebSphereParser();
+		parser.configure(parserConfig);
 		String testString = "<134>Apr 15 17:17:34 SAGPXMLQA333 [0x8240001c][audit][info] trans(191): (admin:default:system:*): "
 				+ "ntp-service 'NTP Service' - Operational state down";
 		List<JSONObject> result = parser.parse(testString.getBytes());
@@ -125,8 +136,8 @@ public class GrokWebSphereParserTest {
 	public void testParseMalformedLoginLine() throws Exception {
 		
 		//Set up parser, attempt to parse malformed message
-		GrokWebSphereParser parser = new GrokWebSphereParser(grokPath, grokLabel);
-		parser.withDateFormat(dateFormat).withTimestampField(timestampField);
+		GrokWebSphereParser parser = new GrokWebSphereParser();
+		parser.configure(parserConfig);
 		String testString = "<133>Apr 15 17:47:28 ABCXML1413 [rojOut][0x81000033][auth][notice] rick007): "
 				+ "[120.43.200. User logged into 'cohlOut'.";
 		List<JSONObject> result = parser.parse(testString.getBytes());		
@@ -149,8 +160,8 @@ public class GrokWebSphereParserTest {
 	public void tetsParseMalformedLogoutLine() throws Exception {
 		
 		//Set up parser, attempt to parse malformed message
-		GrokWebSphereParser parser = new GrokWebSphereParser(grokPath, grokLabel);
-		parser.withDateFormat(dateFormat).withTimestampField(timestampField);
+		GrokWebSphereParser parser = new GrokWebSphereParser();
+		parser.configure(parserConfig);
 		String testString = "<134>Apr 15 18:02:27 PHIXML3RWD [0x81000019][auth][info] [14.122.2.201: "
 				+ "User 'hjpotter' logged out from 'default.";
 		List<JSONObject> result = parser.parse(testString.getBytes());
@@ -172,8 +183,8 @@ public class GrokWebSphereParserTest {
 	public void tetsParseMalformedRBMLine() throws Exception {
 		
 		//Set up parser, parse message
-		GrokWebSphereParser parser = new GrokWebSphereParser(grokPath, grokLabel);
-		parser.withDateFormat(dateFormat).withTimestampField(timestampField);
+		GrokWebSphereParser parser = new GrokWebSphereParser();
+		parser.configure(parserConfig);
 		String testString = "<131>Apr 15 17:36:35 ROBXML3QRS [0x80800018][auth][error] rbmRBM-Settings): "
 				+ "trans3502888135)[request] gtid3502888135) RBM: Resource access denied.";
 		List<JSONObject> result = parser.parse(testString.getBytes());
@@ -194,8 +205,8 @@ public class GrokWebSphereParserTest {
 	public void tetsParseMalformedOtherLine() throws Exception {
 		
 		//Set up parser, parse message
-		GrokWebSphereParser parser = new GrokWebSphereParser(grokPath, grokLabel);
-		parser.withDateFormat(dateFormat).withTimestampField(timestampField);
+		GrokWebSphereParser parser = new GrokWebSphereParser();
+		parser.configure(parserConfig);
 		String testString = "<134>Apr 15 17:17:34 SAGPXMLQA333 [0x8240001c][audit][info] trans 191)  admindefaultsystem*): "
 				+ "ntp-service 'NTP Service' - Operational state down:";
 		List<JSONObject> result = parser.parse(testString.getBytes());
@@ -218,7 +229,8 @@ public class GrokWebSphereParserTest {
 	public void testParseEmptyLine() throws Exception {
 		
 		//Set up parser, attempt to parse malformed message
-		GrokWebSphereParser parser = new GrokWebSphereParser(grokPath, grokLabel);
+		GrokWebSphereParser parser = new GrokWebSphereParser();
+		parser.configure(parserConfig);
 		String testString = "";
 		List<JSONObject> result = parser.parse(testString.getBytes());		
 		assertEquals(null, result);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index 587ec7c..1142da6 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -195,7 +195,7 @@ public class PcapTopologyIntegrationTest {
     //Assert.assertEquals(0, numFiles(outDir));
     Assert.assertNotNull(topologiesDir);
     Assert.assertNotNull(targetDir);
-    Path pcapFile = new Path("../metron-integration-test/src/main/resources/sample/data/SampleInput/PCAPExampleOutput");
+    Path pcapFile = new Path("../metron-integration-test/src/main/sample/data/SampleInput/PCAPExampleOutput");
     final List<Map.Entry<byte[], byte[]>> pcapEntries = Lists.newArrayList(readPcaps(pcapFile, withHeaders));
     Assert.assertTrue(Iterables.size(pcapEntries) > 0);
     final Properties topologyProperties = new Properties() {{

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/PcapHelperTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/PcapHelperTest.java b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/PcapHelperTest.java
index b86aaac..5d2bee0 100644
--- a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/PcapHelperTest.java
+++ b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/PcapHelperTest.java
@@ -57,7 +57,7 @@ public class PcapHelperTest {
 
   @Test
   public void testLittleEndianHeaderization() throws Exception {
-    String pcapSampleFiles = "../metron-integration-test/src/main/resources/sample/data/SampleInput/PCAPExampleOutput";
+    String pcapSampleFiles = "../metron-integration-test/src/main/sample/data/SampleInput/PCAPExampleOutput";
     List<byte[]> pcaps = readSamplePackets(pcapSampleFiles);
     for(byte[] pcap : pcaps)
     {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
index 45d5615..bdf61e8 100644
--- a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
+++ b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
@@ -19,6 +19,7 @@ package org.apache.metron.solr.writer;
 
 import backtype.storm.tuple.Tuple;
 import org.apache.metron.common.configuration.Configurations;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
 import org.apache.metron.common.interfaces.BulkMessageWriter;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.response.UpdateResponse;
@@ -53,7 +54,7 @@ public class SolrWriter implements BulkMessageWriter<JSONObject>, Serializable {
   }
 
   @Override
-  public void init(Map stormConf, Configurations configurations) throws IOException, SolrServerException {
+  public void init(Map stormConf, EnrichmentConfigurations configurations) throws IOException, SolrServerException {
     Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
     if(solr == null) solr = new MetronSolrClient((String) globalConfiguration.get("solr.zookeeper"));
     String collection = getCollection(configurations);
@@ -62,7 +63,7 @@ public class SolrWriter implements BulkMessageWriter<JSONObject>, Serializable {
   }
 
   @Override
-  public void write(String sourceType, Configurations configurations, List<Tuple> tuples, List<JSONObject> messages) throws Exception {
+  public void write(String sourceType, EnrichmentConfigurations configurations, List<Tuple> tuples, List<JSONObject> messages) throws Exception {
     for(JSONObject message: messages) {
       SolrInputDocument document = new SolrInputDocument();
       document.addField("id", getIdValue(message));

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrEnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrEnrichmentIntegrationTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrEnrichmentIntegrationTest.java
index e878912..8aa201e 100644
--- a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrEnrichmentIntegrationTest.java
+++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrEnrichmentIntegrationTest.java
@@ -24,6 +24,7 @@ import org.apache.metron.integration.ComponentRunner;
 import org.apache.metron.integration.InMemoryComponent;
 import org.apache.metron.integration.Processor;
 import org.apache.metron.integration.ReadinessState;
+import org.apache.metron.integration.components.KafkaWithZKComponent;
 import org.apache.metron.solr.integration.components.SolrComponent;
 import org.apache.metron.integration.utils.SampleUtil;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
@@ -38,7 +39,6 @@ import java.util.Properties;
 public class SolrEnrichmentIntegrationTest extends EnrichmentIntegrationTest {
 
   private String collection = "metron";
-  private String solrZookeeperUrl;
 
   @Override
   public InMemoryComponent getSearchComponent(final Properties topologyProperties) throws Exception {
@@ -50,7 +50,7 @@ public class SolrEnrichmentIntegrationTest extends EnrichmentIntegrationTest {
               public Void apply(@Nullable SolrComponent solrComponent) {
                 topologyProperties.setProperty("solr.zk", solrComponent.getZookeeperUrl());
                 try {
-                  String testZookeeperUrl = topologyProperties.getProperty("kafka.zk");
+                  String testZookeeperUrl = topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY);
                   Configurations configurations = SampleUtil.getSampleConfigs();
                   Map<String, Object> globalConfig = configurations.getGlobalConfig();
                   globalConfig.put("solr.zookeeper", solrComponent.getZookeeperUrl());

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
index 24c8eab..580fd31 100644
--- a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
+++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
@@ -19,6 +19,7 @@ package org.apache.metron.solr.writer;
 
 import backtype.storm.tuple.Tuple;
 import org.apache.metron.common.configuration.Configurations;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
 import org.apache.metron.integration.utils.SampleUtil;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.common.SolrInputDocument;
@@ -92,7 +93,7 @@ public class SolrWriterTest {
 
   @Test
   public void testWriter() throws Exception {
-    Configurations configurations = SampleUtil.getSampleConfigs();
+    EnrichmentConfigurations configurations = SampleUtil.getSampleEnrichmentConfigs();
     JSONObject message1 = new JSONObject();
     message1.put("intField", 100);
     message1.put("doubleField", 100.0);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/TestConstants.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/TestConstants.java b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/TestConstants.java
index c798158..2b39f9d 100644
--- a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/TestConstants.java
+++ b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/TestConstants.java
@@ -19,8 +19,11 @@ package org.apache.metron;
 
 public class TestConstants {
 
-  public final static String SAMPLE_CONFIG_PATH = "../metron-integration-test/src/main/resources/sample/config/";
-  public final static String SAMPLE_DATA_INPUT_PATH = "../metron-integration-test/src/main/resources/sample/data/SampleInput/";
-  public final static String SAMPLE_DATA_PARSED_PATH = "../metron-integration-test/src/main/resources/sample/data/SampleParsed/";
-  public final static String SAMPLE_DATA_INDEXED_PATH = "../metron-integration-test/src/main/resources/sample/data/SampleIndexed/";
+  public final static String SAMPLE_CONFIG_PATH = "../metron-integration-test/src/main/config/zookeeper/";
+  public final static String PARSER_CONFIGS_PATH = "../metron-parsers/src/main/config/zookeeper/";
+  public final static String ENRICHMENTS_CONFIGS_PATH = "../metron-enrichment/src/main/config/zookeeper/";
+  public final static String SAMPLE_DATA_PATH = "../metron-integration-test/src/main/sample/data/";
+  public final static String SAMPLE_DATA_INPUT_PATH = "../metron-integration-test/src/main/sample/data/yaf/raw/";
+  public final static String SAMPLE_DATA_PARSED_PATH = "../metron-integration-test/src/main/sample/data/test/parsed/";
+  public final static String SAMPLE_DATA_INDEXED_PATH = "../metron-integration-test/src/main/sample/data/test/indexed/";
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/TestDataType.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/TestDataType.java b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/TestDataType.java
new file mode 100644
index 0000000..d31cbfe
--- /dev/null
+++ b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/TestDataType.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.test;
+
+public enum TestDataType {
+
+  RAW("raw"),PARSED("parsed"),INDEXED("indexed");
+
+  private String directoryName;
+  TestDataType(String directoryName) {
+    this.directoryName = directoryName;
+  }
+  public String getDirectoryName() {
+    return directoryName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java
index 83bcb92..69c2174 100644
--- a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java
+++ b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java
@@ -29,16 +29,16 @@ import java.util.Set;
 
 public class BaseEnrichmentBoltTest extends BaseBoltTest {
 
-  public String sampleSensorEnrichmentConfigPath = TestConstants.SAMPLE_CONFIG_PATH + "sensors/yaf.json";
+  public String sampleSensorEnrichmentConfigPath = TestConstants.SAMPLE_CONFIG_PATH + "enrichments/test.json";
   protected Set<String> streamIds = new HashSet<>();
   protected String key = "someKey";
-  protected String sensorType = "yaf";
+  protected String sensorType = "test";
 
   /**
    * {
    * "ip_src_addr": "ip1",
    * "ip_dst_addr": "ip2",
-   * "source.type": "yaf"
+   * "source.type": "test"
    * }
    */
   @Multiline
@@ -48,7 +48,7 @@ public class BaseEnrichmentBoltTest extends BaseBoltTest {
    * {
    * "enrichments.geo.ip_src_addr": "ip1",
    * "enrichments.geo.ip_dst_addr": "ip2",
-   * "source.type": "yaf"
+   * "source.type": "test"
    * }
    */
   @Multiline
@@ -58,7 +58,7 @@ public class BaseEnrichmentBoltTest extends BaseBoltTest {
    * {
    * "enrichments.host.ip_src_addr": "ip1",
    * "enrichments.host.ip_dst_addr": "ip2",
-   * "source.type": "yaf"
+   * "source.type": "test"
    * }
    */
   @Multiline
@@ -68,7 +68,7 @@ public class BaseEnrichmentBoltTest extends BaseBoltTest {
    * {
    * "enrichments.hbaseEnrichment.ip_src_addr": "ip1",
    * "enrichments.hbaseEnrichment.ip_dst_addr": "ip2",
-   * "source.type": "yaf"
+   * "source.type": "test"
    * }
    */
   @Multiline

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/utils/SampleDataUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/utils/SampleDataUtils.java b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/utils/SampleDataUtils.java
new file mode 100644
index 0000000..0e3e4e6
--- /dev/null
+++ b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/utils/SampleDataUtils.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.test.utils;
+
+import org.apache.metron.TestConstants;
+import org.apache.metron.test.TestDataType;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+
+public class SampleDataUtils {
+
+  public static String getSampleDataPath(String sensorType, TestDataType testDataType) throws FileNotFoundException {
+    File sensorSampleDataPath = new File(TestConstants.SAMPLE_DATA_PATH, sensorType);
+    if (sensorSampleDataPath.exists() && sensorSampleDataPath.isDirectory()) {
+      File sampleDataPath = new File(sensorSampleDataPath, testDataType.getDirectoryName());
+      if (sampleDataPath.exists() && sampleDataPath.isDirectory()) {
+        File[] children = sampleDataPath.listFiles();
+        if (children != null && children.length > 0) {
+          return children[0].getAbsolutePath();
+        }
+      }
+    }
+    throw new FileNotFoundException("Could not find data in " + TestConstants.SAMPLE_DATA_PATH + sensorType + "/" + testDataType.getDirectoryName());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/utils/ValidationUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/utils/ValidationUtils.java b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/utils/ValidationUtils.java
new file mode 100644
index 0000000..e08c697
--- /dev/null
+++ b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/utils/ValidationUtils.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.test.utils;
+
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Assert;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class ValidationUtils {
+
+  public static void assertJSONEqual(String expected, String actual) throws IOException {
+    ObjectMapper mapper = new ObjectMapper();
+    Map m1 = mapper.readValue(expected, Map.class);
+    Map m2 = mapper.readValue(actual, Map.class);
+    for(Object k : m1.keySet()) {
+      Object v1 = m1.get(k);
+      Object v2 = m2.get(k);
+
+      if(v2 == null) {
+        Assert.fail("Unable to find key: " + k + " in output");
+      }
+      if(k.equals("timestamp")) {
+        //TODO: Take the ?!?@ timestamps out of the reference file.
+        Assert.assertEquals(v1.toString().length(), v2.toString().length());
+      }
+      else if(!v2.equals(v1)) {
+        Assert.assertEquals("value mismatch for " + k ,v1, v2);
+      }
+    }
+    Assert.assertEquals(m1.size(), m2.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/pom.xml b/metron-platform/pom.xml
index e7d818c..14bf06a 100644
--- a/metron-platform/pom.xml
+++ b/metron-platform/pom.xml
@@ -203,9 +203,9 @@
 						<exclude>**/*.json</exclude>
 						<exclude>**/*.log</exclude>
 						<exclude>**/src/main/resources/patterns/**</exclude>
-						<exclude>**/src/main/resources/sample/data/SampleIndexed/**</exclude>
-						<exclude>**/src/main/resources/sample/data/SampleInput/**</exclude>
-						<exclude>**/src/main/resources/sample/data/SampleParsed/**</exclude>
+						<exclude>**/src/main/sample/data/SampleIndexed/**</exclude>
+						<exclude>**/src/main/sample/data/SampleInput/**</exclude>
+						<exclude>**/src/main/sample/data/SampleParsed/**</exclude>
 						<exclude>**/dependency-reduced-pom.xml</exclude>
 					</excludes>
 				</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 762abe6..f6a3b63 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,9 +77,9 @@
                         <exclude>metron-ui/lib/public/font/**</exclude>
                         <exclude>metron-ui/node_modules/**</exclude>
                         <exclude>**/src/main/resources/patterns/**</exclude>
-                        <exclude>**/src/main/resources/sample/patterns/**</exclude>
+                        <exclude>**/src/main/sample/patterns/**</exclude>
                         <exclude>**/src/test/resources/**</exclude>
-                        <exclude>**/src/main/resources/sample/data/Sample*/**</exclude>
+                        <exclude>**/src/main/sample/data/**</exclude>
                         <exclude>**/dependency-reduced-pom.xml</exclude>
                         <exclude>**/files/opensoc-ui</exclude>
                         <exclude>**/target/**</exclude>


Mime
View raw message