metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject [4/4] metron git commit: METRON-1460: Create a complementary non-split-join enrichment topology closes apache/metron#940
Date Wed, 07 Mar 2018 14:17:04 GMT
METRON-1460: Create a complementary non-split-join enrichment topology closes apache/metron#940


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/1d95b831
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/1d95b831
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/1d95b831

Branch: refs/heads/master
Commit: 1d95b8316a18097747be116a0276c56b894fb79c
Parents: 486be49
Author: cstella <cestella@gmail.com>
Authored: Wed Mar 7 09:16:45 2018 -0500
Committer: cstella <cestella@gmail.com>
Committed: Wed Mar 7 09:16:45 2018 -0500

----------------------------------------------------------------------
 dependencies_with_url.csv                       |   4 +-
 .../docker/rpm-docker/SPECS/metron.spec         |   1 +
 .../enrichment/handler/ConfigHandler.java       |   4 +
 .../common/message/BytesFromPosition.java       |   4 +-
 .../message/JSONFromFieldByReference.java       |  37 ++
 .../metron/common/message/JSONFromPosition.java |   4 +-
 .../metron/common/message/MessageGetters.java   |   1 +
 metron-platform/metron-enrichment/README.md     |  45 ++
 metron-platform/metron-enrichment/pom.xml       |   6 +
 .../main/flux/enrichment/remote-unified.yaml    | 378 +++++++++++++++++
 .../adapters/stellar/StellarAdapter.java        |   5 +-
 .../enrichment/bolt/GenericEnrichmentBolt.java  |  12 +-
 .../enrichment/bolt/ThreatIntelJoinBolt.java    | 115 +-----
 .../enrichment/bolt/UnifiedEnrichmentBolt.java  | 412 +++++++++++++++++++
 .../enrichment/parallel/ConcurrencyContext.java |  96 +++++
 .../enrichment/parallel/EnrichmentCallable.java |  66 +++
 .../enrichment/parallel/EnrichmentContext.java  |  43 ++
 .../parallel/EnrichmentStrategies.java          | 108 +++++
 .../enrichment/parallel/EnrichmentStrategy.java |  71 ++++
 .../enrichment/parallel/ParallelEnricher.java   | 281 +++++++++++++
 .../parallel/WorkerPoolStrategies.java          |  45 ++
 .../enrichment/utils/EnrichmentUtils.java       |  16 +
 .../enrichment/utils/ThreatIntelUtils.java      | 127 ++++++
 .../integration/EnrichmentIntegrationTest.java  |  42 +-
 .../UnifiedEnrichmentIntegrationTest.java       |  25 ++
 .../parallel/ParallelEnricherTest.java          | 157 +++++++
 .../unified_enrichment_arch.svg                 |  14 +
 .../unified_enrichment_arch_diagram.xml         |  14 +
 28 files changed, 1983 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/dependencies_with_url.csv
----------------------------------------------------------------------
diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv
index a1f431b..e2b947b 100644
--- a/dependencies_with_url.csv
+++ b/dependencies_with_url.csv
@@ -341,7 +341,7 @@ org.eclipse.persistence:org.eclipse.persistence.asm:jar:2.6.4:compile,EPL 1.0,ht
 org.eclipse.persistence:org.eclipse.persistence.core:jar:2.6.4:compile,EPL 1.0,http://www.eclipse.org/eclipselink
 org.eclipse.persistence:org.eclipse.persistence.jpa.jpql:jar:2.6.4:compile,EPL 1.0,http://www.eclipse.org/eclipselink
 org.eclipse.persistence:org.eclipse.persistence.jpa:jar:2.6.4:compile,EPL 1.0,http://www.eclipse.org/eclipselink
-
+com.github.ben-manes.caffeine:caffeine:jar:2.6.2:compile,ASLv2,https://github.com/ben-manes/caffeine/blob/v2.6.2/LICENSE
 com.google.code.gson:gson:jar:2.2:compile
   org.codehaus.plexus:plexus-classworlds:jar:2.4:compile
   org.codehaus.plexus:plexus-component-annotations:jar:1.5.5:compile
@@ -356,4 +356,4 @@ com.google.code.gson:gson:jar:2.2:compile
   org.sonatype.aether:aether-util:jar:1.12:compile
   org.sonatype.sisu:sisu-guice:jar:no_aop:3.0.2:compile
   org.sonatype.sisu:sisu-inject-bean:jar:2.2.2:compile
-  org.sonatype.sisu:sisu-inject-plexus:jar:2.2.2:compile
\ No newline at end of file
+  org.sonatype.sisu:sisu-inject-plexus:jar:2.2.2:compile

http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
index 0c2fff9..265d595 100644
--- a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
+++ b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
@@ -258,6 +258,7 @@ This package installs the Metron Enrichment files
 %{metron_home}/config/zookeeper/enrichments/yaf.json
 %{metron_home}/config/zookeeper/enrichments/asa.json
 %{metron_home}/flux/enrichment/remote.yaml
+%{metron_home}/flux/enrichment/remote-unified.yaml
 %attr(0644,root,root) %{metron_home}/lib/metron-enrichment-%{full_version}-uber.jar
 
 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/ConfigHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/ConfigHandler.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/ConfigHandler.java
index 11a4852..369ba8c 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/ConfigHandler.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/ConfigHandler.java
@@ -19,6 +19,10 @@ package org.apache.metron.common.configuration.enrichment.handler;
 
 import java.util.*;
 
