metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject [01/15] incubator-metron git commit: METRON 86: Adding Solr indexing support (merrimanr via cestella) closes apache/incubator-metron#67
Date Tue, 05 Apr 2016 19:41:57 GMT
Repository: incubator-metron
Updated Branches:
  refs/heads/master edeec014c -> e59b1a31d


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
deleted file mode 100644
index 1aa729c..0000000
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
+++ /dev/null
@@ -1,469 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.integration;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.base.*;
-import com.google.common.collect.Iterables;
-import com.google.common.io.Files;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.metron.Constants;
-import org.apache.metron.hbase.TableProvider;
-import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
-import org.apache.metron.hbase.converters.threatintel.ThreatIntelValue;
-import org.apache.metron.integration.util.TestUtils;
-import org.apache.metron.integration.util.UnitTestHelper;
-import org.apache.metron.integration.util.integration.ComponentRunner;
-import org.apache.metron.integration.util.integration.Processor;
-import org.apache.metron.integration.util.integration.ReadinessState;
-import org.apache.metron.integration.util.integration.components.ElasticSearchComponent;
-import org.apache.metron.integration.util.integration.components.FluxTopologyComponent;
-import org.apache.metron.integration.util.integration.components.KafkaWithZKComponent;
-import org.apache.metron.integration.util.mock.MockGeoAdapter;
-import org.apache.metron.integration.util.mock.MockHTable;
-import org.apache.metron.integration.util.threatintel.ThreatIntelHelper;
-import org.apache.metron.reference.lookup.LookupKV;
-import org.apache.metron.utils.SourceConfigUtils;
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.metron.utils.JSONUtils;
-
-import javax.annotation.Nullable;
-import java.io.*;
-import java.text.SimpleDateFormat;
-import java.util.*;
-
-public class EnrichmentIntegrationTest {
-  private static final String SRC_IP = "ip_src_addr";
-  private static final String DST_IP = "ip_dst_addr";
-  private String fluxPath = "src/main/resources/Metron_Configs/topologies/enrichment/test.yaml";
-  private String indexDir = "target/elasticsearch";
-  private String hdfsDir = "target/enrichmentIntegrationTest/hdfs";
-  private String sampleParsedPath = "src/main/resources/SampleParsed/YafExampleParsed";
-  private String sampleIndexedPath = "src/main/resources/SampleIndexed/YafIndexed";
-  private Map<String, String> sourceConfigs = new HashMap<>();
-
-  public static class Provider implements TableProvider, Serializable {
-    MockHTable.Provider  provider = new MockHTable.Provider();
-    @Override
-    public HTableInterface getTable(Configuration config, String tableName) throws IOException {
-      return provider.getTable(config, tableName);
-    }
-  }
-
-  public static void cleanHdfsDir(String hdfsDirStr) {
-    File hdfsDir = new File(hdfsDirStr);
-    Stack<File> fs = new Stack<>();
-    if(hdfsDir.exists()) {
-      fs.push(hdfsDir);
-      while(!fs.empty()) {
-        File f = fs.pop();
-        if (f.isDirectory()) {
-          for(File child : f.listFiles()) {
-            fs.push(child);
-          }
-        }
-        else {
-          if (f.getName().startsWith("enrichment") || f.getName().endsWith(".json")) {
-            f.delete();
-          }
-        }
-      }
-    }
-  }
-
-  public static List<Map<String, Object> > readDocsFromDisk(String hdfsDirStr) throws IOException {
-    List<Map<String, Object>> ret = new ArrayList<>();
-    File hdfsDir = new File(hdfsDirStr);
-    Stack<File> fs = new Stack<>();
-    if(hdfsDir.exists()) {
-      fs.push(hdfsDir);
-      while(!fs.empty()) {
-        File f = fs.pop();
-        if(f.isDirectory()) {
-          for (File child : f.listFiles()) {
-            fs.push(child);
-          }
-        }
-        else {
-          System.out.println("Processed " + f);
-          if (f.getName().startsWith("enrichment") || f.getName().endsWith(".json")) {
-            List<byte[]> data = TestUtils.readSampleData(f.getPath());
-            Iterables.addAll(ret, Iterables.transform(data, new Function<byte[], Map<String, Object>>() {
-              @Nullable
-              @Override
-              public Map<String, Object> apply(@Nullable byte[] bytes) {
-                String s = new String(bytes);
-                try {
-                  return JSONUtils.INSTANCE.load(s, new TypeReference<Map<String, Object>>() {
-                  });
-                } catch (IOException e) {
-                  throw new RuntimeException(e);
-                }
-              }
-            }));
-          }
-        }
-      }
-    }
-    return ret;
-  }
-
-
-  @Test
-  public void test() throws Exception {
-    cleanHdfsDir(hdfsDir);
-    final String dateFormat = "yyyy.MM.dd.hh";
-    final String index = "test_index_" + new SimpleDateFormat(dateFormat).format(new Date());
-    String yafConfig = "{\n" +
-            "  \"index\": \"test\",\n" +
-            "  \"batchSize\": 5,\n" +
-            "  \"enrichmentFieldMap\":\n" +
-            "  {\n" +
-            "    \"geo\": [\"" + SRC_IP + "\", \"" + DST_IP + "\"],\n" +
-            "    \"host\": [\"" + SRC_IP + "\", \"" + DST_IP + "\"]\n" +
-            "  },\n" +
-            "  \"threatIntelFieldMap\":\n" +
-            "  {\n" +
-            "    \"ip\": [\"" + SRC_IP + "\", \"" + DST_IP + "\"]\n" +
-            "  }\n" +
-            "}";
-    sourceConfigs.put("test", yafConfig);
-    final List<byte[]> inputMessages = TestUtils.readSampleData(sampleParsedPath);
-    final String cf = "cf";
-    final String trackerHBaseTable = "tracker";
-    final String ipThreatIntelTable = "ip_threat_intel";
-    final Properties topologyProperties = new Properties() {{
-      setProperty("org.apache.metron.enrichment.host.known_hosts", "[{\"ip\":\"10.1.128.236\", \"local\":\"YES\", \"type\":\"webserver\", \"asset_value\" : \"important\"},\n" +
-              "{\"ip\":\"10.1.128.237\", \"local\":\"UNKNOWN\", \"type\":\"unknown\", \"asset_value\" : \"important\"},\n" +
-              "{\"ip\":\"10.60.10.254\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"},\n" +
-              "{\"ip\":\"10.0.2.15\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}]");
-      setProperty("hbase.provider.impl","" + Provider.class.getName());
-      setProperty("threat.intel.tracker.table", trackerHBaseTable);
-      setProperty("threat.intel.tracker.cf", cf);
-      setProperty("threat.intel.ip.table", ipThreatIntelTable);
-      setProperty("threat.intel.ip.cf", cf);
-      setProperty("es.clustername", "metron");
-      setProperty("es.port", "9300");
-      setProperty("es.ip", "localhost");
-      setProperty("index.date.format", dateFormat);
-      setProperty("index.hdfs.output", hdfsDir);
-    }};
-    final KafkaWithZKComponent kafkaComponent = new KafkaWithZKComponent().withTopics(new ArrayList<KafkaWithZKComponent.Topic>() {{
-      add(new KafkaWithZKComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
-    }})
-            .withPostStartCallback(new Function<KafkaWithZKComponent, Void>() {
-              @Nullable
-              @Override
-              public Void apply(@Nullable KafkaWithZKComponent kafkaWithZKComponent) {
-                topologyProperties.setProperty("kafka.zk", kafkaWithZKComponent.getZookeeperConnect());
-                try {
-                  for(String sourceType: sourceConfigs.keySet()) {
-                    SourceConfigUtils.writeToZookeeper(sourceType, sourceConfigs.get(sourceType).getBytes(), kafkaWithZKComponent.getZookeeperConnect());
-                  }
-                } catch (Exception e) {
-                  e.printStackTrace();
-                }
-                return null;
-              }
-            });
-
-    ElasticSearchComponent esComponent = new ElasticSearchComponent.Builder()
-            .withHttpPort(9211)
-            .withIndexDir(new File(indexDir))
-            .build();
-
-    //create MockHBaseTables
-    final MockHTable trackerTable = (MockHTable)MockHTable.Provider.addToCache(trackerHBaseTable, cf);
-    final MockHTable ipTable = (MockHTable)MockHTable.Provider.addToCache(ipThreatIntelTable, cf);
-    ThreatIntelHelper.INSTANCE.load(ipTable, cf, new ArrayList<LookupKV<ThreatIntelKey, ThreatIntelValue>>(){{
-      add(new LookupKV<>(new ThreatIntelKey("10.0.2.3"), new ThreatIntelValue(new HashMap<String, String>())));
-    }});
-
-    FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
-            .withTopologyLocation(new File(fluxPath))
-            .withTopologyName("test")
-            .withTopologyProperties(topologyProperties)
-            .build();
-
-    UnitTestHelper.verboseLogging();
-    ComponentRunner runner = new ComponentRunner.Builder()
-            .withComponent("kafka", kafkaComponent)
-            .withComponent("elasticsearch", esComponent)
-            .withComponent("storm", fluxComponent)
-            .withMillisecondsBetweenAttempts(10000)
-            .withNumRetries(10)
-            .build();
-    runner.start();
-    try {
-      fluxComponent.submitTopology();
-      kafkaComponent.writeMessages(Constants.ENRICHMENT_TOPIC, inputMessages);
-      List<Map<String, Object>> docs =
-              runner.process(new Processor<List<Map<String, Object>>>() {
-                List<Map<String, Object>> docs = null;
-
-                public ReadinessState process(ComponentRunner runner) {
-                  ElasticSearchComponent elasticSearchComponent = runner.getComponent("elasticsearch", ElasticSearchComponent.class);
-                  if (elasticSearchComponent.hasIndex(index)) {
-                    List<Map<String, Object>> docsFromDisk;
-                    try {
-                      docs = elasticSearchComponent.getAllIndexedDocs(index, "test_doc");
-                      docsFromDisk = readDocsFromDisk(hdfsDir);
-                      System.out.println(docs.size() + " vs " + inputMessages.size() + " vs " + docsFromDisk.size());
-                    } catch (IOException e) {
-                      throw new IllegalStateException("Unable to retrieve indexed documents.", e);
-                    }
-                    if (docs.size() < inputMessages.size() || docs.size() != docsFromDisk.size()) {
-                      return ReadinessState.NOT_READY;
-                    } else {
-                      return ReadinessState.READY;
-                    }
-                  } else {
-                    return ReadinessState.NOT_READY;
-                  }
-                }
-
-                public List<Map<String, Object>> getResult() {
-                  return docs;
-                }
-              });
-
-
-      Assert.assertEquals(inputMessages.size(), docs.size());
-
-      for (Map<String, Object> doc : docs) {
-        baseValidation(doc);
-        hostEnrichmentValidation(doc);
-        geoEnrichmentValidation(doc);
-        threatIntelValidation(doc);
-
-      }
-      List<Map<String, Object>> docsFromDisk = readDocsFromDisk(hdfsDir);
-      Assert.assertEquals(docsFromDisk.size(), docs.size()) ;
-
-      Assert.assertEquals(new File(hdfsDir).list().length, 1);
-      Assert.assertEquals(new File(hdfsDir).list()[0], "test_doc");
-      for (Map<String, Object> doc : docsFromDisk) {
-        baseValidation(doc);
-        hostEnrichmentValidation(doc);
-        geoEnrichmentValidation(doc);
-        threatIntelValidation(doc);
-
-      }
-    }
-    finally {
-      cleanHdfsDir(hdfsDir);
-      runner.stop();
-    }
-  }
-
-  public static void baseValidation(Map<String, Object> jsonDoc) {
-    assertEnrichmentsExists("threatintels.", setOf("ip"), jsonDoc.keySet());
-    assertEnrichmentsExists("enrichments.", setOf("geo", "host"), jsonDoc.keySet());
-    for(Map.Entry<String, Object> kv : jsonDoc.entrySet()) {
-      //ensure no values are empty.
-      Assert.assertTrue(kv.getValue().toString().length() > 0);
-    }
-    //ensure we always have a source ip and destination ip
-    Assert.assertNotNull(jsonDoc.get(SRC_IP));
-    Assert.assertNotNull(jsonDoc.get(DST_IP));
-  }
-
-  private static class EvaluationPayload {
-    Map<String, Object> indexedDoc;
-    String key;
-    public EvaluationPayload(Map<String, Object> indexedDoc, String key) {
-      this.indexedDoc = indexedDoc;
-      this.key = key;
-    }
-  }
-
-  private static enum HostEnrichments implements Predicate<EvaluationPayload>{
-    LOCAL_LOCATION(new Predicate<EvaluationPayload>() {
-
-      @Override
-      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
-        return evaluationPayload.indexedDoc.get("enrichments.host." + evaluationPayload.key + ".known_info.local").equals("YES");
-      }
-    })
-    ,UNKNOWN_LOCATION(new Predicate<EvaluationPayload>() {
-
-      @Override
-      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
-        return evaluationPayload.indexedDoc.get("enrichments.host." + evaluationPayload.key + ".known_info.local").equals("UNKNOWN");
-      }
-    })
-    ,IMPORTANT(new Predicate<EvaluationPayload>() {
-      @Override
-      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
-        return evaluationPayload.indexedDoc.get("enrichments.host." + evaluationPayload.key + ".known_info.asset_value").equals("important");
-      }
-    })
-    ,PRINTER_TYPE(new Predicate<EvaluationPayload>() {
-      @Override
-      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
-        return evaluationPayload.indexedDoc.get("enrichments.host." + evaluationPayload.key + ".known_info.type").equals("printer");
-      }
-    })
-    ,WEBSERVER_TYPE(new Predicate<EvaluationPayload>() {
-      @Override
-      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
-        return evaluationPayload.indexedDoc.get("enrichments.host." + evaluationPayload.key + ".known_info.type").equals("webserver");
-      }
-    })
-    ,UNKNOWN_TYPE(new Predicate<EvaluationPayload>() {
-      @Override
-      public boolean apply(@Nullable EvaluationPayload evaluationPayload) {
-        return evaluationPayload.indexedDoc.get("enrichments.host." + evaluationPayload.key + ".known_info.type").equals("unknown");
-      }
-    })
-    ;
-
-    Predicate<EvaluationPayload> _predicate;
-    HostEnrichments(Predicate<EvaluationPayload> predicate) {
-      this._predicate = predicate;
-    }
-
-    public boolean apply(EvaluationPayload payload) {
-      return _predicate.apply(payload);
-    }
-
-  }
-
-  private static void assertEnrichmentsExists(String topLevel, Set<String> expectedEnrichments, Set<String> keys) {
-    for(String key : keys) {
-      if(key.startsWith(topLevel)) {
-        String secondLevel = Iterables.get(Splitter.on(".").split(key), 1);
-        String message = "Found an enrichment/threat intel (" + secondLevel + ") that I didn't expect (expected enrichments :"
-                       + Joiner.on(",").join(expectedEnrichments) + "), but it was not there.  If you've created a new"
-                       + " enrichment, then please add a validation method to this unit test.  Otherwise, it's a solid error"
-                       + " and should be investigated.";
-        Assert.assertTrue( message, expectedEnrichments.contains(secondLevel));
-      }
-    }
-  }
-  private static void threatIntelValidation(Map<String, Object> indexedDoc) {
-    if(keyPatternExists("threatintels.", indexedDoc)) {
-      //if we have any threat intel messages, we want to tag is_alert to true
-      Assert.assertEquals(indexedDoc.get("is_alert"), "true");
-    }
-    else {
-      //For YAF this is the case, but if we do snort later on, this will be invalid.
-      Assert.assertNull(indexedDoc.get("is_alert"));
-    }
-    //ip threat intels
-    if(keyPatternExists("threatintels.ip.", indexedDoc)) {
-      if(indexedDoc.get(SRC_IP).equals("10.0.2.3")) {
-        Assert.assertEquals(indexedDoc.get("threatintels.ip." + SRC_IP + ".ip_threat_intel"), "alert");
-      }
-      else if(indexedDoc.get(DST_IP).equals("10.0.2.3")) {
-        Assert.assertEquals(indexedDoc.get("threatintels.ip." + DST_IP + ".ip_threat_intel"), "alert");
-      }
-      else {
-        Assert.fail("There was a threat intels that I did not expect.");
-      }
-    }
-
-  }
-
-  private static void geoEnrichmentValidation(Map<String, Object> indexedDoc) {
-    //should have geo enrichment on every message due to mock geo adapter
-    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".location_point"), MockGeoAdapter.DEFAULT_LOCATION_POINT);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP +".location_point"), MockGeoAdapter.DEFAULT_LOCATION_POINT);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".longitude"), MockGeoAdapter.DEFAULT_LONGITUDE);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".longitude"), MockGeoAdapter.DEFAULT_LONGITUDE);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".city"), MockGeoAdapter.DEFAULT_CITY);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".city"), MockGeoAdapter.DEFAULT_CITY);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".latitude"), MockGeoAdapter.DEFAULT_LATITUDE);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".latitude"), MockGeoAdapter.DEFAULT_LATITUDE);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".country"), MockGeoAdapter.DEFAULT_COUNTRY);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".country"), MockGeoAdapter.DEFAULT_COUNTRY);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".dmaCode"), MockGeoAdapter.DEFAULT_DMACODE);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".dmaCode"), MockGeoAdapter.DEFAULT_DMACODE);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".postalCode"), MockGeoAdapter.DEFAULT_POSTAL_CODE);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".postalCode"), MockGeoAdapter.DEFAULT_POSTAL_CODE);
-  }
-
-  private static void hostEnrichmentValidation(Map<String, Object> indexedDoc) {
-    boolean enriched = false;
-    //important local printers
-    {
-      Set<String> ips = setOf("10.0.2.15", "10.60.10.254");
-      if (ips.contains(indexedDoc.get(SRC_IP))) {
-        //this is a local, important, printer
-        Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
-                ,HostEnrichments.IMPORTANT
-                ,HostEnrichments.PRINTER_TYPE
-                ).apply(new EvaluationPayload(indexedDoc, SRC_IP))
-        );
-        enriched = true;
-      }
-      if (ips.contains(indexedDoc.get(DST_IP))) {
-        Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
-                ,HostEnrichments.IMPORTANT
-                ,HostEnrichments.PRINTER_TYPE
-                ).apply(new EvaluationPayload(indexedDoc, DST_IP))
-        );
-        enriched = true;
-      }
-    }
-    //important local webservers
-    {
-      Set<String> ips = setOf("10.1.128.236");
-      if (ips.contains(indexedDoc.get(SRC_IP))) {
-        //this is a local, important, printer
-        Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
-                ,HostEnrichments.IMPORTANT
-                ,HostEnrichments.WEBSERVER_TYPE
-                ).apply(new EvaluationPayload(indexedDoc, SRC_IP))
-        );
-        enriched = true;
-      }
-      if (ips.contains(indexedDoc.get(DST_IP))) {
-        Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
-                ,HostEnrichments.IMPORTANT
-                ,HostEnrichments.WEBSERVER_TYPE
-                ).apply(new EvaluationPayload(indexedDoc, DST_IP))
-        );
-        enriched = true;
-      }
-    }
-    if(!enriched) {
-      Assert.assertFalse(keyPatternExists("enrichments.host", indexedDoc));
-    }
-  }
-
-
-  private static boolean keyPatternExists(String pattern, Map<String, Object> indexedObj) {
-    for(String k : indexedObj.keySet()) {
-      if(k.startsWith(pattern)) {
-        return true;
-      }
-    }
-    return false;
-  }
-  private static Set<String> setOf(String... items) {
-    Set<String> ret = new HashSet<>();
-    for(String item : items) {
-      ret.add(item);
-    }
-    return ret;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/ParserIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/ParserIntegrationTest.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/ParserIntegrationTest.java
index 3c6972a..10e13da 100644
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/ParserIntegrationTest.java
+++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/ParserIntegrationTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.metron.integration;
 
-import com.google.common.base.Function;
 import org.apache.metron.Constants;
 import org.apache.metron.integration.util.TestUtils;
 import org.apache.metron.integration.util.UnitTestHelper;
@@ -26,50 +25,38 @@ import org.apache.metron.integration.util.integration.Processor;
 import org.apache.metron.integration.util.integration.ReadinessState;
 import org.apache.metron.integration.util.integration.components.FluxTopologyComponent;
 import org.apache.metron.integration.util.integration.components.KafkaWithZKComponent;
-import org.apache.metron.utils.SourceConfigUtils;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.Assert;
 import org.junit.Test;
 
-import javax.annotation.Nullable;
-import java.io.*;
-import java.util.*;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 
-public abstract class ParserIntegrationTest {
+public abstract class ParserIntegrationTest extends BaseIntegrationTest {
 
   public abstract String getFluxPath();
   public abstract String getSampleInputPath();
   public abstract String getSampleParsedPath();
-  public abstract String getSourceType();
-  public abstract String getSourceConfig();
+  public abstract String getSensorType();
   public abstract String getFluxTopicProperty();
 
   @Test
   public void test() throws Exception {
 
-    final String kafkaTopic = "test";
+    final String kafkaTopic = getSensorType();
 
     final List<byte[]> inputMessages = TestUtils.readSampleData(getSampleInputPath());
 
     final Properties topologyProperties = new Properties() {{
       setProperty(getFluxTopicProperty(), kafkaTopic);
     }};
-    final KafkaWithZKComponent kafkaComponent = new KafkaWithZKComponent().withTopics(new ArrayList<KafkaWithZKComponent.Topic>() {{
+    final KafkaWithZKComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaWithZKComponent.Topic>() {{
       add(new KafkaWithZKComponent.Topic(kafkaTopic, 1));
-    }})
-            .withPostStartCallback(new Function<KafkaWithZKComponent, Void>() {
-              @Nullable
-              @Override
-              public Void apply(@Nullable KafkaWithZKComponent kafkaWithZKComponent) {
-                topologyProperties.setProperty("kafka.zk", kafkaWithZKComponent.getZookeeperConnect());
-                try {
-                  SourceConfigUtils.writeToZookeeper(getSourceType(), getSourceConfig().getBytes(), kafkaWithZKComponent.getZookeeperConnect());
-                } catch (Exception e) {
-                  e.printStackTrace();
-                }
-                return null;
-              }
-            });
+    }});
 
     topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList());
     FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/PcapParserIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/PcapParserIntegrationTest.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/PcapParserIntegrationTest.java
index 284e3c0..6a220a4 100644
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/PcapParserIntegrationTest.java
+++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/PcapParserIntegrationTest.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,7 +17,6 @@
  */
 package org.apache.metron.integration;
 
-import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -28,6 +27,7 @@ import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.metron.Constants;
 import org.apache.metron.hbase.TableProvider;
 import org.apache.metron.integration.util.UnitTestHelper;
 import org.apache.metron.integration.util.integration.ComponentRunner;
@@ -42,13 +42,16 @@ import org.json.simple.JSONObject;
 import org.junit.Assert;
 import org.junit.Test;
 
-import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 
-public class PcapParserIntegrationTest {
+public class PcapParserIntegrationTest extends BaseIntegrationTest {
 
   private static String BASE_DIR = "pcap";
   private static String DATA_DIR = BASE_DIR + "/data_dir";
@@ -57,7 +60,8 @@ public class PcapParserIntegrationTest {
   private String targetDir = "target";
 
   public static class Provider implements TableProvider, Serializable {
-    MockHTable.Provider  provider = new MockHTable.Provider();
+    MockHTable.Provider provider = new MockHTable.Provider();
+
     @Override
     public HTableInterface getTable(Configuration config, String tableName) throws IOException {
       return provider.getTable(config, tableName);
@@ -80,8 +84,9 @@ public class PcapParserIntegrationTest {
     }
     return outDir;
   }
+
   private static void clearOutDir(File outDir) {
-    for(File f : outDir.listFiles()) {
+    for (File f : outDir.listFiles()) {
       f.delete();
     }
   }
@@ -89,13 +94,13 @@ public class PcapParserIntegrationTest {
   private static Map<String, byte[]> readPcaps(Path pcapFile) throws IOException {
     SequenceFile.Reader reader = new SequenceFile.Reader(new Configuration(),
             Reader.file(pcapFile)
-            );
+    );
     Map<String, byte[]> ret = new HashMap<>();
     IntWritable key = new IntWritable();
     BytesWritable value = new BytesWritable();
     PcapParser parser = new PcapParser();
     parser.init();
-    while(reader.next(key, value)) {
+    while (reader.next(key, value)) {
       int keyInt = key.get();
       byte[] valueBytes = value.copyBytes();
       JSONObject message = parser.parse(valueBytes).get(0);
@@ -124,29 +129,19 @@ public class PcapParserIntegrationTest {
     File baseDir = new File(new File(targetDir), BASE_DIR);
     Assert.assertNotNull(topologiesDir);
     Assert.assertNotNull(targetDir);
-    Path pcapFile = new Path(topologiesDir + "/../../SampleInput/PCAPExampleOutput");
+    Path pcapFile = new Path("../Metron-Testing/src/main/resources/sample/data/SampleInput/PCAPExampleOutput");
     final Map<String, byte[]> pcapEntries = readPcaps(pcapFile);
     Assert.assertTrue(Iterables.size(pcapEntries.keySet()) > 0);
     final Properties topologyProperties = new Properties() {{
-      setProperty("hbase.provider.impl","" + Provider.class.getName());
+      setProperty("hbase.provider.impl", "" + Provider.class.getName());
       setProperty("spout.kafka.topic.pcap", kafkaTopic);
-      setProperty("bolt.hbase.table.name",tableName);
+      setProperty("bolt.hbase.table.name", tableName);
       setProperty("bolt.hbase.table.fields", columnFamily + ":" + columnIdentifier);
     }};
-    final KafkaWithZKComponent kafkaComponent = new KafkaWithZKComponent().withTopics(new ArrayList<KafkaWithZKComponent.Topic>() {{
+    final KafkaWithZKComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaWithZKComponent.Topic>() {{
       add(new KafkaWithZKComponent.Topic(kafkaTopic, 1));
-    }})
-            .withPostStartCallback(new Function<KafkaWithZKComponent, Void>() {
-                                     @Nullable
-                                     @Override
-                                     public Void apply(@Nullable KafkaWithZKComponent kafkaWithZKComponent) {
-
-                                       topologyProperties.setProperty("kafka.zk", kafkaWithZKComponent.getZookeeperConnect());
-                                       return null;
-                                     }
-                                   }
-            );
-    //.withExistingZookeeper("localhost:2000");
+      add(new KafkaWithZKComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
+    }});
 
     FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
             .withTopologyLocation(new File(topologiesDir + "/pcap/test.yaml"))
@@ -154,7 +149,7 @@ public class PcapParserIntegrationTest {
             .withTopologyProperties(topologyProperties)
             .build();
 
-    final MockHTable pcapTable = (MockHTable)MockHTable.Provider.addToCache(tableName, columnFamily);
+    final MockHTable pcapTable = (MockHTable) MockHTable.Provider.addToCache(tableName, columnFamily);
 
     UnitTestHelper.verboseLogging();
     ComponentRunner runner = new ComponentRunner.Builder()
@@ -166,7 +161,6 @@ public class PcapParserIntegrationTest {
             .build();
     try {
       runner.start();
-      System.out.println("Components started...");
       fluxComponent.submitTopology();
       kafkaComponent.writeMessages(kafkaTopic, pcapEntries.values());
       System.out.println("Sent pcap data: " + pcapEntries.size());
@@ -178,8 +172,9 @@ public class PcapParserIntegrationTest {
         public ReadinessState process(ComponentRunner runner) {
           int hbaseCount = 0;
           try {
+            System.out.println("Waiting...");
             ResultScanner resultScanner = pcapTable.getScanner(columnFamily.getBytes(), columnIdentifier.getBytes());
-            while(resultScanner.next() != null) hbaseCount++;
+            while (resultScanner.next() != null) hbaseCount++;
           } catch (IOException e) {
             e.printStackTrace();
           }
@@ -198,7 +193,7 @@ public class PcapParserIntegrationTest {
       ResultScanner resultScanner = pcapTable.getScanner(columnFamily.getBytes(), columnIdentifier.getBytes());
       Result result;
       int rowCount = 0;
-      while((result = resultScanner.next()) != null) {
+      while ((result = resultScanner.next()) != null) {
         String rowKey = new String(result.getRow());
         byte[] hbaseValue = result.getValue(columnFamily.getBytes(), columnIdentifier.getBytes());
         byte[] originalValue = pcapEntries.get(rowKey);
@@ -208,8 +203,7 @@ public class PcapParserIntegrationTest {
       }
       Assert.assertEquals(pcapEntries.size(), rowCount);
       System.out.println("Ended");
-    }
-    finally {
+    } finally {
       runner.stop();
       clearOutDir(outDir);
       clearOutDir(queryDir);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/SnortIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/SnortIntegrationTest.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/SnortIntegrationTest.java
index 7508ad7..752c622 100644
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/SnortIntegrationTest.java
+++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/SnortIntegrationTest.java
@@ -21,41 +21,25 @@ public class SnortIntegrationTest extends ParserIntegrationTest {
 
   @Override
   public String getFluxPath() {
-    return "src/main/resources/Metron_Configs/topologies/snort/test.yaml";
+    return "../Metron-Topologies/src/main/resources/Metron_Configs/topologies/snort/test.yaml";
   }
 
   @Override
   public String getSampleInputPath() {
-    return "src/main/resources/SampleInput/SnortOutput";
+    return "../Metron-Testing/src/main/resources/sample/data/SampleInput/SnortOutput";
   }
 
   @Override
   public String getSampleParsedPath() {
-    return "src/main/resources/SampleParsed/SnortParsed";
+    return "../Metron-Testing/src/main/resources/sample/data/SampleParsed/SnortParsed";
   }
 
   @Override
-  public String getSourceType() {
+  public String getSensorType() {
     return "snort";
   }
 
   @Override
-  public String getSourceConfig() {
-    return "{\"index\": \"snort\"," +
-            " \"batchSize\": 1," +
-            " \"enrichmentFieldMap\":" +
-            "  {" +
-            "    \"geo\": [\"src\", \"dst\"]," +
-            "    \"host\": [\"src\", \"dst\"]" +
-            "  }," +
-            "  \"threatIntelFieldMap\":" +
-            "  {" +
-            "    \"ip\": [\"src\", \"dst\"]" +
-            "  }" +
-            "}";
-  }
-
-  @Override
   public String getFluxTopicProperty() {
     return "spout.kafka.topic.snort";
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/YafIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/YafIntegrationTest.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/YafIntegrationTest.java
index f255a0a..114fa98 100644
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/YafIntegrationTest.java
+++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/YafIntegrationTest.java
@@ -21,41 +21,25 @@ public class YafIntegrationTest extends ParserIntegrationTest {
 
   @Override
   public String getFluxPath() {
-    return "src/main/resources/Metron_Configs/topologies/yaf/test.yaml";
+    return "../Metron-Topologies/src/main/resources/Metron_Configs/topologies/yaf/test.yaml";
   }
 
   @Override
   public String getSampleInputPath() {
-    return "src/main/resources/SampleInput/YafExampleOutput";
+    return "../Metron-Testing/src/main/resources/sample/data/SampleInput/YafExampleOutput";
   }
 
   @Override
   public String getSampleParsedPath() {
-    return "src/main/resources/SampleParsed/YafExampleParsed";
+    return "../Metron-Testing/src/main/resources/sample/data/SampleParsed/YafExampleParsed";
   }
 
   @Override
-  public String getSourceType() {
+  public String getSensorType() {
     return "yaf";
   }
 
   @Override
-  public String getSourceConfig() {
-    return "{\"index\": \"yaf\"," +
-            " \"batchSize\": 5," +
-            " \"enrichmentFieldMap\":" +
-            "  {" +
-            "    \"geo\": [\"ip_src_addr\", \"ip_dst_addr\"]," +
-            "    \"host\": [\"ip_src_addr\", \"ip_dst_addr\"]" +
-            "  }," +
-            "  \"threatIntelFieldMap\":" +
-            "  {" +
-            "    \"ip\": [\"ip_src_addr\", \"ip_dst_addr\"]" +
-            "  }" +
-            "}";
-  }
-
-  @Override
   public String getFluxTopicProperty() {
     return "spout.kafka.topic.yaf";
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/TestUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/TestUtils.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/TestUtils.java
deleted file mode 100644
index a3db041..0000000
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/TestUtils.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.integration.util;
-
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-public class TestUtils {
-
-  public static List<byte[]> readSampleData(String samplePath) throws IOException {
-    BufferedReader br = new BufferedReader(new FileReader(samplePath));
-    List<byte[]> ret = new ArrayList<>();
-    for (String line = null; (line = br.readLine()) != null; ) {
-      ret.add(line.getBytes());
-    }
-    br.close();
-    return ret;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/mock/MockGeoAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/mock/MockGeoAdapter.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/mock/MockGeoAdapter.java
deleted file mode 100644
index ee71cda..0000000
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/mock/MockGeoAdapter.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.integration.util.mock;
-
-import com.google.common.base.Joiner;
-import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
-import org.json.simple.JSONObject;
-
-import java.io.Serializable;
-
-public class MockGeoAdapter implements EnrichmentAdapter<String>,
-        Serializable {
-
-  public static final String DEFAULT_LOC_ID = "1";
-  public static final String DEFAULT_COUNTRY = "test country";
-  public static final String DEFAULT_CITY = "test city";
-  public static final String DEFAULT_POSTAL_CODE = "test postalCode";
-  public static final String DEFAULT_LATITUDE = "test latitude";
-  public static final String DEFAULT_LONGITUDE = "test longitude";
-  public static final String DEFAULT_DMACODE= "test dmaCode";
-  public static final String DEFAULT_LOCATION_POINT= Joiner.on(',').join(DEFAULT_LONGITUDE, DEFAULT_LATITUDE);
-
-  @Override
-  public void logAccess(String value) {
-
-  }
-
-  public JSONObject enrich(String metadata) {
-    JSONObject enriched = new JSONObject();
-    enriched.put("locID", DEFAULT_LOC_ID);
-    enriched.put("country", DEFAULT_COUNTRY);
-    enriched.put("city", DEFAULT_CITY);
-    enriched.put("postalCode", DEFAULT_POSTAL_CODE);
-    enriched.put("latitude", DEFAULT_LATITUDE);
-    enriched.put("longitude", DEFAULT_LONGITUDE);
-    enriched.put("dmaCode", DEFAULT_DMACODE);
-    enriched.put("location_point", DEFAULT_LOCATION_POINT);
-    return enriched;
-  }
-
-  public boolean initializeAdapter() {
-    return true;
-  }
-
-  public void cleanup() {
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/mock/MockHBaseConnector.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/mock/MockHBaseConnector.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/mock/MockHBaseConnector.java
deleted file mode 100644
index b6b4a9d..0000000
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/mock/MockHBaseConnector.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.integration.util.mock;
-
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
-import org.apache.metron.hbase.Connector;
-import org.apache.metron.hbase.TupleTableConfig;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class MockHBaseConnector extends Connector {
-    static List<Put> puts = Collections.synchronizedList(new ArrayList<Put>());
-    public MockHBaseConnector(TupleTableConfig conf, String _quorum, String _port) throws IOException {
-        super(conf, _quorum, _port);
-    }
-
-    @Override
-    public void put(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
-        puts.add(put);
-    }
-
-    @Override
-    public void close() {
-
-    }
-    public static void clear() {
-        puts.clear();
-    }
-    public static List<Put> getPuts() {
-        return puts;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/threatintel/ThreatIntelHelper.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/threatintel/ThreatIntelHelper.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/threatintel/ThreatIntelHelper.java
deleted file mode 100644
index 1e64362..0000000
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/threatintel/ThreatIntelHelper.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.integration.util.threatintel;
-
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
-import org.apache.metron.hbase.converters.threatintel.ThreatIntelValue;
-import org.apache.metron.reference.lookup.LookupKV;
-import org.apache.metron.threatintel.ThreatIntelResults;
-import org.apache.metron.hbase.converters.threatintel.ThreatIntelConverter;
-
-import java.io.IOException;
-
-public enum ThreatIntelHelper {
-    INSTANCE;
-    ThreatIntelConverter converter = new ThreatIntelConverter();
-
-    public void load(HTableInterface table, String cf, Iterable<LookupKV<ThreatIntelKey, ThreatIntelValue>> results) throws IOException {
-        for(LookupKV<ThreatIntelKey, ThreatIntelValue> result : results) {
-            Put put = converter.toPut(cf, result.getKey(), result.getValue());
-            table.put(put);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/pom.xml b/metron-streaming/pom.xml
index 86486ab..c4222e8 100644
--- a/metron-streaming/pom.xml
+++ b/metron-streaming/pom.xml
@@ -47,6 +47,8 @@
 		<global_json_schema_validator_version>2.2.5</global_json_schema_validator_version>
 		<global_slf4j_version>1.7.7</global_slf4j_version>
 		<global_opencsv_version>3.7</global_opencsv_version>
+		<global_solr_version>5.2.1</global_solr_version>
+		<global_mockito_version>1.9.5</global_mockito_version>
 	</properties>
 	<licenses>
 		<license>
@@ -75,6 +77,8 @@
 		<module>Metron-DataLoads</module>
 		<module>Metron-Topologies</module>
 		<module>Metron-Pcap_Service</module>
+		<module>Metron-Elasticsearch</module>
+		<module>Metron-Solr</module>
 		<module>Metron-Testing</module>
 	</modules>
 	<dependencies>
@@ -137,7 +141,9 @@
 						<exclude>**/*.json</exclude>
 						<exclude>**/*.log</exclude>
 						<exclude>**/src/main/resources/patterns/**</exclude>
-						<exclude>**/src/main/resources/SampleInput/**</exclude>
+						<exclude>**/src/main/resources/sample/data/SampleIndexed/**</exclude>
+						<exclude>**/src/main/resources/sample/data/SampleInput/**</exclude>
+						<exclude>**/src/main/resources/sample/data/SampleParsed/**</exclude>
 						<exclude>**/dependency-reduced-pom.xml</exclude>
 					</excludes>
 				</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9474cf0..8554419 100644
--- a/pom.xml
+++ b/pom.xml
@@ -19,70 +19,72 @@
 	code base of Metron.
   -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-	<groupId>org.apache.metron</groupId>
-	<artifactId>Metron</artifactId>
-	<version>0.1BETA</version>
-	<packaging>pom</packaging>
-	<name>Metron</name>
-	<description>Performs release auditing for Metron.</description>
-	<url>https://metron.incubator.apache.org/</url>
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.rat</groupId>
-				<artifactId>apache-rat-plugin</artifactId>
-				<version>0.11</version>
-				<configuration>
-				<excludes>
-					<exclude>**/README.md</exclude>
-					<exclude>**/VERSION</exclude>
-					<exclude>**/*.json</exclude>
-					<exclude>**/*.log</exclude>
-					<exclude>**/*.template</exclude>
-					<exclude>**/.*</exclude>
-					<exclude>**/.*/**</exclude>
-					<exclude>**/*.seed</exclude>
-					<exclude>**/*.iml</exclude>
-					<exclude>**/ansible.cfg</exclude>
-					<exclude>site/**</exclude>
-          <exclude>metron-ui/lib/public/css/normalize.min.css</exclude>
-          <exclude>metron-ui/lib/public/app/panels/pcap/lib/showdown.js</exclude>
-          <!-- 3rd party bundled javascript dependencies -->
-          <exclude>metron-ui/lib/public/vendor/**</exclude>
-          <!-- Kibana panels copied from kibana and bundled -->
-          <exclude>metron-ui/lib/public/app/panels/dashcontrol/**</exclude>
-          <exclude>metron-ui/lib/public/app/panels/filtering/**</exclude>
-          <exclude>metron-ui/lib/public/app/panels/histogram/**</exclude>
-          <exclude>metron-ui/lib/public/app/panels/hits/**</exclude>
-          <exclude>metron-ui/lib/public/app/panels/map/**</exclude>
-          <exclude>metron-ui/lib/public/app/panels/query/**</exclude>
-          <exclude>metron-ui/lib/public/app/panels/sparklines/**</exclude>
-          <exclude>metron-ui/lib/public/app/panels/table/**</exclude>
-          <exclude>metron-ui/lib/public/app/panels/terms/**</exclude>
-          <exclude>metron-ui/lib/public/app/panels/text/**</exclude>
-          <exclude>metron-ui/lib/public/app/panels/timepicker/**</exclude>
-          <exclude>metron-ui/lib/public/app/panels/trends/**</exclude>
-          <exclude>metron-ui/lib/public/app/panels/bettermap/**</exclude>
-          <exclude>metron-ui/lib/public/app/panels/column/**</exclude>
-          <exclude>metron-ui/lib/public/app/panels/derivequeries/**</exclude>
-          <exclude>metron-ui/lib/public/app/panels/stats/**</exclude>
-          <exclude>metron-ui/lib/public/app/partials/**</exclude>
-          <exclude>metron-ui/lib/public/app/services/**</exclude>
-          <!-- fontawesome fonts are declared in the license, so we can exclude here -->
-          <exclude>metron-ui/lib/public/css/font-awesome.min.css</exclude>
-          <exclude>metron-ui/lib/public/font/**</exclude>
-					<exclude>**/src/main/resources/patterns/**</exclude>
-					<exclude>**/src/test/resources/**</exclude>
-					<exclude>**/src/main/resources/Sample*/**</exclude>
-					<exclude>**/dependency-reduced-pom.xml</exclude>
-				        <exclude>**/files/opensoc-ui</exclude>
-					<exclude>**/target/**</exclude>
-					<exclude>**/bro-plugin-kafka/build/**</exclude>
-				</excludes>
-				</configuration>
-			</plugin>
-		</plugins>
-	</build>
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.metron</groupId>
+    <artifactId>Metron</artifactId>
+    <version>0.1BETA</version>
+    <packaging>pom</packaging>
+    <name>Metron</name>
+    <description>Performs release auditing for Metron.</description>
+    <url>https://metron.incubator.apache.org/</url>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <version>0.11</version>
+                <configuration>
+                    <excludes>
+                        <exclude>**/README.md</exclude>
+                        <exclude>**/VERSION</exclude>
+                        <exclude>**/*.json</exclude>
+                        <exclude>**/*.log</exclude>
+                        <exclude>**/*.template</exclude>
+                        <exclude>**/.*</exclude>
+                        <exclude>**/.*/**</exclude>
+                        <exclude>**/*.seed</exclude>
+                        <exclude>**/*.iml</exclude>
+                        <exclude>**/ansible.cfg</exclude>
+                        <exclude>site/**</exclude>
+                        <exclude>metron-ui/lib/public/css/normalize.min.css</exclude>
+                        <exclude>metron-ui/lib/public/app/panels/pcap/lib/showdown.js</exclude>
+                        <!-- 3rd party bundled javascript dependencies -->
+                        <exclude>metron-ui/lib/public/vendor/**</exclude>
+                        <!-- Kibana panels copied from kibana and bundled -->
+                        <exclude>metron-ui/lib/public/app/panels/dashcontrol/**</exclude>
+                        <exclude>metron-ui/lib/public/app/panels/filtering/**</exclude>
+                        <exclude>metron-ui/lib/public/app/panels/histogram/**</exclude>
+                        <exclude>metron-ui/lib/public/app/panels/hits/**</exclude>
+                        <exclude>metron-ui/lib/public/app/panels/map/**</exclude>
+                        <exclude>metron-ui/lib/public/app/panels/query/**</exclude>
+                        <exclude>metron-ui/lib/public/app/panels/sparklines/**</exclude>
+                        <exclude>metron-ui/lib/public/app/panels/table/**</exclude>
+                        <exclude>metron-ui/lib/public/app/panels/terms/**</exclude>
+                        <exclude>metron-ui/lib/public/app/panels/text/**</exclude>
+                        <exclude>metron-ui/lib/public/app/panels/timepicker/**</exclude>
+                        <exclude>metron-ui/lib/public/app/panels/trends/**</exclude>
+                        <exclude>metron-ui/lib/public/app/panels/bettermap/**</exclude>
+                        <exclude>metron-ui/lib/public/app/panels/column/**</exclude>
+                        <exclude>metron-ui/lib/public/app/panels/derivequeries/**</exclude>
+                        <exclude>metron-ui/lib/public/app/panels/stats/**</exclude>
+                        <exclude>metron-ui/lib/public/app/partials/**</exclude>
+                        <exclude>metron-ui/lib/public/app/services/**</exclude>
+                        <exclude>metron-ui/lib/public/app/services/**</exclude>
+                        <!-- fontawesome fonts are declared in the license, so we can exclude here -->
+                        <exclude>metron-ui/lib/public/css/font-awesome.min.css</exclude>
+                        <exclude>metron-ui/lib/public/font/**</exclude>
+                        <exclude>metron-ui/node_modules/**</exclude>
+                        <exclude>**/src/main/resources/patterns/**</exclude>
+                        <exclude>**/src/test/resources/**</exclude>
+                        <exclude>**/src/main/resources/sample/data/Sample*/**</exclude>
+                        <exclude>**/dependency-reduced-pom.xml</exclude>
+                        <exclude>**/files/opensoc-ui</exclude>
+                        <exclude>**/target/**</exclude>
+                        <exclude>**/bro-plugin-kafka/build/**</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
 </project>


Mime
View raw message