metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject [49/89] [abbrv] [partial] incubator-metron git commit: Rename all OpenSOC files to Metron
Date Tue, 26 Jan 2016 14:18:27 GMT
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/alerts/adapters/ThreatAlertsAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/alerts/adapters/ThreatAlertsAdapter.java b/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/alerts/adapters/ThreatAlertsAdapter.java
new file mode 100644
index 0000000..c85087d
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/alerts/adapters/ThreatAlertsAdapter.java
@@ -0,0 +1,311 @@
+package com.opensoc.alerts.adapters;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.validator.routines.InetAddressValidator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.opensoc.alerts.interfaces.AlertsAdapter;
+
+@SuppressWarnings("serial")
+public class ThreatAlertsAdapter implements AlertsAdapter, Serializable {
+
+	String enrichment_tag;
+
+	HTableInterface blacklist_table;
+	HTableInterface whitelist_table;
+	InetAddressValidator ipvalidator = new InetAddressValidator();
+	String _whitelist_table_name;
+	String _blacklist_table_name;
+	String _quorum;
+	String _port;
+	String _topologyname;
+	Configuration conf = null;
+
+	Cache<String, String> cache;
+	String _topology_name;
+
+	Set<String> loaded_whitelist = new HashSet<String>();
+	Set<String> loaded_blacklist = new HashSet<String>();
+
+	protected static final Logger LOG = LoggerFactory
+			.getLogger(ThreatAlertsAdapter.class);
+
+	public ThreatAlertsAdapter(Map<String, String> config) {
+		try {
+
+			if (!config.containsKey("whitelist_table_name"))
+				throw new Exception("Whitelist table name is missing");
+
+			_whitelist_table_name = config.get("whitelist_table_name");
+
+			if (!config.containsKey("blacklist_table_name"))
+				throw new Exception("Blacklist table name is missing");
+
+			_blacklist_table_name = config.get("blacklist_table_name");
+
+			if (!config.containsKey("quorum"))
+				throw new Exception("Quorum name is missing");
+
+			_quorum = config.get("quorum");
+
+			if (!config.containsKey("port"))
+				throw new Exception("port name is missing");
+
+			_port = config.get("port");
+
+			if (!config.containsKey("_MAX_CACHE_SIZE_OBJECTS_NUM"))
+				throw new Exception("_MAX_CACHE_SIZE_OBJECTS_NUM name is missing");
+
+			int _MAX_CACHE_SIZE_OBJECTS_NUM = Integer.parseInt(config
+					.get("_MAX_CACHE_SIZE_OBJECTS_NUM"));
+
+			if (!config.containsKey("_MAX_TIME_RETAIN_MINUTES"))
+				throw new Exception("_MAX_TIME_RETAIN_MINUTES name is missing");
+
+			int _MAX_TIME_RETAIN_MINUTES = Integer.parseInt(config
+					.get("_MAX_TIME_RETAIN_MINUTES"));
+
+			cache = CacheBuilder.newBuilder().maximumSize(_MAX_CACHE_SIZE_OBJECTS_NUM)
+					.expireAfterWrite(_MAX_TIME_RETAIN_MINUTES, TimeUnit.MINUTES)
+					.build();
+
+			enrichment_tag = config.get("enrichment_tag");
+
+		} catch (Exception e) {
+			System.out.println("Could not initialize alerts adapter");
+			e.printStackTrace();
+			System.exit(0);
+		}
+	}
+
+	@SuppressWarnings("resource")
+    @Override
+	public boolean initialize() {
+
+		conf = HBaseConfiguration.create();
+		// conf.set("hbase.zookeeper.quorum", _quorum);
+		// conf.set("hbase.zookeeper.property.clientPort", _port);
+
+		LOG.trace("[OpenSOC] Connecting to hbase with conf:" + conf);
+		LOG.trace("[OpenSOC] Whitelist table name: " + _whitelist_table_name);
+		LOG.trace("[OpenSOC] Whitelist table name: " + _blacklist_table_name);
+		LOG.trace("[OpenSOC] ZK Client/port: "
+				+ conf.get("hbase.zookeeper.quorum") + " -> "
+				+ conf.get("hbase.zookeeper.property.clientPort"));
+
+		try {
+
+			LOG.trace("[OpenSOC] Attempting to connect to hbase");
+
+			HConnection connection = HConnectionManager.createConnection(conf);
+
+			LOG.trace("[OpenSOC] CONNECTED TO HBASE");
+
+			HBaseAdmin hba = new HBaseAdmin(conf);
+
+			if (!hba.tableExists(_whitelist_table_name))
+				throw new Exception("Whitelist table doesn't exist");
+
+			if (!hba.tableExists(_blacklist_table_name))
+				throw new Exception("Blacklist table doesn't exist");
+
+			whitelist_table = new HTable(conf, _whitelist_table_name);
+
+			LOG.trace("[OpenSOC] CONNECTED TO TABLE: " + _whitelist_table_name);
+			blacklist_table = new HTable(conf, _blacklist_table_name);
+			LOG.trace("[OpenSOC] CONNECTED TO TABLE: " + _blacklist_table_name);
+
+			if (connection == null || whitelist_table == null
+					|| blacklist_table == null)
+				throw new Exception("Unable to initialize hbase connection");
+
+			Scan scan = new Scan();
+
+			ResultScanner rs = whitelist_table.getScanner(scan);
+			try {
+				for (Result r = rs.next(); r != null; r = rs.next()) {
+					loaded_whitelist.add(Bytes.toString(r.getRow()));
+				}
+			} catch (Exception e) {
+				LOG.trace("[OpenSOC] COULD NOT READ FROM HBASE");
+				e.printStackTrace();
+			} finally {
+				rs.close(); // always close the ResultScanner!
+				hba.close();
+			}
+			whitelist_table.close();
+
+			LOG.trace("[OpenSOC] READ IN WHITELIST: " + loaded_whitelist.size());
+
+			scan = new Scan();
+
+			rs = blacklist_table.getScanner(scan);
+			try {
+				for (Result r = rs.next(); r != null; r = rs.next()) {
+					loaded_blacklist.add(Bytes.toString(r.getRow()));
+				}
+			} catch (Exception e) {
+				LOG.trace("[OpenSOC] COULD NOT READ FROM HBASE");
+				e.printStackTrace();
+			} finally {
+				rs.close(); // always close the ResultScanner!
+				hba.close();
+			}
+			blacklist_table.close();
+
+			LOG.trace("[OpenSOC] READ IN WHITELIST: " + loaded_whitelist.size());
+
+			rs.close(); // always close the ResultScanner!
+			hba.close();
+
+			return true;
+		} catch (Exception e) {
+
+			e.printStackTrace();
+		}
+
+		return false;
+
+	}
+
+	@Override
+	public boolean refresh() throws Exception {
+		return true;
+	}
+
+	@SuppressWarnings("unchecked")
+    @Override
+	public Map<String, JSONObject> alert(JSONObject raw_message) {
+
+		System.out.println("LOOKING FOR ENRICHMENT TAG: " + enrichment_tag);
+
+		Map<String, JSONObject> alerts = new HashMap<String, JSONObject>();
+		JSONObject content = (JSONObject) raw_message.get("message");
+
+		JSONObject enrichment = null;
+
+		if (raw_message.containsKey("enrichment"))
+			enrichment = (JSONObject) raw_message.get("enrichment");
+		else
+			return null;
+
+		if (enrichment.containsKey(enrichment_tag)) {
+
+			System.out.println("FOUND TAG: " + enrichment_tag);
+
+			JSONObject threat = (JSONObject) enrichment.get(enrichment_tag);
+
+			int cnt = 0;
+			Object enriched_key = null;
+			
+			for (Object key : threat.keySet()) {
+				JSONObject tmp = (JSONObject) threat.get(key);
+				cnt = cnt + tmp.size();
+				if (tmp.size() > 0)
+					enriched_key = key;
+			}
+
+			if (cnt == 0) {
+				System.out.println("TAG HAS NO ELEMENTS");
+				return null;
+			}
+
+			JSONObject alert = new JSONObject();
+
+			String source = "unknown";
+			String dest = "unknown";
+			String host = "unknown";
+
+			if (content.containsKey("ip_src_addr")) {
+				source = content.get("ip_src_addr").toString();
+
+				if (RangeChecker.checkRange(loaded_whitelist, source))
+					host = source;
+			}
+
+			if (content.containsKey("ip_dst_addr")) {
+				dest = content.get("ip_dst_addr").toString();
+
+				if (RangeChecker.checkRange(loaded_whitelist, dest))
+					host = dest;
+			}
+			
+			JSONObject threatQualifier = (JSONObject) threat.get(enriched_key);
+			
+			alert.put("designated_host", host);
+			String description =
+
+					new StringBuilder()
+					.append("Threat Intelligence match for ")
+					.append(content.get(enriched_key).toString())
+					.append(" from source: ")
+					.append(threatQualifier.keySet().iterator().next().toString())
+					.toString();	
+			alert.put("description", description);
+			alert.put("priority", "MED");
+
+			String alert_id = generateAlertId(source, dest, 0);
+
+			alert.put("alert_id", alert_id);
+			alerts.put(alert_id, alert);
+
+			alert.put("enrichment", enrichment);
+
+			return alerts;
+		} else {
+			System.out.println("DID NOT FIND TAG: " + enrichment_tag);
+			return null;
+		}
+
+	}
+
+	@Override
+	public boolean containsAlertId(String alert) {
+		// TODO Auto-generated method stub
+		return false;
+	}
+
+	protected String generateAlertId(String source_ip, String dst_ip,
+			int alert_type) {
+
+		String key = makeKey(source_ip, dst_ip, alert_type);
+
+		if (cache.getIfPresent(key) != null)
+			return cache.getIfPresent(key);
+
+		String new_UUID = System.currentTimeMillis() + "-" + UUID.randomUUID();
+
+		cache.put(key, new_UUID);
+		key = makeKey(dst_ip, source_ip, alert_type);
+		cache.put(key, new_UUID);
+
+		return new_UUID;
+
+	}
+
+	private String makeKey(String ip1, String ip2, int alert_type) {
+		return (ip1 + "-" + ip2 + "-" + alert_type);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/AbstractTaggerBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/AbstractTaggerBolt.java b/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/AbstractTaggerBolt.java
new file mode 100644
index 0000000..e22c3cf
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/AbstractTaggerBolt.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.opensoc.tagging;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+
+import com.codahale.metrics.Counter;
+import com.opensoc.alerts.interfaces.TaggerAdapter;
+import com.opensoc.metrics.MetricReporter;
+
+@SuppressWarnings("rawtypes")
+public abstract class AbstractTaggerBolt extends BaseRichBolt {
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = -6710596708304282838L;
+
+	protected static final Logger LOG = LoggerFactory
+			.getLogger(AbstractTaggerBolt.class);
+
+	protected OutputCollector _collector;
+	protected TaggerAdapter _adapter;
+
+	protected String OutputFieldName;
+	protected JSONObject _identifier;
+	protected MetricReporter _reporter;
+	
+	protected Counter ackCounter, emitCounter, failCounter;
+
+	protected void registerCounters() {
+
+		String ackString = _adapter.getClass().getSimpleName() + ".ack";
+
+		String emitString = _adapter.getClass().getSimpleName() + ".emit";
+
+		String failString = _adapter.getClass().getSimpleName() + ".fail";
+
+		ackCounter = _reporter.registerCounter(ackString);
+		emitCounter = _reporter.registerCounter(emitString);
+		failCounter = _reporter.registerCounter(failString);
+
+	}
+
+	public final void prepare(Map conf, TopologyContext topologyContext,
+			OutputCollector collector) {
+		_collector = collector;
+		
+		if (this._adapter == null)
+			throw new IllegalStateException("Tagging must be specified");
+		if(this._identifier == null)
+			throw new IllegalStateException("Identifier must be specified");
+		try {
+			doPrepare(conf, topologyContext, collector);
+		} catch (IOException e) {
+			LOG.error("Counld not initialize...");
+			e.printStackTrace();
+		}
+	}
+
+	public void declareOutputFields(OutputFieldsDeclarer declearer) {
+		declearer.declare(new Fields(this.OutputFieldName));
+	}
+
+	abstract void doPrepare(Map conf, TopologyContext topologyContext,
+			OutputCollector collector) throws IOException;
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/TelemetryTaggerBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/TelemetryTaggerBolt.java b/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/TelemetryTaggerBolt.java
new file mode 100644
index 0000000..a31e1b7
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/TelemetryTaggerBolt.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.tagging;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.configuration.Configuration;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import com.opensoc.alerts.interfaces.TaggerAdapter;
+import com.opensoc.json.serialization.JSONEncoderHelper;
+import com.opensoc.metrics.MetricReporter;
+
+@SuppressWarnings("rawtypes")
+public class TelemetryTaggerBolt extends AbstractTaggerBolt {
+
+	/**
+	 * Use an adapter to tag existing telemetry messages with alerts. The list
+	 * of available tagger adapters is located under
+	 * com.opensoc.tagging.adapters. At the time of the release the following
+	 * adapters are available:
+	 * 
+	 * <p>
+	 * <ul>
+	 * <li>RegexTagger = read a list or regular expressions and tag a message if
+	 * they exist in a message
+	 * <li>StaticAllTagger = tag each message with a static alert
+	 * <ul>
+	 * <p>
+	 */
+	private static final long serialVersionUID = -2647123143398352020L;
+	private Properties metricProperties;
+	private JSONObject metricConfiguration;
+
+	/**
+	 * 
+	 * @param tagger
+	 *            - tagger adapter for generating alert tags
+	 * @return instance of bolt
+	 */
+	public TelemetryTaggerBolt withMessageTagger(TaggerAdapter tagger) {
+		_adapter = tagger;
+		return this;
+	}
+
+	/**
+	 * 
+	 * @param OutputFieldName
+	 *            - output name of the tuple coming out of this bolt
+	 * @return - instance of this bolt
+	 */
+	public TelemetryTaggerBolt withOutputFieldName(String OutputFieldName) {
+		this.OutputFieldName = OutputFieldName;
+		return this;
+	}
+
+	/**
+	 * 
+	 * @param metricProperties
+	 *            - metric output to graphite
+	 * @return - instance of this bolt
+	 */
+	public TelemetryTaggerBolt withMetricProperties(Properties metricProperties) {
+		this.metricProperties = metricProperties;
+		return this;
+	}
+
+	/**
+	 * 
+	 * @param identifier
+	 *            - the identifier tag for tagging telemetry messages with
+	 *            alerts out of this bolt
+	 * @return - instance of this bolt
+	 */
+
+	public TelemetryTaggerBolt withIdentifier(JSONObject identifier) {
+		this._identifier = identifier;
+		return this;
+	}
+	
+	/**
+	 * @param config
+	 *            A class for generating custom metrics into graphite
+	 * @return Instance of this class
+	 */
+
+	public TelemetryTaggerBolt withMetricConfiguration(Configuration config) {
+		this.metricConfiguration = JSONEncoderHelper.getJSON(config
+				.subset("com.opensoc.metrics"));
+		return this;
+	}
+
+	@Override
+	void doPrepare(Map conf, TopologyContext topologyContext,
+			OutputCollector collector) throws IOException {
+
+		LOG.info("[OpenSOC] Preparing TelemetryParser Bolt...");
+
+		try {
+			_reporter = new MetricReporter();
+			_reporter.initialize(metricProperties, TelemetryTaggerBolt.class);
+			LOG.info("[OpenSOC] Initialized metrics");
+		} catch (Exception e) {
+			LOG.info("[OpenSOC] Could not initialize metrics");
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	public void execute(Tuple tuple) {
+
+		LOG.trace("[OpenSOC] Starting to process message for alerts");
+		JSONObject original_message = null;
+
+		try {
+
+			original_message = (JSONObject) tuple.getValue(0);
+
+			if (original_message == null || original_message.isEmpty())
+				throw new Exception("Could not parse message from byte stream");
+
+			LOG.trace("[OpenSOC] Received tuple: " + original_message);
+
+			JSONObject alerts_tag = new JSONObject();
+			JSONArray alerts_list = _adapter.tag(original_message);
+
+			LOG.trace("[OpenSOC] Tagged message: " + alerts_list);
+
+			if (alerts_list.size() != 0) {
+				if (original_message.containsKey("alerts")) {
+					JSONObject tag = (JSONObject) original_message
+							.get("alerts");
+					JSONArray already_triggered = (JSONArray) tag
+							.get("triggered");
+					alerts_list.addAll(already_triggered);
+					LOG.trace("[OpenSOC] Created a new string of alerts");
+				}
+
+				alerts_tag.put("identifier", _identifier);
+				alerts_tag.put("triggered", alerts_list);
+				original_message.put("alerts", alerts_tag);
+				
+				LOG.debug("[OpenSOC] Detected alerts: " + alerts_tag);
+			}
+			else
+			{
+				LOG.debug("[OpenSOC] The following messages did not contain alerts: " + original_message);
+			}
+
+			_collector.ack(tuple);
+			_collector.emit(new Values(original_message));
+			
+			/*if (metricConfiguration != null) {
+				emitCounter.inc();
+				ackCounter.inc();
+			}*/
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			LOG.error("Failed to tag message :" + original_message);
+			e.printStackTrace();
+			_collector.fail(tuple);
+			
+			/*
+			if (metricConfiguration != null) {
+				failCounter.inc();
+			}*/
+		}
+	}
+
+	public void declareOutputFields(OutputFieldsDeclarer declearer) {
+		declearer.declare(new Fields(this.OutputFieldName));
+
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/AbstractTaggerAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/AbstractTaggerAdapter.java b/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/AbstractTaggerAdapter.java
new file mode 100644
index 0000000..2ec6377
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/AbstractTaggerAdapter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.tagging.adapters;
+
+import java.io.Serializable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.opensoc.alerts.interfaces.TaggerAdapter;
+
+@SuppressWarnings("serial")
+public abstract class AbstractTaggerAdapter implements TaggerAdapter, Serializable{
+	
+	protected static final Logger _LOG = LoggerFactory
+			.getLogger(AbstractTaggerAdapter.class);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/RegexTagger.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/RegexTagger.java b/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/RegexTagger.java
new file mode 100644
index 0000000..2d8109f
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/RegexTagger.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.tagging.adapters;
+
+import java.util.Map;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+public class RegexTagger extends AbstractTaggerAdapter{
+	
+	/**
+	 * Reads a regex rules file and tags a message with alerts if any rule from that file
+	 * matches anything in the telemetry message
+	 */
+	private static final long serialVersionUID = -6091495636459799411L;
+	Map <String, JSONObject> _rules;
+	
+	/**
+	 * 
+	 * @param rules rules read from a properties XML file
+	 */
+	public RegexTagger(Map<String, JSONObject> rules)
+	{
+		_rules = rules;
+	}
+
+	/**
+	 * @param raw_message telemetry message to be tagged
+	 */
+	@SuppressWarnings("unchecked")
+	public JSONArray tag(JSONObject raw_message) {
+
+		JSONArray ja = new JSONArray();
+		String message_as_string = raw_message.toString();
+		
+		for(String rule : _rules.keySet())
+		{		
+			if (message_as_string.matches(rule))
+			{
+				ja.add(_rules.get(rule));
+			}
+		}	
+		
+		return ja;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/StaticAllTagger.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/StaticAllTagger.java b/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/StaticAllTagger.java
new file mode 100644
index 0000000..67c6c45
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/StaticAllTagger.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.tagging.adapters;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+public class StaticAllTagger extends AbstractTaggerAdapter {
+
+	/**
+	 * Attaches a static alerts tag to JSON telemetry messages
+	 */
+	private static final long serialVersionUID = 7759427661169094065L;
+	private JSONObject _static_tag_message;
+	JSONArray ja = new JSONArray();
+
+	/**
+	 * 
+	 * @param static_tag_message
+	 *            static alerts tag to attach to the message as a JSON
+	 */
+	@SuppressWarnings("unchecked")
+	public StaticAllTagger(JSONObject static_tag_message) {
+		_static_tag_message = static_tag_message;
+		ja.add(_static_tag_message);
+	}
+
+	/**
+	 * @param raw_message
+	 *            message to tag
+	 */
+	public JSONArray tag(JSONObject raw_message) {
+
+		return ja;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/TaggerAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/TaggerAdapter.java b/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/TaggerAdapter.java
new file mode 100644
index 0000000..9fc11d7
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/main/java/com/apache/metron/tagging/adapters/TaggerAdapter.java
@@ -0,0 +1,9 @@
+package com.opensoc.tagging.adapters;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+public interface TaggerAdapter {
+
+	JSONArray tag(JSONObject raw_message);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Alerts/src/main/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/main/resources/hbase-site.xml b/metron-streaming/Metron-Alerts/src/main/resources/hbase-site.xml
new file mode 100644
index 0000000..8d812a9
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/main/resources/hbase-site.xml
@@ -0,0 +1,131 @@
+<!--Tue Apr  1 18:16:39 2014-->
+  <configuration>
+    <property>
+    <name>hbase.tmp.dir</name>
+    <value>/disk/h/hbase</value>
+  </property>
+    <property>
+    <name>hbase.hregion.memstore.chunkpool.maxsize</name>
+    <value>0.5</value>
+  </property>
+    <property>
+    <name>hbase.regionserver.codecs</name>
+    <value>lzo,gz,snappy</value>
+  </property>
+    <property>
+    <name>hbase.hstore.flush.retries.number</name>
+    <value>120</value>
+  </property>
+    <property>
+    <name>hbase.client.keyvalue.maxsize</name>
+    <value>10485760</value>
+  </property>
+    <property>
+    <name>hbase.rootdir</name>
+    <value>hdfs://nn1:8020/apps/hbase/data</value>
+  </property>
+    <property>
+    <name>hbase.defaults.for.version.skip</name>
+    <value>true</value>
+  </property>
+    <property>
+    <name>hbase.client.scanner.caching</name>
+    <value>100</value>
+  </property>
+    <property>
+    <name>hbase.superuser</name>
+    <value>hbase</value>
+  </property>
+    <property>
+    <name>hfile.block.cache.size</name>
+    <value>0.40</value>
+  </property>
+    <property>
+    <name>hbase.regionserver.checksum.verify</name>
+    <value>true</value>
+  </property>
+    <property>
+    <name>hbase.hregion.memstore.mslab.enabled</name>
+    <value>true</value>
+  </property>
+    <property>
+    <name>hbase.hregion.max.filesize</name>
+    <value>107374182400</value>
+  </property>
+    <property>
+    <name>hbase.cluster.distributed</name>
+    <value>true</value>
+  </property>
+    <property>
+    <name>zookeeper.session.timeout</name>
+    <value>30000</value>
+  </property>
+    <property>
+    <name>zookeeper.znode.parent</name>
+    <value>/hbase-unsecure</value>
+  </property>
+    <property>
+    <name>hbase.regionserver.global.memstore.lowerLimit</name>
+    <value>0.38</value>
+  </property>
+    <property>
+    <name>hbase.regionserver.handler.count</name>
+    <value>240</value>
+  </property>
+    <property>
+    <name>hbase.hregion.memstore.mslab.chunksize</name>
+    <value>8388608</value>
+  </property>
+    <property>
+    <name>hbase.zookeeper.quorum</name>
+    <value>zkpr1,zkpr2,zkpr3</value>
+  </property>
+    <property>
+    <name>hbase.zookeeper.useMulti</name>
+    <value>true</value>
+  </property>
+    <property>
+    <name>hbase.hregion.majorcompaction</name>
+    <value>86400000</value>
+  </property>
+    <property>
+    <name>hbase.hstore.blockingStoreFiles</name>
+    <value>200</value>
+  </property>
+    <property>
+    <name>hbase.zookeeper.property.clientPort</name>
+    <value>2181</value>
+  </property>
+    <property>
+    <name>hbase.hregion.memstore.flush.size</name>
+    <value>134217728</value>
+  </property>
+    <property>
+    <name>hbase.security.authorization</name>
+    <value>false</value>
+  </property>
+    <property>
+    <name>hbase.regionserver.global.memstore.upperLimit</name>
+    <value>0.4</value>
+  </property>
+    <property>
+    <name>hbase.hstore.compactionThreshold</name>
+    <value>4</value>
+  </property>
+    <property>
+    <name>hbase.hregion.memstore.block.multiplier</name>
+    <value>8</value>
+  </property>
+    <property>
+    <name>hbase.security.authentication</name>
+    <value>simple</value>
+  </property>
+    <property>
+    <name>dfs.client.read.shortcircuit</name>
+    <value>true</value>
+  </property>
+  <property>
+    <name>dfs.domain.socket.path</name>
+    <value>/var/run/hdfs/dn_socket</value>
+  </property>
+  </configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Alerts/src/test/java/com/apache/metron/alerts/adapters/AllAlertAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/test/java/com/apache/metron/alerts/adapters/AllAlertAdapterTest.java b/metron-streaming/Metron-Alerts/src/test/java/com/apache/metron/alerts/adapters/AllAlertAdapterTest.java
new file mode 100644
index 0000000..65c74c0
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/test/java/com/apache/metron/alerts/adapters/AllAlertAdapterTest.java
@@ -0,0 +1,166 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.opensoc.alerts.adapters;
+
+import java.lang.reflect.Constructor;
+import java.util.Map;
+import java.util.Properties;
+
+import com.opensoc.test.AbstractConfigTest;
+import com.opensoc.alerts.adapters.AllAlertAdapter;
+
+ /**
+ * <ul>
+ * <li>Title: AllAlertAdapterTest</li>
+ * <li>Description: Tests for AllAlertAdapter</li>
+ * <li>Created: Oct 8, 2014</li>
+ * </ul>
+ * @version $Revision: 1.1 $
+ */
+public class AllAlertAdapterTest extends AbstractConfigTest {
+
+     /**
+     * The allAlertAdapter.
+     */
+    private static AllAlertAdapter allAlertAdapter=null;
+    
+     /**
+     * The connected.
+     */
+    private static boolean connected=false;
+
+    /**
+     * Constructs a new <code>AllAlertAdapterTest</code> instance.
+     * @param name
+     */
+    public AllAlertAdapterTest(String name) {
+        super(name);
+    }
+
+    /**
+     * @throws java.lang.Exception
+     */
+    protected static void setUpBeforeClass() throws Exception {
+    }
+
+    /**
+     * @throws java.lang.Exception
+     */
+    protected static void tearDownAfterClass() throws Exception {
+    }
+
+    /* 
+     * (non-Javadoc)
+     * @see junit.framework.TestCase#setUp()
+     */
+
+    @SuppressWarnings("unchecked")
+    protected void setUp() throws Exception {
+          super.setUp("com.opensoc.alerts.adapters.AllAlertAdapter");
+          Properties prop = super.getTestProperties();
+          assertNotNull(prop);   
+       // this.setMode("global");
+        if(skipTests(this.getMode())){
+            System.out.println(getClass().getName()+" Skipping Tests !!Local Mode");
+            return;//skip tests
+       }else{      
+           Map<String, String> settings = super.getSettings();
+           @SuppressWarnings("rawtypes")
+        Class loaded_class = Class.forName("com.opensoc.alerts.adapters.AllAlertAdapter");
+           @SuppressWarnings("rawtypes")
+        Constructor constructor = loaded_class.getConstructor(new Class[] { Map.class});
+           
+           AllAlertAdapterTest.allAlertAdapter = (AllAlertAdapter) constructor.newInstance(settings);
+            // AllAlertAdapterTest.allAlertAdapter = new AllAlertAdapter(settings)
+      }
+    }
+
+    /* 
+     * (non-Javadoc)
+     * @see junit.framework.TestCase#tearDown()
+     */
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+
+    /**
+     * Test method for {@link com.opensoc.alerts.adapters.AlllterAdapter#initialize()}.
+     */
+    public void testInitializeAdapter() {
+        if(skipTests(this.getMode())){
+            return;//skip tests
+       }else{        
+           
+        boolean initialized =AllAlertAdapterTest.getAllAlertAdapter().initialize();
+        assertTrue(initialized);
+       }
+    }
+    
+    /**
+     * Test method for containsAlertId(@link  com.opensoc.alerts.adapters.AlllterAdapter#containsAlertId()}.
+     */
+    public void testContainsAlertId(){
+        if(skipTests(this.getMode())){
+            return;//skip tests
+       }else{          
+            boolean containsAlert =AllAlertAdapterTest.getAllAlertAdapter().containsAlertId("test");
+            assertFalse(containsAlert);
+       }
+    }
+ 
+   
+
+    /**
+     * Returns the allAlertAdapter.
+     * @return the allAlertAdapter.
+     */
+    
+    public static AllAlertAdapter getAllAlertAdapter() {
+        return allAlertAdapter;
+    }
+
+    /**
+     * Sets the allAlertAdapter.
+     * @param allAlertAdapter the allAlertAdapter.
+     */
+    
+    public static void setAllAlertAdapter(AllAlertAdapter allAlertAdapter) {
+    
+        AllAlertAdapterTest.allAlertAdapter = allAlertAdapter;
+    }
+    /**
+     * Returns the connected.
+     * @return the connected.
+     */
+    
+    public static boolean isConnected() {
+        return connected;
+    }
+
+    /**
+     * Sets the connected.
+     * @param connected the connected.
+     */
+    
+    public static void setConnected(boolean connected) {
+    
+        AllAlertAdapterTest.connected = connected;
+    }    
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Alerts/src/test/resources/AllAlertAdapterTest.properties
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/test/resources/AllAlertAdapterTest.properties b/metron-streaming/Metron-Alerts/src/test/resources/AllAlertAdapterTest.properties
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/test/resources/AllAlertAdapterTest.properties
@@ -0,0 +1 @@
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Alerts/src/test/resources/TestSchemas/AllAlertAdapterSchema.json
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/test/resources/TestSchemas/AllAlertAdapterSchema.json b/metron-streaming/Metron-Alerts/src/test/resources/TestSchemas/AllAlertAdapterSchema.json
new file mode 100644
index 0000000..c4f2a82
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/test/resources/TestSchemas/AllAlertAdapterSchema.json
@@ -0,0 +1,42 @@
+{
+"title": "GeoMySql Schema",
+"type": "object",
+"properties": {
+
+         "city"    : {
+					   "type": "string"
+				  },
+		 "country" : {
+						"type": "string"
+					},
+		 "dmaCode" :
+		 			 {
+						"type": "string"
+					},
+	     "geoHash" : 
+	     			{
+						"type": "string"
+					},
+		 "latitude" : 
+		 			{
+						"type": "string"
+				   },
+		 "locID" : 
+		 			{
+					   "type": "string"
+				   },
+		 "location_point" : 
+		 			{
+					   "type": "string"
+				    },
+		 "longitude" : 
+		 			{
+						"type": "string"
+					},
+		 "postalCode" : 
+		 			{
+						"type": "string"
+					}
+   },
+   "required": ["city", "country", "dmaCode","latitude","locID","location_point","postalCode"]
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Alerts/src/test/resources/config/AllAlertAdapterTest.config
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/test/resources/config/AllAlertAdapterTest.config b/metron-streaming/Metron-Alerts/src/test/resources/config/AllAlertAdapterTest.config
new file mode 100644
index 0000000..f6e5dd1
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/test/resources/config/AllAlertAdapterTest.config
@@ -0,0 +1,8 @@
+#Alerts Bolt
+bolt.alerts.adapter=com.opensoc.alerts.adapters.AllAlertAdapter
+com.opensoc.alerts.adapters.AllAlertAdapter.whitelist_table_name = ip_whitelist
+com.opensoc.alerts.adapters.AllAlertAdapter.blacklist_table_name = ip_blacklist
+com.opensoc.alerts.adapters.AllAlertAdapter.quorum=zkpr1,zkpr2,zkpr3
+com.opensoc.alerts.adapters.AllAlertAdapter.port=2181
+com.opensoc.alerts.adapters.AllAlertAdapter._MAX_CACHE_SIZE_OBJECTS_NUM=3600
+com.opensoc.alerts.adapters.AllAlertAdapter._MAX_TIME_RETAIN_MINUTES=1000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Common/.gitignore
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/.gitignore b/metron-streaming/Metron-Common/.gitignore
new file mode 100644
index 0000000..b83d222
--- /dev/null
+++ b/metron-streaming/Metron-Common/.gitignore
@@ -0,0 +1 @@
+/target/

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/pom.xml b/metron-streaming/Metron-Common/pom.xml
new file mode 100644
index 0000000..ad1382f
--- /dev/null
+++ b/metron-streaming/Metron-Common/pom.xml
@@ -0,0 +1,170 @@
+<?xml version="1.0" encoding="UTF-8"?><!-- Licensed to the Apache Software 
+	Foundation (ASF) under one or more contributor license agreements. See the 
+	NOTICE file distributed with this work for additional information regarding 
+	copyright ownership. The ASF licenses this file to You under the Apache License, 
+	Version 2.0 (the "License"); you may not use this file except in compliance 
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 
+	Unless required by applicable law or agreed to in writing, software distributed 
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+	the specific language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>com.opensoc</groupId>
+		<artifactId>OpenSOC-Streaming</artifactId>
+		<version>0.6BETA</version>
+	</parent>
+	<artifactId>OpenSOC-Common</artifactId>
+	<name>OpenSOC-Common</name>
+	<description>Components common to all enrichments</description>
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+		<kafka.version>0.8.0</kafka.version>
+		<commons.config.version>1.10</commons.config.version>
+		<hbase.version>0.98.5-hadoop2</hbase.version>
+	</properties>
+	<repositories>
+		<repository>
+			<id>OpenSOC-Kraken-Repo</id>
+			<name>OpenSOC Kraken Repository</name>
+			<url>https://raw.github.com/opensoc/kraken/mvn-repo</url>
+		</repository>
+	</repositories>
+	<dependencies>
+		<dependency>
+			<groupId>com.googlecode.json-simple</groupId>
+			<artifactId>json-simple</artifactId>
+			<version>${global_json_simple_version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.storm</groupId>
+			<artifactId>storm-core</artifactId>
+			<version>${global_storm_version}</version>
+			<scope>provided</scope>
+			<exclusions>
+				<exclusion>
+				
+				   <artifactId>servlet-api</artifactId>
+				
+				   <groupId>javax.servlet</groupId>
+				
+				  </exclusion>
+			</exclusions>			
+		</dependency>
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka_2.9.2</artifactId>
+			<version>0.8.0</version>
+			<scope>provided</scope>
+			<exclusions>
+				<exclusion>
+					<groupId>com.sun.jmx</groupId>
+					<artifactId>jmxri</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jdmk</groupId>
+					<artifactId>jmxtools</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>javax.jms</groupId>
+					<artifactId>jms</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>com.codahale.metrics</groupId>
+			<artifactId>metrics-core</artifactId>
+			<version>${global_metrics_version}</version>
+		</dependency>
+		<dependency>
+			<groupId>com.codahale.metrics</groupId>
+			<artifactId>metrics-graphite</artifactId>
+			<version>${global_metrics_version}</version>
+		</dependency>
+		<dependency>
+			<groupId>commons-configuration</groupId>
+			<artifactId>commons-configuration</artifactId>
+			<version>${commons.config.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.krakenapps</groupId>
+			<artifactId>kraken-pcap</artifactId>
+			<version>1.7.1</version>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>3.8.2</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hbase</groupId>
+			<artifactId>hbase-client</artifactId>
+			<version>${hbase.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-log4j12</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>log4j</groupId>
+					<artifactId>log4j</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>com.github.fge</groupId>
+			<artifactId>json-schema-validator</artifactId>
+			<version>${global_json_schema_validator_version}</version>
+		</dependency>
+	</dependencies>
+
+	<reporting>
+		<plugins>
+			<!-- Normally, dependency report takes time, skip it -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-project-info-reports-plugin</artifactId>
+				<version>2.7</version>
+
+				<configuration>
+					<dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>emma-maven-plugin</artifactId>
+				<version>1.0-alpha-3</version>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-pmd-plugin</artifactId>
+				<configuration>
+					<targetJdk>1.7</targetJdk>
+				</configuration>
+			</plugin>
+		</plugins>
+	</reporting>
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>3.1</version>
+				<configuration>
+					<source>1.7</source>
+					<compilerArgument>-Xlint:unchecked</compilerArgument>
+					<target>1.7</target>
+				</configuration>
+			</plugin>
+		</plugins>
+		<resources>
+			<resource>
+				<directory>src/main/resources</directory>
+			</resource>
+		</resources>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Common/pom.xml.versionsBackup
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/pom.xml.versionsBackup b/metron-streaming/Metron-Common/pom.xml.versionsBackup
new file mode 100644
index 0000000..8ead949
--- /dev/null
+++ b/metron-streaming/Metron-Common/pom.xml.versionsBackup
@@ -0,0 +1,131 @@
+<?xml version="1.0" encoding="UTF-8"?><!-- Licensed to the Apache Software 
+	Foundation (ASF) under one or more contributor license agreements. See the 
+	NOTICE file distributed with this work for additional information regarding 
+	copyright ownership. The ASF licenses this file to You under the Apache License, 
+	Version 2.0 (the "License"); you may not use this file except in compliance 
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 
+	Unless required by applicable law or agreed to in writing, software distributed 
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+	the specific language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>com.opensoc</groupId>
+		<artifactId>OpenSOC-Streaming</artifactId>
+		<version>0.0.1-SNAPSHOT</version>
+	</parent>
+	<artifactId>OpenSOC-Common</artifactId>
+	<name>OpenSOC-Common</name>
+	<description>Components common to all enrichments</description>
+	<properties>
+		<json.simple.version>1.1.1</json.simple.version>
+
+		<storm.version>0.9.2-incubating</storm.version>
+		<kafka.version>0.8.0</kafka.version>
+		<metrics.version>3.0.2</metrics.version>
+		<commons.config.version>1.10</commons.config.version>
+		<hbase.version>0.98.5-hadoop2</hbase.version>
+	</properties>
+	<repositories>
+		<repository>
+			<id>Kraken-Repo</id>
+			<name>Kraken Repository</name>
+			<url>http://download.krakenapps.org</url>
+		</repository>
+	</repositories>
+	<dependencies>
+		<dependency>
+			<groupId>com.googlecode.json-simple</groupId>
+			<artifactId>json-simple</artifactId>
+			<version>${json.simple.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.storm</groupId>
+			<artifactId>storm-core</artifactId>
+			<version>${storm.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka_2.9.2</artifactId>
+			<version>0.8.0</version>
+			<scope>provided</scope>
+			<exclusions>
+				<exclusion>
+					<groupId>com.sun.jmx</groupId>
+					<artifactId>jmxri</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jdmk</groupId>
+					<artifactId>jmxtools</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>javax.jms</groupId>
+					<artifactId>jms</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>com.codahale.metrics</groupId>
+			<artifactId>metrics-core</artifactId>
+			<version>${metrics.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>com.codahale.metrics</groupId>
+			<artifactId>metrics-graphite</artifactId>
+			<version>${metrics.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>commons-configuration</groupId>
+			<artifactId>commons-configuration</artifactId>
+			<version>${commons.config.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.krakenapps</groupId>
+			<artifactId>kraken-pcap</artifactId>
+			<version>1.5.0</version>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>3.8.2</version>
+		</dependency>
+			<dependency>
+			<groupId>org.apache.hbase</groupId>
+			<artifactId>hbase-client</artifactId>
+			<version>${hbase.version}</version>
+		</dependency>
+	</dependencies>
+
+	<reporting>
+		<plugins>
+			<!-- Normally, dependency report takes time, skip it -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-project-info-reports-plugin</artifactId>
+				<version>2.7</version>
+
+				<configuration>
+					<dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>emma-maven-plugin</artifactId>
+				<version>1.0-alpha-3</version>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-pmd-plugin</artifactId>
+				<configuration>
+					<targetJdk>1.7</targetJdk>
+				</configuration>
+			</plugin>
+		</plugins>
+	</reporting>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/AlertsAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/AlertsAdapter.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/AlertsAdapter.java
new file mode 100644
index 0000000..58567a6
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/AlertsAdapter.java
@@ -0,0 +1,16 @@
+package com.opensoc.alerts.interfaces;
+
+import java.util.Map;
+
+import org.json.simple.JSONObject;
+
+public interface AlertsAdapter {
+
+	boolean initialize();
+
+	boolean refresh() throws Exception;
+
+	Map<String, JSONObject> alert(JSONObject raw_message);
+
+	boolean containsAlertId(String alert);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/AlertsInterface.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/AlertsInterface.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/AlertsInterface.java
new file mode 100644
index 0000000..e5e32b7
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/AlertsInterface.java
@@ -0,0 +1,11 @@
+package com.opensoc.alerts.interfaces;
+
+import org.json.simple.JSONObject;
+
+public interface AlertsInterface {
+
+	public JSONObject getContent();
+	public void setContent(JSONObject content);
+	public String getUuid();
+	public void setUuid(String uuid);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/TaggerAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/TaggerAdapter.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/TaggerAdapter.java
new file mode 100644
index 0000000..79dc0d6
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/TaggerAdapter.java
@@ -0,0 +1,9 @@
+package com.opensoc.alerts.interfaces;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+public interface TaggerAdapter {
+
+	JSONArray tag(JSONObject raw_message);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Common/src/main/java/com/apache/metron/configuration/ConfigurationManager.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/configuration/ConfigurationManager.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/configuration/ConfigurationManager.java
new file mode 100644
index 0000000..74f19a5
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/configuration/ConfigurationManager.java
@@ -0,0 +1,119 @@
+package com.opensoc.configuration;
+
+
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.configuration.CombinedConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.DefaultConfigurationBuilder;
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Configuration manager class which loads all 'config-definition.xml' files and
+ * creates a Configuration object which holds all properties from the underlying
+ * configuration resource
+ */
+public class ConfigurationManager {
+
+  /** configuration definition file name. */
+  private static String DEFAULT_CONFIG_DEFINITION_FILE_NAME = "config-definition.xml";
+
+  /** Stores a map with the configuration for each path specified. */
+  private static Map<String, Configuration> configurationsCache = new HashMap<String, Configuration>();
+
+  /** The Constant LOGGER. */
+  private static final Logger LOGGER = Logger
+      .getLogger(ConfigurationManager.class);
+
+  /**
+   * Common method to load content of all configuration resources defined in
+   * 'config-definition.xml'.
+   * 
+   * @param configDefFilePath
+   *          the config def file path
+   * @return Configuration
+   */
+  public static Configuration getConfiguration(String configDefFilePath) {
+    if (configurationsCache.containsKey(configDefFilePath)) {
+      return configurationsCache.get(configDefFilePath);
+    }
+    CombinedConfiguration configuration = null;
+    synchronized (configurationsCache) {
+      if (configurationsCache.containsKey(configDefFilePath)) {
+        return configurationsCache.get(configDefFilePath);
+      }
+      DefaultConfigurationBuilder builder = new DefaultConfigurationBuilder();
+      String fielPath = getConfigDefFilePath(configDefFilePath);
+      LOGGER.info("loading from 'configDefFilePath' :" + fielPath);
+      builder.setFile(new File(fielPath));
+      try {
+        configuration = builder.getConfiguration(true);
+        configurationsCache.put(fielPath, configuration);
+      } catch (ConfigurationException e) {
+        LOGGER.info("Exception in loading property files.", e);
+      }
+    }
+    return configuration;
+  }
+
+  /**
+   * Removes the configuration created from a config definition file located at
+   * 'configDefFilePath'.
+   * 
+   * @param configDefFilePath
+   *          path to the config definition file
+   */
+  public static void clearConfiguration(String configDefFilePath) {
+    configurationsCache.remove(configDefFilePath);
+  }
+
+  /**
+   * Gets the configuration.
+   * 
+   * @return the configuration
+   */
+  public static Configuration getConfiguration() {
+    return getConfiguration(null);
+  }
+
+  /**
+   * Returns the 'config-definition.xml' file path. 1. If the param
+   * 'configDefFilePath' has a valid value, returns configDefFilePath 2. If the
+   * system property key 'configDefFilePath' has a valid value, returns the
+   * value 3. By default, it returns the file name 'config-definition.xml'
+   * 
+   * @param configDefFilePath
+   *          given input path to the config definition file
+   * @return the config def file path
+   */
+  private static String getConfigDefFilePath(String configDefFilePath) {
+    if (StringUtils.isNotEmpty(configDefFilePath)) {
+      return configDefFilePath;
+    }
+    return DEFAULT_CONFIG_DEFINITION_FILE_NAME;
+  }
+
+  /**
+   * The main method.
+   * 
+   * @param args
+   *          the args
+   * @throws InterruptedException
+   *           the interrupted exception
+   */
+  public static void main(String[] args) throws InterruptedException {
+    Configuration config = ConfigurationManager
+        .getConfiguration("/Users/Sayi/Documents/config/config-definition-dpi.xml");
+    System.out.println("elastic.search.cluster ="
+        + config.getString("elastic.search.cluster"));
+    Thread.sleep(10000);
+    System.out.println("storm.topology.dpi.bolt.es-index.index.name ="
+        + config.getString("storm.topology.dpi.bolt.es-index.index.name"));
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Common/src/main/java/com/apache/metron/dataloads/interfaces/ThreatIntelSource.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/dataloads/interfaces/ThreatIntelSource.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/dataloads/interfaces/ThreatIntelSource.java
new file mode 100644
index 0000000..e19646a
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/dataloads/interfaces/ThreatIntelSource.java
@@ -0,0 +1,11 @@
+package com.opensoc.dataloads.interfaces;
+
+import java.util.Iterator;
+import org.apache.commons.configuration.Configuration;
+import org.json.simple.JSONObject;
+
+public interface ThreatIntelSource extends Iterator<JSONObject> {
+
+	void initializeSource(Configuration config);
+	void cleanupSource();
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Common/src/main/java/com/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/enrichment/interfaces/EnrichmentAdapter.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
new file mode 100644
index 0000000..a6fdd85
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.enrichment.interfaces;
+
+import org.json.simple.JSONObject;
+
+public interface EnrichmentAdapter
+{
+	JSONObject enrich(String metadata);
+	boolean initializeAdapter();
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HBaseBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HBaseBolt.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HBaseBolt.java
new file mode 100644
index 0000000..ef155f1
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HBaseBolt.java
@@ -0,0 +1,118 @@
+package com.opensoc.hbase;
+
+
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.json.simple.JSONObject;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import com.opensoc.helpers.topology.ErrorGenerator;
+
+/**
+ * A Storm bolt for putting data into HBase.
+ * <p>
+ * By default works in batch mode by enabling HBase's client-side write buffer. Enabling batch mode
+ * is recommended for high throughput, but it can be disabled in {@link TupleTableConfig}.
+ * <p>
+ * The HBase configuration is picked up from the first <tt>hbase-site.xml</tt> encountered in the
+ * classpath
+ * @see TupleTableConfig
+ * @see HTableConnector
+ */
+@SuppressWarnings("serial")
+public class HBaseBolt implements IRichBolt {
+  private static final Logger LOG = Logger.getLogger(HBaseBolt.class);
+
+  protected OutputCollector collector;
+  protected HTableConnector connector;
+  protected TupleTableConfig conf;
+  protected boolean autoAck = true;
+  
+  private String _quorum;
+  private String _port;
+
+  public HBaseBolt(TupleTableConfig conf, String quorum, String port) {
+    this.conf = conf;
+    _quorum = quorum;
+    _port = port;
+
+  }
+
+  /** {@inheritDoc} */
+  @SuppressWarnings("rawtypes")
+  
+  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    this.collector = collector;
+
+    try {
+      this.connector = new HTableConnector(conf, _quorum, _port);
+
+		
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    LOG.info("Preparing HBaseBolt for table: " + this.conf.getTableName());
+  }
+
+  /** {@inheritDoc} */
+  
+  public void execute(Tuple input) {
+    try {
+      this.connector.getTable().put(conf.getPutFromTuple(input));
+    } catch (IOException ex) {
+
+  		JSONObject error = ErrorGenerator.generateErrorMessage(
+  				"Alerts problem: " + input.getBinary(0), ex);
+  		collector.emit("error", new Values(error));
+  		
+      throw new RuntimeException(ex);
+    }
+
+    if (this.autoAck) {
+      this.collector.ack(input);
+    }
+  }
+
+  /** {@inheritDoc} */
+  
+  public void cleanup() {
+    this.connector.close();
+  }
+
+  /** {@inheritDoc} */
+  
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+	  declarer.declareStream("error", new Fields("HBase"));
+  }
+
+  /** {@inheritDoc} */
+  
+  public Map<String, Object> getComponentConfiguration() {
+    return null;
+  }
+
+  /**
+   * @return the autoAck
+   */
+  public boolean isAutoAck() {
+    return autoAck;
+  }
+
+  /**
+   * @param autoAck the autoAck to set
+   */
+  public void setAutoAck(boolean autoAck) {
+    this.autoAck = autoAck;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HBaseStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HBaseStreamPartitioner.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HBaseStreamPartitioner.java
new file mode 100644
index 0000000..4070db7
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HBaseStreamPartitioner.java
@@ -0,0 +1,146 @@
+package com.opensoc.hbase;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.HTable;
+
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.grouping.CustomStreamGrouping;
+import backtype.storm.task.WorkerTopologyContext;
+
+public class HBaseStreamPartitioner implements CustomStreamGrouping {
+
+  private static final long serialVersionUID = -148324019395976092L;
+  private String[] regionStartKeys = { "0" };
+  private Map<String, String> regionStartKeyRegionNameMap = new HashMap<String, String>();
+
+  private List<Integer> targetTasks = null;
+  private int targetTasksSize = 0;
+  private int rowKeyFieldIndex = 0;
+  private String tableName = null;
+  private long regionCheckTime = 0;
+  private int regionInforRefreshIntervalInMins = 60;
+  private int regionInforRefreshIntervalInMillis = regionInforRefreshIntervalInMins * 60000;
+
+  HTable hTable = null;;
+
+  
+  public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+    
+    System.out.println("preparing HBaseStreamPartitioner for streamId " + stream.get_streamId());
+    this.targetTasks = targetTasks;
+    this.targetTasksSize = this.targetTasks.size();
+
+    Configuration conf = HBaseConfiguration.create();
+    try {
+      hTable = new HTable(conf, tableName);
+      refreshRegionInfo(tableName);
+
+      System.out.println("regionStartKeyRegionNameMap: " + regionStartKeyRegionNameMap);
+
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+  }
+  
+  public void prepare() {
+    
+    System.out.println("preparing HBaseStreamPartitioner for streamId " );
+
+    Configuration conf = HBaseConfiguration.create();
+    try {
+      hTable = new HTable(conf, tableName);
+      refreshRegionInfo(tableName);
+
+      System.out.println("regionStartKeyRegionNameMap: " + regionStartKeyRegionNameMap);
+
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+  }
+
+  public HBaseStreamPartitioner(String tableName, int rowKeyFieldIndex, int regionInforRefreshIntervalInMins) {
+    System.out.println("Created HBaseStreamPartitioner ");
+    this.rowKeyFieldIndex = rowKeyFieldIndex;
+    this.tableName = tableName;
+    this.regionInforRefreshIntervalInMins = regionInforRefreshIntervalInMins;
+    this.regionInforRefreshIntervalInMillis = regionInforRefreshIntervalInMins * 60000;
+
+  }
+
+  
+  public List<Integer> chooseTasks(int taskId, List<Object> values) {
+    List<Integer> choosenTasks = null;
+    System.out.println("Choosing task for taskId " + taskId + " and values " + values);
+
+    if (regionInforRefreshIntervalInMillis > (System.currentTimeMillis() - regionCheckTime)) {
+      try {
+        refreshRegionInfo(tableName);
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+
+    int regionIndex = getRegionIndex((String) values.get(rowKeyFieldIndex));
+
+    if (regionIndex < targetTasksSize) {
+      choosenTasks = Arrays.asList(regionIndex);
+
+    } else {
+      choosenTasks = Arrays.asList(regionIndex % targetTasksSize);
+    }
+    System.out.println("Choosen tasks are " + choosenTasks);
+
+    return choosenTasks;
+
+
+  }
+
+  
+  
+  public int getRegionIndex(String key) {
+    int index = Arrays.binarySearch(regionStartKeys, key);
+    if (index < -1) {
+      index = (index + 2) * -1;
+    } else if (index == -1) {
+      index = 0;
+    }
+
+    return index;
+  }
+
+  private void refreshRegionInfo(String tableName) throws IOException {
+
+    System.out.println("in refreshRegionInfo ");
+
+    Map<HRegionInfo, ServerName> regionMap = hTable.getRegionLocations();
+
+    synchronized (regionStartKeys) {
+      synchronized (regionStartKeyRegionNameMap) {
+        regionStartKeys = new String[regionMap.size()];
+        int index = 0;
+        String startKey = null;
+        regionStartKeyRegionNameMap.clear();
+        for (HRegionInfo regionInfo : regionMap.keySet()) {
+          startKey = new String(regionInfo.getStartKey());
+          regionStartKeyRegionNameMap.put(startKey, regionInfo.getRegionNameAsString());
+          regionStartKeys[index] = startKey;
+          index++;
+        }
+
+        Arrays.sort(regionStartKeys);
+        regionCheckTime = System.currentTimeMillis();
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HTableConnector.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HTableConnector.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HTableConnector.java
new file mode 100644
index 0000000..d0aa0b4
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HTableConnector.java
@@ -0,0 +1,106 @@
+package com.opensoc.hbase;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.Logger;
+
+import backtype.storm.generated.Bolt;
+
+/**
+ * HTable connector for Storm {@link Bolt}
+ * <p>
+ * The HBase configuration is picked up from the first <tt>hbase-site.xml</tt> encountered in the
+ * classpath
+ */
+@SuppressWarnings("serial")
+public class HTableConnector implements Serializable {
+  private static final Logger LOG = Logger.getLogger(HTableConnector.class);
+
+  private Configuration conf;
+  protected HTable table;
+  private String tableName;
+
+  /**
+   * Initialize HTable connection
+   * @param conf The {@link TupleTableConfig}
+   * @throws IOException
+   */
+  public HTableConnector(final TupleTableConfig conf, String _quorum, String _port) throws IOException {
+    this.tableName = conf.getTableName();
+    this.conf = HBaseConfiguration.create();
+    
+    if(_quorum != null && _port != null)
+    {
+    	this.conf.set("hbase.zookeeper.quorum", _quorum);
+    	this.conf.set("hbase.zookeeper.property.clientPort", _port);
+    }
+
+    LOG.info(String.format("Initializing connection to HBase table %s at %s", tableName,
+      this.conf.get("hbase.rootdir")));
+
+    try {
+      this.table = new HTable(this.conf, this.tableName);
+    } catch (IOException ex) {
+      throw new IOException("Unable to establish connection to HBase table " + this.tableName, ex);
+    }
+
+    if (conf.isBatch()) {
+      // Enable client-side write buffer
+      this.table.setAutoFlush(false, true);
+      LOG.info("Enabled client-side write buffer");
+    }
+
+    // If set, override write buffer size
+    if (conf.getWriteBufferSize() > 0) {
+      try {
+        this.table.setWriteBufferSize(conf.getWriteBufferSize());
+
+        LOG.info("Setting client-side write buffer to " + conf.getWriteBufferSize());
+      } catch (IOException ex) {
+        LOG.error("Unable to set client-side write buffer size for HBase table " + this.tableName,
+          ex);
+      }
+    }
+
+    // Check the configured column families exist
+    for (String cf : conf.getColumnFamilies()) {
+      if (!columnFamilyExists(cf)) {
+        throw new RuntimeException(String.format(
+          "HBase table '%s' does not have column family '%s'", conf.getTableName(), cf));
+      }
+    }
+  }
+
+  /**
+   * Checks to see if table contains the given column family
+   * @param columnFamily The column family name
+   * @return boolean
+   * @throws IOException
+   */
+  private boolean columnFamilyExists(final String columnFamily) throws IOException {
+    return this.table.getTableDescriptor().hasFamily(Bytes.toBytes(columnFamily));
+  }
+
+  /**
+   * @return the table
+   */
+  public HTable getTable() {
+    return table;
+  }
+
+  /**
+   * Close the table
+   */
+  public void close() {
+    try {
+      this.table.close();
+    } catch (IOException ex) {
+      LOG.error("Unable to close connection to HBase table " + tableName, ex);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/TupleTableConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/TupleTableConfig.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/TupleTableConfig.java
new file mode 100644
index 0000000..71f8c9a
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/TupleTableConfig.java
@@ -0,0 +1,279 @@
+package com.opensoc.hbase;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import backtype.storm.tuple.Tuple;
+
+/**
+ * Configuration for Storm {@link Tuple} to HBase serialization.
+ */
+@SuppressWarnings("serial")
+public class TupleTableConfig implements Serializable {
+  
+  public static final long DEFAULT_INCREMENT = 1L;
+  
+  private String tableName;
+  protected String tupleRowKeyField;
+  protected String tupleTimestampField;
+  protected Map<String, Set<String>> columnFamilies;
+  private boolean batch = true;
+  protected Durability durability = Durability.USE_DEFAULT;
+  private long writeBufferSize = 0L;
+  
+  /**
+   * Initialize configuration
+   * 
+   * @param table
+   *          The HBase table name
+   * @param rowKeyField
+   *          The {@link Tuple} field used to set the rowKey
+   */
+  public TupleTableConfig(final String table, final String rowKeyField) {
+    this.tableName = table;
+    this.tupleRowKeyField = rowKeyField;
+    this.tupleTimestampField = "";
+    this.columnFamilies = new HashMap<String, Set<String>>();
+  }
+  
+  /**
+   * Initialize configuration
+   * 
+   * @param table
+   *          The HBase table name
+   * @param rowKeyField
+   *          The {@link Tuple} field used to set the rowKey
+   * @param timestampField
+   *          The {@link Tuple} field used to set the timestamp
+   */
+  public TupleTableConfig(final String table, final String rowKeyField, final String timestampField) {
+    this.tableName = table;
+    this.tupleRowKeyField = rowKeyField;
+    this.tupleTimestampField = timestampField;
+    this.columnFamilies = new HashMap<String, Set<String>>();
+  }
+  
+  /**
+   * Add column family and column qualifier to be extracted from tuple
+   * 
+   * @param columnFamily
+   *          The column family name
+   * @param columnQualifier
+   *          The column qualifier name
+   */
+  public void addColumn(final String columnFamily, final String columnQualifier) {
+    Set<String> columns = this.columnFamilies.get(columnFamily);
+    
+    if (columns == null) {
+      columns = new HashSet<String>();
+    }
+    columns.add(columnQualifier);
+    
+    this.columnFamilies.put(columnFamily, columns);
+  }
+  
+  /**
+   * Creates a HBase {@link Put} from a Storm {@link Tuple}
+   * 
+   * @param tuple
+   *          The {@link Tuple}
+   * @return {@link Put}
+   */
+  public Put getPutFromTuple(final Tuple tuple) {
+    byte[] rowKey = Bytes.toBytes(tuple.getStringByField(tupleRowKeyField));
+    
+    long ts = 0;
+    if (!tupleTimestampField.equals("")) {
+      ts = tuple.getLongByField(tupleTimestampField);
+    }
+    
+    Put p = new Put(rowKey);
+    
+    p.setDurability(durability);
+    
+    if (columnFamilies.size() > 0) {
+      for (String cf : columnFamilies.keySet()) {
+        byte[] cfBytes = Bytes.toBytes(cf);
+        for (String cq : columnFamilies.get(cf)) {
+          byte[] cqBytes = Bytes.toBytes(cq);
+          byte[] val = tuple.getBinaryByField(cq);
+          
+          if (ts > 0) {
+            p.add(cfBytes, cqBytes, ts, val);
+          } else {
+            p.add(cfBytes, cqBytes, val);
+          }
+        }
+      }
+    }
+    
+    return p;
+  }
+  
+  /**
+   * Creates a HBase {@link Increment} from a Storm {@link Tuple}
+   * 
+   * @param tuple
+   *          The {@link Tuple}
+   * @param increment
+   *          The amount to increment the counter by
+   * @return {@link Increment}
+   */
+  public Increment getIncrementFromTuple(final Tuple tuple, final long increment) {
+    byte[] rowKey = Bytes.toBytes(tuple.getStringByField(tupleRowKeyField));
+    
+    Increment inc = new Increment(rowKey);
+    inc.setDurability(durability);
+    
+    if (columnFamilies.size() > 0) {
+      for (String cf : columnFamilies.keySet()) {
+        byte[] cfBytes = Bytes.toBytes(cf);
+        for (String cq : columnFamilies.get(cf)) {
+          byte[] val;
+          try {
+            val = Bytes.toBytes(tuple.getStringByField(cq));
+          } catch (IllegalArgumentException ex) {
+            // if cq isn't a tuple field, use cq for counter instead of tuple
+            // value
+            val = Bytes.toBytes(cq);
+          }
+          inc.addColumn(cfBytes, val, increment);
+        }
+      }
+    }
+    
+    return inc;
+  }
+  
+  /**
+   * Increment the counter for the given family and column by the specified
+   * amount
+   * <p>
+   * If the family and column already exist in the Increment the counter value
+   * is incremented by the specified amount rather than overridden, as it is in
+   * HBase's {@link Increment#addColumn(byte[], byte[], long)} method
+   * 
+   * @param inc
+   *          The {@link Increment} to update
+   * @param family
+   *          The column family
+   * @param qualifier
+   *          The column qualifier
+   * @param amount
+   *          The amount to increment the counter by
+   */
+  public static void addIncrement(Increment inc, final byte[] family, final byte[] qualifier, final Long amount) {
+    
+    NavigableMap<byte[], Long> set = inc.getFamilyMapOfLongs().get(family);
+    if (set == null) {
+      set = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+    }
+    
+    // If qualifier exists, increment amount
+    Long counter = set.get(qualifier);
+    if (counter == null) {
+      counter = 0L;
+    }
+    set.put(qualifier, amount + counter);
+    
+    inc.getFamilyMapOfLongs().put(family, set);
+  }
+  
+  /**
+   * @return the tableName
+   */
+  public String getTableName() {
+    return tableName;
+  }
+  
+  /**
+   * @return Whether batch mode is enabled
+   */
+  public boolean isBatch() {
+    return batch;
+  }
+  
+  /**
+   * @param batch
+   *          Whether to enable HBase's client-side write buffer.
+   *          <p>
+   *          When enabled your bolt will store put operations locally until the
+   *          write buffer is full, so they can be sent to HBase in a single RPC
+   *          call. When disabled each put operation is effectively an RPC and
+   *          is sent straight to HBase. As your bolt can process thousands of
+   *          values per second it is recommended that the write buffer is
+   *          enabled.
+   *          <p>
+   *          Enabled by default
+   */
+  public void setBatch(boolean batch) {
+    this.batch = batch;
+  }
+  
+  /**
+   * @param setDurability
+   *          Sets whether to write to HBase's edit log.
+   *          <p>
+   *          Setting to false will mean fewer operations to perform when
+   *          writing to HBase and hence better performance, but changes that
+   *          haven't been flushed to a store file will be lost in the event of
+   *          HBase failure
+   *          <p>
+   *          Enabled by default
+   */
+  public void setDurability(Durability durability) {
+    this.durability = durability;
+  }
+  
+  
+  public Durability getDurability() {
+    return  durability;
+  }
+  
+  /**
+   * @param writeBufferSize
+   *          Overrides the client-side write buffer size.
+   *          <p>
+   *          By default the write buffer size is 2 MB (2097152 bytes). If you
+   *          are storing larger data, you may want to consider increasing this
+   *          value to allow your bolt to efficiently group together a larger
+   *          number of records per RPC
+   *          <p>
+   *          Overrides the write buffer size you have set in your
+   *          hbase-site.xml e.g. <code>hbase.client.write.buffer</code>
+   */
+  public void setWriteBufferSize(long writeBufferSize) {
+    this.writeBufferSize = writeBufferSize;
+  }
+  
+  /**
+   * @return the writeBufferSize
+   */
+  public long getWriteBufferSize() {
+    return writeBufferSize;
+  }
+  
+  /**
+   * @return A Set of configured column families
+   */
+  public Set<String> getColumnFamilies() {
+    return this.columnFamilies.keySet();
+  }
+  
+  /**
+   * @return the tupleRowKeyField
+   */
+  public String getTupleRowKeyField() {
+    return tupleRowKeyField;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-Common/src/main/java/com/apache/metron/helpers/services/PcapServiceCli.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/helpers/services/PcapServiceCli.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/helpers/services/PcapServiceCli.java
new file mode 100644
index 0000000..70f8683
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/helpers/services/PcapServiceCli.java
@@ -0,0 +1,110 @@
+package com.opensoc.helpers.services;
+
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+
+public class PcapServiceCli {
+
+	private String[] args = null;
+	private Options options = new Options();
+
+	int port = 8081;
+	String uri = "/pcapGetter";
+
+	public int getPort() {
+		return port;
+	}
+
+	public void setPort(int port) {
+		this.port = port;
+	}
+
+	public String getUri() {
+		return uri;
+	}
+
+	public void setUri(String uri) {
+		this.uri = uri;
+	}
+
+	public PcapServiceCli(String[] args) {
+
+		this.args = args;
+
+		Option help = new Option("h", "Display help menue");
+		options.addOption(help);
+		options.addOption(
+				"port",
+				true,
+				"OPTIONAL ARGUMENT [portnumber] If this argument sets the port for starting the service.  If this argument is not set the port will start on defaut port 8081");
+		options.addOption(
+				"endpoint_uri",
+				true,
+				"OPTIONAL ARGUMENT [/uri/to/service] This sets the URI for the service to be hosted.  The default URI is /pcapGetter");
+	}
+
+	public void parse() {
+		CommandLineParser parser = new BasicParser();
+
+		CommandLine cmd = null;
+
+		try {
+			cmd = parser.parse(options, args);
+		} catch (ParseException e1) {
+
+			e1.printStackTrace();
+		}
+
+		if (cmd.hasOption("h"))
+			help();
+
+		if (cmd.hasOption("port")) {
+
+			try {
+				port = Integer.parseInt(cmd.getOptionValue("port").trim());
+			} catch (Exception e) {
+
+				System.out.println("[OpenSOC] Invalid value for port entered");
+				help();
+			}
+		}
+		if (cmd.hasOption("endpoint_uri")) {
+
+			try {
+
+				if (uri == null || uri.equals(""))
+					throw new Exception("invalid uri");
+
+				uri = cmd.getOptionValue("uri").trim();
+
+				if (uri.charAt(0) != '/')
+					uri = "/" + uri;
+
+				if (uri.charAt(uri.length()) == '/')
+					uri = uri.substring(0, uri.length() - 1);
+
+			} catch (Exception e) {
+				System.out.println("[OpenSOC] Invalid URI entered");
+				help();
+			}
+		}
+
+	}
+
+	private void help() {
+		// This prints out some help
+		HelpFormatter formater = new HelpFormatter();
+
+		formater.printHelp("Topology Options:", options);
+
+		// System.out
+		// .println("[OpenSOC] Example usage: \n storm jar OpenSOC-Topologies-0.3BETA-SNAPSHOT.jar com.opensoc.topology.Bro -local_mode true -config_path OpenSOC_Configs/ -generator_spout true");
+
+		System.exit(0);
+	}
+}


Mime
View raw message