metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmerri...@apache.org
Subject metron git commit: METRON-1018 Integration tests should reference flux yaml and property files deployed by Ambari (merrimanr) closes apache/metron#635
Date Wed, 26 Jul 2017 13:27:21 GMT
Repository: metron
Updated Branches:
  refs/heads/master 5c0ac32d1 -> badc6cf97


METRON-1018 Integration tests should reference flux yaml and property files deployed by Ambari (merrimanr) closes apache/metron#635


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/badc6cf9
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/badc6cf9
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/badc6cf9

Branch: refs/heads/master
Commit: badc6cf9739a31800abee1cfccbcf9930b130fa7
Parents: 5c0ac32
Author: merrimanr <merrimanr@gmail.com>
Authored: Wed Jul 26 08:24:51 2017 -0500
Committer: merrimanr <merrimanr@apache.org>
Committed: Wed Jul 26 08:24:51 2017 -0500

----------------------------------------------------------------------
 metron-deployment/packaging/ambari/.gitignore   |   2 +
 .../packaging/ambari/metron-mpack/pom.xml       |  41 ++
 .../package/scripts/params/params_linux.py      |   1 +
 .../templates/elasticsearch.properties.j2       |  49 --
 .../package/templates/enrichment.properties.j2  |  63 --
 .../docker/rpm-docker/SPECS/metron.spec         |   1 -
 .../src/main/assembly/assembly.xml              |   1 +
 .../src/main/config/elasticsearch.properties.j2 |  49 ++
 .../ElasticsearchIndexingIntegrationTest.java   |   7 +-
 .../src/main/assembly/assembly.xml              |   1 +
 .../src/main/config/enrichment.properties.j2    |  63 ++
 .../src/main/flux/enrichment/test.yaml          | 604 -------------------
 .../enrichment/bolt/GenericEnrichmentBolt.java  |  11 +-
 .../enrichment/bolt/ErrorEnrichmentBolt.java    |  46 --
 .../integration/EnrichmentIntegrationTest.java  |  75 +--
 .../integration/IndexingIntegrationTest.java    |  34 +-
 .../main/config/zookeeper/enrichments/test.json |   2 +
 .../metron/integration/BaseIntegrationTest.java |   7 +-
 .../components/FluxTopologyComponent.java       |  47 +-
 .../integration/components/KafkaComponent.java  |   2 +-
 .../components/ZKServerComponent.java           |   2 +-
 .../components/ParserTopologyComponent.java     |   3 +-
 .../PcapTopologyIntegrationTest.java            |  15 +-
 .../metron-solr/src/main/assembly/assembly.xml  |   1 +
 .../src/main/config/solr.properties.j2          |  49 ++
 .../SolrIndexingIntegrationTest.java            |   7 +-
 .../test/bolt/BaseEnrichmentBoltTest.java       |   1 +
 27 files changed, 338 insertions(+), 846 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-deployment/packaging/ambari/.gitignore
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/.gitignore b/metron-deployment/packaging/ambari/.gitignore
index e708548..10c9004 100644
--- a/metron-deployment/packaging/ambari/.gitignore
+++ b/metron-deployment/packaging/ambari/.gitignore
@@ -1,2 +1,4 @@
 archive.zip
 *.hash
+elasticsearch.properties.j2
+enrichment.properties.j2

http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-deployment/packaging/ambari/metron-mpack/pom.xml
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/pom.xml b/metron-deployment/packaging/ambari/metron-mpack/pom.xml
index 09a5da0..ae721f2 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/pom.xml
+++ b/metron-deployment/packaging/ambari/metron-mpack/pom.xml
@@ -98,6 +98,32 @@
                             </resources>
                         </configuration>
                     </execution>
+                    <execution>
+                        <id>copy-property-templates</id>
+                        <phase>prepare-package</phase>
+                        <goals>
+                            <goal>copy-resources</goal>
+                        </goals>
+                        <configuration>
+                            <outputDirectory>${basedir}/src/main/resources/common-services/METRON/CURRENT/package/templates</outputDirectory>
+                            <resources>
+                                <resource>
+                                    <directory>${basedir}/../../../../metron-platform/metron-enrichment/src/main/config</directory>
+                                    <includes>
+                                        <include>enrichment.properties.j2</include>
+                                    </includes>
+                                    <filtering>false</filtering>
+                                </resource>
+                                <resource>
+                                    <directory>${basedir}/../../../../metron-platform/metron-elasticsearch/src/main/config</directory>
+                                    <includes>
+                                        <include>elasticsearch.properties.j2</include>
+                                    </includes>
+                                    <filtering>false</filtering>
+                                </resource>
+                            </resources>
+                        </configuration>
+                    </execution>
                 </executions>
             </plugin>
             <plugin>
@@ -129,6 +155,21 @@
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <artifactId>maven-clean-plugin</artifactId>
+                <version>3.0.0</version>
+                <configuration>
+                    <filesets>
+                        <fileset>
+                            <directory>${basedir}/src/main/resources/common-services/METRON/CURRENT/package/templates</directory>
+                            <includes>
+                                <include>enrichment.properties.j2</include>
+                                <include>elasticsearch.properties.j2</include>
+                            </includes>
+                        </fileset>
+                    </filesets>
+                </configuration>
+            </plugin>
         </plugins>
     </build>
 

http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
index ee9542c..3f84ef5 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
@@ -164,6 +164,7 @@ HdfsResource = functools.partial(
 )
 
 # HBase
+enrichment_hbase_provider_impl = 'org.apache.metron.hbase.HTableProvider'
 enrichment_table = status_params.enrichment_table
 enrichment_cf = status_params.enrichment_cf
 threatintel_table = status_params.threatintel_table