+/**
+ * This is the core logic of how to configure enrichments.  The default type of enrichment configuration is a simple list;
+ * however, more complex enrichment adapters require more complex configuration (e.g. stellar).
+ */
 public class ConfigHandler {
   private Object config;
   private Configs type = Configs.LIST;

http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/BytesFromPosition.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/BytesFromPosition.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/BytesFromPosition.java
index b73228f..56c6490 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/BytesFromPosition.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/BytesFromPosition.java
@@ -25,8 +25,8 @@ public class BytesFromPosition implements MessageGetStrategy {
 
   public BytesFromPosition() {};
 
-  public BytesFromPosition(int position) {
-    this.position = position;
+  public BytesFromPosition(Integer position) {
+    this.position = position == null?0:position;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromFieldByReference.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromFieldByReference.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromFieldByReference.java
new file mode 100644
index 0000000..a0d4b7d
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromFieldByReference.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.message;
+
+import org.apache.storm.tuple.Tuple;
+import org.json.simple.JSONObject;
+
+/**
+ * This retrieves the JSONObject from the field name by reference.
+ * This is in contrast to JSONFromField, which clones the JSON object and passes by value.
+ */
+public class JSONFromFieldByReference implements MessageGetStrategy {
+  private String messageFieldName;
+  public JSONFromFieldByReference(String messageFieldName) {
+    this.messageFieldName = messageFieldName;
+  }
+
+  @Override
+  public JSONObject get(Tuple tuple) {
+    return (JSONObject) tuple.getValueByField(messageFieldName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java
index c91a262..15f0447 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java
@@ -35,8 +35,8 @@ public class JSONFromPosition implements MessageGetStrategy {
 
   public JSONFromPosition() {};
 
-  public JSONFromPosition(int position) {
-    this.position = position;
+  public JSONFromPosition(Integer position) {
+    this.position = position == null?0:position;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetters.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetters.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetters.java
index 7004d78..46bb406 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetters.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetters.java
@@ -41,6 +41,7 @@ public enum MessageGetters {
   BYTES_FROM_POSITION((String arg) -> new BytesFromPosition(ConversionUtils.convert(arg, Integer.class))),
   JSON_FROM_POSITION((String arg) -> new JSONFromPosition(ConversionUtils.convert(arg, Integer.class))),
   JSON_FROM_FIELD((String arg) -> new JSONFromField(arg)),
+  JSON_FROM_FIELD_BY_REFERENCE((String arg) -> new JSONFromFieldByReference(arg)),
   OBJECT_FROM_FIELD((String arg) -> new ObjectFromField(arg)),
   DEFAULT_BYTES_FROM_POSITION(new BytesFromPosition()),
   DEFAULT_JSON_FROM_POSITION(new JSONFromPosition()),

http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/README.md b/metron-platform/metron-enrichment/README.md
index d742046..aa6fc99 100644
--- a/metron-platform/metron-enrichment/README.md
+++ b/metron-platform/metron-enrichment/README.md
@@ -33,6 +33,49 @@ data format (e.g. a JSON Map structure with `original_message` and
 
 ![Architecture](enrichment_arch.png)
 
+### Unified Enrichment Topology
+
+An experimental unified enrichment topology is also shipped.
+Currently the architecture, as described above, has a split/join in
+order to perform enrichments in parallel.  This poses some issues in
+terms of ease of tuning and reasoning about performance.  
+
+In order to deal with these issues, there is an alternative enrichment topology which
+uses data parallelism as opposed to the split/join task parallelism.
+This architecture uses a worker pool to fully enrich any message within
+a worker.  This results in:
+* Fewer bolts in the topology
+* Each bolt operating fully on a message
+* Fewer network hops
+
+![Unified Architecture](unified_enrichment_arch.svg)
+
+This architecture is fully backwards compatible; the only difference is
+how the enrichment will operate on each message (in one bolt, where the
+split/join is done in a threadpool, as opposed
+to being split across multiple bolts).
+
+#### Using It
+
+In order to use this, you will need to:
+* Edit `$METRON_HOME/bin/start_enrichment_topology.sh` and adjust it to use `remote-unified.yaml` instead of `remote.yaml`
+* Restart the enrichment topology.
+
+#### Configuring It
+
+There are two parameters which you might want to tune in this topology.
+Both are topology configuration settings that can be adjusted in the flux file
+`$METRON_HOME/flux/enrichment/remote-unified.yaml`:
+* `metron.threadpool.size` : The size of the threadpool.  This can take a number or a multiple of the number of cores (e.g. `5C` for 5 times the number of cores).  The default is `2C`.
+* `metron.threadpool.type` : The type of threadpool. (note: descriptions taken from [here](https://zeroturnaround.com/rebellabs/fixedthreadpool-cachedthreadpool-or-forkjoinpool-picking-correct-java-executors-for-background-tasks/)).
+   * `FIXED` is a fixed threadpool of size `n`. `n` threads will process tasks at a time; when the pool is saturated, new tasks are added to a queue without a limit on size. Good for CPU intensive tasks.  This is the default.
+   * `WORK_STEALING` is a work stealing threadpool.  This will create and shut down threads dynamically to accommodate the required parallelism level. It also tries to reduce the contention on the task queue, so it can be really good in heavily loaded environments. It is also good when your tasks create more tasks for the executor, like recursive tasks.
+
+The parallelism for the enrichment bolt and the threat intel bolt is
+taken from the parallelism configured for the respective join bolts.
+When proper Ambari support for this topology is added, it will get its
+own properties.
+
 ## Enrichment Configuration
 
 The configuration for the `enrichment` topology, the topology primarily
@@ -371,3 +414,5 @@ Now we need to start the topologies and send some data:
 * Ensure that the documents have new fields `foo`, `bar` and `ALL_CAPS` with values as described above.
 
 Note that we could have used any Stellar statements here, including calling out to HBase via `ENRICHMENT_GET` and `ENRICHMENT_EXISTS` or even calling a machine learning model via [Model as a Service](../../metron-analytics/metron-maas-service).
+
+

http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/pom.xml b/metron-platform/metron-enrichment/pom.xml
index e82b86b..bcfb41b 100644
--- a/metron-platform/metron-enrichment/pom.xml
+++ b/metron-platform/metron-enrichment/pom.xml
@@ -68,6 +68,12 @@
             <type>test-jar</type>
         </dependency>
         <dependency>
+          <groupId>com.github.ben-manes.caffeine</groupId>
+          <artifactId>caffeine</artifactId>
+          <version>2.6.2</version>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.metron</groupId>
             <artifactId>metron-profiler-client</artifactId>
             <version>${project.parent.version}</version>

http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/src/main/flux/enrichment/remote-unified.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote-unified.yaml b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote-unified.yaml
new file mode 100644
index 0000000..ddc5ffc
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote-unified.yaml
@@ -0,0 +1,378 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This is a drop-in replacement for the existing split/join enrichment topology.
+# Instead of a fan-out/fan-in architecture, this adopts a data-parallelism strategy
+# whereby a message is fully enriched inside of a UnifiedEnrichmentBolt.  This simplifies
+# the architecture greatly and cuts down on network hops.  It has unknown performance
+# characteristics, so caveat emptor.
+
+name: "enrichment"
+config:
+    topology.workers: ${enrichment.workers}
+    topology.acker.executors: ${enrichment.acker.executors}
+    topology.worker.childopts: ${topology.worker.childopts}
+    topology.auto-credentials: ${topology.auto-credentials}
+    topology.max.spout.pending: ${topology.max.spout.pending}
+    # Change this if you want to adjust the threadpool size
+    metron.threadpool.size: "2C" # Either a number (e.g. 5) or multiple of cores (e.g. 5C = 5 times the number of cores)
+    # Change this if you want to adjust the threadpool type
+    metron.threadpool.type: "FIXED" # FIXED or WORK_STEALING
+components:
+
+# Enrichment
+    -   id: "stellarEnrichmentAdapter"
+        className: "org.apache.metron.enrichment.adapters.stellar.StellarAdapter"
+        configMethods:
+            -   name: "ofType"
+                args:
+                    - "ENRICHMENT"
+
+    # Any kafka props for the producer go here.
+    -   id: "kafkaWriterProps"
+        className: "java.util.HashMap"
+        configMethods:
+          -   name: "put"
+              args:
+                  - "security.protocol"
+                  - "${kafka.security.protocol}"
+
+    -   id: "stellarEnrichment"
+        className: "org.apache.metron.enrichment.configuration.Enrichment"
+        constructorArgs:
+            -   "stellar"
+            -   ref: "stellarEnrichmentAdapter"
+
+    -   id: "geoEnrichmentAdapter"
+        className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter"
+    -   id: "geoEnrichment"
+        className: "org.apache.metron.enrichment.configuration.Enrichment"
+        constructorArgs:
+            -   "geo"
+            -   ref: "geoEnrichmentAdapter"
+    -   id: "hostEnrichmentAdapter"
+        className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
+        constructorArgs:
+            - '${enrichment.host.known_hosts}'
+    -   id: "hostEnrichment"
+        className: "org.apache.metron.enrichment.configuration.Enrichment"
+        constructorArgs:
+            -   "host"
+            -   ref: "hostEnrichmentAdapter"
+
+    -   id: "simpleHBaseEnrichmentConfig"
+        className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseConfig"
+        configMethods:
+            -   name: "withProviderImpl"
+                args:
+                    - "${hbase.provider.impl}"
+            -   name: "withHBaseTable"
+                args:
+                    - "${enrichment.simple.hbase.table}"
+            -   name: "withHBaseCF"
+                args:
+                    - "${enrichment.simple.hbase.cf}"
+    -   id: "simpleHBaseEnrichmentAdapter"
+        className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseAdapter"
+        configMethods:
+           -    name: "withConfig"
+                args:
+                    - ref: "simpleHBaseEnrichmentConfig"
+    -   id: "simpleHBaseEnrichment"
+        className: "org.apache.metron.enrichment.configuration.Enrichment"
+        constructorArgs:
+          -   "hbaseEnrichment"
+          -   ref: "simpleHBaseEnrichmentAdapter"
+    -   id: "enrichments"
+        className: "java.util.ArrayList"
+        configMethods:
+            -   name: "add"
+                args:
+                    - ref: "geoEnrichment"
+            -   name: "add"
+                args:
+                    - ref: "hostEnrichment"
+            -   name: "add"
+                args:
+                    - ref: "simpleHBaseEnrichment"
+            -   name: "add"
+                args:
+                    - ref: "stellarEnrichment"
+
+    #enrichment error
+    -   id: "enrichmentErrorKafkaWriter"
+        className: "org.apache.metron.writer.kafka.KafkaWriter"
+        configMethods:
+            -   name: "withTopic"
+                args:
+                    - "${enrichment.error.topic}"
+            -   name: "withZkQuorum"
+                args:
+                    - "${kafka.zk}"
+            -   name: "withProducerConfigs"
+                args: 
+                    - ref: "kafkaWriterProps"
+
+# Threat Intel
+    -   id: "stellarThreatIntelAdapter"
+        className: "org.apache.metron.enrichment.adapters.stellar.StellarAdapter"
+        configMethods:
+            -   name: "ofType"
+                args:
+                    - "THREAT_INTEL"
+    -   id: "stellarThreatIntelEnrichment"
+        className: "org.apache.metron.enrichment.configuration.Enrichment"
+        constructorArgs:
+            -   "stellar"
+            -   ref: "stellarThreatIntelAdapter"
+    -   id: "simpleHBaseThreatIntelConfig"
+        className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelConfig"
+        configMethods:
+            -   name: "withProviderImpl"
+                args:
+                    - "${hbase.provider.impl}"
+            -   name: "withTrackerHBaseTable"
+                args:
+                    - "${threat.intel.tracker.table}"
+            -   name: "withTrackerHBaseCF"
+                args:
+                    - "${threat.intel.tracker.cf}"
+            -   name: "withHBaseTable"
+                args:
+                    - "${threat.intel.simple.hbase.table}"
+            -   name: "withHBaseCF"
+                args:
+                    - "${threat.intel.simple.hbase.cf}"
+    -   id: "simpleHBaseThreatIntelAdapter"
+        className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelAdapter"
+        configMethods:
+           -    name: "withConfig"
+                args:
+                    - ref: "simpleHBaseThreatIntelConfig"
+    -   id: "simpleHBaseThreatIntelEnrichment"
+        className: "org.apache.metron.enrichment.configuration.Enrichment"
+        constructorArgs:
+          -   "hbaseThreatIntel"
+          -   ref: "simpleHBaseThreatIntelAdapter"
+
+    -   id: "threatIntels"
+        className: "java.util.ArrayList"
+        configMethods:
+            -   name: "add"
+                args:
+                    - ref: "simpleHBaseThreatIntelEnrichment"
+            -   name: "add"
+                args:
+                    - ref: "stellarThreatIntelEnrichment"
+
+    #threatintel error
+    -   id: "threatIntelErrorKafkaWriter"
+        className: "org.apache.metron.writer.kafka.KafkaWriter"
+        configMethods:
+            -   name: "withTopic"
+                args:
+                    - "${threat.intel.error.topic}"
+            -   name: "withZkQuorum"
+                args:
+                    - "${kafka.zk}"
+            -   name: "withProducerConfigs"
+                args: 
+                    - ref: "kafkaWriterProps"
+#indexing
+    -   id: "kafkaWriter"
+        className: "org.apache.metron.writer.kafka.KafkaWriter"
+        configMethods:
+            -   name: "withTopic"
+                args:
+                    - "${enrichment.output.topic}"
+            -   name: "withZkQuorum"
+                args:
+                    - "${kafka.zk}"
+            -   name: "withProducerConfigs"
+                args: 
+                    - ref: "kafkaWriterProps"
+
+#kafka/zookeeper
+    # Any kafka props for the consumer go here.
+    -   id: "kafkaProps"
+        className: "java.util.HashMap"
+        configMethods:
+          -   name: "put"
+              args:
+                  - "value.deserializer"
+                  - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+          -   name: "put"
+              args:
+                  - "key.deserializer"
+                  - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+          -   name: "put"
+              args:
+                  - "group.id"
+                  - "enrichments"
+          -   name: "put"
+              args:
+                  - "security.protocol"
+                  - "${kafka.security.protocol}"
+
+
+  # The fields to pull out of the kafka messages
+    -   id: "fields"
+        className: "java.util.ArrayList"
+        configMethods:
+          -   name: "add"
+              args:
+                  - "value"
+
+    -   id: "kafkaConfig"
+        className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
+        constructorArgs:
+          - ref: "kafkaProps"
+          # topic name
+          - "${enrichment.input.topic}"
+          - "${kafka.zk}"
+          - ref: "fields"
+        configMethods:
+            -   name: "setFirstPollOffsetStrategy"
+                args:
+                    - "${kafka.start}"
+
+
+spouts:
+    -   id: "kafkaSpout"
+        className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout"
+        constructorArgs:
+            - ref: "kafkaConfig"
+        parallelism: ${kafka.spout.parallelism}
+
+bolts:
+# Enrichment Bolts
+    -   id: "enrichmentBolt"
+        className: "org.apache.metron.enrichment.bolt.UnifiedEnrichmentBolt"
+        constructorArgs:
+            - "${kafka.zk}"
+        configMethods:
+            -   name: "withEnrichments"
+                args:
+                    - ref: "enrichments"
+            -   name: "withMaxCacheSize"
+                args: [${enrichment.join.cache.size}]
+            -   name: "withMaxTimeRetain"
+                args: [10]
+            -   name: "withCaptureCacheStats"
+                args: [true]
+            -   name: "withStrategy"
+                args:
+                    - "ENRICHMENT"
+            -   name: "withMessageGetter"
+                args: ["JSON_FROM_POSITION"]
+        parallelism: ${enrichment.join.parallelism}
+
+    -   id: "enrichmentErrorOutputBolt"
+        className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
+        constructorArgs:
+            - "${kafka.zk}"
+        configMethods:
+            -   name: "withMessageWriter"
+                args:
+                    - ref: "enrichmentErrorKafkaWriter"
+
+
+# Threat Intel Bolts
+    -   id: "threatIntelBolt"
+        className: "org.apache.metron.enrichment.bolt.UnifiedEnrichmentBolt"
+        constructorArgs:
+            - "${kafka.zk}"
+        configMethods:
+            -   name: "withEnrichments"
+                args:
+                    - ref: "threatIntels"
+            -   name: "withMaxCacheSize"
+                args: [${enrichment.join.cache.size}]
+            -   name: "withMaxTimeRetain"
+                args: [10]
+            -   name: "withCaptureCacheStats"
+                args: [true]
+            -   name: "withStrategy"
+                args:
+                    - "THREAT_INTEL"
+            -   name: "withMessageFieldName"
+                args: ["message"]
+            -   name: "withMessageGetter"
+                args: ["JSON_FROM_FIELD_BY_REFERENCE"]
+        parallelism: ${threat.intel.join.parallelism}
+
+    -   id: "threatIntelErrorOutputBolt"
+        className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
+        constructorArgs:
+            - "${kafka.zk}"
+        configMethods:
+            -   name: "withMessageWriter"
+                args:
+                    - ref: "threatIntelErrorKafkaWriter"
+
+# Indexing Bolts
+    -   id: "outputBolt"
+        className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
+        constructorArgs:
+            - "${kafka.zk}"
+        configMethods:
+            -   name: "withMessageWriter"
+                args:
+                    - ref: "kafkaWriter"
+        parallelism: ${kafka.writer.parallelism}
+
+
+streams:
+#parser
+    -   name: "spout -> enrichmentBolt"
+        from: "kafkaSpout"
+        to: "enrichmentBolt"
+        grouping:
+            type: LOCAL_OR_SHUFFLE
+
+    # Error output
+    -   name: "enrichmentBolt -> enrichmentErrorOutputBolt"
+        from: "enrichmentBolt"
+        to: "enrichmentErrorOutputBolt"
+        grouping:
+            streamId: "error"
+            type: LOCAL_OR_SHUFFLE
+
+#threat intel
+    -   name: "enrichmentBolt -> threatIntelBolt"
+        from: "enrichmentBolt"
+        to: "threatIntelBolt"
+        grouping:
+            streamId: "message"
+            type: LOCAL_OR_SHUFFLE
+
+#output
+    -   name: "threatIntelBolt -> output"
+        from: "threatIntelBolt"
+        to: "outputBolt"
+        grouping:
+            streamId: "message"
+            type: LOCAL_OR_SHUFFLE
+
+    # Error output
+    -   name: "threatIntelBolt -> threatIntelErrorOutputBolt"
+        from: "threatIntelBolt"
+        to: "threatIntelErrorOutputBolt"
+        grouping:
+            streamId: "error"
+            type: LOCAL_OR_SHUFFLE
+
+

http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapter.java
index d3f7820..0aabd38 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapter.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapter.java
@@ -22,6 +22,7 @@ import static org.apache.metron.enrichment.bolt.GenericEnrichmentBolt.STELLAR_CO
 import java.io.Serializable;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -173,7 +174,9 @@ public class StellarAdapter implements EnrichmentAdapter<CacheKey>,Serializable
     if(_PERF_LOG.isDebugEnabled()) {
       slowLogThreshold = ConversionUtils.convert(globalConfig.getOrDefault(STELLAR_SLOW_LOG, STELLAR_SLOW_LOG_DEFAULT), Long.class);
     }
-    Map<String, Object> message = value.getValue(Map.class);
+    //Ensure that you clone the message, because process will modify the message.  If the message object is modified
+    //then cache misses will happen because the cache key (which holds the message) will be modified.
+    Map<String, Object> message = new HashMap<>(value.getValue(Map.class));
     VariableResolver resolver = new MapVariableResolver(message, sensorConfig, globalConfig);
     StellarProcessor processor = new StellarProcessor();
     JSONObject enriched = process(message

http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
index 9b84193..dbbb7b6 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
@@ -34,6 +34,7 @@ import org.apache.metron.common.performance.PerformanceLogger;
 import org.apache.metron.common.utils.ErrorUtils;
 import org.apache.metron.enrichment.configuration.Enrichment;
 import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+import org.apache.metron.enrichment.utils.EnrichmentUtils;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.StellarFunctions;
 import org.apache.storm.task.OutputCollector;
@@ -245,16 +246,7 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt {
               continue;
             }
           }
-          if ( !enrichedField.isEmpty()) {
-            for (Object enrichedKey : enrichedField.keySet()) {
-              if(!StringUtils.isEmpty(prefix)) {
-                enrichedMessage.put(field + "." + enrichedKey, enrichedField.get(enrichedKey));
-              }
-              else {
-                enrichedMessage.put(enrichedKey, enrichedField.get(enrichedKey));
-              }
-            }
-          }
+          enrichedMessage = EnrichmentUtils.adjustKeys(enrichedMessage, enrichedField, field, prefix);
         }
       }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
index 6e24d65..1ce0b16 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
@@ -30,6 +30,7 @@ import org.apache.metron.common.configuration.enrichment.threatintel.ThreatTriag
 import org.apache.metron.common.message.MessageGetStrategy;
 import org.apache.metron.common.utils.MessageUtils;
 import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase;
+import org.apache.metron.enrichment.utils.ThreatIntelUtils;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.StellarFunctions;
 import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
@@ -45,35 +46,6 @@ public class ThreatIntelJoinBolt extends EnrichmentJoinBolt {
 
   protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  /**
-   * The message key under which the overall threat triage score is stored.
-   */
-  public static final String THREAT_TRIAGE_SCORE_KEY = "threat.triage.score";
-
-  /**
-   * The prefix of the message keys that record the threat triage rules that fired.
-   */
-  public static final String THREAT_TRIAGE_RULES_KEY = "threat.triage.rules";
-
-  /**
-   * The portion of the message key used to record the 'name' field of a rule.
-   */
-  public static final String THREAT_TRIAGE_RULE_NAME = "name";
-
-  /**
-   * The portion of the message key used to record the 'comment' field of a rule.
-   */
-  public static final String THREAT_TRIAGE_RULE_COMMENT = "comment";
-
-  /**
-   * The portion of the message key used to record the 'score' field of a rule.
-   */
-  public static final String THREAT_TRIAGE_RULE_SCORE = "score";
-
-  /**
-   * The portion of the message key used to record the 'reason' field of a rule.
-   */
-  public static final String THREAT_TRIAGE_RULE_REASON = "reason";
 
   /**
    * The Stellar function resolver.
@@ -133,70 +105,12 @@ public class ThreatIntelJoinBolt extends EnrichmentJoinBolt {
     }
   }
 
+
   @Override
   public JSONObject joinMessages(Map<String, Tuple> streamMessageMap, MessageGetStrategy messageGetStrategy) {
     JSONObject ret = super.joinMessages(streamMessageMap, messageGetStrategy);
-    LOG.trace("Received joined messages: {}", ret);
-    boolean isAlert = ret.containsKey("is_alert");
-    if(!isAlert) {
-      for (Object key : ret.keySet()) {
-        if (key.toString().startsWith("threatintels") && !key.toString().endsWith(".ts")) {
-          isAlert = true;
-          break;
-        }
-      }
-    }
-    else {
-      Object isAlertObj = ret.get("is_alert");
-      isAlert = ConversionUtils.convert(isAlertObj, Boolean.class);
-      if(!isAlert) {
-        ret.remove("is_alert");
-      }
-    }
-    if(isAlert) {
-      ret.put("is_alert" , "true");
-      String sourceType = MessageUtils.getSensorType(ret);
-      SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sourceType);
-      ThreatTriageConfig triageConfig = null;
-      if(config != null) {
-        triageConfig = config.getThreatIntel().getTriageConfig();
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("{}: Found sensor enrichment config.", sourceType);
-        }
-      }
-      else {
-        LOG.debug("{}: Unable to find threat config.", sourceType );
-      }
-      if(triageConfig != null) {
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("{}: Found threat triage config: {}", sourceType, triageConfig);
-        }
-
-        if(LOG.isDebugEnabled() && (triageConfig.getRiskLevelRules() == null || triageConfig.getRiskLevelRules().isEmpty())) {
-          LOG.debug("{}: Empty rules!", sourceType);
-        }
-
-        // triage the threat
-        ThreatTriageProcessor threatTriageProcessor = new ThreatTriageProcessor(config, functionResolver, stellarContext);
-        ThreatScore score = threatTriageProcessor.apply(ret);
-
-        if(LOG.isDebugEnabled()) {
-          String rules = Joiner.on('\n').join(triageConfig.getRiskLevelRules());
-          LOG.debug("Marked {} as triage level {} with rules {}", sourceType, score.getScore(),
-              rules);
-        }
-
-        // attach the triage threat score to the message
-        if(score.getRuleScores().size() > 0) {
-          appendThreatScore(score, ret);
-        }
-      }
-      else {
-        LOG.debug("{}: Unable to find threat triage config!", sourceType);
-      }
-    }
-
-    return ret;
+    String sourceType = MessageUtils.getSensorType(ret);
+    return ThreatIntelUtils.triage(ret, getConfigurations().getSensorEnrichmentConfig(sourceType), functionResolver, stellarContext);
   }
 
   @Override
@@ -207,24 +121,5 @@ public class ThreatIntelJoinBolt extends EnrichmentJoinBolt {
     }
   }
 
-  /**
-   * Appends the threat score to the telemetry message.
-   * @param threatScore The threat triage score
-   * @param message The telemetry message being triaged.
-   */
-  private void appendThreatScore(ThreatScore threatScore, JSONObject message) {
-
-    // append the overall threat score
-    message.put(THREAT_TRIAGE_SCORE_KEY, threatScore.getScore());
-
-    // append each of the rules - each rule is 'flat'
-    Joiner joiner = Joiner.on(".");
-    int i = 0;
-    for(RuleScore score: threatScore.getRuleScores()) {
-      message.put(joiner.join(THREAT_TRIAGE_RULES_KEY, i, THREAT_TRIAGE_RULE_NAME), score.getRule().getName());
-      message.put(joiner.join(THREAT_TRIAGE_RULES_KEY, i, THREAT_TRIAGE_RULE_COMMENT), score.getRule().getComment());
-      message.put(joiner.join(THREAT_TRIAGE_RULES_KEY, i, THREAT_TRIAGE_RULE_SCORE), score.getRule().getScore());
-      message.put(joiner.join(THREAT_TRIAGE_RULES_KEY, i++, THREAT_TRIAGE_RULE_REASON), score.getReason());
-    }
-  }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/UnifiedEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/UnifiedEnrichmentBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/UnifiedEnrichmentBolt.java
new file mode 100644
index 0000000..c258fb0
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/UnifiedEnrichmentBolt.java
@@ -0,0 +1,412 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.bolt;
+
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
+import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.error.MetronError;
+import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.common.message.MessageGetters;
+import org.apache.metron.common.performance.PerformanceLogger;
+import org.apache.metron.common.utils.ErrorUtils;
+import org.apache.metron.common.utils.MessageUtils;
+import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase;
+import org.apache.metron.enrichment.configuration.Enrichment;
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+import org.apache.metron.enrichment.parallel.EnrichmentContext;
+import org.apache.metron.enrichment.parallel.EnrichmentStrategies;
+import org.apache.metron.enrichment.parallel.ParallelEnricher;
+import org.apache.metron.enrichment.parallel.ConcurrencyContext;
+import org.apache.metron.enrichment.parallel.WorkerPoolStrategies;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.StellarFunctions;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * This bolt is a unified enrichment/threat intel bolt.  In contrast to the split/enrich/join
+ * bolts above, this handles the entire enrichment lifecycle in one bolt using a threadpool to
+ * enrich in parallel.
+ *
+ * From an architectural perspective, this is a divergence from the polymorphism based strategy we have
+ * used in the split/join bolts.  Rather, this bolt is provided a strategy to use, either enrichment or threat intel,
+ * through composition.  This allows us to move most of the implementation into components independent
+ * from Storm.  This will greater facilitate reuse.
+ */
+public class UnifiedEnrichmentBolt extends ConfiguredEnrichmentBolt {
+
+  public static class Perf {} // used for performance logging
+  private PerformanceLogger perfLog; // not static bc multiple bolts may exist in same worker
+
+  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public static final String STELLAR_CONTEXT_CONF = "stellarContext";
+
+  /**
+   * The number of threads in the threadpool.  One threadpool is created per process.
+   * This is a topology-level configuration
+   */
+  public static final String THREADPOOL_NUM_THREADS_TOPOLOGY_CONF = "metron.threadpool.size";
+  /**
+   * The type of threadpool to create. This is a topology-level configuration.
+   */
+  public static final String THREADPOOL_TYPE_TOPOLOGY_CONF = "metron.threadpool.type";
+
+  /**
+   * The enricher implementation to use.  This will do the parallel enrichment via a thread pool.
+   */
+  protected ParallelEnricher enricher;
+
+  /**
+   * The strategy to use for this enrichment bolt.  Practically speaking this is either
+   * enrichment or threat intel.  It is configured in the topology itself.
+   */
+  protected EnrichmentStrategies strategy;
+  /**
+   * Determine the way to retrieve the message.  This must be specified in the topology.
+   */
+  protected MessageGetStrategy messageGetter;
+  protected MessageGetters getterStrategy;
+  protected OutputCollector collector;
+  private Context stellarContext;
+  /**
+   * An enrichment type to adapter map.  This is configured externally.
+   */
+  protected Map<String, EnrichmentAdapter<CacheKey>> enrichmentsByType = new HashMap<>();
+
+  /**
+   * The total number of elements in a LRU cache.  This cache is used for the enrichments; if an
+   * element is in the cache, then the result is returned instead of computed.
+   */
+  protected Long maxCacheSize;
+  /**
+   * The total amount of time in minutes since write to keep an element in the cache.
+   */
+  protected Long maxTimeRetain;
+  /**
+   * If the bolt is reloaded, invalidate the cache?
+   */
+  protected boolean invalidateCacheOnReload = false;
+
+  /**
+   * The message field to use.  If this is set, then this indicates the field to use to retrieve the message object.
+   * IF this is unset, then we presume that the message is coming in as a string version of a JSON blob on the first
+   * element of the tuple.
+   */
+  protected String messageFieldName;
+  protected EnrichmentContext enrichmentContext;
+  protected boolean captureCacheStats = true;
+
+  public UnifiedEnrichmentBolt(String zookeeperUrl) {
+    super(zookeeperUrl);
+  }
+
+  /**
+   * Specify the enrichments to support.
+   * @param enrichments enrichment
+   * @return Instance of this class
+   */
+  public UnifiedEnrichmentBolt withEnrichments(List<Enrichment> enrichments) {
+    for(Enrichment e : enrichments) {
+      enrichmentsByType.put(e.getType(), e.getAdapter());
+    }
+    return this;
+  }
+
+  public UnifiedEnrichmentBolt withCaptureCacheStats(boolean captureCacheStats) {
+    this.captureCacheStats = captureCacheStats;
+    return this;
+  }
+
+  /**
+   * Determine the message get strategy (One of the enums from MessageGetters).
+   * @param getter
+   * @return
+   */
+  public UnifiedEnrichmentBolt withMessageGetter(String getter) {
+    this.getterStrategy = MessageGetters.valueOf(getter);
+    return this;
+  }
+
+  /**
+   * Figure out how many threads to use in the thread pool.  The user can pass an arbitrary object, so parse it
+   * according to some rules.  If it's a number, then cast to an int.  IF it's a string and ends with "C", then strip
+   * the C and treat it as an integral multiple of the number of cores.  If it's a string and does not end with a C, then treat
+   * it as a number in string form.
+   * @param numThreads
+   * @return
+   */
+  private static int getNumThreads(Object numThreads) {
+    if(numThreads instanceof Number) {
+      return ((Number)numThreads).intValue();
+    }
+    else if(numThreads instanceof String) {
+      String numThreadsStr = ((String)numThreads).trim().toUpperCase();
+      if(numThreadsStr.endsWith("C")) {
+        Integer factor = Integer.parseInt(numThreadsStr.replace("C", ""));
+        return factor*Runtime.getRuntime().availableProcessors();
+      }
+      else {
+        return Integer.parseInt(numThreadsStr);
+      }
+    }
+    return 2*Runtime.getRuntime().availableProcessors();
+  }
+
+  /**
+   * The strategy to use.  This indicates which part of the config that this bolt uses
+   * to enrich, threat intel or enrichment.  This must conform to one of the EnrichmentStrategies
+   * enum.
+   * @param strategy
+   * @return
+   */
+  public UnifiedEnrichmentBolt withStrategy(String strategy) {
+    this.strategy = EnrichmentStrategies.valueOf(strategy);
+    return this;
+  }
+
+  /**
+   * @param maxCacheSize Maximum size of cache before flushing
+   * @return Instance of this class
+   */
+  public UnifiedEnrichmentBolt withMaxCacheSize(long maxCacheSize) {
+    this.maxCacheSize = maxCacheSize;
+    return this;
+  }
+
+  /**
+   * @param maxTimeRetain Maximum time to retain cached entry before expiring
+   * @return Instance of this class
+   */
+
+  public UnifiedEnrichmentBolt withMaxTimeRetain(long maxTimeRetain) {
+    this.maxTimeRetain = maxTimeRetain;
+    return this;
+  }
+
+  /**
+   * Invalidate the cache on reload of bolt.  By default, we do not.
+   * @param cacheInvalidationOnReload
+   * @return
+   */
+  public UnifiedEnrichmentBolt withCacheInvalidationOnReload(boolean cacheInvalidationOnReload) {
+    this.invalidateCacheOnReload= cacheInvalidationOnReload;
+    return this;
+  }
+
+
+  @Override
+  public void reloadCallback(String name, ConfigurationType type) {
+    if(invalidateCacheOnReload) {
+      if(strategy != null && ConcurrencyContext.get(strategy).getCache() != null) {
+        ConcurrencyContext.get(strategy).getCache().invalidateAll();
+      }
+    }
+    if(type == ConfigurationType.GLOBAL && enrichmentsByType != null) {
+      for(EnrichmentAdapter adapter : enrichmentsByType.values()) {
+        adapter.updateAdapter(getConfigurations().getGlobalConfig());
+      }
+    }
+  }
+
+
+  /**
+   * Fully enrich a message based on the strategy which was used to configure the bolt.
+   * Each enrichment is done in parallel and the results are joined together.  Each enrichment
+   * will use a cache so computation is avoided if the result has been computed before.
+   *
+   * Errors in the enrichment result in an error message being sent on the "error" stream.
+   * The successful enrichments will be joined with the original message and the message will
+   * be sent along the "message" stream.
+   *
+   * @param input The input tuple to be processed.
+   */
+  @Override
+  public void execute(Tuple input) {
+    JSONObject message = generateMessage(input);
+    try {
+      String sourceType = MessageUtils.getSensorType(message);
+      SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sourceType);
+      if(config == null) {
+        LOG.debug("Unable to find SensorEnrichmentConfig for sourceType: {}", sourceType);
+        config = new SensorEnrichmentConfig();
+      }
+      //This is an existing kludge for the stellar adapter to pass information along.
+      //We should figure out if this can be rearchitected a bit.  This smells.
+      config.getConfiguration().putIfAbsent(STELLAR_CONTEXT_CONF, stellarContext);
+      String guid = getGUID(input, message);
+
+      // enrich the message
+      ParallelEnricher.EnrichmentResult result = enricher.apply(message, strategy, config, perfLog);
+      JSONObject enriched = result.getResult();
+      enriched = strategy.postProcess(enriched, config, enrichmentContext);
+
+      //we can emit the message now
+      collector.emit("message",
+              input,
+              new Values(guid, enriched));
+      //and handle each of the errors in turn.  If any adapter errored out, we will have one message per.
+      for(Map.Entry<Object, Throwable> t : result.getEnrichmentErrors()) {
+        LOG.error("[Metron] Unable to enrich message: {}", message, t);
+        MetronError error = new MetronError()
+                .withErrorType(strategy.getErrorType())
+                .withMessage(t.getValue().getMessage())
+                .withThrowable(t.getValue())
+                .addRawMessage(t.getKey());
+        ErrorUtils.handleError(collector, error);
+      }
+    } catch (Exception e) {
+      //If something terrible and unexpected happens then we want to send an error along, but this
+      //really shouldn't be happening.
+      LOG.error("[Metron] Unable to enrich message: {}", message, e);
+      MetronError error = new MetronError()
+              .withErrorType(strategy.getErrorType())
+              .withMessage(e.getMessage())
+              .withThrowable(e)
+              .addRawMessage(message);
+      ErrorUtils.handleError(collector, error);
+    }
+    finally {
+      collector.ack(input);
+    }
+  }
+
+  /**
+   * The message field name.  If this is set, then use this field to retrieve the message.
+   * @param messageFieldName
+   * @return
+   */
+  public UnifiedEnrichmentBolt withMessageFieldName(String messageFieldName) {
+    this.messageFieldName = messageFieldName;
+    return this;
+  }
+
+  /**
+   * Take the tuple and construct the message.
+   * @param tuple
+   * @return
+   */
+  public JSONObject generateMessage(Tuple tuple) {
+    return (JSONObject) messageGetter.get(tuple);
+  }
+
+  @Override
+  public final void prepare(Map map, TopologyContext topologyContext,
+                       OutputCollector outputCollector) {
+    super.prepare(map, topologyContext, outputCollector);
+    collector = outputCollector;
+    if (this.maxCacheSize == null) {
+      throw new IllegalStateException("MAX_CACHE_SIZE_OBJECTS_NUM must be specified");
+    }
+    if (this.maxTimeRetain == null) {
+      throw new IllegalStateException("MAX_TIME_RETAIN_MINUTES must be specified");
+    }
+    if (this.enrichmentsByType.isEmpty()) {
+      throw new IllegalStateException("Adapter must be specified");
+    }
+
+    for(Map.Entry<String, EnrichmentAdapter<CacheKey>> adapterKv : enrichmentsByType.entrySet()) {
+      boolean success = adapterKv.getValue().initializeAdapter(getConfigurations().getGlobalConfig());
+      if (!success) {
+        LOG.error("[Metron] Could not initialize adapter: " + adapterKv.getKey());
+        throw new IllegalStateException("Could not initialize adapter: " + adapterKv.getKey());
+      }
+    }
+    WorkerPoolStrategies workerPoolStrategy = WorkerPoolStrategies.FIXED;
+    if(map.containsKey(THREADPOOL_TYPE_TOPOLOGY_CONF)) {
+      workerPoolStrategy = WorkerPoolStrategies.valueOf(map.get(THREADPOOL_TYPE_TOPOLOGY_CONF) + "");
+    }
+    if(map.containsKey(THREADPOOL_NUM_THREADS_TOPOLOGY_CONF)) {
+      int numThreads = getNumThreads(map.get(THREADPOOL_NUM_THREADS_TOPOLOGY_CONF));
+      ConcurrencyContext.get(strategy).initialize(numThreads, maxCacheSize, maxTimeRetain, workerPoolStrategy, LOG, captureCacheStats);
+    }
+    else {
+      throw new IllegalStateException("You must pass " + THREADPOOL_NUM_THREADS_TOPOLOGY_CONF + " via storm config.");
+    }
+    messageGetter = this.getterStrategy.get(messageFieldName);
+    enricher = new ParallelEnricher(enrichmentsByType, ConcurrencyContext.get(strategy), captureCacheStats);
+    perfLog = new PerformanceLogger(() -> getConfigurations().getGlobalConfig(), Perf.class.getName());
+    GeoLiteDatabase.INSTANCE.update((String)getConfigurations().getGlobalConfig().get(GeoLiteDatabase.GEO_HDFS_FILE));
+    initializeStellar();
+    enrichmentContext = new EnrichmentContext(StellarFunctions.FUNCTION_RESOLVER(), stellarContext);
+  }
+
+
+  protected void initializeStellar() {
+    stellarContext = new Context.Builder()
+                         .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
+                         .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig())
+                         .with(Context.Capabilities.STELLAR_CONFIG, () -> getConfigurations().getGlobalConfig())
+                         .build();
+    StellarFunctions.initialize(stellarContext);
+  }
+
+  /**
+   * Return the GUID from either the tuple or the message.
+   *
+   * @param tuple
+   * @param message
+   * @return
+   */
+  public String getGUID(Tuple tuple, JSONObject message) {
+    String key = null, guid = null;
+    try {
+      key = tuple.getStringByField("key");
+      guid = (String)message.get(Constants.GUID);
+    }
+    catch(Throwable t) {
+      //swallowing this just in case.
+    }
+    if(key != null) {
+      return key;
+    }
+    else if(guid != null) {
+      return guid;
+    }
+    else {
+      return UUID.randomUUID().toString();
+    }
+  }
+
+  /**
+   * Declare the output schema for all the streams of this topology.
+   * We declare two streams: error and message.
+   *
+   * @param declarer this is used to declare output stream ids, output fields, and whether or not each output stream is a direct stream
+   */
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declareStream("message", new Fields("key", "message"));
+    declarer.declareStream("error", new Fields("message"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ConcurrencyContext.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ConcurrencyContext.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ConcurrencyContext.java
new file mode 100644
index 0000000..ed5985a
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ConcurrencyContext.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.parallel;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.metron.enrichment.bolt.CacheKey;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+
+import java.util.EnumMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This provides the parallel infrastructure, the thread pool and the cache.
+ * The threadpool is static (shared by every strategy in the process) and the cache is
+ * instance specific (one per enrichment strategy).
+ */
+public class ConcurrencyContext {
+  //volatile because the getters below read these without holding a lock
+  private static volatile Executor executor;
+  private volatile Cache<CacheKey, JSONObject> cache;
+
+  private static final EnumMap<EnrichmentStrategies, ConcurrencyContext> strategyToInfrastructure
+          = new EnumMap<>(EnrichmentStrategies.class);
+  static {
+    for(EnrichmentStrategies e : EnrichmentStrategies.values()) {
+      strategyToInfrastructure.put(e, new ConcurrencyContext());
+    }
+  }
+
+  /**
+   * Retrieve the context associated with a strategy.
+   *
+   * @param strategy The enrichment strategy
+   * @return The context registered for that strategy
+   */
+  public static ConcurrencyContext get(EnrichmentStrategies strategy) {
+    return strategyToInfrastructure.get(strategy);
+  }
+
+  protected ConcurrencyContext() { }
+
+  /**
+   * Initialize the thread pool and cache.  The threadpool is static and the cache is per strategy.
+   * Each is created at most once; subsequent calls leave the existing threadpool and cache in place.
+   *
+   * @param numThreads The number of threads in the threadpool.
+   * @param maxCacheSize The maximum size of the cache, beyond which keys are evicted.
+   * @param maxTimeRetain The maximum time to retain an element in the cache (in minutes)
+   * @param poolStrategy The strategy for creating a threadpool (FIXED is used if null)
+   * @param log The logger to use (may be null, in which case nothing is logged)
+   * @param logStats Should we record stats in the cache?
+   */
+  public synchronized void initialize( int numThreads
+                                     , long maxCacheSize
+                                     , long maxTimeRetain
+                                     , WorkerPoolStrategies poolStrategy
+                                     , Logger log
+                                     , boolean logStats
+                                     ) {
+    //The executor is shared across every instance, so it must be guarded by a class-level lock;
+    //the instance lock alone would let two strategies each create (and leak) a threadpool.
+    synchronized(ConcurrencyContext.class) {
+      if(executor == null) {
+        if (log != null) {
+          log.info("Creating new threadpool of size {}", numThreads);
+        }
+        executor = (poolStrategy == null? WorkerPoolStrategies.FIXED:poolStrategy).create(numThreads);
+      }
+    }
+    if(cache == null) {
+      if (log != null) {
+        log.info("Creating new cache with maximum size {}, and expiration after write of {} minutes", maxCacheSize, maxTimeRetain);
+      }
+      Caffeine<Object, Object> builder = Caffeine.newBuilder().maximumSize(maxCacheSize)
+                           .expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES)
+                           .executor(executor)
+                         ;
+      if(logStats) {
+        builder = builder.recordStats();
+      }
+      cache = builder.build();
+    }
+  }
+
+  /**
+   * @return The shared executor, or null if no context has been initialized yet
+   */
+  public static Executor getExecutor() {
+    return executor;
+  }
+
+  /**
+   * @return The cache for this context, or null if this context has not been initialized yet
+   */
+  public Cache<CacheKey, JSONObject> getCache() {
+    return cache;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentCallable.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentCallable.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentCallable.java
new file mode 100644
index 0000000..da4d574
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentCallable.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.parallel;
+
+import org.apache.metron.enrichment.bolt.CacheKey;
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+import org.json.simple.JSONObject;
+
+import java.util.concurrent.Callable;
+import java.util.function.Function;
+
+/**
+ * Enrich based on a key and enrichment adapter.  The CacheKey contains all necessary input information for an enrichment.
+ */
+public class EnrichmentCallable implements Callable<JSONObject>, Function<CacheKey, JSONObject> {
+  /** The key enriched by {@link #call()}. */
+  CacheKey key;
+  /** The adapter that performs the enrichment and records key accesses. */
+  EnrichmentAdapter<CacheKey> adapter;
+
+  /**
+   * @param key the key to enrich when this object is invoked as a {@link Callable}
+   * @param adapter the adapter used to log access to, and enrich, a key
+   */
+  public EnrichmentCallable( CacheKey key
+          , EnrichmentAdapter<CacheKey> adapter
+  )
+  {
+    this.key = key;
+    this.adapter = adapter;
+  }
+
+  /**
+   * Logs access for, and then enriches, the key supplied at construction time.
+   *
+   * @return the enrichment result produced by the adapter
+   * @throws Exception if unable to compute a result
+   */
+  @Override
+  public JSONObject call() throws Exception {
+    //Log access for this key.
+    adapter.logAccess(key);
+    return adapter.enrich(key);
+  }
+
+  /**
+   * Logs access for, and then enriches, the key passed in as the argument.
+   *
+   * @param cacheKey the key to enrich
+   * @return the enrichment result produced by the adapter
+   */
+  @Override
+  public JSONObject apply(CacheKey cacheKey) {
+    // Log access for the key that is actually being enriched (the argument), not the
+    // constructor-supplied key, so the access log and the enrichment always agree.
+    adapter.logAccess(cacheKey);
+    return adapter.enrich(cacheKey);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentContext.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentContext.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentContext.java
new file mode 100644
index 0000000..d2a9fe7
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentContext.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.parallel;
+
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
+
+/**
+ * The full context needed for an enrichment.  This is an abstraction that passes information from the underlying
+ * environment (e.g. a Storm bolt) to the Storm-independent enrichment infrastructure.
+ */
+public class EnrichmentContext {
+  // Both fields are assigned exactly once in the constructor, so they are final: the context
+  // is an immutable holder once constructed.
+  private final FunctionResolver functionResolver;
+  private final Context stellarContext;
+
+  /**
+   * @param functionResolver the Stellar function resolver to expose
+   * @param stellarContext the Stellar context to expose
+   */
+  public EnrichmentContext(FunctionResolver functionResolver, Context stellarContext) {
+    this.functionResolver = functionResolver;
+    this.stellarContext = stellarContext;
+  }
+
+  /**
+   * @return the function resolver supplied at construction time
+   */
+  public FunctionResolver getFunctionResolver() {
+    return functionResolver;
+  }
+
+  /**
+   * @return the Stellar context supplied at construction time
+   */
+  public Context getStellarContext() {
+    return stellarContext;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentStrategies.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentStrategies.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentStrategies.java
new file mode 100644
index 0000000..3683407
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentStrategies.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.parallel;
+
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.enrichment.EnrichmentConfig;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.enrichment.utils.EnrichmentUtils;
+import org.apache.metron.enrichment.utils.ThreatIntelUtils;
+import org.json.simple.JSONObject;
+
+/**
+ * The specific strategies to interact with the sensor enrichment config.
+ * The approach presented here, in contrast to the inheritance-based approach
+ * in the bolts, allows for an abstraction through composition whereby we
+ * localize all the interactions with the sensor enrichment config in a strategy
+ * rather than bind the abstraction to Storm, our distributed processing engine.
+ */
+public enum EnrichmentStrategies implements EnrichmentStrategy {
+  /**
+   * Interact with the enrichment portion of the enrichment config
+   */
+  ENRICHMENT(new EnrichmentStrategy() {
+    @Override
+    public EnrichmentConfig getUnderlyingConfig(SensorEnrichmentConfig config) {
+      return config.getEnrichment();
+    }
+
+    @Override
+    public Constants.ErrorType getErrorType() {
+      return Constants.ErrorType.ENRICHMENT_ERROR;
+    }
+
+    @Override
+    public String fieldToEnrichmentKey(String type, String field) {
+      return EnrichmentUtils.getEnrichmentKey(type, field);
+    }
+  }),
+  /**
+   * Interact with the threat intel portion of the enrichment config.
+   */
+  THREAT_INTEL(new EnrichmentStrategy() {
+    @Override
+    public EnrichmentConfig getUnderlyingConfig(SensorEnrichmentConfig config) {
+      return config.getThreatIntel();
+    }
+
+    @Override
+    public Constants.ErrorType getErrorType() {
+      return Constants.ErrorType.THREAT_INTEL_ERROR;
+    }
+
+    @Override
+    public String fieldToEnrichmentKey(String type, String field) {
+      return ThreatIntelUtils.getThreatIntelKey(type, field);
+    }
+
+    @Override
+    public JSONObject postProcess(JSONObject message, SensorEnrichmentConfig config, EnrichmentContext context) {
+      return ThreatIntelUtils.triage(message, config, context.getFunctionResolver(), context.getStellarContext());
+    }
+  })
+  ;
+
+  EnrichmentStrategy enrichmentStrategy;
+  EnrichmentStrategies(EnrichmentStrategy enrichmentStrategy) {
+    this.enrichmentStrategy = enrichmentStrategy;
+  }
+
+  /**
+   * Get the underlying enrichment config.  If this is provided, then we need not retrieve
+   * @return
+   */
+  @Override
+  public EnrichmentConfig getUnderlyingConfig(SensorEnrichmentConfig config) {
+    return enrichmentStrategy.getUnderlyingConfig(config);
+  }
+
+  public String fieldToEnrichmentKey(String type, String field) {
+    return enrichmentStrategy.fieldToEnrichmentKey(type, field);
+  }
+
+
+  public JSONObject postProcess(JSONObject message, SensorEnrichmentConfig config, EnrichmentContext context)  {
+    return enrichmentStrategy.postProcess(message, config, context);
+  }
+
+  @Override
+  public Constants.ErrorType getErrorType() {
+    return enrichmentStrategy.getErrorType();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentStrategy.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentStrategy.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentStrategy.java
new file mode 100644
index 0000000..1d32749
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentStrategy.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.parallel;
+
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.enrichment.EnrichmentConfig;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.configuration.enrichment.handler.ConfigHandler;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+
+import java.util.Map;
+
+/**
+ * Enrichment strategy.  This interface provides a mechanism to interface with the enrichment config and any
+ * post processing steps that are needed to be done after-the-fact.
+ *
+ * The reasoning behind this is that the key difference between enrichments and threat intel is that they pull
+ * their configurations from different parts of the SensorEnrichmentConfig object and as a post-join step, they differ
+ * slightly.
+ *
+ */
+public interface EnrichmentStrategy {
+
+  /**
+   * Get the underlying configuration for this phase from the sensor enrichment config.
+   * @param config The sensor enrichment config to pull this phase's configuration from
+   * @return The enrichment config for this phase
+   */
+  EnrichmentConfig getUnderlyingConfig(SensorEnrichmentConfig config);
+
+  /**
+   * Retrieves the error type, so that error messages can be constructed appropriately.
+   * @return The error type associated with this strategy
+   */
+  Constants.ErrorType getErrorType();
+
+  /**
+   * Takes the enrichment type and the field and returns a unique key to prefix the output of the enrichment.  For
+   * less adaptable enrichments than Stellar, this is important to allow for namespacing in the new fields created.
+   * @param type The enrichment type name
+   * @param field The input field
+   * @return The key used to prefix the output of the enrichment
+   */
+  String fieldToEnrichmentKey(String type, String field);
+
+
+  /**
+   * Post-process callback after messages are enriched and joined.  By default, this is a no-op
+   * that hands back the message it was given.
+   * @param message The input message.
+   * @param config The enrichment configuration
+   * @param context The enrichment context
+   * @return The post-processed message; the default implementation returns {@code message} unchanged
+   */
+  default JSONObject postProcess(JSONObject message, SensorEnrichmentConfig config, EnrichmentContext context) {
+    return message;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java
new file mode 100644
index 0000000..2238c92
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.parallel;
+
+import com.github.benmanes.caffeine.cache.stats.CacheStats;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.configuration.enrichment.handler.ConfigHandler;
+import org.apache.metron.common.performance.PerformanceLogger;
+import org.apache.metron.common.utils.MessageUtils;
+import org.apache.metron.enrichment.bolt.CacheKey;
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+import org.apache.metron.enrichment.utils.EnrichmentUtils;
+import org.json.simple.JSONObject;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BinaryOperator;
+import java.util.function.Supplier;
+
+/**
+ * This is an independent component which will accept a message and a set of enrichment adapters as well as a config which defines
+ * how those enrichments should be performed and fully enrich the message.  The result will be the enriched message
+ * unified together and a list of errors which happened.
+ */
+public class ParallelEnricher {
+
+  /** Enrichment adapters keyed by enrichment type name (the keys of the task map built by splitMessage). */
+  private Map<String, EnrichmentAdapter<CacheKey>> enrichmentsByType = new HashMap<>();
+  /**
+   * The last cache statistics snapshot seen per strategy.  It is only seeded when stats logging is
+   * requested at construction time; while it is empty, apply skips the cache delta logging.
+   */
+  private EnumMap<EnrichmentStrategies, CacheStats> cacheStats = new EnumMap<>(EnrichmentStrategies.class);
+
+  /**
+   * The result of an enrichment.
+   */
+  public static class EnrichmentResult {
+    private JSONObject result;
+    private List<Map.Entry<Object, Throwable>> enrichmentErrors;
+
+    /**
+     * @param result the enriched message
+     * @param enrichmentErrors the errors collected while enriching; each entry pairs the failing
+     *                         unit of work with the exception it raised
+     */
+    public EnrichmentResult(JSONObject result, List<Map.Entry<Object, Throwable>> enrichmentErrors) {
+      this.result = result;
+      this.enrichmentErrors = enrichmentErrors;
+    }
+
+    /**
+     * The unified fully enriched result.
+     * @return the enriched message
+     */
+    public JSONObject getResult() {
+      return result;
+    }
+
+    /**
+     * The errors that happened in the course of enriching.
+     * @return the list of (failing unit of work, exception) pairs; empty if nothing failed
+     */
+    public List<Map.Entry<Object, Throwable>> getEnrichmentErrors() {
+      return enrichmentErrors;
+    }
+  }
+
+  /** Supplies the cache that fronts each enrichment. */
+  private ConcurrencyContext concurrencyContext;
+
+  /**
+   * Construct a parallel enricher with a set of enrichment adapters associated with their enrichment types.
+   * @param enrichmentsByType the enrichment adapters, keyed by enrichment type
+   * @param concurrencyContext the context providing the enrichment cache
+   * @param logStats if true, cache statistics deltas are logged by apply when the performance
+   *                 logger has debug enabled
+   */
+  public ParallelEnricher( Map<String, EnrichmentAdapter<CacheKey>> enrichmentsByType
+                         , ConcurrencyContext concurrencyContext
+                         , boolean logStats
+                         )
+  {
+    this.enrichmentsByType = enrichmentsByType;
+    this.concurrencyContext = concurrencyContext;
+    if(logStats) {
+      // Seed every strategy with a null snapshot: a non-empty map is what switches stats logging on.
+      for(EnrichmentStrategies s : EnrichmentStrategies.values()) {
+        cacheStats.put(s, null);
+      }
+    }
+  }
+
+  /**
+   * Fully enriches a message.  Each enrichment is done in parallel via a threadpool.
+   * Each enrichment is fronted with the cache held by the concurrency context.
+   *
+   * @param message the message to enrich; timing fields are added to it as a side effect
+   * @param strategy The enrichment strategy to use (e.g. enrichment or threat intel)
+   * @param config The sensor enrichment config
+   * @param perfLog The performance logger.  We log the performance for this call, the split portion and the enrichment portion.
+   *                May be null, in which case nothing is logged.
+   * @return the enrichment result, or null if the message is null
+   * @throws IllegalStateException if there is work to do for an enrichment type which has no adapter registered
+   * @throws ExecutionException if waiting on the joined enrichment tasks fails
+   * @throws InterruptedException if the calling thread is interrupted while waiting on the enrichment tasks
+   */
+  public EnrichmentResult apply( JSONObject message
+                         , EnrichmentStrategies strategy
+                         , SensorEnrichmentConfig config
+                         , PerformanceLogger perfLog
+                         ) throws ExecutionException, InterruptedException {
+    if(message == null) {
+      return null;
+    }
+    if(perfLog != null) {
+      perfLog.mark("execute");
+      if(perfLog.isDebugEnabled() && !cacheStats.isEmpty()) {
+        // Log the change in cache statistics since the previous call for this strategy.
+        CacheStats before =  cacheStats.get(strategy);
+        CacheStats after = concurrencyContext.getCache().stats();
+        if(before != null && after != null) {
+          CacheStats delta = after.minus(before);
+          perfLog.log("cache", delta.toString());
+        }
+        cacheStats.put(strategy, after);
+      }
+    }
+    String sensorType = MessageUtils.getSensorType(message);
+    // Common prefix of the timing fields that are written into the message.
+    String tsPrefix = getClass().getSimpleName().toLowerCase();
+    message.put(tsPrefix + ".splitter.begin.ts", "" + System.currentTimeMillis());
+    // Split the message into individual tasks.
+    //
+    // A task will either correspond to an enrichment adapter or,
+    // in the case of Stellar, a stellar subgroup.  The tasks will be grouped by enrichment type (the key of the
+    //tasks map).  Each JSONObject will correspond to a unit of work.
+    Map<String, List<JSONObject>> tasks = splitMessage( message
+                                                      , strategy
+                                                      , config
+                                                      );
+    message.put(tsPrefix + ".splitter.end.ts", "" + System.currentTimeMillis());
+    message.put(tsPrefix + ".enrich.begin.ts", "" + System.currentTimeMillis());
+    if(perfLog != null) {
+      perfLog.mark("enrich");
+    }
+    List<CompletableFuture<JSONObject>> taskList = new ArrayList<>();
+    // The suppliers run on pool threads, so the error list has to be safe for concurrent adds.
+    List<Map.Entry<Object, Throwable>> errors = Collections.synchronizedList(new ArrayList<>());
+    for(Map.Entry<String, List<JSONObject>> task : tasks.entrySet()) {
+      //task is the list of enrichment tasks for the task.getKey() adapter
+      EnrichmentAdapter<CacheKey> adapter = enrichmentsByType.get(task.getKey());
+      for(JSONObject m : task.getValue()) {
+        /* now for each unit of work (each of these only has one element in them)
+         * the key is the field name and the value is value associated with that field.
+         *
+         * In the case of stellar enrichment, the field name is the subgroup name or empty string.
+         * The value is the subset of the message needed for the enrichment.
+         *
+         * In the case of another enrichment (e.g. hbase), the field name is the field name being enriched.
+         * The value is the corresponding value.
+         */
+        for(Object o : m.keySet()) {
+          if(adapter == null) {
+            // There is work to do for this type but nothing to do it with.  Fail with an
+            // actionable message instead of a bare NullPointerException on the adapter.
+            throw new IllegalStateException("Unable to find an adapter for " + task.getKey()
+                    + ", possible adapters are: " + enrichmentsByType.keySet());
+          }
+          String field = (String) o;
+          Object value = m.get(o);
+          CacheKey cacheKey = new CacheKey(field, value, config);
+          String prefix = adapter.getOutputPrefix(cacheKey);
+          Supplier<JSONObject> supplier = () -> {
+            try {
+              JSONObject ret = concurrencyContext.getCache().get(cacheKey, new EnrichmentCallable(cacheKey, adapter));
+              if(ret == null) {
+                ret = new JSONObject();
+              }
+              //each enrichment has their own unique prefix to use to adjust the keys for the enriched fields.
+              return EnrichmentUtils.adjustKeys(new JSONObject(), ret, cacheKey.getField(), prefix);
+            } catch (Throwable e) {
+              // A failed enrichment must not fail the whole message: record the error along with
+              // the unit of work that caused it and contribute nothing to the join.
+              JSONObject errorMessage = new JSONObject();
+              errorMessage.putAll(m);
+              errorMessage.put(Constants.SENSOR_TYPE, sensorType );
+              errors.add(new AbstractMap.SimpleEntry<>(errorMessage, new IllegalStateException(strategy + " error with " + task.getKey() + " failed: " + e.getMessage(), e)));
+              return new JSONObject();
+            }
+          };
+          //add the Future to the task list
+          taskList.add(CompletableFuture.supplyAsync( supplier, ConcurrencyContext.getExecutor()));
+        }
+      }
+    }
+    if(taskList.isEmpty()) {
+      // NOTE(review): this early return skips the ".enrich.end.ts" field and the perf logging below.
+      return new EnrichmentResult(message, errors);
+    }
+
+    EnrichmentResult ret = new EnrichmentResult(all(taskList, message, ParallelEnricher::join).get(), errors);
+    // NOTE(review): join copies the message into a new object, so this end timestamp lands on the
+    // input message only and not on the joined result returned to the caller -- confirm this is intended.
+    message.put(tsPrefix + ".enrich.end.ts", "" + System.currentTimeMillis());
+    if(perfLog != null) {
+      String key = message.get(Constants.GUID) + "";
+      perfLog.log("enrich", "key={}, elapsed time to enrich", key);
+      perfLog.log("execute", "key={}, elapsed time to run execute", key);
+    }
+    return ret;
+  }
+
+  /**
+   * Merge two messages into a new one.  Entries of the right message win on key collisions and
+   * entries whose value is null or has an empty string representation are dropped from the result.
+   * Neither input is modified.
+   *
+   * @param left the base message
+   * @param right the message laid over the base
+   * @return a new message holding the merged, non-empty entries
+   */
+  private static JSONObject join(JSONObject left, JSONObject right) {
+    JSONObject message = new JSONObject();
+    message.putAll(left);
+    message.putAll(right);
+    // Collect first and remove afterwards so the map is not modified while it is being iterated.
+    List<Object> emptyKeys = new ArrayList<>();
+    for(Object key : message.keySet()) {
+      Object value = message.get(key);
+      if(value == null || value.toString().length() == 0) {
+        emptyKeys.add(key);
+      }
+    }
+    for(Object o : emptyKeys) {
+      message.remove(o);
+    }
+    return message;
+  }
+
+
+  /**
+   * Wait until all the futures complete and join the resulting JSONObjects using the supplied binary operator
+   * and identity object.
+   *
+   * @param futures the futures whose results should be combined
+   * @param identity the starting value of the reduction
+   * @param reduceOp the operator used to combine the running value with each future's result
+   * @return a future which completes, once every input future has completed, with the reduced value
+   */
+  public static CompletableFuture<JSONObject> all(
+            List<CompletableFuture<JSONObject>> futures
+          , JSONObject identity
+          , BinaryOperator<JSONObject> reduceOp
+  ) {
+    CompletableFuture<?>[] cfs = futures.toArray(new CompletableFuture<?>[futures.size()]);
+    CompletableFuture<Void> future = CompletableFuture.allOf(cfs);
+    // By the time this stage runs every future is complete, so the join calls do not block.
+    return future.thenApply(aVoid -> futures.stream().map(CompletableFuture::join).reduce(identity, reduceOp));
+  }
+
+  /**
+   * Take a message and a config and return a list of tasks indexed by adapter enrichment types.
+   * @param message the message to split
+   * @param enrichmentStrategy the strategy selecting which part of the config to use and how output keys are built
+   * @param config the sensor enrichment config
+   * @return the units of work for each configured enrichment type, keyed by enrichment type
+   */
+  public Map<String, List<JSONObject>> splitMessage( JSONObject message
+                                                   , EnrichmentStrategy enrichmentStrategy
+                                                   , SensorEnrichmentConfig config
+                                                   ) {
+    Map<String, List<JSONObject>> streamMessageMap = new HashMap<>();
+    Map<String, Object> enrichmentFieldMap = enrichmentStrategy.getUnderlyingConfig(config).getFieldMap();
+
+    Map<String, ConfigHandler> fieldToHandler = enrichmentStrategy.getUnderlyingConfig(config).getEnrichmentConfigs();
+
+    Set<String> enrichmentTypes = new HashSet<>(enrichmentFieldMap.keySet());
+
+    //the set of enrichments configured
+    enrichmentTypes.addAll(fieldToHandler.keySet());
+
+    //For each of these enrichment types, we're going to construct JSONObjects
+    //which represent the individual enrichment tasks.
+    for (String enrichmentType : enrichmentTypes) {
+      Object fields = enrichmentFieldMap.get(enrichmentType);
+      ConfigHandler retriever = fieldToHandler.get(enrichmentType);
+
+      //How this is split depends on the ConfigHandler
+      List<JSONObject> enrichmentObject = retriever.getType()
+              .splitByFields( message
+                      , fields
+                      , field -> enrichmentStrategy.fieldToEnrichmentKey(enrichmentType, field)
+                      , retriever
+              );
+      streamMessageMap.put(enrichmentType, enrichmentObject);
+    }
+    return streamMessageMap;
+  }
+
+}


Mime
View raw message