metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject [14/15] incubator-metron git commit: METRON 86: Adding Solr indexing support (merrimanr via cestella) closes apache/incubator-metron#67
Date Tue, 05 Apr 2016 19:42:10 GMT
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java
index 30c8e23..a832ebb 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java
@@ -23,17 +23,16 @@ import backtype.storm.topology.base.BaseRichBolt;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.log4j.Logger;
 import org.apache.metron.Constants;
-import org.apache.metron.domain.SourceConfig;
+import org.apache.metron.domain.Configurations;
+import org.apache.metron.utils.ConfigurationsUtils;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 
 public abstract class ConfiguredBolt extends BaseRichBolt {
@@ -41,51 +40,62 @@ public abstract class ConfiguredBolt extends BaseRichBolt {
   private static final Logger LOG = Logger.getLogger(ConfiguredBolt.class);
 
   private String zookeeperUrl;
+  private long timeout = Constants.DEFAULT_CONFIGURED_BOLT_TIMEOUT;
 
-  protected Map<String, SourceConfig> configurations = Collections.synchronizedMap(new HashMap<String, SourceConfig>());
+  protected final Configurations configurations = new Configurations();
   private CuratorFramework client;
-  private PathChildrenCache cache;
+  private TreeCache cache;
 
   public ConfiguredBolt(String zookeeperUrl) {
     this.zookeeperUrl = zookeeperUrl;
   }
 
+  public ConfiguredBolt withTimeout(long timeout) {
+    this.timeout = timeout;
+    return this;
+  }
+
   @Override
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
     RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
     client = CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy);
     client.start();
-    cache = new PathChildrenCache(client, Constants.ZOOKEEPER_TOPOLOGY_ROOT, true);
-    PathChildrenCacheListener listener = new PathChildrenCacheListener() {
-      @Override
-      public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
-        if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED) || event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
-          byte[] data = event.getData().getData();
-          if (data != null) {
-            SourceConfig temp = SourceConfig.load(data);
-            if (temp != null) {
-              String[] path = event.getData().getPath().split("/");
-              configurations.put(path[path.length - 1], temp);
-            }
+    try {
+      ConfigurationsUtils.updateConfigsFromZookeeper(configurations, client);
+      cache = new TreeCache(client, Constants.ZOOKEEPER_TOPOLOGY_ROOT);
+      TreeCacheListener listener = new TreeCacheListener() {
+        @Override
+        public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
+          if (event.getType().equals(TreeCacheEvent.Type.NODE_ADDED) || event.getType().equals(TreeCacheEvent.Type.NODE_UPDATED)) {
+            String path = event.getData().getPath();
+            byte[] data = event.getData().getData();
+            updateConfig(path, data);
           }
         }
-      }
-    };
-    cache.getListenable().addListener(listener);
-    try {
+      };
+      cache.getListenable().addListener(listener);
       cache.start();
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
 
+  public void updateConfig(String path, byte[] data) throws IOException {
+    if (data.length != 0 && path != null) {
+      String name = path.substring(path.lastIndexOf("/") + 1);
+      if (path.startsWith(Constants.ZOOKEEPER_SENSOR_ROOT)) {
+        configurations.updateSensorEnrichmentConfig(name, data);
+      } else if (Constants.ZOOKEEPER_GLOBAL_ROOT.equals(path)) {
+        configurations.updateGlobalConfig(data);
+      } else {
+        configurations.updateConfig(name, data);
+      }
+    }
+  }
+
   @Override
   public void cleanup() {
-    try {
-      cache.close();
-      client.close();
-    } catch (IOException e) {
-      LOG.error(e.getMessage(), e);
-    }
+    cache.close();
+    client.close();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configurations.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configurations.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configurations.java
new file mode 100644
index 0000000..d93cc5f
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configurations.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.domain;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.log4j.Logger;
+import org.apache.metron.utils.JSONUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class Configurations implements Serializable {
+
+  private static final Logger LOG = Logger.getLogger(Configurations.class);
+
+  public static final String GLOBAL_CONFIG_NAME = "global";
+
+  private ConcurrentMap<String, Object> configurations = new ConcurrentHashMap<>();
+
+  @SuppressWarnings("unchecked")
+  public Map<String, Object> getGlobalConfig() {
+    return (Map<String, Object>) configurations.get(GLOBAL_CONFIG_NAME);
+  }
+
+  public void updateGlobalConfig(byte[] data) throws IOException {
+    updateGlobalConfig(new ByteArrayInputStream(data));
+  }
+
+  public void updateGlobalConfig(InputStream io) throws IOException {
+    Map<String, Object> globalConfig = JSONUtils.INSTANCE.load(io, new TypeReference<Map<String, Object>>() {});
+    updateGlobalConfig(globalConfig);
+  }
+
+  public void updateGlobalConfig(Map<String, Object> globalConfig) {
+    configurations.put(GLOBAL_CONFIG_NAME, globalConfig);
+  }
+
+  public SensorEnrichmentConfig getSensorEnrichmentConfig(String sensorType) {
+    return (SensorEnrichmentConfig) configurations.get(sensorType);
+  }
+
+  public void updateSensorEnrichmentConfig(String sensorType, byte[] data) throws IOException {
+    updateSensorEnrichmentConfig(sensorType, new ByteArrayInputStream(data));
+  }
+
+  public void updateSensorEnrichmentConfig(String sensorType, InputStream io) throws IOException {
+    SensorEnrichmentConfig sensorEnrichmentConfig = JSONUtils.INSTANCE.load(io, SensorEnrichmentConfig.class);
+    updateSensorEnrichmentConfig(sensorType, sensorEnrichmentConfig);
+  }
+
+  public void updateSensorEnrichmentConfig(String sensorType, SensorEnrichmentConfig sensorEnrichmentConfig) {
+    configurations.put(sensorType, sensorEnrichmentConfig);
+  }
+
+  @SuppressWarnings("unchecked")
+  public Map<String, Object> getConfig(String name) {
+    return (Map<String, Object>) configurations.get(name);
+  }
+
+  public void updateConfig(String name, byte[] data) {
+    try {
+      Map<String, Object> config = JSONUtils.INSTANCE.load(new ByteArrayInputStream(data), new TypeReference<Map<String, Object>>(){});
+      updateConfig(name, config);
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  public void updateConfig(String name, Map<String, Object> config) {
+    configurations.put(name, config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SensorEnrichmentConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SensorEnrichmentConfig.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SensorEnrichmentConfig.java
new file mode 100644
index 0000000..b24e8a8
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SensorEnrichmentConfig.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.domain;
+
+import java.util.List;
+import java.util.Map;
+
+public class SensorEnrichmentConfig {
+
+  private String index;
+  private Map<String, List<String>> enrichmentFieldMap;
+  private Map<String, List<String>> threatIntelFieldMap;
+  private int batchSize;
+
+  public String getIndex() {
+    return index;
+  }
+
+  public void setIndex(String index) {
+    this.index = index;
+  }
+
+  public Map<String, List<String>> getEnrichmentFieldMap() {
+    return enrichmentFieldMap;
+  }
+
+  public void setEnrichmentFieldMap(Map<String, List<String>> enrichmentFieldMap) {
+    this.enrichmentFieldMap = enrichmentFieldMap;
+  }
+
+  public Map<String, List<String>> getThreatIntelFieldMap() {
+    return threatIntelFieldMap;
+  }
+
+  public void setThreatIntelFieldMap(Map<String, List<String>> threatIntelFieldMap) {
+    this.threatIntelFieldMap = threatIntelFieldMap;
+  }
+
+  public int getBatchSize() {
+    return batchSize;
+  }
+
+  public void setBatchSize(int batchSize) {
+    this.batchSize = batchSize;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SourceConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SourceConfig.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SourceConfig.java
deleted file mode 100644
index 8e1a960..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SourceConfig.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.domain;
-
-import org.codehaus.jackson.map.ObjectMapper;
-import org.yaml.snakeyaml.TypeDescription;
-import org.yaml.snakeyaml.Yaml;
-import org.yaml.snakeyaml.constructor.Constructor;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.Charset;
-import java.util.List;
-import java.util.Map;
-
-public class SourceConfig {
-
-  final static ObjectMapper _mapper = new ObjectMapper();
-
-  private String index;
-  private Map<String, List<String>> enrichmentFieldMap;
-  private Map<String, List<String>> threatIntelFieldMap;
-  private int batchSize;
-
-  public String getIndex() {
-    return index;
-  }
-
-  public void setIndex(String index) {
-    this.index = index;
-  }
-
-  public Map<String, List<String>> getEnrichmentFieldMap() {
-    return enrichmentFieldMap;
-  }
-
-  public void setEnrichmentFieldMap(Map<String, List<String>> enrichmentFieldMap) {
-    this.enrichmentFieldMap = enrichmentFieldMap;
-  }
-
-  public Map<String, List<String>> getThreatIntelFieldMap() {
-    return threatIntelFieldMap;
-  }
-
-  public void setThreatIntelFieldMap(Map<String, List<String>> threatIntelFieldMap) {
-    this.threatIntelFieldMap = threatIntelFieldMap;
-  }
-
-  public int getBatchSize() {
-    return batchSize;
-  }
-
-  public void setBatchSize(int batchSize) {
-    this.batchSize = batchSize;
-  }
-
-  public static synchronized SourceConfig load(InputStream is) throws IOException {
-    SourceConfig ret = _mapper.readValue(is, SourceConfig.class);
-    return ret;
-  }
-
-  public static synchronized SourceConfig load(byte[] data) throws IOException {
-    return load( new ByteArrayInputStream(data));
-  }
-
-  public static synchronized SourceConfig load(String s, Charset c) throws IOException {
-    return load( s.getBytes(c));
-  }
-  public static synchronized SourceConfig load(String s) throws IOException {
-    return load( s, Charset.defaultCharset());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/main/java/org/apache/metron/helpers/topology/ErrorUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/helpers/topology/ErrorUtils.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/helpers/topology/ErrorUtils.java
index b02cbaf..a1b7ccc 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/helpers/topology/ErrorUtils.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/helpers/topology/ErrorUtils.java
@@ -49,7 +49,7 @@ public class ErrorUtils {
 		}
 		
 		error_message.put("message", message);
-		error_message.put(Constants.SOURCE_TYPE, "error");
+		error_message.put(Constants.SENSOR_TYPE, "error");
 		error_message.put("exception", exception);
 		error_message.put("stack", stackTrace);
 		

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/main/java/org/apache/metron/pcap/PcapUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/pcap/PcapUtils.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/pcap/PcapUtils.java
index a046801..10ab03d 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/pcap/PcapUtils.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/pcap/PcapUtils.java
@@ -17,10 +17,13 @@
  */
 package org.apache.metron.pcap;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import com.google.common.base.Joiner;
 import org.apache.commons.lang.StringUtils;
 
 import com.google.common.collect.BiMap;
@@ -210,6 +213,15 @@ public class PcapUtils {
 
   }
 
+  public static String convertHexToIpv4Ip(String hex) {
+    List<Integer> ipSegments = new ArrayList<>();
+    for(int i = 0; i < hex.length(); i += 2) {
+      String segment = hex.substring(i, i + 2);
+      ipSegments.add(Integer.parseInt(segment, 16));
+    }
+    return Joiner.on(".").join(ipSegments);
+  }
+
   /**
    * Gets the session key.
    * 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/main/java/org/apache/metron/topology/TopologyUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/topology/TopologyUtils.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/topology/TopologyUtils.java
index 581d74f..78371d8 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/topology/TopologyUtils.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/topology/TopologyUtils.java
@@ -22,7 +22,7 @@ import org.json.simple.JSONObject;
 
 public class TopologyUtils {
 
-  public static String getSourceType(JSONObject message) {
-    return (String) message.get(Constants.SOURCE_TYPE);
+  public static String getSensorType(JSONObject message) {
+    return (String) message.get(Constants.SENSOR_TYPE);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigurationsUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigurationsUtils.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigurationsUtils.java
new file mode 100644
index 0000000..20026b2
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigurationsUtils.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.utils;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.metron.Constants;
+import org.apache.metron.domain.Configurations;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ConfigurationsUtils {
+
+  public static CuratorFramework getClient(String zookeeperUrl) {
+    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+    return CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy);
+  }
+
+  public static void writeToZookeeperFromFile(String path, String filePath, String zookeeperUrl) throws Exception {
+    writeToZookeeper(path, Files.readAllBytes(Paths.get(filePath)), zookeeperUrl);
+  }
+
+  public static void writerGlobalConfigToZookeeper(byte[] configData, String zookeeperUrl) throws Exception {
+    writeToZookeeper(Constants.ZOOKEEPER_GLOBAL_ROOT, configData, zookeeperUrl);
+  }
+
+  public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, byte[] configData, String zookeeperUrl) throws Exception {
+    writeToZookeeper(Constants.ZOOKEEPER_SENSOR_ROOT + "/" + sensorType, configData, zookeeperUrl);
+  }
+
+  public static void writeToZookeeper(String path, byte[] configData, String zookeeperUrl) throws Exception {
+    CuratorFramework client = getClient(zookeeperUrl);
+    client.start();
+    try {
+      client.setData().forPath(path, configData);
+    } catch (KeeperException.NoNodeException e) {
+      client.create().creatingParentsIfNeeded().forPath(path, configData);
+    }
+    client.close();
+  }
+
+  public static void updateConfigsFromZookeeper(Configurations configurations, CuratorFramework client) throws Exception {
+    configurations.updateGlobalConfig(readGlobalConfigBytesFromZookeeper(client));
+    List<String> sensorTypes = client.getChildren().forPath(Constants.ZOOKEEPER_SENSOR_ROOT);
+    for(String sensorType: sensorTypes) {
+      configurations.updateSensorEnrichmentConfig(sensorType, readSensorEnrichmentConfigBytesFromZookeeper(sensorType, client));
+    }
+  }
+
+  public static byte[] readGlobalConfigBytesFromZookeeper(CuratorFramework client) throws Exception {
+    return readConfigBytesFromZookeeper(Constants.ZOOKEEPER_GLOBAL_ROOT, client);
+  }
+
+  public static byte[] readSensorEnrichmentConfigBytesFromZookeeper(String sensorType, CuratorFramework client) throws Exception {
+    return readConfigBytesFromZookeeper(Constants.ZOOKEEPER_SENSOR_ROOT + "/" + sensorType, client);
+  }
+
+  public static byte[] readConfigBytesFromZookeeper(String path, CuratorFramework client) throws Exception {
+    return client.getData().forPath(path);
+  }
+
+  public static void uploadConfigsToZookeeper(String rootFilePath, String zookeeperUrl) throws Exception {
+    ConfigurationsUtils.writerGlobalConfigToZookeeper(readGlobalConfigFromFile(rootFilePath), zookeeperUrl);
+    Map<String, byte[]> sensorEnrichmentConfigs = readSensorEnrichmentConfigsFromFile(rootFilePath);
+    for(String sensorType: sensorEnrichmentConfigs.keySet()) {
+      ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType, sensorEnrichmentConfigs.get(sensorType), zookeeperUrl);
+    }
+  }
+
+  public static byte[] readGlobalConfigFromFile(String rootFilePath) throws IOException {
+    return Files.readAllBytes(Paths.get(rootFilePath, Constants.GLOBAL_CONFIG_NAME + ".json"));
+  }
+
+  public static Map<String, byte[]> readSensorEnrichmentConfigsFromFile(String rootPath) throws IOException {
+    Map<String, byte[]> sensorEnrichmentConfigs = new HashMap<>();
+    for(File file: new File(rootPath, Constants.SENSORS_CONFIG_NAME).listFiles()) {
+      sensorEnrichmentConfigs.put(FilenameUtils.removeExtension(file.getName()), Files.readAllBytes(file.toPath()));
+    }
+    return sensorEnrichmentConfigs;
+  }
+
+  public static void dumpConfigs(String zookeeperUrl) throws Exception {
+    CuratorFramework client = getClient(zookeeperUrl);
+    client.start();
+    List<String> children = client.getChildren().forPath(Constants.ZOOKEEPER_TOPOLOGY_ROOT);
+    for (String child : children) {
+      byte[] data = client.getData().forPath(Constants.ZOOKEEPER_TOPOLOGY_ROOT + "/" + child);
+      System.out.println("Config for source " + child);
+      System.out.println(new String(data));
+      System.out.println();
+    }
+    client.close();
+  }
+
+  public static void main(String[] args) {
+
+    Options options = new Options();
+    {
+      Option o = new Option("h", "help", false, "This screen");
+      o.setRequired(false);
+      options.addOption(o);
+    }
+    {
+      Option o = new Option("p", "config_files", true, "Path to the source config files.  Must be named like \"$source\"-config.json");
+      o.setArgName("DIR_NAME");
+      o.setRequired(false);
+      options.addOption(o);
+    }
+    {
+      Option o = new Option("z", "zk", true, "Zookeeper Quroum URL (zk1:2181,zk2:2181,...");
+      o.setArgName("ZK_QUORUM");
+      o.setRequired(true);
+      options.addOption(o);
+    }
+
+    try {
+      CommandLineParser parser = new PosixParser();
+      CommandLine cmd = null;
+      try {
+        cmd = parser.parse(options, args);
+      } catch (ParseException pe) {
+        pe.printStackTrace();
+        final HelpFormatter usageFormatter = new HelpFormatter();
+        usageFormatter.printHelp("ConfigurationsUtils", null, options, null, true);
+        System.exit(-1);
+      }
+      if (cmd.hasOption("h")) {
+        final HelpFormatter usageFormatter = new HelpFormatter();
+        usageFormatter.printHelp("ConfigurationsUtils", null, options, null, true);
+        System.exit(0);
+      }
+
+      String zkQuorum = cmd.getOptionValue("z");
+      if (cmd.hasOption("p")) {
+        String sourcePath = cmd.getOptionValue("p");
+        uploadConfigsToZookeeper(sourcePath, zkQuorum);
+      }
+
+      ConfigurationsUtils.dumpConfigs(zkQuorum);
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      System.exit(-1);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java
index 93b0a58..cffcd68 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java
@@ -75,4 +75,8 @@ public enum JSONUtils {
       return _mapper.get().writeValueAsString(o);
     }
   }
+
+  public byte[] toJSON(Object config) throws JsonProcessingException {
+    return _mapper.get().writeValueAsBytes(config);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/HBaseWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/HBaseWriter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/HBaseWriter.java
index b257b24..291b849 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/HBaseWriter.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/HBaseWriter.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.metron.domain.SourceConfig;
+import org.apache.metron.domain.Configurations;
 import org.apache.metron.hbase.HTableProvider;
 import org.apache.metron.hbase.TableProvider;
 import org.apache.metron.utils.ConfigUtils;
@@ -62,7 +62,7 @@ public abstract class HBaseWriter implements MessageWriter<JSONObject>, Serializ
   }
 
   @Override
-  public void write(String sourceType, SourceConfig configuration, Tuple tuple, JSONObject message) throws Exception {
+  public void write(String sourceType, Configurations configurations, Tuple tuple, JSONObject message) throws Exception {
     Put put = new Put(getKey(tuple, message));
     Map<String, byte[]> values = getValues(tuple, message);
     for(String column: values.keySet()) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/BulkMessageWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/BulkMessageWriter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/BulkMessageWriter.java
index 9b627e6..c3a930c 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/BulkMessageWriter.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/BulkMessageWriter.java
@@ -18,14 +18,14 @@
 package org.apache.metron.writer.interfaces;
 
 import backtype.storm.tuple.Tuple;
-import org.apache.metron.domain.SourceConfig;
+import org.apache.metron.domain.Configurations;
 
 import java.util.List;
 import java.util.Map;
 
 public interface BulkMessageWriter<T> extends AutoCloseable {
 
-  void init(Map stormConf);
-  void write(String sourceType, SourceConfig configuration, List<Tuple> tuples, List<T> messages) throws Exception;
+  void init(Map stormConf, Configurations configuration) throws Exception;
+  void write(String sensorType, Configurations configurations, List<Tuple> tuples, List<T> messages) throws Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/MessageWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/MessageWriter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/MessageWriter.java
index 12de836..25c8a5a 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/MessageWriter.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/MessageWriter.java
@@ -18,10 +18,10 @@
 package org.apache.metron.writer.interfaces;
 
 import backtype.storm.tuple.Tuple;
-import org.apache.metron.domain.SourceConfig;
+import org.apache.metron.domain.Configurations;
 
 public interface MessageWriter<T> extends AutoCloseable {
 
   void init();
-  void write(String sourceType, SourceConfig configuration, Tuple tuple, T message) throws Exception;
+  void write(String sensorType, Configurations configurations, Tuple tuple, T message) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/test/java/org/apache/metron/pcap/PcapUtilsTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/pcap/PcapUtilsTest.java b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/pcap/PcapUtilsTest.java
new file mode 100644
index 0000000..fa2385c
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/pcap/PcapUtilsTest.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap;
+
+import junit.framework.Assert;
+import org.junit.Test;
+
+public class PcapUtilsTest {
+
+  @Test
+  public void testConvertHexToIpv4Ip() {
+    String hex = "c0a88a9e";
+    String ipAddress = PcapUtils.convertHexToIpv4Ip(hex);
+    Assert.assertEquals("192.168.138.158", ipAddress);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/test/resources/config/source/bro-config.json
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/resources/config/source/bro-config.json b/metron-streaming/Metron-Common/src/test/resources/config/source/bro-config.json
deleted file mode 100644
index 34109b8..0000000
--- a/metron-streaming/Metron-Common/src/test/resources/config/source/bro-config.json
+++ /dev/null
@@ -1,14 +0,0 @@
-{
-  "index": "bro",
-  "batchSize": 5,
-  "enrichmentFieldMap":
-  {
-    "geo": ["ip_dst_addr", "ip_src_addr"],
-    "host": ["host"]
-  },
-  "threatIntelFieldMap":
-  {
-    "ip": ["ip_dst_addr", "ip_src_addr"]
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/test/resources/config/source/pcap-config.json
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/resources/config/source/pcap-config.json b/metron-streaming/Metron-Common/src/test/resources/config/source/pcap-config.json
deleted file mode 100644
index 82c7c5e..0000000
--- a/metron-streaming/Metron-Common/src/test/resources/config/source/pcap-config.json
+++ /dev/null
@@ -1,13 +0,0 @@
-{
-  "index": "pcap",
-  "batchSize": 5,
-  "enrichmentFieldMap":
-  {
-    "geo": ["ip_src_addr", "ip_dst_addr"],
-    "host": ["ip_src_addr", "ip_dst_addr"]
-  },
-  "threatIntelFieldMap":
-  {
-    "ip": ["ip_src_addr", "ip_dst_addr"]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/test/resources/config/source/snort-config.json
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/resources/config/source/snort-config.json b/metron-streaming/Metron-Common/src/test/resources/config/source/snort-config.json
deleted file mode 100644
index 1208637..0000000
--- a/metron-streaming/Metron-Common/src/test/resources/config/source/snort-config.json
+++ /dev/null
@@ -1,14 +0,0 @@
-{
-  "index": "snort",
-  "batchSize": 1,
-  "enrichmentFieldMap":
-  {
-    "geo": ["ip_dst_addr", "ip_src_addr"],
-    "host": ["host"]
-  },
-  "threatIntelFieldMap":
-  {
-    "ip": ["ip_dst_addr", "ip_src_addr"]
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Common/src/test/resources/config/source/yaf-config.json
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/resources/config/source/yaf-config.json b/metron-streaming/Metron-Common/src/test/resources/config/source/yaf-config.json
deleted file mode 100644
index 65de961..0000000
--- a/metron-streaming/Metron-Common/src/test/resources/config/source/yaf-config.json
+++ /dev/null
@@ -1,14 +0,0 @@
-{
-  "index": "yaf",
-  "batchSize": 5,
-  "enrichmentFieldMap":
-  {
-    "geo": ["ip_dst_addr", "ip_src_addr"],
-    "host": ["host"]
-  },
-  "threatIntelFieldMap":
-  {
-    "ip": ["ip_dst_addr", "ip_src_addr"]
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Elasticsearch/pom.xml b/metron-streaming/Metron-Elasticsearch/pom.xml
new file mode 100644
index 0000000..ab9242a
--- /dev/null
+++ b/metron-streaming/Metron-Elasticsearch/pom.xml
@@ -0,0 +1,202 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software
+	Foundation (ASF) under one or more contributor license agreements. See the
+	NOTICE file distributed with this work for additional information regarding
+	copyright ownership. The ASF licenses this file to You under the Apache License,
+	Version 2.0 (the "License"); you may not use this file except in compliance
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+	Unless required by applicable law or agreed to in writing, software distributed
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+  the specific language governing permissions and limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.metron</groupId>
+        <artifactId>Metron-Streaming</artifactId>
+        <version>0.1BETA</version>
+    </parent>
+    <artifactId>Metron-Elasticsearch</artifactId>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>Metron-Common</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <version>${global_elasticsearch_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${global_storm_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>Metron-Testing</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>Metron-Topologies</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>${global_mockito_version}</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+    <reporting>
+        <plugins>
+            <!-- Normally, dependency report takes time, skip it -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-project-info-reports-plugin</artifactId>
+                <version>2.7</version>
+
+                <configuration>
+                    <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>emma-maven-plugin</artifactId>
+                <version>1.0-alpha-3</version>
+                <inherited>true</inherited>
+            </plugin>
+        </plugins>
+    </reporting>
+
+    <build>
+        <plugins>
+            <plugin>
+                <!-- Separates the unit tests from the integration tests. -->
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.12.4</version>
+                <configuration>
+                    <!-- Skip the default running of this plug-in (or everything is run twice...see below) -->
+                    <argLine>-Xmx2048m -XX:MaxPermSize=256m</argLine>
+                    <skip>true</skip>
+                    <!-- Show 100% of the lines from the stack trace (doesn't work) -->
+                    <trimStackTrace>false</trimStackTrace>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>unit-tests</id>
+                        <phase>test</phase>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                        <configuration>
+                            <!-- Never skip running the tests when the test phase is invoked -->
+                            <skip>false</skip>
+                            <includes>
+                                <!-- Include unit tests within integration-test phase. -->
+                                <include>**/*Test.java</include>
+                            </includes>
+                            <excludes>
+                                <!-- Exclude integration tests within (unit) test phase. -->
+                                <exclude>**/*IntegrationTest.java</exclude>
+                            </excludes>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>integration-tests</id>
+                        <phase>integration-test</phase>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                        <configuration>
+                            <!-- Never skip running the tests when the integration-test phase is invoked -->
+                            <skip>false</skip>
+                            <includes>
+                                <!-- Include integration tests within integration-test phase. -->
+                                <include>**/*IntegrationTest.java</include>
+                            </includes>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>2.3</version>
+                <configuration>
+                    <createDependencyReducedPom>true</createDependencyReducedPom>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>storm:storm-core:*</exclude>
+                                    <exclude>storm:storm-lib:*</exclude>
+                                    <exclude>org.slf4j.impl*</exclude>
+                                    <exclude>org.slf4j:slf4j-log4j*</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <transformers>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+                                    <resource>.yaml</resource>
+                                </transformer>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass></mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptor>src/main/assembly/assembly.xml</descriptor>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
+                        <phase>package</phase> <!-- bind to the packaging phase -->
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Elasticsearch/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Elasticsearch/src/main/assembly/assembly.xml b/metron-streaming/Metron-Elasticsearch/src/main/assembly/assembly.xml
new file mode 100644
index 0000000..35cbcc3
--- /dev/null
+++ b/metron-streaming/Metron-Elasticsearch/src/main/assembly/assembly.xml
@@ -0,0 +1,41 @@
+<!--
+  Licensed to the Apache Software
+	Foundation (ASF) under one or more contributor license agreements. See the
+	NOTICE file distributed with this work for additional information regarding
+	copyright ownership. The ASF licenses this file to You under the Apache License,
+	Version 2.0 (the "License"); you may not use this file except in compliance
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+	Unless required by applicable law or agreed to in writing, software distributed
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+  the specific language governing permissions and limitations under the License.
+  -->
+
+<assembly>
+  <id>archive</id>
+  <formats>
+    <format>tar.gz</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${project.basedir}/src/main/resources/Metron_Configs/etc</directory>
+      <outputDirectory>/config/etc</outputDirectory>
+      <useDefaultExcludes>true</useDefaultExcludes>
+      <excludes>
+        <exclude>**/*.formatted</exclude>
+        <exclude>**/*.filtered</exclude>
+      </excludes>
+      <fileMode>0644</fileMode>
+      <lineEnding>unix</lineEnding>
+    </fileSet>
+    <fileSet>
+      <directory>${project.basedir}/target</directory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+      <outputDirectory>/lib</outputDirectory>
+      <useDefaultExcludes>true</useDefaultExcludes>
+    </fileSet>
+  </fileSets>
+</assembly>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Elasticsearch/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Elasticsearch/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java b/metron-streaming/Metron-Elasticsearch/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
new file mode 100644
index 0000000..45631f2
--- /dev/null
+++ b/metron-streaming/Metron-Elasticsearch/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.metron.domain.Configurations;
+import org.apache.metron.domain.SensorEnrichmentConfig;
+import org.apache.metron.writer.interfaces.BulkMessageWriter;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Serializable {
+
+  private Map<String, String> optionalSettings;
+  private transient TransportClient client;
+  private SimpleDateFormat dateFormat;
+  private static final Logger LOG = LoggerFactory
+          .getLogger(ElasticsearchWriter.class);
+
+  public ElasticsearchWriter withOptionalSettings(Map<String, String> optionalSettings) {
+    this.optionalSettings = optionalSettings;
+    return this;
+  }
+
+  @Override
+  public void init(Map stormConf, Configurations configurations) {
+    Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
+    ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder();
+    builder.put("cluster.name", globalConfiguration.get("es.clustername"));
+    builder.put("client.transport.ping_timeout","500s");
+    if (optionalSettings != null) {
+      builder.put(optionalSettings);
+    }
+    client = new TransportClient(builder.build())
+            .addTransportAddress(new InetSocketTransportAddress((String) globalConfiguration.get("es.ip"), (Integer) globalConfiguration.get("es.port")));
+    dateFormat = new SimpleDateFormat((String) globalConfiguration.get("es.date.format"));
+
+  }
+
+  @Override
+  public void write(String sensorType, Configurations configurations, List<Tuple> tuples, List<JSONObject> messages) throws Exception {
+    SensorEnrichmentConfig sensorEnrichmentConfig = configurations.getSensorEnrichmentConfig(sensorType);
+    String indexPostfix = dateFormat.format(new Date());
+    BulkRequestBuilder bulkRequest = client.prepareBulk();
+    for(JSONObject message: messages) {
+      String indexName = sensorType;
+      if (sensorEnrichmentConfig != null) {
+        indexName = sensorEnrichmentConfig.getIndex();
+      }
+      IndexRequestBuilder indexRequestBuilder = client.prepareIndex(indexName + "_index_" + indexPostfix,
+              sensorType);
+
+      indexRequestBuilder.setSource(message.toJSONString());
+      bulkRequest.add(indexRequestBuilder);
+    }
+    BulkResponse resp = bulkRequest.execute().actionGet();
+    if (resp.hasFailures()) {
+      throw new Exception(resp.buildFailureMessage());
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    client.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Elasticsearch/src/main/resources/Metron_Configs/etc/env/elasticsearch.properties
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Elasticsearch/src/main/resources/Metron_Configs/etc/env/elasticsearch.properties b/metron-streaming/Metron-Elasticsearch/src/main/resources/Metron_Configs/etc/env/elasticsearch.properties
new file mode 100644
index 0000000..1381b49
--- /dev/null
+++ b/metron-streaming/Metron-Elasticsearch/src/main/resources/Metron_Configs/etc/env/elasticsearch.properties
@@ -0,0 +1,109 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+
+##### Kafka #####
+
+kafka.zk=node1:2181
+kafka.broker=node1:6667
+spout.kafka.topic.asa=asa
+spout.kafka.topic.bro=bro
+spout.kafka.topic.fireeye=fireeye
+spout.kafka.topic.ise=ise
+spout.kafka.topic.lancope=lancope
+spout.kafka.topic.paloalto=paloalto
+spout.kafka.topic.pcap=pcap
+spout.kafka.topic.snort=snort
+spout.kafka.topic.yaf=yaf
+
+##### Indexing #####
+writer.class.name=org.apache.metron.writer.ElasticsearchWriter
+
+##### ElasticSearch #####
+
+es.ip=10.22.0.214
+es.port=9300
+es.clustername=elasticsearch
+
+##### MySQL #####
+
+mysql.ip=10.22.0.214
+mysql.port=3306
+mysql.username=root
+mysql.password=hadoop123
+
+##### Metrics #####
+
+#reporters
+org.apache.metron.metrics.reporter.graphite=true
+org.apache.metron.metrics.reporter.console=false
+org.apache.metron.metrics.reporter.jmx=false
+
+#Graphite Addresses
+
+org.apache.metron.metrics.graphite.address=localhost
+org.apache.metron.metrics.graphite.port=2023
+
+#TelemetryParserBolt
+org.apache.metron.metrics.TelemetryParserBolt.acks=true
+org.apache.metron.metrics.TelemetryParserBolt.emits=true
+org.apache.metron.metrics.TelemetryParserBolt.fails=true
+
+
+#GenericEnrichmentBolt
+org.apache.metron.metrics.GenericEnrichmentBolt.acks=true
+org.apache.metron.metrics.GenericEnrichmentBolt.emits=true
+org.apache.metron.metrics.GenericEnrichmentBolt.fails=true
+
+
+#TelemetryIndexingBolt
+org.apache.metron.metrics.TelemetryIndexingBolt.acks=true
+org.apache.metron.metrics.TelemetryIndexingBolt.emits=true
+org.apache.metron.metrics.TelemetryIndexingBolt.fails=true
+
+##### Host Enrichment #####
+
+org.apache.metron.enrichment.host.known_hosts=[{"ip":"10.1.128.236", "local":"YES", "type":"webserver", "asset_value" : "important"},\
+{"ip":"10.1.128.237", "local":"UNKNOWN", "type":"unknown", "asset_value" : "important"},\
+{"ip":"10.60.10.254", "local":"YES", "type":"printer", "asset_value" : "important"}]
+
+##### HDFS #####
+
+bolt.hdfs.batch.size=5000
+bolt.hdfs.field.delimiter=|
+bolt.hdfs.file.rotation.size.in.mb=5
+bolt.hdfs.file.system.url=hdfs://iot01.cloud.hortonworks.com:8020
+bolt.hdfs.wip.file.path=/paloalto/wip
+bolt.hdfs.finished.file.path=/paloalto/rotated
+bolt.hdfs.compression.codec.class=org.apache.hadoop.io.compress.SnappyCodec
+index.hdfs.output=/tmp/metron/enriched
+
+##### HBase #####
+bolt.hbase.table.name=pcap
+bolt.hbase.table.fields=t:value
+bolt.hbase.table.key.tuple.field.name=key
+bolt.hbase.table.timestamp.tuple.field.name=timestamp
+bolt.hbase.enable.batching=false
+bolt.hbase.write.buffer.size.in.bytes=2000000
+bolt.hbase.durability=SKIP_WAL
+bolt.hbase.partitioner.region.info.refresh.interval.mins=60
+
+##### Threat Intel #####
+
+threat.intel.tracker.table=
+threat.intel.tracker.cf=
+threat.intel.ip.table=
+threat.intel.ip.cf=

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/ElasticsearchEnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/ElasticsearchEnrichmentIntegrationTest.java b/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/ElasticsearchEnrichmentIntegrationTest.java
new file mode 100644
index 0000000..2765c25
--- /dev/null
+++ b/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/ElasticsearchEnrichmentIntegrationTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration;
+
+import org.apache.metron.integration.util.integration.ComponentRunner;
+import org.apache.metron.integration.util.integration.InMemoryComponent;
+import org.apache.metron.integration.util.integration.Processor;
+import org.apache.metron.integration.util.integration.ReadinessState;
+import org.apache.metron.integration.util.integration.components.ElasticSearchComponent;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class ElasticsearchEnrichmentIntegrationTest extends EnrichmentIntegrationTest {
+
+  private String indexDir = "target/elasticsearch";
+  private String dateFormat = "yyyy.MM.dd.hh";
+  private String index = "yaf_index_" + new SimpleDateFormat(dateFormat).format(new Date());
+
+  @Override
+  InMemoryComponent getSearchComponent(final Properties topologyProperties) {
+    return new ElasticSearchComponent.Builder()
+            .withHttpPort(9211)
+            .withIndexDir(new File(indexDir))
+            .build();
+  }
+
+  @Override
+  Processor<List<Map<String, Object>>> getProcessor(final List<byte[]> inputMessages) {
+    return new Processor<List<Map<String, Object>>>() {
+      List<Map<String, Object>> docs = null;
+      public ReadinessState process(ComponentRunner runner) {
+        ElasticSearchComponent elasticSearchComponent = runner.getComponent("search", ElasticSearchComponent.class);
+        if (elasticSearchComponent.hasIndex(index)) {
+          List<Map<String, Object>> docsFromDisk;
+          try {
+            docs = elasticSearchComponent.getAllIndexedDocs(index, "yaf");
+            docsFromDisk = readDocsFromDisk(hdfsDir);
+            System.out.println(docs.size() + " vs " + inputMessages.size() + " vs " + docsFromDisk.size());
+          } catch (IOException e) {
+            throw new IllegalStateException("Unable to retrieve indexed documents.", e);
+          }
+          if (docs.size() < inputMessages.size() || docs.size() != docsFromDisk.size()) {
+            return ReadinessState.NOT_READY;
+          } else {
+            return ReadinessState.READY;
+          }
+        } else {
+          return ReadinessState.NOT_READY;
+        }
+      }
+
+      public List<Map<String, Object>> getResult() {
+        return docs;
+      }
+    };
+  }
+
+  @Override
+  void setAdditionalProperties(Properties topologyProperties) {
+    topologyProperties.setProperty("writer.class.name", "org.apache.metron.writer.ElasticsearchWriter");
+  }
+
+  @Override
+  public String cleanField(String field) {
+    return field;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java b/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java
new file mode 100644
index 0000000..671c4f5
--- /dev/null
+++ b/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.util.integration.components;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.metron.integration.util.integration.InMemoryComponent;
+import org.apache.metron.integration.util.integration.UnableToStartException;
+import org.elasticsearch.ElasticsearchTimeoutException;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.ElasticsearchClient;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
+import org.elasticsearch.search.SearchHit;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ElasticSearchComponent implements InMemoryComponent {
+
+    public static class Builder{
+        private int httpPort;
+        private File indexDir;
+        private Map<String, String> extraElasticSearchSettings = null;
+        public Builder withHttpPort(int httpPort) {
+            this.httpPort = httpPort;
+            return this;
+        }
+        public Builder withIndexDir(File indexDir) {
+            this.indexDir = indexDir;
+            return this;
+        }
+        public Builder withExtraElasticSearchSettings(Map<String, String> extraElasticSearchSettings) {
+            this.extraElasticSearchSettings = extraElasticSearchSettings;
+            return this;
+        }
+        public ElasticSearchComponent build() {
+            return new ElasticSearchComponent(httpPort, indexDir, extraElasticSearchSettings);
+        }
+    }
+
+    private Client client;
+    private Node node;
+    private int httpPort;
+    private File indexDir;
+    private Map<String, String> extraElasticSearchSettings;
+
+    public ElasticSearchComponent(int httpPort, File indexDir) {
+        this(httpPort, indexDir, null);
+    }
+    public ElasticSearchComponent(int httpPort, File indexDir, Map<String, String> extraElasticSearchSettings) {
+        this.httpPort = httpPort;
+        this.indexDir = indexDir;
+        this.extraElasticSearchSettings = extraElasticSearchSettings;
+    }
+    public Client getClient() {
+        return client;
+    }
+
+    private void cleanDir(File dir) throws IOException {
+        if(dir.exists()) {
+            FileUtils.deleteDirectory(dir);
+        }
+        dir.mkdirs();
+    }
+    public void start() throws UnableToStartException {
+        File logDir= new File(indexDir, "/logs");
+        File dataDir= new File(indexDir, "/data");
+        try {
+            cleanDir(logDir);
+            cleanDir(dataDir);
+
+        } catch (IOException e) {
+            throw new UnableToStartException("Unable to clean log or data directories", e);
+        }
+        ImmutableSettings.Builder immutableSettings = ImmutableSettings.settingsBuilder()
+                .put("node.http.enabled", true)
+                .put("http.port", httpPort)
+                .put("cluster.name", "metron")
+                .put("path.logs",logDir.getAbsolutePath())
+                .put("path.data",dataDir.getAbsolutePath())
+                .put("gateway.type", "none")
+                .put("index.store.type", "memory")
+                .put("index.number_of_shards", 1)
+                .put("node.mode", "network")
+                .put("index.number_of_replicas", 1);
+        if(extraElasticSearchSettings != null) {
+            immutableSettings = immutableSettings.put(extraElasticSearchSettings);
+        }
+        Settings settings = immutableSettings.build();
+        node = NodeBuilder.nodeBuilder().settings(settings).node();
+        node.start();
+        settings = ImmutableSettings.settingsBuilder()
+					.put("cluster.name", "metron").build();
+		client = new TransportClient(settings)
+					.addTransportAddress(new InetSocketTransportAddress("localhost",
+							9300));
+
+        waitForCluster(client, ClusterHealthStatus.YELLOW, new TimeValue(60000));
+    }
+
+    public static void waitForCluster(ElasticsearchClient client, ClusterHealthStatus status, TimeValue timeout) throws UnableToStartException {
+        try {
+            ClusterHealthResponse healthResponse =
+                    (ClusterHealthResponse)client.execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest().waitForStatus(status).timeout(timeout)).actionGet();
+            if (healthResponse != null && healthResponse.isTimedOut()) {
+                throw new UnableToStartException("cluster state is " + healthResponse.getStatus().name()
+                        + " and not " + status.name()
+                        + ", from here on, everything will fail!");
+            }
+        } catch (ElasticsearchTimeoutException e) {
+            throw new UnableToStartException("timeout, cluster does not respond to health request, cowardly refusing to continue with operations");
+        }
+    }
+
+    public List<Map<String, Object>> getAllIndexedDocs(String index, String sourceType) throws IOException {
+       return getAllIndexedDocs(index, sourceType, null);
+    }
+    public List<Map<String, Object>> getAllIndexedDocs(String index, String sourceType, String subMessage) throws IOException {
+        getClient().admin().indices().refresh(new RefreshRequest());
+        SearchResponse response = getClient().prepareSearch(index)
+                .setTypes(sourceType)
+                .setSource("message")
+                .setFrom(0)
+                .setSize(1000)
+                .execute().actionGet();
+        List<Map<String, Object>> ret = new ArrayList<Map<String, Object>>();
+        for (SearchHit hit : response.getHits()) {
+            Object o = null;
+            if(subMessage == null) {
+                o = hit.getSource();
+            }
+            else {
+                o = hit.getSource().get(subMessage);
+            }
+            ret.add((Map<String, Object>)(o));
+        }
+        return ret;
+    }
+    public boolean hasIndex(String indexName) {
+        Set<String> indices = getClient().admin()
+                                    .indices()
+                                    .stats(new IndicesStatsRequest())
+                                    .actionGet()
+                                    .getIndices()
+                                    .keySet();
+        return indices.contains(indexName);
+
+    }
+
+    public void stop() {
+        node.stop();
+        node = null;
+        client = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
index a2cec5a..bfb4d91 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
@@ -25,7 +25,11 @@ import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
 
@@ -51,7 +55,7 @@ public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
   @Override
   public Set<String> getStreamIds(JSONObject message) {
     Set<String> streamIds = new HashSet<>();
-    String sourceType = TopologyUtils.getSourceType(message);
+    String sourceType = TopologyUtils.getSensorType(message);
     for (String enrichmentType : getFieldMap(sourceType).keySet()) {
       streamIds.add(enrichmentType);
     }
@@ -81,7 +85,7 @@ public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
     return message;
   }
 
-  public Map<String, List<String>> getFieldMap(String sourceType) {
-    return configurations.get(sourceType).getEnrichmentFieldMap();
+  protected Map<String, List<String>> getFieldMap(String sensorType) {
+    return configurations.getSensorEnrichmentConfig(sensorType).getEnrichmentFieldMap();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
index 7970674..c37133d 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
@@ -105,10 +105,9 @@ public class EnrichmentSplitterBolt extends SplitBolt<JSONObject> {
     @SuppressWarnings("unchecked")
     @Override
     public Map<String, JSONObject> splitMessage(JSONObject message) {
-
         Map<String, JSONObject> streamMessageMap = new HashMap<>();
-        String sourceType = TopologyUtils.getSourceType(message);
-        Map<String, List<String>> enrichmentFieldMap = getFieldMap(sourceType);
+        String sensorType = TopologyUtils.getSensorType(message);
+        Map<String, List<String>> enrichmentFieldMap = getFieldMap(sensorType);
         for (String enrichmentType : enrichmentFieldMap.keySet()) {
             List<String> fields = enrichmentFieldMap.get(enrichmentType);
             JSONObject enrichmentObject = new JSONObject();
@@ -116,7 +115,7 @@ public class EnrichmentSplitterBolt extends SplitBolt<JSONObject> {
                 for (String field : fields) {
                     enrichmentObject.put(getKeyName(enrichmentType, field), message.get(field));
                 }
-                enrichmentObject.put(Constants.SOURCE_TYPE, sourceType);
+                enrichmentObject.put(Constants.SENSOR_TYPE, sensorType);
                 streamMessageMap.put(enrichmentType, enrichmentObject);
             }
         }
@@ -124,8 +123,8 @@ public class EnrichmentSplitterBolt extends SplitBolt<JSONObject> {
         return streamMessageMap;
     }
 
-    protected Map<String, List<String>> getFieldMap(String sourceType) {
-        return configurations.get(sourceType).getEnrichmentFieldMap();
+    protected Map<String, List<String>> getFieldMap(String sensorType) {
+        return configurations.getSensorEnrichmentConfig(sensorType).getEnrichmentFieldMap();
     }
 
     protected String getKeyName(String type, String field) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
index b5c4c44..08b223c 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
@@ -18,29 +18,26 @@
 
 package org.apache.metron.enrichment.bolt;
 
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Splitter;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import com.google.common.collect.Iterables;
 import org.apache.metron.Constants;
 import org.apache.metron.bolt.ConfiguredBolt;
 import org.apache.metron.domain.Enrichment;
 import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+import org.apache.metron.helpers.topology.ErrorUtils;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import org.apache.metron.helpers.topology.ErrorUtils;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Uses an adapter to enrich telemetry messages with additional metadata
@@ -157,8 +154,8 @@ public class GenericEnrichmentBolt extends ConfiguredBolt {
       for (Object o : rawMessage.keySet()) {
         String field = (String) o;
         String value = (String) rawMessage.get(field);
-        if (field.equals(Constants.SOURCE_TYPE)) {
-          enrichedMessage.put(Constants.SOURCE_TYPE, value);
+        if (field.equals(Constants.SENSOR_TYPE)) {
+          enrichedMessage.put(Constants.SENSOR_TYPE, value);
         } else {
           JSONObject enrichedField = new JSONObject();
           if (value != null && value.length() != 0) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
index 3516ee0..014e0a9 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
@@ -34,8 +34,8 @@ public class ThreatIntelJoinBolt extends EnrichmentJoinBolt {
   }
 
   @Override
-  public Map<String, List<String>> getFieldMap(String sourceType) {
-    return configurations.get(sourceType).getThreatIntelFieldMap();
+  public Map<String, List<String>> getFieldMap(String sensorType) {
+    return configurations.getSensorEnrichmentConfig(sensorType).getThreatIntelFieldMap();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBolt.java
index a43360e..692c327 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBolt.java
@@ -29,8 +29,8 @@ public class ThreatIntelSplitterBolt extends EnrichmentSplitterBolt {
   }
 
   @Override
-  protected Map<String, List<String>> getFieldMap(String sourceType) {
-    return configurations.get(sourceType).getThreatIntelFieldMap();
+  protected Map<String, List<String>> getFieldMap(String sensorType) {
+    return configurations.getSensorEnrichmentConfig(sensorType).getThreatIntelFieldMap();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Indexing/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/pom.xml b/metron-streaming/Metron-Indexing/pom.xml
index 1f5d04d..d32fc07 100644
--- a/metron-streaming/Metron-Indexing/pom.xml
+++ b/metron-streaming/Metron-Indexing/pom.xml
@@ -71,11 +71,6 @@
             </exclusions>
         </dependency>
         <dependency>
-            <groupId>org.elasticsearch</groupId>
-            <artifactId>elasticsearch</artifactId>
-            <version>${global_elasticsearch_version}</version>
-        </dependency>
-        <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
             <version>${http.client.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/AbstractIndexingBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/AbstractIndexingBolt.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/AbstractIndexingBolt.java
deleted file mode 100644
index 423a5c2..0000000
--- a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/AbstractIndexingBolt.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.indexing;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.metron.bolt.ConfiguredBolt;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-
-import com.codahale.metrics.Counter;
-import org.apache.metron.index.interfaces.IndexAdapter;
-import org.apache.metron.metrics.MetricReporter;
-
-@SuppressWarnings("rawtypes")
-public abstract class AbstractIndexingBolt extends ConfiguredBolt {
-	/**
-	 * 
-	 */
-	private static final long serialVersionUID = -6710596708304282838L;
-
-	protected static final Logger LOG = LoggerFactory
-			.getLogger(AbstractIndexingBolt.class);
-
-	protected OutputCollector _collector;
-	protected IndexAdapter _adapter;
-	protected MetricReporter _reporter;
-
-	protected String _IndexIP;
-	protected int _IndexPort = 0;
-	protected String _ClusterName;
-	protected String _IndexName;
-	protected String _DocumentName;
-	protected int _BulkIndexNumber = 10;
-
-	protected Counter ackCounter, emitCounter, failCounter;
-
-	public AbstractIndexingBolt(String zookeeperUrl) {
-		super(zookeeperUrl);
-	}
-
-	protected void registerCounters() {
-
-		String ackString = _adapter.getClass().getSimpleName() + ".ack";
-
-		String emitString = _adapter.getClass().getSimpleName() + ".emit";
-
-		String failString = _adapter.getClass().getSimpleName() + ".fail";
-
-		ackCounter = _reporter.registerCounter(ackString);
-		emitCounter = _reporter.registerCounter(emitString);
-		failCounter = _reporter.registerCounter(failString);
-
-	}
-
-	public final void prepare(Map conf, TopologyContext topologyContext,
-			OutputCollector collector) {
-		_collector = collector;
-
-		if (this._IndexIP == null)
-			throw new IllegalStateException("_IndexIP must be specified");
-		if (this._IndexPort == 0)
-			throw new IllegalStateException("_IndexPort must be specified");
-		if (this._ClusterName == null)
-			throw new IllegalStateException("_ClusterName must be specified");
-		if (this._IndexName == null)
-			throw new IllegalStateException("_IndexName must be specified");
-		if (this._DocumentName == null)
-			throw new IllegalStateException("_DocumentName must be specified");
-		if (this._adapter == null)
-			throw new IllegalStateException("IndexAdapter must be specified");
-
-		try {
-			doPrepare(conf, topologyContext, collector);
-		} catch (IOException e) {
-			LOG.error("Counld not initialize...");
-			e.printStackTrace();
-		}
-	}
-
-	public void declareOutputFields(OutputFieldsDeclarer declearer) {
-		
-	}
-
-	abstract void doPrepare(Map conf, TopologyContext topologyContext,
-			OutputCollector collector) throws IOException;
-
-}


Mime
View raw message