metron-commits mailing list archives

From omal...@apache.org
Subject [50/51] [partial] incubator-metron git commit: Initial import of code from https://github.com/OpenSOC/opensoc at ac0b00373f8f56dfae03a8109af5feb373ea598e.
Date Tue, 08 Dec 2015 06:38:14 GMT
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/alerts/adapters/AllAlertAdapter.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/alerts/adapters/AllAlertAdapter.java b/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/alerts/adapters/AllAlertAdapter.java
new file mode 100644
index 0000000..035a865
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/alerts/adapters/AllAlertAdapter.java
@@ -0,0 +1,223 @@
+package com.opensoc.alerts.adapters;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.validator.routines.InetAddressValidator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.opensoc.alerts.interfaces.AlertsAdapter;
+
+public class AllAlertAdapter implements AlertsAdapter, Serializable {
+
+	HTableInterface whitelist_table;
+	InetAddressValidator ipvalidator = new InetAddressValidator();
+	String _whitelist_table_name;
+	// String _blacklist_table_name;
+	String _quorum;
+	String _port;
+	String _topologyname;
+	Configuration conf = null;
+
+	protected  Cache<String, String> cache;
+
+	Map<String, String> id_list = new HashMap<String, String>();
+
+	Set<String> loaded_whitelist = new HashSet<String>();
+	Set<String> loaded_blacklist = new HashSet<String>();
+
+	String _topology_name;
+
+	protected static final Logger LOG = LoggerFactory
+			.getLogger(AllAlertAdapter.class);
+
+	public AllAlertAdapter(String whitelist_table_name,
+			String blacklist_table_name, String quorum, String port,
+			int _MAX_TIME_RETAIN, int _MAX_CACHE_SIZE) {
+
+		_whitelist_table_name = whitelist_table_name;
+
+		_quorum = quorum;
+		_port = port;
+
+		cache = CacheBuilder.newBuilder().maximumSize(_MAX_CACHE_SIZE)
+				.expireAfterWrite(_MAX_TIME_RETAIN, TimeUnit.MINUTES).build();
+
+
+	}
+
+
+	public boolean initialize() {
+
+		conf = HBaseConfiguration.create();
+		conf.set("hbase.zookeeper.quorum", _quorum);
+		conf.set("hbase.zookeeper.property.clientPort", _port);
+
+		LOG.trace("[OpenSOC] Connecting to hbase with conf:" + conf);		
+		LOG.trace("[OpenSOC] Whitelist table name: " + _whitelist_table_name);
+		LOG.trace("[OpenSOC] ZK Client/port: " + conf.get("hbase.zookeeper.quorum") + " -> " + conf.get("hbase.zookeeper.property.clientPort"));
+
+		try {
+
+			// NOTE: connection is opened to verify connectivity but is never used or closed by this adapter.
+			HConnection connection = HConnectionManager.createConnection(conf);
+
+			LOG.trace("[OpenSOC] CONNECTED TO HBASE");
+
+			HBaseAdmin hba = new HBaseAdmin(conf);
+
+			if (!hba.tableExists(_whitelist_table_name))
+				throw new Exception("Whitelist table doesn't exist");
+
+			hba.close();
+
+			whitelist_table = new HTable(conf, _whitelist_table_name);
+
+			LOG.trace("[OpenSOC] CONNECTED TO TABLE: "+ _whitelist_table_name);
+
+			Scan scan = new Scan();
+
+
+			ResultScanner rs = whitelist_table.getScanner(scan);
+			try {
+				for (Result r = rs.next(); r != null; r = rs.next()) {
+					loaded_whitelist.add(Bytes.toString(r.getRow()));
+				}
+			} catch (Exception e) {
+				LOG.trace("[OpenSOC] COULD NOT READ FROM HBASE");
+				e.printStackTrace();
+			} finally {
+				rs.close(); 
+			}
+			whitelist_table.close();
+
+			LOG.trace("[OpenSOC] Number of entires in white list: " + loaded_whitelist.size());
+			
+			if(loaded_whitelist.size() == 0)
+				throw new Exception("HBase connection is OK, but the table is empty: " + _whitelist_table_name);
+
+			return true;
+		} catch (Exception e) {
+			LOG.error("[OpenSOC] Failed to initialize AllAlertAdapter", e);
+		}
+
+		return false;
+
+	}
+
+	protected String generateAlertId(String source_ip, String dst_ip,
+			int alert_type) {
+
+		String key = makeKey(source_ip, dst_ip, alert_type);
+
+		String cached = cache.getIfPresent(key);
+		if (cached != null)
+			return cached;
+
+		String new_UUID = System.currentTimeMillis() + "-" + UUID.randomUUID();
+
+		cache.put(key, new_UUID);
+		key = makeKey(dst_ip, source_ip, alert_type);
+		cache.put(key, new_UUID);
+
+		return new_UUID;
+
+	}
+
+	public boolean getByKey(String metadata, HTableInterface table) {
+
+		LOG.trace("[OpenSOC] Pinging HBase For:" + metadata);
+
+
+		Get get = new Get(metadata.getBytes());
+		Result rs;
+
+		try {
+			rs = table.get(get);
+
+			if (rs.size() > 0)
+				return true;
+			else
+				return false;
+
+		} catch (IOException e) {
+
+			e.printStackTrace();
+		}
+
+		return false;
+
+	}
+
+	public boolean refresh() throws Exception {
+		// TODO Auto-generated method stub
+		return false;
+	}
+
+	private String makeKey(String ip1, String ip2, int alert_type) {
+		return (ip1 + "-" + ip2 + "-" + alert_type);
+	}
+
+	@SuppressWarnings("unchecked")
+	public Map<String, JSONObject> alert(JSONObject raw_message) {
+
+		Map<String, JSONObject> alerts = new HashMap<String, JSONObject>();
+		JSONObject alert = new JSONObject();
+
+		JSONObject content = (JSONObject) raw_message.get("message");
+		String source_ip = content.get("ip_src_addr").toString();
+		String dst_ip = content.get("ip_dst_addr").toString();
+
+		String source = null;
+
+		if (loaded_whitelist.contains(source_ip))
+			source = source_ip;
+		else if (loaded_whitelist.contains(dst_ip))
+			source = dst_ip;
+		else
+			source = "unknown";
+
+		alert.put("title", "Appliance alert for: " + source_ip + "->" + dst_ip);
+		alert.put("priority", "1");
+		alert.put("type", "error");
+		alert.put("designated_host", source);
+		alert.put("source", source_ip);
+		alert.put("dest", dst_ip);
+		alert.put("body", "Appliance alert for: " + source_ip + "->" + dst_ip);
+
+		String alert_id = generateAlertId(source_ip, dst_ip, 0);
+
+		alert.put("reference_id", alert_id);
+		alerts.put(alert_id, alert);
+		
+		LOG.trace("[OpenSOC] Returning alert: " + alerts);
+
+		 return alerts;
+	}
+
+
+	public boolean containsAlertId(String alert) {
+		// TODO Auto-generated method stub
+		return false;
+	}
+
+}
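
Editor's note: a minimal usage sketch for the adapter above, assuming a whitelist table named "ip_whitelist" and a local ZooKeeper; the table names, quorum, and cache sizes are illustrative, not part of the patch. Note that the constructor accepts a blacklist table name but this adapter ignores it.

    // Hypothetical wiring, based on the constructor and AlertsAdapter interface shown above.
    AlertsAdapter adapter = new AllAlertAdapter(
            "ip_whitelist",   // whitelist table; must exist and be non-empty
            "ip_blacklist",   // blacklist table; accepted but unused by this adapter
            "localhost",      // hbase.zookeeper.quorum
            "2181",           // hbase.zookeeper.property.clientPort
            60,               // _MAX_TIME_RETAIN: alert-id cache expiry, in minutes
            10000);           // _MAX_CACHE_SIZE: maximum cached alert ids

    if (adapter.initialize()) {
        Map<String, JSONObject> alerts = adapter.alert(raw_message); // alerts keyed by alert id
    }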

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/alerts/adapters/HbaseWhiteAndBlacklistAdapter.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/alerts/adapters/HbaseWhiteAndBlacklistAdapter.java b/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/alerts/adapters/HbaseWhiteAndBlacklistAdapter.java
new file mode 100644
index 0000000..6bfed85
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/alerts/adapters/HbaseWhiteAndBlacklistAdapter.java
@@ -0,0 +1,420 @@
+package com.opensoc.alerts.adapters;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.validator.routines.InetAddressValidator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.opensoc.alerts.interfaces.AlertsAdapter;
+
+public class HbaseWhiteAndBlacklistAdapter implements AlertsAdapter,
+		Serializable {
+
+	HTableInterface blacklist_table;
+	HTableInterface whitelist_table;
+	InetAddressValidator ipvalidator = new InetAddressValidator();
+	String _whitelist_table_name;
+	String _blacklist_table_name;
+	String _quorum;
+	String _port;
+	String _topologyname;
+	Configuration conf = null;
+
+	Cache<String, String>cache;
+	String _topology_name;
+	
+	Set<String> loaded_whitelist = new HashSet<String>();
+	Set<String> loaded_blacklist = new HashSet<String>();
+
+	protected static final Logger LOG = LoggerFactory
+			.getLogger(HbaseWhiteAndBlacklistAdapter.class);
+
+	public HbaseWhiteAndBlacklistAdapter(String whitelist_table_name,
+			String blacklist_table_name, String quorum, String port,
+			int _MAX_TIME_RETAIN, int _MAX_CACHE_SIZE) {
+
+		_whitelist_table_name = whitelist_table_name;
+		_blacklist_table_name = blacklist_table_name;
+		_quorum = quorum;
+		_port = port;
+
+		cache = CacheBuilder.newBuilder().maximumSize(_MAX_CACHE_SIZE)
+				.expireAfterWrite(_MAX_TIME_RETAIN, TimeUnit.MINUTES).build();
+
+	}
+	
+
+
+	public boolean initialize() {
+
+		conf = HBaseConfiguration.create();
+		conf.set("hbase.zookeeper.quorum", _quorum);
+		conf.set("hbase.zookeeper.property.clientPort", _port);
+
+		LOG.trace("[OpenSOC] Connecting to hbase with conf:" + conf);		
+		LOG.trace("[OpenSOC] Whitelist table name: " + _whitelist_table_name);
+		LOG.trace("[OpenSOC] Whitelist table name: " + _blacklist_table_name);
+		LOG.trace("[OpenSOC] ZK Client/port: " + conf.get("hbase.zookeeper.quorum") + " -> " + conf.get("hbase.zookeeper.property.clientPort"));
+
+		try {
+
+			LOG.trace("[OpenSOC] Attempting to connect to hbase");
+
+			HConnection connection = HConnectionManager.createConnection(conf);
+
+			LOG.trace("[OpenSOC] CONNECTED TO HBASE");
+
+			HBaseAdmin hba = new HBaseAdmin(conf);
+
+			if (!hba.tableExists(_whitelist_table_name))
+				throw new Exception("Whitelist table doesn't exist");
+
+			if (!hba.tableExists(_blacklist_table_name))
+				throw new Exception("Blacklist table doesn't exist");
+
+			hba.close();
+
+			whitelist_table = new HTable(conf, _whitelist_table_name);
+
+			LOG.trace("[OpenSOC] CONNECTED TO TABLE: "
+					+ _whitelist_table_name);
+			blacklist_table = new HTable(conf, _blacklist_table_name);
+			LOG.trace("[OpenSOC] CONNECTED TO TABLE: "
+					+ _blacklist_table_name);
+
+			if (connection == null || whitelist_table == null
+					|| blacklist_table == null)
+				throw new Exception("Unable to initialize hbase connection");
+			
+			Scan scan = new Scan();
+
+
+			ResultScanner rs = whitelist_table.getScanner(scan);
+			try {
+				for (Result r = rs.next(); r != null; r = rs.next()) {
+					loaded_whitelist.add(Bytes.toString(r.getRow()));
+				}
+			} catch (Exception e) {
+				LOG.trace("[OpenSOC] COULD NOT READ FROM HBASE");
+				e.printStackTrace();
+			} finally {
+				rs.close(); // always close the ResultScanner!
+			}
+			whitelist_table.close();
+
+			LOG.trace("[OpenSOC] READ IN WHITELIST: " + loaded_whitelist.size());
+			
+			
+			 scan = new Scan();
+
+
+			 rs = blacklist_table.getScanner(scan);
+			try {
+				for (Result r = rs.next(); r != null; r = rs.next()) {
+					loaded_blacklist.add(Bytes.toString(r.getRow()));
+				}
+			} catch (Exception e) {
+				LOG.trace("[OpenSOC] COULD NOT READ FROM HBASE");
+				e.printStackTrace();
+			} finally {
+				rs.close(); // always close the ResultScanner!
+			}
+			blacklist_table.close();
+
+			LOG.trace("[OpenSOC] READ IN WHITELIST: " + loaded_whitelist.size());
+
+			return true;
+		} catch (Exception e) {
+			LOG.error("[OpenSOC] Failed to initialize HbaseWhiteAndBlacklistAdapter", e);
+		}
+
+		return false;
+
+	}
+
+	protected String generateAlertId(String source_ip, String dst_ip,
+			int alert_type) {
+
+		String key = makeKey(source_ip, dst_ip, alert_type);
+
+		String cached = cache.getIfPresent(key);
+		if (cached != null)
+			return cached;
+
+		String new_UUID = System.currentTimeMillis() + "-" + UUID.randomUUID();
+
+		cache.put(key, new_UUID);
+		key = makeKey(dst_ip, source_ip, alert_type);
+		cache.put(key, new_UUID);
+
+		return new_UUID;
+
+	}
+
+
+	public boolean refresh() throws Exception {
+		// TODO Auto-generated method stub
+		return false;
+	}
+
+	private String makeKey(String ip1, String ip2, int alert_type) {
+		return (ip1 + "-" + ip2 + "-" + alert_type);
+	}
+
+	@SuppressWarnings("unchecked")
+	public Map<String, JSONObject> alert(JSONObject raw_message) {
+
+		Map<String, JSONObject> alerts = new HashMap<String, JSONObject>();
+
+		JSONObject content = (JSONObject) raw_message.get("message");
+		
+		if (!content.containsKey("ip_src_addr") || !content.containsKey("ip_dst_addr") ) {
+
+			int alert_type = 0;
+
+			JSONObject alert = new JSONObject();
+
+			alert.put("title", "IP Check Error Type: : " + alert_type);
+			alert.put("priority", "1");
+			alert.put("type", "error");
+			alert.put("designated_host", "Uknown");
+			alert.put("source", "NA");
+			alert.put("dest", "NA");
+			alert.put(
+					"body",
+					"Source or destination IP is missing");
+
+			String alert_id = UUID.randomUUID().toString();
+
+			alert.put("reference_id", alert_id);
+			alerts.put(alert_id, alert);
+
+			LOG.trace("[OpenSOC] Returning alert: " + alerts);
+			
+			return alerts;
+
+		}
+
+		String source_ip = content.get("ip_src_addr").toString();
+		String dst_ip = content.get("ip_dst_addr").toString();
+
+		if (source_ip == null && dst_ip == null) {
+
+			int alert_type = 1;
+
+			JSONObject alert = new JSONObject();
+
+			alert.put("title", "IP Check Error Type: : " + alert_type);
+			alert.put("priority", "1");
+			alert.put("type", "error");
+			alert.put("designated_host", "Uknown");
+			alert.put("source", source_ip);
+			alert.put("dest", dst_ip);
+			alert.put(
+					"body",
+					"This communication does not contain a source or destination IP string. Communication between two IPs: "
+							+ source_ip + " -> " + dst_ip);
+
+			String alert_id = generateAlertId(source_ip, dst_ip, alert_type);
+
+			alert.put("reference_id", alert_id);
+			alerts.put(alert_id, alert);
+			
+			LOG.trace("[OpenSOC] Returning alert: " + alerts);
+
+			return alerts;
+
+		}
+
+		if (!ipvalidator.isValidInet4Address(source_ip)
+				&& !ipvalidator.isValidInet4Address(dst_ip)) {
+			int alert_type = 2;
+
+			JSONObject alert = new JSONObject();
+
+			alert.put("title", "IP Check Error Type: : " + alert_type);
+			alert.put("priority", "1");
+			alert.put("type", "error");
+			alert.put("designated_host", "Uknown");
+			alert.put("source", source_ip);
+			alert.put("dest", dst_ip);
+			alert.put(
+					"content",
+					"This communication contains souce and destination IP strings, but these strings are not valid. Communication between two IPs: "
+							+ source_ip + " -> " + dst_ip);
+
+			String alert_id = generateAlertId(source_ip, dst_ip, alert_type);
+
+			alert.put("reference_id", alert_id);
+			alerts.put(alert_id, alert);
+			
+			LOG.trace("[OpenSOC] Returning alert: " + alerts);
+
+			return alerts;
+
+		}
+
+		String designated_host = null;
+
+		if (loaded_whitelist.contains(source_ip))
+			designated_host = source_ip;
+		else if (loaded_whitelist.contains(dst_ip))
+			designated_host = dst_ip;
+		
+
+		if (designated_host == null) {
+			int alert_type = 3;
+
+			JSONObject alert = new JSONObject();
+
+			alert.put("title", "IP Check Error Type: : " + alert_type);
+			alert.put("priority", "1");
+			alert.put("type", "error");
+			alert.put("designated_host", "Uknown");
+			alert.put("source", source_ip);
+			alert.put("dest", dst_ip);
+			alert.put(
+					"content",
+					"This communication does not contain a source or a destination IP that is in the white list. Communication between two IPs: "
+							+ source_ip + " -> " + dst_ip);
+
+			String alert_id = generateAlertId(source_ip, dst_ip, alert_type);
+
+			alert.put("reference_id", alert_id);
+			alerts.put(alert_id, alert);
+			
+			LOG.trace("[OpenSOC] Returning alert: " + alerts);
+
+			return alerts;
+
+		}
+
+		if (source_ip.equals(designated_host)
+				&& !ipvalidator.isValidInet4Address(dst_ip)) {
+			int alert_type = 4;
+
+			JSONObject alert = new JSONObject();
+
+			alert.put("title", "IP Check Error Type: : " + alert_type);
+			alert.put("priority", "1");
+			alert.put("type", "error");
+			alert.put("designated_host", designated_host);
+			alert.put("source", source_ip);
+			alert.put("dest", dst_ip);
+			alert.put(
+					"content",
+					"This communication contains an IP that is not valid. Communication between two IPs: "
+							+ source_ip + " -> " + dst_ip);
+
+			String alert_id = generateAlertId(source_ip, dst_ip, alert_type);
+
+			alert.put("reference_id", alert_id);
+			alerts.put(alert_id, alert);
+
+		}
+
+		if (dst_ip.equals(designated_host)
+				&& !ipvalidator.isValidInet4Address(source_ip)) {
+			int alert_type = 5;
+
+			JSONObject alert = new JSONObject();
+
+			alert.put("title", "IP Check Error Type: : " + alert_type);
+			alert.put("priority", "1");
+			alert.put("type", "error");
+			alert.put("designated_host", designated_host);
+			alert.put("source", source_ip);
+			alert.put("dest", dst_ip);
+			alert.put(
+					"content",
+					"This communication contains IP that is not valid. Communication between two IPs: "
+							+ source_ip + " -> " + dst_ip);
+
+			String alert_id = generateAlertId(source_ip, dst_ip, alert_type);
+
+			alert.put("reference_id", alert_id);
+			alerts.put(alert_id, alert);
+
+		}
+
+		if (loaded_blacklist.contains(source_ip)) {
+			int alert_type = 6;
+
+			JSONObject alert = new JSONObject();
+
+			alert.put("title", "IP Check Error Type: : " + alert_type);
+			alert.put("priority", "1");
+			alert.put("type", "error");
+			alert.put("designated_host", designated_host);
+			alert.put("source", source_ip);
+			alert.put("dest", dst_ip);
+			alert.put(
+					"content",
+					"This communication contains IP that is black listed. Communication between two IPs: "
+							+ source_ip + " -> " + dst_ip);
+
+			String alert_id = generateAlertId(source_ip, dst_ip, alert_type);
+
+			alert.put("reference_id", alert_id);
+			alerts.put(alert_id, alert);
+
+		}
+
+		if (loaded_blacklist.contains(dst_ip)) {
+			int alert_type = 7;
+
+			JSONObject alert = new JSONObject();
+
+			alert.put("title", "IP Check Error Type: : " + alert_type);
+			alert.put("priority", "1");
+			alert.put("type", "error");
+			alert.put("designated_host", designated_host);
+			alert.put("source", source_ip);
+			alert.put("dest", dst_ip);
+			alert.put(
+					"content",
+					"This communication contains IP that is black listed. Communication between two IPs: "
+							+ source_ip + " -> " + dst_ip);
+
+			String alert_id = generateAlertId(source_ip, dst_ip, alert_type);
+
+			alert.put("reference_id", alert_id);
+			alerts.put(alert_id, alert);
+
+		}
+
+		if (alerts.isEmpty())
+			return null;
+		else
+			return alerts;
+	}
+
+
+
+	public boolean containsAlertId(String alert) {
+		// TODO Auto-generated method stub
+		return false;
+	}
+
+}
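
Editor's note: for reference, the alert_type codes produced by the adapter above are: 0 = message lacks an ip_src_addr or ip_dst_addr field; 1 = both IP strings are null; 2 = neither string is a valid IPv4 address; 3 = neither IP is in the whitelist; 4 = whitelisted source with an invalid destination IP; 5 = whitelisted destination with an invalid source IP; 6 = source IP is blacklisted; 7 = destination IP is blacklisted. Types 0 through 3 each return a single alert immediately; types 4 through 7 can accumulate in one returned map.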

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/tagging/AbstractTaggerBolt.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/tagging/AbstractTaggerBolt.java b/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/tagging/AbstractTaggerBolt.java
new file mode 100644
index 0000000..e22c3cf
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/tagging/AbstractTaggerBolt.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.opensoc.tagging;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+
+import com.codahale.metrics.Counter;
+import com.opensoc.alerts.interfaces.TaggerAdapter;
+import com.opensoc.metrics.MetricReporter;
+
+@SuppressWarnings("rawtypes")
+public abstract class AbstractTaggerBolt extends BaseRichBolt {
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = -6710596708304282838L;
+
+	protected static final Logger LOG = LoggerFactory
+			.getLogger(AbstractTaggerBolt.class);
+
+	protected OutputCollector _collector;
+	protected TaggerAdapter _adapter;
+
+	protected String OutputFieldName;
+	protected JSONObject _identifier;
+	protected MetricReporter _reporter;
+	
+	protected Counter ackCounter, emitCounter, failCounter;
+
+	protected void registerCounters() {
+
+		String ackString = _adapter.getClass().getSimpleName() + ".ack";
+
+		String emitString = _adapter.getClass().getSimpleName() + ".emit";
+
+		String failString = _adapter.getClass().getSimpleName() + ".fail";
+
+		ackCounter = _reporter.registerCounter(ackString);
+		emitCounter = _reporter.registerCounter(emitString);
+		failCounter = _reporter.registerCounter(failString);
+
+	}
+
+	public final void prepare(Map conf, TopologyContext topologyContext,
+			OutputCollector collector) {
+		_collector = collector;
+		
+		if (this._adapter == null)
+			throw new IllegalStateException("Tagging must be specified");
+		if(this._identifier == null)
+			throw new IllegalStateException("Identifier must be specified");
+		try {
+			doPrepare(conf, topologyContext, collector);
+		} catch (IOException e) {
+			LOG.error("Counld not initialize...");
+			e.printStackTrace();
+		}
+	}
+
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		declarer.declare(new Fields(this.OutputFieldName));
+	}
+
+	abstract void doPrepare(Map conf, TopologyContext topologyContext,
+			OutputCollector collector) throws IOException;
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/tagging/TelemetryTaggerBolt.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/tagging/TelemetryTaggerBolt.java b/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/tagging/TelemetryTaggerBolt.java
new file mode 100644
index 0000000..a31e1b7
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/tagging/TelemetryTaggerBolt.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.tagging;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.configuration.Configuration;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import com.opensoc.alerts.interfaces.TaggerAdapter;
+import com.opensoc.json.serialization.JSONEncoderHelper;
+import com.opensoc.metrics.MetricReporter;
+
+@SuppressWarnings("rawtypes")
+public class TelemetryTaggerBolt extends AbstractTaggerBolt {
+
+	/**
+	 * Uses an adapter to tag existing telemetry messages with alerts. The list
+	 * of available tagger adapters is located under
+	 * com.opensoc.tagging.adapters. At the time of the release the following
+	 * adapters are available:
+	 *
+	 * <ul>
+	 * <li>RegexTagger = reads a list of regular expressions and tags a message
+	 * if any of them match the message
+	 * <li>StaticAllTagger = tags each message with a static alert
+	 * </ul>
+	 */
+	private static final long serialVersionUID = -2647123143398352020L;
+	private Properties metricProperties;
+	private JSONObject metricConfiguration;
+
+	/**
+	 * 
+	 * @param tagger
+	 *            - tagger adapter for generating alert tags
+	 * @return instance of bolt
+	 */
+	public TelemetryTaggerBolt withMessageTagger(TaggerAdapter tagger) {
+		_adapter = tagger;
+		return this;
+	}
+
+	/**
+	 * 
+	 * @param OutputFieldName
+	 *            - output name of the tuple coming out of this bolt
+	 * @return - instance of this bolt
+	 */
+	public TelemetryTaggerBolt withOutputFieldName(String OutputFieldName) {
+		this.OutputFieldName = OutputFieldName;
+		return this;
+	}
+
+	/**
+	 * 
+	 * @param metricProperties
+	 *            - metric output to graphite
+	 * @return - instance of this bolt
+	 */
+	public TelemetryTaggerBolt withMetricProperties(Properties metricProperties) {
+		this.metricProperties = metricProperties;
+		return this;
+	}
+
+	/**
+	 * 
+	 * @param identifier
+	 *            - the identifier tag for tagging telemetry messages with
+	 *            alerts out of this bolt
+	 * @return - instance of this bolt
+	 */
+
+	public TelemetryTaggerBolt withIdentifier(JSONObject identifier) {
+		this._identifier = identifier;
+		return this;
+	}
+	
+	/**
+	 * @param config
+	 *            A class for generating custom metrics into graphite
+	 * @return Instance of this class
+	 */
+
+	public TelemetryTaggerBolt withMetricConfiguration(Configuration config) {
+		this.metricConfiguration = JSONEncoderHelper.getJSON(config
+				.subset("com.opensoc.metrics"));
+		return this;
+	}
+
+	@Override
+	void doPrepare(Map conf, TopologyContext topologyContext,
+			OutputCollector collector) throws IOException {
+
+		LOG.info("[OpenSOC] Preparing TelemetryParser Bolt...");
+
+		try {
+			_reporter = new MetricReporter();
+			_reporter.initialize(metricProperties, TelemetryTaggerBolt.class);
+			LOG.info("[OpenSOC] Initialized metrics");
+		} catch (Exception e) {
+			LOG.info("[OpenSOC] Could not initialize metrics");
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	public void execute(Tuple tuple) {
+
+		LOG.trace("[OpenSOC] Starting to process message for alerts");
+		JSONObject original_message = null;
+
+		try {
+
+			original_message = (JSONObject) tuple.getValue(0);
+
+			if (original_message == null || original_message.isEmpty())
+				throw new Exception("Could not parse message from byte stream");
+
+			LOG.trace("[OpenSOC] Received tuple: " + original_message);
+
+			JSONObject alerts_tag = new JSONObject();
+			JSONArray alerts_list = _adapter.tag(original_message);
+
+			LOG.trace("[OpenSOC] Tagged message: " + alerts_list);
+
+			if (alerts_list.size() != 0) {
+				if (original_message.containsKey("alerts")) {
+					JSONObject tag = (JSONObject) original_message
+							.get("alerts");
+					JSONArray already_triggered = (JSONArray) tag
+							.get("triggered");
+					alerts_list.addAll(already_triggered);
+					LOG.trace("[OpenSOC] Created a new string of alerts");
+				}
+
+				alerts_tag.put("identifier", _identifier);
+				alerts_tag.put("triggered", alerts_list);
+				original_message.put("alerts", alerts_tag);
+				
+				LOG.debug("[OpenSOC] Detected alerts: " + alerts_tag);
+			}
+			else
+			{
+				LOG.debug("[OpenSOC] The following messages did not contain alerts: " + original_message);
+			}
+
+			// Emit before acking so a failed emit does not ack a lost tuple.
+			_collector.emit(new Values(original_message));
+			_collector.ack(tuple);
+			
+			/*if (metricConfiguration != null) {
+				emitCounter.inc();
+				ackCounter.inc();
+			}*/
+
+		} catch (Exception e) {
+			LOG.error("Failed to tag message: " + original_message, e);
+			_collector.fail(tuple);
+			
+			/*
+			if (metricConfiguration != null) {
+				failCounter.inc();
+			}*/
+		}
+	}
+
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		declarer.declare(new Fields(this.OutputFieldName));
+
+	}
+}
\ No newline at end of file
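
Editor's note: a sketch of how the tagger bolt above is typically assembled with its fluent setters; the rule, tag payload, identifier, and topology names are illustrative assumptions, not taken from the patch.

    // Hypothetical topology wiring for TelemetryTaggerBolt with a RegexTagger.
    Map<String, JSONObject> rules = new HashMap<String, JSONObject>();
    JSONObject tag = new JSONObject();
    tag.put("alert", "failed_login");            // illustrative tag payload
    rules.put(".*failed login.*", tag);          // rule must match the entire message string

    JSONObject identifier = new JSONObject();
    identifier.put("topology", "alerts");        // illustrative identifier tag

    TelemetryTaggerBolt taggerBolt = new TelemetryTaggerBolt()
            .withMessageTagger(new RegexTagger(rules))
            .withOutputFieldName("message")
            .withIdentifier(identifier);

    TopologyBuilder builder = new TopologyBuilder();
    builder.setBolt("tagger", taggerBolt, 1).shuffleGrouping("parser");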

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/tagging/adapters/AbstractTaggerAdapter.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/tagging/adapters/AbstractTaggerAdapter.java b/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/tagging/adapters/AbstractTaggerAdapter.java
new file mode 100644
index 0000000..2ec6377
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/tagging/adapters/AbstractTaggerAdapter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.tagging.adapters;
+
+import java.io.Serializable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.opensoc.alerts.interfaces.TaggerAdapter;
+
+@SuppressWarnings("serial")
+public abstract class AbstractTaggerAdapter implements TaggerAdapter, Serializable{
+	
+	protected static final Logger _LOG = LoggerFactory
+			.getLogger(AbstractTaggerAdapter.class);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/tagging/adapters/RegexTagger.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/tagging/adapters/RegexTagger.java b/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/tagging/adapters/RegexTagger.java
new file mode 100644
index 0000000..2d8109f
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/tagging/adapters/RegexTagger.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.tagging.adapters;
+
+import java.util.Map;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+public class RegexTagger extends AbstractTaggerAdapter{
+	
+	/**
+	 * Reads a regex rules file and tags a message with alerts if any rule from that file
+	 * matches anything in the telemetry message
+	 */
+	private static final long serialVersionUID = -6091495636459799411L;
+	Map <String, JSONObject> _rules;
+	
+	/**
+	 * 
+	 * @param rules rules read from a properties XML file
+	 */
+	public RegexTagger(Map<String, JSONObject> rules)
+	{
+		_rules = rules;
+	}
+
+	/**
+	 * @param raw_message telemetry message to be tagged
+	 */
+	@SuppressWarnings("unchecked")
+	public JSONArray tag(JSONObject raw_message) {
+
+		JSONArray ja = new JSONArray();
+		String message_as_string = raw_message.toString();
+		
+		for(String rule : _rules.keySet())
+		{		
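+			// Note: String.matches() requires the pattern to match the
+			// entire message string, so rules typically need leading and
+			// trailing wildcards, e.g. ".*failed login.*".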
+			if (message_as_string.matches(rule))
+			{
+				ja.add(_rules.get(rule));
+			}
+		}	
+		
+		return ja;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/tagging/adapters/StaticAllTagger.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/tagging/adapters/StaticAllTagger.java b/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/tagging/adapters/StaticAllTagger.java
new file mode 100644
index 0000000..67c6c45
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/tagging/adapters/StaticAllTagger.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.tagging.adapters;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+public class StaticAllTagger extends AbstractTaggerAdapter {
+
+	/**
+	 * Attaches a static alerts tag to JSON telemetry messages
+	 */
+	private static final long serialVersionUID = 7759427661169094065L;
+	private JSONObject _static_tag_message;
+	JSONArray ja = new JSONArray();
+
+	/**
+	 * 
+	 * @param static_tag_message
+	 *            static alerts tag to attach to the message as a JSON
+	 */
+	@SuppressWarnings("unchecked")
+	public StaticAllTagger(JSONObject static_tag_message) {
+		_static_tag_message = static_tag_message;
+		ja.add(_static_tag_message);
+	}
+
+	/**
+	 * @param raw_message
+	 *            message to tag
+	 */
+	public JSONArray tag(JSONObject raw_message) {
+
+		return ja;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/tagging/adapters/TaggerAdapter.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/tagging/adapters/TaggerAdapter.java b/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/tagging/adapters/TaggerAdapter.java
new file mode 100644
index 0000000..9fc11d7
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Alerts/src/main/java/com/opensoc/tagging/adapters/TaggerAdapter.java
@@ -0,0 +1,9 @@
+package com.opensoc.tagging.adapters;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+public interface TaggerAdapter {
+
+	JSONArray tag(JSONObject raw_message);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Alerts/src/main/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Alerts/src/main/resources/hbase-site.xml b/opensoc-streaming/OpenSOC-Alerts/src/main/resources/hbase-site.xml
new file mode 100644
index 0000000..dc7cba5
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Alerts/src/main/resources/hbase-site.xml
@@ -0,0 +1,90 @@
+<!--Tue Feb 11 02:34:08 2014 -->
+<configuration>
+
+	<property>
+		<name>hbase.regionserver.global.memstore.lowerLimit</name>
+		<value>0.38</value>
+	</property>
+	<property>
+		<name>zookeeper.session.timeout</name>
+		<value>20</value>
+	</property>
+
+	<property>
+		<name>hbase.security.authorization</name>
+		<value>false</value>
+	</property>
+	<property>
+		<name>hbase.cluster.distributed</name>
+		<value>true</value>
+	</property>
+	
+	<property>
+		<name>hbase.hstore.flush.retries.number</name>
+		<value>120</value>
+	</property>
+	<property>
+		<name>hbase.hregion.memstore.block.multiplier</name>
+		<value>4</value>
+	</property>
+	<property>
+		<name>hbase.hstore.blockingStoreFiles</name>
+		<value>200</value>
+	</property>
+	<property>
+		<name>hbase.defaults.for.version.skip</name>
+		<value>true</value>
+	</property>
+	<property>
+		<name>hbase.regionserver.global.memstore.upperLimit</name>
+		<value>0.4</value>
+	</property>
+	<property>
+		<name>hbase.hregion.memstore.mslab.enabled</name>
+		<value>true</value>
+	</property>
+	<property>
+		<name>hbase.client.keyvalue.maxsize</name>
+		<value>10485760</value>
+	</property>
+	<property>
+		<name>hbase.superuser</name>
+		<value>hbase</value>
+	</property>
+	<property>
+		<name>hfile.block.cache.size</name>
+		<value>0.40</value>
+	</property>
+	<property>
+		<name>zookeeper.znode.parent</name>
+		<value>/hbase-unsecure</value>
+	</property>
+	<property>
+		<name>hbase.hregion.max.filesize</name>
+		<value>10737418240</value>
+	</property>
+	<property>
+		<name>hbase.zookeeper.property.clientPort</name>
+		<value>2181</value>
+	</property>
+	<property>
+		<name>hbase.security.authentication</name>
+		<value>simple</value>
+	</property>
+	<property>
+		<name>hbase.client.scanner.caching</name>
+		<value>100</value>
+	</property>
+	<property>
+		<name>hbase.hregion.memstore.flush.size</name>
+		<value>134217728</value>
+	</property>
+	<property>
+		<name>hbase.hregion.majorcompaction</name>
+		<value>86400000</value>
+	</property>
+	<property>
+		<name>hbase.client.write.buffer</name>
+		<value>500000000</value>
+	</property>
+</configuration>
\ No newline at end of file
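
Editor's note: this file is consumed implicitly. HBaseConfiguration.create(), as used by the adapters above, loads the first hbase-site.xml found on the classpath, and the code then overrides the ZooKeeper settings programmatically. A minimal sketch (the quorum host is illustrative):

    Configuration conf = HBaseConfiguration.create();       // reads hbase-site.xml from the classpath
    String parent = conf.get("zookeeper.znode.parent");     // "/hbase-unsecure" from this file
    conf.set("hbase.zookeeper.quorum", "zk1.example.com");  // programmatic override, as in the adapters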

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Common/pom.xml
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/pom.xml b/opensoc-streaming/OpenSOC-Common/pom.xml
new file mode 100644
index 0000000..582093d
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/pom.xml
@@ -0,0 +1,132 @@
+<?xml version="1.0" encoding="UTF-8"?><!-- Licensed to the Apache Software 
+	Foundation (ASF) under one or more contributor license agreements. See the 
+	NOTICE file distributed with this work for additional information regarding 
+	copyright ownership. The ASF licenses this file to You under the Apache License, 
+	Version 2.0 (the "License"); you may not use this file except in compliance 
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 
+	Unless required by applicable law or agreed to in writing, software distributed 
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+	the specific language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>com.opensoc</groupId>
+		<artifactId>OpenSOC-Streaming</artifactId>
+		<version>0.3BETA-SNAPSHOT</version>
+	</parent>
+	<artifactId>OpenSOC-Common</artifactId>
+	<name>OpenSOC-Common</name>
+	<description>Components common to all enrichments</description>
+	<properties>
+		<kafka.version>0.8.0</kafka.version>
+		<commons.config.version>1.10</commons.config.version>
+		<hbase.version>0.98.5-hadoop2</hbase.version>
+	</properties>
+	<repositories>
+		<repository>
+			<id>Kraken-Repo</id>
+			<name>Kraken Repository</name>
+			<url>http://download.krakenapps.org</url>
+		</repository>
+	</repositories>
+	<dependencies>
+		<dependency>
+			<groupId>com.googlecode.json-simple</groupId>
+			<artifactId>json-simple</artifactId>
+			<version>${global_json_simple_version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.storm</groupId>
+			<artifactId>storm-core</artifactId>
+			<version>${global_storm_version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka_2.9.2</artifactId>
+			<version>0.8.0</version>
+			<scope>provided</scope>
+			<exclusions>
+				<exclusion>
+					<groupId>com.sun.jmx</groupId>
+					<artifactId>jmxri</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jdmk</groupId>
+					<artifactId>jmxtools</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>javax.jms</groupId>
+					<artifactId>jms</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>com.codahale.metrics</groupId>
+			<artifactId>metrics-core</artifactId>
+			<version>${global_metrics_version}</version>
+		</dependency>
+		<dependency>
+			<groupId>com.codahale.metrics</groupId>
+			<artifactId>metrics-graphite</artifactId>
+			<version>${global_metrics_version}</version>
+		</dependency>
+		<dependency>
+			<groupId>commons-configuration</groupId>
+			<artifactId>commons-configuration</artifactId>
+			<version>${commons.config.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.krakenapps</groupId>
+			<artifactId>kraken-pcap</artifactId>
+			<version>1.5.0</version>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>3.8.2</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hbase</groupId>
+			<artifactId>hbase-client</artifactId>
+			<version>${hbase.version}</version>
+		</dependency>
+	</dependencies>
+
+	<reporting>
+		<plugins>
+			<!-- Normally, dependency report takes time, skip it -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-project-info-reports-plugin</artifactId>
+				<version>2.7</version>
+
+				<configuration>
+					<dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>emma-maven-plugin</artifactId>
+				<version>1.0-alpha-3</version>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-pmd-plugin</artifactId>
+				<configuration>
+					<targetJdk>1.7</targetJdk>
+				</configuration>
+			</plugin>
+		</plugins>
+	</reporting>
+	<build>
+		<resources>
+			<resource>
+				<directory>src/main/resources</directory>
+			</resource>
+		</resources>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Common/pom.xml.versionsBackup
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/pom.xml.versionsBackup b/opensoc-streaming/OpenSOC-Common/pom.xml.versionsBackup
new file mode 100644
index 0000000..8ead949
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/pom.xml.versionsBackup
@@ -0,0 +1,131 @@
+<?xml version="1.0" encoding="UTF-8"?><!-- Licensed to the Apache Software 
+	Foundation (ASF) under one or more contributor license agreements. See the 
+	NOTICE file distributed with this work for additional information regarding 
+	copyright ownership. The ASF licenses this file to You under the Apache License, 
+	Version 2.0 (the "License"); you may not use this file except in compliance 
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 
+	Unless required by applicable law or agreed to in writing, software distributed 
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+	the specific language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>com.opensoc</groupId>
+		<artifactId>OpenSOC-Streaming</artifactId>
+		<version>0.0.1-SNAPSHOT</version>
+	</parent>
+	<artifactId>OpenSOC-Common</artifactId>
+	<name>OpenSOC-Common</name>
+	<description>Components common to all enrichments</description>
+	<properties>
+		<json.simple.version>1.1.1</json.simple.version>
+
+		<storm.version>0.9.2-incubating</storm.version>
+		<kafka.version>0.8.0</kafka.version>
+		<metrics.version>3.0.2</metrics.version>
+		<commons.config.version>1.10</commons.config.version>
+		<hbase.version>0.98.5-hadoop2</hbase.version>
+	</properties>
+	<repositories>
+		<repository>
+			<id>Kraken-Repo</id>
+			<name>Kraken Repository</name>
+			<url>http://download.krakenapps.org</url>
+		</repository>
+	</repositories>
+	<dependencies>
+		<dependency>
+			<groupId>com.googlecode.json-simple</groupId>
+			<artifactId>json-simple</artifactId>
+			<version>${json.simple.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.storm</groupId>
+			<artifactId>storm-core</artifactId>
+			<version>${storm.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka_2.9.2</artifactId>
+			<version>0.8.0</version>
+			<scope>provided</scope>
+			<exclusions>
+				<exclusion>
+					<groupId>com.sun.jmx</groupId>
+					<artifactId>jmxri</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jdmk</groupId>
+					<artifactId>jmxtools</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>javax.jms</groupId>
+					<artifactId>jms</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>com.codahale.metrics</groupId>
+			<artifactId>metrics-core</artifactId>
+			<version>${metrics.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>com.codahale.metrics</groupId>
+			<artifactId>metrics-graphite</artifactId>
+			<version>${metrics.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>commons-configuration</groupId>
+			<artifactId>commons-configuration</artifactId>
+			<version>${commons.config.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.krakenapps</groupId>
+			<artifactId>kraken-pcap</artifactId>
+			<version>1.5.0</version>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>3.8.2</version>
+		</dependency>
+			<dependency>
+			<groupId>org.apache.hbase</groupId>
+			<artifactId>hbase-client</artifactId>
+			<version>${hbase.version}</version>
+		</dependency>
+	</dependencies>
+
+	<reporting>
+		<plugins>
+			<!-- Normally, dependency report takes time, skip it -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-project-info-reports-plugin</artifactId>
+				<version>2.7</version>
+
+				<configuration>
+					<dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>emma-maven-plugin</artifactId>
+				<version>1.0-alpha-3</version>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-pmd-plugin</artifactId>
+				<configuration>
+					<targetJdk>1.7</targetJdk>
+				</configuration>
+			</plugin>
+		</plugins>
+	</reporting>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/alerts/interfaces/AlertsAdapter.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/alerts/interfaces/AlertsAdapter.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/alerts/interfaces/AlertsAdapter.java
new file mode 100644
index 0000000..58567a6
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/alerts/interfaces/AlertsAdapter.java
@@ -0,0 +1,16 @@
+package com.opensoc.alerts.interfaces;
+
+import java.util.Map;
+
+import org.json.simple.JSONObject;
+
+public interface AlertsAdapter {
+
+	boolean initialize();
+
+	boolean refresh() throws Exception;
+
+	Map<String, JSONObject> alert(JSONObject raw_message);
+
+	boolean containsAlertId(String alert);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/alerts/interfaces/AlertsInterface.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/alerts/interfaces/AlertsInterface.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/alerts/interfaces/AlertsInterface.java
new file mode 100644
index 0000000..e5e32b7
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/alerts/interfaces/AlertsInterface.java
@@ -0,0 +1,11 @@
+package com.opensoc.alerts.interfaces;
+
+import org.json.simple.JSONObject;
+
+public interface AlertsInterface {
+
+	public JSONObject getContent();
+	public void setContent(JSONObject content);
+	public String getUuid();
+	public void setUuid(String uuid);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/alerts/interfaces/TaggerAdapter.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/alerts/interfaces/TaggerAdapter.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/alerts/interfaces/TaggerAdapter.java
new file mode 100644
index 0000000..79dc0d6
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/alerts/interfaces/TaggerAdapter.java
@@ -0,0 +1,9 @@
+package com.opensoc.alerts.interfaces;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+public interface TaggerAdapter {
+
+	JSONArray tag(JSONObject raw_message);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/enrichment/interfaces/EnrichmentAdapter.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/enrichment/interfaces/EnrichmentAdapter.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/enrichment/interfaces/EnrichmentAdapter.java
new file mode 100644
index 0000000..a6fdd85
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/enrichment/interfaces/EnrichmentAdapter.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.enrichment.interfaces;
+
+import org.json.simple.JSONObject;
+
+public interface EnrichmentAdapter
+{
+	JSONObject enrich(String metadata);
+	boolean initializeAdapter();
+}
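
Editor's note: a minimal hypothetical implementation of the interface above; the class name and lookup data are illustrative, not part of the patch.

    import java.util.HashMap;
    import java.util.Map;

    import org.json.simple.JSONObject;

    import com.opensoc.enrichment.interfaces.EnrichmentAdapter;

    public class StaticZoneEnrichmentAdapter implements EnrichmentAdapter {

        private Map<String, String> zones;

        public boolean initializeAdapter() {
            zones = new HashMap<String, String>();
            zones.put("10.0.0.1", "dmz"); // illustrative data; a real adapter would load from a store
            return true;
        }

        @SuppressWarnings("unchecked")
        public JSONObject enrich(String metadata) {
            JSONObject enrichment = new JSONObject();
            String zone = zones.get(metadata); // metadata is the value to enrich, e.g. an IP
            if (zone != null)
                enrichment.put("zone", zone);
            return enrichment;
        }
    }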

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/hbase/HBaseBolt.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/hbase/HBaseBolt.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/hbase/HBaseBolt.java
new file mode 100644
index 0000000..9c8f604
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/hbase/HBaseBolt.java
@@ -0,0 +1,126 @@
+package com.opensoc.hbase;
+
+
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.log4j.Logger;
+import org.json.simple.JSONObject;
+
+import com.opensoc.topologyhelpers.ErrorGenerator;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+/**
+ * A Storm bolt for putting data into HBase.
+ * <p>
+ * By default works in batch mode by enabling HBase's client-side write buffer. Enabling batch mode
+ * is recommended for high throughput, but it can be disabled in {@link TupleTableConfig}.
+ * <p>
+ * The HBase configuration is picked up from the first <tt>hbase-site.xml</tt> encountered on the
+ * classpath.
+ * @see TupleTableConfig
+ * @see HTableConnector
+ */
+@SuppressWarnings("serial")
+public class HBaseBolt implements IRichBolt {
+  private static final Logger LOG = Logger.getLogger(HBaseBolt.class);
+
+  protected OutputCollector collector;
+  protected HTableConnector connector;
+  protected TupleTableConfig conf;
+  protected boolean autoAck = true;
+  
+  private String _quorum;
+  private String _port;
+
+  public HBaseBolt(TupleTableConfig conf, String quorum, String port) {
+    this.conf = conf;
+    _quorum = quorum;
+    _port = port;
+
+  }
+
+  /** {@inheritDoc} */
+  @SuppressWarnings("rawtypes")
+  @Override
+  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    this.collector = collector;
+
+    try {
+      this.connector = new HTableConnector(conf, _quorum, _port);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    LOG.info("Preparing HBaseBolt for table: " + this.conf.getTableName());
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void execute(Tuple input) {
+    try {
+      this.connector.getTable().put(conf.getPutFromTuple(input));
+    } catch (IOException ex) {
+      String error_as_string = org.apache.commons.lang.exception.ExceptionUtils
+          .getStackTrace(ex);
+
+      // input.getBinary(0) is a byte[]; wrap it in a String so the error
+      // message is readable instead of an array reference
+      JSONObject error = ErrorGenerator.generateErrorMessage(
+          "Alerts problem: " + new String(input.getBinary(0)), error_as_string);
+      collector.emit("error", new Values(error));
+
+      throw new RuntimeException(ex);
+    }
+
+    if (this.autoAck) {
+      this.collector.ack(input);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void cleanup() {
+    this.connector.close();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declareStream("error", new Fields("HBase"));
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    return null;
+  }
+
+  /**
+   * @return the autoAck
+   */
+  public boolean isAutoAck() {
+    return autoAck;
+  }
+
+  /**
+   * @param autoAck the autoAck to set
+   */
+  public void setAutoAck(boolean autoAck) {
+    this.autoAck = autoAck;
+  }
+}
\ No newline at end of file
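
To make the bolt's role concrete, here is a hedged sketch of wiring it into a topology. The component names, parallelism, ZooKeeper settings, and SomeSpout are illustrative assumptions, not part of this commit:

    import backtype.storm.topology.TopologyBuilder;

    // Map tuple field "key" to the row key and field "message" to cf:message
    TupleTableConfig tableConfig = new TupleTableConfig("alerts", "key");
    tableConfig.addColumn("cf", "message");

    // Quorum host and client port are placeholders
    HBaseBolt hbaseBolt = new HBaseBolt(tableConfig, "zkhost.example.com", "2181");

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("ingest", new SomeSpout()); // hypothetical spout
    builder.setBolt("hbase-writer", hbaseBolt, 2).shuffleGrouping("ingest");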

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/hbase/HBaseStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/hbase/HBaseStreamPartitioner.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/hbase/HBaseStreamPartitioner.java
new file mode 100644
index 0000000..4070db7
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/hbase/HBaseStreamPartitioner.java
@@ -0,0 +1,146 @@
+package com.opensoc.hbase;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.HTable;
+
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.grouping.CustomStreamGrouping;
+import backtype.storm.task.WorkerTopologyContext;
+
+public class HBaseStreamPartitioner implements CustomStreamGrouping {
+
+  private static final long serialVersionUID = -148324019395976092L;
+  private String[] regionStartKeys = { "0" };
+  private Map<String, String> regionStartKeyRegionNameMap = new HashMap<String, String>();
+
+  private List<Integer> targetTasks = null;
+  private int targetTasksSize = 0;
+  private int rowKeyFieldIndex = 0;
+  private String tableName = null;
+  private long regionCheckTime = 0;
+  private int regionInforRefreshIntervalInMins = 60;
+  private int regionInforRefreshIntervalInMillis = regionInforRefreshIntervalInMins * 60000;
+
+  HTable hTable = null;
+
+
+  @Override
+  public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+    
+    System.out.println("preparing HBaseStreamPartitioner for streamId " + stream.get_streamId());
+    this.targetTasks = targetTasks;
+    this.targetTasksSize = this.targetTasks.size();
+
+    Configuration conf = HBaseConfiguration.create();
+    try {
+      hTable = new HTable(conf, tableName);
+      refreshRegionInfo(tableName);
+
+      System.out.println("regionStartKeyRegionNameMap: " + regionStartKeyRegionNameMap);
+
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+  }
+  
+  public void prepare() {
+
+    System.out.println("preparing HBaseStreamPartitioner");
+
+    Configuration conf = HBaseConfiguration.create();
+    try {
+      hTable = new HTable(conf, tableName);
+      refreshRegionInfo(tableName);
+
+      System.out.println("regionStartKeyRegionNameMap: " + regionStartKeyRegionNameMap);
+
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+  }
+
+  public HBaseStreamPartitioner(String tableName, int rowKeyFieldIndex, int regionInforRefreshIntervalInMins) {
+    System.out.println("Created HBaseStreamPartitioner");
+    this.rowKeyFieldIndex = rowKeyFieldIndex;
+    this.tableName = tableName;
+    this.regionInforRefreshIntervalInMins = regionInforRefreshIntervalInMins;
+    this.regionInforRefreshIntervalInMillis = regionInforRefreshIntervalInMins * 60000;
+  }
+
+
+  @Override
+  public List<Integer> chooseTasks(int taskId, List<Object> values) {
+    System.out.println("Choosing task for taskId " + taskId + " and values " + values);
+
+    // Refresh the cached region map once the refresh interval has elapsed
+    if ((System.currentTimeMillis() - regionCheckTime) > regionInforRefreshIntervalInMillis) {
+      try {
+        refreshRegionInfo(tableName);
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+
+    int regionIndex = getRegionIndex((String) values.get(rowKeyFieldIndex));
+
+    // Map the region index onto one of the available target task ids
+    List<Integer> chosenTasks = Arrays.asList(targetTasks.get(regionIndex % targetTasksSize));
+    System.out.println("Chosen tasks are " + chosenTasks);
+
+    return chosenTasks;
+  }
+
+
+  /**
+   * Returns the index of the region whose key range should contain the given
+   * row key, using a binary search over the sorted region start keys.
+   */
+  public int getRegionIndex(String key) {
+    int index = Arrays.binarySearch(regionStartKeys, key);
+    if (index < -1) {
+      // No exact match: binarySearch returns -(insertionPoint) - 1, and the
+      // owning region is the one just before the insertion point
+      index = (index + 2) * -1;
+    } else if (index == -1) {
+      // Key sorts before every region start key; use the first region
+      index = 0;
+    }
+
+    return index;
+  }
+
+  private void refreshRegionInfo(String tableName) throws IOException {
+
+    System.out.println("in refreshRegionInfo ");
+
+    Map<HRegionInfo, ServerName> regionMap = hTable.getRegionLocations();
+
+    synchronized (regionStartKeys) {
+      synchronized (regionStartKeyRegionNameMap) {
+        regionStartKeys = new String[regionMap.size()];
+        int index = 0;
+        String startKey = null;
+        regionStartKeyRegionNameMap.clear();
+        for (HRegionInfo regionInfo : regionMap.keySet()) {
+          startKey = new String(regionInfo.getStartKey());
+          regionStartKeyRegionNameMap.put(startKey, regionInfo.getRegionNameAsString());
+          regionStartKeys[index] = startKey;
+          index++;
+        }
+
+        Arrays.sort(regionStartKeys);
+        regionCheckTime = System.currentTimeMillis();
+      }
+    }
+  }
+}
\ No newline at end of file
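
For context, this grouping would typically be attached when building the topology, reusing the builder and bolt from the earlier sketch; the table name, row-key field index, 60-minute refresh interval, and upstream "parser" component are illustrative:

    // Route each tuple to the task aligned with the HBase region that owns
    // its row key (tuple field 0), refreshing region info every 60 minutes
    builder.setBolt("hbase-writer", hbaseBolt, 4)
        .customGrouping("parser", new HBaseStreamPartitioner("alerts", 0, 60));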

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/hbase/HTableConnector.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/hbase/HTableConnector.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/hbase/HTableConnector.java
new file mode 100644
index 0000000..d0aa0b4
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/hbase/HTableConnector.java
@@ -0,0 +1,106 @@
+package com.opensoc.hbase;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.Logger;
+
+import backtype.storm.generated.Bolt;
+
+/**
+ * HTable connector for a Storm {@link Bolt}.
+ * <p>
+ * The HBase configuration is picked up from the first <tt>hbase-site.xml</tt> encountered on the
+ * classpath.
+ */
+@SuppressWarnings("serial")
+public class HTableConnector implements Serializable {
+  private static final Logger LOG = Logger.getLogger(HTableConnector.class);
+
+  private Configuration conf;
+  protected HTable table;
+  private String tableName;
+
+  /**
+   * Initialize the HTable connection
+   * @param conf The {@link TupleTableConfig}
+   * @param _quorum ZooKeeper quorum (overrides <tt>hbase.zookeeper.quorum</tt> when both quorum and port are non-null)
+   * @param _port ZooKeeper client port (overrides <tt>hbase.zookeeper.property.clientPort</tt> when both quorum and port are non-null)
+   * @throws IOException
+   */
+  public HTableConnector(final TupleTableConfig conf, String _quorum, String _port) throws IOException {
+    this.tableName = conf.getTableName();
+    this.conf = HBaseConfiguration.create();
+    
+    if(_quorum != null && _port != null)
+    {
+    	this.conf.set("hbase.zookeeper.quorum", _quorum);
+    	this.conf.set("hbase.zookeeper.property.clientPort", _port);
+    }
+
+    LOG.info(String.format("Initializing connection to HBase table %s at %s", tableName,
+      this.conf.get("hbase.rootdir")));
+
+    try {
+      this.table = new HTable(this.conf, this.tableName);
+    } catch (IOException ex) {
+      throw new IOException("Unable to establish connection to HBase table " + this.tableName, ex);
+    }
+
+    if (conf.isBatch()) {
+      // Enable client-side write buffer
+      this.table.setAutoFlush(false, true);
+      LOG.info("Enabled client-side write buffer");
+    }
+
+    // If set, override write buffer size
+    if (conf.getWriteBufferSize() > 0) {
+      try {
+        this.table.setWriteBufferSize(conf.getWriteBufferSize());
+
+        LOG.info("Setting client-side write buffer to " + conf.getWriteBufferSize());
+      } catch (IOException ex) {
+        LOG.error("Unable to set client-side write buffer size for HBase table " + this.tableName,
+          ex);
+      }
+    }
+
+    // Check the configured column families exist
+    for (String cf : conf.getColumnFamilies()) {
+      if (!columnFamilyExists(cf)) {
+        throw new RuntimeException(String.format(
+          "HBase table '%s' does not have column family '%s'", conf.getTableName(), cf));
+      }
+    }
+  }
+
+  /**
+   * Checks whether the table contains the given column family
+   * @param columnFamily The column family name
+   * @return true if the column family exists in the table descriptor
+   * @throws IOException
+   */
+  private boolean columnFamilyExists(final String columnFamily) throws IOException {
+    return this.table.getTableDescriptor().hasFamily(Bytes.toBytes(columnFamily));
+  }
+
+  /**
+   * @return the table
+   */
+  public HTable getTable() {
+    return table;
+  }
+
+  /**
+   * Close the table
+   */
+  public void close() {
+    try {
+      this.table.close();
+    } catch (IOException ex) {
+      LOG.error("Unable to close connection to HBase table " + tableName, ex);
+    }
+  }
+}
\ No newline at end of file
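
A minimal standalone use of the connector, assuming this runs in a method declared to throw IOException and that an hbase-site.xml is on the classpath; the table, column family, and someTuple are placeholders:

    TupleTableConfig tableConfig = new TupleTableConfig("alerts", "key");
    tableConfig.addColumn("cf", "message");

    // Passing null quorum/port falls back to the classpath hbase-site.xml
    HTableConnector connector = new HTableConnector(tableConfig, null, null);
    try {
      // The underlying HTable is now ready for puts and gets
      connector.getTable().put(tableConfig.getPutFromTuple(someTuple)); // someTuple: a hypothetical Tuple
    } finally {
      connector.close();
    }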

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/hbase/TupleTableConfig.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/hbase/TupleTableConfig.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/hbase/TupleTableConfig.java
new file mode 100644
index 0000000..71f8c9a
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/hbase/TupleTableConfig.java
@@ -0,0 +1,279 @@
+package com.opensoc.hbase;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import backtype.storm.tuple.Tuple;
+
+/**
+ * Configuration for Storm {@link Tuple} to HBase serialization.
+ */
+@SuppressWarnings("serial")
+public class TupleTableConfig implements Serializable {
+  
+  public static final long DEFAULT_INCREMENT = 1L;
+  
+  private String tableName;
+  protected String tupleRowKeyField;
+  protected String tupleTimestampField;
+  protected Map<String, Set<String>> columnFamilies;
+  private boolean batch = true;
+  protected Durability durability = Durability.USE_DEFAULT;
+  private long writeBufferSize = 0L;
+  
+  /**
+   * Initialize configuration
+   * 
+   * @param table
+   *          The HBase table name
+   * @param rowKeyField
+   *          The {@link Tuple} field used to set the rowKey
+   */
+  public TupleTableConfig(final String table, final String rowKeyField) {
+    this.tableName = table;
+    this.tupleRowKeyField = rowKeyField;
+    this.tupleTimestampField = "";
+    this.columnFamilies = new HashMap<String, Set<String>>();
+  }
+  
+  /**
+   * Initialize configuration
+   * 
+   * @param table
+   *          The HBase table name
+   * @param rowKeyField
+   *          The {@link Tuple} field used to set the rowKey
+   * @param timestampField
+   *          The {@link Tuple} field used to set the timestamp
+   */
+  public TupleTableConfig(final String table, final String rowKeyField, final String timestampField) {
+    this.tableName = table;
+    this.tupleRowKeyField = rowKeyField;
+    this.tupleTimestampField = timestampField;
+    this.columnFamilies = new HashMap<String, Set<String>>();
+  }
+  
+  /**
+   * Add column family and column qualifier to be extracted from tuple
+   * 
+   * @param columnFamily
+   *          The column family name
+   * @param columnQualifier
+   *          The column qualifier name
+   */
+  public void addColumn(final String columnFamily, final String columnQualifier) {
+    Set<String> columns = this.columnFamilies.get(columnFamily);
+    
+    if (columns == null) {
+      columns = new HashSet<String>();
+    }
+    columns.add(columnQualifier);
+    
+    this.columnFamilies.put(columnFamily, columns);
+  }
+  
+  /**
+   * Creates an HBase {@link Put} from a Storm {@link Tuple}
+   * 
+   * @param tuple
+   *          The {@link Tuple}
+   * @return {@link Put}
+   */
+  public Put getPutFromTuple(final Tuple tuple) {
+    byte[] rowKey = Bytes.toBytes(tuple.getStringByField(tupleRowKeyField));
+    
+    long ts = 0;
+    if (!tupleTimestampField.equals("")) {
+      ts = tuple.getLongByField(tupleTimestampField);
+    }
+    
+    Put p = new Put(rowKey);
+    
+    p.setDurability(durability);
+    
+    if (columnFamilies.size() > 0) {
+      for (String cf : columnFamilies.keySet()) {
+        byte[] cfBytes = Bytes.toBytes(cf);
+        for (String cq : columnFamilies.get(cf)) {
+          byte[] cqBytes = Bytes.toBytes(cq);
+          byte[] val = tuple.getBinaryByField(cq);
+          
+          if (ts > 0) {
+            p.add(cfBytes, cqBytes, ts, val);
+          } else {
+            p.add(cfBytes, cqBytes, val);
+          }
+        }
+      }
+    }
+    
+    return p;
+  }
+  
+  /**
+   * Creates an HBase {@link Increment} from a Storm {@link Tuple}
+   * 
+   * @param tuple
+   *          The {@link Tuple}
+   * @param increment
+   *          The amount to increment the counter by
+   * @return {@link Increment}
+   */
+  public Increment getIncrementFromTuple(final Tuple tuple, final long increment) {
+    byte[] rowKey = Bytes.toBytes(tuple.getStringByField(tupleRowKeyField));
+    
+    Increment inc = new Increment(rowKey);
+    inc.setDurability(durability);
+    
+    if (columnFamilies.size() > 0) {
+      for (String cf : columnFamilies.keySet()) {
+        byte[] cfBytes = Bytes.toBytes(cf);
+        for (String cq : columnFamilies.get(cf)) {
+          byte[] val;
+          try {
+            val = Bytes.toBytes(tuple.getStringByField(cq));
+          } catch (IllegalArgumentException ex) {
+            // if cq isn't a tuple field, use cq for counter instead of tuple
+            // value
+            val = Bytes.toBytes(cq);
+          }
+          inc.addColumn(cfBytes, val, increment);
+        }
+      }
+    }
+    
+    return inc;
+  }
+  
+  /**
+   * Increment the counter for the given family and column by the specified
+   * amount
+   * <p>
+   * If the family and column already exist in the Increment the counter value
+   * is incremented by the specified amount rather than overridden, as it is in
+   * HBase's {@link Increment#addColumn(byte[], byte[], long)} method
+   * 
+   * @param inc
+   *          The {@link Increment} to update
+   * @param family
+   *          The column family
+   * @param qualifier
+   *          The column qualifier
+   * @param amount
+   *          The amount to increment the counter by
+   */
+  public static void addIncrement(Increment inc, final byte[] family, final byte[] qualifier, final Long amount) {
+    
+    NavigableMap<byte[], Long> set = inc.getFamilyMapOfLongs().get(family);
+    if (set == null) {
+      set = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+    }
+    
+    // If qualifier exists, increment amount
+    Long counter = set.get(qualifier);
+    if (counter == null) {
+      counter = 0L;
+    }
+    set.put(qualifier, amount + counter);
+    
+    inc.getFamilyMapOfLongs().put(family, set);
+  }
+  
+  /**
+   * @return the tableName
+   */
+  public String getTableName() {
+    return tableName;
+  }
+  
+  /**
+   * @return Whether batch mode is enabled
+   */
+  public boolean isBatch() {
+    return batch;
+  }
+  
+  /**
+   * @param batch
+   *          Whether to enable HBase's client-side write buffer.
+   *          <p>
+   *          When enabled your bolt will store put operations locally until the
+   *          write buffer is full, so they can be sent to HBase in a single RPC
+   *          call. When disabled each put operation is effectively an RPC and
+   *          is sent straight to HBase. As your bolt can process thousands of
+   *          values per second it is recommended that the write buffer is
+   *          enabled.
+   *          <p>
+   *          Enabled by default
+   */
+  public void setBatch(boolean batch) {
+    this.batch = batch;
+  }
+  
+  /**
+   * @param durability
+   *          Controls whether writes are recorded in HBase's write-ahead log.
+   *          <p>
+   *          Skipping the log means fewer operations to perform when writing to
+   *          HBase and hence better performance, but changes that haven't been
+   *          flushed to a store file will be lost in the event of HBase failure
+   *          <p>
+   *          Defaults to {@link Durability#USE_DEFAULT}
+   */
+  public void setDurability(Durability durability) {
+    this.durability = durability;
+  }
+  
+  
+  public Durability getDurability() {
+    return  durability;
+  }
+  
+  /**
+   * @param writeBufferSize
+   *          Overrides the client-side write buffer size.
+   *          <p>
+   *          By default the write buffer size is 2 MB (2097152 bytes). If you
+   *          are storing larger data, you may want to consider increasing this
+   *          value to allow your bolt to efficiently group together a larger
+   *          number of records per RPC
+   *          <p>
+   *          Overrides the write buffer size you have set in your
+   *          hbase-site.xml e.g. <code>hbase.client.write.buffer</code>
+   */
+  public void setWriteBufferSize(long writeBufferSize) {
+    this.writeBufferSize = writeBufferSize;
+  }
+  
+  /**
+   * @return the writeBufferSize
+   */
+  public long getWriteBufferSize() {
+    return writeBufferSize;
+  }
+  
+  /**
+   * @return A Set of configured column families
+   */
+  public Set<String> getColumnFamilies() {
+    return this.columnFamilies.keySet();
+  }
+  
+  /**
+   * @return the tupleRowKeyField
+   */
+  public String getTupleRowKeyField() {
+    return tupleRowKeyField;
+  }
+}
\ No newline at end of file
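
Pulling the setters together, a hedged configuration example; the table and field names are illustrative, and the 4 MB buffer simply demonstrates overriding the 2 MB default described above:

    import org.apache.hadoop.hbase.client.Durability;

    // Row key from field "key", cell timestamp from field "timestamp"
    TupleTableConfig tableConfig = new TupleTableConfig("alerts", "key", "timestamp");

    // Store field "message" under column family "cf", qualifier "message"
    tableConfig.addColumn("cf", "message");

    tableConfig.setBatch(true);                      // keep the client-side write buffer on
    tableConfig.setWriteBufferSize(4 * 1024 * 1024); // grow the buffer from 2 MB to 4 MB
    tableConfig.setDurability(Durability.SYNC_WAL);  // always record writes in the WAL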

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/index/interfaces/IndexAdapter.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/index/interfaces/IndexAdapter.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/index/interfaces/IndexAdapter.java
new file mode 100644
index 0000000..1f88342
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/index/interfaces/IndexAdapter.java
@@ -0,0 +1,11 @@
+package com.opensoc.index.interfaces;
+
+import org.json.simple.JSONObject;
+
+public interface IndexAdapter {
+
+	boolean initializeConnection(String ip, int port, String cluster_name,
+			String index_name, String document_name, int bulk) throws Exception;
+
+	int bulkIndex(JSONObject raw_message);
+}
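
To illustrate the contract only (this class is invented for the example, not part of the commit): initializeConnection captures the batch size, and bulkIndex buffers documents until that many are pending:

    package com.opensoc.index.interfaces;

    import org.json.simple.JSONObject;

    // Hypothetical adapter that only counts documents; a real implementation
    // would flush each full batch to an indexing backend
    public class CountingIndexAdapter implements IndexAdapter {

      private int bulkSize;
      private int pending = 0;

      public boolean initializeConnection(String ip, int port, String cluster_name,
          String index_name, String document_name, int bulk) throws Exception {
        this.bulkSize = bulk;
        return true;
      }

      public int bulkIndex(JSONObject raw_message) {
        pending++;
        if (pending >= bulkSize) {
          // flush the buffered batch to the backend here
          pending = 0;
        }
        return pending;
      }
    }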

