metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject [74/89] [abbrv] incubator-metron git commit: Move all com/apache folders to org/apache
Date Tue, 26 Jan 2016 14:18:52 GMT
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/alerts/adapters/ThreatAlertsAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/alerts/adapters/ThreatAlertsAdapter.java b/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/alerts/adapters/ThreatAlertsAdapter.java
new file mode 100644
index 0000000..48d773d
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/alerts/adapters/ThreatAlertsAdapter.java
@@ -0,0 +1,311 @@
+package com.apache.metron.alerts.adapters;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.validator.routines.InetAddressValidator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.apache.metron.alerts.interfaces.AlertsAdapter;
+
+@SuppressWarnings("serial")
+public class ThreatAlertsAdapter implements AlertsAdapter, Serializable {
+
+	String enrichment_tag;
+
+	HTableInterface blacklist_table;
+	HTableInterface whitelist_table;
+	InetAddressValidator ipvalidator = new InetAddressValidator();
+	String _whitelist_table_name;
+	String _blacklist_table_name;
+	String _quorum;
+	String _port;
+	String _topologyname;
+	Configuration conf = null;
+
+	Cache<String, String> cache;
+	String _topology_name;
+
+	Set<String> loaded_whitelist = new HashSet<String>();
+	Set<String> loaded_blacklist = new HashSet<String>();
+
+	protected static final Logger LOG = LoggerFactory
+			.getLogger(ThreatAlertsAdapter.class);
+
+	/**
+	 * Creates the adapter from the supplied settings.
+	 * <p>
+	 * Required keys: {@code whitelist_table_name}, {@code blacklist_table_name},
+	 * {@code quorum}, {@code port}, {@code _MAX_CACHE_SIZE_OBJECTS_NUM} and
+	 * {@code _MAX_TIME_RETAIN_MINUTES}. {@code enrichment_tag} is read without a
+	 * presence check, so it may end up {@code null}. The two numeric settings
+	 * bound the size and the write-expiry (in minutes) of the alert-id cache.
+	 * <p>
+	 * If a required key is missing or a numeric setting cannot be parsed, the
+	 * failure is logged and the JVM is terminated with a non-zero exit status.
+	 *
+	 * @param config
+	 *            adapter settings keyed by name
+	 */
+	public ThreatAlertsAdapter(Map<String, String> config) {
+		try {
+
+			if (!config.containsKey("whitelist_table_name"))
+				throw new Exception("Whitelist table name is missing");
+
+			_whitelist_table_name = config.get("whitelist_table_name");
+
+			if (!config.containsKey("blacklist_table_name"))
+				throw new Exception("Blacklist table name is missing");
+
+			_blacklist_table_name = config.get("blacklist_table_name");
+
+			if (!config.containsKey("quorum"))
+				throw new Exception("Quorum name is missing");
+
+			_quorum = config.get("quorum");
+
+			if (!config.containsKey("port"))
+				throw new Exception("port name is missing");
+
+			_port = config.get("port");
+
+			if (!config.containsKey("_MAX_CACHE_SIZE_OBJECTS_NUM"))
+				throw new Exception("_MAX_CACHE_SIZE_OBJECTS_NUM name is missing");
+
+			int _MAX_CACHE_SIZE_OBJECTS_NUM = Integer.parseInt(config
+					.get("_MAX_CACHE_SIZE_OBJECTS_NUM"));
+
+			if (!config.containsKey("_MAX_TIME_RETAIN_MINUTES"))
+				throw new Exception("_MAX_TIME_RETAIN_MINUTES name is missing");
+
+			int _MAX_TIME_RETAIN_MINUTES = Integer.parseInt(config
+					.get("_MAX_TIME_RETAIN_MINUTES"));
+
+			cache = CacheBuilder.newBuilder().maximumSize(_MAX_CACHE_SIZE_OBJECTS_NUM)
+					.expireAfterWrite(_MAX_TIME_RETAIN_MINUTES, TimeUnit.MINUTES)
+					.build();
+
+			enrichment_tag = config.get("enrichment_tag");
+
+		} catch (Exception e) {
+			// Report through the class logger so the cause lands in the normal log.
+			LOG.error("Could not initialize alerts adapter", e);
+			// A failed start-up must not look like a clean exit to the launcher.
+			System.exit(1);
+		}
+	}
+
+	/**
+	 * Connects to HBase, verifies that the whitelist and blacklist tables exist
+	 * and loads the row keys of both tables into {@code loaded_whitelist} and
+	 * {@code loaded_blacklist}.
+	 * <p>
+	 * The configuration comes from {@link HBaseConfiguration#create()}; the
+	 * {@code quorum} and {@code port} settings captured by the constructor are
+	 * not applied because the corresponding {@code conf.set} calls are disabled.
+	 * <p>
+	 * A failure while iterating a scanner is logged and the rows read so far
+	 * are kept; any other failure is logged and makes the method return
+	 * {@code false}.
+	 *
+	 * @return {@code true} when both tables were opened and scanned,
+	 *         {@code false} when the connection or table set-up failed
+	 */
+	@SuppressWarnings("resource")
+	@Override
+	public boolean initialize() {
+
+		conf = HBaseConfiguration.create();
+		// conf.set("hbase.zookeeper.quorum", _quorum);
+		// conf.set("hbase.zookeeper.property.clientPort", _port);
+
+		LOG.trace("[Metron] Connecting to hbase with conf:" + conf);
+		LOG.trace("[Metron] Whitelist table name: " + _whitelist_table_name);
+		LOG.trace("[Metron] Blacklist table name: " + _blacklist_table_name);
+		LOG.trace("[Metron] ZK Client/port: "
+				+ conf.get("hbase.zookeeper.quorum") + " -> "
+				+ conf.get("hbase.zookeeper.property.clientPort"));
+
+		HConnection connection = null;
+		HBaseAdmin hba = null;
+
+		try {
+
+			LOG.trace("[Metron] Attempting to connect to hbase");
+
+			connection = HConnectionManager.createConnection(conf);
+
+			LOG.trace("[Metron] CONNECTED TO HBASE");
+
+			hba = new HBaseAdmin(conf);
+
+			if (!hba.tableExists(_whitelist_table_name))
+				throw new Exception("Whitelist table doesn't exist");
+
+			if (!hba.tableExists(_blacklist_table_name))
+				throw new Exception("Blacklist table doesn't exist");
+
+			whitelist_table = new HTable(conf, _whitelist_table_name);
+			LOG.trace("[Metron] CONNECTED TO TABLE: " + _whitelist_table_name);
+
+			blacklist_table = new HTable(conf, _blacklist_table_name);
+			LOG.trace("[Metron] CONNECTED TO TABLE: " + _blacklist_table_name);
+
+			loadRowKeys(whitelist_table, loaded_whitelist);
+			LOG.trace("[Metron] READ IN WHITELIST: " + loaded_whitelist.size());
+
+			loadRowKeys(blacklist_table, loaded_blacklist);
+			LOG.trace("[Metron] READ IN BLACKLIST: " + loaded_blacklist.size());
+
+			return true;
+		} catch (Exception e) {
+			LOG.error("[Metron] Could not initialize HBase access", e);
+			return false;
+		} finally {
+			// Release the admin handle and the connection exactly once, on
+			// both the success and the failure path.
+			if (hba != null) {
+				try {
+					hba.close();
+				} catch (Exception e) {
+					LOG.warn("[Metron] Could not close HBaseAdmin", e);
+				}
+			}
+			if (connection != null) {
+				try {
+					connection.close();
+				} catch (Exception e) {
+					LOG.warn("[Metron] Could not close HBase connection", e);
+				}
+			}
+		}
+
+	}
+
+	/**
+	 * Scans every row of {@code table}, adds each row key (decoded as a string)
+	 * to {@code target}, then closes the scanner and the table.
+	 *
+	 * @param table
+	 *            table to scan; closed before this method returns normally
+	 * @param target
+	 *            set receiving the row keys
+	 * @throws Exception
+	 *             if the scanner cannot be opened or the table cannot be closed
+	 */
+	private void loadRowKeys(HTableInterface table, Set<String> target)
+			throws Exception {
+
+		ResultScanner rs = table.getScanner(new Scan());
+		try {
+			for (Result r = rs.next(); r != null; r = rs.next()) {
+				target.add(Bytes.toString(r.getRow()));
+			}
+		} catch (Exception e) {
+			// Best effort: keep whatever was read before the failure.
+			LOG.error("[Metron] COULD NOT READ FROM HBASE", e);
+		} finally {
+			rs.close(); // always close the ResultScanner!
+		}
+		table.close();
+	}
+
+	/**
+	 * No-op refresh: nothing is reloaded and the method always reports success.
+	 *
+	 * @return always {@code true}
+	 * @throws Exception
+	 *             declared by the signature; this implementation does not throw
+	 */
+	@Override
+	public boolean refresh() throws Exception {
+		return true;
+	}
+
+	/**
+	 * Builds a medium-priority alert when the message carries a non-empty
+	 * enrichment under {@code enrichment_tag}.
+	 * <p>
+	 * The alert names the value of the matched field taken from the
+	 * {@code message} object and the first source listed for that match. The
+	 * designated host is the source or destination address that
+	 * {@code RangeChecker.checkRange} accepts against the loaded whitelist
+	 * (destination wins when both are accepted), otherwise {@code "unknown"}.
+	 *
+	 * @param raw_message
+	 *            telemetry message holding {@code message} and
+	 *            {@code enrichment} objects
+	 * @return a single-entry map of alert id to alert, or {@code null} when the
+	 *         message has no {@code enrichment}, the tag is absent, or the tag
+	 *         holds no elements
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public Map<String, JSONObject> alert(JSONObject raw_message) {
+
+		LOG.debug("LOOKING FOR ENRICHMENT TAG: " + enrichment_tag);
+
+		Map<String, JSONObject> alerts = new HashMap<String, JSONObject>();
+		JSONObject content = (JSONObject) raw_message.get("message");
+
+		// A missing message body must not abort alerting: fall back to an
+		// empty object so the alert is still raised with "unknown" endpoints.
+		if (content == null)
+			content = new JSONObject();
+
+		if (!raw_message.containsKey("enrichment"))
+			return null;
+
+		JSONObject enrichment = (JSONObject) raw_message.get("enrichment");
+
+		if (enrichment == null || !enrichment.containsKey(enrichment_tag)) {
+			LOG.debug("DID NOT FIND TAG: " + enrichment_tag);
+			return null;
+		}
+
+		LOG.debug("FOUND TAG: " + enrichment_tag);
+
+		JSONObject threat = (JSONObject) enrichment.get(enrichment_tag);
+
+		int cnt = 0;
+		Object enriched_key = null;
+
+		if (threat != null) {
+			for (Object key : threat.keySet()) {
+				JSONObject tmp = (JSONObject) threat.get(key);
+				if (tmp == null)
+					continue;
+				cnt = cnt + tmp.size();
+				// The last non-empty entry in iteration order is reported.
+				if (tmp.size() > 0)
+					enriched_key = key;
+			}
+		}
+
+		if (cnt == 0) {
+			LOG.debug("TAG HAS NO ELEMENTS");
+			return null;
+		}
+
+		JSONObject alert = new JSONObject();
+
+		String source = "unknown";
+		String dest = "unknown";
+		String host = "unknown";
+
+		if (content.containsKey("ip_src_addr")) {
+			source = content.get("ip_src_addr").toString();
+
+			if (RangeChecker.checkRange(loaded_whitelist, source))
+				host = source;
+		}
+
+		if (content.containsKey("ip_dst_addr")) {
+			dest = content.get("ip_dst_addr").toString();
+
+			if (RangeChecker.checkRange(loaded_whitelist, dest))
+				host = dest;
+		}
+
+		// cnt > 0 guarantees enriched_key refers to a non-empty entry, so the
+		// iterator below always has a first element.
+		JSONObject threatQualifier = (JSONObject) threat.get(enriched_key);
+
+		alert.put("designated_host", host);
+		String description = new StringBuilder()
+				.append("Threat Intelligence match for ")
+				// valueOf: the matched field may be absent from the message
+				.append(String.valueOf(content.get(enriched_key)))
+				.append(" from source: ")
+				.append(threatQualifier.keySet().iterator().next().toString())
+				.toString();
+		alert.put("description", description);
+		alert.put("priority", "MED");
+
+		String alert_id = generateAlertId(source, dest, 0);
+
+		alert.put("alert_id", alert_id);
+		alerts.put(alert_id, alert);
+
+		alert.put("enrichment", enrichment);
+
+		return alerts;
+
+	}
+
+	/**
+	 * Stub: alert ids are not tracked by this method, so every id is reported
+	 * as unknown.
+	 *
+	 * @param alert
+	 *            alert id to look up (ignored)
+	 * @return always {@code false}
+	 */
+	@Override
+	public boolean containsAlertId(String alert) {
+		// TODO Auto-generated method stub
+		return false;
+	}
+
+	/**
+	 * Returns the alert id for an address pair and alert type, reusing the
+	 * cached id when one is still present.
+	 * <p>
+	 * A newly generated id is cached under both address orderings, so the
+	 * reverse direction of the same pair maps to the same id.
+	 *
+	 * @param source_ip
+	 *            source address
+	 * @param dst_ip
+	 *            destination address
+	 * @param alert_type
+	 *            numeric alert type, part of the cache key
+	 * @return the cached id, or a new id of the form
+	 *         {@code <currentTimeMillis>-<random UUID>}
+	 */
+	protected String generateAlertId(String source_ip, String dst_ip,
+			int alert_type) {
+
+		String key = makeKey(source_ip, dst_ip, alert_type);
+
+		// Read the cache once: a second lookup could miss if the entry
+		// expired or was evicted in between, which would return null.
+		String cached = cache.getIfPresent(key);
+		if (cached != null)
+			return cached;
+
+		String new_UUID = System.currentTimeMillis() + "-" + UUID.randomUUID();
+
+		cache.put(key, new_UUID);
+		key = makeKey(dst_ip, source_ip, alert_type);
+		cache.put(key, new_UUID);
+
+		return new_UUID;
+
+	}
+
+	/**
+	 * Joins the two addresses and the alert type with dashes to form a cache
+	 * key.
+	 */
+	private String makeKey(String ip1, String ip2, int alert_type) {
+		StringBuilder joined = new StringBuilder();
+		joined.append(ip1).append("-");
+		joined.append(ip2).append("-");
+		joined.append(alert_type);
+		return joined.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/tagging/AbstractTaggerBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/tagging/AbstractTaggerBolt.java b/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/tagging/AbstractTaggerBolt.java
new file mode 100644
index 0000000..72eaacd
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/tagging/AbstractTaggerBolt.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.apache.metron.tagging;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+
+import com.codahale.metrics.Counter;
+import com.apache.metron.alerts.interfaces.TaggerAdapter;
+import com.apache.metron.metrics.MetricReporter;
+
+/**
+ * Base class for bolts that tag telemetry messages through a
+ * {@link TaggerAdapter}. It holds the collector, adapter, identifier and
+ * metric counters, checks that the adapter and identifier were supplied, and
+ * delegates the remaining set-up to {@link #doPrepare}.
+ */
+@SuppressWarnings("rawtypes")
+public abstract class AbstractTaggerBolt extends BaseRichBolt {
+	private static final long serialVersionUID = -6710596708304282838L;
+
+	protected static final Logger LOG = LoggerFactory
+			.getLogger(AbstractTaggerBolt.class);
+
+	protected OutputCollector _collector;
+	protected TaggerAdapter _adapter;
+
+	/** Name of the single output field declared by this bolt. */
+	protected String OutputFieldName;
+	protected JSONObject _identifier;
+	protected MetricReporter _reporter;
+
+	protected Counter ackCounter, emitCounter, failCounter;
+
+	/**
+	 * Registers the ack, emit and fail counters with the reporter. Counter
+	 * names are the adapter's simple class name followed by {@code .ack},
+	 * {@code .emit} and {@code .fail}.
+	 */
+	protected void registerCounters() {
+
+		String prefix = _adapter.getClass().getSimpleName();
+
+		ackCounter = _reporter.registerCounter(prefix + ".ack");
+		emitCounter = _reporter.registerCounter(prefix + ".emit");
+		failCounter = _reporter.registerCounter(prefix + ".fail");
+
+	}
+
+	/**
+	 * Stores the collector, validates the bolt's configuration and runs the
+	 * subclass set-up. An {@link IOException} from {@link #doPrepare} is logged
+	 * and not rethrown.
+	 *
+	 * @param conf
+	 *            topology configuration
+	 * @param topologyContext
+	 *            context of this task
+	 * @param collector
+	 *            collector used to emit, ack and fail tuples
+	 * @throws IllegalStateException
+	 *             if no adapter or no identifier has been set
+	 */
+	public final void prepare(Map conf, TopologyContext topologyContext,
+			OutputCollector collector) {
+		_collector = collector;
+
+		if (this._adapter == null)
+			throw new IllegalStateException("Tagging must be specified");
+		if (this._identifier == null)
+			throw new IllegalStateException("Identifier must be specified");
+		try {
+			doPrepare(conf, topologyContext, collector);
+		} catch (IOException e) {
+			// Keep the cause with the message instead of a bare stack dump.
+			LOG.error("Could not initialize...", e);
+		}
+	}
+
+	/**
+	 * Declares one output field named by {@code OutputFieldName}.
+	 *
+	 * @param declearer
+	 *            declarer supplied by the framework
+	 */
+	public void declareOutputFields(OutputFieldsDeclarer declearer) {
+		declearer.declare(new Fields(this.OutputFieldName));
+	}
+
+	/**
+	 * Subclass hook invoked by {@link #prepare} after validation.
+	 *
+	 * @throws IOException
+	 *             if the subclass cannot complete its set-up
+	 */
+	abstract void doPrepare(Map conf, TopologyContext topologyContext,
+			OutputCollector collector) throws IOException;
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/tagging/TelemetryTaggerBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/tagging/TelemetryTaggerBolt.java b/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/tagging/TelemetryTaggerBolt.java
new file mode 100644
index 0000000..69f87b1
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/tagging/TelemetryTaggerBolt.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.metron.tagging;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.configuration.Configuration;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import com.apache.metron.alerts.interfaces.TaggerAdapter;
+import com.apache.metron.json.serialization.JSONEncoderHelper;
+import com.apache.metron.metrics.MetricReporter;
+
+/**
+ * Use an adapter to tag existing telemetry messages with alerts. The list
+ * of available tagger adapters is located under
+ * com.apache.metron.tagging.adapters. At the time of the release the following
+ * adapters are available:
+ *
+ * <ul>
+ * <li>RegexTagger = read a list of regular expressions and tag a message if
+ * they exist in a message</li>
+ * <li>StaticAllTagger = tag each message with a static alert</li>
+ * </ul>
+ */
+@SuppressWarnings("rawtypes")
+public class TelemetryTaggerBolt extends AbstractTaggerBolt {
+
+	private static final long serialVersionUID = -2647123143398352020L;
+	private Properties metricProperties;
+	private JSONObject metricConfiguration;
+
+	/**
+	 *
+	 * @param tagger
+	 *            - tagger adapter for generating alert tags
+	 * @return instance of bolt
+	 */
+	public TelemetryTaggerBolt withMessageTagger(TaggerAdapter tagger) {
+		_adapter = tagger;
+		return this;
+	}
+
+	/**
+	 *
+	 * @param OutputFieldName
+	 *            - output name of the tuple coming out of this bolt
+	 * @return - instance of this bolt
+	 */
+	public TelemetryTaggerBolt withOutputFieldName(String OutputFieldName) {
+		this.OutputFieldName = OutputFieldName;
+		return this;
+	}
+
+	/**
+	 *
+	 * @param metricProperties
+	 *            - metric output to graphite
+	 * @return - instance of this bolt
+	 */
+	public TelemetryTaggerBolt withMetricProperties(Properties metricProperties) {
+		this.metricProperties = metricProperties;
+		return this;
+	}
+
+	/**
+	 *
+	 * @param identifier
+	 *            - the identifier tag for tagging telemetry messages with
+	 *            alerts out of this bolt
+	 * @return - instance of this bolt
+	 */
+	public TelemetryTaggerBolt withIdentifier(JSONObject identifier) {
+		this._identifier = identifier;
+		return this;
+	}
+
+	/**
+	 * @param config
+	 *            configuration whose {@code com.apache.metron.metrics} subset
+	 *            is converted to JSON and kept as the metric configuration
+	 * @return Instance of this class
+	 */
+	public TelemetryTaggerBolt withMetricConfiguration(Configuration config) {
+		this.metricConfiguration = JSONEncoderHelper.getJSON(config
+				.subset("com.apache.metron.metrics"));
+		return this;
+	}
+
+	/**
+	 * Creates and initializes the metric reporter. A failure is logged and
+	 * otherwise ignored, so the bolt keeps running without metrics.
+	 */
+	@Override
+	void doPrepare(Map conf, TopologyContext topologyContext,
+			OutputCollector collector) throws IOException {
+
+		LOG.info("[Metron] Preparing TelemetryTagger Bolt...");
+
+		try {
+			_reporter = new MetricReporter();
+			_reporter.initialize(metricProperties, TelemetryTaggerBolt.class);
+			LOG.info("[Metron] Initialized metrics");
+		} catch (Exception e) {
+			LOG.warn("[Metron] Could not initialize metrics", e);
+		}
+	}
+
+	/**
+	 * Tags the message in field 0 of the tuple and emits it.
+	 * <p>
+	 * When the adapter produces alerts they are stored under {@code alerts} as
+	 * {@code identifier} plus {@code triggered}; alerts already listed in an
+	 * existing {@code alerts.triggered} array are appended after the new ones.
+	 * The tuple is acked and the message emitted; on any exception the tuple
+	 * is failed instead.
+	 *
+	 * @param tuple
+	 *            tuple whose first value is the telemetry {@link JSONObject}
+	 */
+	@SuppressWarnings("unchecked")
+	public void execute(Tuple tuple) {
+
+		LOG.trace("[Metron] Starting to process message for alerts");
+		JSONObject original_message = null;
+
+		try {
+
+			original_message = (JSONObject) tuple.getValue(0);
+
+			if (original_message == null || original_message.isEmpty())
+				throw new Exception("Could not parse message from byte stream");
+
+			LOG.trace("[Metron] Received tuple: " + original_message);
+
+			JSONObject alerts_tag = new JSONObject();
+			JSONArray alerts_list = _adapter.tag(original_message);
+
+			LOG.trace("[Metron] Tagged message: " + alerts_list);
+
+			if (alerts_list != null && alerts_list.size() != 0) {
+				// Work on a private copy: the adapter may hand out an array it
+				// reuses between calls, and appending to that array would leak
+				// this message's earlier alerts into later messages.
+				JSONArray triggered = new JSONArray();
+				triggered.addAll(alerts_list);
+
+				if (original_message.containsKey("alerts")) {
+					JSONObject tag = (JSONObject) original_message
+							.get("alerts");
+					JSONArray already_triggered = tag == null ? null
+							: (JSONArray) tag.get("triggered");
+					if (already_triggered != null)
+						triggered.addAll(already_triggered);
+					LOG.trace("[Metron] Created a new string of alerts");
+				}
+
+				alerts_tag.put("identifier", _identifier);
+				alerts_tag.put("triggered", triggered);
+				original_message.put("alerts", alerts_tag);
+
+				LOG.debug("[Metron] Detected alerts: " + alerts_tag);
+			}
+			else
+			{
+				LOG.debug("[Metron] The following messages did not contain alerts: " + original_message);
+			}
+
+			// NOTE(review): the tuple is acked before the (unanchored) emit;
+			// confirm this ordering is intended.
+			_collector.ack(tuple);
+			_collector.emit(new Values(original_message));
+
+			/*if (metricConfiguration != null) {
+				emitCounter.inc();
+				ackCounter.inc();
+			}*/
+
+		} catch (Exception e) {
+			LOG.error("Failed to tag message :" + original_message, e);
+			_collector.fail(tuple);
+
+			/*
+			if (metricConfiguration != null) {
+				failCounter.inc();
+			}*/
+		}
+	}
+
+	/**
+	 * Declares one output field named by {@code OutputFieldName}.
+	 */
+	public void declareOutputFields(OutputFieldsDeclarer declearer) {
+		declearer.declare(new Fields(this.OutputFieldName));
+
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/tagging/adapters/AbstractTaggerAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/tagging/adapters/AbstractTaggerAdapter.java b/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/tagging/adapters/AbstractTaggerAdapter.java
new file mode 100644
index 0000000..8d56e6f
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/tagging/adapters/AbstractTaggerAdapter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.metron.tagging.adapters;
+
+import java.io.Serializable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.apache.metron.alerts.interfaces.TaggerAdapter;
+
+/**
+ * Common base for tagger adapters: a serializable {@link TaggerAdapter} that
+ * provides a shared logger to its subclasses.
+ */
+@SuppressWarnings("serial")
+public abstract class AbstractTaggerAdapter implements TaggerAdapter, Serializable{
+	
+	// Shared by all subclasses; bound to this base class, not the concrete type.
+	protected static final Logger _LOG = LoggerFactory
+			.getLogger(AbstractTaggerAdapter.class);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/tagging/adapters/RegexTagger.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/tagging/adapters/RegexTagger.java b/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/tagging/adapters/RegexTagger.java
new file mode 100644
index 0000000..2854ae6
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/tagging/adapters/RegexTagger.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.metron.tagging.adapters;
+
+import java.util.Map;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+public class RegexTagger extends AbstractTaggerAdapter{
+	
+	/**
+	 * Tags a telemetry message with the alert of every rule whose regular
+	 * expression matches the message's string form.
+	 */
+	private static final long serialVersionUID = -6091495636459799411L;
+	Map <String, JSONObject> _rules;
+	
+	/**
+	 * 
+	 * @param rules map from regular expression to the alert attached when that
+	 *            expression matches
+	 */
+	public RegexTagger(Map<String, JSONObject> rules)
+	{
+		_rules = rules;
+	}
+
+	/**
+	 * Collects the alerts of all matching rules. A rule matches when its
+	 * expression matches the entire string form of the message.
+	 * 
+	 * @param raw_message telemetry message to be tagged
+	 * @return alerts of the matching rules; empty when no rule matches
+	 */
+	@SuppressWarnings("unchecked")
+	public JSONArray tag(JSONObject raw_message) {
+
+		JSONArray matched = new JSONArray();
+		String serialized = raw_message.toString();
+
+		for (Map.Entry<String, JSONObject> candidate : _rules.entrySet())
+		{
+			if (!serialized.matches(candidate.getKey()))
+			{
+				continue;
+			}
+			matched.add(candidate.getValue());
+		}
+
+		return matched;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/tagging/adapters/StaticAllTagger.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/tagging/adapters/StaticAllTagger.java b/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/tagging/adapters/StaticAllTagger.java
new file mode 100644
index 0000000..3962620
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/tagging/adapters/StaticAllTagger.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.metron.tagging.adapters;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+public class StaticAllTagger extends AbstractTaggerAdapter {
+
+	/**
+	 * Attaches a static alerts tag to JSON telemetry messages
+	 */
+	private static final long serialVersionUID = 7759427661169094065L;
+	private JSONObject _static_tag_message;
+	// Template holding the single static tag; never handed out directly.
+	JSONArray ja = new JSONArray();
+
+	/**
+	 * 
+	 * @param static_tag_message
+	 *            static alerts tag to attach to the message as a JSON
+	 */
+	@SuppressWarnings("unchecked")
+	public StaticAllTagger(JSONObject static_tag_message) {
+		_static_tag_message = static_tag_message;
+		ja.add(_static_tag_message);
+	}
+
+	/**
+	 * Returns the static tag for any message.
+	 * <p>
+	 * A new array is returned on every call, so callers that append to the
+	 * result cannot alter what later calls return.
+	 * 
+	 * @param raw_message
+	 *            message to tag (not inspected)
+	 * @return a new array containing the static alerts tag
+	 */
+	@SuppressWarnings("unchecked")
+	public JSONArray tag(JSONObject raw_message) {
+
+		JSONArray tags = new JSONArray();
+		tags.addAll(ja);
+		return tags;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/tagging/adapters/TaggerAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/tagging/adapters/TaggerAdapter.java b/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/tagging/adapters/TaggerAdapter.java
new file mode 100644
index 0000000..b3ba635
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/tagging/adapters/TaggerAdapter.java
@@ -0,0 +1,9 @@
+package com.apache.metron.tagging.adapters;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+/**
+ * Strategy for deriving alert tags from a telemetry message.
+ */
+public interface TaggerAdapter {
+
+	/**
+	 * Produces the alert tags for one message.
+	 *
+	 * @param raw_message
+	 *            telemetry message to inspect
+	 * @return the alert tags for the message; presumably an empty array when
+	 *         nothing applies (verify against each implementation)
+	 */
+	JSONArray tag(JSONObject raw_message);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Alerts/src/test/java/com/apache/metron/alerts/adapters/AllAlertAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/test/java/com/apache/metron/alerts/adapters/AllAlertAdapterTest.java b/metron-streaming/Metron-Alerts/src/test/java/com/apache/metron/alerts/adapters/AllAlertAdapterTest.java
deleted file mode 100644
index 4e3e0a5..0000000
--- a/metron-streaming/Metron-Alerts/src/test/java/com/apache/metron/alerts/adapters/AllAlertAdapterTest.java
+++ /dev/null
@@ -1,166 +0,0 @@
- /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.apache.metron.alerts.adapters;
-
-import java.lang.reflect.Constructor;
-import java.util.Map;
-import java.util.Properties;
-
-import com.apache.metron.test.AbstractConfigTest;
-import com.apache.metron.alerts.adapters.AllAlertAdapter;
-
- /**
- * <ul>
- * <li>Title: AllAlertAdapterTest</li>
- * <li>Description: Tests for AllAlertAdapter</li>
- * <li>Created: Oct 8, 2014</li>
- * </ul>
- * @version $Revision: 1.1 $
- */
-public class AllAlertAdapterTest extends AbstractConfigTest {
-
-     /**
-     * The allAlertAdapter.
-     */
-    private static AllAlertAdapter allAlertAdapter=null;
-    
-     /**
-     * The connected.
-     */
-    private static boolean connected=false;
-
-    /**
-     * Constructs a new <code>AllAlertAdapterTest</code> instance.
-     * @param name
-     */
-    public AllAlertAdapterTest(String name) {
-        super(name);
-    }
-
-    /**
-     * @throws java.lang.Exception
-     */
-    protected static void setUpBeforeClass() throws Exception {
-    }
-
-    /**
-     * @throws java.lang.Exception
-     */
-    protected static void tearDownAfterClass() throws Exception {
-    }
-
-    /* 
-     * (non-Javadoc)
-     * @see junit.framework.TestCase#setUp()
-     */
-
-    @SuppressWarnings("unchecked")
-    protected void setUp() throws Exception {
-          super.setUp("com.apache.metron.alerts.adapters.AllAlertAdapter");
-          Properties prop = super.getTestProperties();
-          assertNotNull(prop);   
-       // this.setMode("global");
-        if(skipTests(this.getMode())){
-            System.out.println(getClass().getName()+" Skipping Tests !!Local Mode");
-            return;//skip tests
-       }else{      
-           Map<String, String> settings = super.getSettings();
-           @SuppressWarnings("rawtypes")
-        Class loaded_class = Class.forName("com.apache.metron.alerts.adapters.AllAlertAdapter");
-           @SuppressWarnings("rawtypes")
-        Constructor constructor = loaded_class.getConstructor(new Class[] { Map.class});
-           
-           AllAlertAdapterTest.allAlertAdapter = (AllAlertAdapter) constructor.newInstance(settings);
-            // AllAlertAdapterTest.allAlertAdapter = new AllAlertAdapter(settings)
-      }
-    }
-
-    /* 
-     * (non-Javadoc)
-     * @see junit.framework.TestCase#tearDown()
-     */
-
-    protected void tearDown() throws Exception {
-        super.tearDown();
-    }
-
-
-    /**
-     * Test method for {@link com.apache.metron.alerts.adapters.AlllterAdapter#initialize()}.
-     */
-    public void testInitializeAdapter() {
-        if(skipTests(this.getMode())){
-            return;//skip tests
-       }else{        
-           
-        boolean initialized =AllAlertAdapterTest.getAllAlertAdapter().initialize();
-        assertTrue(initialized);
-       }
-    }
-    
-    /**
-     * Test method for containsAlertId(@link  com.apache.metron.alerts.adapters.AlllterAdapter#containsAlertId()}.
-     */
-    public void testContainsAlertId(){
-        if(skipTests(this.getMode())){
-            return;//skip tests
-       }else{          
-            boolean containsAlert =AllAlertAdapterTest.getAllAlertAdapter().containsAlertId("test");
-            assertFalse(containsAlert);
-       }
-    }
- 
-   
-
-    /**
-     * Returns the allAlertAdapter.
-     * @return the allAlertAdapter.
-     */
-    
-    public static AllAlertAdapter getAllAlertAdapter() {
-        return allAlertAdapter;
-    }
-
-    /**
-     * Sets the allAlertAdapter.
-     * @param allAlertAdapter the allAlertAdapter.
-     */
-    
-    public static void setAllAlertAdapter(AllAlertAdapter allAlertAdapter) {
-    
-        AllAlertAdapterTest.allAlertAdapter = allAlertAdapter;
-    }
-    /**
-     * Returns the connected.
-     * @return the connected.
-     */
-    
-    public static boolean isConnected() {
-        return connected;
-    }
-
-    /**
-     * Sets the connected.
-     * @param connected the connected.
-     */
-    
-    public static void setConnected(boolean connected) {
-    
-        AllAlertAdapterTest.connected = connected;
-    }    
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Alerts/src/test/java/org/apache/metron/alerts/adapters/AllAlertAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/test/java/org/apache/metron/alerts/adapters/AllAlertAdapterTest.java b/metron-streaming/Metron-Alerts/src/test/java/org/apache/metron/alerts/adapters/AllAlertAdapterTest.java
new file mode 100644
index 0000000..4e3e0a5
--- /dev/null
+++ b/metron-streaming/Metron-Alerts/src/test/java/org/apache/metron/alerts/adapters/AllAlertAdapterTest.java
@@ -0,0 +1,166 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.apache.metron.alerts.adapters;
+
+import java.lang.reflect.Constructor;
+import java.util.Map;
+import java.util.Properties;
+
+import com.apache.metron.test.AbstractConfigTest;
+import com.apache.metron.alerts.adapters.AllAlertAdapter;
+
+ /**
+ * <ul>
+ * <li>Title: AllAlertAdapterTest</li>
+ * <li>Description: Tests for AllAlertAdapter</li>
+ * <li>Created: Oct 8, 2014</li>
+ * </ul>
+ * @version $Revision: 1.1 $
+ */
+public class AllAlertAdapterTest extends AbstractConfigTest {
+
+     /**
+     * The allAlertAdapter.
+     */
+    private static AllAlertAdapter allAlertAdapter=null;
+    
+     /**
+     * The connected.
+     */
+    private static boolean connected=false;
+
+    /**
+     * Constructs a new <code>AllAlertAdapterTest</code> instance.
+     * @param name
+     */
+    public AllAlertAdapterTest(String name) {
+        super(name);
+    }
+
+    /**
+     * @throws java.lang.Exception
+     */
+    protected static void setUpBeforeClass() throws Exception {
+    }
+
+    /**
+     * @throws java.lang.Exception
+     */
+    protected static void tearDownAfterClass() throws Exception {
+    }
+
+    /* 
+     * (non-Javadoc)
+     * @see junit.framework.TestCase#setUp()
+     */
+
+    @SuppressWarnings("unchecked")
+    protected void setUp() throws Exception {
+          super.setUp("com.apache.metron.alerts.adapters.AllAlertAdapter");
+          Properties prop = super.getTestProperties();
+          assertNotNull(prop);   
+       // this.setMode("global");
+        if(skipTests(this.getMode())){
+            System.out.println(getClass().getName()+" Skipping Tests !!Local Mode");
+            return;//skip tests
+       }else{      
+           Map<String, String> settings = super.getSettings();
+           @SuppressWarnings("rawtypes")
+        Class loaded_class = Class.forName("com.apache.metron.alerts.adapters.AllAlertAdapter");
+           @SuppressWarnings("rawtypes")
+        Constructor constructor = loaded_class.getConstructor(new Class[] { Map.class});
+           
+           AllAlertAdapterTest.allAlertAdapter = (AllAlertAdapter) constructor.newInstance(settings);
+            // AllAlertAdapterTest.allAlertAdapter = new AllAlertAdapter(settings)
+      }
+    }
+
+    /* 
+     * (non-Javadoc)
+     * @see junit.framework.TestCase#tearDown()
+     */
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+
+    /**
+     * Test method for {@link com.apache.metron.alerts.adapters.AlllterAdapter#initialize()}.
+     */
+    public void testInitializeAdapter() {
+        if(skipTests(this.getMode())){
+            return;//skip tests
+       }else{        
+           
+        boolean initialized =AllAlertAdapterTest.getAllAlertAdapter().initialize();
+        assertTrue(initialized);
+       }
+    }
+    
+    /**
+     * Test method for containsAlertId(@link  com.apache.metron.alerts.adapters.AlllterAdapter#containsAlertId()}.
+     */
+    public void testContainsAlertId(){
+        if(skipTests(this.getMode())){
+            return;//skip tests
+       }else{          
+            boolean containsAlert =AllAlertAdapterTest.getAllAlertAdapter().containsAlertId("test");
+            assertFalse(containsAlert);
+       }
+    }
+ 
+   
+
+    /**
+     * Returns the allAlertAdapter.
+     * @return the allAlertAdapter.
+     */
+    
+    public static AllAlertAdapter getAllAlertAdapter() {
+        return allAlertAdapter;
+    }
+
+    /**
+     * Sets the allAlertAdapter.
+     * @param allAlertAdapter the allAlertAdapter.
+     */
+    
+    public static void setAllAlertAdapter(AllAlertAdapter allAlertAdapter) {
+    
+        AllAlertAdapterTest.allAlertAdapter = allAlertAdapter;
+    }
+    /**
+     * Returns the connected.
+     * @return the connected.
+     */
+    
+    public static boolean isConnected() {
+        return connected;
+    }
+
+    /**
+     * Sets the connected.
+     * @param connected the connected.
+     */
+    
+    public static void setConnected(boolean connected) {
+    
+        AllAlertAdapterTest.connected = connected;
+    }    
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/AlertsAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/AlertsAdapter.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/AlertsAdapter.java
deleted file mode 100644
index 9b5bfa9..0000000
--- a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/AlertsAdapter.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package com.apache.metron.alerts.interfaces;
-
-import java.util.Map;
-
-import org.json.simple.JSONObject;
-
-public interface AlertsAdapter {
-
-	boolean initialize();
-
-	boolean refresh() throws Exception;
-
-	Map<String, JSONObject> alert(JSONObject raw_message);
-
-	boolean containsAlertId(String alert);
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/AlertsInterface.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/AlertsInterface.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/AlertsInterface.java
deleted file mode 100644
index b71e3e9..0000000
--- a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/AlertsInterface.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package com.apache.metron.alerts.interfaces;
-
-import org.json.simple.JSONObject;
-
-public interface AlertsInterface {
-
-	public JSONObject getContent();
-	public void setContent(JSONObject content);
-	public String getUuid();
-	public void setUuid(String uuid);
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/TaggerAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/TaggerAdapter.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/TaggerAdapter.java
deleted file mode 100644
index db57b5f..0000000
--- a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/alerts/interfaces/TaggerAdapter.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package com.apache.metron.alerts.interfaces;
-
-import org.json.simple.JSONArray;
-import org.json.simple.JSONObject;
-
-public interface TaggerAdapter {
-
-	JSONArray tag(JSONObject raw_message);
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Common/src/main/java/com/apache/metron/configuration/ConfigurationManager.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/configuration/ConfigurationManager.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/configuration/ConfigurationManager.java
deleted file mode 100644
index 2bd05b9..0000000
--- a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/configuration/ConfigurationManager.java
+++ /dev/null
@@ -1,119 +0,0 @@
-package com.apache.metron.configuration;
-
-
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.configuration.CombinedConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.DefaultConfigurationBuilder;
-import org.apache.commons.lang.StringUtils;
-import org.apache.log4j.Logger;
-
-/**
- * Configuration manager class which loads all 'config-definition.xml' files and
- * creates a Configuration object which holds all properties from the underlying
- * configuration resource
- */
-public class ConfigurationManager {
-
-  /** configuration definition file name. */
-  private static String DEFAULT_CONFIG_DEFINITION_FILE_NAME = "config-definition.xml";
-
-  /** Stores a map with the configuration for each path specified. */
-  private static Map<String, Configuration> configurationsCache = new HashMap<String, Configuration>();
-
-  /** The Constant LOGGER. */
-  private static final Logger LOGGER = Logger
-      .getLogger(ConfigurationManager.class);
-
-  /**
-   * Common method to load content of all configuration resources defined in
-   * 'config-definition.xml'.
-   * 
-   * @param configDefFilePath
-   *          the config def file path
-   * @return Configuration
-   */
-  public static Configuration getConfiguration(String configDefFilePath) {
-    if (configurationsCache.containsKey(configDefFilePath)) {
-      return configurationsCache.get(configDefFilePath);
-    }
-    CombinedConfiguration configuration = null;
-    synchronized (configurationsCache) {
-      if (configurationsCache.containsKey(configDefFilePath)) {
-        return configurationsCache.get(configDefFilePath);
-      }
-      DefaultConfigurationBuilder builder = new DefaultConfigurationBuilder();
-      String fielPath = getConfigDefFilePath(configDefFilePath);
-      LOGGER.info("loading from 'configDefFilePath' :" + fielPath);
-      builder.setFile(new File(fielPath));
-      try {
-        configuration = builder.getConfiguration(true);
-        configurationsCache.put(fielPath, configuration);
-      } catch (ConfigurationException e) {
-        LOGGER.info("Exception in loading property files.", e);
-      }
-    }
-    return configuration;
-  }
-
-  /**
-   * Removes the configuration created from a config definition file located at
-   * 'configDefFilePath'.
-   * 
-   * @param configDefFilePath
-   *          path to the config definition file
-   */
-  public static void clearConfiguration(String configDefFilePath) {
-    configurationsCache.remove(configDefFilePath);
-  }
-
-  /**
-   * Gets the configuration.
-   * 
-   * @return the configuration
-   */
-  public static Configuration getConfiguration() {
-    return getConfiguration(null);
-  }
-
-  /**
-   * Returns the 'config-definition.xml' file path. 1. If the param
-   * 'configDefFilePath' has a valid value, returns configDefFilePath 2. If the
-   * system property key 'configDefFilePath' has a valid value, returns the
-   * value 3. By default, it returns the file name 'config-definition.xml'
-   * 
-   * @param configDefFilePath
-   *          given input path to the config definition file
-   * @return the config def file path
-   */
-  private static String getConfigDefFilePath(String configDefFilePath) {
-    if (StringUtils.isNotEmpty(configDefFilePath)) {
-      return configDefFilePath;
-    }
-    return DEFAULT_CONFIG_DEFINITION_FILE_NAME;
-  }
-
-  /**
-   * The main method.
-   * 
-   * @param args
-   *          the args
-   * @throws InterruptedException
-   *           the interrupted exception
-   */
-  public static void main(String[] args) throws InterruptedException {
-    Configuration config = ConfigurationManager
-        .getConfiguration("/Users/Sayi/Documents/config/config-definition-dpi.xml");
-    System.out.println("elastic.search.cluster ="
-        + config.getString("elastic.search.cluster"));
-    Thread.sleep(10000);
-    System.out.println("storm.topology.dpi.bolt.es-index.index.name ="
-        + config.getString("storm.topology.dpi.bolt.es-index.index.name"));
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Common/src/main/java/com/apache/metron/dataloads/interfaces/ThreatIntelSource.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/dataloads/interfaces/ThreatIntelSource.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/dataloads/interfaces/ThreatIntelSource.java
deleted file mode 100644
index d579264..0000000
--- a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/dataloads/interfaces/ThreatIntelSource.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package com.apache.metron.dataloads.interfaces;
-
-import java.util.Iterator;
-import org.apache.commons.configuration.Configuration;
-import org.json.simple.JSONObject;
-
-public interface ThreatIntelSource extends Iterator<JSONObject> {
-
-	void initializeSource(Configuration config);
-	void cleanupSource();
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Common/src/main/java/com/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/enrichment/interfaces/EnrichmentAdapter.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
deleted file mode 100644
index b1986a0..0000000
--- a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/enrichment/interfaces/EnrichmentAdapter.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.apache.metron.enrichment.interfaces;
-
-import org.json.simple.JSONObject;
-
-public interface EnrichmentAdapter
-{
-	JSONObject enrich(String metadata);
-	boolean initializeAdapter();
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HBaseBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HBaseBolt.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HBaseBolt.java
deleted file mode 100644
index 3ff53d6..0000000
--- a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HBaseBolt.java
+++ /dev/null
@@ -1,118 +0,0 @@
-package com.apache.metron.hbase;
-
-
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-import org.json.simple.JSONObject;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import com.apache.metron.helpers.topology.ErrorGenerator;
-
-/**
- * A Storm bolt for putting data into HBase.
- * <p>
- * By default works in batch mode by enabling HBase's client-side write buffer. Enabling batch mode
- * is recommended for high throughput, but it can be disabled in {@link TupleTableConfig}.
- * <p>
- * The HBase configuration is picked up from the first <tt>hbase-site.xml</tt> encountered in the
- * classpath
- * @see TupleTableConfig
- * @see HTableConnector
- */
-@SuppressWarnings("serial")
-public class HBaseBolt implements IRichBolt {
-  private static final Logger LOG = Logger.getLogger(HBaseBolt.class);
-
-  protected OutputCollector collector;
-  protected HTableConnector connector;
-  protected TupleTableConfig conf;
-  protected boolean autoAck = true;
-  
-  private String _quorum;
-  private String _port;
-
-  public HBaseBolt(TupleTableConfig conf, String quorum, String port) {
-    this.conf = conf;
-    _quorum = quorum;
-    _port = port;
-
-  }
-
-  /** {@inheritDoc} */
-  @SuppressWarnings("rawtypes")
-  
-  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-    this.collector = collector;
-
-    try {
-      this.connector = new HTableConnector(conf, _quorum, _port);
-
-		
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-
-    LOG.info("Preparing HBaseBolt for table: " + this.conf.getTableName());
-  }
-
-  /** {@inheritDoc} */
-  
-  public void execute(Tuple input) {
-    try {
-      this.connector.getTable().put(conf.getPutFromTuple(input));
-    } catch (IOException ex) {
-
-  		JSONObject error = ErrorGenerator.generateErrorMessage(
-  				"Alerts problem: " + input.getBinary(0), ex);
-  		collector.emit("error", new Values(error));
-  		
-      throw new RuntimeException(ex);
-    }
-
-    if (this.autoAck) {
-      this.collector.ack(input);
-    }
-  }
-
-  /** {@inheritDoc} */
-  
-  public void cleanup() {
-    this.connector.close();
-  }
-
-  /** {@inheritDoc} */
-  
-  public void declareOutputFields(OutputFieldsDeclarer declarer) {
-	  declarer.declareStream("error", new Fields("HBase"));
-  }
-
-  /** {@inheritDoc} */
-  
-  public Map<String, Object> getComponentConfiguration() {
-    return null;
-  }
-
-  /**
-   * @return the autoAck
-   */
-  public boolean isAutoAck() {
-    return autoAck;
-  }
-
-  /**
-   * @param autoAck the autoAck to set
-   */
-  public void setAutoAck(boolean autoAck) {
-    this.autoAck = autoAck;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HBaseStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HBaseStreamPartitioner.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HBaseStreamPartitioner.java
deleted file mode 100644
index 19a9bfd..0000000
--- a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HBaseStreamPartitioner.java
+++ /dev/null
@@ -1,146 +0,0 @@
-package com.apache.metron.hbase;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.HTable;
-
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.task.WorkerTopologyContext;
-
-public class HBaseStreamPartitioner implements CustomStreamGrouping {
-
-  private static final long serialVersionUID = -148324019395976092L;
-  private String[] regionStartKeys = { "0" };
-  private Map<String, String> regionStartKeyRegionNameMap = new HashMap<String, String>();
-
-  private List<Integer> targetTasks = null;
-  private int targetTasksSize = 0;
-  private int rowKeyFieldIndex = 0;
-  private String tableName = null;
-  private long regionCheckTime = 0;
-  private int regionInforRefreshIntervalInMins = 60;
-  private int regionInforRefreshIntervalInMillis = regionInforRefreshIntervalInMins * 60000;
-
-  HTable hTable = null;;
-
-  
-  public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
-    
-    System.out.println("preparing HBaseStreamPartitioner for streamId " + stream.get_streamId());
-    this.targetTasks = targetTasks;
-    this.targetTasksSize = this.targetTasks.size();
-
-    Configuration conf = HBaseConfiguration.create();
-    try {
-      hTable = new HTable(conf, tableName);
-      refreshRegionInfo(tableName);
-
-      System.out.println("regionStartKeyRegionNameMap: " + regionStartKeyRegionNameMap);
-
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-
-  }
-  
-  public void prepare() {
-    
-    System.out.println("preparing HBaseStreamPartitioner for streamId " );
-
-    Configuration conf = HBaseConfiguration.create();
-    try {
-      hTable = new HTable(conf, tableName);
-      refreshRegionInfo(tableName);
-
-      System.out.println("regionStartKeyRegionNameMap: " + regionStartKeyRegionNameMap);
-
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-
-  }
-
-  public HBaseStreamPartitioner(String tableName, int rowKeyFieldIndex, int regionInforRefreshIntervalInMins) {
-    System.out.println("Created HBaseStreamPartitioner ");
-    this.rowKeyFieldIndex = rowKeyFieldIndex;
-    this.tableName = tableName;
-    this.regionInforRefreshIntervalInMins = regionInforRefreshIntervalInMins;
-    this.regionInforRefreshIntervalInMillis = regionInforRefreshIntervalInMins * 60000;
-
-  }
-
-  
-  public List<Integer> chooseTasks(int taskId, List<Object> values) {
-    List<Integer> choosenTasks = null;
-    System.out.println("Choosing task for taskId " + taskId + " and values " + values);
-
-    if (regionInforRefreshIntervalInMillis > (System.currentTimeMillis() - regionCheckTime)) {
-      try {
-        refreshRegionInfo(tableName);
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-    }
-
-    int regionIndex = getRegionIndex((String) values.get(rowKeyFieldIndex));
-
-    if (regionIndex < targetTasksSize) {
-      choosenTasks = Arrays.asList(regionIndex);
-
-    } else {
-      choosenTasks = Arrays.asList(regionIndex % targetTasksSize);
-    }
-    System.out.println("Choosen tasks are " + choosenTasks);
-
-    return choosenTasks;
-
-
-  }
-
-  
-  
-  public int getRegionIndex(String key) {
-    int index = Arrays.binarySearch(regionStartKeys, key);
-    if (index < -1) {
-      index = (index + 2) * -1;
-    } else if (index == -1) {
-      index = 0;
-    }
-
-    return index;
-  }
-
-  private void refreshRegionInfo(String tableName) throws IOException {
-
-    System.out.println("in refreshRegionInfo ");
-
-    Map<HRegionInfo, ServerName> regionMap = hTable.getRegionLocations();
-
-    synchronized (regionStartKeys) {
-      synchronized (regionStartKeyRegionNameMap) {
-        regionStartKeys = new String[regionMap.size()];
-        int index = 0;
-        String startKey = null;
-        regionStartKeyRegionNameMap.clear();
-        for (HRegionInfo regionInfo : regionMap.keySet()) {
-          startKey = new String(regionInfo.getStartKey());
-          regionStartKeyRegionNameMap.put(startKey, regionInfo.getRegionNameAsString());
-          regionStartKeys[index] = startKey;
-          index++;
-        }
-
-        Arrays.sort(regionStartKeys);
-        regionCheckTime = System.currentTimeMillis();
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HTableConnector.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HTableConnector.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HTableConnector.java
deleted file mode 100644
index 9bacd71..0000000
--- a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/HTableConnector.java
+++ /dev/null
@@ -1,106 +0,0 @@
-package com.apache.metron.hbase;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.log4j.Logger;
-
-import backtype.storm.generated.Bolt;
-
-/**
- * HTable connector for Storm {@link Bolt}
- * <p>
- * The HBase configuration is picked up from the first <tt>hbase-site.xml</tt> encountered in the
- * classpath
- */
-@SuppressWarnings("serial")
-public class HTableConnector implements Serializable {
-  private static final Logger LOG = Logger.getLogger(HTableConnector.class);
-
-  private Configuration conf;
-  protected HTable table;
-  private String tableName;
-
-  /**
-   * Initialize HTable connection
-   * @param conf The {@link TupleTableConfig}
-   * @throws IOException
-   */
-  public HTableConnector(final TupleTableConfig conf, String _quorum, String _port) throws IOException {
-    this.tableName = conf.getTableName();
-    this.conf = HBaseConfiguration.create();
-    
-    if(_quorum != null && _port != null)
-    {
-    	this.conf.set("hbase.zookeeper.quorum", _quorum);
-    	this.conf.set("hbase.zookeeper.property.clientPort", _port);
-    }
-
-    LOG.info(String.format("Initializing connection to HBase table %s at %s", tableName,
-      this.conf.get("hbase.rootdir")));
-
-    try {
-      this.table = new HTable(this.conf, this.tableName);
-    } catch (IOException ex) {
-      throw new IOException("Unable to establish connection to HBase table " + this.tableName, ex);
-    }
-
-    if (conf.isBatch()) {
-      // Enable client-side write buffer
-      this.table.setAutoFlush(false, true);
-      LOG.info("Enabled client-side write buffer");
-    }
-
-    // If set, override write buffer size
-    if (conf.getWriteBufferSize() > 0) {
-      try {
-        this.table.setWriteBufferSize(conf.getWriteBufferSize());
-
-        LOG.info("Setting client-side write buffer to " + conf.getWriteBufferSize());
-      } catch (IOException ex) {
-        LOG.error("Unable to set client-side write buffer size for HBase table " + this.tableName,
-          ex);
-      }
-    }
-
-    // Check the configured column families exist
-    for (String cf : conf.getColumnFamilies()) {
-      if (!columnFamilyExists(cf)) {
-        throw new RuntimeException(String.format(
-          "HBase table '%s' does not have column family '%s'", conf.getTableName(), cf));
-      }
-    }
-  }
-
-  /**
-   * Checks to see if table contains the given column family
-   * @param columnFamily The column family name
-   * @return boolean
-   * @throws IOException
-   */
-  private boolean columnFamilyExists(final String columnFamily) throws IOException {
-    return this.table.getTableDescriptor().hasFamily(Bytes.toBytes(columnFamily));
-  }
-
-  /**
-   * @return the table
-   */
-  public HTable getTable() {
-    return table;
-  }
-
-  /**
-   * Close the table
-   */
-  public void close() {
-    try {
-      this.table.close();
-    } catch (IOException ex) {
-      LOG.error("Unable to close connection to HBase table " + tableName, ex);
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/TupleTableConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/TupleTableConfig.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/TupleTableConfig.java
deleted file mode 100644
index 90681c7..0000000
--- a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/hbase/TupleTableConfig.java
+++ /dev/null
@@ -1,279 +0,0 @@
-package com.apache.metron.hbase;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Set;
-import java.util.TreeMap;
-
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import backtype.storm.tuple.Tuple;
-
-/**
- * Configuration for Storm {@link Tuple} to HBase serialization.
- */
-@SuppressWarnings("serial")
-public class TupleTableConfig implements Serializable {
-  
-  public static final long DEFAULT_INCREMENT = 1L;
-  
-  private String tableName;
-  protected String tupleRowKeyField;
-  protected String tupleTimestampField;
-  protected Map<String, Set<String>> columnFamilies;
-  private boolean batch = true;
-  protected Durability durability = Durability.USE_DEFAULT;
-  private long writeBufferSize = 0L;
-  
-  /**
-   * Initialize configuration
-   * 
-   * @param table
-   *          The HBase table name
-   * @param rowKeyField
-   *          The {@link Tuple} field used to set the rowKey
-   */
-  public TupleTableConfig(final String table, final String rowKeyField) {
-    this.tableName = table;
-    this.tupleRowKeyField = rowKeyField;
-    this.tupleTimestampField = "";
-    this.columnFamilies = new HashMap<String, Set<String>>();
-  }
-  
-  /**
-   * Initialize configuration
-   * 
-   * @param table
-   *          The HBase table name
-   * @param rowKeyField
-   *          The {@link Tuple} field used to set the rowKey
-   * @param timestampField
-   *          The {@link Tuple} field used to set the timestamp
-   */
-  public TupleTableConfig(final String table, final String rowKeyField, final String timestampField) {
-    this.tableName = table;
-    this.tupleRowKeyField = rowKeyField;
-    this.tupleTimestampField = timestampField;
-    this.columnFamilies = new HashMap<String, Set<String>>();
-  }
-  
-  /**
-   * Add column family and column qualifier to be extracted from tuple
-   * 
-   * @param columnFamily
-   *          The column family name
-   * @param columnQualifier
-   *          The column qualifier name
-   */
-  public void addColumn(final String columnFamily, final String columnQualifier) {
-    Set<String> columns = this.columnFamilies.get(columnFamily);
-    
-    if (columns == null) {
-      columns = new HashSet<String>();
-    }
-    columns.add(columnQualifier);
-    
-    this.columnFamilies.put(columnFamily, columns);
-  }
-  
-  /**
-   * Creates a HBase {@link Put} from a Storm {@link Tuple}
-   * 
-   * @param tuple
-   *          The {@link Tuple}
-   * @return {@link Put}
-   */
-  public Put getPutFromTuple(final Tuple tuple) {
-    byte[] rowKey = Bytes.toBytes(tuple.getStringByField(tupleRowKeyField));
-    
-    long ts = 0;
-    if (!tupleTimestampField.equals("")) {
-      ts = tuple.getLongByField(tupleTimestampField);
-    }
-    
-    Put p = new Put(rowKey);
-    
-    p.setDurability(durability);
-    
-    if (columnFamilies.size() > 0) {
-      for (String cf : columnFamilies.keySet()) {
-        byte[] cfBytes = Bytes.toBytes(cf);
-        for (String cq : columnFamilies.get(cf)) {
-          byte[] cqBytes = Bytes.toBytes(cq);
-          byte[] val = tuple.getBinaryByField(cq);
-          
-          if (ts > 0) {
-            p.add(cfBytes, cqBytes, ts, val);
-          } else {
-            p.add(cfBytes, cqBytes, val);
-          }
-        }
-      }
-    }
-    
-    return p;
-  }
-  
-  /**
-   * Creates a HBase {@link Increment} from a Storm {@link Tuple}
-   * 
-   * @param tuple
-   *          The {@link Tuple}
-   * @param increment
-   *          The amount to increment the counter by
-   * @return {@link Increment}
-   */
-  public Increment getIncrementFromTuple(final Tuple tuple, final long increment) {
-    byte[] rowKey = Bytes.toBytes(tuple.getStringByField(tupleRowKeyField));
-    
-    Increment inc = new Increment(rowKey);
-    inc.setDurability(durability);
-    
-    if (columnFamilies.size() > 0) {
-      for (String cf : columnFamilies.keySet()) {
-        byte[] cfBytes = Bytes.toBytes(cf);
-        for (String cq : columnFamilies.get(cf)) {
-          byte[] val;
-          try {
-            val = Bytes.toBytes(tuple.getStringByField(cq));
-          } catch (IllegalArgumentException ex) {
-            // if cq isn't a tuple field, use cq for counter instead of tuple
-            // value
-            val = Bytes.toBytes(cq);
-          }
-          inc.addColumn(cfBytes, val, increment);
-        }
-      }
-    }
-    
-    return inc;
-  }
-  
-  /**
-   * Increment the counter for the given family and column by the specified
-   * amount
-   * <p>
-   * If the family and column already exist in the Increment the counter value
-   * is incremented by the specified amount rather than overridden, as it is in
-   * HBase's {@link Increment#addColumn(byte[], byte[], long)} method
-   * 
-   * @param inc
-   *          The {@link Increment} to update
-   * @param family
-   *          The column family
-   * @param qualifier
-   *          The column qualifier
-   * @param amount
-   *          The amount to increment the counter by
-   */
-  public static void addIncrement(Increment inc, final byte[] family, final byte[] qualifier, final Long amount) {
-    
-    NavigableMap<byte[], Long> set = inc.getFamilyMapOfLongs().get(family);
-    if (set == null) {
-      set = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
-    }
-    
-    // If qualifier exists, increment amount
-    Long counter = set.get(qualifier);
-    if (counter == null) {
-      counter = 0L;
-    }
-    set.put(qualifier, amount + counter);
-    
-    inc.getFamilyMapOfLongs().put(family, set);
-  }
-  
-  /**
-   * @return the tableName
-   */
-  public String getTableName() {
-    return tableName;
-  }
-  
-  /**
-   * @return Whether batch mode is enabled
-   */
-  public boolean isBatch() {
-    return batch;
-  }
-  
-  /**
-   * @param batch
-   *          Whether to enable HBase's client-side write buffer.
-   *          <p>
-   *          When enabled your bolt will store put operations locally until the
-   *          write buffer is full, so they can be sent to HBase in a single RPC
-   *          call. When disabled each put operation is effectively an RPC and
-   *          is sent straight to HBase. As your bolt can process thousands of
-   *          values per second it is recommended that the write buffer is
-   *          enabled.
-   *          <p>
-   *          Enabled by default
-   */
-  public void setBatch(boolean batch) {
-    this.batch = batch;
-  }
-  
-  /**
-   * @param setDurability
-   *          Sets whether to write to HBase's edit log.
-   *          <p>
-   *          Setting to false will mean fewer operations to perform when
-   *          writing to HBase and hence better performance, but changes that
-   *          haven't been flushed to a store file will be lost in the event of
-   *          HBase failure
-   *          <p>
-   *          Enabled by default
-   */
-  public void setDurability(Durability durability) {
-    this.durability = durability;
-  }
-  
-  
-  public Durability getDurability() {
-    return  durability;
-  }
-  
-  /**
-   * @param writeBufferSize
-   *          Overrides the client-side write buffer size.
-   *          <p>
-   *          By default the write buffer size is 2 MB (2097152 bytes). If you
-   *          are storing larger data, you may want to consider increasing this
-   *          value to allow your bolt to efficiently group together a larger
-   *          number of records per RPC
-   *          <p>
-   *          Overrides the write buffer size you have set in your
-   *          hbase-site.xml e.g. <code>hbase.client.write.buffer</code>
-   */
-  public void setWriteBufferSize(long writeBufferSize) {
-    this.writeBufferSize = writeBufferSize;
-  }
-  
-  /**
-   * @return the writeBufferSize
-   */
-  public long getWriteBufferSize() {
-    return writeBufferSize;
-  }
-  
-  /**
-   * @return A Set of configured column families
-   */
-  public Set<String> getColumnFamilies() {
-    return this.columnFamilies.keySet();
-  }
-  
-  /**
-   * @return the tupleRowKeyField
-   */
-  public String getTupleRowKeyField() {
-    return tupleRowKeyField;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Common/src/main/java/com/apache/metron/helpers/services/PcapServiceCli.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/helpers/services/PcapServiceCli.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/helpers/services/PcapServiceCli.java
deleted file mode 100644
index 5ff25ca..0000000
--- a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/helpers/services/PcapServiceCli.java
+++ /dev/null
@@ -1,110 +0,0 @@
-package com.apache.metron.helpers.services;
-
-import org.apache.commons.cli.BasicParser;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-
-public class PcapServiceCli {
-
-	private String[] args = null;
-	private Options options = new Options();
-
-	int port = 8081;
-	String uri = "/pcapGetter";
-
-	public int getPort() {
-		return port;
-	}
-
-	public void setPort(int port) {
-		this.port = port;
-	}
-
-	public String getUri() {
-		return uri;
-	}
-
-	public void setUri(String uri) {
-		this.uri = uri;
-	}
-
-	public PcapServiceCli(String[] args) {
-
-		this.args = args;
-
-		Option help = new Option("h", "Display help menue");
-		options.addOption(help);
-		options.addOption(
-				"port",
-				true,
-				"OPTIONAL ARGUMENT [portnumber] If this argument sets the port for starting the service.  If this argument is not set the port will start on defaut port 8081");
-		options.addOption(
-				"endpoint_uri",
-				true,
-				"OPTIONAL ARGUMENT [/uri/to/service] This sets the URI for the service to be hosted.  The default URI is /pcapGetter");
-	}
-
-	public void parse() {
-		CommandLineParser parser = new BasicParser();
-
-		CommandLine cmd = null;
-
-		try {
-			cmd = parser.parse(options, args);
-		} catch (ParseException e1) {
-
-			e1.printStackTrace();
-		}
-
-		if (cmd.hasOption("h"))
-			help();
-
-		if (cmd.hasOption("port")) {
-
-			try {
-				port = Integer.parseInt(cmd.getOptionValue("port").trim());
-			} catch (Exception e) {
-
-				System.out.println("[Metron] Invalid value for port entered");
-				help();
-			}
-		}
-		if (cmd.hasOption("endpoint_uri")) {
-
-			try {
-
-				if (uri == null || uri.equals(""))
-					throw new Exception("invalid uri");
-
-				uri = cmd.getOptionValue("uri").trim();
-
-				if (uri.charAt(0) != '/')
-					uri = "/" + uri;
-
-				if (uri.charAt(uri.length()) == '/')
-					uri = uri.substring(0, uri.length() - 1);
-
-			} catch (Exception e) {
-				System.out.println("[Metron] Invalid URI entered");
-				help();
-			}
-		}
-
-	}
-
-	private void help() {
-		// This prints out some help
-		HelpFormatter formater = new HelpFormatter();
-
-		formater.printHelp("Topology Options:", options);
-
-		// System.out
-		// .println("[Metron] Example usage: \n storm jar Metron-Topologies-0.3BETA-SNAPSHOT.jar com.apache.metron.topology.Bro -local_mode true -config_path Metron_Configs/ -generator_spout true");
-
-		System.exit(0);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Common/src/main/java/com/apache/metron/helpers/topology/Cli.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/helpers/topology/Cli.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/helpers/topology/Cli.java
deleted file mode 100644
index a7560e1..0000000
--- a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/helpers/topology/Cli.java
+++ /dev/null
@@ -1,186 +0,0 @@
-package com.apache.metron.helpers.topology;
-
-import java.io.File;
-
-import org.apache.commons.cli.BasicParser;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-
-public class Cli {
-
-	private String[] args = null;
-	private Options options = new Options();
-
-	private String path = null;
-	private boolean debug = true;
-	private boolean local_mode = true;
-	private boolean generator_spout = false;
-
-	public boolean isGenerator_spout() {
-		return generator_spout;
-	}
-
-	public void setGenerator_spout(boolean generator_spout) {
-		this.generator_spout = generator_spout;
-	}
-
-	public String getPath() {
-		return path;
-	}
-
-	public void setPath(String path) {
-		this.path = path;
-	}
-
-	public boolean isDebug() {
-		return debug;
-	}
-
-	public void setDebug(boolean debug) {
-		this.debug = debug;
-	}
-
-	public boolean isLocal_mode() {
-		return local_mode;
-	}
-
-	public void setLocal_mode(boolean local_mode) {
-		this.local_mode = local_mode;
-	}
-
-	public Cli(String[] args) {
-
-		this.args = args;
-
-		Option help = new Option("h", "Display help menue");
-		options.addOption(help);
-		options.addOption(
-				"config_path",
-				true,
-				"OPTIONAL ARGUMENT [/path/to/configs] Path to configuration folder. If not provided topology will initialize with default configs");
-		options.addOption(
-				"local_mode",
-				true,
-				"REQUIRED ARGUMENT [true|false] Local mode or cluster mode.  If set to true the topology will run in local mode.  If set to false the topology will be deployed to Storm nimbus");
-		options.addOption(
-				"debug",
-				true,
-				"OPTIONAL ARGUMENT [true|false] Storm debugging enabled.  Default value is true");
-		options.addOption(
-				"generator_spout",
-				true,
-				"REQUIRED ARGUMENT [true|false] Turn on test generator spout.  Default is set to false.  If test generator spout is turned on then kafka spout is turned off.  Instead the generator spout will read telemetry from file and ingest it into a topology");
-	}
-
-	public void parse() {
-		CommandLineParser parser = new BasicParser();
-
-		CommandLine cmd = null;
-		try {
-			cmd = parser.parse(options, args);
-
-			if (cmd.hasOption("h"))
-				help();
-
-			if (cmd.hasOption("local_mode")) {
-
-				String local_value = cmd.getOptionValue("local_mode").trim()
-						.toLowerCase();
-
-				if (local_value.equals("true"))
-					local_mode = true;
-
-				else if (local_value.equals("false"))
-					local_mode = false;
-				else {
-					System.out
-							.println("[Metron] ERROR: Invalid value for local mode");
-					System.out
-							.println("[Metron] ERROR: Using cli argument -local_mode="
-									+ cmd.getOptionValue("local_mode"));
-					help();
-				}
-			} else {
-				System.out
-						.println("[Metron] ERROR: Invalid value for local mode");
-				help();
-			}
-			if (cmd.hasOption("generator_spout")) {
-
-				String local_value = cmd.getOptionValue("generator_spout").trim()
-						.toLowerCase();
-
-				if (local_value.equals("true"))
-					generator_spout = true;
-
-				else if (local_value.equals("false"))
-					generator_spout = false;
-				else {
-					System.out
-							.println("[Metron] ERROR: Invalid value for local generator_spout");
-					System.out
-							.println("[Metron] ERROR: Using cli argument -generator_spout="
-									+ cmd.getOptionValue("generator_spout"));
-					help();
-				}
-			} else {
-				System.out
-						.println("[Metron] ERROR: Invalid value for generator_spout");
-				help();
-			}
-			if (cmd.hasOption("config_path")) {
-
-				path = cmd.getOptionValue("config_path").trim();
-
-				File file = new File(path);
-
-				if (!file.isDirectory() || !file.exists()) {
-					System.out
-							.println("[Metron] ERROR: Invalid settings directory name given");
-					System.out
-							.println("[Metron] ERROR: Using cli argument -config_path="
-									+ cmd.getOptionValue("config_path"));
-					help();
-				}
-			}
-
-			if (cmd.hasOption("debug")) {
-				String debug_value = cmd.getOptionValue("debug");
-
-				if (debug_value.equals("true"))
-					debug = true;
-				else if (debug_value.equals("false"))
-					debug = false;
-				else {
-					System.out
-							.println("[Metron] ERROR: Invalid value for debug_value");
-					System.out
-							.println("[Metron] ERROR: Using cli argument -debug_value="
-									+ cmd.getOptionValue("debug_value"));
-					help();
-				}
-			}
-
-		} catch (ParseException e) {
-			System.out
-					.println("[Metron] ERROR: Failed to parse command line arguments");
-			help();
-		}
-	}
-
-	private void help() {
-		// This prints out some help
-		HelpFormatter formater = new HelpFormatter();
-
-		formater.printHelp("Topology Options:", options);
-
-		System.out
-				.println("[Metron] Example usage: \n storm jar Metron-Topologies-0.3BETA-SNAPSHOT.jar com.apache.metron.topology.Bro -local_mode true -config_path Metron_Configs/ -generator_spout true");
-
-		System.exit(0);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Common/src/main/java/com/apache/metron/helpers/topology/ErrorGenerator.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/helpers/topology/ErrorGenerator.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/helpers/topology/ErrorGenerator.java
deleted file mode 100644
index 2201c09..0000000
--- a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/helpers/topology/ErrorGenerator.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package com.apache.metron.helpers.topology;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.json.simple.JSONObject;
-
-public class ErrorGenerator {
-
-	@SuppressWarnings("unchecked")
-	public static JSONObject generateErrorMessage(String message, Exception e)
-	{
-		JSONObject error_message = new JSONObject();
-		
-		/*
-		 * Save full stack trace in object.
-		 */
-		String stackTrace = ExceptionUtils.getStackTrace(e);
-		
-		String exception = e.toString();
-		
-		error_message.put("time", System.currentTimeMillis());
-		try {
-			error_message.put("hostname", InetAddress.getLocalHost().getHostName());
-		} catch (UnknownHostException ex) {
-			// TODO Auto-generated catch block
-			ex.printStackTrace();
-		}
-		
-		error_message.put("message", message);
-		error_message.put("exception", exception);
-		error_message.put("stack", stackTrace);
-		
-		return error_message;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Common/src/main/java/com/apache/metron/helpers/topology/SettingsLoader.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/helpers/topology/SettingsLoader.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/helpers/topology/SettingsLoader.java
deleted file mode 100644
index 971bf8b..0000000
--- a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/helpers/topology/SettingsLoader.java
+++ /dev/null
@@ -1,149 +0,0 @@
-package com.apache.metron.helpers.topology;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.commons.configuration.XMLConfiguration;
-import org.json.simple.JSONArray;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
-
-public class SettingsLoader {
-
-	@SuppressWarnings("unchecked")
-	public static JSONObject loadEnvironmentIdnetifier(String config_path)
-			throws ConfigurationException {
-		Configuration config = new PropertiesConfiguration(config_path);
-
-		String customer = config.getString("customer.id", "unknown");
-		String datacenter = config.getString("datacenter.id", "unknown");
-		String instance = config.getString("instance.id", "unknown");
-
-		JSONObject identifier = new JSONObject();
-		identifier.put("customer", customer);
-		identifier.put("datacenter", datacenter);
-		identifier.put("instance", instance);
-
-		return identifier;
-	}
-
-	@SuppressWarnings("unchecked")
-	public static JSONObject loadTopologyIdnetifier(String config_path)
-			throws ConfigurationException {
-		Configuration config = new PropertiesConfiguration(config_path);
-
-		String topology = config.getString("topology.id", "unknown");
-		String instance = config.getString("instance.id", "unknown");
-
-		JSONObject identifier = new JSONObject();
-		identifier.put("topology", topology);
-		identifier.put("topology_instance", instance);
-
-		return identifier;
-	}
-	
-
-	public static String generateTopologyName(JSONObject env, JSONObject topo) {
-
-		return (env.get("customer") + "_" + env.get("datacenter") + "_"
-				+ env.get("instance") + "_" + topo.get("topology") + "_" + topo.get("topology_instance"));
-	}
-	
-	@SuppressWarnings("unchecked")
-	public static JSONObject generateAlertsIdentifier(JSONObject env, JSONObject topo)
-	{
-		JSONObject identifier = new JSONObject();
-		identifier.put("environment", env);
-		identifier.put("topology", topo);
-		
-		return identifier;
-	}
-
-	public static Map<String, JSONObject> loadRegexAlerts(String config_path)
-			throws ConfigurationException, ParseException {
-		XMLConfiguration alert_rules = new XMLConfiguration();
-		alert_rules.setDelimiterParsingDisabled(true);
-		alert_rules.load(config_path);
-
-		//int number_of_rules = alert_rules.getList("rule.pattern").size();
-
-		String[] patterns = alert_rules.getStringArray("rule.pattern");
-		String[] alerts = alert_rules.getStringArray("rule.alert");
-
-		JSONParser pr = new JSONParser();
-		Map<String, JSONObject> rules = new HashMap<String, JSONObject>();
-
-		for (int i = 0; i < patterns.length; i++)
-			rules.put(patterns[i], (JSONObject) pr.parse(alerts[i]));
-
-		return rules;
-	}
-
-	public static Map<String, JSONObject> loadKnownHosts(String config_path)
-			throws ConfigurationException, ParseException {
-		Configuration hosts = new PropertiesConfiguration(config_path);
-
-		Iterator<String> keys = hosts.getKeys();
-		Map<String, JSONObject> known_hosts = new HashMap<String, JSONObject>();
-		JSONParser parser = new JSONParser();
-
-		while (keys.hasNext()) {
-			String key = keys.next().trim();
-			JSONArray value = (JSONArray) parser.parse(hosts.getProperty(key)
-					.toString());
-			known_hosts.put(key, (JSONObject) value.get(0));
-		}
-
-		return known_hosts;
-	}
-
-	public static void printConfigOptions(PropertiesConfiguration config, String path_fragment)
-	{
-		Iterator<String> itr = config.getKeys();
-		
-		while(itr.hasNext())
-		{
-			String key = itr.next();
-			
-			if(key.contains(path_fragment))
-			{
-				
-				System.out.println("[Metron] Key: " + key + " -> " + config.getString(key));
-			}
-		}
-
-	}
-	
-	public static void printOptionalSettings(Map<String, String> settings)
-	{
-		for(String setting: settings.keySet())
-		{
-			System.out.println("[Metron] Optional Setting: " + setting + " -> " +settings.get(setting));
-		}
-
-	}
-	
-	public static Map<String, String> getConfigOptions(PropertiesConfiguration config, String path_fragment)
-	{
-		Iterator<String> itr = config.getKeys();
-		Map<String, String> settings = new HashMap<String, String>();
-		
-		while(itr.hasNext())
-		{
-			String key = itr.next();
-			
-			if(key.contains(path_fragment))
-			{
-				String tmp_key = key.replace(path_fragment, "");
-				settings.put(tmp_key, config.getString(key));
-			}
-		}
-
-		return settings;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Common/src/main/java/com/apache/metron/index/interfaces/IndexAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/index/interfaces/IndexAdapter.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/index/interfaces/IndexAdapter.java
deleted file mode 100644
index 4144ccf..0000000
--- a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/index/interfaces/IndexAdapter.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package com.apache.metron.index.interfaces;
-
-import java.util.Map;
-
-import org.json.simple.JSONObject;
-
-public interface IndexAdapter {
-
-	boolean initializeConnection(String ip, int port, String cluster_name,
-			String index_name, String document_name, int bulk, String date_format) throws Exception;
-
-	int bulkIndex(JSONObject raw_message);
-
-	void setOptionalSettings(Map<String, String> settings);
-}


Mime
View raw message