http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/elasticsearch.properties.j2
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/elasticsearch.properties.j2 b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/elasticsearch.properties.j2
deleted file mode 100644
index acb0f59..0000000
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/elasticsearch.properties.j2
+++ /dev/null
@@ -1,49 +0,0 @@
-{#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#}
-
-##### Storm #####
-indexing.workers={{indexing_workers}}
-indexing.acker.executors={{indexing_acker_executors}}
-topology.worker.childopts={{indexing_topology_worker_childopts}}
-topology.auto-credentials={{topology_auto_credentials}}
-topology.max.spout.pending={{indexing_topology_max_spout_pending}}
-
-##### Kafka #####
-kafka.zk={{zookeeper_quorum}}
-kafka.broker={{kafka_brokers}}
-kafka.security.protocol={{kafka_security_protocol}}
-
-# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
-kafka.start={{indexing_kafka_start}}
-
-indexing.input.topic={{indexing_input_topic}}
-indexing.error.topic={{indexing_error_topic}}
-
-##### Indexing #####
-indexing.writer.class.name={{indexing_writer_class_name}}
-
-##### HDFS #####
-bolt.hdfs.rotation.policy={{bolt_hdfs_rotation_policy}}
-bolt.hdfs.rotation.policy.units={{bolt_hdfs_rotation_policy_units}}
-bolt.hdfs.rotation.policy.count={{bolt_hdfs_rotation_policy_count}}
-indexing.hdfs.output={{metron_apps_indexed_hdfs_dir}}
-
-##### Parallelism #####
-kafka.spout.parallelism={{indexing_kafka_spout_parallelism}}
-indexing.writer.parallelism={{indexing_writer_parallelism}}
-hdfs.writer.parallelism={{hdfs_writer_parallelism}}

http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2 b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2
deleted file mode 100755
index 485b938..0000000
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2
+++ /dev/null
@@ -1,63 +0,0 @@
-{#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#}
-
-##### Storm #####
-enrichment.workers={{enrichment_workers}}
-enrichment.acker.executors={{enrichment_acker_executors}}
-topology.worker.childopts={{enrichment_topology_worker_childopts}}
-topology.auto-credentials={{topology_auto_credentials}}
-topology.max.spout.pending={{enrichment_topology_max_spout_pending}}
-
-##### Kafka #####
-kafka.zk={{zookeeper_quorum}}
-kafka.broker={{kafka_brokers}}
-kafka.security.protocol={{kafka_security_protocol}}
-
-# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
-kafka.start={{enrichment_kafka_start}}
-
-enrichment.input.topic={{enrichment_input_topic}}
-enrichment.output.topic={{enrichment_output_topic}}
-enrichment.error.topic={{enrichment_error_topic}}
-threat.intel.error.topic={{threatintel_error_topic}}
-
-##### JoinBolt #####
-enrichment.join.cache.size={{enrichment_join_cache_size}}
-threat.intel.join.cache.size={{threatintel_join_cache_size}}
-
-##### Enrichment #####
-hbase.provider.impl=org.apache.metron.hbase.HTableProvider
-enrichment.simple.hbase.table={{enrichment_table}}
-enrichment.simple.hbase.cf={{enrichment_cf}}
-enrichment.host.known_hosts={{enrichment_host_known_hosts}}
-
-##### Threat Intel #####
-threat.intel.tracker.table={{threatintel_table}}
-threat.intel.tracker.cf={{threatintel_cf}}
-threat.intel.simple.hbase.table={{threatintel_table}}
-threat.intel.simple.hbase.cf={{threatintel_cf}}
-
-##### Parallelism #####
-kafka.spout.parallelism={{enrichment_kafka_spout_parallelism}}
-enrichment.split.parallelism={{enrichment_split_parallelism}}
-enrichment.stellar.parallelism={{enrichment_stellar_parallelism}}
-enrichment.join.parallelism={{enrichment_join_parallelism}}
-threat.intel.split.parallelism={{threat_intel_split_parallelism}}
-threat.intel.stellar.parallelism={{threat_intel_stellar_parallelism}}
-threat.intel.join.parallelism={{threat_intel_join_parallelism}}
-kafka.writer.parallelism={{kafka_writer_parallelism}}

http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
index e34c240..94c7e05 100644
--- a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
+++ b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
@@ -245,7 +245,6 @@ This package installs the Metron Enrichment files
 %{metron_home}/config/zookeeper/enrichments/yaf.json
 %{metron_home}/config/zookeeper/enrichments/asa.json
 %{metron_home}/flux/enrichment/remote.yaml
-%exclude %{metron_home}/flux/enrichment/test.yaml
 %attr(0644,root,root) %{metron_home}/lib/metron-enrichment-%{full_version}-uber.jar
 
 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-elasticsearch/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/assembly/assembly.xml b/metron-platform/metron-elasticsearch/src/main/assembly/assembly.xml
index 27d5d0b..1535967 100644
--- a/metron-platform/metron-elasticsearch/src/main/assembly/assembly.xml
+++ b/metron-platform/metron-elasticsearch/src/main/assembly/assembly.xml
@@ -25,6 +25,7 @@
       <excludes>
         <exclude>**/*.formatted</exclude>
         <exclude>**/*.filtered</exclude>
+        <exclude>**/*.j2</exclude>
       </excludes>
       <fileMode>0644</fileMode>
       <lineEnding>unix</lineEnding>

http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties.j2
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties.j2 b/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties.j2
new file mode 100644
index 0000000..acb0f59
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties.j2
@@ -0,0 +1,49 @@
+{#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#}
+
+##### Storm #####
+indexing.workers={{indexing_workers}}
+indexing.acker.executors={{indexing_acker_executors}}
+topology.worker.childopts={{indexing_topology_worker_childopts}}
+topology.auto-credentials={{topology_auto_credentials}}
+topology.max.spout.pending={{indexing_topology_max_spout_pending}}
+
+##### Kafka #####
+kafka.zk={{zookeeper_quorum}}
+kafka.broker={{kafka_brokers}}
+kafka.security.protocol={{kafka_security_protocol}}
+
+# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
+kafka.start={{indexing_kafka_start}}
+
+indexing.input.topic={{indexing_input_topic}}
+indexing.error.topic={{indexing_error_topic}}
+
+##### Indexing #####
+indexing.writer.class.name={{indexing_writer_class_name}}
+
+##### HDFS #####
+bolt.hdfs.rotation.policy={{bolt_hdfs_rotation_policy}}
+bolt.hdfs.rotation.policy.units={{bolt_hdfs_rotation_policy_units}}
+bolt.hdfs.rotation.policy.count={{bolt_hdfs_rotation_policy_count}}
+indexing.hdfs.output={{metron_apps_indexed_hdfs_dir}}
+
+##### Parallelism #####
+kafka.spout.parallelism={{indexing_kafka_spout_parallelism}}
+indexing.writer.parallelism={{indexing_writer_parallelism}}
+hdfs.writer.parallelism={{hdfs_writer_parallelism}}

http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
index 54e494e..4c03526 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
@@ -102,11 +102,16 @@ public class ElasticsearchIndexingIntegrationTest extends IndexingIntegrationTes
     topologyProperties.setProperty("es.clustername", "metron");
     topologyProperties.setProperty("es.port", "9300");
     topologyProperties.setProperty("es.ip", "localhost");
-    topologyProperties.setProperty("indexing.writer.class.name", "org.apache.metron.elasticsearch.writer.ElasticsearchWriter");
+    topologyProperties.setProperty("indexing_writer_class_name", "org.apache.metron.elasticsearch.writer.ElasticsearchWriter");
   }
 
   @Override
   public String cleanField(String field) {
     return field;
   }
+
+  @Override
+  public String getTemplatePath() {
+    return "../metron-elasticsearch/src/main/config/elasticsearch.properties.j2";
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-enrichment/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/assembly/assembly.xml b/metron-platform/metron-enrichment/src/main/assembly/assembly.xml
index b412ed8..6cbe4e7 100644
--- a/metron-platform/metron-enrichment/src/main/assembly/assembly.xml
+++ b/metron-platform/metron-enrichment/src/main/assembly/assembly.xml
@@ -25,6 +25,7 @@
       <excludes>
         <exclude>**/*.formatted</exclude>
         <exclude>**/*.filtered</exclude>
+        <exclude>**/*.j2</exclude>
       </excludes>
       <fileMode>0644</fileMode>
       <lineEnding>unix</lineEnding>

http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-enrichment/src/main/config/enrichment.properties.j2
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/config/enrichment.properties.j2 b/metron-platform/metron-enrichment/src/main/config/enrichment.properties.j2
new file mode 100755
index 0000000..f8b9b66
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/config/enrichment.properties.j2
@@ -0,0 +1,63 @@
+{#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#}
+
+##### Storm #####
+enrichment.workers={{enrichment_workers}}
+enrichment.acker.executors={{enrichment_acker_executors}}
+topology.worker.childopts={{enrichment_topology_worker_childopts}}
+topology.auto-credentials={{topology_auto_credentials}}
+topology.max.spout.pending={{enrichment_topology_max_spout_pending}}
+
+##### Kafka #####
+kafka.zk={{zookeeper_quorum}}
+kafka.broker={{kafka_brokers}}
+kafka.security.protocol={{kafka_security_protocol}}
+
+# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
+kafka.start={{enrichment_kafka_start}}
+
+enrichment.input.topic={{enrichment_input_topic}}
+enrichment.output.topic={{enrichment_output_topic}}
+enrichment.error.topic={{enrichment_error_topic}}
+threat.intel.error.topic={{threatintel_error_topic}}
+
+##### JoinBolt #####
+enrichment.join.cache.size={{enrichment_join_cache_size}}
+threat.intel.join.cache.size={{threatintel_join_cache_size}}
+
+##### Enrichment #####
+hbase.provider.impl={{enrichment_hbase_provider_impl}}
+enrichment.simple.hbase.table={{enrichment_table}}
+enrichment.simple.hbase.cf={{enrichment_cf}}
+enrichment.host.known_hosts={{enrichment_host_known_hosts}}
+
+##### Threat Intel #####
+threat.intel.tracker.table={{threatintel_table}}
+threat.intel.tracker.cf={{threatintel_cf}}
+threat.intel.simple.hbase.table={{threatintel_table}}
+threat.intel.simple.hbase.cf={{threatintel_cf}}
+
+##### Parallelism #####
+kafka.spout.parallelism={{enrichment_kafka_spout_parallelism}}
+enrichment.split.parallelism={{enrichment_split_parallelism}}
+enrichment.stellar.parallelism={{enrichment_stellar_parallelism}}
+enrichment.join.parallelism={{enrichment_join_parallelism}}
+threat.intel.split.parallelism={{threat_intel_split_parallelism}}
+threat.intel.stellar.parallelism={{threat_intel_stellar_parallelism}}
+threat.intel.join.parallelism={{threat_intel_join_parallelism}}
+kafka.writer.parallelism={{kafka_writer_parallelism}}

http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-enrichment/src/main/flux/enrichment/test.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/flux/enrichment/test.yaml b/metron-platform/metron-enrichment/src/main/flux/enrichment/test.yaml
deleted file mode 100644
index b4481ff..0000000
--- a/metron-platform/metron-enrichment/src/main/flux/enrichment/test.yaml
+++ /dev/null
@@ -1,604 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-name: "enrichment"
-config:
-    topology.workers: ${enrichment.workers}
-    topology.acker.executors: ${enrichment.acker.executors}
-    topology.worker.childopts: ${topology.worker.childopts}
-    topology.auto-credentials: ${topology.auto-credentials}
-    topology.max.spout.pending: ${topology.max.spout.pending}
-
-components:
-
-# Enrichment
-    -   id: "stellarEnrichmentAdapter"
-        className: "org.apache.metron.enrichment.adapters.stellar.StellarAdapter"
-        configMethods:
-            -   name: "ofType"
-                args:
-                    - "ENRICHMENT"
-
-    -   id: "stellarEnrichment"
-        className: "org.apache.metron.enrichment.configuration.Enrichment"
-        constructorArgs:
-            -   "stellar"
-            -   ref: "stellarEnrichmentAdapter"
-
-    -   id: "geoEnrichmentAdapter"
-        className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter"
-    -   id: "geoEnrichment"
-        className: "org.apache.metron.enrichment.configuration.Enrichment"
-        constructorArgs:
-            -   "geo"
-            -   ref: "geoEnrichmentAdapter"
-    -   id: "hostEnrichmentAdapter"
-        className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
-        constructorArgs:
-            - '${enrichment.host.known_hosts}'
-    -   id: "hostEnrichment"
-        className: "org.apache.metron.enrichment.configuration.Enrichment"
-        constructorArgs:
-            -   "host"
-            -   ref: "hostEnrichmentAdapter"
-
-    -   id: "simpleHBaseEnrichmentConfig"
-        className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseConfig"
-        configMethods:
-            -   name: "withProviderImpl"
-                args:
-                    - "${hbase.provider.impl}"
-            -   name: "withHBaseTable"
-                args:
-                    - "${enrichment.simple.hbase.table}"
-            -   name: "withHBaseCF"
-                args:
-                    - "${enrichment.simple.hbase.cf}"
-    -   id: "simpleHBaseEnrichmentAdapter"
-        className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseAdapter"
-        configMethods:
-           -    name: "withConfig"
-                args:
-                    - ref: "simpleHBaseEnrichmentConfig"
-    -   id: "simpleHBaseEnrichment"
-        className: "org.apache.metron.enrichment.configuration.Enrichment"
-        constructorArgs:
-          -   "hbaseEnrichment"
-          -   ref: "simpleHBaseEnrichmentAdapter"
-    -   id: "enrichments"
-        className: "java.util.ArrayList"
-        configMethods:
-            -   name: "add"
-                args:
-                    - ref: "geoEnrichment"
-            -   name: "add"
-                args:
-                    - ref: "hostEnrichment"
-            -   name: "add"
-                args:
-                    - ref: "simpleHBaseEnrichment"
-            -   name: "add"
-                args:
-                    - ref: "stellarEnrichment"
-
-    #enrichment error
-    -   id: "enrichmentErrorKafkaWriter"
-        className: "org.apache.metron.writer.kafka.KafkaWriter"
-        configMethods:
-            -   name: "withTopic"
-                args:
-                    - "${enrichment.error.topic}"
-            -   name: "withZkQuorum"
-                args:
-                    - "${kafka.zk}"
-
-# Threat Intel
-    -   id: "stellarThreatIntelAdapter"
-        className: "org.apache.metron.enrichment.adapters.stellar.StellarAdapter"
-        configMethods:
-            -   name: "ofType"
-                args:
-                    - "THREAT_INTEL"
-    -   id: "stellarThreatIntelEnrichment"
-        className: "org.apache.metron.enrichment.configuration.Enrichment"
-        constructorArgs:
-            -   "stellar"
-            -   ref: "stellarThreatIntelAdapter"
-    -   id: "simpleHBaseThreatIntelConfig"
-        className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelConfig"
-        configMethods:
-            -   name: "withProviderImpl"
-                args:
-                    - "${hbase.provider.impl}"
-            -   name: "withTrackerHBaseTable"
-                args:
-                    - "${threat.intel.tracker.table}"
-            -   name: "withTrackerHBaseCF"
-                args:
-                    - "${threat.intel.tracker.cf}"
-            -   name: "withHBaseTable"
-                args:
-                    - "${threat.intel.simple.hbase.table}"
-            -   name: "withHBaseCF"
-                args:
-                    - "${threat.intel.simple.hbase.cf}"
-    -   id: "simpleHBaseThreatIntelAdapter"
-        className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelAdapter"
-        configMethods:
-           -    name: "withConfig"
-                args:
-                    - ref: "simpleHBaseThreatIntelConfig"
-    -   id: "simpleHBaseThreatIntelEnrichment"
-        className: "org.apache.metron.enrichment.configuration.Enrichment"
-        constructorArgs:
-          -   "hbaseThreatIntel"
-          -   ref: "simpleHBaseThreatIntelAdapter"
-
-    -   id: "threatIntels"
-        className: "java.util.ArrayList"
-        configMethods:
-            -   name: "add"
-                args:
-                    - ref: "simpleHBaseThreatIntelEnrichment"
-            -   name: "add"
-                args:
-                    - ref: "stellarThreatIntelEnrichment"
-
-    #threatintel error
-    -   id: "threatIntelErrorKafkaWriter"
-        className: "org.apache.metron.writer.kafka.KafkaWriter"
-        configMethods:
-            -   name: "withTopic"
-                args:
-                    - "${threat.intel.error.topic}"
-            -   name: "withZkQuorum"
-                args:
-                    - "${kafka.zk}"
-
-#indexing
-    -   id: "kafkaWriter"
-        className: "org.apache.metron.writer.kafka.KafkaWriter"
-        configMethods:
-            -   name: "withTopic"
-                args:
-                    - "${enrichment.output.topic}"
-            -   name: "withZkQuorum"
-                args:
-                    - "${kafka.zk}"
-
-#kafka/zookeeper
-# Any kafka props for the producer go here.
-    -   id: "kafkaProps"
-        className: "java.util.HashMap"
-        configMethods:
-          -   name: "put"
-              args:
-                  - "value.deserializer"
-                  - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
-          -   name: "put"
-              args:
-                  - "key.deserializer"
-                  - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
-          -   name: "put"
-              args:
-                  - "group.id"
-                  - "enrichments"
-          -   name: "put"
-              args:
-                  - "security.protocol"
-                  - "${kafka.security.protocol}"
-
-
-  # The fields to pull out of the kafka messages
-    -   id: "fields"
-        className: "java.util.ArrayList"
-        configMethods:
-          -   name: "add"
-              args:
-                  - "value"
-
-    -   id: "kafkaConfig"
-        className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
-        constructorArgs:
-          - ref: "kafkaProps"
-          # topic name
-          - "${enrichment.input.topic}"
-          - "${kafka.zk}"
-          - ref: "fields"
-        configMethods:
-            -   name: "setFirstPollOffsetStrategy"
-                args:
-                    - "${kafka.start}"
-
-
-spouts:
-    -   id: "kafkaSpout"
-        className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout"
-        constructorArgs:
-            - ref: "kafkaConfig"
-        parallelism: ${kafka.spout.parallelism}
-
-bolts:
-# Enrichment Bolts
-    -   id: "enrichmentSplitBolt"
-        className: "org.apache.metron.enrichment.bolt.EnrichmentSplitterBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withEnrichments"
-                args:
-                    - ref: "enrichments"
-        parallelism: ${enrichment.split.parallelism}
-
-    -   id: "geoEnrichmentBolt"
-        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withEnrichment"
-                args:
-                    - ref: "geoEnrichment"
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-
-    -   id: "stellarEnrichmentBolt"
-        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withEnrichment"
-                args:
-                    - ref: "stellarEnrichment"
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-        parallelism: ${enrichment.stellar.parallelism}
-
-    -   id: "hostEnrichmentBolt"
-        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withEnrichment"
-                args:
-                    - ref: "hostEnrichment"
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-
-    -   id: "simpleHBaseEnrichmentBolt"
-        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withEnrichment"
-                args:
-                    - ref: "simpleHBaseEnrichment"
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-    -   id: "ErrorEnrichmentBolt"
-        className: "org.apache.metron.enrichment.bolt.ErrorEnrichmentBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withEnrichment"
-                args:
-                    - ref: "geoEnrichment"
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-    -   id: "enrichmentJoinBolt"
-        className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withMaxCacheSize"
-                args: [${enrichment.join.cache.size}]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-        parallelism: ${enrichment.join.parallelism}
-
-    -   id: "enrichmentErrorOutputBolt"
-        className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withMessageWriter"
-                args:
-                    - ref: "enrichmentErrorKafkaWriter"
-
-
-# Threat Intel Bolts
-    -   id: "threatIntelSplitBolt"
-        className: "org.apache.metron.enrichment.bolt.ThreatIntelSplitterBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withEnrichments"
-                args:
-                    - ref: "threatIntels"
-            -   name: "withMessageFieldName"
-                args: ["message"]
-        parallelism: ${threat.intel.split.parallelism}
-
-    -   id: "simpleHBaseThreatIntelBolt"
-        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withEnrichment"
-                args:
-                    - ref: "simpleHBaseThreatIntelEnrichment"
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-    -   id: "stellarThreatIntelBolt"
-        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withEnrichment"
-                args:
-                    - ref: "stellarThreatIntelEnrichment"
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-        parallelism: ${threat.intel.stellar.parallelism}
-
-    -   id: "threatIntelJoinBolt"
-        className: "org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withMaxCacheSize"
-                args: [${threat.intel.join.cache.size}]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-        parallelism: ${threat.intel.join.parallelism}
-
-    -   id: "threatIntelErrorOutputBolt"
-        className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withMessageWriter"
-                args:
-                    - ref: "threatIntelErrorKafkaWriter"
-
-# Indexing Bolts
-    -   id: "outputBolt"
-        className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withMessageWriter"
-                args:
-                    - ref: "kafkaWriter"
-        parallelism: ${kafka.writer.parallelism}
-
-
-streams:
-#parser
-    -   name: "spout -> enrichmentSplit"
-        from: "kafkaSpout"
-        to: "enrichmentSplitBolt"
-        grouping:
-            type: SHUFFLE
-
-#enrichment
-    -   name: "enrichmentSplit -> host"
-        from: "enrichmentSplitBolt"
-        to: "hostEnrichmentBolt"
-        grouping:
-            streamId: "host"
-            type: FIELDS
-            args: ["key"]
-    -   name: "enrichmentSplit -> geo"
-        from: "enrichmentSplitBolt"
-        to: "geoEnrichmentBolt"
-        grouping:
-            streamId: "geo"
-            type: FIELDS
-            args: ["key"]
-
-    -   name: "enrichmentSplit -> stellar"
-        from: "enrichmentSplitBolt"
-        to: "stellarEnrichmentBolt"
-        grouping:
-            streamId: "stellar"
-            type: FIELDS
-            args: ["key"]
-
-
-    -   name: "enrichmentSplit -> simpleHBaseEnrichmentBolt"
-        from: "enrichmentSplitBolt"
-        to: "simpleHBaseEnrichmentBolt"
-        grouping:
-            streamId: "hbaseEnrichment"
-            type: FIELDS
-            args: ["key"]
-
-    -   name: "enrichmentSplit -> ErrorEnrichmentBolt"
-        from: "enrichmentSplitBolt"
-        to: "ErrorEnrichmentBolt"
-        grouping:
-            streamId: "hbaseEnrichment"
-            type: FIELDS
-            args: ["key"]
-
-    -   name: "splitter -> join"
-        from: "enrichmentSplitBolt"
-        to: "enrichmentJoinBolt"
-        grouping:
-            streamId: "message"
-            type: FIELDS
-            args: ["key"]
-    -   name: "geo -> join"
-        from: "geoEnrichmentBolt"
-        to: "enrichmentJoinBolt"
-        grouping:
-            streamId: "geo"
-            type: FIELDS
-            args: ["key"]
-    -   name: "stellar -> join"
-        from: "stellarEnrichmentBolt"
-        to: "enrichmentJoinBolt"
-        grouping:
-            streamId: "stellar"
-            type: FIELDS
-            args: ["key"]
-
-
-
-    -   name: "simpleHBaseEnrichmentBolt -> join"
-        from: "simpleHBaseEnrichmentBolt"
-        to: "enrichmentJoinBolt"
-        grouping:
-            streamId: "hbaseEnrichment"
-            type: FIELDS
-            args: ["key"]
-    -   name: "host -> join"
-        from: "hostEnrichmentBolt"
-        to: "enrichmentJoinBolt"
-        grouping:
-            streamId: "host"
-            type: FIELDS
-            args: ["key"]
-
-    # Error output
-    -   name: "geoEnrichmentBolt -> enrichmentErrorOutputBolt"
-        from: "geoEnrichmentBolt"
-        to: "enrichmentErrorOutputBolt"
-        grouping:
-            streamId: "error"
-            type: FIELDS
-            args: ["message"]
-
-    -   name: "stellarEnrichmentBolt -> enrichmentErrorOutputBolt"
-        from: "stellarEnrichmentBolt"
-        to: "enrichmentErrorOutputBolt"
-        grouping:
-            streamId: "error"
-            type: FIELDS
-            args: ["message"]
-
-    -   name: "hostEnrichmentBolt -> enrichmentErrorOutputBolt"
-        from: "hostEnrichmentBolt"
-        to: "enrichmentErrorOutputBolt"
-        grouping:
-            streamId: "error"
-            type: FIELDS
-            args: ["message"]
-
-    -   name: "simpleHBaseEnrichmentBolt -> enrichmentErrorOutputBolt"
-        from: "simpleHBaseEnrichmentBolt"
-        to: "enrichmentErrorOutputBolt"
-        grouping:
-            streamId: "error"
-            type: FIELDS
-            args: ["message"]
-
-    -   name: "ErrorEnrichmentBolt -> enrichmentErrorOutputBolt"
-        from: "ErrorEnrichmentBolt"
-        to: "enrichmentErrorOutputBolt"
-        grouping:
-            streamId: "error"
-            type: FIELDS
-            args: ["message"]
-
-#threat intel
-    -   name: "enrichmentJoin -> threatSplit"
-        from: "enrichmentJoinBolt"
-        to: "threatIntelSplitBolt"
-        grouping:
-            streamId: "message"
-            type: FIELDS
-            args: ["key"]
-
-    -   name: "threatSplit -> simpleHBaseThreatIntel"
-        from: "threatIntelSplitBolt"
-        to: "simpleHBaseThreatIntelBolt"
-        grouping:
-            streamId: "hbaseThreatIntel"
-            type: FIELDS
-            args: ["key"]
-    -   name: "threatSplit -> stellarThreatIntel"
-        from: "threatIntelSplitBolt"
-        to: "stellarThreatIntelBolt"
-        grouping:
-            streamId: "stellar"
-            type: FIELDS
-            args: ["key"]
-
-
-    -   name: "simpleHBaseThreatIntel -> join"
-        from: "simpleHBaseThreatIntelBolt"
-        to: "threatIntelJoinBolt"
-        grouping:
-            streamId: "hbaseThreatIntel"
-            type: FIELDS
-            args: ["key"]
-
-    -   name: "stellarThreatIntel -> join"
-        from: "stellarThreatIntelBolt"
-        to: "threatIntelJoinBolt"
-        grouping:
-            streamId: "stellar"
-            type: FIELDS
-            args: ["key"]
-
-    -   name: "threatIntelSplit -> threatIntelJoin"
-        from: "threatIntelSplitBolt"
-        to: "threatIntelJoinBolt"
-        grouping:
-            streamId: "message"
-            type: FIELDS
-            args: ["key"]
-#output
-    -   name: "threatIntelJoin -> output"
-        from: "threatIntelJoinBolt"
-        to: "outputBolt"
-        grouping:
-            streamId: "message"
-            type: FIELDS
-            args: ["key"]
-
-    # Error output
-    -   name: "simpleHBaseThreatIntelBolt -> threatIntelErrorOutputBolt"
-        from: "simpleHBaseThreatIntelBolt"
-        to: "threatIntelErrorOutputBolt"
-        grouping:
-            streamId: "error"
-            type: FIELDS
-            args: ["message"]
-
-    -   name: "stellarThreatIntelBolt -> threatIntelErrorOutputBolt"
-        from: "stellarThreatIntelBolt"
-        to: "threatIntelErrorOutputBolt"
-        grouping:
-            streamId: "error"
-            type: FIELDS
-            args: ["message"]
-

http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
index 8b6fee0..907b309 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
@@ -197,7 +197,6 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt {
       else {
         throw new RuntimeException("Source type is missing from enrichment fragment: " + rawMessage.toJSONString());
       }
-      boolean error = false;
       String prefix = null;
       for (Object o : rawMessage.keySet()) {
         String field = (String) o;
@@ -210,7 +209,11 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt {
             SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sourceType);
             if(config == null) {
               LOG.error("Unable to find SensorEnrichmentConfig for sourceType: " + sourceType);
-              error = true;
+              MetronError metronError = new MetronError()
+                      .withErrorType(Constants.ErrorType.ENRICHMENT_ERROR)
+                      .withMessage("Unable to find SensorEnrichmentConfig for sourceType: " + sourceType)
+                      .addRawMessage(rawMessage);
+              ErrorUtils.handleError(collector, metronError);
               continue;
             }
             config.getConfiguration().putIfAbsent(STELLAR_CONTEXT_CONF, stellarContext);
@@ -226,7 +229,6 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt {
             }
             catch(Exception e) {
               LOG.error(e.getMessage(), e);
-              error = true;
               MetronError metronError = new MetronError()
                       .withErrorType(Constants.ErrorType.ENRICHMENT_ERROR)
                       .withThrowable(e)
@@ -250,9 +252,6 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt {
       }
 
       enrichedMessage.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".end.ts", "" + System.currentTimeMillis());
-      if(error) {
-        throw new Exception("Unable to enrich " + rawMessage + " check logs for specifics.");
-      }
       if (!enrichedMessage.isEmpty()) {
         collector.emit(enrichmentType, new Values(key, enrichedMessage, subGroup));
       }

http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ErrorEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ErrorEnrichmentBolt.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ErrorEnrichmentBolt.java
deleted file mode 100644
index 571079c..0000000
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ErrorEnrichmentBolt.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.enrichment.bolt;
-
-import org.json.simple.JSONObject;
-import org.apache.storm.tuple.Tuple;
-
-/**
- * Exists in order to provide a bolt that tests that when the GenericEnrichmentBolt writes to error, that it actually carries
- * through the queue
- */
-public class ErrorEnrichmentBolt extends GenericEnrichmentBolt {
-
-  public static final String TEST_ERROR_MESSAGE = "Test throwing error from ErrorEnrichmentBolt";
-
-  public ErrorEnrichmentBolt(String zookeeperUrl) {
-    super(zookeeperUrl);
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void execute(Tuple tuple) {
-    JSONObject rawMessage = new JSONObject();
-    rawMessage.put("rawMessage", "Error Test Raw Message String");
-
-    JSONObject enrichedMessage= new JSONObject();
-    enrichedMessage.put("enrichedMessage", "Error Test Enriched Message String");
-    handleError("key", rawMessage, "subgroup", enrichedMessage, new IllegalStateException(TEST_ERROR_MESSAGE));
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
index e798b72..a9a2fea 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
@@ -89,7 +89,8 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
   public static final String DEFAULT_DMACODE= "test dmaCode";
   public static final String DEFAULT_LOCATION_POINT= Joiner.on(',').join(DEFAULT_LATITUDE,DEFAULT_LONGITUDE);
 
-  protected String fluxPath = "../metron-enrichment/src/main/flux/enrichment/test.yaml";
+  protected String fluxPath = "../metron-enrichment/src/main/flux/enrichment/remote.yaml";
+  protected String templatePath = "../metron-enrichment/src/main/config/enrichment.properties.j2";
   protected String sampleParsedPath = TestConstants.SAMPLE_DATA_PARSED_PATH + "TestExampleParsed";
   private final List<byte[]> inputMessages = getInputMessages(sampleParsedPath);
 
@@ -124,40 +125,39 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
     final String threatIntelTableName = "threat_intel";
     final String enrichmentsTableName = "enrichments";
     final Properties topologyProperties = new Properties() {{
-      setProperty("enrichment.workers", "1");
-      setProperty("enrichment.acker.executors", "0");
-      setProperty("topology.worker.childopts", "");
-      setProperty("topology.auto-credentials", "[]");
-      setProperty("topology.max.spout.pending", "");
-      setProperty("kafka.start", "UNCOMMITTED_EARLIEST");
-      setProperty("kafka.security.protocol", "PLAINTEXT");
-      setProperty("enrichment.input.topic", Constants.ENRICHMENT_TOPIC);
-      setProperty("enrichment.output.topic", Constants.INDEXING_TOPIC);
-      setProperty("enrichment.error.topic", ERROR_TOPIC);
-      setProperty("threat.intel.error.topic", ERROR_TOPIC);
-      setProperty("enrichment.join.cache.size", "1000");
-      setProperty("threat.intel.join.cache.size", "1000");
-
-      setProperty("enrichment.host.known_hosts", "[{\"ip\":\"10.1.128.236\", \"local\":\"YES\", \"type\":\"webserver\", \"asset_value\" : \"important\"},\n" +
-              "{\"ip\":\"10.1.128.237\", \"local\":\"UNKNOWN\", \"type\":\"unknown\", \"asset_value\" : \"important\"},\n" +
-              "{\"ip\":\"10.60.10.254\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"},\n" +
+      setProperty("enrichment_workers", "1");
+      setProperty("enrichment_acker_executors", "0");
+      setProperty("enrichment_topology_worker_childopts", "");
+      setProperty("topology_auto_credentials", "[]");
+      setProperty("enrichment_topology_max_spout_pending", "");
+      setProperty("enrichment_kafka_start", "UNCOMMITTED_EARLIEST");
+      setProperty("kafka_security_protocol", "PLAINTEXT");
+      setProperty("enrichment_input_topic", Constants.ENRICHMENT_TOPIC);
+      setProperty("enrichment_output_topic", Constants.INDEXING_TOPIC);
+      setProperty("enrichment_error_topic", ERROR_TOPIC);
+      setProperty("threatintel_error_topic", ERROR_TOPIC);
+      setProperty("enrichment_join_cache_size", "1000");
+      setProperty("threatintel_join_cache_size", "1000");
+      setProperty("enrichment_hbase_provider_impl", "org.apache.metron.enrichment.integration.EnrichmentIntegrationTest\\$Provider");
+      setProperty("enrichment_table", enrichmentsTableName);
+      setProperty("enrichment_cf", cf);
+      setProperty("enrichment_host_known_hosts", "[{\"ip\":\"10.1.128.236\", \"local\":\"YES\", \"type\":\"webserver\", \"asset_value\" : \"important\"}," +
+              "{\"ip\":\"10.1.128.237\", \"local\":\"UNKNOWN\", \"type\":\"unknown\", \"asset_value\" : \"important\"}," +
+              "{\"ip\":\"10.60.10.254\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}," +
               "{\"ip\":\"10.0.2.15\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}]");
-      setProperty("hbase.provider.impl", "" + Provider.class.getName());
-      setProperty("threat.intel.tracker.table", trackerHBaseTableName);
-      setProperty("threat.intel.tracker.cf", cf);
-      setProperty("threat.intel.simple.hbase.table", threatIntelTableName);
-      setProperty("threat.intel.simple.hbase.cf", cf);
-      setProperty("enrichment.simple.hbase.table", enrichmentsTableName);
-      setProperty("enrichment.simple.hbase.cf", cf);
-
-      setProperty("kafka.spout.parallelism", "1");
-      setProperty("enrichment.split.parallelism", "1");
-      setProperty("enrichment.stellar.parallelism", "1");
-      setProperty("enrichment.join.parallelism", "1");
-      setProperty("threat.intel.split.parallelism", "1");
-      setProperty("threat.intel.stellar.parallelism", "1");
-      setProperty("threat.intel.join.parallelism", "1");
-      setProperty("kafka.writer.parallelism", "1");
+
+      setProperty("threatintel_table", threatIntelTableName);
+      setProperty("threatintel_cf", cf);
+
+
+      setProperty("enrichment_kafka_spout_parallelism", "1");
+      setProperty("enrichment_split_parallelism", "1");
+      setProperty("enrichment_stellar_parallelism", "1");
+      setProperty("enrichment_join_parallelism", "1");
+      setProperty("threat_intel_split_parallelism", "1");
+      setProperty("threat_intel_stellar_parallelism", "1");
+      setProperty("threat_intel_join_parallelism", "1");
+      setProperty("kafka_writer_parallelism", "1");
 
     }};
     final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties);
@@ -200,6 +200,7 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
     FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
             .withTopologyLocation(new File(fluxPath))
             .withTopologyName("test")
+            .withTemplateLocation(new File(templatePath))
             .withTopologyProperties(topologyProperties)
             .build();
 
@@ -254,10 +255,10 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
 
   protected void validateErrors(List<Map<String, Object>> errors) {
     for(Map<String, Object> error : errors) {
-      Assert.assertEquals("Test throwing error from ErrorEnrichmentBolt", error.get(Constants.ErrorFields.MESSAGE.getName()));
-      Assert.assertEquals("java.lang.IllegalStateException: Test throwing error from ErrorEnrichmentBolt", error.get(Constants.ErrorFields.EXCEPTION.getName()));
+      Assert.assertEquals("java.lang.ArithmeticException: / by zero", error.get(Constants.ErrorFields.MESSAGE.getName()));
+      Assert.assertEquals("com.google.common.util.concurrent.UncheckedExecutionException: java.lang.ArithmeticException: / by zero", error.get(Constants.ErrorFields.EXCEPTION.getName()));
       Assert.assertEquals(Constants.ErrorType.ENRICHMENT_ERROR.getType(), error.get(Constants.ErrorFields.ERROR_TYPE.getName()));
-      Assert.assertEquals("{\"rawMessage\":\"Error Test Raw Message String\"}", error.get(Constants.ErrorFields.RAW_MESSAGE.getName()));
+      Assert.assertEquals("{\"error_test\":{},\"source.type\":\"test\"}", error.get(Constants.ErrorFields.RAW_MESSAGE.getName()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
index 7c97479..c0f9919 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
@@ -119,23 +119,23 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest {
     cleanHdfsDir(hdfsDir);
     final List<byte[]> inputMessages = TestUtils.readSampleData(sampleParsedPath);
     final Properties topologyProperties = new Properties() {{
-      setProperty("kafka.start", "UNCOMMITTED_EARLIEST");
-      setProperty("kafka.security.protocol", "PLAINTEXT");
-      setProperty("storm.auto.credentials", "[]");
-      setProperty("indexing.workers", "1");
-      setProperty("indexing.acker.executors", "0");
-      setProperty("topology.max.spout.pending", "");
-      setProperty("indexing.input.topic", Constants.INDEXING_TOPIC);
-      setProperty("indexing.error.topic", ERROR_TOPIC);
-      setProperty("topology.auto-credentials", "[]");
+      setProperty("indexing_kafka_start", "UNCOMMITTED_EARLIEST");
+      setProperty("kafka_security_protocol", "PLAINTEXT");
+      setProperty("topology_auto_credentials", "[]");
+      setProperty("indexing_workers", "1");
+      setProperty("indexing_acker_executors", "0");
+      setProperty("indexing_topology_worker_childopts", "");
+      setProperty("indexing_topology_max_spout_pending", "");
+      setProperty("indexing_input_topic", Constants.INDEXING_TOPIC);
+      setProperty("indexing_error_topic", ERROR_TOPIC);
       //HDFS settings
-      setProperty("bolt.hdfs.rotation.policy", TimedRotationPolicy.class.getCanonicalName());
-      setProperty("bolt.hdfs.rotation.policy.count", "1");
-      setProperty("bolt.hdfs.rotation.policy.units", "DAYS");
-      setProperty("indexing.hdfs.output", hdfsDir);
-      setProperty("kafka.spout.parallelism", "1");
-      setProperty("indexing.writer.parallelism", "1");
-      setProperty("hdfs.writer.parallelism", "1");
+      setProperty("bolt_hdfs_rotation_policy", TimedRotationPolicy.class.getCanonicalName());
+      setProperty("bolt_hdfs_rotation_policy_count", "1");
+      setProperty("bolt_hdfs_rotation_policy_units", "DAYS");
+      setProperty("metron_apps_indexed_hdfs_dir", hdfsDir);
+      setProperty("indexing_kafka_spout_parallelism", "1");
+      setProperty("indexing_writer_parallelism", "1");
+      setProperty("hdfs_writer_parallelism", "1");
     }};
     setAdditionalProperties(topologyProperties);
     final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties);
@@ -168,6 +168,7 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest {
     FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
             .withTopologyLocation(new File(fluxPath))
             .withTopologyName("test")
+            .withTemplateLocation(new File(getTemplatePath()))
             .withTopologyProperties(topologyProperties)
             .build();
 
@@ -305,4 +306,5 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest {
   public abstract InMemoryComponent getSearchComponent(final Properties topologyProperties) throws Exception;
   public abstract void setAdditionalProperties(Properties topologyProperties);
   public abstract String cleanField(String field);
+  public abstract String getTemplatePath();
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-integration-test/src/main/config/zookeeper/enrichments/test.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/config/zookeeper/enrichments/test.json b/metron-platform/metron-integration-test/src/main/config/zookeeper/enrichments/test.json
index bba6679..4130943 100644
--- a/metron-platform/metron-integration-test/src/main/config/zookeeper/enrichments/test.json
+++ b/metron-platform/metron-integration-test/src/main/config/zookeeper/enrichments/test.json
@@ -26,6 +26,8 @@
           }
           ,"dst_enrichment" : {
             "dst_classification" : "ENRICHMENT_GET('playful_classification', ip_dst_addr, 'enrichments', 'cf')"
+          },"error_test" : {
+            "error_test": "1/0"
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java
index 7ae373b..1e7cd25 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java
@@ -17,11 +17,9 @@
  */
 package org.apache.metron.integration;
 
-import com.google.common.base.Function;
 import org.apache.metron.integration.components.KafkaComponent;
 import org.apache.metron.integration.components.ZKServerComponent;
 
-import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Properties;
 
@@ -33,7 +31,10 @@ public abstract class BaseIntegrationTest {
 
     protected static ZKServerComponent getZKServerComponent(final Properties topologyProperties) {
         return new ZKServerComponent()
-                .withPostStartCallback((zkComponent) -> topologyProperties.setProperty(ZKServerComponent.ZOOKEEPER_PROPERTY, zkComponent.getConnectionString())
+                .withPostStartCallback((zkComponent) -> {
+                  topologyProperties.setProperty(ZKServerComponent.ZOOKEEPER_PROPERTY, zkComponent.getConnectionString());
+                  topologyProperties.setProperty("kafka.zk", zkComponent.getConnectionString());
+                        }
                 );
     }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java
index 779db37..658e149 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.integration.components;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -45,7 +46,9 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Comparator;
+import java.util.Map;
 import java.util.Properties;
+import java.util.regex.Pattern;
 
 public class FluxTopologyComponent implements InMemoryComponent {
 
@@ -54,12 +57,14 @@ public class FluxTopologyComponent implements InMemoryComponent {
   LocalCluster stormCluster;
   String topologyName;
   File topologyLocation;
+  File templateLocation;
   Properties topologyProperties;
 
   public static class Builder {
 
     String topologyName;
     File topologyLocation;
+    File templateLocation;
     Properties topologyProperties;
 
     public Builder withTopologyName(String name) {
@@ -72,6 +77,11 @@ public class FluxTopologyComponent implements InMemoryComponent {
       return this;
     }
 
+    public Builder withTemplateLocation(File location) {
+      this.templateLocation = location;
+      return this;
+    }
+
     public Builder withTopologyProperties(Properties properties) {
       this.topologyProperties = properties;
       this.topologyProperties.put("storm.home", "target");
@@ -79,13 +89,14 @@ public class FluxTopologyComponent implements InMemoryComponent {
     }
 
     public FluxTopologyComponent build() {
-      return new FluxTopologyComponent(topologyName, topologyLocation, topologyProperties);
+      return new FluxTopologyComponent(topologyName, topologyLocation, templateLocation, topologyProperties);
     }
   }
 
-  public FluxTopologyComponent(String topologyName, File topologyLocation, Properties topologyProperties) {
+  public FluxTopologyComponent(String topologyName, File topologyLocation, File templateLocation, Properties topologyProperties) {
     this.topologyName = topologyName;
     this.topologyLocation = topologyLocation;
+    this.templateLocation = templateLocation;
     this.topologyProperties = topologyProperties;
   }
 
@@ -101,6 +112,10 @@ public class FluxTopologyComponent implements InMemoryComponent {
     return topologyLocation;
   }
 
+  public File getTemplateLocation() {
+    return templateLocation;
+  }
+
   public Properties getTopologyProperties() {
     return topologyProperties;
   }
@@ -217,11 +232,11 @@ public class FluxTopologyComponent implements InMemoryComponent {
   }
 
   public void submitTopology() throws NoSuchMethodException, IOException, InstantiationException, TException, IllegalAccessException, InvocationTargetException, ClassNotFoundException, NoSuchFieldException {
-    startTopology(getTopologyName(), getTopologyLocation(), getTopologyProperties());
+    startTopology(getTopologyName(), getTopologyLocation(), getTemplateLocation(), getTopologyProperties());
   }
 
-  private void startTopology(String topologyName, File topologyLoc, Properties properties) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, TException, NoSuchFieldException{
-    TopologyDef topologyDef = loadYaml(topologyName, topologyLoc, properties);
+  private void startTopology(String topologyName, File topologyLoc, File templateFile, Properties properties) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, TException, NoSuchFieldException{
+    TopologyDef topologyDef = loadYaml(topologyName, topologyLoc, templateFile, properties);
     Config conf = FluxBuilder.buildConfig(topologyDef);
     ExecutionContext context = new ExecutionContext(topologyDef, conf);
     StormTopology topology = FluxBuilder.buildTopology(context);
@@ -239,12 +254,26 @@ public class FluxTopologyComponent implements InMemoryComponent {
     }
   }
 
-  private static TopologyDef loadYaml(String topologyName, File yamlFile, Properties properties) throws IOException {
+  private static TopologyDef loadYaml(String topologyName, File yamlFile, File templateFile, Properties properties) throws IOException {
     File tmpFile = File.createTempFile(topologyName, "props");
     tmpFile.deleteOnExit();
-    try (FileWriter propWriter = new FileWriter(tmpFile)){
-      properties.store(propWriter, topologyName + " properties");
-      return FluxParser.parseFile(yamlFile.getAbsolutePath(), false, true, tmpFile.getAbsolutePath(), false);
+    if (templateFile != null) {
+      try (FileWriter propWriter = new FileWriter(tmpFile)){
+        String templateContents = FileUtils.readFileToString(templateFile);
+        for(Map.Entry prop: properties.entrySet()) {
+          String replacePattern = String.format("{{%s}}", prop.getKey());
+          templateContents = templateContents.replaceAll(Pattern.quote(replacePattern), (String) prop.getValue());
+        }
+        propWriter.write(templateContents);
+        propWriter.flush();
+        return FluxParser.parseFile(yamlFile.getAbsolutePath(), false, true, tmpFile.getAbsolutePath(), false);
+      }
+    } else {
+      try (FileWriter propWriter = new FileWriter(tmpFile)){
+        properties.store(propWriter, topologyName + " properties");
+        return FluxParser.parseFile(yamlFile.getAbsolutePath(), false, true, tmpFile.getAbsolutePath(), false);
+      }
     }
+
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
index 6ec1314..1e5a041 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
@@ -142,7 +142,7 @@ public class KafkaComponent implements InMemoryComponent {
   @Override
   public void start() {
     // setup Zookeeper
-    zookeeperConnectString = topologyProperties.getProperty("kafka.zk");
+    zookeeperConnectString = topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY);
 
     zkClient = new ZkClient(zookeeperConnectString, 30000, 30000, ZKStringSerializer$.MODULE$);
 

http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ZKServerComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ZKServerComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ZKServerComponent.java
index cc85d5f..0a47034 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ZKServerComponent.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ZKServerComponent.java
@@ -28,7 +28,7 @@ import java.util.Optional;
 import java.util.function.Consumer;
 
 public class ZKServerComponent implements InMemoryComponent {
-  public static final String ZOOKEEPER_PROPERTY = "kafka.zk";
+  public static final String ZOOKEEPER_PROPERTY = "zookeeper_quorum";
   private TestingServer testZkServer;
   private String zookeeperUrl = null;
   private Map<String,String> properties = null;

http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
index b556411..171ff47 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.parsers.integration.components;
 
+import org.apache.metron.integration.components.ZKServerComponent;
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.generated.KillOptions;
@@ -83,7 +84,7 @@ public class ParserTopologyComponent implements InMemoryComponent {
   @Override
   public void start() throws UnableToStartException {
     try {
-      TopologyBuilder topologyBuilder = ParserTopologyBuilder.build(topologyProperties.getProperty("kafka.zk")
+      TopologyBuilder topologyBuilder = ParserTopologyBuilder.build(topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY)
                                                                    , Optional.ofNullable(brokerUrl)
                                                                    , sensorType
                                                                    , 1

http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index e988c30..29c68d0 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -29,6 +29,8 @@ import java.io.FilenameFilter;
 import java.io.IOException;
 import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -44,6 +46,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.metron.common.Constants;
+import org.apache.metron.integration.BaseIntegrationTest;
 import org.apache.metron.integration.ComponentRunner;
 import org.apache.metron.integration.Processor;
 import org.apache.metron.integration.ProcessorResult;
@@ -67,7 +70,7 @@ import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
-public class PcapTopologyIntegrationTest {
+public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
   final static String KAFKA_TOPIC = "pcap";
   private static String BASE_DIR = "pcap";
   private static String DATA_DIR = BASE_DIR + "/data_dir";
@@ -185,12 +188,10 @@ public class PcapTopologyIntegrationTest {
     }};
     updatePropertiesCallback.apply(topologyProperties);
 
-    final ZKServerComponent zkServerComponent = new ZKServerComponent().withPostStartCallback(
-            (zkComponent) -> topologyProperties.setProperty(ZKServerComponent.ZOOKEEPER_PROPERTY, zkComponent.getConnectionString())
-    );
-    final KafkaComponent kafkaComponent = new KafkaComponent().withTopics(new ArrayList<KafkaComponent.Topic>() {{
-      add(new KafkaComponent.Topic(KAFKA_TOPIC, 1));
-    }}).withTopologyProperties(topologyProperties);
+    final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties);
+
+    final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, Collections.singletonList(
+            new KafkaComponent.Topic(KAFKA_TOPIC, 1)));
 
 
     final MRComponent mr = new MRComponent().withBasePath(baseDir.getAbsolutePath());

http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-solr/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/main/assembly/assembly.xml b/metron-platform/metron-solr/src/main/assembly/assembly.xml
index 93eeeb1..51491ae 100644
--- a/metron-platform/metron-solr/src/main/assembly/assembly.xml
+++ b/metron-platform/metron-solr/src/main/assembly/assembly.xml
@@ -25,6 +25,7 @@
       <excludes>
         <exclude>**/*.formatted</exclude>
         <exclude>**/*.filtered</exclude>
+        <exclude>**/*.j2</exclude>
       </excludes>
       <fileMode>0644</fileMode>
       <lineEnding>unix</lineEnding>

http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-solr/src/main/config/solr.properties.j2
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/main/config/solr.properties.j2 b/metron-platform/metron-solr/src/main/config/solr.properties.j2
new file mode 100644
index 0000000..acb0f59
--- /dev/null
+++ b/metron-platform/metron-solr/src/main/config/solr.properties.j2
@@ -0,0 +1,49 @@
+{#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#}
+
+##### Storm #####
+indexing.workers={{indexing_workers}}
+indexing.acker.executors={{indexing_acker_executors}}
+topology.worker.childopts={{indexing_topology_worker_childopts}}
+topology.auto-credentials={{topology_auto_credentials}}
+topology.max.spout.pending={{indexing_topology_max_spout_pending}}
+
+##### Kafka #####
+kafka.zk={{zookeeper_quorum}}
+kafka.broker={{kafka_brokers}}
+kafka.security.protocol={{kafka_security_protocol}}
+
+# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
+kafka.start={{indexing_kafka_start}}
+
+indexing.input.topic={{indexing_input_topic}}
+indexing.error.topic={{indexing_error_topic}}
+
+##### Indexing #####
+indexing.writer.class.name={{indexing_writer_class_name}}
+
+##### HDFS #####
+bolt.hdfs.rotation.policy={{bolt_hdfs_rotation_policy}}
+bolt.hdfs.rotation.policy.units={{bolt_hdfs_rotation_policy_units}}
+bolt.hdfs.rotation.policy.count={{bolt_hdfs_rotation_policy_count}}
+indexing.hdfs.output={{metron_apps_indexed_hdfs_dir}}
+
+##### Parallelism #####
+kafka.spout.parallelism={{indexing_kafka_spout_parallelism}}
+indexing.writer.parallelism={{indexing_writer_parallelism}}
+hdfs.writer.parallelism={{hdfs_writer_parallelism}}

http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
index f47e8e8..c6406c8 100644
--- a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
+++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
@@ -115,11 +115,16 @@ public class SolrIndexingIntegrationTest extends IndexingIntegrationTest {
 
   @Override
   public void setAdditionalProperties(Properties topologyProperties) {
-    topologyProperties.setProperty("indexing.writer.class.name", "org.apache.metron.solr.writer.SolrWriter");
+    topologyProperties.setProperty("indexing_writer_class_name", "org.apache.metron.solr.writer.SolrWriter");
   }
 
   @Override
   public String cleanField(String field) {
     return field.replaceFirst("_[dfils]$", "");
   }
+
+  @Override
+  public String getTemplatePath() {
+    return "../metron-solr/src/main/config/solr.properties.j2";
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/badc6cf9/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java
index ef4d69d..8e351ff 100644
--- a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java
+++ b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseEnrichmentBoltTest.java
@@ -97,6 +97,7 @@ public class BaseEnrichmentBoltTest extends BaseBoltTest {
     joinStreamIds.add("stellar:numeric");
     joinStreamIds.add("stellar:dst_enrichment");
     joinStreamIds.add("stellar:src_enrichment");
+    joinStreamIds.add("stellar:error_test");
     joinStreamIds.add("host:");
     joinStreamIds.add("hbaseEnrichment:");
     joinStreamIds.add("message:");


Mime
View raw message