metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sirs...@apache.org
Subject [41/85] [partial] incubator-metron git commit: Rename all OpenSOC files to Metron
Date Thu, 14 Jan 2016 17:03:36 GMT
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-EnrichmentAdapters/pom.xml.versionsBackup
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/pom.xml.versionsBackup b/metron-streaming/Metron-EnrichmentAdapters/pom.xml.versionsBackup
new file mode 100644
index 0000000..6c54dfc
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/pom.xml.versionsBackup
@@ -0,0 +1,136 @@
+<?xml version="1.0" encoding="UTF-8"?><!-- Licensed to the Apache Software 
+	Foundation (ASF) under one or more contributor license agreements. See the 
+	NOTICE file distributed with this work for additional information regarding 
+	copyright ownership. The ASF licenses this file to You under the Apache License, 
+	Version 2.0 (the "License"); you may not use this file except in compliance 
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 
+	Unless required by applicable law or agreed to in writing, software distributed 
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+	the specific language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>com.opensoc</groupId>
+		<artifactId>OpenSOC-Streaming</artifactId>
+		<version>0.0.1-SNAPSHOT</version>
+	</parent>
+	<artifactId>OpenSOC-EnrichmentAdapters</artifactId>
+
+	<properties>
+		<opensoc.common.version>0.0.1-SNAPSHOT</opensoc.common.version>
+		<mysql.version>5.1.31</mysql.version>
+		<slf4j.version>1.7.7</slf4j.version>
+		<hbase.client.version>0.96.1-hadoop2</hbase.client.version>
+		<storm.hdfs.version>0.1.2</storm.hdfs.version>
+		<hadoop.version>2.2.0</hadoop.version>
+		<storm.version>0.9.1-incubating</storm.version>
+		<guava.version>17.0</guava.version>
+	</properties>
+	<dependencies>
+		<dependency>
+			<groupId>com.opensoc</groupId>
+			<artifactId>OpenSOC-Common</artifactId>
+			<version>${opensoc.common.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-api</artifactId>
+			<version>${slf4j.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>mysql</groupId>
+			<artifactId>mysql-connector-java</artifactId>
+			<version>${mysql.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hbase</groupId>
+			<artifactId>hbase-client</artifactId>
+			<version>${hbase.client.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-log4j12</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>log4j</groupId>
+					<artifactId>log4j</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-hdfs</artifactId>
+			<version>${hadoop.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.storm</groupId>
+			<artifactId>storm-core</artifactId>
+			<version>${storm.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-common</artifactId>
+			<version>${hadoop.version}</version>
+		</dependency>
+  		<dependency>
+  			<groupId>junit</groupId>
+  			<artifactId>junit</artifactId>
+  			<version>3.8.2</version>
+  		</dependency>	
+  		<dependency>
+  	    <groupId>commons-validator</groupId>
+    <artifactId>commons-validator</artifactId>
+    <version>1.4.0</version>
+    </dependency>	
+  		
+  			
+	</dependencies>
+   <reporting>
+    <plugins>
+     <plugin>
+     <groupId>org.apache.maven.plugins</groupId>
+     <artifactId>maven-surefire-plugin</artifactId>
+     	<configuration>
+	   		<systemProperties>
+	   		    <property>
+	   		         <name>mode</name>
+	   		         <value>local</value>
+	   		    </property>
+	   		</systemProperties>
+		</configuration>
+     </plugin>
+	<!-- Normally, dependency report takes time, skip it -->
+      <plugin>
+		<groupId>org.apache.maven.plugins</groupId>
+		<artifactId>maven-project-info-reports-plugin</artifactId>
+		<version>2.7</version>
+	 
+		<configuration>
+	          <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+		</configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>emma-maven-plugin</artifactId>
+        <version>1.0-alpha-3</version>
+      </plugin>    
+      <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-pmd-plugin</artifactId>
+          <configuration>
+            <targetJdk>1.7</targetJdk>
+	  </configuration>
+        </plugin>        
+    </plugins>
+  </reporting>  	
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-EnrichmentAdapters/readme.md
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/readme.md b/metron-streaming/Metron-EnrichmentAdapters/readme.md
new file mode 100644
index 0000000..7c08218
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/readme.md
@@ -0,0 +1,125 @@
+#OpenSOC-Enrichments
+
+##Module Description
+
+This module enables enrichment of message metafields with additional information from various enrichment sources.  Currently there is only a limited number of enrichments available, but this is an extensible framework that can be extended with additional enrichments.  Enrichments currently available are geo, whois, hosts, and CIF.
+
+##Message Format
+
+Enrichment bolts are designed to go after the parser bolts.  Parser bolts will parse the telemetry, taking it from its native format and producing a standard JSON that would look like so:
+
+```json
+{
+"message": 
+{"ip_src_addr": xxxx, 
+"ip_dst_addr": xxxx, 
+"ip_src_port": xxxx, 
+"ip_dst_port": xxxx, 
+"protocol": xxxx, 
+"additional-field 1": xxx,
+}
+
+}
+```
+
+A single enrichment bolt would enrich the message and produce a JSON enrichment and attach it to the message.  Enrichments are stackable so multiple enrichments can be attached sequentially after a single parser bolt.  Stacked enrichments would produce messages under the "enrichment" tag and attach it to the message like so:
+
+```json
+{
+"message": 
+{"ip_src_addr": xxxx, 
+"ip_dst_addr": xxxx, 
+"ip_src_port": xxxx, 
+"ip_dst_port": xxxx, 
+"protocol": xxxx, 
+"additional-field 1": xxxx,
+},
+"enrichment" : {"geo": xxxx, "whois": xxxx, "hosts": xxxxx, "CIF": "xxxxx"}
+
+}
+```
+
+##Enrichment Sources
+
+Each enrichment has to have an anrichment source which can serve as a lookup table for enriching relevant message fields.  In order to minimize the use of additional platforms and tools we primarily try to rely on HBase as much as possible to store the enrichment information for lookup by key.  In order to use Hbase we have to pre-process the enrichment feeds for bulk-loading into HBase with specific key format optimized for retrieval as well as utilize caches within the enrichment bolts to be able to provide enrichments real-time.  Our wiki contains information on how to setup the environment, pre-process feeds, and plug in the enrichment sources.
+
+##Enrichment Bolt
+
+The enrichment bolt is designed to be extensible to be re-used for all kinds of enrichment processes.  The bolt signature for declaration in a storm topology is as follows:
+
+
+
+```
+GenericEnrichmentBolt geo_enrichment = new GenericEnrichmentBolt()
+.withEnrichmentTag(
+config.getString("bolt.enrichment.geo.enrichment_tag"))
+.withAdapter(geo_adapter)
+.withMaxTimeRetain(
+config.getInt("bolt.enrichment.geo.MAX_TIME_RETAIN_MINUTES"))
+.withMaxCacheSize(
+config.getInt("bolt.enrichment.geo.MAX_CACHE_SIZE_OBJECTS_NUM"))
+.withKeys(geo_keys).withMetricConfiguration(config);
+
+```
+
+EnrichmentTag - Name of the enrichment (geo, whois, hosts, etc)
+Keys - Keys which this enrichment is able to enrich (hosts field for hosts enrichment, source_ip, dest_ip, for geo enrichment, etc)
+MaxTimeToRetain & MaxCacheSize - define the caching policy of the enrichment bolt
+Adapter - which adapter to use with the enrichment bolt instance
+
+###Geo Adapter
+Geo adapter is able to do geo enrichment on hosts and destination IPs.  The open source verison of the geo adapter uses the free Geo feeds from MaxMind.  The format of these feeds does not easily lend itself to a no-sql DB so this adapter is designed to work with mySql.  But it is extensible enough to be made work with a variety of other back ends.
+
+The signature of a geo adapter is as follows;
+
+```
+GeoMysqlAdapter geo_adapter = new GeoMysqlAdapter(
+config.getString("mysql.ip"), config.getInt("mysql.port"),
+config.getString("mysql.username"),
+config.getString("mysql.password"),
+config.getString("bolt.enrichment.geo.adapter.table"));
+
+```
+
+###Hosts Adapter
+The hosts adapter is designed to enrich message format with the static host information that can be read from a standard text file.  This adapter is intended for use with a network crawling script that can identify all customer assets and place them in a text file.  For example, this script would identify all workstations, printers, appliantces, etc.  Then if any of these assets are seen in the telemetry messages flowing through the adapter this enrichment would fire and the relevant known information about a host would be attached.  We are currently working on porting this adapter to work with HBase, but this work is not ready yet.  The known hosts file is located under the /etc/whitelists config directory of OpenSOC.
+
+The signature of the hosts adapter is as follows:
+
+```
+Map<String, JSONObject> known_hosts = SettingsLoader
+.loadKnownHosts(hosts_path);
+
+HostFromPropertiesFileAdapter host_adapter = new HostFromPropertiesFileAdapter(
+known_hosts);
+
+```
+* The source and dest ips refer to the name of the message JSON key where the host information is located
+
+###Whois Adapter
+Whois adapter enriches the host name with additional whois information obtained from our proprietary Cisco feed.  The enricher itself is provided in this open source distribution, but the feed is not.  You have to have your own feed in order to use it.  Alternatively, you can contact us for providing you with this feed, but we would have to charge you a fee (we can't distribute it for free). The implemetation of the whois enrichment we provide works with HBase
+
+The signature of the whois adapter is as follows:
+
+```
+
+EnrichmentAdapter whois_adapter = new WhoisHBaseAdapter(
+config.getString("bolt.enrichment.whois.hbase.table.name"),
+config.getString("kafka.zk.list"),
+config.getString("kafka.zk.port"));
+```
+
+###CIF Adapter
+CIF adapter is designed to take in CIF feeds and cross-reference them against every message processed by Storm.  If there is a hit then the relevant information is attached to the message.  
+
+The signature of the CIF adapter is as follows:
+
+```
+CIFHbaseAdapter = new CIFHbaseAdapter(config
+.getString("kafka.zk.list"), config
+.getString("kafka.zk.port"), config
+.getString("bolt.enrichment.cif.tablename")))
+```
+
+##Stacking Enrichments
+Enrichments can be stacked.  By default each enrichment bolt listens on the "message" stream.  In order to create and stack enrichment bolts create a new bolt and instantiate the appropariate adapter.  You can look at our sample topologies to see how enrichments can be stacked
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/cif/AbstractCIFAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/cif/AbstractCIFAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/cif/AbstractCIFAdapter.java
new file mode 100644
index 0000000..7e69864
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/cif/AbstractCIFAdapter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.enrichment.adapters.cif;
+
+import java.io.Serializable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.opensoc.enrichment.interfaces.EnrichmentAdapter;
+
+public abstract class AbstractCIFAdapter implements EnrichmentAdapter,Serializable{
+
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = -5040559164824221816L;
+	protected static final Logger LOG = LoggerFactory
+			.getLogger(AbstractCIFAdapter.class);
+	
+	abstract public boolean initializeAdapter();
+	abstract public String enrichByIP(String metadata);
+	abstract public String enrichByDomain(String metadata);
+	abstract public String enrichByEmail(String metadata);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java
new file mode 100644
index 0000000..bf255ef
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/cif/CIFHbaseAdapter.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.enrichment.adapters.cif;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.json.simple.JSONObject;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.log4j.Logger;
+
+@SuppressWarnings("unchecked")
+public class CIFHbaseAdapter extends AbstractCIFAdapter {
+
+	private static final long serialVersionUID = 1L;
+	private String _tableName;
+	private HTableInterface table;
+	private String _quorum;
+	private String _port;
+
+	public CIFHbaseAdapter(String quorum, String port, String tableName) {
+		_quorum = quorum;
+		_port = port;
+		_tableName = tableName;
+	}
+
+	/** The LOGGER. */
+	private static final Logger LOGGER = Logger
+			.getLogger(CIFHbaseAdapter.class);
+
+	public JSONObject enrich(String metadata) {
+
+		JSONObject output = new JSONObject();
+		LOGGER.debug("=======Looking Up For:" + metadata);
+		output.putAll(getCIFObject(metadata));
+
+		return output;
+	}
+
+	@SuppressWarnings({ "rawtypes", "deprecation" })
+	protected Map getCIFObject(String key) {
+
+		LOGGER.debug("=======Pinging HBase For:" + key);
+
+		Get get = new Get(key.getBytes());
+		Result rs;
+		Map output = new HashMap();
+
+		try {
+			rs = table.get(get);
+
+			for (KeyValue kv : rs.raw())
+				output.put(new String(kv.getQualifier()), "Y");
+
+		} catch (IOException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+		return output;
+	}
+
+	@Override
+	public boolean initializeAdapter() {
+
+		// Initialize HBase Table
+		Configuration conf = null;
+		conf = HBaseConfiguration.create();
+		conf.set("hbase.zookeeper.quorum", _quorum);
+		conf.set("hbase.zookeeper.property.clientPort", _port);
+
+		try {
+			LOGGER.debug("=======Connecting to HBASE===========");
+			LOGGER.debug("=======ZOOKEEPER = "
+					+ conf.get("hbase.zookeeper.quorum"));
+			HConnection connection = HConnectionManager.createConnection(conf);
+			table = connection.getTable(_tableName);
+			return true;
+		} catch (IOException e) {
+			// TODO Auto-generated catch block
+			LOGGER.debug("=======Unable to Connect to HBASE===========");
+			e.printStackTrace();
+		}
+
+		return false;
+	}
+
+	@Override
+	public String enrichByIP(String metadata) {
+		return null;
+	}
+
+	@Override
+	public String enrichByDomain(String metadata) {
+		return null;
+	}
+
+	@Override
+	public String enrichByEmail(String metadata) {
+		return null;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/geo/AbstractGeoAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/geo/AbstractGeoAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/geo/AbstractGeoAdapter.java
new file mode 100644
index 0000000..44db36c
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/geo/AbstractGeoAdapter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.enrichment.adapters.geo;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.opensoc.enrichment.common.GenericEnrichmentBolt;
+import com.opensoc.enrichment.interfaces.EnrichmentAdapter;
+
+@SuppressWarnings("serial")
+public abstract class AbstractGeoAdapter implements EnrichmentAdapter,
+		Serializable {
+
+	protected static final Logger _LOG = LoggerFactory
+			.getLogger(GenericEnrichmentBolt.class);
+
+	abstract public JSONObject enrich(String metadata);
+
+	abstract public boolean initializeAdapter();
+
+	/**
+	 * Check if we can reach the IP where geo data is storred
+	 * 
+	 * @param ip
+	 *            - ip of geo database
+	 * @param timeout
+	 *            - timeout for a connection attempt
+	 * @return - True if can connect, False if cannot
+	 * @throws Exception
+	 */
+	public boolean checkIfReachable(String ip, int timeout) throws Exception {
+		boolean reachable = InetAddress.getByName(ip).isReachable(timeout);
+
+		if (!reachable)
+			return false;
+
+		return true;
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/geo/GeoMysqlAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/geo/GeoMysqlAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/geo/GeoMysqlAdapter.java
new file mode 100644
index 0000000..d62632b
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/geo/GeoMysqlAdapter.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.enrichment.adapters.geo;
+
+import java.net.InetAddress;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import org.apache.commons.validator.routines.InetAddressValidator;
+import org.json.simple.JSONObject;
+
+@SuppressWarnings("serial")
+public class GeoMysqlAdapter extends AbstractGeoAdapter {
+
+	private Connection connection = null;
+	private Statement statement = null;
+	private String _ip;
+	private String _username;
+	private String _password;
+	private String _tablename;
+	private InetAddressValidator ipvalidator = new InetAddressValidator();
+
+	public GeoMysqlAdapter(String ip, int port, String username,
+			String password, String tablename) {
+		try {
+			_ip = InetAddress.getByName(ip).getHostAddress();
+
+			boolean reachable = checkIfReachable(ip, 500);
+
+			if (!reachable)
+				throw new Exception("Unable to reach IP " + _ip
+						+ " with username " + _username + " and password "
+						+ _password + " accessing table name " + _tablename);
+
+		} catch (Exception e) {
+			_LOG.error("Environment misconfigured, cannot reach MYSQL server....");
+			e.printStackTrace();
+		}
+
+		_username = username;
+		_password = password;
+		_tablename = tablename;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public JSONObject enrich(String metadata) {
+
+		ResultSet resultSet = null;
+
+		try {
+
+			_LOG.trace("[OpenSOC] Received metadata: " + metadata);
+
+			InetAddress addr = InetAddress.getByName(metadata);
+
+			if (addr.isAnyLocalAddress() || addr.isLoopbackAddress()
+					|| addr.isSiteLocalAddress() || addr.isMulticastAddress()
+					|| !ipvalidator.isValidInet4Address(metadata)) {
+				_LOG.trace("[OpenSOC] Not a remote IP: " + metadata);
+				_LOG.trace("[OpenSOC] Returning enrichment: " + "{}");
+
+				return new JSONObject();
+			}
+
+			_LOG.trace("[OpenSOC] Is a valid remote IP: " + metadata);
+
+			statement = connection.createStatement(
+					ResultSet.TYPE_SCROLL_INSENSITIVE,
+					ResultSet.CONCUR_READ_ONLY);
+			String locid_query = "select IPTOLOCID(\"" + metadata
+					+ "\") as ANS";
+			resultSet = statement.executeQuery(locid_query);
+
+			if (resultSet == null)
+				throw new Exception("Invalid result set for metadata: "
+						+ metadata + ". Query run was: " + locid_query);
+
+			resultSet.last();
+			int size = resultSet.getRow();
+
+			if (size == 0)
+				throw new Exception("No result returned for: " + metadata
+						+ ". Query run was: " + locid_query);
+
+			resultSet.beforeFirst();
+			resultSet.next();
+
+			String locid = null;
+			locid = resultSet.getString("ANS");
+
+			if (locid == null)
+				throw new Exception("Invalid location id for: " + metadata
+						+ ". Query run was: " + locid_query);
+
+			String geo_query = "select * from location where locID = " + locid
+					+ ";";
+			resultSet = statement.executeQuery(geo_query);
+
+			if (resultSet == null)
+				throw new Exception(
+						"Invalid result set for metadata and locid: "
+								+ metadata + ", " + locid + ". Query run was: "
+								+ geo_query);
+
+			resultSet.last();
+			size = resultSet.getRow();
+
+			if (size == 0)
+				throw new Exception(
+						"No result id returned for metadata and locid: "
+								+ metadata + ", " + locid + ". Query run was: "
+								+ geo_query);
+
+			resultSet.beforeFirst();
+			resultSet.next();
+
+			JSONObject jo = new JSONObject();
+			jo.put("locID", resultSet.getString("locID"));
+			jo.put("country", resultSet.getString("country"));
+			jo.put("city", resultSet.getString("city"));
+			jo.put("postalCode", resultSet.getString("postalCode"));
+			jo.put("latitude", resultSet.getString("latitude"));
+			jo.put("longitude", resultSet.getString("longitude"));
+			jo.put("dmaCode", resultSet.getString("dmaCode"));
+			jo.put("locID", resultSet.getString("locID"));
+			
+			jo.put("location_point", jo.get("longitude") + "," + jo.get("latitude"));
+
+			_LOG.debug("Returning enrichment: " + jo);
+
+			return jo;
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			_LOG.error("Enrichment failure: " + e);
+			return new JSONObject();
+		}
+	}
+
+	@Override
+	public boolean initializeAdapter() {
+
+		_LOG.info("[OpenSOC] Initializing MysqlAdapter....");
+
+		try {
+
+			Class.forName("com.mysql.jdbc.Driver");
+			connection = DriverManager.getConnection("jdbc:mysql://" + _ip
+					+ "/" + _tablename + "?user=" + _username + "&password="
+					+ _password);
+
+			connection.setReadOnly(true);
+
+			if (!connection.isValid(0))
+				throw new Exception("Invalid connection string....");
+
+			_LOG.info("[OpenSOC] Set JDBC connection....");
+
+			return true;
+		} catch (Exception e) {
+			e.printStackTrace();
+			_LOG.error("[OpenSOC] JDBC connection failed....");
+
+			return false;
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/host/AbstractHostAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/host/AbstractHostAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/host/AbstractHostAdapter.java
new file mode 100644
index 0000000..c37e6c9
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/host/AbstractHostAdapter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.enrichment.adapters.host;
+
+import java.io.Serializable;
+
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.opensoc.enrichment.interfaces.EnrichmentAdapter;
+
+public abstract class AbstractHostAdapter implements EnrichmentAdapter,Serializable{
+
+	/**
+	 * Adapter to attach reputation information to the telemetry message
+	 */
+	private static final long serialVersionUID = 8280523289446309728L;
+	protected static final Logger LOG = LoggerFactory
+			.getLogger(AbstractHostAdapter.class);
+	
+	abstract public boolean initializeAdapter();
+	abstract public JSONObject enrich(String metadata);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/host/HostFromPropertiesFileAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/host/HostFromPropertiesFileAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/host/HostFromPropertiesFileAdapter.java
new file mode 100644
index 0000000..e6f693a
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/host/HostFromPropertiesFileAdapter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.enrichment.adapters.host;
+
+import java.util.Map;
+
+import org.json.simple.JSONObject;
+
+@SuppressWarnings("serial")
+public class HostFromPropertiesFileAdapter extends AbstractHostAdapter {
+	
+	Map<String, JSONObject> _known_hosts;
+	
+	public HostFromPropertiesFileAdapter(Map<String, JSONObject> known_hosts)
+	{
+		_known_hosts = known_hosts;
+	}
+
+	@Override
+	public boolean initializeAdapter() 
+	{
+		
+		if(_known_hosts.size() > 0)
+			return true;
+		else
+			return false;
+	}
+
+	@SuppressWarnings("unchecked")
+    @Override
+	public JSONObject enrich(String metadata) {
+		
+		
+		if(!_known_hosts.containsKey(metadata))
+			return new JSONObject();
+		
+		JSONObject enrichment = new JSONObject();
+		enrichment.put("known_info", (JSONObject) _known_hosts.get(metadata));
+		return enrichment;
+	}
+	
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/threat/AbstractThreatAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/threat/AbstractThreatAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/threat/AbstractThreatAdapter.java
new file mode 100644
index 0000000..395ee48
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/threat/AbstractThreatAdapter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.enrichment.adapters.threat;
+
+import java.io.Serializable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.opensoc.enrichment.interfaces.EnrichmentAdapter;
+
+public abstract class AbstractThreatAdapter implements EnrichmentAdapter,Serializable{
+
+	
+	private static final long serialVersionUID = 1524030932856141771L;
+	protected static final Logger LOG = LoggerFactory
+			.getLogger(AbstractThreatAdapter.class);
+	
+	abstract public boolean initializeAdapter();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java
new file mode 100644
index 0000000..97d02d4
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/threat/ThreatHbaseAdapter.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.enrichment.adapters.threat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.log4j.Logger;
+
+@SuppressWarnings("unchecked")
+public class ThreatHbaseAdapter extends AbstractThreatAdapter {
+
+	private static final long serialVersionUID = 1L;
+	private String _tableName;
+	private HTableInterface table;
+	private String _quorum;
+	private String _port;
+
+	public ThreatHbaseAdapter(String quorum, String port, String tableName) {
+		_quorum = quorum;
+		_port = port;
+		_tableName = tableName;
+	}
+
+	/** The LOGGER. */
+	private static final Logger LOGGER = Logger
+			.getLogger(ThreatHbaseAdapter.class);
+
+	public JSONObject enrich(String metadata) {
+
+		JSONObject output = new JSONObject();
+		LOGGER.debug("=======Looking Up For:" + metadata);
+		output.putAll(getThreatObject(metadata));
+
+		return output;
+	}
+
+	@SuppressWarnings({ "rawtypes", "deprecation" })
+	protected Map getThreatObject(String key) {
+
+		LOGGER.debug("=======Pinging HBase For:" + key);
+		
+		Get get = new Get(Bytes.toBytes(key));
+		Result rs;
+		Map output = new HashMap();
+
+		try {
+			rs = table.get(get);
+
+			if (!rs.isEmpty()) {
+				byte[] source_family = Bytes.toBytes("source");
+				JSONParser parser = new JSONParser();
+				
+				Map<byte[], byte[]> sourceFamilyMap = rs.getFamilyMap(source_family);
+				
+				for (Map.Entry<byte[], byte[]> entry  : sourceFamilyMap.entrySet()) {
+					String k = Bytes.toString(entry.getKey());
+					LOGGER.debug("=======Found intel from source: " + k);
+					output.put(k,parser.parse(Bytes.toString(entry.getValue())));
+	            }
+			}
+		} catch (IOException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		} catch (ParseException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+		return output;
+	}
+
+	@Override
+	public boolean initializeAdapter() {
+
+		// Initialize HBase Table
+		Configuration conf = null;
+		conf = HBaseConfiguration.create();
+		conf.set("hbase.zookeeper.quorum", _quorum);
+		conf.set("hbase.zookeeper.property.clientPort", _port);
+
+		try {
+			LOGGER.debug("=======Connecting to HBASE===========");
+			LOGGER.debug("=======ZOOKEEPER = "
+					+ conf.get("hbase.zookeeper.quorum"));
+			HConnection connection = HConnectionManager.createConnection(conf);
+			table = connection.getTable(_tableName);
+			return true;
+		} catch (IOException e) {
+			// TODO Auto-generated catch block
+			LOGGER.debug("=======Unable to Connect to HBASE===========");
+			e.printStackTrace();
+		}
+
+		return false;
+	}
+
+	
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/whois/AbstractWhoisAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/whois/AbstractWhoisAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/whois/AbstractWhoisAdapter.java
new file mode 100644
index 0000000..2c01b31
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/whois/AbstractWhoisAdapter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.enrichment.adapters.whois;
+
+import java.io.Serializable;
+
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.opensoc.enrichment.interfaces.EnrichmentAdapter;
+
+public abstract class AbstractWhoisAdapter implements EnrichmentAdapter,Serializable{
+
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = 8280523289446309728L;
+	protected static final Logger LOG = LoggerFactory
+			.getLogger(AbstractWhoisAdapter.class);
+	
+	abstract public boolean initializeAdapter();
+	abstract public JSONObject enrich(String metadata);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java
new file mode 100644
index 0000000..503618a
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapter.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.enrichment.adapters.whois;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.json.simple.JSONObject;
+
+import com.google.common.base.Joiner;
+import com.opensoc.tldextractor.BasicTldExtractor;
+
+public class WhoisHBaseAdapter extends AbstractWhoisAdapter {
+
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = 3371873619030870389L;
+	private HTableInterface table;
+	private String _table_name;
+	private String _quorum;
+	private String _port;
+	private BasicTldExtractor tldex = new BasicTldExtractor();
+
+	public WhoisHBaseAdapter(String table_name, String quorum, String port) {
+		_table_name = table_name;
+		_quorum = quorum;
+		_port = port;
+	}
+
+	public boolean initializeAdapter() {
+		Configuration conf = null;
+		conf = HBaseConfiguration.create();
+		conf.set("hbase.zookeeper.quorum", _quorum);
+		conf.set("hbase.zookeeper.property.clientPort", _port);
+		conf.set("zookeeper.session.timeout", "20");
+		conf.set("hbase.rpc.timeout", "20");
+		conf.set("zookeeper.recovery.retry", "1");
+		conf.set("zookeeper.recovery.retry.intervalmill", "1");
+
+		try {
+
+			LOG.trace("[OpenSOC] Connecting to HBase");
+			LOG.trace("[OpenSOC] ZOOKEEPER = "
+					+ conf.get("hbase.zookeeper.quorum"));
+
+			LOG.trace("[OpenSOC] CONNECTING TO HBASE WITH: " + conf);
+
+			HConnection connection = HConnectionManager.createConnection(conf);
+
+			LOG.trace("[OpenSOC] CONNECTED TO HBASE");
+
+			table = connection.getTable(_table_name);
+
+			LOG.trace("--------CONNECTED TO TABLE: " + table);
+
+			JSONObject tester = enrich("cisco.com");
+
+			if (tester.keySet().size() == 0)
+				throw new IOException(
+						"Either HBASE is misconfigured or whois table is missing");
+
+			return true;
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+
+		return false;
+
+	}
+
+	@SuppressWarnings({ "unchecked", "deprecation" })
+	public JSONObject enrich(String metadataIn) {
+		
+		String metadata = tldex.extract2LD(metadataIn);
+
+		LOG.trace("[OpenSOC] Pinging HBase For:" + metadata);
+
+        
+		JSONObject output = new JSONObject();
+		JSONObject payload = new JSONObject();
+
+		Get get = new Get(metadata.getBytes());
+		Result rs;
+
+		try {
+			rs = table.get(get);
+
+			for (KeyValue kv : rs.raw())
+				payload.put(metadata, new String(kv.getValue()));
+
+			output.put("whois", payload);
+
+		} catch (IOException e) {
+			payload.put(metadata, "{}");
+			output.put("whois", payload);
+			e.printStackTrace();
+		}
+
+		return output;
+
+	}
+	
+//	private String format(String input) {
+//		String output = input;
+//		String[] tokens = input.split("\\.");
+//		if(tokens.length > 2) {
+//			output = Joiner.on(".").join(Arrays.copyOfRange(tokens, 1, tokens.length));;
+//		}
+//		return output;
+//	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/common/AbstractEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/common/AbstractEnrichmentBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/common/AbstractEnrichmentBolt.java
new file mode 100644
index 0000000..be1ef96
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/common/AbstractEnrichmentBolt.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.opensoc.enrichment.common;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.base.BaseRichBolt;
+
+import com.codahale.metrics.Counter;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.opensoc.enrichment.interfaces.EnrichmentAdapter;
+import com.opensoc.metrics.MetricReporter;
+
+@SuppressWarnings("rawtypes")
+public abstract class AbstractEnrichmentBolt extends BaseRichBolt {
+	/**
+	 * Abstract enrichment bolt
+	 */
+	private static final long serialVersionUID = -6710596708304282838L;
+
+	protected static final Logger LOG = LoggerFactory
+			.getLogger(AbstractEnrichmentBolt.class);
+
+	protected OutputCollector _collector;
+	protected String _OutputFieldName;
+
+	protected String _enrichment_tag;
+	protected Long _MAX_CACHE_SIZE_OBJECTS_NUM;
+	protected Long _MAX_TIME_RETAIN_MINUTES;
+
+	// JSON Keys to be enriched
+	protected List<String> _jsonKeys;
+	protected EnrichmentAdapter _adapter;
+	protected MetricReporter _reporter;
+
+	protected transient CacheLoader<String, JSONObject> loader;
+	protected transient LoadingCache<String, JSONObject> cache;
+
+	protected Counter ackCounter, emitCounter, failCounter;
+
+	protected void registerCounters() {
+
+		String ackString = _adapter.getClass().getSimpleName() + ".ack";
+
+		String emitString = _adapter.getClass().getSimpleName() + ".emit";
+
+		String failString = _adapter.getClass().getSimpleName() + ".fail";
+
+		ackCounter = _reporter.registerCounter(ackString);
+		emitCounter = _reporter.registerCounter(emitString);
+		failCounter = _reporter.registerCounter(failString);
+
+	}
+
+	public final void prepare(Map conf, TopologyContext topologyContext,
+			OutputCollector collector) {
+		_collector = collector;
+
+		if (this._OutputFieldName == null)
+			throw new IllegalStateException("OutputFieldName must be specified");
+		if (this._enrichment_tag == null)
+			throw new IllegalStateException("enrichment_tag must be specified");
+		if (this._MAX_CACHE_SIZE_OBJECTS_NUM == null)
+			throw new IllegalStateException("MAX_CACHE_SIZE_OBJECTS_NUM must be specified");
+		if (this._MAX_TIME_RETAIN_MINUTES == null)
+			throw new IllegalStateException("MAX_TIME_RETAIN_MINUTES must be specified");
+		if (this._adapter == null)
+			throw new IllegalStateException("Adapter must be specified");
+		if (this._jsonKeys == null)
+			throw new IllegalStateException(
+					"JSON Keys to be enriched, must be specified");
+
+		loader = new CacheLoader<String, JSONObject>() {
+			public JSONObject load(String key) throws Exception {
+				return _adapter.enrich(key);
+			}
+		};
+
+		cache = CacheBuilder.newBuilder().maximumSize(_MAX_CACHE_SIZE_OBJECTS_NUM)
+				.expireAfterWrite(_MAX_TIME_RETAIN_MINUTES, TimeUnit.MINUTES)
+				.build(loader);
+
+		boolean success = _adapter.initializeAdapter();
+
+		if (!success) {
+			LOG.error("[OpenSOC] EnrichmentBolt could not initialize adapter");
+			throw new IllegalStateException("Could not initialize adapter...");
+		}
+
+		try {
+			doPrepare(conf, topologyContext, collector);
+		} catch (IOException e) {
+			LOG.error("[OpenSOC] Counld not initialize...");
+			e.printStackTrace();
+		}
+
+	}
+
+	abstract void doPrepare(Map conf, TopologyContext topologyContext,
+			OutputCollector collector) throws IOException;
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/common/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/common/GenericEnrichmentBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/common/GenericEnrichmentBolt.java
new file mode 100644
index 0000000..37c151f
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/com/apache/metron/enrichment/common/GenericEnrichmentBolt.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.enrichment.common;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.configuration.Configuration;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import com.opensoc.enrichment.interfaces.EnrichmentAdapter;
+import com.opensoc.helpers.topology.ErrorGenerator;
+import com.opensoc.json.serialization.JSONEncoderHelper;
+import com.opensoc.metrics.MetricReporter;
+
+/**
+ * Uses an adapter to enrich telemetry messages with additional metadata
+ * entries. For a list of available enrichment adapters see
+ * com.opensoc.enrichment.adapters.
+ * <p>
+ * At the moment of release the following enrichment adapters are available:
+ * <p>
+ * <ul>
+ * 
+ * <li>geo = attaches geo coordinates to IPs
+ * <li>whois = attaches whois information to domains
+ * <li>host = attaches reputation information to known hosts
+ * <li>CIF = attaches information from threat intelligence feeds
+ * <ul>
+ * <p>
+ * <p>
+ * Enrichments are optional
+ **/
+
+@SuppressWarnings({ "rawtypes", "serial" })
+public class GenericEnrichmentBolt extends AbstractEnrichmentBolt {
+
+	private static final Logger LOG = LoggerFactory
+			.getLogger(GenericEnrichmentBolt.class);
+	private JSONObject metricConfiguration;
+
+	/**
+	 * @param adapter
+	 *            Adapter for doing the enrichment
+	 * @return Instance of this class
+	 */
+
+	public GenericEnrichmentBolt withAdapter(EnrichmentAdapter adapter) {
+		_adapter = adapter;
+		return this;
+	}
+
+	/**
+	 * @param OutputFieldName
+	 *            Fieldname of the output tuple for this bolt
+	 * @return Instance of this class
+	 */
+
+	public GenericEnrichmentBolt withOutputFieldName(String OutputFieldName) {
+		_OutputFieldName = OutputFieldName;
+		return this;
+	}
+
+	/**
+	 * @param EnrichmentTag
+	 *            Defines what tag the enrichment will be tagged with in the
+	 *            telemetry message
+	 * @return Instance of this class
+	 */
+
+	public GenericEnrichmentBolt withEnrichmentTag(String EnrichmentTag) {
+		_enrichment_tag = EnrichmentTag;
+		return this;
+	}
+
+	/**
+	 * @param MAX_CACHE_SIZE_OBJECTS_NUM
+	 *            Maximum size of cache before flushing
+	 * @return Instance of this class
+	 */
+
+	public GenericEnrichmentBolt withMaxCacheSize(long MAX_CACHE_SIZE_OBJECTS_NUM) {
+		_MAX_CACHE_SIZE_OBJECTS_NUM = MAX_CACHE_SIZE_OBJECTS_NUM;
+		return this;
+	}
+
+	/**
+	 * @param MAX_TIME_RETAIN_MINUTES
+	 *            Maximum time to retain cached entry before expiring
+	 * @return Instance of this class
+	 */
+
+	public GenericEnrichmentBolt withMaxTimeRetain(long MAX_TIME_RETAIN_MINUTES) {
+		_MAX_TIME_RETAIN_MINUTES = MAX_TIME_RETAIN_MINUTES;
+		return this;
+	}
+
+	/**
+	 * @param jsonKeys
+	 *            Keys in the telemetry message that are to be enriched by this
+	 *            bolt
+	 * @return Instance of this class
+	 */
+
+	public GenericEnrichmentBolt withKeys(List<String> jsonKeys) {
+		_jsonKeys = jsonKeys;
+		return this;
+	}
+
+	/**
+	 * @param config
+	 *            A class for generating custom metrics into graphite
+	 * @return Instance of this class
+	 */
+
+	public GenericEnrichmentBolt withMetricConfiguration(Configuration config) {
+		this.metricConfiguration = JSONEncoderHelper.getJSON(config
+				.subset("com.opensoc.metrics"));
+		return this;
+	}
+
+	@SuppressWarnings("unchecked")
+	public void execute(Tuple tuple) {
+
+		LOG.trace("[OpenSOC] Starting enrichment");
+
+		JSONObject in_json = null;
+		String key = null;
+		
+		try {
+
+			key = tuple.getStringByField("key");
+			in_json = (JSONObject) tuple.getValueByField("message");
+
+			if (in_json == null || in_json.isEmpty())
+				throw new Exception("Could not parse binary stream to JSON");
+			
+			if(key == null)
+				throw new Exception("Key is not valid");
+
+			LOG.trace("[OpenSOC] Received tuple: " + in_json);
+
+			JSONObject message = (JSONObject) in_json.get("message");
+
+			if (message == null || message.isEmpty())
+				throw new Exception("Could not extract message from JSON: "
+						+ in_json);
+
+			LOG.trace("[OpenSOC] Extracted message: " + message);
+
+			for (String jsonkey : _jsonKeys) {
+				LOG.trace("[OpenSOC] Processing:" + jsonkey + " within:"
+						+ message);
+
+				String jsonvalue = (String) message.get(jsonkey);
+				LOG.trace("[OpenSOC] Processing: " + jsonkey + " -> "
+						+ jsonvalue);
+
+				if (null == jsonvalue) {
+					LOG.trace("[OpenSOC] Key " + jsonkey
+							+ "not present in message " + message);
+					continue;
+				}
+				
+				// If the field is empty, no need to enrich
+				if ( jsonvalue.length() == 0) {
+					continue;
+				}
+
+				JSONObject enrichment = cache.getUnchecked(jsonvalue);
+				LOG.trace("[OpenSOC] Enriched: " + jsonkey + " -> "
+						+ enrichment);
+
+				if (enrichment == null)
+					throw new Exception("[OpenSOC] Could not enrich string: "
+							+ jsonvalue);
+
+				if (!in_json.containsKey("enrichment")) {
+					in_json.put("enrichment", new JSONObject());
+					LOG.trace("[OpenSOC] Starting a string of enrichments");
+				}
+
+				JSONObject enr1 = (JSONObject) in_json.get("enrichment");
+
+				if (enr1 == null)
+					throw new Exception("Internal enrichment is empty");
+
+				if (!enr1.containsKey(_enrichment_tag)) {
+					enr1.put(_enrichment_tag, new JSONObject());
+					LOG.trace("[OpenSOC] Starting a new enrichment");
+				}
+
+				LOG.trace("[OpenSOC] ENR1 is: " + enr1);
+
+				JSONObject enr2 = (JSONObject) enr1.get(_enrichment_tag);
+				enr2.put(jsonkey, enrichment);
+
+				LOG.trace("[OpenSOC] ENR2 is: " + enr2);
+
+				enr1.put(_enrichment_tag, enr2);
+				in_json.put("enrichment", enr1);
+			}
+
+			LOG.debug("[OpenSOC] Generated combined enrichment: " + in_json);
+
+			_collector.emit("message", new Values(key, in_json));
+			_collector.ack(tuple);
+
+			if (_reporter != null) {
+				emitCounter.inc();
+				ackCounter.inc();
+			}
+		} catch (Exception e) {
+			
+			LOG.error("[OpenSOC] Unable to enrich message: " + in_json);
+			_collector.fail(tuple);
+
+			if (_reporter != null) {
+				failCounter.inc();
+			}
+			
+			JSONObject error = ErrorGenerator.generateErrorMessage("Enrichment problem: " + in_json, e);
+			_collector.emit("error", new Values(error));
+		}
+		
+		
+
+	}
+
+	public void declareOutputFields(OutputFieldsDeclarer declearer) {
+		declearer.declareStream("message", new Fields("key", "message"));
+		declearer.declareStream("error", new Fields("message"));
+	}
+
+	@Override
+	void doPrepare(Map conf, TopologyContext topologyContext,
+			OutputCollector collector) throws IOException {
+		LOG.info("[OpenSOC] Preparing Enrichment Bolt...");
+
+		_collector = collector;
+
+		try {
+			_reporter = new MetricReporter();
+			_reporter.initialize(metricConfiguration,
+					GenericEnrichmentBolt.class);
+			this.registerCounters();
+		} catch (Exception e) {
+			LOG.info("[OpenSOC] Unable to initialize metrics reporting");
+		}
+
+		LOG.info("[OpenSOC] Enrichment bolt initialized...");
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-EnrichmentAdapters/src/main/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/resources/hbase-site.xml b/metron-streaming/Metron-EnrichmentAdapters/src/main/resources/hbase-site.xml
new file mode 100644
index 0000000..8d812a9
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/resources/hbase-site.xml
@@ -0,0 +1,131 @@
+<!--Tue Apr  1 18:16:39 2014-->
+  <configuration>
+    <property>
+    <name>hbase.tmp.dir</name>
+    <value>/disk/h/hbase</value>
+  </property>
+    <property>
+    <name>hbase.hregion.memstore.chunkpool.maxsize</name>
+    <value>0.5</value>
+  </property>
+    <property>
+    <name>hbase.regionserver.codecs</name>
+    <value>lzo,gz,snappy</value>
+  </property>
+    <property>
+    <name>hbase.hstore.flush.retries.number</name>
+    <value>120</value>
+  </property>
+    <property>
+    <name>hbase.client.keyvalue.maxsize</name>
+    <value>10485760</value>
+  </property>
+    <property>
+    <name>hbase.rootdir</name>
+    <value>hdfs://nn1:8020/apps/hbase/data</value>
+  </property>
+    <property>
+    <name>hbase.defaults.for.version.skip</name>
+    <value>true</value>
+  </property>
+    <property>
+    <name>hbase.client.scanner.caching</name>
+    <value>100</value>
+  </property>
+    <property>
+    <name>hbase.superuser</name>
+    <value>hbase</value>
+  </property>
+    <property>
+    <name>hfile.block.cache.size</name>
+    <value>0.40</value>
+  </property>
+    <property>
+    <name>hbase.regionserver.checksum.verify</name>
+    <value>true</value>
+  </property>
+    <property>
+    <name>hbase.hregion.memstore.mslab.enabled</name>
+    <value>true</value>
+  </property>
+    <property>
+    <name>hbase.hregion.max.filesize</name>
+    <value>107374182400</value>
+  </property>
+    <property>
+    <name>hbase.cluster.distributed</name>
+    <value>true</value>
+  </property>
+    <property>
+    <name>zookeeper.session.timeout</name>
+    <value>30000</value>
+  </property>
+    <property>
+    <name>zookeeper.znode.parent</name>
+    <value>/hbase-unsecure</value>
+  </property>
+    <property>
+    <name>hbase.regionserver.global.memstore.lowerLimit</name>
+    <value>0.38</value>
+  </property>
+    <property>
+    <name>hbase.regionserver.handler.count</name>
+    <value>240</value>
+  </property>
+    <property>
+    <name>hbase.hregion.memstore.mslab.chunksize</name>
+    <value>8388608</value>
+  </property>
+    <property>
+    <name>hbase.zookeeper.quorum</name>
+    <value>zkpr1,zkpr2,zkpr3</value>
+  </property>
+    <property>
+    <name>hbase.zookeeper.useMulti</name>
+    <value>true</value>
+  </property>
+    <property>
+    <name>hbase.hregion.majorcompaction</name>
+    <value>86400000</value>
+  </property>
+    <property>
+    <name>hbase.hstore.blockingStoreFiles</name>
+    <value>200</value>
+  </property>
+    <property>
+    <name>hbase.zookeeper.property.clientPort</name>
+    <value>2181</value>
+  </property>
+    <property>
+    <name>hbase.hregion.memstore.flush.size</name>
+    <value>134217728</value>
+  </property>
+    <property>
+    <name>hbase.security.authorization</name>
+    <value>false</value>
+  </property>
+    <property>
+    <name>hbase.regionserver.global.memstore.upperLimit</name>
+    <value>0.4</value>
+  </property>
+    <property>
+    <name>hbase.hstore.compactionThreshold</name>
+    <value>4</value>
+  </property>
+    <property>
+    <name>hbase.hregion.memstore.block.multiplier</name>
+    <value>8</value>
+  </property>
+    <property>
+    <name>hbase.security.authentication</name>
+    <value>simple</value>
+  </property>
+    <property>
+    <name>dfs.client.read.shortcircuit</name>
+    <value>true</value>
+  </property>
+  <property>
+    <name>dfs.domain.socket.path</name>
+    <value>/var/run/hdfs/dn_socket</value>
+  </property>
+  </configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-EnrichmentAdapters/src/test/java/com/apache/metron/enrichment/adapters/cif/CIFHbaseAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/com/apache/metron/enrichment/adapters/cif/CIFHbaseAdapterTest.java b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/com/apache/metron/enrichment/adapters/cif/CIFHbaseAdapterTest.java
new file mode 100644
index 0000000..82390e9
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/com/apache/metron/enrichment/adapters/cif/CIFHbaseAdapterTest.java
@@ -0,0 +1,224 @@
+
+ 
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.opensoc.enrichment.adapters.cif;
+
+import java.net.InetAddress;
+import java.util.Properties;
+
+import com.opensoc.test.AbstractTestContext;
+import com.opensoc.enrichment.adapters.cif.CIFHbaseAdapter;
+
+
+ /**
+ * <ul>
+ * <li>Title: CIFHbaseAdapterTest</li>
+ * <li>Description: Test Class for CIGFHbaseAdapter</li>
+ * <li>Created: Aug 7, 2014</li>
+ * </ul>
+ * @version $Revision: 1.1 $
+ */
+public class CIFHbaseAdapterTest extends AbstractTestContext {
+
+    private static CIFHbaseAdapter cifHbaseAdapter=null;
+
+
+
+    /**
+     * Constructs a new <code>CIFHbaseAdapterTest</code> instance.
+     * @param name
+     */
+
+    public CIFHbaseAdapterTest(String name) {
+        super(name);
+
+    }
+
+
+
+    /**
+     
+     * @throws java.lang.Exception
+     */
+    protected static void setUpBeforeClass() throws Exception {
+    }
+
+    /**
+     
+     * @throws java.lang.Exception
+     */
+    protected static void tearDownAfterClass() throws Exception {
+    }
+
+    /* 
+     * (non-Javadoc)
+     * @see junit.framework.TestCase#setUp()
+     */
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        
+        Properties prop = super.getTestProperties();
+        assertNotNull(prop);
+        
+        if(skipTests(this.getMode())){
+            return;//skip tests
+        }
+        
+        String[] zk = prop.get("kafka.zk.list").toString().split(",");
+        
+        for(String z : zk)
+        {
+        	InetAddress address = InetAddress.getByName(z);
+            boolean reachable = address.isReachable(100);
+
+            if(!reachable)
+            {
+            	this.setMode("local");
+            	//throw new Exception("Unable to reach zookeeper, skipping CIF adapter test");
+            	break;
+            }
+            
+        }
+        
+        if(skipTests(this.getMode()))
+            return;//skip tests
+            
+        System.out.println("kafka.zk.list ="+(String) prop.get("kafka.zk.list"));
+        System.out.println("kafka.zk.list ="+(String) prop.get("kafka.zk.port"));   
+        System.out.println("kafka.zk.list ="+(String) prop.get("bolt.enrichment.cif.tablename"));   
+        if(skipTests(this.getMode())){
+            System.out.println("Local Mode Skipping tests !! ");
+        }else{
+            cifHbaseAdapter=new CIFHbaseAdapter((String) prop.get("kafka.zk.list"), (String) prop.get("kafka.zk.port"),(String) prop.get("bolt.enrichment.cif.tablename"));
+        }
+    }
+
+    /* 
+     * (non-Javadoc)
+     * @see junit.framework.TestCase#tearDown()
+     */
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        cifHbaseAdapter=null;
+    }
+
+    /**
+     * Test method for {@link com.opensoc.enrichment.adapters.cif.CIFHbaseAdapter#initializeAdapter()}.
+     */
+    public void testInitializeAdapter() {
+        if(skipTests(this.getMode())){
+            return;//skip tests
+       }else{
+            assertTrue(cifHbaseAdapter.initializeAdapter());
+        }
+    }
+
+    /**
+     * Test method for {@link com.opensoc.enrichment.adapters.cif.CIFHbaseAdapter#enrichByIP(java.lang.String)}.
+     */
+    public void testEnrichByIP() {
+        if(skipTests(this.getMode())){
+             return;//skip tests
+        }else{      
+           assertNull(cifHbaseAdapter.enrichByIP("11.1.1"));
+       }
+    }
+
+    /**
+     * Test method for {@link com.opensoc.enrichment.adapters.cif.CIFHbaseAdapter#enrichByDomain(java.lang.String)}.
+     */
+    public void testEnrichByDomain() {
+        if(skipTests(this.getMode())){
+            return;//skip tests
+       }else{       
+           assertNull(cifHbaseAdapter.enrichByIP("invaliddomain"));
+       }
+    }
+
+    /**
+     * Test method for {@link com.opensoc.enrichment.adapters.cif.CIFHbaseAdapter#enrichByEmail(java.lang.String)}.
+     */
+    public void testEnrichByEmail() {
+        if(skipTests(this.getMode())){
+            return;//skip tests
+       }else{
+           assertNull(cifHbaseAdapter.enrichByIP("sample@invalid.com"));
+       }
+    }
+
+    /**
+     * Test method for {@link com.opensoc.enrichment.adapters.cif.CIFHbaseAdapter#CIFHbaseAdapter(java.lang.String, java.lang.String, java.lang.String)}.
+     */
+    public void testCIFHbaseAdapter() {
+        if(skipTests(this.getMode())){
+            return;//skip tests
+       }else{
+           assertNotNull(cifHbaseAdapter);
+       }
+    }
+
+    /**
+     * Test method for {@link com.opensoc.enrichment.adapters.cif.CIFHbaseAdapter#enrich(java.lang.String)}.
+     */
+    public void testEnrich() {
+        if(skipTests(this.getMode())){
+            return;//skip tests
+       }else{
+            cifHbaseAdapter.initializeAdapter();
+            assertNotNull(cifHbaseAdapter.enrich("testinvalid.metadata"));
+            
+            assertNotNull(cifHbaseAdapter.enrich("ivalid.ip"));
+            assertNotNull(cifHbaseAdapter.enrich("1.1.1.10"));
+       }
+    }
+    
+
+    /**
+     * Test method for {@link com.opensoc.enrichment.adapters.cif.CIFHbaseAdapter#getCIFObject(java.lang.String)}.
+     */
+    public void testGetCIFObject() {
+        if(skipTests(this.getMode())){
+            return;//skip tests
+       }else{        
+           cifHbaseAdapter.initializeAdapter();
+           assertNotNull(cifHbaseAdapter.getCIFObject("testkey"));
+       }
+    }
+    /**
+     * Returns the cifHbaseAdapter.
+     * @return the cifHbaseAdapter.
+     */
+    
+    public static CIFHbaseAdapter getCifHbaseAdapter() {
+        return CIFHbaseAdapterTest.cifHbaseAdapter;
+    }
+
+    /**
+     * Sets the cifHbaseAdapter.
+     * @param cifHbaseAdapter the cifHbaseAdapter.
+     */
+    
+    public static void setCifHbaseAdapter(CIFHbaseAdapter cifHbaseAdapter) {
+    
+        CIFHbaseAdapterTest.cifHbaseAdapter = cifHbaseAdapter;
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-EnrichmentAdapters/src/test/java/com/apache/metron/enrichment/adapters/geo/GeoMysqlAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/com/apache/metron/enrichment/adapters/geo/GeoMysqlAdapterTest.java b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/com/apache/metron/enrichment/adapters/geo/GeoMysqlAdapterTest.java
new file mode 100644
index 0000000..ca54500
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/com/apache/metron/enrichment/adapters/geo/GeoMysqlAdapterTest.java
@@ -0,0 +1,165 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.opensoc.enrichment.adapters.geo;
+
+import java.net.URL;
+import java.util.Properties;
+
+import org.json.simple.JSONObject;
+
+import com.opensoc.test.AbstractSchemaTest;
+
+ /**
+ * <ul>
+ * <li>Title: GeoMySqlAdapterTest</li>
+ * <li>Description: Tests for GeoMySqlAdapter</li>
+ * <li>Created: Aug 25, 2014</li>
+ * </ul>
+ * @version $Revision: 1.1 $
+ */
+public class GeoMysqlAdapterTest extends AbstractSchemaTest {
+
+    private static GeoMysqlAdapter geoMySqlAdapter=null;
+    private static boolean connected=false;
+
+    /**
+     * Constructs a new <code>GeoMysqlAdapterTest</code> instance.
+     * @param name
+     */
+
+    public GeoMysqlAdapterTest(String name) {
+        super(name);
+    }
+
+    /**
+     
+     * @throws java.lang.Exception
+     */
+    protected static void setUpBeforeClass() throws Exception {
+    }
+
+    /**
+     
+     * @throws java.lang.Exception
+     */
+    protected static void tearDownAfterClass() throws Exception {
+    }
+
+    /* 
+     * (non-Javadoc)
+     * @see junit.framework.TestCase#setUp()
+     */
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        Properties prop = super.getTestProperties();
+        assertNotNull(prop);   
+        System.out.println("username ="+(String)prop.get("mysql.username"));
+        if(skipTests(this.getMode())){
+            System.out.println(getClass().getName()+" Skipping Tests !!Local Mode");
+            return;//skip tests
+       }else{
+           GeoMysqlAdapterTest.setGeoMySqlAdapter(new GeoMysqlAdapter((String)prop.get("mysql.ip"), (new Integer((String)prop.get("mysql.port"))).intValue(),(String)prop.get("mysql.username"),(String)prop.get("mysql.password"), (String)prop.get("bolt.enrichment.geo.adapter.table")));
+           connected =geoMySqlAdapter.initializeAdapter();
+           assertTrue(connected);
+           URL schema_url = getClass().getClassLoader().getResource(
+               "TestSchemas/GeoMySqlSchema.json");
+           super.setSchemaJsonString(super.readSchemaFromFile(schema_url));  
+       }
+    }
+
+    /* 
+     * (non-Javadoc)
+     * @see junit.framework.TestCase#tearDown()
+     */
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        GeoMysqlAdapterTest.setGeoMySqlAdapter(null);
+    }
+
+    /**
+     * Test method for {@link com.opensoc.enrichment.adapters.geo.GeoMysqlAdapter#enrich(java.lang.String)}.
+     */
+    public void testEnrich() {
+        if(skipTests(this.getMode())){
+            return;//skip tests
+       }else{
+           
+         try {           
+                JSONObject json = geoMySqlAdapter.enrich("72.163.4.161");
+                
+                //assert Geo Response is not null
+                System.out.println("json ="+json);
+                assertNotNull(json);
+        
+                assertEquals(true, super.validateJsonData(super.getSchemaJsonString(), json.toString()));
+                //assert LocId is not null
+                assertNotNull(json.get("locID"));
+                
+                //assert right LocId is being returned
+                assertEquals("4522",json.get("locID"));    
+         } catch (Exception e) {
+            e.printStackTrace();
+            fail("Json validation Failed");
+         }
+       }
+    }
+
+    /**
+     * Test method for {@link com.opensoc.enrichment.adapters.geo.GeoMysqlAdapter#initializeAdapter()}.
+     */
+    public void testInitializeAdapter() {
+        if(skipTests(this.getMode())){
+            return;//skip tests
+       }else{        
+        boolean connected =geoMySqlAdapter.initializeAdapter();
+        assertTrue(connected);
+       }
+    }
+ 
+    /**
+     * Test method for {@link com.opensoc.enrichment.adapters.geo.GeoMysqlAdapter#GeoMysqlAdapter(java.lang.String, int, java.lang.String, java.lang.String, java.lang.String)}.
+     */
+    public void testGeoMysqlAdapter() {
+        if(skipTests(this.getMode())){
+            return;//skip tests
+       }else{       
+           assertTrue(connected);
+       }
+    }
+
+    /**
+     * Returns the geoMySqlAdapter.
+     * @return the geoMySqlAdapter.
+     */
+    
+    public static GeoMysqlAdapter getGeoMySqlAdapter() {
+        return geoMySqlAdapter;
+    }
+
+    /**
+     * Sets the geoMySqlAdapter.
+     * @param geoMySqlAdapter the geoMySqlAdapter.
+     */
+    
+    public static void setGeoMySqlAdapter(GeoMysqlAdapter geoMySqlAdapter) {
+    
+        GeoMysqlAdapterTest.geoMySqlAdapter = geoMySqlAdapter;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-EnrichmentAdapters/src/test/java/com/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/test/java/com/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapterTest.java b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/com/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapterTest.java
new file mode 100644
index 0000000..3057c13
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/test/java/com/apache/metron/enrichment/adapters/whois/WhoisHBaseAdapterTest.java
@@ -0,0 +1,164 @@
+
+ 
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.opensoc.enrichment.adapters.whois;
+
+import java.net.InetAddress;
+import java.util.Properties;
+
+import org.json.simple.JSONObject;
+
+import com.opensoc.test.AbstractTestContext;
+
+ /**
+ * <ul>
+ * <li>Title: </li>
+ * <li>Description: </li>
+ * <li>Created: Aug 25, 2014 </li>
+ * </ul>
+ * @version $Revision: 1.1 $
+ */
+public class WhoisHBaseAdapterTest extends AbstractTestContext {
+
+    private static WhoisHBaseAdapter whoisHbaseAdapter=null;   
+    private static boolean connected=false;
+    /**
+     * Constructs a new <code>WhoisHBaseAdapterTest</code> instance.
+     * @param name
+     */
+
+    public WhoisHBaseAdapterTest(String name) {
+        super(name);
+    }
+
+    /**
+     
+     * @throws java.lang.Exception
+     */
+    protected static void setUpBeforeClass() throws Exception {
+    }
+
+    /**
+     
+     * @throws java.lang.Exception
+     */
+    protected static void tearDownAfterClass() throws Exception {
+    }
+
+    /* 
+     * (non-Javadoc)
+     * @see junit.framework.TestCase#setUp()
+     */
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        Properties prop = super.getTestProperties();
+        assertNotNull(prop);   
+        
+        if(skipTests(this.getMode())){
+            return;//skip tests
+        }
+        
+        String[] zk = prop.get("kafka.zk.list").toString().split(",");
+        
+        for(String z : zk)
+        {
+        	InetAddress address = InetAddress.getByName(z);
+            boolean reachable = address.isReachable(100);
+
+            if(!reachable)
+            {
+            	this.setMode("local");
+            	break;
+            	//throw new Exception("Unable to reach zookeeper, skipping WHois adapter test");
+            }
+            
+            System.out.println("kafka.zk.list ="+(String) prop.get("kafka.zk.list"));
+            System.out.println("kafka.zk.list ="+(String) prop.get("kafka.zk.port"));   
+            System.out.println("kafka.zk.list ="+(String) prop.get("bolt.enrichment.cif.tablename")); 
+            
+        }
+        
+        if(skipTests(this.getMode())){
+            System.out.println("Local Mode Skipping tests !! ");
+        }else{
+            whoisHbaseAdapter=new WhoisHBaseAdapter((String)prop.get("bolt.enrichment.whois.hbase.table.name"),(String)prop.get("kafka.zk.list"),(String)prop.get("kafka.zk.port"));
+            connected =whoisHbaseAdapter.initializeAdapter();
+            assertTrue(connected);
+        }
+       
+    }
+
+    /* 
+     * (non-Javadoc)
+     * @see junit.framework.TestCase#tearDown()
+     */
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    /**
+     * Test method for {@link com.opensoc.enrichment.adapters.whois.WhoisHBaseAdapter#initializeAdapter()}.
+     */
+    public void testInitializeAdapter() {
+        if(skipTests(this.getMode())){
+            return;//skip tests
+       }else{
+           assertTrue(connected);
+       }
+    }
+
+    /**
+     * Test method for {@link com.opensoc.enrichment.adapters.whois.WhoisHBaseAdapter#enrich(java.lang.String)}.
+     */
+    public void testEnrich() {
+        if(skipTests(this.getMode())){
+            return;//skip tests
+       }else{
+            JSONObject json = whoisHbaseAdapter.enrich("72.163.4.161");
+            
+            //assert Geo Response is not null
+            assertNotNull(json);
+            
+            //assert LocId is not null
+            assertNotNull(json.get("cisco.com"));
+       }
+    }
+
+
+    /**
+     * Returns the whoisHbaseAdapter.
+     * @return the whoisHbaseAdapter.
+     */
+    
+    public static WhoisHBaseAdapter getWhoisHbaseAdapter() {
+        return whoisHbaseAdapter;
+    }
+
+    /**
+     * Sets the whoisHbaseAdapter.
+     * @param whoisHbaseAdapter the whoisHbaseAdapter.
+     */
+    
+    public static void setWhoisHbaseAdapter(WhoisHBaseAdapter whoisHbaseAdapter) {
+    
+        WhoisHBaseAdapterTest.whoisHbaseAdapter = whoisHbaseAdapter;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-EnrichmentAdapters/src/test/resources/CIFHbaseAdapterTest.properties
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/test/resources/CIFHbaseAdapterTest.properties b/metron-streaming/Metron-EnrichmentAdapters/src/test/resources/CIFHbaseAdapterTest.properties
new file mode 100644
index 0000000..8217353
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/test/resources/CIFHbaseAdapterTest.properties
@@ -0,0 +1,11 @@
+kafka.zk.port=2181
+kafka.zk.list=zkpr1
+kafka.zk=zkpr1:2181
+
+#CIF Enrichment
+bolt.enrichment.cif.tablename=cif_table
+bolt.enrichment.cif.host=tld
+bolt.enrichment.cif.email=email
+bolt.enrichment.cif.MAX_CACHE_SIZE_OBJECTS_NUM=10000
+bolt.enrichment.cif.MAX_TIME_RETAIN_MINUTES=10
+bolt.enrichment.cif.enrichment_tag=cif

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-EnrichmentAdapters/src/test/resources/GeoMysqlAdapterTest.properties
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/test/resources/GeoMysqlAdapterTest.properties b/metron-streaming/Metron-EnrichmentAdapters/src/test/resources/GeoMysqlAdapterTest.properties
new file mode 100644
index 0000000..3a4e179
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/test/resources/GeoMysqlAdapterTest.properties
@@ -0,0 +1,11 @@
+mysql.ip=172.30.9.120
+mysql.port=3306
+mysql.username=test
+mysql.password=123123
+
+#GeoEnrichment
+bolt.enrichment.geo.enrichment_tag=geo
+bolt.enrichment.geo.adapter.table=GEO
+bolt.enrichment.geo.MAX_CACHE_SIZE_OBJECTS_NUM=10000
+bolt.enrichment.geo.MAX_TIME_RETAIN_MINUTES=10
+bolt.enrichment.geo.source=ip_src_addr,ip_dst_addr

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-EnrichmentAdapters/src/test/resources/TestSchemas/CIFHbaseSchema.json
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/test/resources/TestSchemas/CIFHbaseSchema.json b/metron-streaming/Metron-EnrichmentAdapters/src/test/resources/TestSchemas/CIFHbaseSchema.json
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-EnrichmentAdapters/src/test/resources/TestSchemas/GeoMySqlSchema.json
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/test/resources/TestSchemas/GeoMySqlSchema.json b/metron-streaming/Metron-EnrichmentAdapters/src/test/resources/TestSchemas/GeoMySqlSchema.json
new file mode 100644
index 0000000..c4f2a82
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/test/resources/TestSchemas/GeoMySqlSchema.json
@@ -0,0 +1,42 @@
+{
+"title": "GeoMySql Schema",
+"type": "object",
+"properties": {
+
+         "city"    : {
+					   "type": "string"
+				  },
+		 "country" : {
+						"type": "string"
+					},
+		 "dmaCode" :
+		 			 {
+						"type": "string"
+					},
+	     "geoHash" : 
+	     			{
+						"type": "string"
+					},
+		 "latitude" : 
+		 			{
+						"type": "string"
+				   },
+		 "locID" : 
+		 			{
+					   "type": "string"
+				   },
+		 "location_point" : 
+		 			{
+					   "type": "string"
+				    },
+		 "longitude" : 
+		 			{
+						"type": "string"
+					},
+		 "postalCode" : 
+		 			{
+						"type": "string"
+					}
+   },
+   "required": ["city", "country", "dmaCode","latitude","locID","location_point","postalCode"]
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-EnrichmentAdapters/src/test/resources/TestSchemas/WhoisHbaseSchema.json
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/test/resources/TestSchemas/WhoisHbaseSchema.json b/metron-streaming/Metron-EnrichmentAdapters/src/test/resources/TestSchemas/WhoisHbaseSchema.json
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-EnrichmentAdapters/src/test/resources/WhoisHbaseAdapterTest.properties
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/test/resources/WhoisHbaseAdapterTest.properties b/metron-streaming/Metron-EnrichmentAdapters/src/test/resources/WhoisHbaseAdapterTest.properties
new file mode 100644
index 0000000..4f264ed
--- /dev/null
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/test/resources/WhoisHbaseAdapterTest.properties
@@ -0,0 +1,11 @@
+kafka.zk.port=2181
+kafka.zk.list=zkpr1
+kafka.zk=zkpr1:2181
+
+#WhoisEnrichment
+
+bolt.enrichment.whois.hbase.table.name=whois
+bolt.enrichment.whois.enrichment_tag=whois
+bolt.enrichment.whois.source=tld
+bolt.enrichment.whois.MAX_CACHE_SIZE_OBJECTS_NUM=10000
+bolt.enrichment.whois.MAX_TIME_RETAIN_MINUTES=10



Mime
View raw message