metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject [15/15] incubator-metron git commit: METRON-142 Simplify Parser configuration (merrimanr via cestella) closes apache/incubator-metron#120
Date Mon, 16 May 2016 18:07:56 GMT
METRON-142 Simplify Parser configuration (merrimanr via cestella) closes apache/incubator-metron#120


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/df8d682e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/df8d682e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/df8d682e

Branch: refs/heads/master
Commit: df8d682e8ffe365cd9d03000730f43342cc8dd95
Parents: 00f8588
Author: merrimanr <merrimanr@gmail.com>
Authored: Mon May 16 14:07:25 2016 -0400
Committer: cstella <cestella@gmail.com>
Committed: Mon May 16 14:07:25 2016 -0400

----------------------------------------------------------------------
 metron-deployment/playbooks/metron_install.yml  |     5 +-
 .../files/config/sensors/bro.json               |    20 -
 .../files/config/sensors/pcap.json              |    23 -
 .../files/config/sensors/snort.json             |    28 -
 .../files/config/sensors/yaf.json               |    22 -
 .../roles/metron_streaming/tasks/main.yml       |     6 +
 .../metron_streaming/tasks/source_config.yml    |    12 -
 .../roles/metron_streaming/tasks/topologies.yml |     9 -
 .../monit/templates/scripts/start_topology.sh   |     2 +-
 .../pcapservice/ConfigurationUtilTest.java      |     2 -
 .../org/apache/metron/common/Constants.java     |     4 -
 .../metron/common/bolt/ConfiguredBolt.java      |    35 +-
 .../common/bolt/ConfiguredEnrichmentBolt.java   |    64 +
 .../common/bolt/ConfiguredParserBolt.java       |    64 +
 .../common/configuration/Configuration.java     |     4 -
 .../common/configuration/ConfigurationType.java |    29 +-
 .../common/configuration/Configurations.java    |    46 +-
 .../configuration/ConfigurationsUtils.java      |   151 +-
 .../configuration/EnrichmentConfigurations.java |    49 +
 .../configuration/ParserConfigurations.java     |    48 +
 .../configuration/SensorParserConfig.java       |    89 +
 .../common/interfaces/BulkMessageWriter.java    |     6 +-
 .../metron/common/spout/kafka/SpoutConfig.java  |     6 +-
 .../metron/common/utils/ReflectionUtils.java    |    34 +-
 .../common/bolt/BaseConfiguredBoltTest.java     |    48 +
 .../metron/common/bolt/ConfiguredBoltTest.java  |   162 -
 .../bolt/ConfiguredEnrichmentBoltTest.java      |   137 +
 .../common/bolt/ConfiguredParserBoltTest.java   |   129 +
 .../ConfigurationManagerIntegrationTest.java    |     7 +-
 .../common/cli/ConfigurationsUtilsTest.java     |    36 +-
 .../common/configuration/ConfigurationTest.java |     2 +-
 .../configuration/ConfigurationsTest.java       |     3 +-
 .../SensorEnrichmentConfigTest.java             |     2 +-
 .../SensorEnrichmentUpdateConfigTest.java       |     2 +-
 .../writer/ElasticsearchWriter.java             |     6 +-
 .../ElasticsearchEnrichmentIntegrationTest.java |     2 +-
 .../src/main/assembly/assembly.xml              |    12 +
 .../main/config/zookeeper/enrichments/bro.json  |    20 +
 .../config/zookeeper/enrichments/snort.json     |    28 +
 .../config/zookeeper/enrichments/websphere.json |    20 +
 .../main/config/zookeeper/enrichments/yaf.json  |    22 +
 .../src/main/flux/enrichment/test.yaml          |    10 -
 .../enrichment/bolt/BulkMessageWriterBolt.java  |     3 +-
 .../enrichment/bolt/GenericEnrichmentBolt.java  |    13 +-
 .../apache/metron/enrichment/bolt/JoinBolt.java |     3 +-
 .../metron/enrichment/bolt/SplitBolt.java       |     3 +-
 .../apache/metron/writer/hdfs/HdfsWriter.java   |     8 +-
 .../bolt/BulkMessageWriterBoltTest.java         |    13 +-
 .../enrichment/bolt/EnrichmentJoinBoltTest.java |     2 +-
 .../bolt/GenericEnrichmentBoltTest.java         |     4 +-
 .../metron/enrichment/bolt/SplitBoltTest.java   |    10 +-
 .../bolt/ThreatIntelJoinBoltTest.java           |     6 +-
 .../main/config/zookeeper/enrichments/test.json |    51 +
 .../src/main/config/zookeeper/global.json       |    10 +
 .../metron/integration/BaseIntegrationTest.java |     7 +-
 .../integration/EnrichmentIntegrationTest.java  |    18 +-
 .../components/ConfigUploadComponent.java       |    82 +
 .../components/KafkaWithZKComponent.java        |     2 +
 .../metron/integration/utils/SampleUtil.java    |    21 +-
 .../main/resources/sample/config/global.json    |    10 -
 .../resources/sample/config/sensors/bro.json    |    29 -
 .../resources/sample/config/sensors/pcap.json   |    24 -
 .../resources/sample/config/sensors/snort.json  |    32 -
 .../sample/config/sensors/websphere.json        |    20 -
 .../resources/sample/config/sensors/yaf.json    |    51 -
 .../sample/data/SampleIndexed/YafIndexed        |    10 -
 .../data/SampleInput/.PCAPExampleOutput.crc     |   Bin 44 -> 0 bytes
 .../resources/sample/data/SampleInput/AsaOutput |   100 -
 .../sample/data/SampleInput/BluecoatSyslog.txt  |   144 -
 .../sample/data/SampleInput/BroExampleOutput    | 23411 -----------------
 .../data/SampleInput/FireeyeExampleOutput       |    90 -
 .../sample/data/SampleInput/ISESampleOutput     |   308 -
 .../data/SampleInput/LancopeExampleOutput       |    40 -
 .../sample/data/SampleInput/PCAPExampleOutput   |   Bin 4510 -> 0 bytes
 .../sample/data/SampleInput/PaloaltoOutput      |   100 -
 .../sample/data/SampleInput/SnortOutput         |     3 -
 .../data/SampleInput/SourcefireExampleOutput    |     2 -
 .../sample/data/SampleInput/SquidExampleOutput  |     2 -
 .../sample/data/SampleInput/WebsphereOutput.txt |     5 -
 .../sample/data/SampleInput/YafExampleOutput    |    10 -
 .../sample/data/SampleParsed/BluecoatParsed     |   144 -
 .../sample/data/SampleParsed/SnortParsed        |     3 -
 .../sample/data/SampleParsed/SquidExampleParsed |     2 -
 .../sample/data/SampleParsed/WebsphereParsed    |     5 -
 .../sample/data/SampleParsed/YafExampleParsed   |    10 -
 .../src/main/resources/sample/patterns/test     |     2 -
 .../data/SampleInput/.PCAPExampleOutput.crc     |   Bin 0 -> 44 bytes
 .../src/main/sample/data/SampleInput/AsaOutput  |   100 +
 .../data/SampleInput/FireeyeExampleOutput       |    90 +
 .../sample/data/SampleInput/ISESampleOutput     |   308 +
 .../data/SampleInput/LancopeExampleOutput       |    40 +
 .../sample/data/SampleInput/PCAPExampleOutput   |   Bin 0 -> 4510 bytes
 .../main/sample/data/SampleInput/PaloaltoOutput |   100 +
 .../data/SampleInput/SourcefireExampleOutput    |     2 +
 .../sample/data/bluecoat/parsed/BluecoatParsed  |   144 +
 .../sample/data/bluecoat/raw/BluecoatSyslog.txt |   144 +
 .../sample/data/bro/parsed/BroExampleParsed     |    10 +
 .../main/sample/data/bro/raw/BroExampleOutput   |    10 +
 .../main/sample/data/snort/parsed/SnortParsed   |     3 +
 .../src/main/sample/data/snort/raw/SnortOutput  |     3 +
 .../sample/data/squid/parsed/SquidExampleParsed |     2 +
 .../sample/data/squid/raw/SquidExampleOutput    |     2 +
 .../main/sample/data/test/indexed/TestIndexed   |    10 +
 .../sample/data/test/parsed/TestExampleParsed   |    10 +
 .../data/websphere/parsed/WebsphereParsed       |     5 +
 .../data/websphere/raw/WebsphereOutput.txt      |     5 +
 .../src/main/sample/data/yaf/indexed/YafIndexed |    10 +
 .../sample/data/yaf/parsed/YafExampleParsed     |    10 +
 .../main/sample/data/yaf/raw/YafExampleOutput   |    10 +
 .../src/main/sample/patterns/test               |     2 +
 .../src/main/config/parsers.properties          |    21 -
 .../main/config/zookeeper/parsers/bluecoat.json |     5 +
 .../src/main/config/zookeeper/parsers/bro.json  |     5 +
 .../main/config/zookeeper/parsers/snort.json    |     5 +
 .../main/config/zookeeper/parsers/squid.json    |     9 +
 .../config/zookeeper/parsers/websphere.json     |    11 +
 .../src/main/config/zookeeper/parsers/yaf.json  |    12 +
 .../src/main/flux/bluecoat/remote.yaml          |    71 -
 .../src/main/flux/bluecoat/test.yaml            |    72 -
 .../src/main/flux/bro/remote.yaml               |    71 -
 .../metron-parsers/src/main/flux/bro/test.yaml  |    72 -
 .../src/main/flux/snort/remote.yaml             |    69 -
 .../src/main/flux/snort/test.yaml               |    69 -
 .../src/main/flux/squid/remote.yaml             |    78 -
 .../src/main/flux/squid/test.yaml               |    78 -
 .../src/main/flux/yaf/remote.yaml               |    84 -
 .../metron-parsers/src/main/flux/yaf/test.yaml  |    85 -
 .../org/apache/metron/parsers/GrokParser.java   |    81 +-
 .../metron/parsers/asa/GrokAsaParser.java       |     7 +-
 .../parsers/bluecoat/BasicBluecoatParser.java   |     8 +-
 .../apache/metron/parsers/bolt/ParserBolt.java  |     3 +-
 .../metron/parsers/bro/BasicBroParser.java      |     6 +
 .../parsers/fireeye/BasicFireEyeParser.java     |     6 +
 .../parsers/interfaces/MessageParser.java       |     5 +-
 .../metron/parsers/ise/BasicIseParser.java      |     6 +
 .../parsers/lancope/BasicLancopeParser.java     |     6 +
 .../parsers/logstash/BasicLogstashParser.java   |     6 +
 .../paloalto/BasicPaloAltoFirewallParser.java   |     5 +
 .../metron/parsers/snort/BasicSnortParser.java  |     5 +
 .../sourcefire/BasicSourcefireParser.java       |     6 +
 .../parsers/topology/ParserTopologyBuilder.java |    58 +
 .../parsers/topology/ParserTopologyCLI.java     |   127 +
 .../parsers/websphere/GrokWebSphereParser.java  |     4 -
 .../src/main/scripts/start_parser_topology.sh   |     2 +-
 .../apache/metron/parsers/GrokParserTest.java   |    26 +-
 .../metron/parsers/SampleGrokParserTest.java    |    11 +-
 .../apache/metron/parsers/SquidParserTest.java  |     7 +-
 .../apache/metron/parsers/YafParserTest.java    |    10 +-
 .../integration/BluecoatIntegrationTest.java    |    29 +-
 .../parsers/integration/BroIntegrationTest.java |    37 +
 .../integration/ParserIntegrationTest.java      |   104 +-
 .../parsers/integration/ParserValidation.java   |    26 +
 .../integration/SnortIntegrationTest.java       |    29 +-
 .../integration/SquidIntegrationTest.java       |    30 +-
 .../integration/WebSphereIntegrationTest.java   |    40 +-
 .../parsers/integration/YafIntegrationTest.java |    29 +-
 .../components/ParserTopologyComponent.java     |    84 +
 .../validation/SampleDataValidation.java        |    52 +
 .../websphere/GrokWebSphereParserTest.java      |    54 +-
 .../PcapTopologyIntegrationTest.java            |     2 +-
 .../org/apache/metron/pcap/PcapHelperTest.java  |     2 +-
 .../apache/metron/solr/writer/SolrWriter.java   |     5 +-
 .../SolrEnrichmentIntegrationTest.java          |     4 +-
 .../metron/solr/writer/SolrWriterTest.java      |     3 +-
 .../java/org/apache/metron/TestConstants.java   |    11 +-
 .../org/apache/metron/test/TestDataType.java    |    31 +
 .../test/bolt/BaseEnrichmentBoltTest.java       |    12 +-
 .../metron/test/utils/SampleDataUtils.java      |    41 +
 .../metron/test/utils/ValidationUtils.java      |    49 +
 metron-platform/pom.xml                         |     6 +-
 pom.xml                                         |     4 +-
 171 files changed, 3014 insertions(+), 26121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-deployment/playbooks/metron_install.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/playbooks/metron_install.yml b/metron-deployment/playbooks/metron_install.yml
