metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject [3/5] incubator-metron git commit: METRON-35 Implement threat intelligence message enrichment closes apache/incubator-metron#22
Date Tue, 16 Feb 2016 17:10:19 GMT
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
new file mode 100644
index 0000000..0673bb3
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
@@ -0,0 +1,62 @@
+package org.apache.metron.dataloads.hbase.mr;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.metron.reference.lookup.LookupKey;
+import org.apache.metron.reference.lookup.accesstracker.AccessTracker;
+import org.apache.metron.reference.lookup.accesstracker.AccessTrackerUtil;
+
+import java.io.IOException;
+
+/**
+ * Emits a Delete for each scanned row the access tracker has not seen. Created by cstella on 2/5/16.
+ */
+public class PrunerMapper extends TableMapper<ImmutableBytesWritable, Delete> {
+    public static final String ACCESS_TRACKER_TABLE_CONF = "access_tracker_table";
+    public static final String ACCESS_TRACKER_CF_CONF = "access_tracker_cf";
+    public static final String TIMESTAMP_CONF = "access_tracker_timestamp";
+    public static final String ACCESS_TRACKER_NAME_CONF = "access_tracker_name";
+    AccessTracker tracker;
+    /** Loads the tracker from the configured table; the table is closed once loading finishes. */
+    @Override
+    public void setup(Context context) throws IOException {
+        String atTable = context.getConfiguration().get(ACCESS_TRACKER_TABLE_CONF);
+        String atCF = context.getConfiguration().get(ACCESS_TRACKER_CF_CONF);
+        String atName = context.getConfiguration().get(ACCESS_TRACKER_NAME_CONF);
+        // Validate before opening the table so a bad configuration cannot leak a connection.
+        long timestamp = context.getConfiguration().getLong(TIMESTAMP_CONF, -1);
+        if(timestamp < 0) {
+            throw new IllegalStateException("Must specify a timestamp that is positive.");
+        }
+        try (HTable table = new HTable(context.getConfiguration(), atTable)) {
+            tracker = AccessTrackerUtil.INSTANCE.loadAll(AccessTrackerUtil.INSTANCE.loadAll(table, atCF, atName, timestamp));
+        } catch (Exception e) {
+            throw new IllegalStateException("Unable to load the accesstrackers from table " + atTable, e);
+        }
+    }
+
+    /** Writes a Delete for the row key unless the tracker reports the key as seen. */
+    @Override
+    public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
+        if(tracker == null || key == null) {
+            throw new RuntimeException("Tracker = " + tracker + " key = " + key);
+        }
+        if(!tracker.hasSeen(toLookupKey(key.get()))) {
+            context.write(key, new Delete(key.get()));
+        }
+    }
+
+    /** Wraps raw row-key bytes in a LookupKey whose toBytes() returns them unchanged. */
+    protected LookupKey toLookupKey(final byte[] bytes) {
+        return new LookupKey() {
+            @Override
+            public byte[] toBytes() {
+                return bytes;
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/main/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/resources/hbase-site.xml b/metron-streaming/Metron-DataLoads/src/main/resources/hbase-site.xml
deleted file mode 100644
index a73469d..0000000
--- a/metron-streaming/Metron-DataLoads/src/main/resources/hbase-site.xml
+++ /dev/null
@@ -1,100 +0,0 @@
-<!--Tue Feb 11 02:34:08 2014 -->
-<configuration>
-
-	<property>
-		<name>hbase.regionserver.global.memstore.lowerLimit</name>
-		<value>0.38</value>
-	</property>
-	<property>
-		<name>zookeeper.session.timeout</name>
-		<value>30000</value>
-	</property>
-
-	<property>
-		<name>hbase.security.authorization</name>
-		<value>false</value>
-	</property>
-	<property>
-		<name>hbase.cluster.distributed</name>
-		<value>true</value>
-	</property>
-	
-	<property>
-		<name>hbase.hstore.flush.retries.number</name>
-		<value>120</value>
-	</property>
-	<property>
-		<name>hbase.hregion.memstore.block.multiplier</name>
-		<value>4</value>
-	</property>
-	<property>
-		<name>hbase.hstore.blockingStoreFiles</name>
-		<value>200</value>
-	</property>
-	<property>
-		<name>hbase.defaults.for.version.skip</name>
-		<value>true</value>
-	</property>
-	<property>
-		<name>hbase.regionserver.global.memstore.upperLimit</name>
-		<value>0.4</value>
-	</property>
-	<property>
-		<name>hbase.hregion.memstore.mslab.enabled</name>
-		<value>true</value>
-	</property>
-	<property>
-		<name>hbase.client.keyvalue.maxsize</name>
-		<value>10485760</value>
-	</property>
-	<property>
-		<name>hbase.superuser</name>
-		<value>hbase</value>
-	</property>
-	<property>
-		<name>hfile.block.cache.size</name>
-		<value>0.40</value>
-	</property>
-	<property>
-		<name>zookeeper.znode.parent</name>
-		<value>/hbase-unsecure</value>
-	</property>
-	<property>
-		<name>hbase.hregion.max.filesize</name>
-		<value>10737418240</value>
-	</property>
-	<property>
-		<name>hbase.zookeeper.property.clientPort</name>
-		<value>2181</value>
-	</property>
-	<property>
-		<name>hbase.security.authentication</name>
-		<value>simple</value>
-	</property>
-	<property>
-		<name>hbase.client.scanner.caching</name>
-		<value>100</value>
-	</property>
-	<property>
-		<name>hbase.hregion.memstore.flush.size</name>
-		<value>134217728</value>
-	</property>
-	<property>
-		<name>hbase.hregion.majorcompaction</name>
-		<value>86400000</value>
-	</property>
-	 <property>
-      <name>hbase.zookeeper.property.clientPort</name>
-      <value>2181</value>
-    </property>
-
-    <property>
-      <name>hbase.zookeeper.quorum</name>
-      <value>zkpr1</value>
-    </property>
-
-	<property>
-		<name>hbase.client.write.buffer</name>
-		<value>500000000</value>
-	</property>
-</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/extractor/ExtractorTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/extractor/ExtractorTest.java b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/extractor/ExtractorTest.java
new file mode 100644
index 0000000..e949cc7
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/extractor/ExtractorTest.java
@@ -0,0 +1,61 @@
+package org.apache.metron.dataloads.extractor;
+
+import com.google.common.collect.Iterables;
+import org.apache.metron.threatintel.ThreatIntelKey;
+import org.apache.metron.threatintel.ThreatIntelResults;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Checks creating an Extractor by class name and loading one from a JSON config. Created by cstella on 2/3/16.
+ */
+public class ExtractorTest {
+    public static class DummyExtractor implements Extractor
+    {
+        // Ignores its input and always yields one result whose indicator is "dummy".
+        @Override
+        public Iterable<ThreatIntelResults> extract(String line) throws IOException {
+            ThreatIntelKey key = new ThreatIntelKey();
+            key.indicator = "dummy";
+            Map<String, String> value = new HashMap<>();
+            value.put("indicator", "dummy");
+            return Arrays.asList(new ThreatIntelResults(key, value));
+        }
+
+        @Override
+        public void initialize(Map<String, Object> config) {
+            // Nothing to configure.
+        }
+    }
+    @Test
+    public void testDummyExtractor() throws IllegalAccessException, InstantiationException, ClassNotFoundException, IOException {
+        Extractor extractor = Extractors.create(DummyExtractor.class.getName());
+        ThreatIntelResults results = Iterables.getFirst(extractor.extract(null), null);
+        Assert.assertEquals("dummy", results.getKey().indicator);
+        Assert.assertEquals("dummy", results.getValue().get("indicator"));
+    }
+
+    @Test
+    public void testExtractionLoading() throws Exception {
+        /**
+         config:
+         {
+            "config" : {}
+            ,"extractor" : "org.apache.metron.dataloads.extractor.ExtractorTest$DummyExtractor"
+         }
+         */
+        String config = "{\n" +
+                "            \"config\" : {}\n" +
+                "            ,\"extractor\" : \"org.apache.metron.dataloads.extractor.ExtractorTest$DummyExtractor\"\n" +
+                "         }";
+        ExtractorHandler handler = ExtractorHandler.load(config);
+        ThreatIntelResults results = Iterables.getFirst(handler.getExtractor().extract(null), null);
+        Assert.assertEquals("dummy", results.getKey().indicator);
+        Assert.assertEquals("dummy", results.getValue().get("indicator"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/extractor/csv/CSVExtractorTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/extractor/csv/CSVExtractorTest.java b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/extractor/csv/CSVExtractorTest.java
new file mode 100644
index 0000000..923cbb5
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/extractor/csv/CSVExtractorTest.java
@@ -0,0 +1,102 @@
+package org.apache.metron.dataloads.extractor.csv;
+
+import com.google.common.collect.Iterables;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.threatintel.ThreatIntelResults;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Checks the "CSV" extractor with columns given as a map, a list and a string. Created by cstella on 2/3/16.
+ */
+public class CSVExtractorTest {
+    @Test
+    public void testCSVExtractorMapColumns() throws Exception {
+        /**
+         {
+            "config" : {
+                        "columns" : {
+                                "host" : 0
+                                ,"meta" : 2
+                                    }
+                       ,"indicator_column" : "host"
+                       ,"separator" : ","
+                       }
+            ,"extractor" : "CSV"
+         }
+         */
+        String config = "{\n" +
+                "            \"config\" : {\n" +
+                "                        \"columns\" : {\"host\" : 0, \"meta\" : 2}\n" +
+                "                       ,\"indicator_column\" : \"host\"\n" +
+                "                       ,\"separator\" : \",\"\n" +
+                "                       }\n" +
+                "            ,\"extractor\" : \"CSV\"\n" +
+                "         }";
+        ExtractorHandler handler = ExtractorHandler.load(config);
+        validate(handler);
+    }
+    @Test
+    public void testCSVExtractorListColumns() throws Exception {
+        /**
+         {
+            "config" : {
+                        "columns" : ["host:0","meta:2"]
+                       ,"indicator_column" : "host"
+                       ,"separator" : ","
+                       }
+            ,"extractor" : "CSV"
+         }
+         */
+        String config = "{\n" +
+                "            \"config\" : {\n" +
+                "                        \"columns\" : [\"host:0\",\"meta:2\"]\n" +
+                "                       ,\"indicator_column\" : \"host\"\n" +
+                "                       ,\"separator\" : \",\"\n" +
+                "                       }\n" +
+                "            ,\"extractor\" : \"CSV\"\n" +
+                "         }";
+        ExtractorHandler handler = ExtractorHandler.load(config);
+        validate(handler);
+    }
+
+    @Test
+    public void testCSVExtractor() throws Exception {
+        /**
+         {
+            "config" : {
+                        "columns" : "host:0,meta:2"
+                       ,"indicator_column" : "host"
+                       ,"separator" : ","
+                       }
+            ,"extractor" : "CSV"
+         }
+         */
+        String config = "{\n" +
+                "            \"config\" : {\n" +
+                "                        \"columns\" : \"host:0,meta:2\"\n" +
+                "                       ,\"indicator_column\" : \"host\"\n" +
+                "                       ,\"separator\" : \",\"\n" +
+                "                       }\n" +
+                "            ,\"extractor\" : \"CSV\"\n" +
+                "         }";
+        ExtractorHandler handler = ExtractorHandler.load(config);
+        validate(handler);
+    }
+
+    public void validate(ExtractorHandler handler) throws IOException {
+        { // a data row yields one result: key from "host", values for "host" and "meta" only
+            ThreatIntelResults results = Iterables.getFirst(handler.getExtractor().extract("google.com,1.0,foo"), null);
+            Assert.assertEquals("google.com", results.getKey().indicator);
+            Assert.assertEquals("google.com", results.getValue().get("host"));
+            Assert.assertEquals("foo", results.getValue().get("meta"));
+            Assert.assertEquals(2, results.getValue().size());
+        }
+        { // a row starting with '#' is expected to yield no results
+            Iterable<ThreatIntelResults> results = handler.getExtractor().extract("#google.com,1.0,foo");
+            Assert.assertEquals(0, Iterables.size(results));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/extractor/stix/StixExtractorTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/extractor/stix/StixExtractorTest.java b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/extractor/stix/StixExtractorTest.java
new file mode 100644
index 0000000..d362241
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/extractor/stix/StixExtractorTest.java
@@ -0,0 +1,185 @@
+package org.apache.metron.dataloads.extractor.stix;
+
+import com.google.common.collect.Iterables;
+import org.apache.metron.dataloads.ThreatIntelBulkLoader;
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.threatintel.ThreatIntelResults;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Checks STIX address extraction with and without a stix_address_categories setting. Created by cstella on 2/9/16.
+ */
+public class StixExtractorTest {
+    @Test
+    public void testStixAddresses() throws Exception {
+        /**
+         <!--
+         STIX IP Watchlist Example
+
+         Copyright (c) 2015, The MITRE Corporation. All rights reserved.
+         The contents of this file are subject to the terms of the STIX License located at http://stix.mitre.org/about/termsofuse.html.
+
+         This example demonstrates a simple usage of STIX to represent a list of IP address indicators (watchlist of IP addresses). Cyber operations and malware analysis centers often share a list of suspected malicious IP addresses with information about what those IPs might indicate. This STIX package represents a list of three IP addresses with a short dummy description of what they represent.
+
+         It demonstrates the use of:
+
+         * STIX Indicators
+         * CybOX within STIX
+         * The CybOX Address Object (IP)
+         * CybOX Patterns (apply_condition="ANY")
+         * Controlled vocabularies
+
+         Created by Mark Davidson
+         -->
+         <stix:STIX_Package
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns:stix="http://stix.mitre.org/stix-1"
+         xmlns:indicator="http://stix.mitre.org/Indicator-2"
+         xmlns:cybox="http://cybox.mitre.org/cybox-2"
+         xmlns:AddressObject="http://cybox.mitre.org/objects#AddressObject-2"
+         xmlns:cyboxVocabs="http://cybox.mitre.org/default_vocabularies-2"
+         xmlns:stixVocabs="http://stix.mitre.org/default_vocabularies-1"
+         xmlns:example="http://example.com/"
+         id="example:STIXPackage-33fe3b22-0201-47cf-85d0-97c02164528d"
+         timestamp="2014-05-08T09:00:00.000000Z"
+         version="1.2">
+         <stix:STIX_Header>
+         <stix:Title>Example watchlist that contains IP information.</stix:Title>
+         <stix:Package_Intent xsi:type="stixVocabs:PackageIntentVocab-1.0">Indicators - Watchlist</stix:Package_Intent>
+         </stix:STIX_Header>
+         <stix:Indicators>
+         <stix:Indicator xsi:type="indicator:IndicatorType" id="example:Indicator-33fe3b22-0201-47cf-85d0-97c02164528d" timestamp="2014-05-08T09:00:00.000000Z">
+         <indicator:Type xsi:type="stixVocabs:IndicatorTypeVocab-1.1">IP Watchlist</indicator:Type>
+         <indicator:Description>Sample IP Address Indicator for this watchlist. This contains one indicator with a set of three IP addresses in the watchlist.</indicator:Description>
+         <indicator:Observable  id="example:Observable-1c798262-a4cd-434d-a958-884d6980c459">
+         <cybox:Object id="example:Object-1980ce43-8e03-490b-863a-ea404d12242e">
+         <cybox:Properties xsi:type="AddressObject:AddressObjectType" category="ipv4-addr">
+         <AddressObject:Address_Value condition="Equals" apply_condition="ANY">10.0.0.0##comma##10.0.0.1##comma##10.0.0.2</AddressObject:Address_Value>
+         </cybox:Properties>
+         </cybox:Object>
+         </indicator:Observable>
+         </stix:Indicator>
+         </stix:Indicators>
+         </stix:STIX_Package>
+
+
+         */
+        String stixDoc = "<!--\n" +
+                "STIX IP Watchlist Example\n" +
+                "\n" +
+                "Copyright (c) 2015, The MITRE Corporation. All rights reserved.\n" +
+                "The contents of this file are subject to the terms of the STIX License located at http://stix.mitre.org/about/termsofuse.html.\n" +
+                "\n" +
+                "This example demonstrates a simple usage of STIX to represent a list of IP address indicators (watchlist of IP addresses). Cyber operations and malware analysis centers often share a list of suspected malicious IP addresses with information about what those IPs might indicate. This STIX package represents a list of three IP addresses with a short dummy description of what they represent.\n" +
+                "\n" +
+                "It demonstrates the use of:\n" +
+                "\n" +
+                "* STIX Indicators\n" +
+                "* CybOX within STIX\n" +
+                "* The CybOX Address Object (IP)\n" +
+                "* CybOX Patterns (apply_condition=\"ANY\")\n" +
+                "* Controlled vocabularies\n" +
+                "\n" +
+                "Created by Mark Davidson\n" +
+                "-->\n" +
+                "<stix:STIX_Package\n" +
+                "    xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\n" +
+                "    xmlns:stix=\"http://stix.mitre.org/stix-1\"\n" +
+                "    xmlns:indicator=\"http://stix.mitre.org/Indicator-2\"\n" +
+                "    xmlns:cybox=\"http://cybox.mitre.org/cybox-2\"\n" +
+                "    xmlns:AddressObject=\"http://cybox.mitre.org/objects#AddressObject-2\"\n" +
+                "    xmlns:cyboxVocabs=\"http://cybox.mitre.org/default_vocabularies-2\"\n" +
+                "    xmlns:stixVocabs=\"http://stix.mitre.org/default_vocabularies-1\"\n" +
+                "    xmlns:example=\"http://example.com/\"\n" +
+                "    id=\"example:STIXPackage-33fe3b22-0201-47cf-85d0-97c02164528d\"\n" +
+                "    timestamp=\"2014-05-08T09:00:00.000000Z\"\n" +
+                "    version=\"1.2\">\n" +
+                "    <stix:STIX_Header>\n" +
+                "        <stix:Title>Example watchlist that contains IP information.</stix:Title>\n" +
+                "        <stix:Package_Intent xsi:type=\"stixVocabs:PackageIntentVocab-1.0\">Indicators - Watchlist</stix:Package_Intent>\n" +
+                "    </stix:STIX_Header>\n" +
+                "    <stix:Indicators>\n" +
+                "        <stix:Indicator xsi:type=\"indicator:IndicatorType\" id=\"example:Indicator-33fe3b22-0201-47cf-85d0-97c02164528d\" timestamp=\"2014-05-08T09:00:00.000000Z\">\n" +
+                "            <indicator:Type xsi:type=\"stixVocabs:IndicatorTypeVocab-1.1\">IP Watchlist</indicator:Type>\n" +
+                "            <indicator:Description>Sample IP Address Indicator for this watchlist. This contains one indicator with a set of three IP addresses in the watchlist.</indicator:Description>\n" +
+                "            <indicator:Observable  id=\"example:Observable-1c798262-a4cd-434d-a958-884d6980c459\">\n" +
+                "                <cybox:Object id=\"example:Object-1980ce43-8e03-490b-863a-ea404d12242e\">\n" +
+                "                    <cybox:Properties xsi:type=\"AddressObject:AddressObjectType\" category=\"ipv4-addr\">\n" +
+                "                        <AddressObject:Address_Value condition=\"Equals\" apply_condition=\"ANY\">10.0.0.0##comma##10.0.0.1##comma##10.0.0.2</AddressObject:Address_Value>\n" +
+                "                    </cybox:Properties>\n" +
+                "                </cybox:Object>\n" +
+                "            </indicator:Observable>\n" +
+                "        </stix:Indicator>\n" +
+                "    </stix:Indicators>\n" +
+                "</stix:STIX_Package>\n" +
+                "\n";
+        {
+            /** Only IPV_4_ADDR requested: all three addresses are expected.
+             {
+             "config" : {
+             "stix_address_categories" : "IPV_4_ADDR"
+             }
+             ,"extractor" : "STIX"
+             }
+             */
+            String config = "{\n" +
+                    "            \"config\" : {\n" +
+                    "                       \"stix_address_categories\" : \"IPV_4_ADDR\"\n" +
+                    "                       }\n" +
+                    "            ,\"extractor\" : \"STIX\"\n" +
+                    "         }";
+            ExtractorHandler handler = ExtractorHandler.load(config);
+            Extractor extractor = handler.getExtractor();
+            Iterable<ThreatIntelResults> results = extractor.extract(stixDoc);
+            Assert.assertEquals(3, Iterables.size(results));
+            Assert.assertEquals("10.0.0.0", Iterables.get(results, 0).getKey().indicator);
+            Assert.assertEquals("10.0.0.1", Iterables.get(results, 1).getKey().indicator);
+            Assert.assertEquals("10.0.0.2", Iterables.get(results, 2).getKey().indicator);
+        }
+        {
+            /** No category setting: all three addresses are expected.
+             {
+             "config" : {
+             }
+             ,"extractor" : "STIX"
+             }
+             */
+            String config = "{\n" +
+                    "            \"config\" : {\n" +
+                    "                       }\n" +
+                    "            ,\"extractor\" : \"STIX\"\n" +
+                    "         }";
+            ExtractorHandler handler = ExtractorHandler.load(config);
+            Extractor extractor = handler.getExtractor();
+            Iterable<ThreatIntelResults> results = extractor.extract(stixDoc);
+            Assert.assertEquals(3, Iterables.size(results));
+            Assert.assertEquals("10.0.0.0", Iterables.get(results, 0).getKey().indicator);
+            Assert.assertEquals("10.0.0.1", Iterables.get(results, 1).getKey().indicator);
+            Assert.assertEquals("10.0.0.2", Iterables.get(results, 2).getKey().indicator);
+        }
+        {
+            /** Only IPV_6_ADDR requested: no results are expected from this ipv4-addr document.
+             {
+             "config" : {
+                "stix_address_categories" : "IPV_6_ADDR"
+             }
+             ,"extractor" : "STIX"
+             }
+             */
+            String config = "{\n" +
+                    "            \"config\" : {\n" +
+                    "                       \"stix_address_categories\" : \"IPV_6_ADDR\"\n" +
+                    "                       }\n" +
+                    "            ,\"extractor\" : \"STIX\"\n" +
+                    "         }";
+            ExtractorHandler handler = ExtractorHandler.load(config);
+            Extractor extractor = handler.getExtractor();
+            Iterable<ThreatIntelResults> results = extractor.extract(stixDoc);
+            Assert.assertEquals(0, Iterables.size(results));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/HBaseConverterTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/HBaseConverterTest.java b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/HBaseConverterTest.java
new file mode 100644
index 0000000..25cdbc7
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/HBaseConverterTest.java
@@ -0,0 +1,53 @@
+package org.apache.metron.dataloads.hbase;
+
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.metron.threatintel.hbase.Converter;
+import org.apache.metron.threatintel.ThreatIntelKey;
+import org.apache.metron.threatintel.ThreatIntelResults;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Checks ThreatIntelKey serialization and Converter's Put, Result and Get conversions. Created by cstella on 2/3/16.
+ */
+public class HBaseConverterTest {
+    ThreatIntelKey key = new ThreatIntelKey("google");
+    Map<String, String> value = new HashMap<String, String>() {{
+        put("foo", "bar");
+        put("grok", "baz");
+    }};
+    Long timestamp = 7L;
+    ThreatIntelResults results = new ThreatIntelResults(key, value);
+    @Test
+    public void testKeySerialization() {
+        byte[] serialized = key.toBytes();
+        ThreatIntelKey deserialized = ThreatIntelKey.fromBytes(serialized);
+        Assert.assertEquals(key, deserialized);
+    }
+
+    @Test
+    public void testPut() throws IOException {
+        Put put = Converter.INSTANCE.toPut("cf", key, value, timestamp);
+        Map.Entry<ThreatIntelResults, Long> converted= Converter.INSTANCE.fromPut(put, "cf");
+        Assert.assertEquals(new AbstractMap.SimpleEntry<>(results, timestamp), converted);
+    }
+    @Test
+    public void testResult() throws IOException {
+        Result r = Converter.INSTANCE.toResult("cf", key, value, timestamp);
+        Map.Entry<ThreatIntelResults, Long> converted= Converter.INSTANCE.fromResult(r, "cf");
+        Assert.assertEquals(new AbstractMap.SimpleEntry<>(results, timestamp), converted);
+    }
+
+    @Test
+    public void testGet() throws Exception {
+        Get get = Converter.INSTANCE.toGet("cf", key);
+        Assert.assertArrayEquals(key.toBytes(), get.getRow());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperIntegrationTest.java b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperIntegrationTest.java
new file mode 100644
index 0000000..179337e
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperIntegrationTest.java
@@ -0,0 +1,91 @@
+package org.apache.metron.dataloads.hbase.mr;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.metron.dataloads.ThreatIntelBulkLoader;
+import org.apache.metron.threatintel.ThreatIntelResults;
+import org.apache.metron.threatintel.hbase.Converter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Runs the ThreatIntelBulkLoader job against a test HBase table and checks the loaded row. Created by cstella on 2/5/16.
+ */
+public class BulkLoadMapperIntegrationTest {
+  /** The test util. */
+  private HBaseTestingUtility testUtil;
+
+  /** The test table. */
+  private HTable testTable;
+  String tableName = "malicious_domains";
+  String cf = "cf";
+  Configuration config = null;
+  @Before
+  public void setup() throws Exception {
+    Map.Entry<HBaseTestingUtility, Configuration> kv = HBaseUtil.INSTANCE.create(true);
+    config = kv.getValue();
+    testUtil = kv.getKey();
+    testTable = testUtil.createTable(Bytes.toBytes(tableName), Bytes.toBytes(cf));
+  }
+
+  @After
+  public void teardown() throws Exception {
+    HBaseUtil.INSTANCE.teardown(testUtil);
+  }
+
+  /** Loads one CSV line through the bulk-load job, then scans the table to verify it. */
+  @Test
+  public void test() throws IOException, ClassNotFoundException, InterruptedException {
+        /**
+         {
+            "config" : {
+                        "columns" : ["host:0","meta:2"]
+                       ,"indicator_column" : "host"
+                       ,"separator" : ","
+                       }
+            ,"extractor" : "CSV"
+         }
+         */
+        final String extractorConfig = "{\n" +
+                "            \"config\" : {\n" +
+                "                        \"columns\" : [\"host:0\",\"meta:2\"]\n" +
+                "                       ,\"indicator_column\" : \"host\"\n" +
+                "                       ,\"separator\" : \",\"\n" +
+                "                       }\n" +
+                "            ,\"extractor\" : \"CSV\"\n" +
+                "         }";
+    Assert.assertNotNull(testTable);
+    FileSystem fs = FileSystem.get(config);
+    String contents = "google.com,1,foo";
+    HBaseUtil.INSTANCE.writeFile(contents, new Path("input.csv"), fs);
+    Job job = ThreatIntelBulkLoader.createJob(config, "input.csv", tableName, cf, extractorConfig, 0L);
+    Assert.assertTrue(job.waitForCompletion(true));
+    List<Map.Entry<ThreatIntelResults, Long>> results = new ArrayList<>();
+    try (ResultScanner scanner = testTable.getScanner(Bytes.toBytes(cf))) {
+      for(Result r : scanner) {
+        results.add(Converter.INSTANCE.fromResult(r, cf));
+      }
+    }
+    Assert.assertEquals(1, results.size());
+    Assert.assertEquals(0L, (long)results.get(0).getValue());
+    // JUnit expects (expected, actual); keep that order so failure messages read correctly.
+    Assert.assertEquals("google.com", results.get(0).getKey().getKey().indicator);
+    Assert.assertEquals(2, results.get(0).getKey().getValue().size());
+    Assert.assertEquals("foo", results.get(0).getKey().getValue().get("meta"));
+    Assert.assertEquals("google.com", results.get(0).getKey().getValue().get("host"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperTest.java b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperTest.java
new file mode 100644
index 0000000..1d30b5c
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperTest.java
@@ -0,0 +1,78 @@
+package org.apache.metron.dataloads.hbase.mr;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.metron.threatintel.ThreatIntelResults;
+import org.apache.metron.threatintel.hbase.Converter;
+import org.apache.metron.threatintel.ThreatIntelKey;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Unit test for {@link BulkLoadMapper}: feeds CSV lines through the mapper and checks the HBase Puts it emits. Created by cstella on 2/3/16.
+ */
+public class BulkLoadMapperTest {
+    @Test
+    public void testMapper() throws IOException, InterruptedException {
+        /**
+         Extractor configuration handed to the mapper, shown here unescaped.
+         Columns are given as "name:index" entries; with the input used below,
+         CSV field 0 comes back as "host" and field 2 as "meta".
+         {
+            "config" : {
+                        "columns" : ["host:0","meta:2"]
+                       ,"indicator_column" : "host"
+                       ,"separator" : ","
+                       }
+            ,"extractor" : "CSV"
+         }
+         */
+        final String extractorConfig = "{\n" +
+                "            \"config\" : {\n" +
+                "                        \"columns\" : [\"host:0\",\"meta:2\"]\n" +
+                "                       ,\"indicator_column\" : \"host\"\n" +
+                "                       ,\"separator\" : \",\"\n" +
+                "                       }\n" +
+                "            ,\"extractor\" : \"CSV\"\n" +
+                "         }";
+
+        final Map<ImmutableBytesWritable, Put> puts = new HashMap<>();
+        BulkLoadMapper mapper = new BulkLoadMapper() {
+            @Override
+            protected void write(ImmutableBytesWritable key, Put value, Context context) throws IOException, InterruptedException {
+                puts.put(key, value); // capture the output locally; the test passes a null Context
+            }
+        };
+        mapper.initialize(new Configuration() {{
+            set(BulkLoadMapper.COLUMN_FAMILY_KEY, "cf");
+            set(BulkLoadMapper.CONFIG_KEY, extractorConfig);
+            set(BulkLoadMapper.LAST_SEEN_KEY, "0");
+        }});
+        {
+            mapper.map(null, new Text("#google.com,1,foo"), null);
+            Assert.assertEquals(0, puts.size()); // a line starting with '#' must not produce a Put
+        }
+        {
+            mapper.map(null, new Text("google.com,1,foo"), null);
+            Assert.assertEquals(1, puts.size());
+            ThreatIntelKey expectedKey = new ThreatIntelKey() {{
+                indicator = "google.com";
+            }};
+            Put put = puts.get(new ImmutableBytesWritable(expectedKey.toBytes()));
+            Assert.assertNotNull(put); // fails here, with a clear cause, if the row key is not the expected one
+            Map.Entry<ThreatIntelResults, Long> results = Converter.INSTANCE.fromPut(put, "cf");
+            Assert.assertEquals(0L, (long)results.getValue());
+            Assert.assertEquals("google.com", results.getKey().getKey().indicator);
+            Assert.assertEquals(2, results.getKey().getValue().size());
+            Assert.assertEquals("foo", results.getKey().getValue().get("meta"));
+            Assert.assertEquals("google.com", results.getKey().getValue().get("host"));
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/HBaseUtil.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/HBaseUtil.java b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/HBaseUtil.java
new file mode 100644
index 0000000..d243a65
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/HBaseUtil.java
@@ -0,0 +1,58 @@
+package org.apache.metron.dataloads.hbase.mr;
+
+import com.google.common.base.Joiner;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+
+import java.io.*;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test helper that starts and stops an HBase mini cluster (optionally with a mini MapReduce cluster) and reads/writes small files. Created by cstella on 2/5/16.
+ */
+public enum HBaseUtil {
+    INSTANCE;
+    public Map.Entry<HBaseTestingUtility,Configuration> create(boolean startMRCluster) throws Exception { // returns (testing utility, the configuration it was built from)
+        Configuration config = HBaseConfiguration.create();
+        config.set("hbase.master.hostname", "localhost");
+        config.set("hbase.regionserver.hostname", "localhost");
+        HBaseTestingUtility testUtil = new HBaseTestingUtility(config);
+
+        testUtil.startMiniCluster(1);
+        if(startMRCluster) {
+            testUtil.startMiniMapReduceCluster();
+        }
+        return new AbstractMap.SimpleEntry<>(testUtil, config);
+    }
+    public void writeFile(String contents, Path filename, FileSystem fs) throws IOException { // creates filename (overwriting any existing file) and writes contents to it
+        try (FSDataOutputStream os = fs.create(filename, true)) { // stream is closed even if the write fails
+            PrintWriter pw = new PrintWriter(new OutputStreamWriter(os));
+            pw.print(contents);
+            pw.flush(); // push buffered characters into os before it is closed
+        }
+    }
+
+    public String readFile(FileSystem fs, Path filename) throws IOException { // returns the file's lines joined with '\n'; a trailing newline is not preserved
+        List<String> contents = new ArrayList<>();
+        try (FSDataInputStream in = fs.open(filename); BufferedReader br = new BufferedReader(new InputStreamReader(in))) {
+            for(String line = null;(line = br.readLine()) != null;) {
+                contents.add(line);
+            }
+        }
+        return Joiner.on('\n').join(contents);
+    }
+
+    public void teardown(HBaseTestingUtility testUtil) throws Exception { // shuts down in the reverse of the start order used by create(), then removes the test directory
+        testUtil.shutdownMiniMapReduceCluster();
+        testUtil.shutdownMiniCluster();
+        testUtil.cleanupTestDir();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java
new file mode 100644
index 0000000..3979224
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java
@@ -0,0 +1,124 @@
+package org.apache.metron.dataloads.hbase.mr;
+
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.metron.dataloads.LeastRecentlyUsedPruner;
+import org.apache.metron.reference.lookup.LookupKey;
+import org.apache.metron.reference.lookup.accesstracker.AccessTrackerUtil;
+import org.apache.metron.reference.lookup.accesstracker.BloomAccessTracker;
+import org.apache.metron.reference.lookup.accesstracker.PersistentAccessTracker;
+import org.apache.metron.threatintel.ThreatIntelKey;
+import org.apache.metron.threatintel.hbase.Converter;
+import org.apache.metron.threatintel.hbase.ThreatIntelLookup;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Integration test for {@link LeastRecentlyUsedPruner}, run against an HBase mini cluster with a mini MapReduce cluster: keys that were looked up must survive the pruning job, a key that was never looked up must not. Created by cstella on 2/8/16.
+ */
+public class LeastRecentlyUsedPrunerIntegrationTest {
+    /** The test util. */
+    private HBaseTestingUtility testUtil;
+
+    /** The test table. */
+    private HTable testTable;
+    private HTable atTable; // table handed to the PersistentAccessTracker in test()
+    String tableName = "malicious_domains";
+    String cf = "cf";
+    String atTableName = "access_trackers";
+    String atCF= "cf";
+    Configuration config = null;
+    @Before
+    public void setup() throws Exception {
+        Map.Entry<HBaseTestingUtility, Configuration> kv = HBaseUtil.INSTANCE.create(true); // true: also start the mini MapReduce cluster
+        config = kv.getValue();
+        testUtil = kv.getKey();
+        testTable = testUtil.createTable(Bytes.toBytes(tableName), Bytes.toBytes(cf));
+        atTable = testUtil.createTable(Bytes.toBytes(atTableName), Bytes.toBytes(atCF));
+    }
+    @After
+    public void teardown() throws Exception {
+        HBaseUtil.INSTANCE.teardown(testUtil);
+    }
+    public List<LookupKey> getKeys(int start, int end) { // builds keys "key-start" .. "key-(end-1)"; end is exclusive
+        List<LookupKey> keys = new ArrayList<>();
+        for(int i = start;i < end;++i) {
+            keys.add(new ThreatIntelKey("key-" + i));
+        }
+        return keys;
+    }
+    @Test
+    public void test() throws Exception {
+        long ts = System.currentTimeMillis(); // taken before any lookup; passed to the pruner job below
+        BloomAccessTracker bat = new BloomAccessTracker("tracker1", 100, 0.03); // NOTE(review): 100 / 0.03 look like expected insertions / false-positive rate -- confirm
+        PersistentAccessTracker pat = new PersistentAccessTracker(tableName, "0", atTable, atCF, bat, 0L);
+        ThreatIntelLookup lookup = new ThreatIntelLookup(testTable, cf, pat);
+        List<LookupKey> goodKeysHalf = getKeys(0, 5);
+        List<LookupKey> goodKeysOtherHalf = getKeys(5, 10);
+        Iterable<LookupKey> goodKeys = Iterables.concat(goodKeysHalf, goodKeysOtherHalf);
+        List<LookupKey> badKey = getKeys(10, 11);
+        for(LookupKey k : goodKeysHalf) {
+            testTable.put(Converter.INSTANCE.toPut(cf, (ThreatIntelKey) k
+                                                  , new HashMap<String, String>() {{
+                                                    put("k", "dummy");
+                                                    }}
+                                                  , 1L
+                                                  )
+                         );
+            Assert.assertTrue(lookup.exists((ThreatIntelKey)k, testTable, true)); // third argument presumably asks for the access to be logged -- confirm
+        }
+        pat.persist(true); // presumably persists and resets the in-memory tracker: the assertFalse after the next loop expects first-half keys to be forgotten -- confirm
+        for(LookupKey k : goodKeysOtherHalf) {
+            testTable.put(Converter.INSTANCE.toPut(cf, (ThreatIntelKey) k
+                                                  , new HashMap<String, String>() {{
+                                                    put("k", "dummy");
+                                                    }}
+                                                  , 1L
+                                                  )
+                         );
+            Assert.assertTrue(lookup.exists((ThreatIntelKey)k, testTable, true));
+        }
+        testUtil.flush();
+        Assert.assertFalse(lookup.getAccessTracker().hasSeen(goodKeysHalf.get(0)));
+        for(LookupKey k : goodKeysOtherHalf) {
+            Assert.assertTrue(lookup.getAccessTracker().hasSeen(k));
+        }
+        pat.persist(true);
+        {
+            testTable.put(Converter.INSTANCE.toPut(cf, (ThreatIntelKey) badKey.get(0)
+                    , new HashMap<String, String>() {{
+                        put("k", "dummy");
+                    }}
+                    , 1L
+                    )
+            );
+        }
+        testUtil.flush();
+        Assert.assertFalse(lookup.getAccessTracker().hasSeen(badKey.get(0))); // badKey was written but never looked up
+
+
+        Job job = LeastRecentlyUsedPruner.createJob(config, tableName, cf, atTableName, atCF, ts); // pruning job over the data table, using the persisted access trackers
+        Assert.assertTrue(job.waitForCompletion(true));
+        for(LookupKey k : goodKeys) { // every key that was looked up must still be present
+            Assert.assertTrue(lookup.exists((ThreatIntelKey)k, testTable, true));
+        }
+        for(LookupKey k : badKey) { // the never-accessed key must have been removed
+            Assert.assertFalse(lookup.exists((ThreatIntelKey)k, testTable, true));
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-DataServices/src/main/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataServices/src/main/resources/hbase-site.xml b/metron-streaming/Metron-DataServices/src/main/resources/hbase-site.xml
deleted file mode 100644
index 5c3c819..0000000
--- a/metron-streaming/Metron-DataServices/src/main/resources/hbase-site.xml
+++ /dev/null
@@ -1,127 +0,0 @@
-<!--Tue Apr  1 18:16:39 2014-->
-  <configuration>
-    <property>
-    <name>hbase.tmp.dir</name>
-    <value>/disk/h/hbase</value>
-  </property>
-    <property>
-    <name>hbase.hregion.memstore.chunkpool.maxsize</name>
-    <value>0.5</value>
-  </property>
-    <property>
-    <name>hbase.regionserver.codecs</name>
-    <value>lzo,gz,snappy</value>
-  </property>
-    <property>
-    <name>hbase.hstore.flush.retries.number</name>
-    <value>120</value>
-  </property>
-    <property>
-    <name>hbase.client.keyvalue.maxsize</name>
-    <value>10485760</value>
-  </property>
-    <property>
-    <name>hbase.rootdir</name>
-    <value>hdfs://nn1:8020/apps/hbase/data</value>
-  </property>
-    <property>
-    <name>hbase.defaults.for.version.skip</name>
-    <value>true</value>
-  </property>
-    <property>
-    <name>hbase.client.scanner.caching</name>
-    <value>100</value>
-  </property>
-    <property>
-    <name>hbase.superuser</name>
-    <value>hbase</value>
-  </property>
-    <property>
-    <name>hfile.block.cache.size</name>
-    <value>0.40</value>
-  </property>
-    <property>
-    <name>hbase.regionserver.checksum.verify</name>
-    <value>true</value>
-  </property>
-    <property>
-    <name>hbase.hregion.memstore.mslab.enabled</name>
-    <value>true</value>
-  </property>
-    <property>
-    <name>hbase.hregion.max.filesize</name>
-    <value>107374182400</value>
-  </property>
-    <property>
-    <name>hbase.cluster.distributed</name>
-    <value>true</value>
-  </property>
-    <property>
-    <name>zookeeper.session.timeout</name>
-    <value>30000</value>
-  </property>
-    <property>
-    <name>zookeeper.znode.parent</name>
-    <value>/hbase-unsecure</value>
-  </property>
-    <property>
-    <name>hbase.regionserver.global.memstore.lowerLimit</name>
-    <value>0.38</value>
-  </property>
-    <property>
-    <name>hbase.regionserver.handler.count</name>
-    <value>240</value>
-  </property>
-    <property>
-    <name>hbase.hregion.memstore.mslab.chunksize</name>
-    <value>8388608</value>
-  </property>
-    <property>
-    <name>hbase.zookeeper.quorum</name>
-    <value>zkpr1,zkpr2,zkpr3</value>
-  </property>
-    <property>
-    <name>hbase.zookeeper.useMulti</name>
-    <value>true</value>
-  </property>
-    <property>
-    <name>hbase.hregion.majorcompaction</name>
-    <value>86400000</value>
-  </property>
-    <property>
-    <name>hbase.hstore.blockingStoreFiles</name>
-    <value>200</value>
-  </property>
-    <property>
-    <name>hbase.zookeeper.property.clientPort</name>
-    <value>2181</value>
-  </property>
-    <property>
-    <name>hbase.hregion.memstore.flush.size</name>
-    <value>134217728</value>
-  </property>
-    <property>
-    <name>hbase.security.authorization</name>
-    <value>false</value>
-  </property>
-    <property>
-    <name>hbase.regionserver.global.memstore.upperLimit</name>
-    <value>0.4</value>
-  </property>
-    <property>
-    <name>hbase.hstore.compactionThreshold</name>
-    <value>4</value>
-  </property>
-    <property>
-    <name>hbase.hregion.memstore.block.multiplier</name>
-    <value>8</value>
-  </property>
-    <property>
-    <name>hbase.security.authentication</name>
-    <value>simple</value>
-  </property>
-    <property>
-    <name>dfs.client.read.shortcircuit</name>
-    <value>true</value>
-  </property>
-  </configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-EnrichmentAdapters/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/pom.xml b/metron-streaming/Metron-EnrichmentAdapters/pom.xml
index a37a032..45d068f 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/pom.xml
+++ b/metron-streaming/Metron-EnrichmentAdapters/pom.xml
@@ -10,156 +10,160 @@
 	the specific language governing permissions and limitations under the License. -->
 
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-	<parent>
-		<groupId>org.apache.metron</groupId>
-		<artifactId>Metron-Streaming</artifactId>
-		<version>0.6BETA</version>
-	</parent>
-	<artifactId>Metron-EnrichmentAdapters</artifactId>
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.metron</groupId>
+        <artifactId>Metron-Streaming</artifactId>
+        <version>0.6BETA</version>
+    </parent>
+    <artifactId>Metron-EnrichmentAdapters</artifactId>
 
-	<properties>
-       <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>		
-		<mysql.version>5.1.31</mysql.version>
-		<slf4j.version>1.7.7</slf4j.version>
-		<hbase.client.version>0.96.1-hadoop2</hbase.client.version>
-		<storm.hdfs.version>0.1.2</storm.hdfs.version>
-    <guava.version>${global_guava_version}</guava.version>
-	</properties>
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.metron</groupId>
-			<artifactId>Metron-Common</artifactId>
-			<version>${project.parent.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.slf4j</groupId>
-			<artifactId>slf4j-api</artifactId>
-			<version>${slf4j.version}</version>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>mysql</groupId>
-			<artifactId>mysql-connector-java</artifactId>
-			<version>${mysql.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hbase</groupId>
-			<artifactId>hbase-client</artifactId>
-			<version>${hbase.client.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>org.slf4j</groupId>
-					<artifactId>slf4j-log4j12</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>log4j</groupId>
-					<artifactId>log4j</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-hdfs</artifactId>
-			<version>${global_hadoop_version}</version>
-			  <exclusions>
-				<exclusion>
-				   <artifactId>servlet-api</artifactId>
-				   <groupId>javax.servlet</groupId>
-				  </exclusion>
-		    </exclusions>					
-		</dependency>
-		<dependency>
-			<groupId>org.apache.storm</groupId>
-			<artifactId>storm-core</artifactId>
-			<version>${global_storm_version}</version>
-			<scope>provided</scope>
-			  <exclusions>
-				<exclusion>
-				   <artifactId>servlet-api</artifactId>
-				   <groupId>javax.servlet</groupId>
-				  </exclusion>
-		    </exclusions>					
-		</dependency>
-		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-			<version>${guava.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-common</artifactId>
-			<version>${global_hadoop_version}</version>
-			<exclusions>
-				<exclusion>
-				   <artifactId>servlet-api</artifactId>
-				   <groupId>javax.servlet</groupId>
-				  </exclusion>
-		    </exclusions>			
-		</dependency>
-		<dependency>
-			<groupId>junit</groupId>
-			<artifactId>junit</artifactId>
-			<version>${global_junit_version}</version>
-		</dependency>
-		<dependency>
-			<groupId>commons-validator</groupId>
-			<artifactId>commons-validator</artifactId>
-			<version>1.4.0</version>
-		</dependency>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <mysql.version>5.1.31</mysql.version>
+        <slf4j.version>1.7.7</slf4j.version>
+        <storm.hdfs.version>0.1.2</storm.hdfs.version>
+        <guava.version>${global_guava_version}</guava.version>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>Metron-Common</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>${mysql.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <version>${global_hbase_version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>${global_hadoop_version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${global_storm_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${global_hadoop_version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${global_junit_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-validator</groupId>
+            <artifactId>commons-validator</artifactId>
+            <version>1.4.0</version>
+        </dependency>
 
-	</dependencies>
-	<reporting>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-surefire-plugin</artifactId>
-				<configuration>
-					<systemProperties>
-						<property>
-							<name>mode</name>
-							<value>global</value>
-						</property>
-					</systemProperties>
-				</configuration>
-			</plugin>
-			<!-- Normally, dependency report takes time, skip it -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-project-info-reports-plugin</artifactId>
-				<version>2.7</version>
+    </dependencies>
+    <reporting>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <systemProperties>
+                        <property>
+                            <name>mode</name>
+                            <value>global</value>
+                        </property>
+                    </systemProperties>
+                </configuration>
+            </plugin>
+            <!-- Normally, dependency report takes time, skip it -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-project-info-reports-plugin</artifactId>
+                <version>2.7</version>
 
-				<configuration>
-					<dependencyLocationsEnabled>false</dependencyLocationsEnabled>
-				</configuration>
-			</plugin>
-			<plugin>
-				<groupId>org.codehaus.mojo</groupId>
-				<artifactId>emma-maven-plugin</artifactId>
-				<version>1.0-alpha-3</version>
-			</plugin>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-pmd-plugin</artifactId>
-				<configuration>
-					<targetJdk>1.7</targetJdk>
-				</configuration>
-			</plugin>
-		</plugins>
-	</reporting>
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-compiler-plugin</artifactId>
-				<version>3.1</version>
-				<configuration>
-					<source>1.7</source>
-					<target>1.7</target>
-				</configuration>
-			</plugin>
-		</plugins>
-	</build>
+                <configuration>
+                    <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>emma-maven-plugin</artifactId>
+                <version>1.0-alpha-3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-pmd-plugin</artifactId>
+                <configuration>
+                    <targetJdk>1.7</targetJdk>
+                </configuration>
+            </plugin>
+        </plugins>
+    </reporting>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.1</version>
+                <configuration>
+                    <source>1.7</source>
+                    <target>1.7</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java
index bfe9ef6..2ebf1b0 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java
@@ -53,6 +53,11 @@ public class CIFHbaseAdapter implements EnrichmentAdapter<String>,Serializable {
 	private static final Logger LOGGER = Logger
 			.getLogger(CIFHbaseAdapter.class);
 
+	@Override
+	public void logAccess(String value) {
+		// No-op: this adapter does not record accesses.
+	}
+
 	public JSONObject enrich(String metadata) {
 
 		JSONObject output = new JSONObject();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java
index 89721a7..93ec2ef 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/geo/GeoAdapter.java
@@ -11,6 +11,11 @@ public class GeoAdapter extends JdbcAdapter {
 
   private InetAddressValidator ipvalidator = new InetAddressValidator();
 
+  @Override
+  public void logAccess(String value) {
+    // No-op: this adapter does not record accesses.
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   public JSONObject enrich(String value) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapter.java
index 260d878..f4822de 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapter.java
@@ -35,6 +35,11 @@ public class HostFromJSONListAdapter extends AbstractHostAdapter {
       return false;
   }
 
+  @Override
+  public void logAccess(String value) {
+    // No-op: this adapter does not record accesses.
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   public JSONObject enrich(String metadata) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromPropertiesFileAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromPropertiesFileAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromPropertiesFileAdapter.java
index 4e4586e..4ae0329 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromPropertiesFileAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/host/HostFromPropertiesFileAdapter.java
@@ -32,7 +32,7 @@ public class HostFromPropertiesFileAdapter extends AbstractHostAdapter {
 	}
 
 	@Override
-	public boolean initializeAdapter() 
+	public boolean initializeAdapter()
 	{
 		
 		if(_known_hosts.size() > 0)
@@ -41,6 +41,11 @@ public class HostFromPropertiesFileAdapter extends AbstractHostAdapter {
 			return false;
 	}
 
+	@Override
+	public void logAccess(String value) {
+		// No-op: this adapter does not record accesses.
+	}
+
 	@SuppressWarnings("unchecked")
     @Override
 	public JSONObject enrich(String metadata) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java
index ca4f8ea..83cb856 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/jdbc/JdbcAdapter.java
@@ -32,8 +32,8 @@ public abstract class JdbcAdapter implements EnrichmentAdapter<String>,
       if (!InetAddress.getByName(host).isReachable(500)) {
         throw new Exception("Unable to reach host " + host);
       }
-      Class.forName(config.getClassName());
-      connection = DriverManager.getConnection(config.getJdbcUrl());
+      Class.forName(this.config.getClassName());
+      connection = DriverManager.getConnection(this.config.getJdbcUrl());
       connection.setReadOnly(true);
       if (!connection.isValid(0))
         throw new Exception("Invalid connection string....");

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java
index cfd47c8..fde0cbd 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java
@@ -19,9 +19,7 @@ package org.apache.metron.enrichment.adapters.threat;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
@@ -36,7 +34,6 @@ import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.log4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,6 +59,11 @@ public class ThreatHbaseAdapter implements EnrichmentAdapter<String>,
 	private static final Logger LOGGER = Logger
 			.getLogger(ThreatHbaseAdapter.class);
 
+	@Override
+	public void logAccess(String value) {
+		// No-op: the body is empty, so this adapter does nothing with the accessed value.
+	}
+
 	public JSONObject enrich(String metadata) {
 
 		JSONObject output = new JSONObject();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java
index bbcd0bd..ce2efee 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java
@@ -19,7 +19,6 @@ package org.apache.metron.enrichment.adapters.whois;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.Arrays;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -32,7 +31,6 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
 import org.json.simple.JSONObject;
 
-import com.google.common.base.Joiner;
 import org.apache.metron.tldextractor.BasicTldExtractor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -96,6 +94,11 @@ public class WhoisHBaseAdapter implements EnrichmentAdapter<String>,
 
 	}
 
+	@Override
+	public void logAccess(String value) {
+		// No-op: the body is empty, so this adapter does nothing with the accessed value.
+	}
+
 	@SuppressWarnings({ "unchecked", "deprecation" })
 	public JSONObject enrich(String metadataIn) {
 		

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
index f16c2fb..2f530a9 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
@@ -19,6 +19,7 @@ public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
 
   protected List<Enrichment> enrichments;
 
+  protected String type = "enrichment";
   /**
    * @param enrichments A class for sending tuples to enrichment bolt
    * @return Instance of this class
@@ -28,6 +29,11 @@ public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
     return this;
   }
 
+  public EnrichmentJoinBolt withType(String type) { // overrides the default type "enrichment"
+    this.type = type; // joinValues() stores the joined per-stream results under this key
+    return this; // returns this bolt so configuration calls can be chained
+  }
+
   @Override
   public void prepare(Map map, TopologyContext topologyContext) {
 
@@ -46,12 +52,17 @@ public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
   @Override
   public JSONObject joinValues(Map<String, JSONObject> streamValueMap) {
     JSONObject message = new JSONObject();
-    message.put("message", streamValueMap.get("message"));
+    if(streamValueMap.get("message").containsKey("message")) {
+      message =  streamValueMap.get("message");
+    }
+    else {
+      message.put("message", streamValueMap.get("message"));
+    }
     JSONObject enrichment = new JSONObject();
     for(String streamId: getStreamIds()) {
       enrichment.put(streamId, streamValueMap.get(streamId));
     }
-    message.put("enrichment", enrichment);
+    message.put(type, enrichment);
     return message;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
index 1563652..72c7f51 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
@@ -21,9 +21,11 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import backtype.storm.topology.base.BaseRichBolt;
+import com.google.common.base.Splitter;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Iterables;
 import org.apache.metron.domain.Enrichment;
 import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
 import org.json.simple.JSONObject;
@@ -79,8 +81,7 @@ public class GenericEnrichmentBolt extends BaseRichBolt {
    * @return Instance of this class
    */
 
-  public GenericEnrichmentBolt withEnrichment
-  (Enrichment<EnrichmentAdapter> enrichment) {
+  public GenericEnrichmentBolt withEnrichment(Enrichment<EnrichmentAdapter> enrichment) {
     this.streamId = enrichment.getName();
     this.enrichment = enrichment;
     this.adapter = this.enrichment.getAdapter();
@@ -132,18 +133,17 @@ public class GenericEnrichmentBolt extends BaseRichBolt {
             .build(loader);
     boolean success = adapter.initializeAdapter();
     if (!success) {
-      LOG.error("[Metron] EnrichmentBolt could not initialize adapter");
+      LOG.error("[Metron] EnrichmentSplitterBolt could not initialize adapter");
       throw new IllegalStateException("Could not initialize adapter...");
     }
   }
 
   @Override
-  public void declareOutputFields(OutputFieldsDeclarer declearer) {
-    declearer.declareStream(streamId, new Fields("key", "message"));
-    declearer.declareStream("error", new Fields("message"));
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declareStream(streamId, new Fields("key", "message"));
+    declarer.declareStream("error", new Fields("message"));
   }
 
-
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {
@@ -159,18 +159,19 @@ public class GenericEnrichmentBolt extends BaseRichBolt {
         JSONObject enrichedField = new JSONObject();
         String value = (String) rawMessage.get(field);
         if (value != null && value.length() != 0) {
+          adapter.logAccess(value);
           enrichedField = cache.getUnchecked(value);
           if (enrichedField == null)
             throw new Exception("[Metron] Could not enrich string: "
                     + value);
         }
-        enrichedMessage.put(field, enrichedField);
+        enrichedMessage.put(Iterables.getLast(Splitter.on('/').split(field)), enrichedField);
       }
       if (!enrichedMessage.isEmpty()) {
         collector.emit(streamId, new Values(key, enrichedMessage));
       }
     } catch (Exception e) {
-      LOG.error("[Metron] Unable to enrich message: " + rawMessage);
+      LOG.error("[Metron] Unable to enrich message: " + rawMessage, e);
       JSONObject error = ErrorGenerator.generateErrorMessage("Enrichment problem: " + rawMessage, e);
       if (key != null) {
         collector.emit(streamId, new Values(key, enrichedMessage));

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java
new file mode 100644
index 0000000..3a6b4cb
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java
@@ -0,0 +1,97 @@
+package org.apache.metron.threatintel;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+import org.apache.metron.reference.lookup.Lookup;
+import org.apache.metron.reference.lookup.accesstracker.BloomAccessTracker;
+import org.apache.metron.reference.lookup.accesstracker.PersistentAccessTracker;
+import org.apache.metron.threatintel.hbase.ThreatIntelLookup;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Enrichment adapter that checks values against a {@link ThreatIntelLookup}. Created by cstella on 2/10/16.
+ */
+public class ThreatIntelAdapter implements EnrichmentAdapter<String>,Serializable {
+    protected static final Logger _LOG = LoggerFactory.getLogger(ThreatIntelAdapter.class);
+    protected ThreatIntelConfig config;
+    protected ThreatIntelLookup lookup;
+    // lookup is assigned only in initializeAdapter(); the methods below dereference it without a null check.
+    public ThreatIntelAdapter() {
+    }
+    public ThreatIntelAdapter(ThreatIntelConfig config) {
+        withConfig(config);
+    }
+    /** Stores the configuration read by enrich() and initializeAdapter(); returns this adapter for chaining. */
+    public ThreatIntelAdapter withConfig(ThreatIntelConfig config) {
+        this.config = config;
+        return this;
+    }
+    /** Reports {@code value}, wrapped in a ThreatIntelKey, to the lookup's access tracker. */
+    @Override
+    public void logAccess(String value) {
+        lookup.getAccessTracker().logAccess(new ThreatIntelKey(value));
+    }
+    /** Returns {"threat_source": configured HBase table} when lookup.exists() is true for {@code value}, else an empty object. */
+    @Override
+    public JSONObject enrich(String value) {
+        JSONObject enriched = new JSONObject();
+        boolean isThreat = false;
+        try {
+            isThreat = lookup.exists(new ThreatIntelKey(value), lookup.getTable(), false);
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to retrieve value", e);
+        }
+        if(isThreat) {
+            enriched.put("threat_source", config.getHBaseTable());
+            _LOG.trace("Enriched value => " + enriched);
+        }
+        // A value the lookup does not report is not treated as an error: the empty object is returned as-is.
+        return enriched;
+    }
+    /** Wraps a BloomAccessTracker in a PersistentAccessTracker and creates the lookup from {@code config}; returns true or throws IllegalStateException. */
+    @Override
+    public boolean initializeAdapter() {
+        PersistentAccessTracker accessTracker;
+        String hbaseTable = config.getHBaseTable();
+        int expectedInsertions = config.getExpectedInsertions();
+        double falsePositives = config.getFalsePositiveRate();
+        String trackerHBaseTable = config.getTrackerHBaseTable();
+        String trackerHBaseCF = config.getTrackerHBaseCF();
+        long millisecondsBetweenPersist = config.getMillisecondsBetweenPersists();
+        BloomAccessTracker bat = new BloomAccessTracker(hbaseTable, expectedInsertions, falsePositives);
+        Configuration hbaseConfig = HBaseConfiguration.create();
+        try {
+            accessTracker = new PersistentAccessTracker( hbaseTable
+                                                        , UUID.randomUUID().toString()
+                                                        , config.getProvider().getTable(hbaseConfig, trackerHBaseTable)
+                                                        , trackerHBaseCF
+                                                        , bat
+                                                        , millisecondsBetweenPersist
+                                                        );
+            lookup = new ThreatIntelLookup(config.getProvider().getTable(hbaseConfig, hbaseTable), config.getHBaseCF(), accessTracker);
+        } catch (IOException e) {
+            throw new IllegalStateException("Unable to initialize ThreatIntelAdapter", e);
+        }
+
+        return true;
+    }
+    /** Closes the lookup; any exception from close() is rethrown wrapped in a RuntimeException. */
+    @Override
+    public void cleanup() {
+        try {
+            lookup.close();
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to cleanup access tracker", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelConfig.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelConfig.java
new file mode 100644
index 0000000..cb1f5e3
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelConfig.java
@@ -0,0 +1,111 @@
+package org.apache.metron.threatintel;
+
+import org.apache.metron.hbase.HTableProvider;
+import org.apache.metron.hbase.TableProvider;
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Serializable, fluently-built settings read by ThreatIntelAdapter. Created by cstella on 2/10/16.
+ */
+public class ThreatIntelConfig implements Serializable {
+    public static final long MS_IN_HOUR = 1000L*60*60; // 3,600,000 ms; was 10000*60*60, i.e. ten hours
+    private String hBaseTable;
+    private String hBaseCF;
+    private double falsePositiveRate = 0.03;
+    private int expectedInsertions = 100000;
+    private String trackerHBaseTable;
+    private String trackerHBaseCF;
+    private long millisecondsBetweenPersists = 2*MS_IN_HOUR; // default: two hours
+    private TableProvider provider = new HTableProvider(); // default provider unless withProviderImpl() replaces it
+
+    public String getHBaseTable() {
+        return hBaseTable;
+    }
+
+    public int getExpectedInsertions() {
+        return expectedInsertions;
+    }
+
+    public double getFalsePositiveRate() {
+        return falsePositiveRate;
+    }
+
+    public String getTrackerHBaseTable() {
+        return trackerHBaseTable;
+    }
+
+    public String getTrackerHBaseCF() {
+        return trackerHBaseCF;
+    }
+
+    public long getMillisecondsBetweenPersists() {
+        return millisecondsBetweenPersists;
+    }
+
+    public String getHBaseCF() {
+        return hBaseCF;
+    }
+
+    public TableProvider getProvider() {
+        return provider;
+    }
+    /** Sets the provider by class name; null, empty, or a name starting with '$' (presumably an unresolved placeholder) selects HTableProvider. */
+    public ThreatIntelConfig withProviderImpl(String connectorImpl) {
+        if(connectorImpl == null || connectorImpl.length() == 0 || connectorImpl.charAt(0) == '$') {
+            provider = new HTableProvider();
+        }
+        else {
+            try {
+                Class<? extends TableProvider> clazz = Class.forName(connectorImpl).asSubclass(TableProvider.class); // ClassCastException before instantiation if not a TableProvider
+                provider = clazz.getConstructor().newInstance();
+            } catch (InstantiationException e) {
+                throw new IllegalStateException("Unable to instantiate connector.", e);
+            } catch (IllegalAccessException e) {
+                throw new IllegalStateException("Unable to instantiate connector: illegal access", e);
+            } catch (InvocationTargetException e) {
+                throw new IllegalStateException("Unable to instantiate connector", e);
+            } catch (NoSuchMethodException e) {
+                throw new IllegalStateException("Unable to instantiate connector: no such method", e);
+            } catch (ClassNotFoundException e) {
+                throw new IllegalStateException("Unable to instantiate connector: class not found", e);
+            }
+        }
+        return this;
+    }
+    // Each with* method below assigns one field and returns this config so calls can be chained.
+    public ThreatIntelConfig withTrackerHBaseTable(String hBaseTable) {
+        this.trackerHBaseTable = hBaseTable;
+        return this;
+    }
+
+    public ThreatIntelConfig withTrackerHBaseCF(String cf) {
+        this.trackerHBaseCF = cf;
+        return this;
+    }
+    public ThreatIntelConfig withHBaseTable(String hBaseTable) {
+        this.hBaseTable = hBaseTable;
+        return this;
+    }
+
+    public ThreatIntelConfig withHBaseCF(String cf) {
+        this.hBaseCF= cf;
+        return this;
+    }
+
+    public ThreatIntelConfig withFalsePositiveRate(double falsePositiveRate) {
+        this.falsePositiveRate = falsePositiveRate;
+        return this;
+    }
+
+    public ThreatIntelConfig withExpectedInsertions(int expectedInsertions) {
+        this.expectedInsertions = expectedInsertions;
+        return this;
+    }
+
+    public ThreatIntelConfig withMillisecondsBetweenPersists(long millisecondsBetweenPersists) {
+        this.millisecondsBetweenPersists = millisecondsBetweenPersists;
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-EnrichmentAdapters/src/main/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/resources/hbase-site.xml b/metron-streaming/Metron-EnrichmentAdapters/src/main/resources/hbase-site.xml
deleted file mode 100644
index 8d812a9..0000000
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/resources/hbase-site.xml
+++ /dev/null
@@ -1,131 +0,0 @@
-<!--Tue Apr  1 18:16:39 2014-->
-  <configuration>
-    <property>
-    <name>hbase.tmp.dir</name>
-    <value>/disk/h/hbase</value>
-  </property>
-    <property>
-    <name>hbase.hregion.memstore.chunkpool.maxsize</name>
-    <value>0.5</value>
-  </property>
-    <property>
-    <name>hbase.regionserver.codecs</name>
-    <value>lzo,gz,snappy</value>
-  </property>
-    <property>
-    <name>hbase.hstore.flush.retries.number</name>
-    <value>120</value>
-  </property>
-    <property>
-    <name>hbase.client.keyvalue.maxsize</name>
-    <value>10485760</value>
-  </property>
-    <property>
-    <name>hbase.rootdir</name>
-    <value>hdfs://nn1:8020/apps/hbase/data</value>
-  </property>
-    <property>
-    <name>hbase.defaults.for.version.skip</name>
-    <value>true</value>
-  </property>
-    <property>
-    <name>hbase.client.scanner.caching</name>
-    <value>100</value>
-  </property>
-    <property>
-    <name>hbase.superuser</name>
-    <value>hbase</value>
-  </property>
-    <property>
-    <name>hfile.block.cache.size</name>
-    <value>0.40</value>
-  </property>
-    <property>
-    <name>hbase.regionserver.checksum.verify</name>
-    <value>true</value>
-  </property>
-    <property>
-    <name>hbase.hregion.memstore.mslab.enabled</name>
-    <value>true</value>
-  </property>
-    <property>
-    <name>hbase.hregion.max.filesize</name>
-    <value>107374182400</value>
-  </property>
-    <property>
-    <name>hbase.cluster.distributed</name>
-    <value>true</value>
-  </property>
-    <property>
-    <name>zookeeper.session.timeout</name>
-    <value>30000</value>
-  </property>
-    <property>
-    <name>zookeeper.znode.parent</name>
-    <value>/hbase-unsecure</value>
-  </property>
-    <property>
-    <name>hbase.regionserver.global.memstore.lowerLimit</name>
-    <value>0.38</value>
-  </property>
-    <property>
-    <name>hbase.regionserver.handler.count</name>
-    <value>240</value>
-  </property>
-    <property>
-    <name>hbase.hregion.memstore.mslab.chunksize</name>
-    <value>8388608</value>
-  </property>
-    <property>
-    <name>hbase.zookeeper.quorum</name>
-    <value>zkpr1,zkpr2,zkpr3</value>
-  </property>
-    <property>
-    <name>hbase.zookeeper.useMulti</name>
-    <value>true</value>
-  </property>
-    <property>
-    <name>hbase.hregion.majorcompaction</name>
-    <value>86400000</value>
-  </property>
-    <property>
-    <name>hbase.hstore.blockingStoreFiles</name>
-    <value>200</value>
-  </property>
-    <property>
-    <name>hbase.zookeeper.property.clientPort</name>
-    <value>2181</value>
-  </property>
-    <property>
-    <name>hbase.hregion.memstore.flush.size</name>
-    <value>134217728</value>
-  </property>
-    <property>
-    <name>hbase.security.authorization</name>
-    <value>false</value>
-  </property>
-    <property>
-    <name>hbase.regionserver.global.memstore.upperLimit</name>
-    <value>0.4</value>
-  </property>
-    <property>
-    <name>hbase.hstore.compactionThreshold</name>
-    <value>4</value>
-  </property>
-    <property>
-    <name>hbase.hregion.memstore.block.multiplier</name>
-    <value>8</value>
-  </property>
-    <property>
-    <name>hbase.security.authentication</name>
-    <value>simple</value>
-  </property>
-    <property>
-    <name>dfs.client.read.shortcircuit</name>
-    <value>true</value>
-  </property>
-  <property>
-    <name>dfs.domain.socket.path</name>
-    <value>/var/run/hdfs/dn_socket</value>
-  </property>
-  </configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1ddfd12c/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapterTest.java b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapterTest.java
index 4a39edb..eb4491d 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapterTest.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/org/apache/metron/enrichment/adapters/cif/CIFHbaseAdapterTest.java
@@ -21,8 +21,8 @@ package org.apache.metron.enrichment.adapters.cif;
 import java.net.InetAddress;
 import java.util.Properties;
 
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
 import org.apache.metron.test.AbstractTestContext;
-import org.apache.metron.enrichment.adapters.cif.CIFHbaseAdapter;
 import org.junit.Assert;
 
 
@@ -121,7 +121,7 @@ public class CIFHbaseAdapterTest extends AbstractTestContext {
     }
 
     /**
-     * Test method for {@link org.apache.metron.enrichment.adapters.cif.CIFHbaseAdapter#initializeAdapter()}.
+     * Test method for {@link EnrichmentAdapter#initializeAdapter()}.
      */
     public void testInitializeAdapter() {
         if(skipTests(this.getMode())){



Mime
View raw message