metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject [40/89] [abbrv] [partial] incubator-metron git commit: Rename all OpenSOC files to Metron
Date Tue, 26 Jan 2016 14:18:18 GMT
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-FlumeConfigs/SampleFlumeConf.rtf
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-FlumeConfigs/SampleFlumeConf.rtf b/metron-streaming/Metron-FlumeConfigs/SampleFlumeConf.rtf
new file mode 100644
index 0000000..728f08a
--- /dev/null
+++ b/metron-streaming/Metron-FlumeConfigs/SampleFlumeConf.rtf
@@ -0,0 +1,43 @@
+{\rtf1\ansi\ansicpg1252\cocoartf1265\cocoasubrtf210
+{\fonttbl\f0\fnil\fcharset0 Menlo-Regular;}
+{\colortbl;\red255\green255\blue255;}
+\margl1440\margr1440\vieww10800\viewh8400\viewkind0
+\pard\tx560\tx1120\tx1680\tx2240\tx2800\tx3360\tx3920\tx4480\tx5040\tx5600\tx6160\tx6720\pardirnatural
+
+\f0\fs22 \cf0 \CocoaLigature0 #agent section\
+flumesf1.sources = s\
+flumesf1.channels = c\
+flumesf1.sinks = r\
+\
+#source section\
+flumesf1.sources.s.type = syslogUdp\
+flumesf1.sources.s.port = 21001\
+flumesf1.sources.s.host = 0.0.0.0\
+flumesf1.sources.s.channels = c\
+\
+flumesf1.sinks.r.type = org.apache.flume.plugins.KafkaSink\
+\
+flumesf1.sinks.r.metadata.broker.list=dn01:9092,dn02:9092,dn03:9092,dn04:9092,dn08:9092,dn09:9092,dn10:9092\
+\
+#flumesf1.sinks.r.partition.key=0\
+#flumesf1.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition\
+flumesf1.sinks.r.serializer.class=kafka.serializer.StringEncoder\
+flumesf1.sinks.r.request.required.acks=0\
+flumesf1.sinks.r.max.message.size=1000000\
+flumesf1.sinks.r.flumesf1.type=sync\
+flumesf1.sinks.r.custom.encoding=UTF-8\
+flumesf1.sinks.r.custom.topic.name=sourcefire_raw\
+\
+\
+\
+flumesf1.sinks.a.type = file_roll\
+flumesf1.sinks.a.channel = c\
+flumesf1.sinks.a.sink.directory = /tmp/flumesf1/\
+\
+\
+#Specify the channel the sink should use\
+flumesf1.sinks.r.channel = c\
+\
+# Each channel's type is defined.\
+flumesf1.channels.c.type = memory\
+flumesf1.channels.c.capacity = 1000}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Indexing/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/pom.xml b/metron-streaming/Metron-Indexing/pom.xml
new file mode 100644
index 0000000..d55ab7f
--- /dev/null
+++ b/metron-streaming/Metron-Indexing/pom.xml
@@ -0,0 +1,97 @@
+<?xml version="1.0" encoding="UTF-8"?><!-- Licensed to the Apache Software 
+	Foundation (ASF) under one or more contributor license agreements. See the 
+	NOTICE file distributed with this work for additional information regarding 
+	copyright ownership. The ASF licenses this file to You under the Apache License, 
+	Version 2.0 (the "License"); you may not use this file except in compliance 
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 
+	Unless required by applicable law or agreed to in writing, software distributed 
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+	the specific language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>com.opensoc</groupId>
+		<artifactId>OpenSOC-Streaming</artifactId>
+		<version>0.6BETA</version>
+	</parent>
+	<artifactId>OpenSOC-Indexing</artifactId>
+	<properties>
+       <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>		
+		<elastic.search.version>1.3.1</elastic.search.version>
+		<http.client.version>4.3.4</http.client.version>
+		<jsonsimple.version>1.1.1</jsonsimple.version>
+	</properties>
+	<dependencies>
+
+		<dependency>
+			<groupId>com.opensoc</groupId>
+			<artifactId>OpenSOC-Common</artifactId>
+			<version>${project.parent.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.storm</groupId>
+			<artifactId>storm-core</artifactId>
+			<version>${global_storm_version}</version>
+			<scope>provided</scope>
+			  <exclusions>
+				<exclusion>
+				   <artifactId>servlet-api</artifactId>
+				   <groupId>javax.servlet</groupId>
+				  </exclusion>
+		    </exclusions>					
+		</dependency>
+		<dependency>
+			<groupId>org.elasticsearch</groupId>
+			<artifactId>elasticsearch</artifactId>
+			<version>${elastic.search.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.httpcomponents</groupId>
+			<artifactId>httpclient</artifactId>
+			<version>${http.client.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>com.googlecode.json-simple</groupId>
+			<artifactId>json-simple</artifactId>
+			<version>${jsonsimple.version}</version>
+		</dependency>
+		
+		<dependency>
+			<groupId>commons-configuration</groupId>
+			<artifactId>commons-configuration</artifactId>
+			<version>1.9</version>
+			<scope>provided</scope>
+		</dependency>
+  		<dependency>
+  			<groupId>junit</groupId>
+  			<artifactId>junit</artifactId>
+  			<version>3.8.2</version>
+  		</dependency>	
+
+	</dependencies>
+<reporting>
+    <plugins>
+	<!-- Normally, dependency report takes time, skip it -->
+      <plugin>
+		<groupId>org.apache.maven.plugins</groupId>
+		<artifactId>maven-project-info-reports-plugin</artifactId>
+		<version>2.7</version>
+	 
+		<configuration>
+	          <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+		</configuration>
+      </plugin>
+ 
+      <plugin>
+		<groupId>org.codehaus.mojo</groupId>
+		<artifactId>emma-maven-plugin</artifactId>
+		<version>1.0-alpha-3</version>
+		<inherited>true</inherited>
+      </plugin>
+    </plugins>
+  </reporting>  	
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Indexing/pom.xml.versionsBackup
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/pom.xml.versionsBackup b/metron-streaming/Metron-Indexing/pom.xml.versionsBackup
new file mode 100644
index 0000000..11bf51e
--- /dev/null
+++ b/metron-streaming/Metron-Indexing/pom.xml.versionsBackup
@@ -0,0 +1,91 @@
+<?xml version="1.0" encoding="UTF-8"?><!-- Licensed to the Apache Software 
+	Foundation (ASF) under one or more contributor license agreements. See the 
+	NOTICE file distributed with this work for additional information regarding 
+	copyright ownership. The ASF licenses this file to You under the Apache License, 
+	Version 2.0 (the "License"); you may not use this file except in compliance 
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 
+	Unless required by applicable law or agreed to in writing, software distributed 
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+	the specific language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>com.opensoc</groupId>
+		<artifactId>OpenSOC-Streaming</artifactId>
+		<version>0.0.1-SNAPSHOT</version>
+	</parent>
+	<artifactId>OpenSOC-Indexing</artifactId>
+	<properties>
+		<opensoc.common.version>0.0.1-SNAPSHOT</opensoc.common.version>
+		<storm.version>0.9.1-incubating</storm.version>
+		<elastic.search.version>1.2.1</elastic.search.version>
+		<http.client.version>4.3.4</http.client.version>
+		<jsonsimple.version>1.1.1</jsonsimple.version>
+	</properties>
+	<dependencies>
+
+		<dependency>
+			<groupId>com.opensoc</groupId>
+			<artifactId>OpenSOC-Common</artifactId>
+			<version>${opensoc.common.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.storm</groupId>
+			<artifactId>storm-core</artifactId>
+			<version>${storm.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.elasticsearch</groupId>
+			<artifactId>elasticsearch</artifactId>
+			<version>${elastic.search.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.httpcomponents</groupId>
+			<artifactId>httpclient</artifactId>
+			<version>${http.client.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>com.googlecode.json-simple</groupId>
+			<artifactId>json-simple</artifactId>
+			<version>${jsonsimple.version}</version>
+		</dependency>
+		
+		<dependency>
+			<groupId>commons-configuration</groupId>
+			<artifactId>commons-configuration</artifactId>
+			<version>1.9</version>
+			<scope>provided</scope>
+		</dependency>
+  		<dependency>
+  			<groupId>junit</groupId>
+  			<artifactId>junit</artifactId>
+  			<version>3.8.2</version>
+  		</dependency>	
+
+	</dependencies>
+<reporting>
+    <plugins>
+	<!-- Normally, dependency report takes time, skip it -->
+      <plugin>
+		<groupId>org.apache.maven.plugins</groupId>
+		<artifactId>maven-project-info-reports-plugin</artifactId>
+		<version>2.7</version>
+	 
+		<configuration>
+	          <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+		</configuration>
+      </plugin>
+ 
+      <plugin>
+		<groupId>org.codehaus.mojo</groupId>
+		<artifactId>emma-maven-plugin</artifactId>
+		<version>1.0-alpha-3</version>
+		<inherited>true</inherited>
+      </plugin>
+    </plugins>
+  </reporting>  	
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Indexing/readme.md
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/readme.md b/metron-streaming/Metron-Indexing/readme.md
new file mode 100644
index 0000000..bd9d7ac
--- /dev/null
+++ b/metron-streaming/Metron-Indexing/readme.md
@@ -0,0 +1,61 @@
+# OpenSOC-Indexing
+
+## Module Description
+
+This module provides the indexing capability to OpenSOC components.  The primary indexing engine for now is Elastic Search, but Solr may be supported at some point in the future as well.  There are three types of messages that are commonly indexed in OpenSOC topologies: messages, alerts, and errors.  Messages are telemetry messages parsed by the parser bolt.  Alerts are alerts generated by the alerts bolt.  Errors are an optional feature where each OpenSOC bolt in addition to outputting errors in the log file will also index them for immediate analysis.
+
+### Index bolt
+
+The signature of the index bolt is as follows:
+
+```
+TelemetryIndexingBolt indexing_bolt = new TelemetryIndexingBolt()
+.withIndexIP(config.getString("es.ip"))
+.withIndexPort(config.getInt("es.port"))
+.withClusterName(config.getString("es.clustername"))
+.withIndexName(
+config.getString("bolt.error.indexing.indexname"))
+.withDocumentName(
+config.getString("bolt.error.indexing.documentname"))
+.withBulk(config.getInt("bolt.error.indexing.bulk"))
+.withIndexAdapter(adapter)
+.withMetricConfiguration(config);
+
+```
+
+### IndexAdapters
+
+* com.opensoc.indexing.adapters.ESBaseBulkAdapter - bulk ingest messages into Elastic Search
+* com.opensoc.indexing.adapters.ESBaseBulkRotatingAdapter - does everything the adapter above does, but is able to rotate the index names based on size
+* com.opensoc.indexing.adapters.ESTimedBulkRotatingAdapter - does everything the adapter above does, but is able to rotate the index names based on size and time
+* com.opensoc.indexing.adapters.SolrAdapter - currently under development
+
+/etc/ directory contains all environment-related configs
+
+## Sample Input and Generator Spout
+
+The sample input for topologies provided in this release was checked in here:
+
+```
+https://github.com/OpenSOC/opensoc-streaming/tree/master/OpenSOC-Topologies/src/main/resources/SampleInput
+```
+
+We provide a generator spout that is able to drive these topologies.  In production we run with the Kafka spout, but for documentation on that please reference the Storm project documentation.
+
+The generator spout comes with the following signature:
+
+```
+GenericInternalTestSpout testSpout = new GenericInternalTestSpout()
+.withFilename(test_file_path).withRepeating(
+config.getBoolean("spout.test.parallelism.repeat"));
+```
+
+* the repeat variable defines if the generator spout will loop through the input or stop once it gets to the end of file
+
+### Additional Storm Bolts
+In addition to custom bolts developed for OpenSOC we utilize standard bolts and spouts included with the Storm release.  We will not provide documentation for these spouts and bolts since they are provided as part of Storm.  These spouts and bolts are:
+
+* KafkaSpout
+* KafkaBolt
+* HDFSBolt
+* HBaseBolt
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Indexing/src/main/java/com/apache/metron/indexing/AbstractIndexingBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/com/apache/metron/indexing/AbstractIndexingBolt.java b/metron-streaming/Metron-Indexing/src/main/java/com/apache/metron/indexing/AbstractIndexingBolt.java
new file mode 100644
index 0000000..d70d4b3
--- /dev/null
+++ b/metron-streaming/Metron-Indexing/src/main/java/com/apache/metron/indexing/AbstractIndexingBolt.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.opensoc.indexing;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+
+import com.codahale.metrics.Counter;
+import com.opensoc.index.interfaces.IndexAdapter;
+import com.opensoc.metrics.MetricReporter;
+
+@SuppressWarnings("rawtypes")
+public abstract class AbstractIndexingBolt extends BaseRichBolt {
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = -6710596708304282838L;
+
+	protected static final Logger LOG = LoggerFactory
+			.getLogger(AbstractIndexingBolt.class);
+
+	protected OutputCollector _collector;
+	protected IndexAdapter _adapter;
+	protected MetricReporter _reporter;
+
+	protected String _IndexIP;
+	protected int _IndexPort = 0;
+	protected String _ClusterName;
+	protected String _IndexName;
+	protected String _DocumentName;
+	protected int _BulkIndexNumber = 10;
+
+	protected Counter ackCounter, emitCounter, failCounter;
+
+	protected void registerCounters() {
+
+		String ackString = _adapter.getClass().getSimpleName() + ".ack";
+
+		String emitString = _adapter.getClass().getSimpleName() + ".emit";
+
+		String failString = _adapter.getClass().getSimpleName() + ".fail";
+
+		ackCounter = _reporter.registerCounter(ackString);
+		emitCounter = _reporter.registerCounter(emitString);
+		failCounter = _reporter.registerCounter(failString);
+
+	}
+
+	public final void prepare(Map conf, TopologyContext topologyContext,
+			OutputCollector collector) {
+		_collector = collector;
+
+		if (this._IndexIP == null)
+			throw new IllegalStateException("_IndexIP must be specified");
+		if (this._IndexPort == 0)
+			throw new IllegalStateException("_IndexPort must be specified");
+		if (this._ClusterName == null)
+			throw new IllegalStateException("_ClusterName must be specified");
+		if (this._IndexName == null)
+			throw new IllegalStateException("_IndexName must be specified");
+		if (this._DocumentName == null)
+			throw new IllegalStateException("_DocumentName must be specified");
+		if (this._adapter == null)
+			throw new IllegalStateException("IndexAdapter must be specified");
+
+		try {
+			doPrepare(conf, topologyContext, collector);
+		} catch (IOException e) {
+			LOG.error("Counld not initialize...");
+			e.printStackTrace();
+		}
+	}
+
+	public void declareOutputFields(OutputFieldsDeclarer declearer) {
+		
+	}
+
+	abstract void doPrepare(Map conf, TopologyContext topologyContext,
+			OutputCollector collector) throws IOException;
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Indexing/src/main/java/com/apache/metron/indexing/TelemetryIndexingBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/com/apache/metron/indexing/TelemetryIndexingBolt.java b/metron-streaming/Metron-Indexing/src/main/java/com/apache/metron/indexing/TelemetryIndexingBolt.java
new file mode 100644
index 0000000..2c4e0a9
--- /dev/null
+++ b/metron-streaming/Metron-Indexing/src/main/java/com/apache/metron/indexing/TelemetryIndexingBolt.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.indexing;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.configuration.Configuration;
+import org.json.simple.JSONObject;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import com.opensoc.helpers.topology.ErrorGenerator;
+import com.opensoc.index.interfaces.IndexAdapter;
+import com.opensoc.json.serialization.JSONEncoderHelper;
+import com.opensoc.metrics.MetricReporter;
+
+/**
+ * 
+ * Bolt for indexing telemetry messages into Elastic Search, Solr, Druid, etc...
+ * For a list of all adapters provided please see com.opensoc.indexing.adapters
+ * 
+ * As of release of this code the following adapters for indexing are provided:
+ * <p>
+ * <ul>
+ * 
+ * <li>ESBulkAdapter = adapter that can bulk index messages into ES
+ * <li>ESBulkRotatingAdapter = adapter that can bulk index messages into ES,
+ * rotate the index, and apply an alias to the rotated index
+ * <ul>
+ * <p>
+ *
+ */
+
+@SuppressWarnings("serial")
+public class TelemetryIndexingBolt extends AbstractIndexingBolt {
+
+	private JSONObject metricConfiguration;
+	private String _indexDateFormat;
+	
+	private Set<Tuple> tuple_queue = new HashSet<Tuple>();
+
+	/**
+	 * 
+	 * @param IndexIP
+	 *            ip of ElasticSearch/Solr/etc...
+	 * @return instance of bolt
+	 */
+	public TelemetryIndexingBolt withIndexIP(String IndexIP) {
+		_IndexIP = IndexIP;
+		return this;
+	}
+
+	/**
+	 * 
+	 * @param IndexPort
+	 *            port of ElasticSearch/Solr/etc...
+	 * @return instance of bolt
+	 */
+
+	public TelemetryIndexingBolt withIndexPort(int IndexPort) {
+		_IndexPort = IndexPort;
+		return this;
+	}
+
+	/**
+	 * 
+	 * @param IndexName
+	 *            name of the index in ElasticSearch/Solr/etc...
+	 * @return instance of bolt
+	 */
+	public TelemetryIndexingBolt withIndexName(String IndexName) {
+		_IndexName = IndexName;
+		return this;
+	}
+
+	/**
+	 * 
+	 * @param ClusterName
+	 *            name of cluster to index into in ElasticSearch/Solr/etc...
+	 * @return instance of bolt
+	 */
+	public TelemetryIndexingBolt withClusterName(String ClusterName) {
+		_ClusterName = ClusterName;
+		return this;
+	}
+
+	/**
+	 * 
+	 * @param DocumentName
+	 *            name of document to be indexed in ElasticSearch/Solr/etc...
+	 * @return
+	 */
+
+	public TelemetryIndexingBolt withDocumentName(String DocumentName) {
+		_DocumentName = DocumentName;
+		return this;
+	}
+
+	/**
+	 * 
+	 * @param BulkIndexNumber
+	 *            number of documents to bulk index together
+	 * @return instance of bolt
+	 */
+	public TelemetryIndexingBolt withBulk(int BulkIndexNumber) {
+		_BulkIndexNumber = BulkIndexNumber;
+		return this;
+	}
+
+	/**
+	 * 
+	 * @param adapter
+	 *            adapter that handles indexing of JSON strings
+	 * @return instance of bolt
+	 */
+	public TelemetryIndexingBolt withIndexAdapter(IndexAdapter adapter) {
+		_adapter = adapter;
+
+		return this;
+	}
+	
+	/**
+	 * 
+	 * @param dateFormat
+	 *           timestamp to append to index names
+	 * @return instance of bolt
+	 */
+	public TelemetryIndexingBolt withIndexTimestamp(String indexTimestamp) {
+		_indexDateFormat = indexTimestamp;
+
+		return this;
+	}
+	/**
+	 * 
+	 * @param config
+	 *            - configuration for pushing metrics into graphite
+	 * @return instance of bolt
+	 */
+	public TelemetryIndexingBolt withMetricConfiguration(Configuration config) {
+		this.metricConfiguration = JSONEncoderHelper.getJSON(config
+				.subset("com.opensoc.metrics"));
+		return this;
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	void doPrepare(Map conf, TopologyContext topologyContext,
+			OutputCollector collector) throws IOException {
+
+		try {
+			
+			_adapter.initializeConnection(_IndexIP, _IndexPort,
+					_ClusterName, _IndexName, _DocumentName, _BulkIndexNumber, _indexDateFormat);
+			
+			_reporter = new MetricReporter();
+			_reporter.initialize(metricConfiguration,
+					TelemetryIndexingBolt.class);
+			this.registerCounters();
+		} catch (Exception e) {
+			
+			e.printStackTrace();
+					
+			JSONObject error = ErrorGenerator.generateErrorMessage(new String("bulk index problem"), e);
+			_collector.emit("error", new Values(error));
+		}
+
+	}
+
+	public void execute(Tuple tuple) {
+
+		JSONObject message = null;
+
+		try {
+			LOG.trace("[OpenSOC] Indexing bolt gets:  " + message);
+
+			message = (JSONObject) tuple.getValueByField("message");
+
+			if (message == null || message.isEmpty())
+				throw new Exception(
+						"Could not parse message from binary stream");
+
+			int result_code = _adapter.bulkIndex(message);
+
+			if (result_code == 0) {
+				tuple_queue.add(tuple);
+			} else if (result_code == 1) {
+				tuple_queue.add(tuple);
+				
+				Iterator<Tuple> iterator = tuple_queue.iterator();
+				while(iterator.hasNext())
+				{
+					Tuple setElement = iterator.next();
+					_collector.ack(setElement);
+					ackCounter.inc();
+				}
+				tuple_queue.clear();
+			} else if (result_code == 2) {
+				throw new Exception("Failed to index elements with client");
+			}
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			
+			
+			Iterator<Tuple> iterator = tuple_queue.iterator();
+			while(iterator.hasNext())
+			{
+				Tuple setElement = iterator.next();
+				_collector.fail(setElement);
+				failCounter.inc();
+				
+				
+				JSONObject error = ErrorGenerator.generateErrorMessage(new String("bulk index problem"), e);
+				_collector.emit("error", new Values(error));
+			}
+			tuple_queue.clear();
+
+			
+		}
+	}
+
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declearer) {
+		declearer.declareStream("error", new Fields("Index"));
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Indexing/src/main/java/com/apache/metron/indexing/adapters/AbstractIndexAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/com/apache/metron/indexing/adapters/AbstractIndexAdapter.java b/metron-streaming/Metron-Indexing/src/main/java/com/apache/metron/indexing/adapters/AbstractIndexAdapter.java
new file mode 100644
index 0000000..6dafbe7
--- /dev/null
+++ b/metron-streaming/Metron-Indexing/src/main/java/com/apache/metron/indexing/adapters/AbstractIndexAdapter.java
@@ -0,0 +1,25 @@
+package com.opensoc.indexing.adapters;
+
+import java.io.Serializable;
+
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.opensoc.index.interfaces.IndexAdapter;
+import com.opensoc.indexing.AbstractIndexingBolt;
+
+@SuppressWarnings("serial")
+public abstract class AbstractIndexAdapter implements IndexAdapter, Serializable{
+	
+	protected static final Logger _LOG = LoggerFactory
+			.getLogger(AbstractIndexingBolt.class);
+
+
+	
+
+	abstract public boolean initializeConnection(String ip, int port,
+			String cluster_name, String index_name, String document_name,
+			int bulk, String date_format) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Indexing/src/main/java/com/apache/metron/indexing/adapters/ESBaseBulkAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/com/apache/metron/indexing/adapters/ESBaseBulkAdapter.java b/metron-streaming/Metron-Indexing/src/main/java/com/apache/metron/indexing/adapters/ESBaseBulkAdapter.java
new file mode 100644
index 0000000..e5ed283
--- /dev/null
+++ b/metron-streaming/Metron-Indexing/src/main/java/com/apache/metron/indexing/adapters/ESBaseBulkAdapter.java
@@ -0,0 +1,148 @@
+package com.opensoc.indexing.adapters;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.collections.Bag;
+import org.apache.commons.collections.bag.HashBag;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.json.simple.JSONObject;
+
+@SuppressWarnings("serial")
+public class ESBaseBulkAdapter extends AbstractIndexAdapter implements
+		Serializable {
+
+	private int _bulk_size;
+	private String _index_name;
+	private String _document_name;
+	private String _cluster_name;
+	private int _port;
+	private String _ip;
+	public transient TransportClient client;
+
+	private Bag bulk_set;
+
+	private Settings settings;
+
+	@Override
+	public boolean initializeConnection(String ip, int port,
+			String cluster_name, String index_name, String document_name,
+			int bulk_size, String date_format) throws Exception {
+
+		bulk_set = new HashBag();
+
+		_LOG.trace("[OpenSOC] Initializing ESBulkAdapter...");
+
+		try {
+			_ip = ip;
+			_port = port;
+			_cluster_name = cluster_name;
+			_index_name = index_name;
+			_document_name = document_name;
+			_bulk_size = bulk_size;
+
+			_LOG.trace("[OpenSOC] Bulk indexing is set to: " + _bulk_size);
+
+			settings = ImmutableSettings.settingsBuilder()
+					.put("cluster.name", _cluster_name).build();
+			client = new TransportClient(settings)
+					.addTransportAddress(new InetSocketTransportAddress(_ip,
+							_port));
+
+			return true;
+		} catch (Exception e) {
+			e.printStackTrace();
+			return false;
+		}
+	}
+
+	/**
+	 * @param raw_message
+	 *            message to bulk index in Elastic Search
+	 * @return integer (0) loaded into a bulk queue, (1) bulk indexing executed,
+	 *         (2) error
+	 */
+	@SuppressWarnings("unchecked")
+	public int bulkIndex(JSONObject raw_message) {
+
+		boolean success = true;
+		int set_size = 0;
+
+		synchronized (bulk_set) {
+			bulk_set.add(raw_message);
+			set_size = bulk_set.size();
+			
+			_LOG.trace("[OpenSOC] Bulk size is now: " + bulk_set.size());
+		}
+
+		try {
+
+			if (set_size >= _bulk_size) {
+				success = doIndex();
+
+				if (success)
+					return 1;
+				else
+					return 2;
+			}
+
+			return 0;
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			return 2;
+		}
+	}
+
+	public boolean doIndex() throws Exception {
+
+		try {
+
+			synchronized (bulk_set) {
+				if (client == null)
+					throw new Exception("client is null");
+
+				BulkRequestBuilder bulkRequest = client.prepareBulk();
+
+				Iterator<JSONObject> iterator = bulk_set.iterator();
+
+				while (iterator.hasNext()) {
+					JSONObject setElement = iterator.next();
+
+					IndexRequestBuilder a = client.prepareIndex(_index_name,
+							_document_name);
+					a.setSource(setElement.toString());
+					bulkRequest.add(a);
+
+				}
+
+				_LOG.trace("[OpenSOC] Performing bulk load of size: "
+						+ bulkRequest.numberOfActions());
+
+				BulkResponse resp = bulkRequest.execute().actionGet();
+				_LOG.trace("[OpenSOC] Received bulk response: "
+						+ resp.toString());
+				bulk_set.clear();
+			}
+
+			return true;
+		}
+
+		catch (Exception e) {
+			e.printStackTrace();
+			return false;
+		}
+	}
+
+	public void setOptionalSettings(Map<String, String> settings) {
+		// TODO Auto-generated method stub
+		
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Indexing/src/main/java/com/apache/metron/indexing/adapters/ESBulkRotatingAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/com/apache/metron/indexing/adapters/ESBulkRotatingAdapter.java b/metron-streaming/Metron-Indexing/src/main/java/com/apache/metron/indexing/adapters/ESBulkRotatingAdapter.java
new file mode 100644
index 0000000..ebdc7b0
--- /dev/null
+++ b/metron-streaming/Metron-Indexing/src/main/java/com/apache/metron/indexing/adapters/ESBulkRotatingAdapter.java
@@ -0,0 +1,160 @@
+package com.opensoc.indexing.adapters;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.json.simple.JSONObject;
+
+@SuppressWarnings({ "deprecation", "serial" })
+public class ESBulkRotatingAdapter extends AbstractIndexAdapter {
+
+	private Client client;
+	private BulkRequestBuilder bulkRequest;
+	private int _bulk_size;
+	private String _index_name;
+	private String _document_name;
+	private int element_count;
+	private String index_postfix;
+	private String running_index_postfix;
+
+	private HttpClient httpclient;
+	private HttpPost post;
+
+	private DateFormat dateFormat;
+
+	public boolean initializeConnection(String ip, int port,
+			String cluster_name, String index_name, String document_name,
+			int bulk_size, String date_format) {
+
+		_LOG.info("Initializing ESBulkAdapter...");
+
+		try {
+			httpclient = new DefaultHttpClient();
+			String alias_link = "http://" + ip + ":" + 9200 + "/_aliases";
+			post = new HttpPost(alias_link);
+
+			_index_name = index_name;
+			_document_name = document_name;
+
+			_bulk_size = bulk_size - 1;
+			
+
+			dateFormat = new SimpleDateFormat(date_format);
+			
+			element_count = 0;
+			running_index_postfix = "NONE";
+
+			Settings settings = ImmutableSettings.settingsBuilder()
+					.put("cluster.name", cluster_name).build();
+			client = new TransportClient(settings)
+					.addTransportAddress(new InetSocketTransportAddress(ip,
+							port));
+
+			bulkRequest = client.prepareBulk();
+
+			return true;
+		} catch (Exception e) {
+			e.printStackTrace();
+			return false;
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	public int bulkIndex(JSONObject raw_message) {
+
+		index_postfix = dateFormat.format(new Date());
+
+		bulkRequest.add(client.prepareIndex(_index_name + "_" + index_postfix,
+				_document_name).setSource(raw_message));
+
+		return doIndex();
+	}
+
+	public int bulkIndex(String raw_message) {
+
+		index_postfix = dateFormat.format(new Date());
+
+		bulkRequest.add(client.prepareIndex(_index_name + "_" + index_postfix,
+				_document_name).setSource(raw_message));
+
+		return doIndex();
+	}
+
+	public int doIndex() {
+
+		element_count++;
+		
+		if (element_count != _bulk_size)
+			return 0;
+
+		if (element_count == _bulk_size) {
+			_LOG.debug("Starting bulk load of size: " + _bulk_size);
+			BulkResponse resp = bulkRequest.execute().actionGet();
+			element_count = 0;
+			_LOG.debug("Received bulk response: " + resp.toString());
+
+			if (resp.hasFailures()) {
+				_LOG.error("Bulk update failed");
+				return 2;
+			}
+
+			if (!running_index_postfix.equals(index_postfix)) {
+
+				_LOG.debug("Attempting to apply a new alias");
+
+				try {
+
+					String alias = "{\"actions\" : [{ \"add\" : { \"index\" : \""
+							+ _index_name
+							+ "-"
+							+ index_postfix
+							+ "\", \"alias\" : \"" + _index_name + "\" } } ]}";
+
+					post.setEntity(new StringEntity(alias));
+
+					HttpResponse response = httpclient.execute(post);
+					String res = EntityUtils.toString(response.getEntity());
+
+					_LOG.debug("Alias request received the following response: "
+							+ res);
+
+					running_index_postfix = index_postfix;
+				}
+
+				catch (Exception e) {
+					e.printStackTrace();
+					_LOG.error("Alias request failed...");
+					return 2;
+				}
+			}
+
+			index_postfix = dateFormat.format(new Date());
+		}
+
+		_LOG.debug("Adding to bulk load: element " + element_count
+				+ " of bulk size " + _bulk_size);
+
+		return 1;
+	}
+
+	public void setOptionalSettings(Map<String, String> settings) {
+		// TODO Auto-generated method stub
+		
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Indexing/src/main/java/com/apache/metron/indexing/adapters/ESTimedRotatingAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/com/apache/metron/indexing/adapters/ESTimedRotatingAdapter.java b/metron-streaming/Metron-Indexing/src/main/java/com/apache/metron/indexing/adapters/ESTimedRotatingAdapter.java
new file mode 100644
index 0000000..a94ef97
--- /dev/null
+++ b/metron-streaming/Metron-Indexing/src/main/java/com/apache/metron/indexing/adapters/ESTimedRotatingAdapter.java
@@ -0,0 +1,190 @@
+package com.opensoc.indexing.adapters;
+
+import java.io.Serializable;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.collections.Bag;
+import org.apache.commons.collections.HashBag;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.json.simple.JSONObject;
+
+// Time-rotated Elastic Search bulk indexing adapter: messages are buffered in
+// a Bag and flushed to an index named "<index>_<date postfix>" once the
+// configured bulk size is reached.
+@SuppressWarnings("serial")
+public class ESTimedRotatingAdapter extends AbstractIndexAdapter implements
+		Serializable {
+
+	private int _bulk_size;
+	private String _index_name;
+	private String _document_name;
+	private String _cluster_name;
+	private int _port;
+	private String _ip;
+	// transient: the TransportClient is not serializable and is rebuilt per
+	// worker via initializeConnection.
+	public transient TransportClient client;
+	private DateFormat dateFormat;
+
+	// Extra ES client settings supplied via setOptionalSettings.
+	private Map<String, String> tuning_settings;
+
+	// Buffer of pending messages; all access is guarded by synchronized(bulk_set).
+	private Bag bulk_set;
+
+	private Settings settings;
+
+	// Stores optional ES tuning settings, applied when the connection is built.
+	public void setOptionalSettings(Map<String, String> settings)
+	{
+		tuning_settings = settings;
+	}
+
+	/**
+	 * Builds the ES transport client from the supplied connection parameters,
+	 * merging in any optional tuning settings.
+	 *
+	 * @return true on success, false if client construction failed
+	 */
+	@Override
+	public boolean initializeConnection(String ip, int port,
+			String cluster_name, String index_name, String document_name,
+			int bulk_size, String date_format) throws Exception {
+
+		bulk_set = new HashBag();
+
+		_LOG.trace("[OpenSOC] Initializing ESBulkAdapter...");
+
+		try {
+			_ip = ip;
+			_port = port;
+			_cluster_name = cluster_name;
+			_index_name = index_name;
+			_document_name = document_name;
+			_bulk_size = bulk_size;
+
+			dateFormat = new SimpleDateFormat(date_format);
+
+			System.out.println("Bulk indexing is set to: " + _bulk_size);
+
+			ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder() ;	
+
+			if(tuning_settings != null && tuning_settings.size() > 0)
+			{
+					builder.put(tuning_settings);
+			}
+
+			builder.put("cluster.name", _cluster_name);
+			builder.put("client.transport.ping_timeout","500s");
+
+			settings = builder.build();
+
+			client = new TransportClient(settings)
+					.addTransportAddress(new InetSocketTransportAddress(_ip,
+							_port));
+
+			return true;
+		} catch (Exception e) {
+			e.printStackTrace();
+			return false;
+		}
+	}
+
+	/**
+	 * @param raw_message
+	 *            message to bulk index in Elastic Search
+	 * @return integer (0) loaded into a bulk queue, (1) bulk indexing executed,
+	 *         (2) error
+	 */
+	@SuppressWarnings("unchecked")
+	public int bulkIndex(JSONObject raw_message) {
+
+		boolean success = true;
+		int set_size = 0;
+
+		// Add under the lock and snapshot the size so the flush decision below
+		// does not race with other producer threads.
+		synchronized (bulk_set) {
+			bulk_set.add(raw_message);
+			set_size = bulk_set.size();
+
+			_LOG.trace("[OpenSOC] Incremented bulk size to: " + bulk_set.size());
+		}
+
+		try {
+
+			if (set_size >= _bulk_size) {
+				success = doIndex();
+
+				if (success)
+					return 1;
+				else
+					return 2;
+			}
+
+			return 0;
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			return 2;
+		}
+	}
+
+	/**
+	 * Flushes every buffered message to the current time-rotated index as one
+	 * bulk request, then clears the buffer; runs under the bulk_set lock.
+	 *
+	 * @return true if the bulk request was submitted, false on exception.
+	 *         NOTE(review): per-item failures still yield true — the caller
+	 *         only learns of them via the error log; confirm intended.
+	 */
+	public boolean doIndex() throws Exception {
+
+		try {
+
+			synchronized (bulk_set) {
+				if (client == null)
+					throw new Exception("client is null");
+
+				BulkRequestBuilder bulkRequest = client.prepareBulk();
+
+				Iterator<JSONObject> iterator = bulk_set.iterator();
+
+				// The rotation postfix is computed once per flush, so the whole
+				// batch lands in the same dated index.
+				String index_postfix = dateFormat.format(new Date());
+
+				while (iterator.hasNext()) {
+					JSONObject setElement = iterator.next();
+
+					_LOG.trace("[OpenSOC] Flushing to index: " + _index_name+ "_" + index_postfix);
+
+					IndexRequestBuilder a = client.prepareIndex(_index_name+ "_" + index_postfix,
+							_document_name);
+					a.setSource(setElement.toString());
+					bulkRequest.add(a);
+
+				}
+
+				_LOG.trace("[OpenSOC] Performing bulk load of size: "
+						+ bulkRequest.numberOfActions());
+
+				BulkResponse resp = bulkRequest.execute().actionGet();
+
+				for(BulkItemResponse r: resp.getItems())
+				{
+					r.getResponse();
+					// NOTE(review): this logs getFailureMessage() under a
+					// "SUCCESS" label (null for successful items) — confirm.
+					_LOG.trace("[OpenSOC] ES SUCCESS MESSAGE: " + r.getFailureMessage());
+				}
+
+				// NOTE(review): cleared even when the response has failures,
+				// so failed documents are dropped after being logged below.
+				bulk_set.clear();
+
+				if (resp.hasFailures()) {
+					_LOG.error("[OpenSOC] Received bulk response error: "
+							+ resp.buildFailureMessage());
+
+					for(BulkItemResponse r: resp.getItems())
+					{
+						r.getResponse();
+						_LOG.error("[OpenSOC] ES FAILURE MESSAGE: " + r.getFailureMessage());
+					}
+				}
+
+			}
+
+			return true;
+		}
+
+		catch (Exception e) {
+			e.printStackTrace();
+			return false;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Indexing/src/main/java/com/apache/metron/indexing/adapters/SolrAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/com/apache/metron/indexing/adapters/SolrAdapter.java b/metron-streaming/Metron-Indexing/src/main/java/com/apache/metron/indexing/adapters/SolrAdapter.java
new file mode 100644
index 0000000..5d12dc2
--- /dev/null
+++ b/metron-streaming/Metron-Indexing/src/main/java/com/apache/metron/indexing/adapters/SolrAdapter.java
@@ -0,0 +1,5 @@
+package com.opensoc.indexing.adapters;
+
+// Placeholder for a future Solr indexing adapter — no implementation yet.
+public class SolrAdapter {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-MessageParsers/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/pom.xml b/metron-streaming/Metron-MessageParsers/pom.xml
new file mode 100644
index 0000000..9a7d651
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/pom.xml
@@ -0,0 +1,106 @@
+<?xml version="1.0" encoding="UTF-8"?><!-- Licensed to the Apache Software 
+	Foundation (ASF) under one or more contributor license agreements. See the 
+	NOTICE file distributed with this work for additional information regarding 
+	copyright ownership. The ASF licenses this file to You under the Apache License, 
+	Version 2.0 (the "License"); you may not use this file except in compliance 
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 
+	Unless required by applicable law or agreed to in writing, software distributed 
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+	the specific language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>com.opensoc</groupId>
+		<artifactId>OpenSOC-Streaming</artifactId>
+		<version>0.6BETA</version>
+	</parent>
+	<artifactId>OpenSOC-MessageParsers</artifactId>
+    <properties>
+ 		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>	
+    </properties>
+	<dependencies>
+		<dependency>
+			<groupId>com.opensoc</groupId>
+			<artifactId>OpenSOC-Common</artifactId>
+			<version>${project.parent.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.storm</groupId>
+			<artifactId>storm-core</artifactId>
+			<version>${global_storm_version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>${global_junit_version}</version>
+		</dependency>
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${global_guava_version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>io.thekraken</groupId>
+			<artifactId>grok</artifactId>
+			<version>0.1.0</version>
+		</dependency>
+	</dependencies>
+	<reporting>
+		<plugins>
+			<!-- Normally, dependency report takes time, skip it -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-project-info-reports-plugin</artifactId>
+				<version>2.7</version>
+
+				<configuration>
+					<dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>emma-maven-plugin</artifactId>
+				<version>1.0-alpha-3</version>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-pmd-plugin</artifactId>
+				<configuration>
+					<targetJdk>1.7</targetJdk>
+				</configuration>
+				
+			</plugin>
+		</plugins>
+	</reporting>
+	<build>
+	<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>3.1</version>
+				<inherited>true</inherited>
+				<configuration>
+					<source>1.7</source>
+					<target>1.7</target>
+				</configuration>
+			</plugin>
+			</plugins>
+		<resources>
+		<resource>
+				<directory>src/main/resources</directory>
+			</resource>
+			<resource>
+				<directory>src/main/resources/patterns</directory>
+			</resource>
+			<resource>
+				<directory>src/test/resources</directory>
+			</resource>
+		</resources>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-MessageParsers/pom.xml.versionsBackup
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/pom.xml.versionsBackup b/metron-streaming/Metron-MessageParsers/pom.xml.versionsBackup
new file mode 100644
index 0000000..ef2d97d
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/pom.xml.versionsBackup
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?><!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+ 
+     http://www.apache.org/licenses/LICENSE-2.0
+ 
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>com.opensoc</groupId>
+    <artifactId>OpenSOC-Streaming</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+  </parent>
+  <artifactId>OpenSOC-MessageParsers</artifactId>
+  	<properties>
+		<opensoc.common.version>0.0.1-SNAPSHOT</opensoc.common.version>
+		<storm.version>0.9.1-incubating</storm.version>
+	</properties>
+  <dependencies>
+  		<dependency>
+			<groupId>com.opensoc</groupId>
+			<artifactId>OpenSOC-Common</artifactId>
+			<version>${opensoc.common.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.storm</groupId>
+			<artifactId>storm-core</artifactId>
+			<version>${storm.version}</version>
+			<scope>provided</scope>
+		</dependency>
+  		<dependency>
+  			<groupId>junit</groupId>
+  			<artifactId>junit</artifactId>
+  			<version>3.8.2</version>
+  		</dependency>		
+  </dependencies> 
+<reporting>
+    <plugins>
+	<!-- Normally, dependency report takes time, skip it -->
+      <plugin>
+		<groupId>org.apache.maven.plugins</groupId>
+		<artifactId>maven-project-info-reports-plugin</artifactId>
+		<version>2.7</version>
+	 
+		<configuration>
+	          <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+		</configuration>
+      </plugin>    
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>emma-maven-plugin</artifactId>
+        <version>1.0-alpha-3</version>
+      </plugin>    
+      <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-pmd-plugin</artifactId>
+          <configuration>
+            <targetJdk>1.7</targetJdk>
+	  </configuration>
+        </plugin>            
+    </plugins>
+  </reporting>    
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-MessageParsers/readme.md
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/readme.md b/metron-streaming/Metron-MessageParsers/readme.md
new file mode 100644
index 0000000..128932a
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/readme.md
@@ -0,0 +1,82 @@
+#OpenSOC-Parsers
+
+##Module Description
+
+This module provides a list of parsers that can be used with the OpenSOC framework.  There are two types of parsers.  First type is a Java parser.  This kind of parser is optimized for speed and performance and is built for use with higher velocity topologies.  These parsers are not easily modifiable and in order to make changes to them the entire topology needs to be recompiled.  The second type of parser provided with the system is a Grok parser.  This type of parser is primarily designed for lower-velocity topologies or for quickly standing up a parser for a new telemetry before a permanent Java parser can be written for it.
+
+##Message Format
+
+All opensoc messages follow a specific format in order to ingest a message.  If a message does not conform to this format it will be dropped and put onto an error queue for further examination.  The message must be of a JSON format and must have a JSON tag message like so:
+
+```
+{"message" : message content}
+
+```
+
+Where appropriate there is also a standardization around the 5-tuple JSON fields.  This is done so the topology correlation engine further downstream can correlate messages from different topologies by these fields.  We are currently working on expanding the message standardization beyond these fields, but this feature is not yet available.  The standard field names are as follows:
+
+* ip_src_addr: layer 3 source IP
+* ip_dst_addr: layer 3 dest IP
+* ip_src_port: layer 4 source port
+* ip_dst_port: layer 4 dest port
+* protocol: layer 4 protocol
+* timestamp (epoch)
+* original_string: A human friendly string representation of the message
+
+The timestamp and original_string fields are mandatory. The remaining standard fields are optional.  If any of the optional fields are not applicable then the field should be left out of the JSON.
+
+So putting it all together a typical OpenSOC message with all 5-tuple fields present would look like the following:
+
+```json
+{
+"message": 
+{"ip_src_addr": xxxx, 
+"ip_dst_addr": xxxx, 
+"ip_src_port": xxxx, 
+"ip_dst_port": xxxx, 
+"protocol": xxxx, 
+"original_string": xxx,
+"additional-field 1": xxx,
+}
+
+}
+```
+
+##Parser Bolt
+
+The OpenSOC parser bolt is a standard bolt, which can be extended with multiple Java and Grok parser adapter for parsing different topology messages.  The bolt signature for declaration in a storm topology is as follows:
+
+```
+AbstractParserBolt parser_bolt = new TelemetryParserBolt()
+.withMessageParser(parser)
+.withMessageFilter(new GenericMessageFilter())
+.withMetricConfig(config);
+
+```
+
+Metric Config - optional argument for exporting custom metrics to graphite.  If set to null no metrics will be exported.  If set, then a list of metrics defined in the metrics.conf file of each topology will define which metrics are exported and how often.
+
+Message Filter - a filter defining which messages can be dropped.  This feature is only present in the Java parser adapters
+
+Message Parser - defines the parser adapter to be used for a topology
+
+##Parser Adapters
+
+Parser adapters are loaded dynamically in each OpenSOC topology.  They are defined in topology.conf in the configuration item bolt.parser.adapter
+
+###Java Parser Adapters
+Java parser adapters are intended for higher-velocity topologies and are not easily changed or extended.  As the adoption of OpenSOC continues we plan on extending our library of Java adapters to process more log formats.  As of this moment the Java adapters included with OpenSOC are:
+
+* com.opensoc.parsing.parsers.BasicIseParser : Parse ISE messages
+* com.opensoc.parsing.parsers.BasicBroParser : Parse Bro messages
+* com.opensoc.parsing.parsers.BasicSourcefireParser : Parse Sourcefire messages
+* com.opensoc.parsing.parsers.BasicLancopeParser : Parse Lancope messages
+
+###Grok Parser Adapters
+Grok parser adapters are designed primarily to let someone who is not a Java coder quickly stand up a parser adapter for lower-velocity topologies.  Grok relies on Regex for message parsing, which is much slower than purpose-built Java parsers, but is more extensible.  Grok parsers are defined via a config file and the topology does not need to be recompiled in order to make changes to them.  An example of a Grok parser is:
+
+* com.opensoc.parsing.parsers.GrokSourcefireParser
+
+For more information on the Grok project please refer to the following link:
+
+https://github.com/thekrakken/java-grok

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/filters/BroMessageFilter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/filters/BroMessageFilter.java b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/filters/BroMessageFilter.java
new file mode 100644
index 0000000..5b58d59
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/filters/BroMessageFilter.java
@@ -0,0 +1,44 @@
+package com.opensoc.filters;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.configuration.Configuration;
+import org.json.simple.JSONObject;
+
+import com.opensoc.parser.interfaces.MessageFilter;
+
+/**
+ * Message filter that only passes Bro messages whose protocol field is in the
+ * configured whitelist of known protocols.
+ */
+public class BroMessageFilter implements MessageFilter,Serializable {
+
+	private static final long serialVersionUID = -3824683649114625033L;
+	// JSON key under which the message's protocol value is stored.
+	private String _key;
+	// Whitelisted protocol values loaded from configuration.
+	private final Set<String> _known_protocols;
+
+	/**
+	 * @param conf Commons configuration for reading properties files; the
+	 *             whitelist is read from the "source.known.protocols" list
+	 * @param key  Key in a JSON message where the protocol field is located
+	 */
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	public BroMessageFilter(Configuration conf, String key) {
+		_key = key;
+		_known_protocols = new HashSet<String>();
+		List known_protocols = conf.getList("source.known.protocols");
+		_known_protocols.addAll(known_protocols);
+	}
+
+	/**
+	 * @param message JSON representation of a message with a protocol field
+	 * @return true if the message's protocol is whitelisted (emit it),
+	 *         false if the message should be filtered out
+	 */
+	public boolean emitTuple(JSONObject message) {
+		return _known_protocols.contains(message.get(_key));
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/filters/GenericMessageFilter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/filters/GenericMessageFilter.java b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/filters/GenericMessageFilter.java
new file mode 100644
index 0000000..4693bf9
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/filters/GenericMessageFilter.java
@@ -0,0 +1,19 @@
+package com.opensoc.filters;
+import java.io.Serializable;
+
+import org.json.simple.JSONObject;
+
+import com.opensoc.parser.interfaces.MessageFilter;
+
+// Pass-through filter: emits every message unconditionally.
+public class GenericMessageFilter implements MessageFilter,Serializable {
+
+	private static final long serialVersionUID = 3626397212398318852L;
+
+	// Always emit; no filtering is applied.
+	public boolean emitTuple(JSONObject message) {
+		return true;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/AbstractParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/AbstractParserBolt.java b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/AbstractParserBolt.java
new file mode 100644
index 0000000..7dc5d4f
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/AbstractParserBolt.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.opensoc.parsing;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.base.BaseRichBolt;
+
+import com.codahale.metrics.Counter;
+import com.opensoc.metrics.MetricReporter;
+import com.opensoc.parser.interfaces.MessageFilter;
+import com.opensoc.parser.interfaces.MessageParser;
+
+/**
+ * Base class for OpenSOC parser bolts: holds the shared parser/filter/metric
+ * collaborators, validates configuration in prepare(), and provides helpers
+ * for schema checking and topology key generation.
+ */
+@SuppressWarnings("rawtypes")
+public abstract class AbstractParserBolt extends BaseRichBolt {
+
+	private static final long serialVersionUID = -6710596708304282838L;
+
+	protected static final Logger LOG = LoggerFactory
+			.getLogger(AbstractParserBolt.class);
+
+	protected OutputCollector _collector;
+	protected MessageParser _parser;
+
+	protected String OutputFieldName;
+	protected MetricReporter _reporter;
+	protected MessageFilter _filter;
+
+	protected Counter ackCounter, emitCounter, failCounter;
+
+	/**
+	 * Register ack/emit/fail counters (named after the concrete parser class)
+	 * to be reported to graphite.
+	 */
+	protected void registerCounters() {
+
+		String ackString = _parser.getClass().getSimpleName() + ".ack";
+		String emitString = _parser.getClass().getSimpleName() + ".emit";
+		String failString = _parser.getClass().getSimpleName() + ".fail";
+
+		ackCounter = _reporter.registerCounter(ackString);
+		emitCounter = _reporter.registerCounter(emitString);
+		failCounter = _reporter.registerCounter(failString);
+	}
+
+	/**
+	 * Verifies all required collaborators have been configured, then delegates
+	 * the subclass-specific setup to doPrepare().
+	 *
+	 * @throws IllegalStateException if parser, output field name, or filter
+	 *                               was not configured
+	 */
+	public final void prepare(Map conf, TopologyContext topologyContext,
+			OutputCollector collector) {
+		_collector = collector;
+		if (this._parser == null)
+			throw new IllegalStateException("MessageParser must be specified");
+		if (this.OutputFieldName == null)
+			throw new IllegalStateException("OutputFieldName must be specified");
+		if (this._filter == null)
+			throw new IllegalStateException("MessageFilter must be specified");
+
+		try {
+			doPrepare(conf, topologyContext, collector);
+		} catch (IOException e) {
+			// Fixed typo in the original log message ("Counld").
+			LOG.error("Could not initialize...");
+			e.printStackTrace();
+		}
+	}
+
+	/**
+	 * Checks that a parsed message carries the mandatory schema fields.
+	 *
+	 * @param message parsed telemetry message
+	 * @return true if the message contains both "original_string" and
+	 *         "timestamp", false otherwise
+	 */
+	public boolean checkForSchemaCorrectness(JSONObject message) {
+		// (Removed the unused local "int correct = 0" from the original.)
+		if (!(message.containsKey("original_string"))) {
+			LOG.trace("[OpenSOC] Message does not have original_string: " + message);
+			return false;
+		} else if (!(message.containsKey("timestamp"))) {
+			LOG.trace("[OpenSOC] Message does not have timestamp: " + message);
+			return false;
+		} else {
+			LOG.trace("[OpenSOC] Message conforms to schema: "
+					+ message);
+			return true;
+		}
+	}
+
+	// Subclass-specific setup hook invoked from prepare().
+	abstract void doPrepare(Map conf, TopologyContext topologyContext,
+			OutputCollector collector) throws IOException;
+
+	/**
+	 * Builds a routing key from the source and destination IPs; the sum is
+	 * symmetric, so both directions of a flow map to the same key. Falls back
+	 * to "0" when neither IP is usable or parsing fails.
+	 */
+	protected String generateTopologyKey(String src_ip, String dst_ip)
+			throws Exception {
+		try {
+			if (dst_ip == null && src_ip == null)
+				return "0";
+
+			if (src_ip == null || src_ip.length() == 0)
+				return dst_ip;
+
+			if (dst_ip == null || dst_ip.length() == 0)
+				return src_ip;
+
+			double ip1 = Double.parseDouble(src_ip.replace(".", ""));
+			double ip2 = Double.parseDouble(dst_ip.replace(".", ""));
+
+			return String.valueOf(ip1 + ip2);
+		} catch (Exception e) {
+			return "0";
+		}
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/PcapParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/PcapParserBolt.java b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/PcapParserBolt.java
new file mode 100644
index 0000000..4fb6482
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/PcapParserBolt.java
@@ -0,0 +1,227 @@
+package com.opensoc.parsing;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
+import com.opensoc.helpers.topology.ErrorGenerator;
+import com.opensoc.parsing.parsers.PcapParser;
+import com.opensoc.pcap.PacketInfo;
+
+import backtype.storm.generated.Grouping;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+
+
+/**
+ * The Class PcapParserBolt parses each input tuple and emits a new tuple which
+ * contains the information (header_json,group_key,pcap_id, timestamp, pcap) as
+ * defined in the output schema.
+ * 
+ * @author sheetal
+ * @version $Revision: 1.0 $
+ */
+public class PcapParserBolt implements IRichBolt {
+
+  /** The Constant serialVersionUID. */
+  private static final long serialVersionUID = -1449830233777209255L;
+
+  /** The Constant LOG. */
+  private static final Logger LOG = Logger.getLogger(PcapParserBolt.class);
+
+  /** The collector used to emit, ack and fail tuples. */
+  private OutputCollector collector = null;
+
+  /** The Storm topology configuration handed to prepare(). */
+  @SuppressWarnings("rawtypes")
+  private Map conf;
+
+  /** The number of chars to use for shuffle grouping. */
+  @SuppressWarnings("unused")
+  private int numberOfCharsToUseForShuffleGrouping = 4;
+
+  /** The divisor converting nanosecond timestamps to the configured precision. */
+  private long timePrecisionDivisor = 1L;
+
+  /**
+   * The Constructor.
+   */
+  public PcapParserBolt() {
+  }
+
+  /**
+   * Configures the timestamp precision emitted on the pcap_data_stream.
+   *
+   * @param tsPrecision
+   *          one of "MILLI", "MICRO" or "NANO" (case-insensitive); anything
+   *          else defaults to NANO
+   * @return this bolt, for fluent configuration
+   */
+  public PcapParserBolt withTsPrecision(String tsPrecision) {
+    if (tsPrecision.equalsIgnoreCase("MILLI")) {
+      // Convert nanos to millis
+      LOG.info("Configured for MILLI, setting timePrecisionDivisor to 1000000L" );
+      timePrecisionDivisor = 1000000L;
+    } else if (tsPrecision.equalsIgnoreCase("MICRO")) {
+      // Convert nanos to micro
+      LOG.info("Configured for MICRO, setting timePrecisionDivisor to 1000L" );
+      timePrecisionDivisor = 1000L;
+    } else if (tsPrecision.equalsIgnoreCase("NANO")) {
+      // Keep nano as is.
+      LOG.info("Configured for NANO, setting timePrecisionDivisor to 1L" );
+      timePrecisionDivisor = 1L;
+    } else {
+      LOG.info("bolt.parser.ts.precision not set. Default to NANO");
+      timePrecisionDivisor = 1L;
+    }
+    return this;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see backtype.storm.topology.IComponent#declareOutputFields(backtype.storm
+   * .topology.OutputFieldsDeclarer)
+   */
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declareStream("message", new Fields("key", "message"));
+    declarer.declareStream("pcap_header_stream", new Fields("header_json", "pcap_id"));
+    declarer.declareStream("pcap_data_stream", new Fields("pcap_id", "timestamp", "pcap"));
+    declarer.declareStream("error", new Fields("error"));
+  }
+
+  /**
+   * This bolt contributes no component-specific configuration.
+   *
+   * @return null, per the IComponent contract for "no extra configuration"
+   */
+  public Map<String, Object> getComponentConfiguration() {
+    return null;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see backtype.storm.task.IBolt#prepare(java.util.Map,
+   * backtype.storm.task.TopologyContext, backtype.storm.task.OutputCollector)
+   */
+  @SuppressWarnings("rawtypes")
+  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    this.collector = collector;
+    this.conf = stormConf;
+    if (conf.containsKey("bolt.parser.num.of.key.chars.to.use.for.shuffle.grouping")) {
+      this.numberOfCharsToUseForShuffleGrouping = Integer.valueOf(conf.get(
+          "bolt.parser.num.of.key.chars.to.use.for.shuffle.grouping").toString());
+    }
+  }
+
+  /**
+   * Processes each input tuple: parses the raw pcap bytes and, for every
+   * packet found, emits the packet as JSON on the "message" stream (keyed by
+   * the packet key), its header document on "pcap_header_stream", and the raw
+   * bytes with a precision-scaled capture timestamp on "pcap_data_stream".
+   * On failure the tuple is failed and an error tuple is emitted.
+   *
+   * @param input
+   *          Tuple
+   * @see backtype.storm.task.IBolt#execute(Tuple)
+   */
+  @SuppressWarnings("unchecked")
+  public void execute(Tuple input) {
+
+    List<PacketInfo> packetInfoList = null;
+    try {
+      packetInfoList = PcapParser.parse(input.getBinary(0));
+
+      if (packetInfoList != null) {
+        for (PacketInfo packetInfo : packetInfoList) {
+          String string_pcap = packetInfo.getJsonIndexDoc();
+          Object obj = JSONValue.parse(string_pcap);
+          JSONObject header = (JSONObject) obj;
+
+          JSONObject message = new JSONObject();
+          message.put("message", header);
+
+          collector.emit("message", new Values(packetInfo.getKey(), message));
+          collector.emit("pcap_header_stream", new Values(packetInfo.getJsonDoc(), packetInfo.getKey()));
+          // Scale the capture timestamp from nanos to the configured precision.
+          collector.emit("pcap_data_stream", new Values(packetInfo.getKey(),
+              packetInfo.getPacketTimeInNanos() / timePrecisionDivisor,
+              input.getBinary(0)));
+        }
+      }
+
+    } catch (Exception e) {
+      collector.fail(input);
+      LOG.error("Exception while processing tuple", e);
+
+      // Previously this message said "Alerts problem" (copy-paste from another
+      // bolt) and concatenated a byte[], which prints an array address. The
+      // payload is raw pcap, so report only its length.
+      JSONObject error = ErrorGenerator.generateErrorMessage(
+          "Pcap parsing problem, tuple payload of "
+              + (input.getBinary(0) == null ? 0 : input.getBinary(0).length)
+              + " bytes", e);
+      collector.emit("error", new Values(error));
+      return;
+    }
+    collector.ack(input);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see backtype.storm.task.IBolt#cleanup()
+   */
+  public void cleanup() {
+    // No resources to release.
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/TelemetryParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/TelemetryParserBolt.java b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/TelemetryParserBolt.java
new file mode 100644
index 0000000..8a48764
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/TelemetryParserBolt.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.parsing;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.configuration.Configuration;
+import org.json.simple.JSONObject;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import com.opensoc.helpers.topology.ErrorGenerator;
+import com.opensoc.json.serialization.JSONEncoderHelper;
+import com.opensoc.metrics.MetricReporter;
+import com.opensoc.parser.interfaces.MessageFilter;
+import com.opensoc.parser.interfaces.MessageParser;
+
+/**
+ * Uses an adapter to parse a telemetry message from its native format into a
+ * standard JSON. For a list of available adapter please check
+ * com.opensoc.parser.parsers. The input is a raw byte array and the output is a
+ * JSONObject
+ * <p>
+ * The parsing conventions are as follows:
+ * <p>
+ * <ul>
+ * 
+ * <li>ip_src_addr = source ip of a message
+ * <li>ip_dst_addr = destination ip of a message
+ * <li>ip_src_port = source port of a message
+ * <li>ip_dst_port = destination port of a message
+ * <li>protocol = protocol of a message
+ * <ul>
+ * <p>
+ * <p>
+ * If a message does not contain at least one of these variables it will be
+ * failed
+ **/
+
+@SuppressWarnings("rawtypes")
+public class TelemetryParserBolt extends AbstractParserBolt {
+
+	private static final long serialVersionUID = -2647123143398352020L;
+	private JSONObject metricConfiguration;
+
+	/**
+	 * @param parser
+	 *            The parser class for parsing the incoming raw message byte
+	 *            stream
+	 * @return Instance of this class
+	 */
+
+	public TelemetryParserBolt withMessageParser(MessageParser parser) {
+		_parser = parser;
+		return this;
+	}
+
+	/**
+	 * @param OutputFieldName
+	 *            Field name of the output tuple
+	 * @return Instance of this class
+	 */
+
+	public TelemetryParserBolt withOutputFieldName(String OutputFieldName) {
+		this.OutputFieldName = OutputFieldName;
+		return this;
+	}
+
+	/**
+	 * @param filter
+	 *            A class for filtering/dropping incomming telemetry messages
+	 * @return Instance of this class
+	 */
+
+	public TelemetryParserBolt withMessageFilter(MessageFilter filter) {
+		this._filter = filter;
+		return this;
+	}
+
+	/**
+	 * @param config
+	 *            A class for generating custom metrics into graphite
+	 * @return Instance of this class
+	 */
+
+	public TelemetryParserBolt withMetricConfig(Configuration config) {
+		this.metricConfiguration = JSONEncoderHelper.getJSON(config
+				.subset("com.opensoc.metrics"));
+		return this;
+	}
+
+	@Override
+	void doPrepare(Map conf, TopologyContext topologyContext,
+			OutputCollector collector) throws IOException {
+
+		LOG.info("[OpenSOC] Preparing TelemetryParser Bolt...");
+
+		if (metricConfiguration != null) {
+			_reporter = new MetricReporter();
+			_reporter
+					.initialize(metricConfiguration, TelemetryParserBolt.class);
+			LOG.info("[OpenSOC] Metric reporter is initialized");
+		} else {
+			LOG.info("[OpenSOC] Metric reporter is not initialized");
+		}
+		this.registerCounters();
+		
+		if(_parser != null)
+		_parser.init();
+		
+		
+	}
+
+	@SuppressWarnings("unchecked")
+	public void execute(Tuple tuple) {
+
+		LOG.trace("[OpenSOC] Starting to process a new incoming tuple");
+
+		byte[] original_message = null;
+
+		try {
+
+			original_message = tuple.getBinary(0);
+
+			LOG.trace("[OpenSOC] Starting the parsing process");
+
+			if (original_message == null || original_message.length == 0) {
+				LOG.error("Incomming tuple is null");
+				throw new Exception("Invalid message length");
+			}
+
+			LOG.trace("[OpenSOC] Attempting to transofrm binary message to JSON");
+			JSONObject transformed_message = _parser.parse(original_message);
+			LOG.debug("[OpenSOC] Transformed Telemetry message: "
+					+ transformed_message);
+
+			if (transformed_message == null || transformed_message.isEmpty())
+				throw new Exception("Unable to turn binary message into a JSON");
+
+			LOG.trace("[OpenSOC] Checking if the transformed JSON conforms to the right schema");
+
+			if (!checkForSchemaCorrectness(transformed_message)) {
+				throw new Exception("Incorrect formatting on message: "
+						+ transformed_message);
+			}
+
+			else {
+				LOG.trace("[OpenSOC] JSON message has the right schema");
+				boolean filtered = false;
+
+				if (_filter != null) {
+					if (!_filter.emitTuple(transformed_message)) {
+						filtered = true;
+					}
+				}
+
+				if (!filtered) {
+					String ip1 = null;
+
+					if (transformed_message.containsKey("ip_src_addr"))
+						ip1 = transformed_message.get("ip_src_addr").toString();
+
+					String ip2 = null;
+
+					if (transformed_message.containsKey("ip_dst_addr"))
+						ip2 = transformed_message.get("ip_dst_addr").toString();
+
+					String key = generateTopologyKey(ip1, ip2);
+
+					JSONObject new_message = new JSONObject();
+					new_message.put("message", transformed_message);
+					_collector.emit("message", new Values(key, new_message));
+				}
+
+				_collector.ack(tuple);
+				if (metricConfiguration != null)
+					ackCounter.inc();
+			}
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			LOG.error("Failed to parse telemetry message :" + original_message);
+			_collector.fail(tuple);
+
+			if (metricConfiguration != null)
+				failCounter.inc();
+
+			JSONObject error = ErrorGenerator.generateErrorMessage(
+					"Parsing problem: " + new String(original_message),
+					e);
+			_collector.emit("error", new Values(error));
+		}
+	}
+
+	public void declareOutputFields(OutputFieldsDeclarer declearer) {
+		declearer.declareStream("message", new Fields("key", "message"));
+		declearer.declareStream("error", new Fields("message"));
+
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/AbstractParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/AbstractParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/AbstractParser.java
new file mode 100644
index 0000000..728e275
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/AbstractParser.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.parsing.parsers;
+
+import java.io.Serializable;
+
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.opensoc.parser.interfaces.MessageParser;
+import com.opensoc.parsing.AbstractParserBolt;
+
+@SuppressWarnings("serial")
+public abstract class AbstractParser implements MessageParser, Serializable {
+
+	// Fixed: the logger was previously created against
+	// AbstractParserBolt.class (copy-paste), which mislabeled every log line
+	// emitted by parser subclasses.
+	protected static final Logger _LOG = LoggerFactory
+			.getLogger(AbstractParser.class);
+
+	/** Legacy initialization hook; kept for backward compatibility. */
+	public void initializeParser() {
+		_LOG.debug("Initializing adapter...");
+	}
+
+	/** Optional per-parser initialization; subclasses override as needed. */
+	public void init() {
+	}
+
+	/**
+	 * Parses a raw telemetry message into a JSON object.
+	 *
+	 * @param raw_message
+	 *            the raw message bytes as received from the transport
+	 * @return the parsed message, or null if parsing failed
+	 */
+	abstract public JSONObject parse(byte[] raw_message);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/BasicBroParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/BasicBroParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/BasicBroParser.java
new file mode 100644
index 0000000..741fd75
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/BasicBroParser.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.parsing.parsers;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.opensoc.tldextractor.BasicTldExtractor;
+
+@SuppressWarnings("serial")
+public class BasicBroParser extends AbstractParser {
+
+	protected static final Logger _LOG = LoggerFactory
+			.getLogger(BasicBroParser.class);
+
+	// Normalizes raw Bro JSON (key cleanup) before field extraction.
+	private JSONCleaner cleaner = new JSONCleaner();
+
+	// TLD extractor; currently only referenced by code that has been removed.
+	// Kept so the fluent/bolt wiring that may configure it elsewhere still works.
+	private BasicTldExtractor tldex = new BasicTldExtractor();
+
+	/**
+	 * Parses a raw Bro log line (UTF-8 JSON of the form {protocol: {...}})
+	 * into the OpenSOC schema: renames ts / id.orig_h / id.resp_h /
+	 * id.orig_p / id.resp_p to timestamp, ip_src_addr, ip_dst_addr,
+	 * ip_src_port and ip_dst_port, falls back to tx_hosts/rx_hosts for
+	 * addresses, and adds original_string and protocol fields.
+	 *
+	 * @param msg
+	 *            the raw message bytes
+	 * @return the normalized message payload, or null if parsing failed
+	 */
+	@SuppressWarnings("unchecked")
+	public JSONObject parse(byte[] msg) {
+
+		_LOG.trace("[OpenSOC] Starting to parse incoming message");
+
+		String raw_message = null;
+
+		try {
+			raw_message = new String(msg, "UTF-8");
+			_LOG.trace("[OpenSOC] Received message: " + raw_message);
+
+			JSONObject cleaned_message = cleaner.Clean(raw_message);
+			// Previously this logged raw_message again instead of the
+			// cleaned result.
+			_LOG.debug("[OpenSOC] Cleaned message: " + cleaned_message);
+
+			if (cleaned_message == null || cleaned_message.isEmpty())
+				throw new Exception("Unable to clean message: " + raw_message);
+
+			String key = cleaned_message.keySet().iterator().next().toString();
+
+			if (key == null)
+				throw new Exception("Unable to retrieve key for message: "
+						+ raw_message);
+
+			JSONObject payload = (JSONObject) cleaned_message.get(key);
+
+			// Check before dereferencing; previously this null check ran
+			// after the payload had already been used, so it could never fire.
+			if (payload == null)
+				throw new Exception("Unable to retrieve payload for message: "
+						+ raw_message);
+
+			// Rebuild a human-readable "PROTOCOL | k:v ..." summary line.
+			String originalString = " |";
+			for (Object k : payload.keySet()) {
+				originalString = originalString + " " + k.toString() + ":"
+						+ payload.get(k).toString();
+			}
+			originalString = key.toUpperCase() + originalString;
+			payload.put("original_string", originalString);
+
+			if (payload.containsKey("ts")) {
+				String ts = payload.remove("ts").toString();
+				payload.put("timestamp", ts);
+				_LOG.trace("[OpenSOC] Added ts to: " + payload);
+			}
+
+			if (payload.containsKey("id.orig_h")) {
+				String source_ip = payload.remove("id.orig_h").toString();
+				payload.put("ip_src_addr", source_ip);
+				_LOG.trace("[OpenSOC] Added ip_src_addr to: " + payload);
+			} else if (payload.containsKey("tx_hosts")) {
+				JSONArray txHosts = (JSONArray) payload.remove("tx_hosts");
+				if (txHosts != null && !txHosts.isEmpty()) {
+					payload.put("ip_src_addr", txHosts.get(0));
+					_LOG.trace("[OpenSOC] Added ip_src_addr to: " + payload);
+				}
+			}
+
+			if (payload.containsKey("id.resp_h")) {
+				String source_ip = payload.remove("id.resp_h").toString();
+				payload.put("ip_dst_addr", source_ip);
+				_LOG.trace("[OpenSOC] Added ip_dst_addr to: " + payload);
+			} else if (payload.containsKey("rx_hosts")) {
+				JSONArray rxHosts = (JSONArray) payload.remove("rx_hosts");
+				if (rxHosts != null && !rxHosts.isEmpty()) {
+					payload.put("ip_dst_addr", rxHosts.get(0));
+					_LOG.trace("[OpenSOC] Added ip_dst_addr to: " + payload);
+				}
+			}
+
+			if (payload.containsKey("id.orig_p")) {
+				String source_port = payload.remove("id.orig_p").toString();
+				payload.put("ip_src_port", source_port);
+				_LOG.trace("[OpenSOC] Added ip_src_port to: " + payload);
+			}
+			if (payload.containsKey("id.resp_p")) {
+				String dest_port = payload.remove("id.resp_p").toString();
+				payload.put("ip_dst_port", dest_port);
+				_LOG.trace("[OpenSOC] Added ip_dst_port to: " + payload);
+			}
+
+			_LOG.trace("[OpenSOC] Inner message: " + payload);
+
+			payload.put("protocol", key);
+			_LOG.debug("[OpenSOC] Returning parsed message: " + payload);
+
+			return payload;
+
+		} catch (Exception e) {
+			// Attach the exception to the log entry instead of printStackTrace().
+			_LOG.error("Unable to Parse Message: " + raw_message, e);
+			return null;
+		}
+	}
+
+}


Mime
View raw message