index 0118213..7e73cbb 100644
--- a/metron-deployment/playbooks/metron_install.yml
+++ b/metron-deployment/playbooks/metron_install.yml
@@ -26,7 +26,6 @@
   become: true
   roles:
     - role: metron_common
-    - role: monit
   tags:
     - metron-prereqs
 
@@ -106,6 +105,8 @@
 - hosts: metron
   become: true
   roles:
-    - monit-start
+    - role: ambari_gather_facts
+    - role: monit
+    - role: monit-start
   tags:
     - start

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-deployment/roles/metron_streaming/files/config/sensors/bro.json
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/metron_streaming/files/config/sensors/bro.json b/metron-deployment/roles/metron_streaming/files/config/sensors/bro.json
deleted file mode 100644
index 0eb34b3..0000000
--- a/metron-deployment/roles/metron_streaming/files/config/sensors/bro.json
+++ /dev/null
@@ -1,20 +0,0 @@
-{
-  "index": "bro",
-  "batchSize": 5,
-  "enrichment" : {
-    "fieldMap": {
-      "geo": ["ip_dst_addr", "ip_src_addr"],
-      "host": ["host"]
-    }
-  },
-  "threatIntel": {
-    "fieldMap": {
-      "hbaseThreatIntel": ["ip_src_addr", "ip_dst_addr"]
-    },
-    "fieldToTypeMap": {
-      "ip_src_addr" : ["malicious_ip"],
-      "ip_dst_addr" : ["malicious_ip"]
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-deployment/roles/metron_streaming/files/config/sensors/pcap.json
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/metron_streaming/files/config/sensors/pcap.json b/metron-deployment/roles/metron_streaming/files/config/sensors/pcap.json
deleted file mode 100644
index 8a3eab1..0000000
--- a/metron-deployment/roles/metron_streaming/files/config/sensors/pcap.json
+++ /dev/null
@@ -1,23 +0,0 @@
-{
-  "index": "pcap",
-  "batchSize": 5,
-  "enrichment" : {
-    "fieldMap":
-      {
-      "geo": ["ip_src_addr", "ip_dst_addr"],
-      "host": ["ip_src_addr", "ip_dst_addr"]
-    }
-  },
-  "threatIntel" : {
-    "fieldMap":
-      {
-      "hbaseThreatIntel": ["ip_dst_addr", "ip_src_addr"]
-    },
-    "fieldToTypeMap":
-      {
-      "ip_dst_addr" : [ "malicious_ip" ]
-    ,"ip_src_addr" : [ "malicious_ip" ]
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-deployment/roles/metron_streaming/files/config/sensors/snort.json
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/metron_streaming/files/config/sensors/snort.json b/metron-deployment/roles/metron_streaming/files/config/sensors/snort.json
deleted file mode 100644
index 9dfc80e..0000000
--- a/metron-deployment/roles/metron_streaming/files/config/sensors/snort.json
+++ /dev/null
@@ -1,28 +0,0 @@
-{
-  "index": "snort",
-  "batchSize": 1,
-  "enrichment" : {
-    "fieldMap":
-      {
-      "geo": ["ip_dst_addr", "ip_src_addr"],
-      "host": ["host"]
-    }
-  },
-  "threatIntel" : {
-    "fieldMap":
-      {
-      "hbaseThreatIntel": ["ip_src_addr", "ip_dst_addr"]
-    },
-    "fieldToTypeMap":
-      {
-      "ip_src_addr" : ["malicious_ip"],
-      "ip_dst_addr" : ["malicious_ip"]
-    },
-    "triageConfig" : {
-      "riskLevelRules" : {
-        "not(IN_SUBNET(ip_dst_addr, '192.168.0.0/24'))" : 10
-      },
-      "aggregator" : "MAX"
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-deployment/roles/metron_streaming/files/config/sensors/yaf.json
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/metron_streaming/files/config/sensors/yaf.json b/metron-deployment/roles/metron_streaming/files/config/sensors/yaf.json
deleted file mode 100644
index 4e67748..0000000
--- a/metron-deployment/roles/metron_streaming/files/config/sensors/yaf.json
+++ /dev/null
@@ -1,22 +0,0 @@
-{
-  "index": "yaf",
-  "batchSize": 5,
-  "enrichment" : {
-    "fieldMap":
-      {
-      "geo": ["ip_dst_addr", "ip_src_addr"],
-      "host": ["host"]
-    }
-  },
-  "threatIntel": {
-    "fieldMap":
-      {
-      "hbaseThreatIntel": ["ip_src_addr", "ip_dst_addr"]
-    },
-    "fieldToTypeMap":
-      {
-      "ip_src_addr" : ["malicious_ip"],
-      "ip_dst_addr" : ["malicious_ip"]
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-deployment/roles/metron_streaming/tasks/main.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/metron_streaming/tasks/main.yml b/metron-deployment/roles/metron_streaming/tasks/main.yml
index d44f2ef..c960dba 100644
--- a/metron-deployment/roles/metron_streaming/tasks/main.yml
+++ b/metron-deployment/roles/metron_streaming/tasks/main.yml
@@ -22,6 +22,12 @@
       - { name: 'bin'}
       - { name: 'config'}
 
+- name: Create Source Config Directory
+  file:
+    path: "{{ zookeeper_config_path }}"
+    state: directory
+    mode: 0755
+
 - include: copy_bundles.yml
 
 - name: Get Default mysql passowrd

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-deployment/roles/metron_streaming/tasks/source_config.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/metron_streaming/tasks/source_config.yml b/metron-deployment/roles/metron_streaming/tasks/source_config.yml
index 21292b1..1c967bd 100644
--- a/metron-deployment/roles/metron_streaming/tasks/source_config.yml
+++ b/metron-deployment/roles/metron_streaming/tasks/source_config.yml
@@ -15,10 +15,6 @@
 #  limitations under the License.
 #
 ---
-- name: Create Source Config Directory
-  file:
-    path: "{{ zookeeper_config_path }}"
-    state: directory
 
 - name: Copy Elasticsearch Global Config File
   template:
@@ -34,14 +30,6 @@
     mode: 0644
   when: install_solr | default(False) == True
 
-- name: Copy Sensor Config Files
-  copy:
-    src: "{{ item }}"
-    dest: "{{ zookeeper_config_path }}"
-    mode: 0644
-  with_items:
-    - ../roles/metron_streaming/files/config/
-
 - name: Load Config
   shell: "{{ metron_directory }}/bin/zk_load_configs.sh --mode PUSH -i {{ zookeeper_config_path }} -z {{ zookeeper_url }} && touch {{ zookeeper_config_path }}/configured"
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-deployment/roles/metron_streaming/tasks/topologies.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/metron_streaming/tasks/topologies.yml b/metron-deployment/roles/metron_streaming/tasks/topologies.yml
index d7fdb0c..17bcfbb 100644
--- a/metron-deployment/roles/metron_streaming/tasks/topologies.yml
+++ b/metron-deployment/roles/metron_streaming/tasks/topologies.yml
@@ -25,15 +25,6 @@
     - { regexp: "kafka.pcap.out=", line: "kafka.pcap.out={{ pcap_hdfs_path }}" }
     - { regexp: "spout.kafka.topic.pcap=", line: "spout.kafka.topic.pcap={{ pycapa_topic }}" }
 
-- name: Configure Metron Parser Topologies
-  lineinfile:
-    dest: "{{ metron_parsers_properties_config_path }}"
-    regexp: "{{ item.regexp }}"
-    line: "{{ item.line }}"
-  with_items:
-    - { regexp: "kafka.zk=", line: "kafka.zk={{ zookeeper_url }}" }
-    - { regexp: "kafka.broker=", line: "kafka.broker={{ kafka_broker_url }}" }
-
 - name: Configure Metron Solr topology
   lineinfile: >
     dest={{ metron_solr_properties_config_path }}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-deployment/roles/monit/templates/scripts/start_topology.sh
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/monit/templates/scripts/start_topology.sh b/metron-deployment/roles/monit/templates/scripts/start_topology.sh
index a8a60fa..5e5286d 100644
--- a/metron-deployment/roles/monit/templates/scripts/start_topology.sh
+++ b/metron-deployment/roles/monit/templates/scripts/start_topology.sh
@@ -22,4 +22,4 @@
 #
 export METRON_VERSION={{ metron_version }}
 export METRON_HOME={{ metron_directory }}
-$METRON_HOME/bin/start_parser_topology.sh {{ item }}
+$METRON_HOME/bin/start_parser_topology.sh -k {{ kafka_broker_url }} -z {{ zookeeper_url }} -s {{ item }}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/ConfigurationUtilTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/ConfigurationUtilTest.java b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/ConfigurationUtilTest.java
index 3c0a77b..4ecb3a4 100644
--- a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/ConfigurationUtilTest.java
+++ b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/ConfigurationUtilTest.java
@@ -27,7 +27,6 @@ public class ConfigurationUtilTest {
 
   @Test
   public void test_getPcapOutputPath() {
-    Assert.assertEquals(ConfigurationUtil.getPcapOutputPath(), null);
     ConfigurationUtil.setPcapOutputPath("/foo");
     Assert.assertEquals(ConfigurationUtil.getPcapOutputPath(), "/foo");
   }
@@ -37,7 +36,6 @@ public class ConfigurationUtilTest {
    */
   @Test
   public void test_getTempQueryDir() {
-    Assert.assertEquals(ConfigurationUtil.getTempQueryOutputPath(), null);
     ConfigurationUtil.setTempQueryOutputPath("/tmp");
     Assert.assertEquals(ConfigurationUtil.getTempQueryOutputPath(), "/tmp");
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
index 4c7c222..60a5b51 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
@@ -19,12 +19,8 @@ package org.apache.metron.common;
 
 public class Constants {
 
-  public static final String GLOBAL_CONFIG_NAME = "global";
-  public static final String SENSORS_CONFIG_NAME = "sensors";
   public static final String ZOOKEEPER_ROOT = "/metron";
   public static final String ZOOKEEPER_TOPOLOGY_ROOT = ZOOKEEPER_ROOT + "/topology";
-  public static final String ZOOKEEPER_GLOBAL_ROOT = ZOOKEEPER_TOPOLOGY_ROOT + "/" + GLOBAL_CONFIG_NAME;
-  public static final String ZOOKEEPER_SENSOR_ROOT = ZOOKEEPER_TOPOLOGY_ROOT + "/" + SENSORS_CONFIG_NAME;
   public static final long DEFAULT_CONFIGURED_BOLT_TIMEOUT = 5000;
   public static final String SENSOR_TYPE = "source.type";
   public static final String ENRICHMENT_TOPIC = "enrichments";

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
index 1364305..2d5e241 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
@@ -29,8 +29,7 @@ import org.apache.curator.framework.recipes.cache.TreeCacheListener;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.log4j.Logger;
 import org.apache.metron.common.Constants;
-import org.apache.metron.common.configuration.Configurations;
-import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.ConfigurationType;
 
 import java.io.IOException;
 import java.util.Map;
@@ -41,7 +40,6 @@ public abstract class ConfiguredBolt extends BaseRichBolt {
 
   private String zookeeperUrl;
 
-  protected final Configurations configurations = new Configurations();
   protected CuratorFramework client;
   protected TreeCache cache;
 
@@ -49,10 +47,6 @@ public abstract class ConfiguredBolt extends BaseRichBolt {
     this.zookeeperUrl = zookeeperUrl;
   }
 
-  public Configurations getConfigurations() {
-    return configurations;
-  }
-
   public void setCuratorFramework(CuratorFramework client) {
     this.client = client;
   }
@@ -61,7 +55,7 @@ public abstract class ConfiguredBolt extends BaseRichBolt {
     this.cache = cache;
   }
 
-  public void reloadCallback(String name, Configurations.Type type) {
+  public void reloadCallback(String name, ConfigurationType type) {
   }
 
   @Override
@@ -85,11 +79,7 @@ public abstract class ConfiguredBolt extends BaseRichBolt {
           }
         };
         cache.getListenable().addListener(listener);
-        try {
-          ConfigurationsUtils.updateConfigsFromZookeeper(configurations, client);
-        } catch (Exception e) {
-          LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...");
-        }
+        loadConfig();
       }
       cache.start();
     } catch (Exception e) {
@@ -98,23 +88,8 @@ public abstract class ConfiguredBolt extends BaseRichBolt {
     }
   }
 
-  public void updateConfig(String path, byte[] data) throws IOException {
-    if (data.length != 0) {
-      String name = path.substring(path.lastIndexOf("/") + 1);
-      Configurations.Type type;
-      if (path.startsWith(Constants.ZOOKEEPER_SENSOR_ROOT)) {
-        configurations.updateSensorEnrichmentConfig(name, data);
-        type = Configurations.Type.SENSOR;
-      } else if (Constants.ZOOKEEPER_GLOBAL_ROOT.equals(path)) {
-        configurations.updateGlobalConfig(data);
-        type = Configurations.Type.GLOBAL;
-      } else {
-        configurations.updateConfig(name, data);
-        type = Configurations.Type.OTHER;
-      }
-      reloadCallback(name, type);
-    }
-  }
+  abstract public void loadConfig();
+  abstract public void updateConfig(String path, byte[] data) throws IOException;
 
   @Override
   public void cleanup() {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
new file mode 100644
index 0000000..e03e793
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.bolt;
+
+import org.apache.log4j.Logger;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
+
+import java.io.IOException;
+
+public abstract class ConfiguredEnrichmentBolt extends ConfiguredBolt {
+  // Base bolt that keeps an EnrichmentConfigurations instance updated from Zookeeper.
+  private static final Logger LOG = Logger.getLogger(ConfiguredEnrichmentBolt.class);
+
+  protected final EnrichmentConfigurations configurations = new EnrichmentConfigurations();
+  // zookeeperUrl is handed to ConfiguredBolt unchanged.
+  public ConfiguredEnrichmentBolt(String zookeeperUrl) {
+    super(zookeeperUrl);
+  }
+  // Returns the configuration holder that loadConfig/updateConfig write into.
+  public EnrichmentConfigurations getConfigurations() {
+    return configurations;
+  }
+  // Best-effort initial load: a failure is logged together with its cause and not rethrown.
+  @Override
+  public void loadConfig() {
+    try {
+      ConfigurationsUtils.updateEnrichmentConfigsFromZookeeper(configurations, client);
+    } catch (Exception e) {
+      LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...", e);
+    }
+  }
+  // Applies one changed node; null/empty payloads and unrecognized paths are ignored.
+  @Override
+  public void updateConfig(String path, byte[] data) throws IOException {
+    if (data != null && data.length != 0) {
+      String name = path.substring(path.lastIndexOf("/") + 1);
+      if (path.startsWith(ConfigurationType.ENRICHMENT.getZookeeperRoot())) {
+        configurations.updateSensorEnrichmentConfig(name, data);
+        reloadCallback(name, ConfigurationType.ENRICHMENT);
+      } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) {
+        configurations.updateGlobalConfig(data);
+        reloadCallback(name, ConfigurationType.GLOBAL);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
new file mode 100644
index 0000000..543f87b
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.bolt;
+
+import org.apache.log4j.Logger;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.ParserConfigurations;
+
+import java.io.IOException;
+
+public abstract class ConfiguredParserBolt extends ConfiguredBolt {
+  // Base bolt that keeps a ParserConfigurations instance updated from Zookeeper.
+  private static final Logger LOG = Logger.getLogger(ConfiguredParserBolt.class);
+
+  protected final ParserConfigurations configurations = new ParserConfigurations();
+  // zookeeperUrl is handed to ConfiguredBolt unchanged.
+  public ConfiguredParserBolt(String zookeeperUrl) {
+    super(zookeeperUrl);
+  }
+  // Returns the configuration holder that loadConfig/updateConfig write into.
+  public ParserConfigurations getConfigurations() {
+    return configurations;
+  }
+  // Best-effort initial load: a failure is logged together with its cause and not rethrown.
+  @Override
+  public void loadConfig() {
+    try {
+      ConfigurationsUtils.updateParserConfigsFromZookeeper(configurations, client);
+    } catch (Exception e) {
+      LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...", e);
+    }
+  }
+  // Applies one changed node; null/empty payloads and unrecognized paths are ignored.
+  @Override
+  public void updateConfig(String path, byte[] data) throws IOException {
+    if (data != null && data.length != 0) {
+      String name = path.substring(path.lastIndexOf("/") + 1);
+      if (path.startsWith(ConfigurationType.PARSER.getZookeeperRoot())) {
+        configurations.updateSensorParserConfig(name, data);
+        reloadCallback(name, ConfigurationType.PARSER);
+      } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) {
+        configurations.updateGlobalConfig(data);
+        reloadCallback(name, ConfigurationType.GLOBAL);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configuration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configuration.java
index e526ee4..ebb46e4 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configuration.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configuration.java
@@ -48,10 +48,6 @@ public class Configuration extends Configurations {
         } else {
 
             updateGlobalConfig(ConfigurationsUtils.readGlobalConfigFromFile(configFileRoot.toAbsolutePath().toString()));
-            Map<String, byte[]> sensorEnrichmentConfigs = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(configFileRoot.toAbsolutePath().toString());
-            for(String sensorType: sensorEnrichmentConfigs.keySet()) {
-                updateSensorEnrichmentConfig(sensorType, sensorEnrichmentConfigs.get(sensorType));
-            }
 
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationType.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationType.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationType.java
index 2b9f6cf..9469bf6 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationType.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationType.java
@@ -28,18 +28,27 @@ import java.io.IOException;
 import java.util.Map;
 
 public enum ConfigurationType implements Function<String, Object> {
-  GLOBAL("."
-        ,Constants.ZOOKEEPER_GLOBAL_ROOT
-        , s -> {
+  GLOBAL("global"
+          ,"."
+          , s -> {
     try {
       return JSONUtils.INSTANCE.load(s, new TypeReference<Map<String, Object>>() {
       });
     } catch (IOException e) {
       throw new RuntimeException("Unable to load " + s, e);
     }
-  })
-  , SENSOR(Constants.SENSORS_CONFIG_NAME
-          ,Constants.ZOOKEEPER_SENSOR_ROOT
+  }),
+  PARSER("parsers"
+          ,"parsers"
+          , s -> {
+    try {
+      return JSONUtils.INSTANCE.load(s, SensorParserConfig.class);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to load " + s, e);
+    }
+  }),
+  ENRICHMENT("enrichments"
+          ,"enrichments"
           , s -> {
     try {
       return JSONUtils.INSTANCE.load(s, SensorEnrichmentConfig.class);
@@ -47,15 +56,19 @@ public enum ConfigurationType implements Function<String, Object> {
       throw new RuntimeException("Unable to load " + s, e);
     }
   });
+  String name;
   String directory;
   String zookeeperRoot;
   Function<String,?> deserializer;
-  ConfigurationType(String directory, String zookeeperRoot, Function<String, ?> deserializer) {
+  ConfigurationType(String name, String directory, Function<String, ?> deserializer) {
+    this.name = name;
     this.directory = directory;
-    this.zookeeperRoot = zookeeperRoot;
+    this.zookeeperRoot = Constants.ZOOKEEPER_TOPOLOGY_ROOT + "/" + name;
     this.deserializer = deserializer;
   }
 
+  public String getName() { return name; }
+
   public String getDirectory() {
     return directory;
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
index a152d40..f33ebd7 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
@@ -19,7 +19,6 @@ package org.apache.metron.common.configuration;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.log4j.Logger;
-import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
 import org.apache.metron.common.utils.JSONUtils;
 
 import java.io.ByteArrayInputStream;
@@ -34,20 +33,15 @@ public class Configurations implements Serializable {
 
   private static final Logger LOG = Logger.getLogger(Configurations.class);
 
-  public enum Type {
-    GLOBAL, SENSOR, OTHER
-  }
-
-  public static final String GLOBAL_CONFIG_NAME = "global";
-
-  private ConcurrentMap<String, Object> configurations = new ConcurrentHashMap<>();
+  protected ConcurrentMap<String, Object> configurations = new ConcurrentHashMap<>();
 
   @SuppressWarnings("unchecked")
   public Map<String, Object> getGlobalConfig() {
-    return (Map<String, Object>) configurations.get(GLOBAL_CONFIG_NAME);
+    return (Map<String, Object>) configurations.get(ConfigurationType.GLOBAL.getName());
   }
 
   public void updateGlobalConfig(byte[] data) throws IOException {
+    if (data == null) throw new IllegalStateException("global config data cannot be null");
     updateGlobalConfig(new ByteArrayInputStream(data));
   }
 
@@ -58,39 +52,7 @@ public class Configurations implements Serializable {
   }
 
   public void updateGlobalConfig(Map<String, Object> globalConfig) {
-    configurations.put(GLOBAL_CONFIG_NAME, globalConfig);
-  }
-
-  public SensorEnrichmentConfig getSensorEnrichmentConfig(String sensorType) {
-    return (SensorEnrichmentConfig) configurations.get(sensorType);
-  }
-
-  public void updateSensorEnrichmentConfig(String sensorType, byte[] data) throws IOException {
-    updateSensorEnrichmentConfig(sensorType, new ByteArrayInputStream(data));
-  }
-
-  public void updateSensorEnrichmentConfig(String sensorType, InputStream io) throws IOException {
-    SensorEnrichmentConfig sensorEnrichmentConfig = JSONUtils.INSTANCE.load(io, SensorEnrichmentConfig.class);
-    updateSensorEnrichmentConfig(sensorType, sensorEnrichmentConfig);
-  }
-
-  public void updateSensorEnrichmentConfig(String sensorType, SensorEnrichmentConfig sensorEnrichmentConfig) {
-    configurations.put(sensorType, sensorEnrichmentConfig);
-  }
-
-  @SuppressWarnings("unchecked")
-  public Map<String, Object> getConfig(String name) {
-    return (Map<String, Object>) configurations.get(name);
-  }
-
-  public void updateConfig(String name, byte[] data) throws IOException {
-    if (data == null) throw new IllegalStateException("config data cannot be null");
-    Map<String, Object> config = JSONUtils.INSTANCE.load(new ByteArrayInputStream(data), new TypeReference<Map<String, Object>>() {});
-    updateConfig(name, config);
-  }
-
-  public void updateConfig(String name, Map<String, Object> config) {
-    configurations.put(name, config);
+    configurations.put(ConfigurationType.GLOBAL.getName(), globalConfig);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
index 1aa2ca8..56ac9d9 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
@@ -27,11 +27,11 @@ import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.zookeeper.KeeperException;
 
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.nio.file.Files;
-import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -69,6 +69,25 @@ public class ConfigurationsUtils {
     writeToZookeeper(ConfigurationType.GLOBAL.getZookeeperRoot(), globalConfig, client);
   }
 
+  public static void writeSensorParserConfigToZookeeper(String sensorType, SensorParserConfig sensorParserConfig, String zookeeperUrl) throws Exception {
+    writeSensorParserConfigToZookeeper(sensorType, JSONUtils.INSTANCE.toJSON(sensorParserConfig), zookeeperUrl);
+  }
+
+  public static void writeSensorParserConfigToZookeeper(String sensorType, byte[] configData, String zookeeperUrl) throws Exception {
+    CuratorFramework client = getClient(zookeeperUrl);
+    client.start();
+    try {
+      writeSensorParserConfigToZookeeper(sensorType, configData, client);
+    }
+    finally {
+      client.close();
+    }
+  }
+
+  public static void writeSensorParserConfigToZookeeper(String sensorType, byte[] configData, CuratorFramework client) throws Exception {
+    writeToZookeeper(ConfigurationType.PARSER.getZookeeperRoot() + "/" + sensorType, configData, client);
+  }
+
   public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, SensorEnrichmentConfig sensorEnrichmentConfig, String zookeeperUrl) throws Exception {
     writeSensorEnrichmentConfigToZookeeper(sensorType, JSONUtils.INSTANCE.toJSON(sensorEnrichmentConfig), zookeeperUrl);
   }
@@ -85,8 +104,7 @@ public class ConfigurationsUtils {
   }
 
   public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, byte[] configData, CuratorFramework client) throws Exception {
-    ConfigurationType.SENSOR.deserialize(new String(configData));
-    writeToZookeeper(ConfigurationType.SENSOR.getZookeeperRoot()+ "/" + sensorType, configData, client);
+    writeToZookeeper(ConfigurationType.ENRICHMENT.getZookeeperRoot() + "/" + sensorType, configData, client);
   }
 
   public static void writeConfigToZookeeper(String name, Map<String, Object> config, String zookeeperUrl) throws Exception {
@@ -114,18 +132,42 @@ public class ConfigurationsUtils {
 
   public static void updateConfigsFromZookeeper(Configurations configurations, CuratorFramework client) throws Exception {
     configurations.updateGlobalConfig(readGlobalConfigBytesFromZookeeper(client));
-    List<String> sensorTypes = client.getChildren().forPath(Constants.ZOOKEEPER_SENSOR_ROOT);
+  }
+
+  public static void updateParserConfigsFromZookeeper(ParserConfigurations configurations, CuratorFramework client) throws Exception {
+    updateConfigsFromZookeeper(configurations, client);
+    List<String> sensorTypes = client.getChildren().forPath(ConfigurationType.PARSER.getZookeeperRoot());
+    for(String sensorType: sensorTypes) {
+      configurations.updateSensorParserConfig(sensorType, readSensorParserConfigBytesFromZookeeper(sensorType, client));
+    }
+  }
+
+  public static void updateEnrichmentConfigsFromZookeeper(EnrichmentConfigurations configurations, CuratorFramework client) throws Exception {
+    updateConfigsFromZookeeper(configurations, client);
+    List<String> sensorTypes = client.getChildren().forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot());
     for(String sensorType: sensorTypes) {
       configurations.updateSensorEnrichmentConfig(sensorType, readSensorEnrichmentConfigBytesFromZookeeper(sensorType, client));
     }
   }
 
+  public static SensorEnrichmentConfig readSensorEnrichmentConfigFromZookeeper(String sensorType, CuratorFramework client) throws Exception {
+    return JSONUtils.INSTANCE.load(new ByteArrayInputStream(readFromZookeeper(ConfigurationType.ENRICHMENT.getZookeeperRoot() + "/" + sensorType, client)), SensorEnrichmentConfig.class);
+  }
+
+  public static SensorParserConfig readSensorParserConfigFromZookeeper(String sensorType, CuratorFramework client) throws Exception {
+    return JSONUtils.INSTANCE.load(new ByteArrayInputStream(readFromZookeeper(ConfigurationType.PARSER.getZookeeperRoot() + "/" + sensorType, client)), SensorParserConfig.class);
+  }
+
   public static byte[] readGlobalConfigBytesFromZookeeper(CuratorFramework client) throws Exception {
-    return readFromZookeeper(Constants.ZOOKEEPER_GLOBAL_ROOT, client);
+    return readFromZookeeper(ConfigurationType.GLOBAL.getZookeeperRoot(), client);
+  }
+
+  public static byte[] readSensorParserConfigBytesFromZookeeper(String sensorType, CuratorFramework client) throws Exception {
+    return readFromZookeeper(ConfigurationType.PARSER.getZookeeperRoot() + "/" + sensorType, client);
   }
 
   public static byte[] readSensorEnrichmentConfigBytesFromZookeeper(String sensorType, CuratorFramework client) throws Exception {
-    return readFromZookeeper(Constants.ZOOKEEPER_SENSOR_ROOT + "/" + sensorType, client);
+    return readFromZookeeper(ConfigurationType.ENRICHMENT.getZookeeperRoot() + "/" + sensorType, client);
   }
 
   public static byte[] readConfigBytesFromZookeeper(String name, CuratorFramework client) throws Exception {
@@ -136,60 +178,99 @@ public class ConfigurationsUtils {
     return client.getData().forPath(path);
   }
 
+  public static void uploadConfigsToZookeeper(String globalConfigPath, String parsersConfigPath, String enrichmentsConfigPath, String zookeeperUrl) throws Exception {
+    try(CuratorFramework client = getClient(zookeeperUrl)) {
+      client.start();
+      uploadConfigsToZookeeper(globalConfigPath, parsersConfigPath, enrichmentsConfigPath, client);
+    }
+  }
+
   public static void uploadConfigsToZookeeper(String rootFilePath, CuratorFramework client) throws Exception {
-    ConfigurationsUtils.writeGlobalConfigToZookeeper(readGlobalConfigFromFile(rootFilePath), client);
-    Map<String, byte[]> sensorEnrichmentConfigs = readSensorEnrichmentConfigsFromFile(rootFilePath);
-    for(String sensorType: sensorEnrichmentConfigs.keySet()) {
-      ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType, sensorEnrichmentConfigs.get(sensorType), client);
+    uploadConfigsToZookeeper(rootFilePath, rootFilePath, rootFilePath, client);
+  }
+
+  /**
+   * Uploads the global, parser and enrichment configs found on local disk to
+   * Zookeeper through an already-started client. Each path argument may be
+   * {@code null}, in which case that category is skipped. The global config is
+   * written only when the file yielded a non-empty byte array.
+   *
+   * @param globalConfigPath      directory containing the global config file, or {@code null}
+   * @param parsersConfigPath     root directory of the parser configs, or {@code null}
+   * @param enrichmentsConfigPath root directory of the enrichment configs, or {@code null}
+   * @param client                client used for every write
+   * @throws Exception if reading a file or writing to Zookeeper fails
+   */
+  public static void uploadConfigsToZookeeper(String globalConfigPath, String parsersConfigPath, String enrichmentsConfigPath, CuratorFramework client) throws Exception {
+    if (globalConfigPath != null) {
+      byte[] globalConfig = readGlobalConfigFromFile(globalConfigPath);
+      if (globalConfig.length > 0) {
+        // Upload the bytes that were just checked instead of reading the file a second time.
+        ConfigurationsUtils.writeGlobalConfigToZookeeper(globalConfig, client);
+      }
+    }
+    if (parsersConfigPath != null) {
+      Map<String, byte[]> sensorParserConfigs = readSensorParserConfigsFromFile(parsersConfigPath);
+      for (Map.Entry<String, byte[]> sensorParserConfig : sensorParserConfigs.entrySet()) {
+        ConfigurationsUtils.writeSensorParserConfigToZookeeper(sensorParserConfig.getKey(), sensorParserConfig.getValue(), client);
+      }
+    }
+    if (enrichmentsConfigPath != null) {
+      Map<String, byte[]> sensorEnrichmentConfigs = readSensorEnrichmentConfigsFromFile(enrichmentsConfigPath);
+      for (Map.Entry<String, byte[]> sensorEnrichmentConfig : sensorEnrichmentConfigs.entrySet()) {
+        ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorEnrichmentConfig.getKey(), sensorEnrichmentConfig.getValue(), client);
+      }
     }
   }
 
-  public static void uploadConfigsToZookeeper(String rootFilePath, String zookeeperUrl) throws Exception {
-    try(CuratorFramework client = getClient(zookeeperUrl)) {
-      client.start();
-      uploadConfigsToZookeeper(rootFilePath, client);
+  public static byte[] readGlobalConfigFromFile(String rootPath) throws IOException {
+    byte[] globalConfig = new byte[0];
+    File configPath = new File(rootPath, ConfigurationType.GLOBAL.getName() + ".json");
+    if (configPath.exists()) {
+      globalConfig = Files.readAllBytes(configPath.toPath());
     }
+    return globalConfig;
   }
 
-  public static byte[] readGlobalConfigFromFile(String rootFilePath) throws IOException {
-    return Files.readAllBytes(Paths.get(rootFilePath, Constants.GLOBAL_CONFIG_NAME + ".json"));
+  public static Map<String, byte[]> readSensorParserConfigsFromFile(String rootPath) throws IOException {
+    return readSensorConfigsFromFile(rootPath, ConfigurationType.PARSER);
   }
 
   public static Map<String, byte[]> readSensorEnrichmentConfigsFromFile(String rootPath) throws IOException {
-    Map<String, byte[]> sensorEnrichmentConfigs = new HashMap<>();
-    for(File file: new File(rootPath, Constants.SENSORS_CONFIG_NAME).listFiles()) {
-      if(file.getName().endsWith(".json")) {
-        sensorEnrichmentConfigs.put(FilenameUtils.removeExtension(file.getName()), Files.readAllBytes(file.toPath()));
+    return readSensorConfigsFromFile(rootPath, ConfigurationType.ENRICHMENT);
+  }
+
+  public static Map<String, byte[]> readSensorConfigsFromFile(String rootPath, ConfigurationType configType) throws IOException {
+    Map<String, byte[]> sensorConfigs = new HashMap<>();
+    File configPath = new File(rootPath, configType.getDirectory());
+    if (configPath.exists()) {
+      File[] children = configPath.listFiles();
+      if (children != null) {
+        for (File file : children) {
+          sensorConfigs.put(FilenameUtils.removeExtension(file.getName()), Files.readAllBytes(file.toPath()));
+        }
       }
     }
-    return sensorEnrichmentConfigs;
+    return sensorConfigs;
   }
 
+
   public interface ConfigurationVisitor{
     void visit(ConfigurationType configurationType, String name, String data);
   }
+
   public static void visitConfigs(CuratorFramework client, ConfigurationVisitor callback) throws Exception {
-    //Output global configs
-    {
-      ConfigurationType configType = ConfigurationType.GLOBAL;
-      byte[] globalConfigData = client.getData().forPath(configType.getZookeeperRoot());
-      callback.visit(configType, "global", new String(globalConfigData));
-    }
-    //Output sensor specific configs
-    {
-      ConfigurationType configType = ConfigurationType.SENSOR;
-      List<String> children = client.getChildren().forPath(configType.getZookeeperRoot());
-      for (String child : children) {
-        byte[] data = client.getData().forPath(configType.getZookeeperRoot() + "/" + child);
-        callback.visit(configType, child, new String(data));
+    visitConfigs(client, callback, ConfigurationType.GLOBAL);
+    visitConfigs(client, callback, ConfigurationType.PARSER);
+    visitConfigs(client, callback, ConfigurationType.ENRICHMENT);
+  }
+
+  public static void visitConfigs(CuratorFramework client, ConfigurationVisitor callback, ConfigurationType configType) throws Exception {
+    if (client.checkExists().forPath(configType.getZookeeperRoot()) != null) {
+      if (configType.equals(ConfigurationType.GLOBAL)) {
+        byte[] globalConfigData = client.getData().forPath(configType.getZookeeperRoot());
+        callback.visit(configType, "global", new String(globalConfigData));
+      } else if (configType.equals(ConfigurationType.PARSER) || configType.equals(ConfigurationType.ENRICHMENT)) {
+        List<String> children = client.getChildren().forPath(configType.getZookeeperRoot());
+        for (String child : children) {
+          byte[] data = client.getData().forPath(configType.getZookeeperRoot() + "/" + child);
+          callback.visit(configType, child, new String(data));
+        }
       }
     }
   }
+
   public static void dumpConfigs(PrintStream out, CuratorFramework client) throws Exception {
     ConfigurationsUtils.visitConfigs(client, (type, name, data) -> {
       type.deserialize(data);
       out.println(type + " Config: " + name + "\n" + data);
     });
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfigurations.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfigurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfigurations.java
new file mode 100644
index 0000000..bf5b856
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfigurations.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.configuration;
+
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.utils.JSONUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * {@link Configurations} extended with per-sensor enrichment configs. Entries
+ * share the inherited {@code configurations} map and are stored under the
+ * enrichment type name, a dot, and the sensor type.
+ */
+public class EnrichmentConfigurations extends Configurations {
+
+    /**
+     * @param sensorType sensor whose enrichment config is wanted
+     * @return the stored config, or {@code null} if none has been stored for that sensor
+     */
+    public SensorEnrichmentConfig getSensorEnrichmentConfig(String sensorType) {
+        return (SensorEnrichmentConfig) configurations.get(getKey(sensorType));
+    }
+
+    /**
+     * Parses {@code data} as JSON and stores the result for {@code sensorType}.
+     *
+     * @throws IllegalStateException if {@code data} is null
+     * @throws IOException if the bytes cannot be loaded as a {@code SensorEnrichmentConfig}
+     */
+    public void updateSensorEnrichmentConfig(String sensorType, byte[] data) throws IOException {
+        // Same contract as updateGlobalConfig(byte[]): fail with a descriptive message, not a bare NPE.
+        if (data == null) throw new IllegalStateException("enrichment config data cannot be null for sensor " + sensorType);
+        updateSensorEnrichmentConfig(sensorType, new ByteArrayInputStream(data));
+    }
+
+    /**
+     * Reads a JSON document from {@code io} and stores the result for {@code sensorType}.
+     *
+     * @throws IOException if the stream cannot be loaded as a {@code SensorEnrichmentConfig}
+     */
+    public void updateSensorEnrichmentConfig(String sensorType, InputStream io) throws IOException {
+        SensorEnrichmentConfig sensorEnrichmentConfig = JSONUtils.INSTANCE.load(io, SensorEnrichmentConfig.class);
+        updateSensorEnrichmentConfig(sensorType, sensorEnrichmentConfig);
+    }
+
+    /**
+     * Stores an already-built config for {@code sensorType}, replacing any previous entry.
+     */
+    public void updateSensorEnrichmentConfig(String sensorType, SensorEnrichmentConfig sensorEnrichmentConfig) {
+        configurations.put(getKey(sensorType), sensorEnrichmentConfig);
+    }
+
+    /** Builds the map key for a sensor: enrichment type name + "." + sensor type. */
+    private String getKey(String sensorType) {
+        return ConfigurationType.ENRICHMENT.getName() + "." + sensorType;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java
new file mode 100644
index 0000000..72630d9
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.configuration;
+
+import org.apache.metron.common.utils.JSONUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * {@link Configurations} extended with per-sensor parser configs. Entries
+ * share the inherited {@code configurations} map and are stored under the
+ * parser type name, a dot, and the sensor type.
+ */
+public class ParserConfigurations extends Configurations {
+
+  /**
+   * @param sensorType sensor whose parser config is wanted
+   * @return the stored config, or {@code null} if none has been stored for that sensor
+   */
+  public SensorParserConfig getSensorParserConfig(String sensorType) {
+    return (SensorParserConfig) configurations.get(getKey(sensorType));
+  }
+
+  /**
+   * Parses {@code data} as JSON and stores the result for {@code sensorType}.
+   *
+   * @throws IllegalStateException if {@code data} is null
+   * @throws IOException if the bytes cannot be loaded as a {@link SensorParserConfig}
+   */
+  public void updateSensorParserConfig(String sensorType, byte[] data) throws IOException {
+    // Same contract as updateGlobalConfig(byte[]): fail with a descriptive message, not a bare NPE.
+    if (data == null) throw new IllegalStateException("parser config data cannot be null for sensor " + sensorType);
+    updateSensorParserConfig(sensorType, new ByteArrayInputStream(data));
+  }
+
+  /**
+   * Reads a JSON document from {@code io} and stores the result for {@code sensorType}.
+   *
+   * @throws IOException if the stream cannot be loaded as a {@link SensorParserConfig}
+   */
+  public void updateSensorParserConfig(String sensorType, InputStream io) throws IOException {
+    SensorParserConfig sensorParserConfig = JSONUtils.INSTANCE.load(io, SensorParserConfig.class);
+    updateSensorParserConfig(sensorType, sensorParserConfig);
+  }
+
+  /**
+   * Stores an already-built config for {@code sensorType}, replacing any previous entry.
+   */
+  public void updateSensorParserConfig(String sensorType, SensorParserConfig sensorParserConfig) {
+    configurations.put(getKey(sensorType), sensorParserConfig);
+  }
+
+  /** Builds the map key for a sensor: parser type name + "." + sensor type. */
+  private String getKey(String sensorType) {
+    return ConfigurationType.PARSER.getName() + "." + sensorType;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
new file mode 100644
index 0000000..8cf1901
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.configuration;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.metron.common.utils.JSONUtils;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Bean holding the parser settings of one sensor: a parser class name, a
+ * sensor topic and a free-form map of parser options. Instances are loaded
+ * from and written to JSON through {@code JSONUtils}.
+ */
+public class SensorParserConfig {
+
+  private String parserClassName;
+  private String sensorTopic;
+  private Map<String, Object> parserConfig;
+
+  public String getParserClassName() {
+    return parserClassName;
+  }
+
+  public void setParserClassName(String parserClassName) {
+    this.parserClassName = parserClassName;
+  }
+
+  public String getSensorTopic() {
+    return sensorTopic;
+  }
+
+  public void setSensorTopic(String sensorTopic) {
+    this.sensorTopic = sensorTopic;
+  }
+
+  public Map<String, Object> getParserConfig() {
+    return parserConfig;
+  }
+
+  public void setParserConfig(Map<String, Object> parserConfig) {
+    this.parserConfig = parserConfig;
+  }
+
+  /**
+   * Builds a config from its JSON representation.
+   *
+   * @param config JSON document as bytes, decoded as UTF-8
+   * @return the loaded config
+   * @throws IOException if the document cannot be loaded
+   */
+  public static SensorParserConfig fromBytes(byte[] config) throws IOException {
+    // Decode explicitly as UTF-8; the one-argument String constructor depends on the platform default charset.
+    return JSONUtils.INSTANCE.load(new String(config, java.nio.charset.StandardCharsets.UTF_8), SensorParserConfig.class);
+  }
+
+  /**
+   * @return this config rendered as JSON by {@code JSONUtils}
+   * @throws JsonProcessingException if rendering fails
+   */
+  public String toJSON() throws JsonProcessingException {
+    return JSONUtils.INSTANCE.toJSON(this, true);
+  }
+
+  /**
+   * Two configs are equal when they are of the same class and their parser
+   * class name, sensor topic and parser config are all equal (null matches null).
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    SensorParserConfig that = (SensorParserConfig) o;
+
+    if (getParserClassName() != null ? !getParserClassName().equals(that.getParserClassName()) : that.getParserClassName() != null) return false;
+    if (getSensorTopic() != null ? !getSensorTopic().equals(that.getSensorTopic()) : that.getSensorTopic() != null) return false;
+    return getParserConfig() != null ? getParserConfig().equals(that.getParserConfig()) : that.getParserConfig() == null;
+  }
+
+  @Override
+  public String toString() {
+    return "{parserClassName=" + parserClassName + ", sensorTopic=" + sensorTopic +
+            ", parserConfig=" + parserConfig + "}";
+  }
+
+  /** Combines the same three fields that {@link #equals(Object)} compares. */
+  @Override
+  public int hashCode() {
+    int result = getParserClassName() != null ? getParserClassName().hashCode() : 0;
+    result = 31 * result + (getSensorTopic() != null ? getSensorTopic().hashCode() : 0);
+    result = 31 * result + (getParserConfig() != null ? getParserConfig().hashCode() : 0);
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/BulkMessageWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/BulkMessageWriter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/BulkMessageWriter.java
index 6fb3d78..aaa6c51 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/BulkMessageWriter.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/BulkMessageWriter.java
@@ -18,14 +18,14 @@
 package org.apache.metron.common.interfaces;
 
 import backtype.storm.tuple.Tuple;
-import org.apache.metron.common.configuration.Configurations;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
 
 import java.util.List;
 import java.util.Map;
 
 public interface BulkMessageWriter<T> extends AutoCloseable {
 
-  void init(Map stormConf, Configurations configuration) throws Exception;
-  void write(String sensorType, Configurations configurations, List<Tuple> tuples, List<T> messages) throws Exception;
+  void init(Map stormConf, EnrichmentConfigurations configuration) throws Exception;
+  void write(String sensorType, EnrichmentConfigurations configurations, List<Tuple> tuples, List<T> messages) throws Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfig.java
index 97085b6..8f0a6af 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfig.java
@@ -21,7 +21,7 @@ package org.apache.metron.common.spout.kafka;
 import storm.kafka.BrokerHosts;
 
 public class SpoutConfig extends storm.kafka.SpoutConfig {
-  private static enum Offset {
+  public static enum Offset {
     BEGINNING, END, WHERE_I_LEFT_OFF;
   }
   public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
@@ -40,7 +40,8 @@ public class SpoutConfig extends storm.kafka.SpoutConfig {
     startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
     return this;
   }
-  private void from(Offset offset) {
+
+  public SpoutConfig from(Offset offset) {
     if(offset == Offset.BEGINNING) {
       ignoreZkOffsets = true;
       startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
@@ -53,5 +54,6 @@ public class SpoutConfig extends storm.kafka.SpoutConfig {
       ignoreZkOffsets = false;
       startOffsetTime = kafka.api.OffsetRequest.LatestTime();
     }
+    return this;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java
index 2afa097..141221d 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java
@@ -27,20 +27,26 @@ public class ReflectionUtils<T> {
       return defaultClass;
     }
     else {
-      try {
-        Class<? extends T> clazz = (Class<? extends T>) Class.forName(className);
-        instance = clazz.getConstructor().newInstance();
-      } catch (InstantiationException e) {
-        throw new IllegalStateException("Unable to instantiate connector.", e);
-      } catch (IllegalAccessException e) {
-        throw new IllegalStateException("Unable to instantiate connector: illegal access", e);
-      } catch (InvocationTargetException e) {
-        throw new IllegalStateException("Unable to instantiate connector", e);
-      } catch (NoSuchMethodException e) {
-        throw new IllegalStateException("Unable to instantiate connector: no such method", e);
-      } catch (ClassNotFoundException e) {
-        throw new IllegalStateException("Unable to instantiate connector: class not found", e);
-      }
+      instance = createInstance(className);
+    }
+    return instance;
+  }
+
+  public static <T> T createInstance(String className) {
+    T instance;
+    try {
+      Class<? extends T> clazz = (Class<? extends T>) Class.forName(className);
+      instance = clazz.getConstructor().newInstance();
+    } catch (InstantiationException e) {
+      throw new IllegalStateException("Unable to instantiate connector.", e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalStateException("Unable to instantiate connector: illegal access", e);
+    } catch (InvocationTargetException e) {
+      throw new IllegalStateException("Unable to instantiate connector", e);
+    } catch (NoSuchMethodException e) {
+      throw new IllegalStateException("Unable to instantiate connector: no such method", e);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException("Unable to instantiate connector: class not found", e);
     }
     return instance;
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/BaseConfiguredBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/BaseConfiguredBoltTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/BaseConfiguredBoltTest.java
new file mode 100644
index 0000000..f9901cd
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/BaseConfiguredBoltTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.bolt;
+
+import org.apache.metron.test.bolt.BaseBoltTest;
+import org.junit.Assert;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class BaseConfiguredBoltTest extends BaseBoltTest {
+
+  protected static Set<String> configsUpdated = new HashSet<>();
+
+  protected void waitForConfigUpdate(final String expectedConfigUpdate) {
+    waitForConfigUpdate(new HashSet<String>() {{ add(expectedConfigUpdate); }});
+  }
+
+  protected void waitForConfigUpdate(Set<String> expectedConfigUpdates) {
+    int count = 0;
+    while (!configsUpdated.equals(expectedConfigUpdates)) {
+      if (count++ > 5) {
+        Assert.fail("ConfiguredBolt was not updated in time");
+        return;
+      }
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredBoltTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredBoltTest.java
deleted file mode 100644
index 6df930b..0000000
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredBoltTest.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.common.bolt;
-
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-import org.apache.curator.test.TestingServer;
-import org.apache.metron.common.Constants;
-import org.apache.metron.TestConstants;
-import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
-import org.apache.metron.common.configuration.Configurations;
-import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
-import org.apache.metron.common.configuration.ConfigurationsUtils;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class ConfiguredBoltTest extends BaseEnrichmentBoltTest {
-  private static Set<String> configsUpdated = new HashSet<>();
-  private Set<String> allConfigurationTypes = new HashSet<>();
-  private String zookeeperUrl;
-
-  public static class StandAloneConfiguredBolt extends ConfiguredBolt {
-
-    public StandAloneConfiguredBolt(String zookeeperUrl) {
-      super(zookeeperUrl);
-    }
-
-    @Override
-    public void execute(Tuple input) {
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    }
-
-    @Override
-    public void reloadCallback(String name, Configurations.Type type) {
-      configsUpdated.add(name);
-    }
-  }
-
-  @Before
-  public void setupConfiguration() throws Exception {
-    TestingServer testZkServer = new TestingServer(true);
-    this.zookeeperUrl = testZkServer.getConnectString();
-    byte[] globalConfig = ConfigurationsUtils.readGlobalConfigFromFile(TestConstants.SAMPLE_CONFIG_PATH);
-    ConfigurationsUtils.writeGlobalConfigToZookeeper(globalConfig, zookeeperUrl);
-    allConfigurationTypes.add(Constants.GLOBAL_CONFIG_NAME);
-    Map<String, byte[]> sensorEnrichmentConfigs = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(TestConstants.SAMPLE_CONFIG_PATH);
-    for (String sensorType : sensorEnrichmentConfigs.keySet()) {
-      ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType, sensorEnrichmentConfigs.get(sensorType), zookeeperUrl);
-      allConfigurationTypes.add(sensorType);
-    }
-  }
-
-  @Test
-  public void test() throws Exception {
-    Configurations sampleConfigurations = new Configurations();
-    try {
-      StandAloneConfiguredBolt configuredBolt = new StandAloneConfiguredBolt(null);
-      configuredBolt.prepare(new HashMap(), topologyContext, outputCollector);
-      Assert.fail("A valid zookeeper url must be supplied");
-    } catch (RuntimeException e){}
-
-    configsUpdated = new HashSet<>();
-    sampleConfigurations.updateGlobalConfig(ConfigurationsUtils.readGlobalConfigFromFile(TestConstants.SAMPLE_CONFIG_PATH));
-    Map<String, byte[]> sensorEnrichmentConfigs = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(TestConstants.SAMPLE_CONFIG_PATH);
-    for (String sensorType : sensorEnrichmentConfigs.keySet()) {
-      sampleConfigurations.updateSensorEnrichmentConfig(sensorType, sensorEnrichmentConfigs.get(sensorType));
-    }
-
-    StandAloneConfiguredBolt configuredBolt = new StandAloneConfiguredBolt(zookeeperUrl);
-    configuredBolt.prepare(new HashMap(), topologyContext, outputCollector);
-    waitForConfigUpdate(allConfigurationTypes);
-    Assert.assertEquals(sampleConfigurations, configuredBolt.configurations);
-
-    configsUpdated = new HashSet<>();
-    Map<String, Object> sampleGlobalConfig = sampleConfigurations.getGlobalConfig();
-    sampleGlobalConfig.put("newGlobalField", "newGlobalValue");
-    ConfigurationsUtils.writeGlobalConfigToZookeeper(sampleGlobalConfig, zookeeperUrl);
-    waitForConfigUpdate(Constants.GLOBAL_CONFIG_NAME);
-    Assert.assertEquals("Add global config field", sampleConfigurations.getGlobalConfig(), configuredBolt.configurations.getGlobalConfig());
-
-    configsUpdated = new HashSet<>();
-    sampleGlobalConfig.remove("newGlobalField");
-    ConfigurationsUtils.writeGlobalConfigToZookeeper(sampleGlobalConfig, zookeeperUrl);
-    waitForConfigUpdate(Constants.GLOBAL_CONFIG_NAME);
-    Assert.assertEquals("Remove global config field", sampleConfigurations, configuredBolt.configurations);
-
-    configsUpdated = new HashSet<>();
-    String sensorType = "testSensorConfig";
-    SensorEnrichmentConfig testSensorConfig = new SensorEnrichmentConfig();
-    testSensorConfig.setBatchSize(50);
-    testSensorConfig.setIndex("test");
-    Map<String, List<String>> enrichmentFieldMap = new HashMap<>();
-    enrichmentFieldMap.put("enrichmentTest", new ArrayList<String>() {{
-      add("enrichmentField");
-    }});
-    testSensorConfig.getEnrichment().setFieldMap(enrichmentFieldMap);
-    Map<String, List<String>> threatIntelFieldMap = new HashMap<>();
-    threatIntelFieldMap.put("threatIntelTest", new ArrayList<String>() {{
-      add("threatIntelField");
-    }});
-    testSensorConfig.getThreatIntel().setFieldMap(threatIntelFieldMap);
-    sampleConfigurations.updateSensorEnrichmentConfig(sensorType, testSensorConfig);
-    ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType, testSensorConfig, zookeeperUrl);
-    waitForConfigUpdate(sensorType);
-    Assert.assertEquals("Add new sensor config", sampleConfigurations, configuredBolt.configurations);
-
-    configsUpdated = new HashSet<>();
-    String someConfigType = "someConfig";
-    Map<String, Object> someConfig = new HashMap<>();
-    someConfig.put("someField", "someValue");
-    sampleConfigurations.updateConfig(someConfigType, someConfig);
-    ConfigurationsUtils.writeConfigToZookeeper(someConfigType, someConfig, zookeeperUrl);
-    waitForConfigUpdate(someConfigType);
-    Assert.assertEquals("Add new misc config", sampleConfigurations, configuredBolt.configurations);
-    configuredBolt.cleanup();
-  }
-
-  private void waitForConfigUpdate(final String expectedConfigUpdate) {
-    waitForConfigUpdate(new HashSet<String>() {{ add(expectedConfigUpdate); }});
-  }
-
-  private void waitForConfigUpdate(Set<String> expectedConfigUpdates) {
-    int count = 0;
-    while (!configsUpdated.equals(expectedConfigUpdates)) {
-      if (count++ > 5) {
-        Assert.fail("ConfiguredBolt was not updated in time");
-        return;
-      }
-      try {
-        Thread.sleep(500);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/df8d682e/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBoltTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBoltTest.java
new file mode 100644
index 0000000..c5f2304
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBoltTest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.bolt;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import org.apache.curator.test.TestingServer;
+import org.apache.metron.common.Constants;
+import org.apache.metron.TestConstants;
+import org.apache.metron.common.configuration.*;
+import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ConfiguredEnrichmentBoltTest extends BaseConfiguredBoltTest {
+
+  private Set<String> enrichmentConfigurationTypes = new HashSet<>();
+  private String zookeeperUrl;
+
+  public static class StandAloneConfiguredEnrichmentBolt extends ConfiguredEnrichmentBolt {
+
+    public StandAloneConfiguredEnrichmentBolt(String zookeeperUrl) {
+      super(zookeeperUrl);
+    }
+
+    @Override
+    public void execute(Tuple input) {
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    }
+
+    @Override
+    public void reloadCallback(String name, ConfigurationType type) {
+      configsUpdated.add(name);
+    }
+  }
+
+  @Before
+  public void setupConfiguration() throws Exception {
+    TestingServer testZkServer = new TestingServer(true);
+    this.zookeeperUrl = testZkServer.getConnectString();
+    byte[] globalConfig = ConfigurationsUtils.readGlobalConfigFromFile(TestConstants.SAMPLE_CONFIG_PATH);
+    ConfigurationsUtils.writeGlobalConfigToZookeeper(globalConfig, zookeeperUrl);
+    enrichmentConfigurationTypes.add(ConfigurationType.GLOBAL.getName());
+    Map<String, byte[]> sensorEnrichmentConfigs = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(TestConstants.ENRICHMENTS_CONFIGS_PATH);
+    for (String sensorType : sensorEnrichmentConfigs.keySet()) {
+      ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType, sensorEnrichmentConfigs.get(sensorType), zookeeperUrl);
+      enrichmentConfigurationTypes.add(sensorType);
+    }
+    Map<String, byte[]> sensorParserConfigs = ConfigurationsUtils.readSensorParserConfigsFromFile(TestConstants.PARSER_CONFIGS_PATH);
+    for (String sensorType : sensorParserConfigs.keySet()) {
+      ConfigurationsUtils.writeSensorParserConfigToZookeeper(sensorType, sensorParserConfigs.get(sensorType), zookeeperUrl);
+    }
+  }
+
+  @Test
+  public void test() throws Exception {
+    EnrichmentConfigurations sampleConfigurations = new EnrichmentConfigurations();
+    try {
+      StandAloneConfiguredEnrichmentBolt configuredBolt = new StandAloneConfiguredEnrichmentBolt(null);
+      configuredBolt.prepare(new HashMap(), topologyContext, outputCollector);
+      Assert.fail("A valid zookeeper url must be supplied");
+    } catch (RuntimeException e){}
+
+    configsUpdated = new HashSet<>();
+    sampleConfigurations.updateGlobalConfig(ConfigurationsUtils.readGlobalConfigFromFile(TestConstants.SAMPLE_CONFIG_PATH));
+    Map<String, byte[]> sensorEnrichmentConfigs = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(TestConstants.ENRICHMENTS_CONFIGS_PATH);
+    for (String sensorType : sensorEnrichmentConfigs.keySet()) {
+      sampleConfigurations.updateSensorEnrichmentConfig(sensorType, sensorEnrichmentConfigs.get(sensorType));
+    }
+
+    StandAloneConfiguredEnrichmentBolt configuredBolt = new StandAloneConfiguredEnrichmentBolt(zookeeperUrl);
+    configuredBolt.prepare(new HashMap(), topologyContext, outputCollector);
+    waitForConfigUpdate(enrichmentConfigurationTypes);
+    Assert.assertEquals(sampleConfigurations, configuredBolt.configurations);
+
+    configsUpdated = new HashSet<>();
+    Map<String, Object> sampleGlobalConfig = sampleConfigurations.getGlobalConfig();
+    sampleGlobalConfig.put("newGlobalField", "newGlobalValue");
+    ConfigurationsUtils.writeGlobalConfigToZookeeper(sampleGlobalConfig, zookeeperUrl);
+    waitForConfigUpdate(ConfigurationType.GLOBAL.getName());
+    Assert.assertEquals("Add global config field", sampleConfigurations.getGlobalConfig(), configuredBolt.configurations.getGlobalConfig());
+
+    configsUpdated = new HashSet<>();
+    sampleGlobalConfig.remove("newGlobalField");
+    ConfigurationsUtils.writeGlobalConfigToZookeeper(sampleGlobalConfig, zookeeperUrl);
+    waitForConfigUpdate(ConfigurationType.GLOBAL.getName());
+    Assert.assertEquals("Remove global config field", sampleConfigurations, configuredBolt.configurations);
+
+    configsUpdated = new HashSet<>();
+    String sensorType = "testSensorConfig";
+    SensorEnrichmentConfig testSensorConfig = new SensorEnrichmentConfig();
+    testSensorConfig.setBatchSize(50);
+    testSensorConfig.setIndex("test");
+    Map<String, List<String>> enrichmentFieldMap = new HashMap<>();
+    enrichmentFieldMap.put("enrichmentTest", new ArrayList<String>() {{
+      add("enrichmentField");
+    }});
+    testSensorConfig.getEnrichment().setFieldMap(enrichmentFieldMap);
+    Map<String, List<String>> threatIntelFieldMap = new HashMap<>();
+    threatIntelFieldMap.put("threatIntelTest", new ArrayList<String>() {{
+      add("threatIntelField");
+    }});
+    testSensorConfig.getThreatIntel().setFieldMap(threatIntelFieldMap);
+    sampleConfigurations.updateSensorEnrichmentConfig(sensorType, testSensorConfig);
+    ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType, testSensorConfig, zookeeperUrl);
+    waitForConfigUpdate(sensorType);
+    Assert.assertEquals("Add new sensor config", sampleConfigurations, configuredBolt.configurations);
+    configuredBolt.cleanup();
+  }
+}
\ No newline at end of file


Mime
View raw message