metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmerri...@apache.org
Subject [37/51] [partial] incubator-metron git commit: METRON-113 Project Reorganization (merrimanr) closes apache/incubator-metron#88
Date Tue, 26 Apr 2016 14:46:25 GMT
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/pom.xml b/metron-platform/metron-common/pom.xml
new file mode 100644
index 0000000..8050418
--- /dev/null
+++ b/metron-platform/metron-common/pom.xml
@@ -0,0 +1,300 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- 
+  Licensed to the Apache Software 
+	Foundation (ASF) under one or more contributor license agreements. See the 
+	NOTICE file distributed with this work for additional information regarding 
+	copyright ownership. The ASF licenses this file to You under the Apache License, 
+	Version 2.0 (the "License"); you may not use this file except in compliance 
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 
+	Unless required by applicable law or agreed to in writing, software distributed 
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+  the specific language governing permissions and limitations under the License. 
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.metron</groupId>
+        <artifactId>metron-platform</artifactId>
+        <version>0.1BETA</version>
+    </parent>
+    <artifactId>metron-common</artifactId>
+    <name>metron-common</name>
+    <description>Components common to all enrichments</description>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <commons.config.version>1.10</commons.config.version>
+    </properties>
+    <repositories>
+        <repository>
+            <id>Metron-Kraken-Repo</id>
+            <name>Metron Kraken Repository</name>
+            <url>https://raw.github.com/opensoc/kraken/mvn-repo</url>
+        </repository>
+    </repositories>
+    <dependencies>
+        <dependency>
+            <groupId>com.opencsv</groupId>
+            <artifactId>opencsv</artifactId>
+            <version>${global_opencsv_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.googlecode.json-simple</groupId>
+            <artifactId>json-simple</artifactId>
+            <version>${global_json_simple_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${global_storm_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.9.2</artifactId>
+            <version>${global_kafka_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.sun.jmx</groupId>
+                    <artifactId>jmxri</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jdmk</groupId>
+                    <artifactId>jmxtools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.jms</groupId>
+                    <artifactId>jms</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.codahale.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+            <version>${global_metrics_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.codahale.metrics</groupId>
+            <artifactId>metrics-graphite</artifactId>
+            <version>${global_metrics_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-configuration</groupId>
+            <artifactId>commons-configuration</artifactId>
+            <version>${commons.config.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.krakenapps</groupId>
+            <artifactId>kraken-pcap</artifactId>
+            <version>${global_pcap_version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-api</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>slf4j-simple</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${global_guava_version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-common</artifactId>
+            <version>${global_hbase_version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <version>${global_hbase_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.github.fge</groupId>
+            <artifactId>json-schema-validator</artifactId>
+            <version>${global_json_schema_validator_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-recipes</artifactId>
+            <version>2.7.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>flux-core</artifactId>
+            <version>${global_flux_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-kafka</artifactId>
+            <version>${global_storm_version}</version>
+            <exclusions>
+                <!-- Keep storm-kafka's transitive curator-client off the classpath;
+                     Curator is declared explicitly in this POM (curator-recipes). -->
+                <exclusion>
+                    <groupId>org.apache.curator</groupId>
+                    <artifactId>curator-client</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-test</artifactId>
+            <version>2.7.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>${global_mockito_version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>nl.jqno.equalsverifier</groupId>
+            <artifactId>equalsverifier</artifactId>
+            <version>2.0.2</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-test-utilities</artifactId>
+            <version>0.1BETA</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <reporting>
+        <plugins>
+            <!-- Normally, dependency report takes time, skip it -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-project-info-reports-plugin</artifactId>
+                <version>2.7</version>
+
+                <configuration>
+                    <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>emma-maven-plugin</artifactId>
+                <version>1.0-alpha-3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-pmd-plugin</artifactId>
+                <configuration>
+                    <targetJdk>1.7</targetJdk>
+                </configuration>
+            </plugin>
+        </plugins>
+    </reporting>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.1</version>
+                <configuration>
+                    <source>1.7</source>
+                    <compilerArgument>-Xlint:unchecked</compilerArgument>
+                    <target>1.7</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>1.4</version>
+                <configuration>
+                    <createDependencyReducedPom>true</createDependencyReducedPom>
+                    <artifactSet>
+                        <excludes>
+                            <exclude>*slf4j*</exclude>
+                        </excludes>
+                    </artifactSet>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <relocations>
+                                <relocation>
+                                    <pattern>com.google.common</pattern>
+                                    <shadedPattern>org.apache.metron.guava</shadedPattern>
+                                </relocation>
+                            </relocations>
+                            <transformers>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+                                    <resource>.yaml</resource>
+                                </transformer>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass></mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptor>src/main/assembly/assembly.xml</descriptor>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
+                        <phase>package</phase> <!-- bind to the packaging phase -->
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/assembly/assembly.xml b/metron-platform/metron-common/src/main/assembly/assembly.xml
new file mode 100644
index 0000000..305f0a6
--- /dev/null
+++ b/metron-platform/metron-common/src/main/assembly/assembly.xml
@@ -0,0 +1,33 @@
+<!--
+  Licensed to the Apache Software
+	Foundation (ASF) under one or more contributor license agreements. See the
+	NOTICE file distributed with this work for additional information regarding
+	copyright ownership. The ASF licenses this file to You under the Apache License,
+	Version 2.0 (the "License"); you may not use this file except in compliance
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+	Unless required by applicable law or agreed to in writing, software distributed
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+  the specific language governing permissions and limitations under the License.
+  -->
+
+<assembly>
+  <id>archive</id>
+  <formats>
+    <format>tar.gz</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${project.basedir}/src/main/scripts</directory>
+      <outputDirectory>/scripts</outputDirectory>
+      <useDefaultExcludes>true</useDefaultExcludes>
+      <excludes>
+        <exclude>**/*.formatted</exclude>
+        <exclude>**/*.filtered</exclude>
+      </excludes>
+      <fileMode>0755</fileMode>
+      <lineEnding>unix</lineEnding>
+    </fileSet>
+  </fileSets>
+</assembly>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
new file mode 100644
index 0000000..3418e9c
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common;
+
+/**
+ * Shared string and numeric constants: configuration names, the Zookeeper
+ * paths derived from them, and a handful of well-known identifiers.
+ */
+public class Constants {
+
+  // Name of the global configuration; also the last segment of ZOOKEEPER_GLOBAL_ROOT.
+  public static final String GLOBAL_CONFIG_NAME = "global";
+  // Name of the sensor configuration group; also the last segment of ZOOKEEPER_SENSOR_ROOT.
+  public static final String SENSORS_CONFIG_NAME = "sensors";
+  // Root Zookeeper path under which all other paths in this class live.
+  public static final String ZOOKEEPER_ROOT = "/metron";
+  // "/metron/topology": parent of the global and sensor configuration nodes.
+  public static final String ZOOKEEPER_TOPOLOGY_ROOT = ZOOKEEPER_ROOT + "/topology";
+  // "/metron/topology/global"
+  public static final String ZOOKEEPER_GLOBAL_ROOT = ZOOKEEPER_TOPOLOGY_ROOT + "/" + GLOBAL_CONFIG_NAME;
+  // "/metron/topology/sensors"
+  public static final String ZOOKEEPER_SENSOR_ROOT = ZOOKEEPER_TOPOLOGY_ROOT + "/" + SENSORS_CONFIG_NAME;
+  // NOTE(review): unit is not stated here; presumably milliseconds - confirm against callers.
+  public static final long DEFAULT_CONFIGURED_BOLT_TIMEOUT = 5000;
+  // NOTE(review): presumably the message field that carries the sensor type - confirm.
+  public static final String SENSOR_TYPE = "source.type";
+  // NOTE(review): presumably a topic name - confirm against callers.
+  public static final String ENRICHMENT_TOPIC = "enrichments";
+  // NOTE(review): presumably a stream identifier for error output - confirm.
+  public static final String ERROR_STREAM = "error";
+  // NOTE(review): the two identifiers below look like adapter/type keys - confirm usage.
+  public static final String SIMPLE_HBASE_ENRICHMENT = "hbaseEnrichment";
+  public static final String SIMPLE_HBASE_THREAT_INTEL = "hbaseThreatIntel";
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
new file mode 100644
index 0000000..aa654fb
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.base.BaseRichBolt;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.log4j.Logger;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.Configurations;
+import org.apache.metron.common.cli.ConfigurationsUtils;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Base class for bolts that keep a {@link Configurations} instance in sync
+ * with Zookeeper.
+ *
+ * <p>{@link #prepare} starts a Curator client and a {@link TreeCache} rooted at
+ * {@link Constants#ZOOKEEPER_TOPOLOGY_ROOT}. Every added or updated node is
+ * routed through {@link #updateConfig(String, byte[])}, which updates
+ * {@link #configurations} and then invokes
+ * {@link #reloadCallback(String, Configurations.Type)}.
+ *
+ * <p>A client and/or cache may be injected through the setters before
+ * {@code prepare} is called; injected instances are used instead of creating
+ * new ones.
+ */
+public abstract class ConfiguredBolt extends BaseRichBolt {
+
+  private static final Logger LOG = Logger.getLogger(ConfiguredBolt.class);
+
+  private String zookeeperUrl;
+
+  protected final Configurations configurations = new Configurations();
+  protected CuratorFramework client;
+  protected TreeCache cache;
+
+  /**
+   * @param zookeeperUrl connection string handed to Curator when no client
+   *                     has been injected
+   */
+  public ConfiguredBolt(String zookeeperUrl) {
+    this.zookeeperUrl = zookeeperUrl;
+  }
+
+  /**
+   * @return the configurations instance kept up to date by this bolt
+   */
+  public Configurations getConfigurations() {
+    return configurations;
+  }
+
+  /**
+   * Injects the Curator client to use; must be called before {@link #prepare}
+   * to take effect.
+   */
+  public void setCuratorFramework(CuratorFramework client) {
+    this.client = client;
+  }
+
+  /**
+   * Injects the tree cache to use; must be called before {@link #prepare} to
+   * take effect. An injected cache gets no listener registered by this class.
+   */
+  public void setTreeCache(TreeCache cache) {
+    this.cache = cache;
+  }
+
+  /**
+   * Hook invoked after a configuration has been updated. The default
+   * implementation does nothing; subclasses override it to react to changes.
+   *
+   * @param name last path segment of the Zookeeper node that changed
+   * @param type which kind of configuration was updated
+   */
+  public void reloadCallback(String name, Configurations.Type type) {
+  }
+
+  /**
+   * Starts the Curator client and the tree cache, creating either one if it
+   * was not injected. When the cache is created here, a listener is attached
+   * and the current configurations are loaded eagerly on a best-effort basis.
+   *
+   * @throws RuntimeException wrapping any failure while starting the client
+   *                          or the cache
+   */
+  @Override
+  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    try {
+      if (client == null) {
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        client = CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy);
+      }
+      client.start();
+      if (cache == null) {
+        cache = new TreeCache(client, Constants.ZOOKEEPER_TOPOLOGY_ROOT);
+        TreeCacheListener listener = new TreeCacheListener() {
+          @Override
+          public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
+            TreeCacheEvent.Type eventType = event.getType();
+            if (eventType == TreeCacheEvent.Type.NODE_ADDED || eventType == TreeCacheEvent.Type.NODE_UPDATED) {
+              String path = event.getData().getPath();
+              byte[] data = event.getData().getData();
+              updateConfig(path, data);
+            }
+          }
+        };
+        cache.getListenable().addListener(listener);
+        try {
+          ConfigurationsUtils.updateConfigsFromZookeeper(configurations, client);
+        } catch (Exception e) {
+          // Deliberately best effort: the cache listener delivers the same data
+          // later. Keep the cause in the log so the failure can be diagnosed.
+          LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...", e);
+        }
+      }
+      cache.start();
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Applies the content of a Zookeeper node to {@link #configurations} and
+   * notifies {@link #reloadCallback(String, Configurations.Type)}.
+   *
+   * <p>Paths under {@link Constants#ZOOKEEPER_SENSOR_ROOT} update a sensor
+   * enrichment config, the exact path {@link Constants#ZOOKEEPER_GLOBAL_ROOT}
+   * updates the global config, and anything else is stored as a generic
+   * config. Nodes without a payload ({@code null} or empty) are ignored.
+   *
+   * @param path full Zookeeper path of the node
+   * @param data raw node content
+   * @throws IOException if the configuration data cannot be processed
+   */
+  public void updateConfig(String path, byte[] data) throws IOException {
+    // A node may carry no payload at all; treat that like an empty one.
+    if (data == null || data.length == 0) {
+      return;
+    }
+    String name = path.substring(path.lastIndexOf("/") + 1);
+    Configurations.Type type;
+    if (path.startsWith(Constants.ZOOKEEPER_SENSOR_ROOT)) {
+      configurations.updateSensorEnrichmentConfig(name, data);
+      type = Configurations.Type.SENSOR;
+    } else if (Constants.ZOOKEEPER_GLOBAL_ROOT.equals(path)) {
+      configurations.updateGlobalConfig(data);
+      type = Configurations.Type.GLOBAL;
+    } else {
+      configurations.updateConfig(name, data);
+      type = Configurations.Type.OTHER;
+    }
+    reloadCallback(name, type);
+  }
+
+  /**
+   * Closes the tree cache and then the Curator client. Safe to call when
+   * {@link #prepare} never ran or failed part-way; the client is closed even
+   * if closing the cache throws.
+   */
+  @Override
+  public void cleanup() {
+    try {
+      if (cache != null) {
+        cache.close();
+      }
+    } finally {
+      if (client != null) {
+        client.close();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/main/java/org/apache/metron/common/cli/ConfigurationsUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/cli/ConfigurationsUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/cli/ConfigurationsUtils.java
new file mode 100644
index 0000000..27f4c2a
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/cli/ConfigurationsUtils.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.cli;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.Configurations;
+import org.apache.metron.common.configuration.SensorEnrichmentConfig;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Static helpers for moving configurations between the local file system and
+ * Zookeeper, plus a command-line entry point ({@link #main}) that optionally
+ * uploads a configuration directory and then prints what Zookeeper holds.
+ *
+ * <p>Overloads taking a Zookeeper URL create, start and close their own
+ * Curator client; overloads taking a {@link CuratorFramework} use the given
+ * client as-is and never close it.
+ */
+public class ConfigurationsUtils {
+
+  /**
+   * Creates a Curator client for the given connection string using
+   * {@code new ExponentialBackoffRetry(1000, 3)} as retry policy. The client
+   * is returned unstarted.
+   */
+  public static CuratorFramework getClient(String zookeeperUrl) {
+    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+    return CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy);
+  }
+
+  /** Serializes the map to JSON and writes it to {@link Constants#ZOOKEEPER_GLOBAL_ROOT}. */
+  public static void writeGlobalConfigToZookeeper(Map<String, Object> globalConfig, String zookeeperUrl) throws Exception {
+    writeGlobalConfigToZookeeper(JSONUtils.INSTANCE.toJSON(globalConfig), zookeeperUrl);
+  }
+
+  /** Writes the raw global config to {@link Constants#ZOOKEEPER_GLOBAL_ROOT} using a short-lived client. */
+  public static void writeGlobalConfigToZookeeper(byte[] globalConfig, String zookeeperUrl) throws Exception {
+    CuratorFramework client = getClient(zookeeperUrl);
+    client.start();
+    try {
+      writeGlobalConfigToZookeeper(globalConfig, client);
+    }
+    finally {
+      client.close();
+    }
+  }
+
+  /** Writes the raw global config to {@link Constants#ZOOKEEPER_GLOBAL_ROOT} using the supplied client. */
+  public static void writeGlobalConfigToZookeeper(byte[] globalConfig, CuratorFramework client) throws Exception {
+    writeToZookeeper(Constants.ZOOKEEPER_GLOBAL_ROOT, globalConfig, client);
+  }
+
+  /** Serializes the sensor enrichment config to JSON and writes it under {@link Constants#ZOOKEEPER_SENSOR_ROOT}. */
+  public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, SensorEnrichmentConfig sensorEnrichmentConfig, String zookeeperUrl) throws Exception {
+    writeSensorEnrichmentConfigToZookeeper(sensorType, JSONUtils.INSTANCE.toJSON(sensorEnrichmentConfig), zookeeperUrl);
+  }
+
+  /** Writes the raw sensor config to {@code ZOOKEEPER_SENSOR_ROOT/<sensorType>} using a short-lived client. */
+  public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, byte[] configData, String zookeeperUrl) throws Exception {
+    CuratorFramework client = getClient(zookeeperUrl);
+    client.start();
+    try {
+      writeSensorEnrichmentConfigToZookeeper(sensorType, configData, client);
+    }
+    finally {
+      client.close();
+    }
+  }
+
+  /** Writes the raw sensor config to {@code ZOOKEEPER_SENSOR_ROOT/<sensorType>} using the supplied client. */
+  public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, byte[] configData, CuratorFramework client) throws Exception {
+    writeToZookeeper(Constants.ZOOKEEPER_SENSOR_ROOT + "/" + sensorType, configData, client);
+  }
+
+  /** Serializes the map to JSON and writes it to {@code ZOOKEEPER_TOPOLOGY_ROOT/<name>}. */
+  public static void writeConfigToZookeeper(String name, Map<String, Object> config, String zookeeperUrl) throws Exception {
+    writeConfigToZookeeper(name, JSONUtils.INSTANCE.toJSON(config), zookeeperUrl);
+  }
+
+  /** Writes the raw config to {@code ZOOKEEPER_TOPOLOGY_ROOT/<name>} using a short-lived client. */
+  public static void writeConfigToZookeeper(String name, byte[] config, String zookeeperUrl) throws Exception {
+    CuratorFramework client = getClient(zookeeperUrl);
+    client.start();
+    try {
+      writeToZookeeper(Constants.ZOOKEEPER_TOPOLOGY_ROOT + "/" + name, config, client);
+    }
+    finally {
+      client.close();
+    }
+  }
+
+  /**
+   * Stores {@code configData} at {@code path}: first tries to overwrite the
+   * existing node and, if the node does not exist, creates it together with
+   * any missing parents.
+   */
+  public static void writeToZookeeper(String path, byte[] configData, CuratorFramework client) throws Exception {
+    try {
+      client.setData().forPath(path, configData);
+    } catch (KeeperException.NoNodeException e) {
+      client.create().creatingParentsIfNeeded().forPath(path, configData);
+    }
+  }
+
+  /**
+   * Loads the global config and every child of
+   * {@link Constants#ZOOKEEPER_SENSOR_ROOT} from Zookeeper into
+   * {@code configurations}.
+   */
+  public static void updateConfigsFromZookeeper(Configurations configurations, CuratorFramework client) throws Exception {
+    configurations.updateGlobalConfig(readGlobalConfigBytesFromZookeeper(client));
+    List<String> sensorTypes = client.getChildren().forPath(Constants.ZOOKEEPER_SENSOR_ROOT);
+    for(String sensorType: sensorTypes) {
+      configurations.updateSensorEnrichmentConfig(sensorType, readSensorEnrichmentConfigBytesFromZookeeper(sensorType, client));
+    }
+  }
+
+  /** Reads the raw content of {@link Constants#ZOOKEEPER_GLOBAL_ROOT}. */
+  public static byte[] readGlobalConfigBytesFromZookeeper(CuratorFramework client) throws Exception {
+    return readFromZookeeper(Constants.ZOOKEEPER_GLOBAL_ROOT, client);
+  }
+
+  /** Reads the raw content of {@code ZOOKEEPER_SENSOR_ROOT/<sensorType>}. */
+  public static byte[] readSensorEnrichmentConfigBytesFromZookeeper(String sensorType, CuratorFramework client) throws Exception {
+    return readFromZookeeper(Constants.ZOOKEEPER_SENSOR_ROOT + "/" + sensorType, client);
+  }
+
+  /** Reads the raw content of {@code ZOOKEEPER_TOPOLOGY_ROOT/<name>}. */
+  public static byte[] readConfigBytesFromZookeeper(String name, CuratorFramework client) throws Exception {
+    return readFromZookeeper(Constants.ZOOKEEPER_TOPOLOGY_ROOT + "/" + name, client);
+  }
+
+  /** Reads the raw content of the node at {@code path}. */
+  public static byte[] readFromZookeeper(String path, CuratorFramework client) throws Exception {
+    return client.getData().forPath(path);
+  }
+
+  /**
+   * Uploads {@code <rootFilePath>/global.json} as the global config and every
+   * file in {@code <rootFilePath>/sensors} as a sensor enrichment config.
+   */
+  public static void uploadConfigsToZookeeper(String rootFilePath, String zookeeperUrl) throws Exception {
+    ConfigurationsUtils.writeGlobalConfigToZookeeper(readGlobalConfigFromFile(rootFilePath), zookeeperUrl);
+    Map<String, byte[]> sensorEnrichmentConfigs = readSensorEnrichmentConfigsFromFile(rootFilePath);
+    for(Map.Entry<String, byte[]> sensorConfig: sensorEnrichmentConfigs.entrySet()) {
+      ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorConfig.getKey(), sensorConfig.getValue(), zookeeperUrl);
+    }
+  }
+
+  /** Reads {@code <rootFilePath>/global.json} as raw bytes. */
+  public static byte[] readGlobalConfigFromFile(String rootFilePath) throws IOException {
+    return Files.readAllBytes(Paths.get(rootFilePath, Constants.GLOBAL_CONFIG_NAME + ".json"));
+  }
+
+  /**
+   * Reads every file in {@code <rootPath>/sensors}, keyed by file name
+   * without its extension.
+   *
+   * @return the sensor configs found; empty when the directory does not
+   *         exist or cannot be listed
+   * @throws IOException if a listed file cannot be read
+   */
+  public static Map<String, byte[]> readSensorEnrichmentConfigsFromFile(String rootPath) throws IOException {
+    Map<String, byte[]> sensorEnrichmentConfigs = new HashMap<>();
+    // listFiles() returns null (not an empty array) when the path is not a
+    // readable directory, so it must not be iterated directly.
+    File[] sensorFiles = new File(rootPath, Constants.SENSORS_CONFIG_NAME).listFiles();
+    if (sensorFiles == null) {
+      return sensorEnrichmentConfigs;
+    }
+    for(File file: sensorFiles) {
+      sensorEnrichmentConfigs.put(FilenameUtils.removeExtension(file.getName()), Files.readAllBytes(file.toPath()));
+    }
+    return sensorEnrichmentConfigs;
+  }
+
+  /**
+   * Prints the global config and every sensor config stored in Zookeeper to
+   * standard output. The client is closed even when reading fails.
+   */
+  public static void dumpConfigs(String zookeeperUrl) throws Exception {
+    CuratorFramework client = getClient(zookeeperUrl);
+    client.start();
+    try {
+      //Output global configs
+      System.out.println("Global config");
+      byte[] globalConfigData = readGlobalConfigBytesFromZookeeper(client);
+      // Decode explicitly as UTF-8 rather than with the platform default charset.
+      System.out.println(new String(globalConfigData, java.nio.charset.StandardCharsets.UTF_8));
+
+      //Output sensor specific configs
+      List<String> children = client.getChildren().forPath(Constants.ZOOKEEPER_SENSOR_ROOT);
+      for (String child : children) {
+        byte[] data = readSensorEnrichmentConfigBytesFromZookeeper(child, client);
+        System.out.println("Config for source " + child);
+        System.out.println(new String(data, java.nio.charset.StandardCharsets.UTF_8));
+        System.out.println();
+      }
+    } finally {
+      client.close();
+    }
+  }
+
+  /** Prints the command-line usage for {@link #main}. */
+  private static void printUsage(Options options) {
+    final HelpFormatter usageFormatter = new HelpFormatter();
+    usageFormatter.printHelp("ConfigurationsUtils", null, options, null, true);
+  }
+
+  /**
+   * Command-line entry point. With {@code -p} the given directory is uploaded
+   * to Zookeeper first; the stored configs are then always dumped to standard
+   * output. Exits with -1 on a parse error or any other failure, and with 0
+   * after printing help for {@code -h}.
+   */
+  public static void main(String[] args) {
+
+    Options options = new Options();
+    {
+      Option o = new Option("h", "help", false, "This screen");
+      o.setRequired(false);
+      options.addOption(o);
+    }
+    {
+      Option o = new Option("p", "config_files", true, "Path to the source config files.  Must be named like \"$source\".json");
+      o.setArgName("DIR_NAME");
+      o.setRequired(false);
+      options.addOption(o);
+    }
+    {
+      Option o = new Option("z", "zk", true, "Zookeeper Quroum URL (zk1:2181,zk2:2181,...");
+      o.setArgName("ZK_QUORUM");
+      o.setRequired(true);
+      options.addOption(o);
+    }
+
+    try {
+      CommandLineParser parser = new PosixParser();
+      CommandLine cmd = null;
+      try {
+        cmd = parser.parse(options, args);
+      } catch (ParseException pe) {
+        pe.printStackTrace();
+        printUsage(options);
+        System.exit(-1);
+      }
+      if (cmd.hasOption("h")) {
+        printUsage(options);
+        System.exit(0);
+      }
+
+      String zkQuorum = cmd.getOptionValue("z");
+      if (cmd.hasOption("p")) {
+        String sourcePath = cmd.getOptionValue("p");
+        uploadConfigsToZookeeper(sourcePath, zkQuorum);
+      }
+
+      ConfigurationsUtils.dumpConfigs(zkQuorum);
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      System.exit(-1);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configuration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configuration.java
new file mode 100644
index 0000000..1ccf47b
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configuration.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.configuration;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.cli.ConfigurationsUtils;
+
+import java.nio.file.Path;
+import java.util.Map;
+
+/**
+ * A {@link Configurations} that can refresh itself on demand, either from Zookeeper
+ * (when built with a Curator client) or from configuration files under a root
+ * directory (when built with a {@link Path}).
+ */
+public class Configuration extends Configurations {
+
+    protected CuratorFramework curatorFramework = null;
+    private Path configFileRoot;
+
+    /**
+     * Creates a configuration whose {@link #update()} reads from Zookeeper.
+     *
+     * @param curatorFramework the Curator client handed to {@code ConfigurationsUtils}
+     */
+    public Configuration(CuratorFramework curatorFramework){
+        this.curatorFramework = curatorFramework;
+    }
+
+    /**
+     * Creates a configuration whose {@link #update()} reads from files.
+     *
+     * @param configFileRoot root directory passed (as an absolute path string) to the file readers
+     */
+    public Configuration(Path configFileRoot){
+        this.configFileRoot = configFileRoot;
+    }
+
+    /**
+     * Reloads the held configurations from whichever source this instance was built with.
+     *
+     * @throws Exception if the underlying Zookeeper or file read fails
+     */
+    public void update() throws Exception {
+        // A Curator client, when present, always takes precedence over the file root.
+        if( curatorFramework != null ) {
+            ConfigurationsUtils.updateConfigsFromZookeeper(this, this.curatorFramework);
+            return;
+        }
+
+        final String rootDir = configFileRoot.toAbsolutePath().toString();
+        updateGlobalConfig(ConfigurationsUtils.readGlobalConfigFromFile(rootDir));
+
+        Map<String, byte[]> perSensorConfigs = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(rootDir);
+        for(Map.Entry<String, byte[]> entry : perSensorConfigs.entrySet()) {
+            updateSensorEnrichmentConfig(entry.getKey(), entry.getValue());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
new file mode 100644
index 0000000..6aaa2b4
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.configuration;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.log4j.Logger;
+import org.apache.metron.common.utils.JSONUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Serializable holder for named configurations, backed by a single concurrent map.
+ * The global configuration is stored under {@link #GLOBAL_CONFIG_NAME}; sensor
+ * enrichment configurations are stored under their sensor type; any other
+ * configuration is stored under the name supplied by the caller.
+ * NOTE(review): all three kinds share one key space, so a sensor type or config
+ * name equal to "global" would overwrite the global configuration.
+ */
+public class Configurations implements Serializable {
+
+  private static final Logger LOG = Logger.getLogger(Configurations.class);
+
+  public enum Type {
+    GLOBAL, SENSOR, OTHER
+  }
+
+  public static final String GLOBAL_CONFIG_NAME = "global";
+
+  private ConcurrentMap<String, Object> configurations = new ConcurrentHashMap<>();
+
+  /**
+   * @return the global configuration, or {@code null} if none has been stored yet
+   */
+  @SuppressWarnings("unchecked")
+  public Map<String, Object> getGlobalConfig() {
+    Object stored = configurations.get(GLOBAL_CONFIG_NAME);
+    return (Map<String, Object>) stored;
+  }
+
+  /**
+   * Parses the given JSON bytes as a map and stores it as the global configuration.
+   */
+  public void updateGlobalConfig(byte[] data) throws IOException {
+    InputStream source = new ByteArrayInputStream(data);
+    updateGlobalConfig(source);
+  }
+
+  /**
+   * Parses JSON read from the stream as a map and stores it as the global configuration.
+   */
+  public void updateGlobalConfig(InputStream io) throws IOException {
+    TypeReference<Map<String, Object>> mapType = new TypeReference<Map<String, Object>>() {
+    };
+    Map<String, Object> parsed = JSONUtils.INSTANCE.load(io, mapType);
+    updateGlobalConfig(parsed);
+  }
+
+  /**
+   * Stores the given map as the global configuration, replacing any previous one.
+   */
+  public void updateGlobalConfig(Map<String, Object> globalConfig) {
+    configurations.put(GLOBAL_CONFIG_NAME, globalConfig);
+  }
+
+  /**
+   * @return the enrichment configuration stored for the sensor type, or {@code null} if absent
+   */
+  public SensorEnrichmentConfig getSensorEnrichmentConfig(String sensorType) {
+    Object stored = configurations.get(sensorType);
+    return (SensorEnrichmentConfig) stored;
+  }
+
+  /**
+   * Parses the given JSON bytes into a {@link SensorEnrichmentConfig} and stores it for the sensor type.
+   */
+  public void updateSensorEnrichmentConfig(String sensorType, byte[] data) throws IOException {
+    InputStream source = new ByteArrayInputStream(data);
+    updateSensorEnrichmentConfig(sensorType, source);
+  }
+
+  /**
+   * Parses JSON read from the stream into a {@link SensorEnrichmentConfig} and stores it for the sensor type.
+   */
+  public void updateSensorEnrichmentConfig(String sensorType, InputStream io) throws IOException {
+    SensorEnrichmentConfig parsed = JSONUtils.INSTANCE.load(io, SensorEnrichmentConfig.class);
+    updateSensorEnrichmentConfig(sensorType, parsed);
+  }
+
+  /**
+   * Stores the given enrichment configuration under the sensor type, replacing any previous one.
+   */
+  public void updateSensorEnrichmentConfig(String sensorType, SensorEnrichmentConfig sensorEnrichmentConfig) {
+    configurations.put(sensorType, sensorEnrichmentConfig);
+  }
+
+  /**
+   * @return the configuration map stored under {@code name}, or {@code null} if absent
+   */
+  @SuppressWarnings("unchecked")
+  public Map<String, Object> getConfig(String name) {
+    Object stored = configurations.get(name);
+    return (Map<String, Object>) stored;
+  }
+
+  /**
+   * Parses the given JSON bytes as a map and stores it under {@code name}.
+   *
+   * @throws IllegalStateException if {@code data} is null
+   */
+  public void updateConfig(String name, byte[] data) throws IOException {
+    if (data == null) {
+      throw new IllegalStateException("config data cannot be null");
+    }
+    TypeReference<Map<String, Object>> mapType = new TypeReference<Map<String, Object>>() {};
+    Map<String, Object> parsed = JSONUtils.INSTANCE.load(new ByteArrayInputStream(data), mapType);
+    updateConfig(name, parsed);
+  }
+
+  /**
+   * Stores the given map under {@code name}, replacing any previous value.
+   */
+  public void updateConfig(String name, Map<String, Object> config) {
+    configurations.put(name, config);
+  }
+
+  /** Two instances are equal when they are of the same class and hold equal maps. */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o == null) {
+      return false;
+    }
+    if (o.getClass() != getClass()) {
+      return false;
+    }
+    Configurations other = (Configurations) o;
+    return configurations.equals(other.configurations);
+  }
+
+  @Override
+  public int hashCode() {
+    return configurations.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return configurations.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfig.java
new file mode 100644
index 0000000..2ead81e
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfig.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.configuration;
+
+import com.google.common.base.Joiner;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.cli.ConfigurationsUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Describes, per sensor, extra fields and enrichment types that should be merged
+ * into the sensor's {@link SensorEnrichmentConfig}. The merge itself is done by
+ * {@link #updateSensorConfigs(SourceConfigHandler, Map)}; the no-argument overload
+ * runs it against Zookeeper using {@link #getZkQuorum()}.
+ */
+public class EnrichmentConfig {
+  /** Selects which section of the sensor config a {@link FieldList} is merged into. */
+  public static enum Type {
+     THREAT_INTEL
+    ,ENRICHMENT
+  }
+
+  protected static final Logger _LOG = LoggerFactory.getLogger(EnrichmentConfig.class);
+
+  /** The fields, and the enrichment types for each field, to add for one sensor. */
+  public static class FieldList {
+    Type type;
+    Map<String, List<String>> fieldToEnrichmentTypes;
+
+    public Type getType() {
+      return type;
+    }
+
+    public void setType(Type type) {
+      this.type = type;
+    }
+
+    public Map<String, List<String>> getFieldToEnrichmentTypes() {
+      return fieldToEnrichmentTypes;
+    }
+
+    public void setFieldToEnrichmentTypes(Map<String, List<String>> fieldToEnrichmentTypes) {
+      this.fieldToEnrichmentTypes = fieldToEnrichmentTypes;
+    }
+  }
+  public String zkQuorum;
+  public Map<String, FieldList> sensorToFieldList;
+
+  public String getZkQuorum() {
+    return zkQuorum;
+  }
+
+  public void setZkQuorum(String zkQuorum) {
+    this.zkQuorum = zkQuorum;
+  }
+
+  public Map<String, FieldList> getSensorToFieldList() {
+    return sensorToFieldList;
+  }
+
+  public void setSensorToFieldList(Map<String, FieldList> sensorToFieldList) {
+    this.sensorToFieldList = sensorToFieldList;
+  }
+
+  /**
+   * Merges {@link #sensorToFieldList} into the sensor configs stored in Zookeeper.
+   * A client is created for {@link #getZkQuorum()}, started, and always closed afterwards.
+   *
+   * @throws Exception if reading, merging or writing any sensor config fails
+   */
+  public void updateSensorConfigs( ) throws Exception {
+    CuratorFramework client = ConfigurationsUtils.getClient(getZkQuorum());
+    try {
+      client.start();
+      updateSensorConfigs(new ZKSourceConfigHandler(client), sensorToFieldList);
+    }
+    finally {
+      client.close();
+    }
+  }
+
+  /** Abstraction over where sensor enrichment configs are read from and written to. */
+  public static interface SourceConfigHandler {
+    SensorEnrichmentConfig readConfig(String sensor) throws Exception;
+    void persistConfig(String sensor, SensorEnrichmentConfig config) throws Exception;
+  }
+
+  /** {@link SourceConfigHandler} that reads and writes through a Curator client. */
+  public static class ZKSourceConfigHandler implements SourceConfigHandler {
+    CuratorFramework client;
+    public ZKSourceConfigHandler(CuratorFramework client) {
+      this.client = client;
+    }
+    @Override
+    public SensorEnrichmentConfig readConfig(String sensor) throws Exception {
+      return SensorEnrichmentConfig.fromBytes(ConfigurationsUtils.readSensorEnrichmentConfigBytesFromZookeeper(sensor, client));
+    }
+
+    @Override
+    public void persistConfig(String sensor, SensorEnrichmentConfig config) throws Exception {
+      // Encode explicitly as UTF-8 so the stored bytes do not depend on the JVM's default charset.
+      byte[] json = config.toJSON().getBytes(java.nio.charset.StandardCharsets.UTF_8);
+      ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensor, json, client);
+    }
+  }
+
+  /**
+   * For every sensor in {@code sensorToFieldList}, reads its config through
+   * {@code scHandler}, adds any fields and enrichment types that are not already
+   * present, and persists only the configs that actually changed.
+   * <p>
+   * If the sensor config has no field map for the requested section yet, an empty
+   * one is created and attached to the config instead of failing with a
+   * {@link NullPointerException}.
+   *
+   * @param scHandler         source and sink for the sensor configs
+   * @param sensorToFieldList sensor name to the fields/types that should be added
+   * @throws Exception if the handler fails to read or persist a config
+   */
+  public static void updateSensorConfigs( SourceConfigHandler scHandler
+                                        , Map<String, FieldList> sensorToFieldList
+                                        ) throws Exception
+  {
+    Map<String, SensorEnrichmentConfig> sourceConfigsChanged = new HashMap<>();
+    for (Map.Entry<String, FieldList> kv : sensorToFieldList.entrySet()) {
+      SensorEnrichmentConfig config = sourceConfigsChanged.get(kv.getKey());
+      if(config == null) {
+        config = scHandler.readConfig(kv.getKey());
+        if(_LOG.isDebugEnabled()) {
+          _LOG.debug(config.toJSON());
+        }
+      }
+      Map<String, List<String> > fieldMap = null;
+      Map<String, List<String> > fieldToTypeMap = null;
+      List<String> fieldList = null;
+      if(kv.getValue().type == Type.THREAT_INTEL) {
+        fieldMap = config.getThreatIntelFieldMap();
+        if(fieldMap == null) {
+          // No threat intel section yet: start one rather than dereferencing null below.
+          fieldMap = new HashMap<>();
+          config.setThreatIntelFieldMap(fieldMap);
+        }
+        fieldList = fieldMap.get(Constants.SIMPLE_HBASE_THREAT_INTEL);
+        if(fieldList == null) {
+          fieldList = new ArrayList<>();
+          fieldMap.put(Constants.SIMPLE_HBASE_THREAT_INTEL, fieldList);
+        }
+        fieldToTypeMap = config.getFieldToThreatIntelTypeMap();
+        if(fieldToTypeMap == null) {
+          fieldToTypeMap = new HashMap<>();
+          config.setFieldToThreatIntelTypeMap(fieldToTypeMap);
+        }
+      }
+      else if(kv.getValue().type == Type.ENRICHMENT) {
+        fieldMap = config.getEnrichmentFieldMap();
+        if(fieldMap == null) {
+          // No enrichment section yet: start one rather than dereferencing null below.
+          fieldMap = new HashMap<>();
+          config.setEnrichmentFieldMap(fieldMap);
+        }
+        fieldList = fieldMap.get(Constants.SIMPLE_HBASE_ENRICHMENT);
+        if(fieldList == null) {
+          fieldList = new ArrayList<>();
+          fieldMap.put(Constants.SIMPLE_HBASE_ENRICHMENT, fieldList);
+        }
+        fieldToTypeMap = config.getFieldToEnrichmentTypeMap();
+        if(fieldToTypeMap == null) {
+          fieldToTypeMap = new HashMap<>();
+          config.setFieldToEnrichmentTypeMap(fieldToTypeMap);
+        }
+      }
+      // Still null here only when the FieldList has no recognised type.
+      if(fieldToTypeMap == null  || fieldMap == null) {
+        _LOG.debug("fieldToTypeMap is null or fieldMap is null, so skipping");
+        continue;
+      }
+      //Add the additional fields to the field list associated with the hbase adapter
+      {
+        Set<String> fieldSet = new HashSet<>(fieldList);
+        List<String> additionalFields = new ArrayList<>();
+        for (String field : kv.getValue().getFieldToEnrichmentTypes().keySet()) {
+          if (!fieldSet.contains(field)) {
+            additionalFields.add(field);
+          }
+        }
+        //adding only the ones that we don't already have to the field list
+        if (additionalFields.size() > 0) {
+          _LOG.debug("Adding additional fields: " + Joiner.on(',').join(additionalFields));
+          fieldList.addAll(additionalFields);
+          sourceConfigsChanged.put(kv.getKey(), config);
+        }
+      }
+      //Add the additional enrichment types to the mapping between the fields
+      {
+        for(Map.Entry<String, List<String>> fieldToType : kv.getValue().getFieldToEnrichmentTypes().entrySet()) {
+          String field = fieldToType.getKey();
+          final Set<String> types = new HashSet<>(fieldToType.getValue());
+          int sizeBefore = 0;
+          if(fieldToTypeMap.containsKey(field)) {
+            List<String> typeList = fieldToTypeMap.get(field);
+            sizeBefore = new HashSet<>(typeList).size();
+            types.addAll(typeList);
+          }
+          // The union is larger than the existing distinct types only if something new was added.
+          int sizeAfter = types.size();
+          boolean changed = sizeBefore != sizeAfter;
+          if(changed) {
+            fieldToTypeMap.put(field, new ArrayList<>(types));
+            sourceConfigsChanged.put(kv.getKey(), config);
+          }
+        }
+      }
+    }
+    for(Map.Entry<String, SensorEnrichmentConfig> kv : sourceConfigsChanged.entrySet()) {
+      scHandler.persistConfig(kv.getKey(), kv.getValue());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorEnrichmentConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorEnrichmentConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorEnrichmentConfig.java
new file mode 100644
index 0000000..bc30327
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorEnrichmentConfig.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.configuration;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.metron.common.utils.JSONUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Bean holding the enrichment settings for one sensor: the index name, the batch
+ * size, the enrichment and threat intel field maps, and the per-field type maps.
+ * It is read from and written to JSON via {@link JSONUtils}.
+ */
+public class SensorEnrichmentConfig {
+
+  private String index;
+  private Map<String, List<String>> enrichmentFieldMap;
+  private Map<String, List<String>> threatIntelFieldMap;
+  private Map<String, List<String>> fieldToEnrichmentTypeMap = new HashMap<>();
+  private Map<String, List<String>> fieldToThreatIntelTypeMap = new HashMap<>();
+  private int batchSize;
+
+  public String getIndex() {
+    return index;
+  }
+
+  public void setIndex(String index) {
+    this.index = index;
+  }
+
+  public Map<String, List<String>> getEnrichmentFieldMap() {
+    return enrichmentFieldMap;
+  }
+
+  public void setEnrichmentFieldMap(Map<String, List<String>> enrichmentFieldMap) {
+    this.enrichmentFieldMap = enrichmentFieldMap;
+  }
+
+  public Map<String, List<String>> getThreatIntelFieldMap() {
+    return threatIntelFieldMap;
+  }
+
+  public void setThreatIntelFieldMap(Map<String, List<String>> threatIntelFieldMap) {
+    this.threatIntelFieldMap = threatIntelFieldMap;
+  }
+
+  public Map<String, List<String>> getFieldToEnrichmentTypeMap() {
+    return fieldToEnrichmentTypeMap;
+  }
+
+  public Map<String, List<String>> getFieldToThreatIntelTypeMap() {
+    return fieldToThreatIntelTypeMap;
+  }
+  public void setFieldToEnrichmentTypeMap(Map<String, List<String>> fieldToEnrichmentTypeMap) {
+    this.fieldToEnrichmentTypeMap = fieldToEnrichmentTypeMap;
+  }
+
+  public void setFieldToThreatIntelTypeMap(Map<String, List<String>> fieldToThreatIntelTypeMap) {
+    this.fieldToThreatIntelTypeMap= fieldToThreatIntelTypeMap;
+  }
+  public int getBatchSize() {
+    return batchSize;
+  }
+
+  public void setBatchSize(int batchSize) {
+    this.batchSize = batchSize;
+  }
+
+  /**
+   * Deserializes a config from JSON bytes.
+   *
+   * @param config JSON document, decoded as UTF-8
+   * @return the parsed configuration
+   * @throws IOException if the bytes are not valid JSON for this class
+   */
+  public static SensorEnrichmentConfig fromBytes(byte[] config) throws IOException {
+    // Decode explicitly as UTF-8 so parsing does not depend on the JVM's default charset.
+    String json = new String(config, java.nio.charset.StandardCharsets.UTF_8);
+    return JSONUtils.INSTANCE.load(json, SensorEnrichmentConfig.class);
+  }
+
+  /**
+   * @return this configuration serialized as pretty-printed JSON
+   * @throws JsonProcessingException if serialization fails
+   */
+  public String toJSON() throws JsonProcessingException {
+    return JSONUtils.INSTANCE.toJSON(this, true);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    SensorEnrichmentConfig that = (SensorEnrichmentConfig) o;
+
+    if (getBatchSize() != that.getBatchSize()) return false;
+    if (getIndex() != null ? !getIndex().equals(that.getIndex()) : that.getIndex() != null) return false;
+    if (getEnrichmentFieldMap() != null ? !getEnrichmentFieldMap().equals(that.getEnrichmentFieldMap()) : that.getEnrichmentFieldMap() != null)
+      return false;
+    if (getThreatIntelFieldMap() != null ? !getThreatIntelFieldMap().equals(that.getThreatIntelFieldMap()) : that.getThreatIntelFieldMap() != null)
+      return false;
+    if (getFieldToEnrichmentTypeMap() != null ? !getFieldToEnrichmentTypeMap().equals(that.getFieldToEnrichmentTypeMap()) : that.getFieldToEnrichmentTypeMap() != null)
+      return false;
+    return getFieldToThreatIntelTypeMap() != null ? getFieldToThreatIntelTypeMap().equals(that.getFieldToThreatIntelTypeMap()) : that.getFieldToThreatIntelTypeMap() == null;
+
+  }
+
+  /** Renders every field as {@code name=value}, in a single brace-delimited line. */
+  @Override
+  public String toString() {
+    return "{index=" + index + ", batchSize=" + batchSize +
+            ", enrichmentFieldMap=" + enrichmentFieldMap +
+            ", threatIntelFieldMap=" + threatIntelFieldMap +
+            ", fieldToEnrichmentTypeMap=" + fieldToEnrichmentTypeMap +
+            ", fieldToThreatIntelTypeMap=" + fieldToThreatIntelTypeMap + "}";
+  }
+
+  @Override
+  public int hashCode() {
+    int result = getIndex() != null ? getIndex().hashCode() : 0;
+    result = 31 * result + (getEnrichmentFieldMap() != null ? getEnrichmentFieldMap().hashCode() : 0);
+    result = 31 * result + (getThreatIntelFieldMap() != null ? getThreatIntelFieldMap().hashCode() : 0);
+    result = 31 * result + (getFieldToEnrichmentTypeMap() != null ? getFieldToEnrichmentTypeMap().hashCode() : 0);
+    result = 31 * result + (getFieldToThreatIntelTypeMap() != null ? getFieldToThreatIntelTypeMap().hashCode() : 0);
+    result = 31 * result + getBatchSize();
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/BulkMessageWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/BulkMessageWriter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/BulkMessageWriter.java
new file mode 100644
index 0000000..6fb3d78
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/BulkMessageWriter.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.interfaces;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.metron.common.configuration.Configurations;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Contract for a writer that is handed messages in batches rather than one at a time.
+ * It extends {@link AutoCloseable}, so implementations must also provide {@code close()}.
+ *
+ * @param <T> the type of message being written
+ */
+public interface BulkMessageWriter<T> extends AutoCloseable {
+
+  /**
+   * Prepares the writer for use.
+   *
+   * @param stormConf     presumably the Storm topology configuration map - TODO confirm against callers
+   * @param configuration the configurations available to the writer at initialisation
+   * @throws Exception if the writer cannot be initialised
+   */
+  void init(Map stormConf, Configurations configuration) throws Exception;
+
+  /**
+   * Writes a batch of messages for one sensor type.
+   *
+   * @param sensorType     the sensor type the batch belongs to
+   * @param configurations the configurations to consult while writing
+   * @param tuples         the Storm tuples for the batch
+   *                       (NOTE(review): presumably parallel to {@code messages} - confirm with implementations)
+   * @param messages       the messages to write
+   * @throws Exception if the batch cannot be written
+   */
+  void write(String sensorType, Configurations configurations, List<Tuple> tuples, List<T> messages) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/MessageWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/MessageWriter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/MessageWriter.java
new file mode 100644
index 0000000..a90a8cb
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/MessageWriter.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.interfaces;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.metron.common.configuration.Configurations;
+
+/**
+ * Contract for a writer that is handed one message at a time.
+ * It extends {@link AutoCloseable}, so implementations must also provide {@code close()}.
+ *
+ * @param <T> the type of message being written
+ */
+public interface MessageWriter<T> extends AutoCloseable {
+
+  /**
+   * Prepares the writer for use. Takes no arguments and declares no checked exceptions.
+   */
+  void init();
+
+  /**
+   * Writes a single message for one sensor type.
+   *
+   * @param sensorType     the sensor type the message belongs to
+   * @param configurations the configurations to consult while writing
+   * @param tuple          the Storm tuple passed alongside the message
+   *                       (NOTE(review): presumably the tuple the message originated from - confirm)
+   * @param message        the message to write
+   * @throws Exception if the message cannot be written
+   */
+  void write(String sensorType, Configurations configurations, Tuple tuple, T message) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
new file mode 100644
index 0000000..6e139c8
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.utils;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Values;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.metron.common.Constants;
+import org.json.simple.JSONObject;
+
+/**
+ * Helpers for turning a {@link Throwable} into a JSON error record and for
+ * reporting it through a Storm {@link OutputCollector}.
+ */
+public class ErrorUtils {
+
+	/**
+	 * Builds a JSON error record carrying the current time, the local host name
+	 * (left out if it cannot be resolved), the given message, a sensor type of
+	 * {@code "error"}, the throwable's {@code toString()} and its full stack trace.
+	 *
+	 * @param message text stored under the {@code "message"} key
+	 * @param t       the throwable being described
+	 * @return the populated error record
+	 */
+	@SuppressWarnings("unchecked")
+	public static JSONObject generateErrorMessage(String message, Throwable t)
+	{
+		// Capture the throwable's details before the record is populated.
+		final String fullTrace = ExceptionUtils.getStackTrace(t);
+		final String summary = t.toString();
+
+		final JSONObject record = new JSONObject();
+		record.put("time", System.currentTimeMillis());
+
+		try {
+			final String host = InetAddress.getLocalHost().getHostName();
+			record.put("hostname", host);
+		} catch (UnknownHostException ex) {
+			// Host lookup failure is not fatal: the record simply has no "hostname" entry.
+			ex.printStackTrace();
+		}
+
+		record.put("message", message);
+		record.put(Constants.SENSOR_TYPE, "error");
+		record.put("exception", summary);
+		record.put("stack", fullTrace);
+		return record;
+	}
+
+	/**
+	 * Emits an error record for {@code t} on the given stream and then reports
+	 * {@code t} to the collector.
+	 *
+	 * @param collector   the collector used to emit and report
+	 * @param t           the throwable to report; its message becomes the record's message
+	 * @param errorStream the id of the stream the record is emitted on
+	 */
+	public static void handleError(OutputCollector collector, Throwable t, String errorStream) {
+		final String description = t.getMessage();
+		final JSONObject payload = generateErrorMessage(description, t);
+		collector.emit(errorStream, new Values(payload));
+		collector.reportError(t);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java
new file mode 100644
index 0000000..4af9ad1
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.*;
+
+/**
+ * Singleton JSON helper built on Jackson. Each thread gets its own
+ * {@link ObjectMapper}, created lazily on first use.
+ */
+public enum JSONUtils {
+  INSTANCE;
+
+  private static final ThreadLocal<ObjectMapper> _mapper = new ThreadLocal<ObjectMapper>() {
+    /**
+     * Supplies the calling thread's mapper the first time it asks for one.
+     *
+     * @return a new, default-configured {@link ObjectMapper}
+     */
+    @Override
+    protected ObjectMapper initialValue() {
+      return new ObjectMapper();
+    }
+  };
+
+  /** Parses JSON from the stream into the type described by {@code ref}. */
+  public <T> T load(InputStream is, TypeReference<T> ref) throws IOException {
+    return _mapper.get().readValue(is, ref);
+  }
+
+  /** Parses the JSON text {@code is} into the type described by {@code ref}. */
+  public <T> T load(String is, TypeReference<T> ref) throws IOException {
+    return _mapper.get().readValue(is, ref);
+  }
+
+  /**
+   * Parses the JSON file {@code f} into the type described by {@code ref}.
+   * The file stream is always closed, including when parsing fails.
+   */
+  public <T> T load(File f, TypeReference<T> ref) throws IOException {
+    try (InputStream is = new BufferedInputStream(new FileInputStream(f))) {
+      return _mapper.get().readValue(is, ref);
+    }
+  }
+
+  /** Parses JSON from the stream into an instance of {@code clazz}. */
+  public <T> T load(InputStream is, Class<T> clazz) throws IOException {
+    return _mapper.get().readValue(is, clazz);
+  }
+
+  /**
+   * Parses the JSON file {@code f} into an instance of {@code clazz}.
+   * The file stream is always closed, including when parsing fails.
+   */
+  public <T> T load(File f, Class<T> clazz) throws IOException {
+    try (InputStream is = new BufferedInputStream(new FileInputStream(f))) {
+      return _mapper.get().readValue(is, clazz);
+    }
+  }
+
+  /** Parses the JSON text {@code is} into an instance of {@code clazz}. */
+  public <T> T load(String is, Class<T> clazz) throws IOException {
+    return _mapper.get().readValue(is, clazz);
+  }
+
+  /**
+   * Serializes {@code o} to a JSON string.
+   *
+   * @param o      the object to serialize
+   * @param pretty whether to use Jackson's default pretty printer
+   */
+  public String toJSON(Object o, boolean pretty) throws JsonProcessingException {
+    if(pretty) {
+      return _mapper.get().writerWithDefaultPrettyPrinter().writeValueAsString(o);
+    }
+    else {
+      return _mapper.get().writeValueAsString(o);
+    }
+  }
+
+  /** Serializes {@code config} to JSON bytes (not pretty-printed). */
+  public byte[] toJSON(Object config) throws JsonProcessingException {
+    return _mapper.get().writeValueAsBytes(config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/MessageUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/MessageUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/MessageUtils.java
new file mode 100644
index 0000000..df711fa
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/MessageUtils.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.utils;
+
+import org.apache.metron.common.Constants;
+import org.json.simple.JSONObject;
+
+/**
+ * Static helpers for reading fields out of a JSON message.
+ */
+public class MessageUtils {
+
+  /**
+   * Reads the value stored under {@link Constants#SENSOR_TYPE} and returns it as a string.
+   *
+   * @param message message to read from
+   * @return the value found under the sensor type key, cast to {@code String}
+   */
+  public static String getSensorType(JSONObject message) {
+    final Object sensorType = message.get(Constants.SENSOR_TYPE);
+    return (String) sensorType;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java
new file mode 100644
index 0000000..2afa097
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.utils;
+
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Reflection helper for creating objects from a class name.
+ *
+ * @param <T> not used by the static helper; kept so existing parameterized references compile
+ */
+public class ReflectionUtils<T> {
+
+  /**
+   * Creates an instance of {@code className} using its public no-argument constructor.
+   *
+   * <p>When {@code className} is {@code null}, empty, or starts with {@code '$'}
+   * (presumably an unsubstituted placeholder - confirm with callers), no reflection
+   * is attempted and {@code defaultClass} is returned as is.
+   *
+   * @param className    fully qualified name of the class to instantiate
+   * @param defaultClass value returned when no usable class name is supplied
+   * @param <T>          type the caller expects; not checked at runtime
+   * @return a new instance of {@code className}, or {@code defaultClass}
+   * @throws IllegalStateException if the class cannot be found, lacks an accessible
+   *         no-argument constructor, or its constructor throws; the reflective
+   *         exception is attached as the cause
+   */
+  public static <T> T createInstance(String className, T defaultClass) {
+    if (className == null || className.isEmpty() || className.charAt(0) == '$') {
+      return defaultClass;
+    }
+    try {
+      // T is erased, so this cast cannot be verified here; a mismatch surfaces at the caller.
+      @SuppressWarnings("unchecked")
+      Class<? extends T> clazz = (Class<? extends T>) Class.forName(className);
+      return clazz.getConstructor().newInstance();
+    } catch (ReflectiveOperationException e) {
+      // Name the offending class; the cause carries the specific reflective failure.
+      throw new IllegalStateException("Unable to instantiate connector: " + className, e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/main/scripts/zk_load_configs.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/scripts/zk_load_configs.sh b/metron-platform/metron-common/src/main/scripts/zk_load_configs.sh
new file mode 100755
index 0000000..4a928bd
--- /dev/null
+++ b/metron-platform/metron-common/src/main/scripts/zk_load_configs.sh
@@ -0,0 +1,33 @@
+#!/bin/bash
+# 
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#     http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Directory holding the Bigtop defaults; /etc/default is used only when the
+# variable is unset (an explicitly empty value is kept and skips the sourcing below).
+BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
+# Source the hbase defaults when the directory is set and the file is readable.
+# Expansions are quoted so a directory containing whitespace is not word-split.
+[ -n "${BIGTOP_DEFAULTS_DIR}" ] && [ -r "${BIGTOP_DEFAULTS_DIR}/hbase" ] && . "${BIGTOP_DEFAULTS_DIR}/hbase"
+
+# Autodetect JAVA_HOME if not defined
+if [ -e /usr/libexec/bigtop-detect-javahome ]; then
+  . /usr/libexec/bigtop-detect-javahome
+elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
+  . /usr/lib/bigtop-utils/bigtop-detect-javahome
+fi
+export METRON_VERSION=0.1BETA
+export METRON_HOME="/usr/metron/$METRON_VERSION"
+export PARSERS_JAR="metron-parsers-$METRON_VERSION.jar"
+export ZK_HOME="${ZK_HOME:-/usr/hdp/current/hbase-client}"
+# NOTE(review): java is resolved from PATH here, not from JAVA_HOME - confirm this is intended.
+# All command-line arguments are forwarded unchanged to the configuration loader.
+java -cp "$METRON_HOME/lib/$PARSERS_JAR" org.apache.metron.common.cli.ConfigurationsUtils "$@"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredBoltTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredBoltTest.java
new file mode 100644
index 0000000..a791086
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredBoltTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.bolt;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import org.apache.curator.test.TestingServer;
+import org.apache.metron.TestConstants;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.cli.ConfigurationsUtils;
+import org.apache.metron.common.configuration.Configurations;
+import org.apache.metron.common.configuration.SensorEnrichmentConfig;
+import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Checks that a {@link ConfiguredBolt} loads its configurations from ZooKeeper and reflects
+ * later changes written through {@link ConfigurationsUtils}.
+ */
+public class ConfiguredBoltTest extends BaseEnrichmentBoltTest {
+  // Names passed to reloadCallback since the test last replaced this set.
+  // NOTE(review): this is a plain HashSet; if reloadCallback runs on a thread other than
+  // the test thread, access is unsynchronized - confirm.
+  private static Set<String> configsUpdated = new HashSet<>();
+  // Every configuration name written to ZooKeeper during setup.
+  private Set<String> allConfigurationTypes = new HashSet<>();
+  // Held in a field so the server started for a test can be shut down afterwards.
+  private TestingServer testZkServer;
+  private String zookeeperUrl;
+
+  /**
+   * Minimal concrete bolt: processes nothing and only records which configurations
+   * were reported through {@link #reloadCallback(String, Configurations.Type)}.
+   */
+  public static class StandAloneConfiguredBolt extends ConfiguredBolt {
+
+    public StandAloneConfiguredBolt(String zookeeperUrl) {
+      super(zookeeperUrl);
+    }
+
+    @Override
+    public void execute(Tuple input) {
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    }
+
+    @Override
+    public void reloadCallback(String name, Configurations.Type type) {
+      configsUpdated.add(name);
+    }
+  }
+
+  /**
+   * Starts an in-process ZooKeeper server and seeds it with the sample global and sensor
+   * enrichment configurations, remembering each name that was written.
+   */
+  @Before
+  public void setupConfiguration() throws Exception {
+    testZkServer = new TestingServer(true);
+    this.zookeeperUrl = testZkServer.getConnectString();
+    byte[] globalConfig = ConfigurationsUtils.readGlobalConfigFromFile(TestConstants.SAMPLE_CONFIG_PATH);
+    ConfigurationsUtils.writeGlobalConfigToZookeeper(globalConfig, zookeeperUrl);
+    allConfigurationTypes.add(Constants.GLOBAL_CONFIG_NAME);
+    Map<String, byte[]> sensorEnrichmentConfigs = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(TestConstants.SAMPLE_CONFIG_PATH);
+    for (String sensorType : sensorEnrichmentConfigs.keySet()) {
+      ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType, sensorEnrichmentConfigs.get(sensorType), zookeeperUrl);
+      allConfigurationTypes.add(sensorType);
+    }
+  }
+
+  /**
+   * Shuts down the ZooKeeper server started in {@link #setupConfiguration()} so that servers
+   * do not accumulate across tests.
+   */
+  @After
+  public void stopZookeeper() throws Exception {
+    if (testZkServer != null) {
+      testZkServer.close();
+    }
+  }
+
+  /**
+   * Walks through the initial load, adding and removing a global field, adding a sensor
+   * config and adding a miscellaneous config, comparing the bolt's view with a locally
+   * maintained {@link Configurations} after each step.
+   */
+  @Test
+  public void test() throws Exception {
+    Configurations sampleConfigurations = new Configurations();
+    try {
+      StandAloneConfiguredBolt configuredBolt = new StandAloneConfiguredBolt(null);
+      configuredBolt.prepare(new HashMap(), topologyContext, outputCollector);
+      Assert.fail("A valid zookeeper url must be supplied");
+    } catch (RuntimeException ignored) {
+      // Expected: preparing a bolt without a ZooKeeper url must fail. Assert.fail raises an
+      // AssertionError, which is not a RuntimeException, so it is not swallowed here.
+    }
+
+    configsUpdated = new HashSet<>();
+    sampleConfigurations.updateGlobalConfig(ConfigurationsUtils.readGlobalConfigFromFile(TestConstants.SAMPLE_CONFIG_PATH));
+    Map<String, byte[]> sensorEnrichmentConfigs = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(TestConstants.SAMPLE_CONFIG_PATH);
+    for (String sensorType : sensorEnrichmentConfigs.keySet()) {
+      sampleConfigurations.updateSensorEnrichmentConfig(sensorType, sensorEnrichmentConfigs.get(sensorType));
+    }
+
+    // Initial load: every configuration written during setup should be reported.
+    StandAloneConfiguredBolt configuredBolt = new StandAloneConfiguredBolt(zookeeperUrl);
+    configuredBolt.prepare(new HashMap(), topologyContext, outputCollector);
+    waitForConfigUpdate(allConfigurationTypes);
+    Assert.assertEquals(sampleConfigurations, configuredBolt.configurations);
+
+    // Add a field to the global config.
+    configsUpdated = new HashSet<>();
+    Map<String, Object> sampleGlobalConfig = sampleConfigurations.getGlobalConfig();
+    sampleGlobalConfig.put("newGlobalField", "newGlobalValue");
+    ConfigurationsUtils.writeGlobalConfigToZookeeper(sampleGlobalConfig, zookeeperUrl);
+    waitForConfigUpdate(Constants.GLOBAL_CONFIG_NAME);
+    Assert.assertEquals("Add global config field", sampleConfigurations.getGlobalConfig(), configuredBolt.configurations.getGlobalConfig());
+
+    // Remove that field again.
+    configsUpdated = new HashSet<>();
+    sampleGlobalConfig.remove("newGlobalField");
+    ConfigurationsUtils.writeGlobalConfigToZookeeper(sampleGlobalConfig, zookeeperUrl);
+    waitForConfigUpdate(Constants.GLOBAL_CONFIG_NAME);
+    Assert.assertEquals("Remove global config field", sampleConfigurations, configuredBolt.configurations);
+
+    // Add a sensor enrichment config that did not exist before.
+    configsUpdated = new HashSet<>();
+    String sensorType = "testSensorConfig";
+    SensorEnrichmentConfig testSensorConfig = new SensorEnrichmentConfig();
+    testSensorConfig.setBatchSize(50);
+    testSensorConfig.setIndex("test");
+    Map<String, List<String>> enrichmentFieldMap = new HashMap<>();
+    enrichmentFieldMap.put("enrichmentTest", new ArrayList<String>() {{
+      add("enrichmentField");
+    }});
+    testSensorConfig.setEnrichmentFieldMap(enrichmentFieldMap);
+    Map<String, List<String>> threatIntelFieldMap = new HashMap<>();
+    threatIntelFieldMap.put("threatIntelTest", new ArrayList<String>() {{
+      add("threatIntelField");
+    }});
+    testSensorConfig.setThreatIntelFieldMap(threatIntelFieldMap);
+    sampleConfigurations.updateSensorEnrichmentConfig(sensorType, testSensorConfig);
+    ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType, testSensorConfig, zookeeperUrl);
+    waitForConfigUpdate(sensorType);
+    Assert.assertEquals("Add new sensor config", sampleConfigurations, configuredBolt.configurations);
+
+    // Add a configuration that is neither the global config nor a sensor config.
+    configsUpdated = new HashSet<>();
+    String someConfigType = "someConfig";
+    Map<String, Object> someConfig = new HashMap<>();
+    someConfig.put("someField", "someValue");
+    sampleConfigurations.updateConfig(someConfigType, someConfig);
+    ConfigurationsUtils.writeConfigToZookeeper(someConfigType, someConfig, zookeeperUrl);
+    waitForConfigUpdate(someConfigType);
+    Assert.assertEquals("Add new misc config", sampleConfigurations, configuredBolt.configurations);
+    configuredBolt.cleanup();
+  }
+
+  /** Waits until exactly the single named configuration has been reported as updated. */
+  private void waitForConfigUpdate(final String expectedConfigUpdate) {
+    waitForConfigUpdate(new HashSet<String>() {{ add(expectedConfigUpdate); }});
+  }
+
+  /**
+   * Polls every 500 ms until the recorded update names equal {@code expectedConfigUpdates},
+   * failing the test once the retry budget is used up or the thread is interrupted.
+   */
+  private void waitForConfigUpdate(Set<String> expectedConfigUpdates) {
+    int count = 0;
+    while (!configsUpdated.equals(expectedConfigUpdates)) {
+      if (count++ > 5) {
+        Assert.fail("ConfiguredBolt was not updated in time");
+      }
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag for the test runner and stop waiting.
+        Thread.currentThread().interrupt();
+        Assert.fail("Interrupted while waiting for ConfiguredBolt to be updated");
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/test/java/org/apache/metron/common/cli/ConfigurationsUtilsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/cli/ConfigurationsUtilsTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/cli/ConfigurationsUtilsTest.java
new file mode 100644
index 0000000..bee4af7
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/cli/ConfigurationsUtilsTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.cli;
+
+import junit.framework.Assert;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.TestingServer;
+import org.apache.metron.TestConstants;
+import org.apache.metron.common.cli.ConfigurationsUtils;
+import org.apache.metron.common.utils.JSONUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Round-trip tests for {@link ConfigurationsUtils}: configurations written to an in-process
+ * ZooKeeper server must be read back byte-for-byte.
+ */
+public class ConfigurationsUtilsTest {
+
+  private TestingServer testZkServer;
+  private String zookeeperUrl;
+  private CuratorFramework client;
+  private byte[] testGlobalConfig;
+  private Map<String, byte[]> testSensorConfigMap;
+
+  /** Starts the test server and a client, then loads the sample configurations from disk. */
+  @Before
+  public void setup() throws Exception {
+    testZkServer = new TestingServer(true);
+    zookeeperUrl = testZkServer.getConnectString();
+    client = ConfigurationsUtils.getClient(zookeeperUrl);
+    client.start();
+    testGlobalConfig = ConfigurationsUtils.readGlobalConfigFromFile(TestConstants.SAMPLE_CONFIG_PATH);
+    testSensorConfigMap = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(TestConstants.SAMPLE_CONFIG_PATH);
+  }
+
+  /** Writes a global, a sensor and a custom configuration and reads each one back. */
+  @Test
+  public void test() throws Exception {
+    // Global configuration.
+    Assert.assertTrue(testGlobalConfig.length > 0);
+    ConfigurationsUtils.writeGlobalConfigToZookeeper(testGlobalConfig, zookeeperUrl);
+    byte[] globalFromZk = ConfigurationsUtils.readGlobalConfigBytesFromZookeeper(client);
+    assertSameBytes(testGlobalConfig, globalFromZk);
+
+    // Sensor enrichment configuration for a single sensor.
+    Assert.assertTrue(testSensorConfigMap.size() > 0);
+    final String sensor = "yaf";
+    final byte[] sensorBytes = testSensorConfigMap.get(sensor);
+    ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensor, sensorBytes, zookeeperUrl);
+    byte[] sensorFromZk = ConfigurationsUtils.readSensorEnrichmentConfigBytesFromZookeeper(sensor, client);
+    assertSameBytes(sensorBytes, sensorFromZk);
+
+    // Arbitrary named configuration supplied as a map.
+    final String configName = "testConfig";
+    final Map<String, Object> customConfig = new HashMap<>();
+    customConfig.put("stringField", "value");
+    customConfig.put("intField", 1);
+    customConfig.put("doubleField", 1.1);
+    ConfigurationsUtils.writeConfigToZookeeper(configName, customConfig, zookeeperUrl);
+    byte[] customFromZk = ConfigurationsUtils.readConfigBytesFromZookeeper(configName, client);
+    assertSameBytes(JSONUtils.INSTANCE.toJSON(customConfig), customFromZk);
+
+  }
+
+  /** Runs the command-line entry point and checks everything under the sample path was loaded. */
+  @Test
+  public void testCmdLine() throws Exception {
+    final String[] cliArgs = {"-z", zookeeperUrl, "-p", TestConstants.SAMPLE_CONFIG_PATH};
+    ConfigurationsUtils.main(cliArgs);
+    byte[] globalFromZk = ConfigurationsUtils.readGlobalConfigBytesFromZookeeper(client);
+    assertSameBytes(testGlobalConfig, globalFromZk);
+    for (Map.Entry<String, byte[]> sensorEntry : testSensorConfigMap.entrySet()) {
+      byte[] sensorFromZk = ConfigurationsUtils.readSensorEnrichmentConfigBytesFromZookeeper(sensorEntry.getKey(), client);
+      assertSameBytes(sensorEntry.getValue(), sensorFromZk);
+    }
+  }
+
+  /** Closes the client and shuts the test server down. */
+  @After
+  public void tearDown() throws IOException {
+    client.close();
+    testZkServer.close();
+    testZkServer.stop();
+  }
+
+  /** Asserts that two byte arrays have identical contents. */
+  private static void assertSameBytes(byte[] expected, byte[] actual) {
+    Assert.assertTrue(Arrays.equals(expected, actual));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ConfigurationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ConfigurationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ConfigurationTest.java
new file mode 100644
index 0000000..fb45ccc
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ConfigurationTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.configuration;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.ExistsBuilder;
+import org.apache.curator.framework.api.GetChildrenBuilder;
+import org.apache.curator.framework.api.GetDataBuilder;
+import org.apache.metron.common.Constants;
+import org.json.simple.JSONObject;
+import org.junit.Test;
+
+import java.nio.file.Paths;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Checks that {@link Configuration} exposes the expected global property both when it is
+ * updated from a directory on disk and when it is updated through a mocked Curator client.
+ */
+public class ConfigurationTest {
+
+    private static final String TEST_PROPERTY = "configuration.class.test.property";
+    private static final String TEST_VALUE = "Configuration";
+
+    /** Updates a configuration rooted at the test resources directory and checks the result. */
+    @Test
+    public void testCanReadFromFile() throws Exception {
+
+        Configuration configuration = new Configuration(Paths.get("./src/test/resources/config/"));
+        configuration.update();
+
+        checkResult(configuration);
+
+    }
+
+    /**
+     * Injects a mocked Curator client that returns the test property for the global config
+     * path and an empty child list for every path, then checks the result.
+     */
+    @Test
+    public void testCanReadFromZookeeper() throws Exception {
+
+        CuratorFramework curatorFramework = mock(CuratorFramework.class);
+        ExistsBuilder existsBuilder = mock(ExistsBuilder.class);
+        GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
+        GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
+
+        when(getDataBuilder.forPath(Constants.ZOOKEEPER_GLOBAL_ROOT)).thenReturn(mockGlobalData());
+        when(curatorFramework.checkExists()).thenReturn(existsBuilder);
+        when(curatorFramework.getData()).thenReturn(getDataBuilder);
+        when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
+        when(getChildrenBuilder.forPath(anyString())).thenReturn(Collections.<String> emptyList());
+
+        Configuration configuration = new Configuration(Paths.get("foo"));
+        configuration.curatorFramework = curatorFramework;
+        configuration.update();
+
+        checkResult(configuration);
+    }
+
+
+    /**
+     * Builds the bytes handed back for the global config path: a JSON object holding the
+     * single test property.
+     */
+    private byte[] mockGlobalData(){
+
+        JSONObject global = new JSONObject();
+        global.put(TEST_PROPERTY, TEST_VALUE);
+        // Encode with an explicit charset so the fixture does not depend on the platform default.
+        return global.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
+
+    }
+
+
+    /** Asserts the global config holds exactly one entry, mapping the test property to the test value. */
+    private void checkResult( Configuration configuration ){
+
+        assertEquals("File contains 1 entry: ",1,configuration.getGlobalConfig().size());
+        String testValue = configuration.getGlobalConfig().get(TEST_PROPERTY).toString();
+        assertEquals(TEST_PROPERTY + " should be \"" + TEST_VALUE + "\"",TEST_VALUE,testValue);
+
+
+    }
+}
+


Mime
View raw message