metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject [2/6] incubator-metron git commit: METRON-93: Generalize the HBase threat intel infrastructure to support enrichments closes apache/incubator-metron#64
Date Thu, 07 Apr 2016 12:56:35 GMT
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/taxii/TaxiiIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/taxii/TaxiiIntegrationTest.java b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/taxii/TaxiiIntegrationTest.java
deleted file mode 100644
index f61d729..0000000
--- a/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/taxii/TaxiiIntegrationTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.dataloads.taxii;
-
-import com.google.common.base.Splitter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.metron.dataloads.extractor.stix.StixExtractor;
-import org.apache.metron.hbase.converters.threatintel.ThreatIntelConverter;
-import org.apache.metron.integration.util.UnitTestHelper;
-import org.apache.metron.integration.util.mock.MockHTable;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-public class TaxiiIntegrationTest {
-
-    @Before
-    public void setup() throws IOException {
-        MockTaxiiService.start(8282);
-    }
-
-    @After
-    public void teardown() {
-        MockTaxiiService.shutdown();
-        MockHTable.Provider.clear();
-    }
-
-
-
-    @Test
-    public void testTaxii() throws Exception {
-        /**
-         {
-            "endpoint" : "http://localhost:8282/taxii-discovery-service"
-           ,"type" : "DISCOVER"
-           ,"collection" : "guest.Abuse_ch"
-           ,"tableMap" : {
-                    "DomainName:FQDN" : "malicious_domain:cf"
-                   ,"Address:IPV_4_ADDR" : "malicious_address:cf"
-                         }
-         }
-         */
-        String taxiiConnectionConfig = "{\n" +
-                "            \"endpoint\" : \"http://localhost:8282/taxii-discovery-service\"\n" +
-                "           ,\"type\" : \"DISCOVER\"\n" +
-                "           ,\"collection\" : \"guest.Abuse_ch\"\n" +
-                "           ,\"tableMap\" : {\n" +
-                "                    \"DomainName:FQDN\" : \"malicious_domain:cf\"\n" +
-                "                   ,\"Address:IPV_4_ADDR\" : \"malicious_address:cf\"\n" +
-                "                         }\n" +
-                "         }";
-        final MockHTable.Provider provider = new MockHTable.Provider();
-        final Configuration config = HBaseConfiguration.create();
-        TaxiiHandler handler = new TaxiiHandler(TaxiiConnectionConfig.load(taxiiConnectionConfig), new StixExtractor(), config ) {
-            @Override
-            protected synchronized HTableInterface createHTable(TableInfo tableInfo) throws IOException {
-                return provider.addToCache(tableInfo.getTableName(), tableInfo.getColumnFamily());
-            }
-        };
-        //UnitTestHelper.verboseLogging();
-        handler.run();
-        Set<String> maliciousDomains;
-        {
-            MockHTable table = (MockHTable) provider.getTable(config, "malicious_domain");
-            maliciousDomains = getIndicators(table.getPutLog(), "cf");
-        }
-        Assert.assertTrue(maliciousDomains.contains("www.office-112.com"));
-        Assert.assertEquals(numStringsMatch(MockTaxiiService.pollMsg, "DomainNameObj:Value condition=\"Equals\""), maliciousDomains.size());
-        Set<String> maliciousAddresses;
-        {
-            MockHTable table = (MockHTable) provider.getTable(config, "malicious_address");
-            maliciousAddresses= getIndicators(table.getPutLog(), "cf");
-        }
-        Assert.assertTrue(maliciousAddresses.contains("94.102.53.142"));
-        Assert.assertEquals(numStringsMatch(MockTaxiiService.pollMsg, "AddressObj:Address_Value condition=\"Equal\""), maliciousAddresses.size());
-        MockHTable.Provider.clear();
-    }
-
-    private static int numStringsMatch(String xmlBundle, String text) {
-        int cnt = 0;
-        for(String line : Splitter.on("\n").split(xmlBundle)) {
-            if(line.contains(text)) {
-                cnt++;
-            }
-        }
-        return cnt;
-    }
-
-    private static Set<String> getIndicators(Iterable<Put> puts, String cf) throws IOException {
-        ThreatIntelConverter converter = new ThreatIntelConverter();
-        Set<String> ret = new HashSet<>();
-        for(Put p : puts) {
-            ret.add(converter.fromPut(p, cf).getKey().indicator);
-        }
-        return ret;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-Elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Elasticsearch/pom.xml b/metron-streaming/Metron-Elasticsearch/pom.xml
index ab9242a..d2eade3 100644
--- a/metron-streaming/Metron-Elasticsearch/pom.xml
+++ b/metron-streaming/Metron-Elasticsearch/pom.xml
@@ -27,6 +27,11 @@
     </properties>
     <dependencies>
         <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${global_hbase_guava_version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.metron</groupId>
             <artifactId>Metron-Common</artifactId>
             <version>${project.parent.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-Elasticsearch/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Elasticsearch/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java b/metron-streaming/Metron-Elasticsearch/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
index 45631f2..81aeec2 100644
--- a/metron-streaming/Metron-Elasticsearch/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
+++ b/metron-streaming/Metron-Elasticsearch/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
@@ -76,7 +76,7 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
         indexName = sensorEnrichmentConfig.getIndex();
       }
       IndexRequestBuilder indexRequestBuilder = client.prepareIndex(indexName + "_index_" + indexPostfix,
-              sensorType);
+              sensorType + "_doc");
 
       indexRequestBuilder.setSource(message.toJSONString());
       bulkRequest.add(indexRequestBuilder);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/ElasticsearchEnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/ElasticsearchEnrichmentIntegrationTest.java b/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/ElasticsearchEnrichmentIntegrationTest.java
index 2765c25..4f26365 100644
--- a/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/ElasticsearchEnrichmentIntegrationTest.java
+++ b/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/ElasticsearchEnrichmentIntegrationTest.java
@@ -54,7 +54,7 @@ public class ElasticsearchEnrichmentIntegrationTest extends EnrichmentIntegratio
         if (elasticSearchComponent.hasIndex(index)) {
           List<Map<String, Object>> docsFromDisk;
           try {
-            docs = elasticSearchComponent.getAllIndexedDocs(index, "yaf");
+            docs = elasticSearchComponent.getAllIndexedDocs(index, "yaf_doc");
             docsFromDisk = readDocsFromDisk(hdfsDir);
             System.out.println(docs.size() + " vs " + inputMessages.size() + " vs " + docsFromDisk.size());
           } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/pom.xml b/metron-streaming/Metron-EnrichmentAdapters/pom.xml
index 83e1aed..7399ade 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/pom.xml
+++ b/metron-streaming/Metron-EnrichmentAdapters/pom.xml
@@ -28,7 +28,7 @@
         <mysql.version>5.1.31</mysql.version>
         <slf4j.version>1.7.7</slf4j.version>
         <storm.hdfs.version>0.1.2</storm.hdfs.version>
-        <guava.version>${global_guava_version}</guava.version>
+        <guava.version>${global_hbase_guava_version}</guava.version>
     </properties>
     <dependencies>
         <dependency>
@@ -99,6 +99,7 @@
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
             <version>${global_hadoop_version}</version>
+            <scope>provided</scope>
             <exclusions>
                 <exclusion>
                     <artifactId>servlet-api</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/AbstractCIFAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/AbstractCIFAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/AbstractCIFAdapter.java
index 1d0b5c1..73a7ad5 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/AbstractCIFAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/AbstractCIFAdapter.java
@@ -20,12 +20,13 @@ package org.apache.metron.enrichment.adapters.cif;
 
 import java.io.Serializable;
 
+import org.apache.metron.enrichment.bolt.CacheKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
 
-public abstract class AbstractCIFAdapter implements EnrichmentAdapter,Serializable{
+public abstract class AbstractCIFAdapter implements EnrichmentAdapter<CacheKey>,Serializable{
 
 	/**
 	 * 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java
index 1ab3b83..63d6c0b 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java
@@ -23,6 +23,7 @@ import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.metron.enrichment.bolt.CacheKey;
 import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
 import org.json.simple.JSONObject;
 import org.apache.hadoop.conf.Configuration;
@@ -36,7 +37,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.log4j.Logger;
 
 @SuppressWarnings("unchecked")
-public class CIFHbaseAdapter implements EnrichmentAdapter<String>,Serializable {
+public class CIFHbaseAdapter implements EnrichmentAdapter<CacheKey>,Serializable {
 
 	private static final long serialVersionUID = 1L;
 	private String _tableName;
@@ -55,12 +56,12 @@ public class CIFHbaseAdapter implements EnrichmentAdapter<String>,Serializable {
 			.getLogger(CIFHbaseAdapter.class);
 
 	@Override
-	public void logAccess(String value) {
+	public void logAccess(CacheKey value) {
 
 	}
 
-	public JSONObject enrich(String metadata) {
-
+	public JSONObject enrich(CacheKey k) {
+		String metadata = k.getValue();
 		JSONObject output = new JSONObject();
 		LOGGER.debug("=======Looking Up For:" + metadata);
 		output.putAll(getCIFObject(metadata));

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java
index 4e48756..5d12a29 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java
@@ -19,6 +19,7 @@ package org.apache.metron.enrichment.adapters.geo;
 
 import org.apache.commons.validator.routines.InetAddressValidator;
 import org.apache.metron.enrichment.adapters.jdbc.JdbcAdapter;
+import org.apache.metron.enrichment.bolt.CacheKey;
 import org.json.simple.JSONObject;
 
 import java.net.InetAddress;
@@ -29,19 +30,19 @@ public class GeoAdapter extends JdbcAdapter {
   private InetAddressValidator ipvalidator = new InetAddressValidator();
 
   @Override
-  public void logAccess(String value) {
+  public void logAccess(CacheKey value) {
 
   }
 
   @SuppressWarnings("unchecked")
   @Override
-  public JSONObject enrich(String value) {
+  public JSONObject enrich(CacheKey value) {
     JSONObject enriched = new JSONObject();
     try {
-      InetAddress addr = InetAddress.getByName(value);
+      InetAddress addr = InetAddress.getByName(value.getValue());
       if (addr.isAnyLocalAddress() || addr.isLoopbackAddress()
               || addr.isSiteLocalAddress() || addr.isMulticastAddress()
-              || !ipvalidator.isValidInet4Address(value)) {
+              || !ipvalidator.isValidInet4Address(value.getValue())) {
         return new JSONObject();
       }
       String locidQuery = "select IPTOLOCID(\"" + value
@@ -67,8 +68,7 @@ public class GeoAdapter extends JdbcAdapter {
       }
       resultSet.close();
     } catch (Exception e) {
-      e.printStackTrace();
-      _LOG.error("Enrichment failure: " + e);
+      _LOG.error("Enrichment failure: " + e.getMessage(), e);
       return new JSONObject();
     }
     return enriched;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/AbstractHostAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/AbstractHostAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/AbstractHostAdapter.java
index 5118e5f..329456f 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/AbstractHostAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/AbstractHostAdapter.java
@@ -20,13 +20,14 @@ package org.apache.metron.enrichment.adapters.host;
 
 import java.io.Serializable;
 
+import org.apache.metron.enrichment.bolt.CacheKey;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
 
-public abstract class AbstractHostAdapter implements EnrichmentAdapter<String>,
+public abstract class AbstractHostAdapter implements EnrichmentAdapter<CacheKey>,
 				Serializable{
 
 	/**
@@ -37,7 +38,7 @@ public abstract class AbstractHostAdapter implements EnrichmentAdapter<String>,
 			.getLogger(AbstractHostAdapter.class);
 	
 	abstract public boolean initializeAdapter();
-	abstract public JSONObject enrich(String metadata);
+	abstract public JSONObject enrich(CacheKey metadata);
 
 	@Override
 	public void cleanup() {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapter.java
index c55b918..640e548 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.enrichment.adapters.host;
 
+import org.apache.metron.enrichment.bolt.CacheKey;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
@@ -53,14 +54,14 @@ public class HostFromJSONListAdapter extends AbstractHostAdapter {
   }
 
   @Override
-  public void logAccess(String value) {
+  public void logAccess(CacheKey value) {
 
   }
 
   @SuppressWarnings("unchecked")
   @Override
-  public JSONObject enrich(String metadata) {
-
+  public JSONObject enrich(CacheKey k) {
+    String metadata = k.getValue();
 
     if(!_known_hosts.containsKey(metadata))
       return new JSONObject();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromPropertiesFileAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromPropertiesFileAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromPropertiesFileAdapter.java
index d7fcfbd..f92bd3f 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromPropertiesFileAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromPropertiesFileAdapter.java
@@ -20,6 +20,7 @@ package org.apache.metron.enrichment.adapters.host;
 
 import java.util.Map;
 
+import org.apache.metron.enrichment.bolt.CacheKey;
 import org.json.simple.JSONObject;
 
 @SuppressWarnings("serial")
@@ -43,20 +44,20 @@ public class HostFromPropertiesFileAdapter extends AbstractHostAdapter {
 	}
 
 	@Override
-	public void logAccess(String value) {
+	public void logAccess(CacheKey value) {
 
 	}
 
 	@SuppressWarnings("unchecked")
     @Override
-	public JSONObject enrich(String metadata) {
+	public JSONObject enrich(CacheKey metadata) {
 		
 		
-		if(!_known_hosts.containsKey(metadata))
+		if(!_known_hosts.containsKey(metadata.getValue()))
 			return new JSONObject();
 		
 		JSONObject enrichment = new JSONObject();
-		enrichment.put("known_info", (JSONObject) _known_hosts.get(metadata));
+		enrichment.put("known_info", (JSONObject) _known_hosts.get(metadata.getValue()));
 		return enrichment;
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java
index 5eabdd2..b21044f 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.enrichment.adapters.jdbc;
 
+import org.apache.metron.enrichment.bolt.CacheKey;
 import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -25,7 +26,7 @@ import java.io.Serializable;
 import java.net.InetAddress;
 import java.sql.*;
 
-public abstract class JdbcAdapter implements EnrichmentAdapter<String>,
+public abstract class JdbcAdapter implements EnrichmentAdapter<CacheKey>,
         Serializable {
 
   protected static final Logger _LOG = LoggerFactory

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java
new file mode 100644
index 0000000..2b7d1a0
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapter.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.enrichment.adapters.simplehbase;
+
+
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.metron.enrichment.bolt.CacheKey;
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+import org.apache.metron.enrichment.utils.EnrichmentUtils;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentKey;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentValue;
+import org.apache.metron.hbase.lookup.EnrichmentLookup;
+import org.apache.metron.reference.lookup.LookupKV;
+import org.apache.metron.reference.lookup.accesstracker.NoopAccessTracker;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+public class SimpleHBaseAdapter implements EnrichmentAdapter<CacheKey>,Serializable {
+  protected static final Logger _LOG = LoggerFactory.getLogger(SimpleHBaseAdapter.class);
+  protected SimpleHBaseConfig config;
+  protected EnrichmentLookup lookup;
+
+  public SimpleHBaseAdapter() {
+  }
+  public SimpleHBaseAdapter(SimpleHBaseConfig config) {
+    withConfig(config);
+  }
+
+  public SimpleHBaseAdapter withConfig(SimpleHBaseConfig config) {
+    this.config = config;
+    return this;
+  }
+
+  @Override
+  public void logAccess(CacheKey value) {
+  }
+
+
+  @Override
+  public JSONObject enrich(CacheKey value) {
+    JSONObject enriched = new JSONObject();
+    List<String> enrichmentTypes = value.getConfig()
+                                        .getFieldToEnrichmentTypeMap()
+                                        .get(EnrichmentUtils.toTopLevelField(value.getField()));
+    if(enrichmentTypes != null && value.getValue() != null) {
+      try {
+        for (LookupKV<EnrichmentKey, EnrichmentValue> kv :
+                lookup.get(Iterables.transform(enrichmentTypes
+                                              , new EnrichmentUtils.TypeToKey(value.getValue())
+                                              )
+                          , lookup.getTable()
+                          , false
+                          )
+            )
+        {
+          if (kv != null && kv.getValue() != null && kv.getValue().getMetadata() != null) {
+            for (Map.Entry<String, String> values : kv.getValue().getMetadata().entrySet()) {
+              enriched.put(kv.getKey().type + "." + values.getKey(), values.getValue());
+            }
+            _LOG.trace("Enriched type " + kv.getKey().type + " => " + enriched);
+          }
+        }
+      }
+      catch (IOException e) {
+        _LOG.error("Unable to retrieve value: " + e.getMessage(), e);
+        throw new RuntimeException("Unable to retrieve value: " + e.getMessage(), e);
+      }
+    }
+    return enriched;
+  }
+
+  @Override
+  public boolean initializeAdapter() {
+    String hbaseTable = config.getHBaseTable();
+    Configuration hbaseConfig = HBaseConfiguration.create();
+    try {
+      lookup = new EnrichmentLookup( config.getProvider().getTable(hbaseConfig, hbaseTable)
+                                   , config.getHBaseCF()
+                                   , new NoopAccessTracker()
+                                   );
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to initialize adapter: " + e.getMessage(), e);
+    }
+    return true;
+  }
+
+  @Override
+  public void cleanup() {
+    try {
+      lookup.close();
+    } catch (Exception e) {
+      throw new RuntimeException("Unable to cleanup access tracker", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseConfig.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseConfig.java
new file mode 100644
index 0000000..fefe008
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseConfig.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.adapters.simplehbase;
+
+import org.apache.metron.enrichment.utils.EnrichmentUtils;
+import org.apache.metron.hbase.HTableProvider;
+import org.apache.metron.hbase.TableProvider;
+
+import java.io.Serializable;
+
+
+public class SimpleHBaseConfig implements Serializable {
+  private String hBaseTable;
+  private String hBaseCF;
+  private TableProvider provider = new HTableProvider();
+  public String getHBaseTable() {
+    return hBaseTable;
+  }
+  public String getHBaseCF() {
+    return hBaseCF;
+  }
+
+  public TableProvider getProvider() {
+    return provider;
+  }
+
+  public SimpleHBaseConfig withProviderImpl(String connectorImpl) {
+    provider = EnrichmentUtils.getTableProvider(connectorImpl, new HTableProvider());
+    return this;
+  }
+  public SimpleHBaseConfig withHBaseTable(String hBaseTable) {
+    this.hBaseTable = hBaseTable;
+    return this;
+  }
+
+  public SimpleHBaseConfig withHBaseCF(String cf) {
+    this.hBaseCF= cf;
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/AbstractThreatAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/AbstractThreatAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/AbstractThreatAdapter.java
deleted file mode 100644
index 9c14cdf..0000000
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/AbstractThreatAdapter.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.enrichment.adapters.threat;
-
-import java.io.Serializable;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
-
-public abstract class AbstractThreatAdapter implements EnrichmentAdapter,Serializable{
-
-	
-	private static final long serialVersionUID = 1524030932856141771L;
-	protected static final Logger LOG = LoggerFactory
-			.getLogger(AbstractThreatAdapter.class);
-	
-	abstract public boolean initializeAdapter();
-
-	@Override
-	public void cleanup() {
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java
deleted file mode 100644
index 1ce99cb..0000000
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.enrichment.adapters.threat;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.log4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@SuppressWarnings("unchecked")
-public class ThreatHbaseAdapter implements EnrichmentAdapter<String>,
-				Serializable {
-
-	protected static final org.slf4j.Logger LOG = LoggerFactory
-					.getLogger(ThreatHbaseAdapter.class);
-	private static final long serialVersionUID = 1L;
-	private String _tableName;
-	private HTableInterface table;
-	private String _quorum;
-	private String _port;
-
-	public ThreatHbaseAdapter(String quorum, String port, String tableName) {
-		_quorum = quorum;
-		_port = port;
-		_tableName = tableName;
-	}
-
-	/** The LOGGER. */
-	private static final Logger LOGGER = Logger
-			.getLogger(ThreatHbaseAdapter.class);
-
-	@Override
-	public void logAccess(String value) {
-
-	}
-
-	public JSONObject enrich(String metadata) {
-
-		JSONObject output = new JSONObject();
-		LOGGER.debug("=======Looking Up For:" + metadata);
-		output.putAll(getThreatObject(metadata));
-
-		return output;
-	}
-
-	@SuppressWarnings({ "rawtypes", "deprecation" })
-	protected Map getThreatObject(String key) {
-
-		LOGGER.debug("=======Pinging HBase For:" + key);
-		
-		Get get = new Get(Bytes.toBytes(key));
-		Result rs;
-		Map output = new HashMap();
-
-		try {
-			rs = table.get(get);
-
-			if (!rs.isEmpty()) {
-				byte[] source_family = Bytes.toBytes("source");
-				JSONParser parser = new JSONParser();
-				
-				Map<byte[], byte[]> sourceFamilyMap = rs.getFamilyMap(source_family);
-				
-				for (Map.Entry<byte[], byte[]> entry  : sourceFamilyMap.entrySet()) {
-					String k = Bytes.toString(entry.getKey());
-					LOGGER.debug("=======Found intel from source: " + k);
-					output.put(k,parser.parse(Bytes.toString(entry.getValue())));
-	            }
-			}
-		} catch (IOException e) {
-			// TODO Auto-generated catch block
-			e.printStackTrace();
-		} catch (ParseException e) {
-			// TODO Auto-generated catch block
-			e.printStackTrace();
-		}
-		return output;
-	}
-
-	@Override
-	public boolean initializeAdapter() {
-
-		// Initialize HBase Table
-		Configuration conf = null;
-		conf = HBaseConfiguration.create();
-		conf.set("hbase.zookeeper.quorum", _quorum);
-		conf.set("hbase.zookeeper.property.clientPort", _port);
-
-		try {
-			LOGGER.debug("=======Connecting to HBASE===========");
-			LOGGER.debug("=======ZOOKEEPER = "
-					+ conf.get("hbase.zookeeper.quorum"));
-			HConnection connection = HConnectionManager.createConnection(conf);
-			table = connection.getTable(_tableName);
-			return true;
-		} catch (IOException e) {
-			// TODO Auto-generated catch block
-			LOGGER.debug("=======Unable to Connect to HBASE===========");
-			e.printStackTrace();
-		}
-
-		return false;
-	}
-
-	@Override
-	public void cleanup() {
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapter.java
new file mode 100644
index 0000000..c80b57a
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapter.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.adapters.threatintel;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.metron.enrichment.bolt.CacheKey;
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+import org.apache.metron.enrichment.utils.EnrichmentUtils;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentKey;
+import org.apache.metron.hbase.lookup.EnrichmentLookup;
+import org.apache.metron.reference.lookup.accesstracker.BloomAccessTracker;
+import org.apache.metron.reference.lookup.accesstracker.PersistentAccessTracker;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.UUID;
+
+public class ThreatIntelAdapter implements EnrichmentAdapter<CacheKey>,Serializable {
+  protected static final Logger _LOG = LoggerFactory.getLogger(ThreatIntelAdapter.class);
+  protected ThreatIntelConfig config;
+  protected EnrichmentLookup lookup;
+
+  public ThreatIntelAdapter() {
+  }
+  public ThreatIntelAdapter(ThreatIntelConfig config) {
+    withConfig(config);
+  }
+
+  public ThreatIntelAdapter withConfig(ThreatIntelConfig config) {
+    this.config = config;
+    return this;
+  }
+
+  /** Logs an access per threat-intel type mapped to the key's top-level field (same lookup as enrich). */
+  @Override
+  public void logAccess(CacheKey value) {
+    List<String> enrichmentTypes = value.getConfig().getFieldToThreatIntelTypeMap()
+                                        .get(EnrichmentUtils.toTopLevelField(value.getField()));
+    if(enrichmentTypes == null) { return; }
+    for(String enrichmentType : enrichmentTypes) {
+      lookup.getAccessTracker().logAccess(new EnrichmentKey(enrichmentType, value.getValue()));
+    }
+  }
+
+  @Override
+  public JSONObject enrich(CacheKey value) {
+    JSONObject enriched = new JSONObject();
+    List<String> enrichmentTypes = value.getConfig()
+                                        .getFieldToThreatIntelTypeMap()
+                                        .get(EnrichmentUtils.toTopLevelField(value.getField()));
+    if(enrichmentTypes != null) {
+      int i = 0;
+      try {
+        for (Boolean isThreat :
+                lookup.exists(Iterables.transform(enrichmentTypes
+                                                 , new EnrichmentUtils.TypeToKey(value.getValue())
+                                                 )
+                             , lookup.getTable()
+                             , false
+                             )
+            )
+        {
+          String enrichmentType = enrichmentTypes.get(i++);
+          if (isThreat) {
+            enriched.put(enrichmentType, "alert");
+            _LOG.trace("Enriched value => " + enriched);
+          }
+        }
+      }
+      catch(IOException e) {
+        throw new RuntimeException("Unable to retrieve value", e);
+      }
+    }
+    //throw new RuntimeException("Unable to retrieve value " + value);
+    return enriched;
+  }
+
+  @Override
+  public boolean initializeAdapter() {
+    PersistentAccessTracker accessTracker;
+    String hbaseTable = config.getHBaseTable();
+    int expectedInsertions = config.getExpectedInsertions();
+    double falsePositives = config.getFalsePositiveRate();
+    String trackerHBaseTable = config.getTrackerHBaseTable();
+    String trackerHBaseCF = config.getTrackerHBaseCF();
+    long millisecondsBetweenPersist = config.getMillisecondsBetweenPersists();
+    BloomAccessTracker bat = new BloomAccessTracker(hbaseTable, expectedInsertions, falsePositives);
+    Configuration hbaseConfig = HBaseConfiguration.create();
+    try {
+      accessTracker = new PersistentAccessTracker( hbaseTable
+              , UUID.randomUUID().toString()
+              , config.getProvider().getTable(hbaseConfig, trackerHBaseTable)
+              , trackerHBaseCF
+              , bat
+              , millisecondsBetweenPersist
+      );
+      lookup = new EnrichmentLookup(config.getProvider().getTable(hbaseConfig, hbaseTable), config.getHBaseCF(), accessTracker);
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to initialize ThreatIntelAdapter", e);
+    }
+
+    return true;
+  }
+
+  @Override
+  public void cleanup() {
+    try {
+      lookup.close();
+    } catch (Exception e) {
+      throw new RuntimeException("Unable to cleanup access tracker", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelConfig.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelConfig.java
new file mode 100644
index 0000000..ff29aea
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelConfig.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.adapters.threatintel;
+
+import org.apache.metron.enrichment.utils.EnrichmentUtils;
+import org.apache.metron.hbase.HTableProvider;
+import org.apache.metron.hbase.TableProvider;
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+
+public class ThreatIntelConfig implements Serializable {
+  public static final long MS_IN_HOUR = 1000L*60*60; // 1000 ms/s * 60 s/min * 60 min/h = 3,600,000 ms
+  private String hBaseTable;
+  private String hBaseCF;
+  private double falsePositiveRate = 0.03;
+  private int expectedInsertions = 100000;
+  private String trackerHBaseTable;
+  private String trackerHBaseCF;
+  private long millisecondsBetweenPersists = 2*MS_IN_HOUR;
+  private TableProvider provider = new HTableProvider();
+
+  public String getHBaseTable() {
+    return hBaseTable;
+  }
+
+  public int getExpectedInsertions() {
+    return expectedInsertions;
+  }
+
+  public double getFalsePositiveRate() {
+    return falsePositiveRate;
+  }
+
+  public String getTrackerHBaseTable() {
+    return trackerHBaseTable;
+  }
+
+  public String getTrackerHBaseCF() {
+    return trackerHBaseCF;
+  }
+
+  public long getMillisecondsBetweenPersists() {
+    return millisecondsBetweenPersists;
+  }
+
+  public String getHBaseCF() {
+    return hBaseCF;
+  }
+
+  public TableProvider getProvider() {
+    return provider;
+  }
+
+  public ThreatIntelConfig withProviderImpl(String connectorImpl) {
+    provider = EnrichmentUtils.getTableProvider(connectorImpl, new HTableProvider());
+    return this;
+  }
+
+  public ThreatIntelConfig withTrackerHBaseTable(String hBaseTable) {
+    this.trackerHBaseTable = hBaseTable;
+    return this;
+  }
+
+  public ThreatIntelConfig withTrackerHBaseCF(String cf) {
+    this.trackerHBaseCF = cf;
+    return this;
+  }
+  public ThreatIntelConfig withHBaseTable(String hBaseTable) {
+    this.hBaseTable = hBaseTable;
+    return this;
+  }
+
+  public ThreatIntelConfig withHBaseCF(String cf) {
+    this.hBaseCF= cf;
+    return this;
+  }
+
+  public ThreatIntelConfig withFalsePositiveRate(double falsePositiveRate) {
+    this.falsePositiveRate = falsePositiveRate;
+    return this;
+  }
+
+  public ThreatIntelConfig withExpectedInsertions(int expectedInsertions) {
+    this.expectedInsertions = expectedInsertions;
+    return this;
+  }
+
+  public ThreatIntelConfig withMillisecondsBetweenPersists(long millisecondsBetweenPersists) {
+    this.millisecondsBetweenPersists = millisecondsBetweenPersists;
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java
index 442e609..8f0f589 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.metron.enrichment.bolt.CacheKey;
 import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
 import org.json.simple.JSONObject;
 
@@ -36,7 +37,7 @@ import org.apache.metron.tldextractor.BasicTldExtractor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class WhoisHBaseAdapter implements EnrichmentAdapter<String>,
+public class WhoisHBaseAdapter implements EnrichmentAdapter<CacheKey>,
 				Serializable {
 
 	protected static final Logger LOG = LoggerFactory
@@ -80,7 +81,7 @@ public class WhoisHBaseAdapter implements EnrichmentAdapter<String>,
 
 			LOG.trace("--------CONNECTED TO TABLE: " + table);
 
-			JSONObject tester = enrich("cisco.com");
+			JSONObject tester = enrich(new CacheKey("whois", "cisco.com", null));
 
 			if (tester.keySet().size() == 0)
 				throw new IOException(
@@ -96,13 +97,13 @@ public class WhoisHBaseAdapter implements EnrichmentAdapter<String>,
 	}
 
 	@Override
-	public void logAccess(String value) {
+	public void logAccess(CacheKey value) {
 
 	}
 
 	@SuppressWarnings({ "unchecked", "deprecation" })
-	public JSONObject enrich(String metadataIn) {
-		
+	public JSONObject enrich(CacheKey k) {
+		String metadataIn = k.getValue();
 		String metadata = tldex.extract2LD(metadataIn);
 
 		LOG.trace("[Metron] Pinging HBase For:" + metadata);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/CacheKey.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/CacheKey.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/CacheKey.java
new file mode 100644
index 0000000..5d5b1e1
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/CacheKey.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.bolt;
+
+import org.apache.metron.domain.SensorEnrichmentConfig;
+
+public class CacheKey {
+  private String field;
+  private String value;
+  private SensorEnrichmentConfig config;
+
+  public CacheKey(String field, String value, SensorEnrichmentConfig config) {
+    this.field = field;
+    this.value = value;
+    this.config = config;
+  }
+
+  public String getField() {
+    return field;
+  }
+
+  public String getValue() {
+    return value;
+  }
+
+  public SensorEnrichmentConfig getConfig() {
+    return config;
+  }
+
+  @Override
+  public String toString() {
+    return "CacheKey{" +
+            "field='" + field + '\'' +
+            ", value='" + value + '\'' +
+            '}';
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    CacheKey cacheKey = (CacheKey) o;
+
+    if (getField() != null ? !getField().equals(cacheKey.getField()) : cacheKey.getField() != null) return false;
+    if (getValue() != null ? !getValue().equals(cacheKey.getValue()) : cacheKey.getValue() != null) return false;
+    return config != null ? config.equals(cacheKey.config) : cacheKey.config == null;
+
+  }
+
+  @Override
+  public int hashCode() {
+    int result = getField() != null ? getField().hashCode() : 0;
+    result = 31 * result + (getValue() != null ? getValue().hashCode() : 0);
+    result = 31 * result + (config != null ? config.hashCode() : 0);
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
index bfb4d91..9b47d71 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
@@ -20,6 +20,7 @@ package org.apache.metron.enrichment.bolt;
 import backtype.storm.task.TopologyContext;
 import org.apache.metron.bolt.JoinBolt;
 import org.apache.metron.domain.Enrichment;
+import org.apache.metron.domain.SensorEnrichmentConfig;
 import org.apache.metron.topology.TopologyUtils;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
@@ -56,8 +57,11 @@ public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
   public Set<String> getStreamIds(JSONObject message) {
     Set<String> streamIds = new HashSet<>();
     String sourceType = TopologyUtils.getSensorType(message);
-    for (String enrichmentType : getFieldMap(sourceType).keySet()) {
-      streamIds.add(enrichmentType);
+    // Look the field map up once and reuse the null-checked result; calling getFieldMap
+    // again would repeat the lookup (and its error logging) and bypass the check below.
+    Map<String, List<String>> fieldMap = getFieldMap(sourceType);
+    if(fieldMap != null) {
+      streamIds.addAll(fieldMap.keySet());
     }
     streamIds.add("message");
     return streamIds;
@@ -85,7 +89,19 @@ public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
     return message;
   }
 
-  protected Map<String, List<String>> getFieldMap(String sensorType) {
-    return configurations.getSensorEnrichmentConfig(sensorType).getEnrichmentFieldMap();
+  public Map<String, List<String>> getFieldMap(String sourceType) {
+    if(sourceType != null) {
+      SensorEnrichmentConfig config = configurations.getSensorEnrichmentConfig(sourceType);
+      if (config != null) {
+        return config.getEnrichmentFieldMap();
+      }
+      else {
+        LOG.error("Unable to retrieve a sensor enrichment config of " + sourceType);
+      }
+    }
+    else {
+      LOG.error("Trying to retrieve a field map with source type of null");
+    }
+    return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
index 08b223c..7b76c57 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
@@ -30,6 +30,7 @@ import com.google.common.cache.LoadingCache;
 import org.apache.metron.Constants;
 import org.apache.metron.bolt.ConfiguredBolt;
 import org.apache.metron.domain.Enrichment;
+import org.apache.metron.domain.SensorEnrichmentConfig;
 import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
 import org.apache.metron.helpers.topology.ErrorUtils;
 import org.json.simple.JSONObject;
@@ -65,13 +66,13 @@ public class GenericEnrichmentBolt extends ConfiguredBolt {
           .getLogger(GenericEnrichmentBolt.class);
   private OutputCollector collector;
 
-
   protected String enrichmentType;
-  protected EnrichmentAdapter adapter;
-  protected transient CacheLoader<String, JSONObject> loader;
-  protected transient LoadingCache<String, JSONObject> cache;
+  protected EnrichmentAdapter<CacheKey> adapter;
+  protected transient CacheLoader<CacheKey, JSONObject> loader;
+  protected transient LoadingCache<CacheKey, JSONObject> cache;
   protected Long maxCacheSize;
   protected Long maxTimeRetain;
+  protected boolean invalidateCacheOnReload = false;
 
   public GenericEnrichmentBolt(String zookeeperUrl) {
     super(zookeeperUrl);
@@ -108,9 +109,23 @@ public class GenericEnrichmentBolt extends ConfiguredBolt {
     return this;
   }
 
+  public GenericEnrichmentBolt withCacheInvalidationOnReload(boolean cacheInvalidationOnReload) {
+    this.invalidateCacheOnReload= cacheInvalidationOnReload;
+    return this;
+  }
+  @Override
+  protected void reloadCallback() {
+    if(invalidateCacheOnReload) {
+      if (cache != null) {
+        cache.invalidateAll();
+      }
+    }
+  }
+
   @Override
   public void prepare(Map conf, TopologyContext topologyContext,
                       OutputCollector collector) {
+    super.prepare(conf, topologyContext, collector);
     this.collector = collector;
     if (this.maxCacheSize == null)
       throw new IllegalStateException("MAX_CACHE_SIZE_OBJECTS_NUM must be specified");
@@ -118,8 +133,8 @@ public class GenericEnrichmentBolt extends ConfiguredBolt {
       throw new IllegalStateException("MAX_TIME_RETAIN_MINUTES must be specified");
     if (this.adapter == null)
       throw new IllegalStateException("Adapter must be specified");
-    loader = new CacheLoader<String, JSONObject>() {
-      public JSONObject load(String key) throws Exception {
+    loader = new CacheLoader<CacheKey, JSONObject>() {
+      public JSONObject load(CacheKey key) throws Exception {
         return adapter.enrich(key);
       }
     };
@@ -144,6 +159,7 @@ public class GenericEnrichmentBolt extends ConfiguredBolt {
   public void execute(Tuple tuple) {
     String key = tuple.getStringByField("key");
     JSONObject rawMessage = (JSONObject) tuple.getValueByField("message");
+
     JSONObject enrichedMessage = new JSONObject();
     enrichedMessage.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".begin.ts", "" + System.currentTimeMillis());
     try {
@@ -151,6 +167,13 @@ public class GenericEnrichmentBolt extends ConfiguredBolt {
         throw new Exception("Could not parse binary stream to JSON");
       if (key == null)
         throw new Exception("Key is not valid");
+      String sourceType = null;
+      if(rawMessage.containsKey(Constants.SENSOR_TYPE)) {
+        sourceType = rawMessage.get(Constants.SENSOR_TYPE).toString();
+      }
+      else {
+        throw new RuntimeException("Source type is missing from enrichment fragment: " + rawMessage.toJSONString());
+      }
       for (Object o : rawMessage.keySet()) {
         String field = (String) o;
         String value = (String) rawMessage.get(field);
@@ -159,8 +182,13 @@ public class GenericEnrichmentBolt extends ConfiguredBolt {
         } else {
           JSONObject enrichedField = new JSONObject();
           if (value != null && value.length() != 0) {
-            adapter.logAccess(value);
-            enrichedField = cache.getUnchecked(value);
+            SensorEnrichmentConfig config = configurations.getSensorEnrichmentConfig(sourceType);
+            if(config == null) { // config is null here, so report the sensor type that was looked up
+              throw new RuntimeException("Unable to find enrichment config for sensor type " + sourceType);
+            }
+            CacheKey cacheKey= new CacheKey(field, value, config);
+            adapter.logAccess(cacheKey);
+            enrichedField = cache.getUnchecked(cacheKey);
             if (enrichedField == null)
               throw new Exception("[Metron] Could not enrich string: "
                       + value);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
index 014e0a9..d0bc833 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.enrichment.bolt;
 
+import org.apache.metron.domain.SensorEnrichmentConfig;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,8 +35,15 @@ public class ThreatIntelJoinBolt extends EnrichmentJoinBolt {
   }
 
   @Override
-  public Map<String, List<String>> getFieldMap(String sensorType) {
-    return configurations.getSensorEnrichmentConfig(sensorType).getThreatIntelFieldMap();
+  public Map<String, List<String>> getFieldMap(String sourceType) {
+    SensorEnrichmentConfig config = configurations.getSensorEnrichmentConfig(sourceType);
+    if(config != null) {
+      return config.getThreatIntelFieldMap();
+    }
+    else {
+      LOG.error("Unable to retrieve sensor config: " + sourceType);
+      return null;
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java
index 228f844..7a58673 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java
@@ -17,7 +17,16 @@
  */
 package org.apache.metron.enrichment.utils;
 
+import com.google.common.base.Function;
 import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import org.apache.metron.hbase.HTableProvider;
+import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.hbase.converters.enrichment.EnrichmentKey;
+
+import javax.annotation.Nullable;
+import java.lang.reflect.InvocationTargetException;
 
 public class EnrichmentUtils {
 
@@ -27,6 +36,46 @@ public class EnrichmentUtils {
     return Joiner.on(".").join(new String[]{KEY_PREFIX, enrichmentName, field});
   }
 
+  public static class TypeToKey implements Function<String, EnrichmentKey> {
+    private final String indicator;
+
+    public TypeToKey(String indicator) {
+      this.indicator = indicator;
+
+    }
+    @Nullable
+    @Override
+    public EnrichmentKey apply(@Nullable String enrichmentType) {
+      return new EnrichmentKey(enrichmentType, indicator);
+    }
+  }
+  public static String toTopLevelField(String field) {
+    if(field == null) {
+      return null;
+    }
+    return Iterables.getLast(Splitter.on('.').split(field));
+  }
 
+  public static TableProvider getTableProvider(String connectorImpl, TableProvider defaultImpl) {
+    if(connectorImpl == null || connectorImpl.length() == 0 || connectorImpl.charAt(0) == '$') {
+      return defaultImpl;
+    }
+    else {
+      try {
+        Class<? extends TableProvider> clazz = (Class<? extends TableProvider>) Class.forName(connectorImpl);
+        return clazz.getConstructor().newInstance();
+      } catch (InstantiationException e) {
+        throw new IllegalStateException("Unable to instantiate connector.", e);
+      } catch (IllegalAccessException e) {
+        throw new IllegalStateException("Unable to instantiate connector: illegal access", e);
+      } catch (InvocationTargetException e) {
+        throw new IllegalStateException("Unable to instantiate connector", e);
+      } catch (NoSuchMethodException e) {
+        throw new IllegalStateException("Unable to instantiate connector: no such method", e);
+      } catch (ClassNotFoundException e) {
+        throw new IllegalStateException("Unable to instantiate connector: class not found", e);
+      }
+    }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java
deleted file mode 100644
index b95f4b8..0000000
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.threatintel;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
-import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
-import org.apache.metron.reference.lookup.accesstracker.BloomAccessTracker;
-import org.apache.metron.reference.lookup.accesstracker.PersistentAccessTracker;
-import org.apache.metron.threatintel.hbase.ThreatIntelLookup;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.UUID;
-
-public class ThreatIntelAdapter implements EnrichmentAdapter<String>,Serializable {
-    protected static final Logger _LOG = LoggerFactory.getLogger(ThreatIntelAdapter.class);
-    protected ThreatIntelConfig config;
-    protected ThreatIntelLookup lookup;
-
-    public ThreatIntelAdapter() {
-    }
-    public ThreatIntelAdapter(ThreatIntelConfig config) {
-        withConfig(config);
-    }
-
-    public ThreatIntelAdapter withConfig(ThreatIntelConfig config) {
-        this.config = config;
-        return this;
-    }
-
-    @Override
-    public void logAccess(String value) {
-        lookup.getAccessTracker().logAccess(new ThreatIntelKey(value));
-    }
-
-    @Override
-    public JSONObject enrich(String value) {
-        JSONObject enriched = new JSONObject();
-        boolean isThreat = false;
-        try {
-            isThreat = lookup.exists(new ThreatIntelKey(value), lookup.getTable(), false);
-        } catch (IOException e) {
-            throw new RuntimeException("Unable to retrieve value", e);
-        }
-        if(isThreat) {
-            enriched.put(config.getHBaseTable(), "alert");
-            _LOG.trace("Enriched value => " + enriched);
-        }
-        //throw new RuntimeException("Unable to retrieve value " + value);
-        return enriched;
-    }
-
-    @Override
-    public boolean initializeAdapter() {
-        PersistentAccessTracker accessTracker;
-        String hbaseTable = config.getHBaseTable();
-        int expectedInsertions = config.getExpectedInsertions();
-        double falsePositives = config.getFalsePositiveRate();
-        String trackerHBaseTable = config.getTrackerHBaseTable();
-        String trackerHBaseCF = config.getTrackerHBaseCF();
-        long millisecondsBetweenPersist = config.getMillisecondsBetweenPersists();
-        BloomAccessTracker bat = new BloomAccessTracker(hbaseTable, expectedInsertions, falsePositives);
-        Configuration hbaseConfig = HBaseConfiguration.create();
-        try {
-            accessTracker = new PersistentAccessTracker( hbaseTable
-                                                        , UUID.randomUUID().toString()
-                                                        , config.getProvider().getTable(hbaseConfig, trackerHBaseTable)
-                                                        , trackerHBaseCF
-                                                        , bat
-                                                        , millisecondsBetweenPersist
-                                                        );
-            lookup = new ThreatIntelLookup(config.getProvider().getTable(hbaseConfig, hbaseTable), config.getHBaseCF(), accessTracker);
-        } catch (IOException e) {
-            throw new IllegalStateException("Unable to initialize ThreatIntelAdapter", e);
-        }
-
-        return true;
-    }
-
-    @Override
-    public void cleanup() {
-        try {
-            lookup.close();
-        } catch (Exception e) {
-            throw new RuntimeException("Unable to cleanup access tracker", e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelConfig.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelConfig.java
deleted file mode 100644
index bb45468..0000000
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelConfig.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.threatintel;
-
-import org.apache.metron.hbase.HTableProvider;
-import org.apache.metron.hbase.TableProvider;
-
-import java.io.Serializable;
-import java.lang.reflect.InvocationTargetException;
-
-public class ThreatIntelConfig implements Serializable {
-    public static final long MS_IN_HOUR = 10000*60*60;
-    private String hBaseTable;
-    private String hBaseCF;
-    private double falsePositiveRate = 0.03;
-    private int expectedInsertions = 100000;
-    private String trackerHBaseTable;
-    private String trackerHBaseCF;
-    private long millisecondsBetweenPersists = 2*MS_IN_HOUR;
-    private TableProvider provider = new HTableProvider();
-
-    public String getHBaseTable() {
-        return hBaseTable;
-    }
-
-    public int getExpectedInsertions() {
-        return expectedInsertions;
-    }
-
-    public double getFalsePositiveRate() {
-        return falsePositiveRate;
-    }
-
-    public String getTrackerHBaseTable() {
-        return trackerHBaseTable;
-    }
-
-    public String getTrackerHBaseCF() {
-        return trackerHBaseCF;
-    }
-
-    public long getMillisecondsBetweenPersists() {
-        return millisecondsBetweenPersists;
-    }
-
-    public String getHBaseCF() {
-        return hBaseCF;
-    }
-
-    public TableProvider getProvider() {
-        return provider;
-    }
-
-    public ThreatIntelConfig withProviderImpl(String connectorImpl) {
-        if(connectorImpl == null || connectorImpl.length() == 0 || connectorImpl.charAt(0) == '$') {
-            provider = new HTableProvider();
-        }
-        else {
-            try {
-                Class<? extends TableProvider> clazz = (Class<? extends TableProvider>) Class.forName(connectorImpl);
-                provider = clazz.getConstructor().newInstance();
-            } catch (InstantiationException e) {
-                throw new IllegalStateException("Unable to instantiate connector.", e);
-            } catch (IllegalAccessException e) {
-                throw new IllegalStateException("Unable to instantiate connector: illegal access", e);
-            } catch (InvocationTargetException e) {
-                throw new IllegalStateException("Unable to instantiate connector", e);
-            } catch (NoSuchMethodException e) {
-                throw new IllegalStateException("Unable to instantiate connector: no such method", e);
-            } catch (ClassNotFoundException e) {
-                throw new IllegalStateException("Unable to instantiate connector: class not found", e);
-            }
-        }
-        return this;
-    }
-
-    public ThreatIntelConfig withTrackerHBaseTable(String hBaseTable) {
-        this.trackerHBaseTable = hBaseTable;
-        return this;
-    }
-
-    public ThreatIntelConfig withTrackerHBaseCF(String cf) {
-        this.trackerHBaseCF = cf;
-        return this;
-    }
-    public ThreatIntelConfig withHBaseTable(String hBaseTable) {
-        this.hBaseTable = hBaseTable;
-        return this;
-    }
-
-    public ThreatIntelConfig withHBaseCF(String cf) {
-        this.hBaseCF= cf;
-        return this;
-    }
-
-    public ThreatIntelConfig withFalsePositiveRate(double falsePositiveRate) {
-        this.falsePositiveRate = falsePositiveRate;
-        return this;
-    }
-
-    public ThreatIntelConfig withExpectedInsertions(int expectedInsertions) {
-        this.expectedInsertions = expectedInsertions;
-        return this;
-    }
-
-    public ThreatIntelConfig withMillisecondsBetweenPersists(long millisecondsBetweenPersists) {
-        this.millisecondsBetweenPersists = millisecondsBetweenPersists;
-        return this;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapterTest.java b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapterTest.java
index 6421a21..6ccc21e 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapterTest.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapterTest.java
@@ -19,6 +19,7 @@ package org.apache.metron.enrichment.adapters.cif;
 import java.net.InetAddress;
 import java.util.Properties;
 
+import org.apache.metron.enrichment.bolt.CacheKey;
 import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
 import org.apache.metron.test.AbstractTestContext;
 import org.junit.Assert;
@@ -174,17 +175,17 @@ public class CIFHbaseAdapterTest extends AbstractTestContext {
     }
 
     /**
-     * Test method for {@link org.apache.metron.enrichment.adapters.cif.CIFHbaseAdapter#enrich(java.lang.String)}.
+     * Test method for {@link org.apache.metron.enrichment.adapters.cif.CIFHbaseAdapter#enrich(CacheKey)}.
      */
     public void testEnrich() {
         if(skipTests(this.getMode())){
             return;//skip tests
        }else{
             cifHbaseAdapter.initializeAdapter();
-            Assert.assertNotNull(cifHbaseAdapter.enrich("testinvalid.metadata"));
+            Assert.assertNotNull(cifHbaseAdapter.enrich(new CacheKey("cif", "testinvalid.metadata", null)));
             
-            Assert.assertNotNull(cifHbaseAdapter.enrich("ivalid.ip"));
-            Assert.assertNotNull(cifHbaseAdapter.enrich("1.1.1.10"));
+            Assert.assertNotNull(cifHbaseAdapter.enrich(new CacheKey("cif", "ivalid.ip", null)));
+            Assert.assertNotNull(cifHbaseAdapter.enrich(new CacheKey("cif", "1.1.1.10", null)));
        }
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoMysqlAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoMysqlAdapterTest.java b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoMysqlAdapterTest.java
index 9a0fe2a..8898c64 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoMysqlAdapterTest.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoMysqlAdapterTest.java
@@ -21,6 +21,7 @@ import java.util.Properties;
 
 import org.apache.metron.enrichment.adapters.jdbc.JdbcAdapter;
 import org.apache.metron.enrichment.adapters.jdbc.MySqlConfig;
+import org.apache.metron.enrichment.bolt.CacheKey;
 import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
 import org.json.simple.JSONObject;
 
@@ -113,7 +114,7 @@ public class GeoMysqlAdapterTest extends AbstractSchemaTest {
        }else{
            
          try {           
-                JSONObject json = geoMySqlAdapter.enrich("72.163.4.161");
+                JSONObject json = geoMySqlAdapter.enrich(new CacheKey("dummy", "72.163.4.161", null));
                 
                 //assert Geo Response is not null
                 System.out.println("json ="+json);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapterTest.java b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapterTest.java
index a56f5ba..ccf2ea3 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapterTest.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapterTest.java
@@ -19,6 +19,7 @@ package org.apache.metron.enrichment.adapters.whois;
 import java.net.InetAddress;
 import java.util.Properties;
 
+import org.apache.metron.enrichment.bolt.CacheKey;
 import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
 import org.json.simple.JSONObject;
 
@@ -125,13 +126,13 @@ public class WhoisHBaseAdapterTest extends AbstractTestContext {
     }
 
     /**
-     * Test method for {@link org.apache.metron.enrichment.adapters.whois.WhoisHBaseAdapter#enrich(java.lang.String)}.
+     * Test method for {@link org.apache.metron.enrichment.adapters.whois.WhoisHBaseAdapter#enrich(CacheKey)}.
      */
     public void testEnrich() {
         if(skipTests(this.getMode())){
             return;//skip tests
        }else{
-            JSONObject json = whoisHbaseAdapter.enrich("72.163.4.161");
+            JSONObject json = whoisHbaseAdapter.enrich(new CacheKey("whois", "72.163.4.161", null));
             
             //assert Geo Response is not null
             Assert.assertNotNull(json);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-MessageParsers/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/pom.xml b/metron-streaming/Metron-MessageParsers/pom.xml
index a697aa8..2687213 100644
--- a/metron-streaming/Metron-MessageParsers/pom.xml
+++ b/metron-streaming/Metron-MessageParsers/pom.xml
@@ -37,6 +37,17 @@
 			<version>${project.parent.version}</version>
 		</dependency>
 		<dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${global_hadoop_version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+		<dependency>
 			<groupId>org.apache.storm</groupId>
 			<artifactId>storm-core</artifactId>
 			<version>${global_storm_version}</version>
@@ -56,11 +67,12 @@
 			<groupId>junit</groupId>
 			<artifactId>junit</artifactId>
 			<version>${global_junit_version}</version>
+			<scope>test</scope>
 		</dependency>
 		<dependency>
 			<groupId>com.google.guava</groupId>
 			<artifactId>guava</artifactId>
-			<version>${global_guava_version}</version>
+			<version>${global_hbase_guava_version}</version>
 		</dependency>
 
 		<dependency>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9621c55e/metron-streaming/Metron-Solr/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Solr/pom.xml b/metron-streaming/Metron-Solr/pom.xml
index cbb7395..925c219 100644
--- a/metron-streaming/Metron-Solr/pom.xml
+++ b/metron-streaming/Metron-Solr/pom.xml
@@ -27,6 +27,11 @@
     </properties>
     <dependencies>
         <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${global_hbase_guava_version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.metron</groupId>
             <artifactId>Metron-Common</artifactId>
             <version>${project.parent.version}</version>


Mime
View raw message