metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject [39/89] [abbrv] [partial] incubator-metron git commit: Rename all OpenSOC files to Metron
Date Tue, 26 Jan 2016 14:18:17 GMT
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/BasicFireEyeParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/BasicFireEyeParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/BasicFireEyeParser.java
new file mode 100644
index 0000000..baa2857
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/BasicFireEyeParser.java
@@ -0,0 +1,234 @@
+package com.opensoc.parsing.parsers;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.StringUtils;
+import org.json.simple.JSONObject;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+import oi.thekraken.grok.api.Grok;
+import oi.thekraken.grok.api.Match;
+import oi.thekraken.grok.api.exception.GrokException;
+
+public class BasicFireEyeParser extends AbstractParser implements Serializable {
+
+	private static final long serialVersionUID = 6328907550159134550L;
+	//String tsRegex = "(.*)([a-z][A-Z]+)\\s+(\\d+)\\s+(\\d+\\:\\d+\\:\\d+)\\s+(\\d+\\.\\d+\\.\\d+\\.\\d+)(.*)$";
+	String tsRegex ="([a-zA-Z]{3})\\s+(\\d+)\\s+(\\d+\\:\\d+\\:\\d+)\\s+(\\d+\\.\\d+\\.\\d+\\.\\d+)";
+	
+	
+	Pattern tsPattern = Pattern.compile(tsRegex);
+	// private transient static OpenSOCGrok grok;
+	// private transient static InputStream pattern_url;
+
+	public BasicFireEyeParser() throws Exception {
+		// pattern_url = getClass().getClassLoader().getResourceAsStream(
+		// "patterns/fireeye");
+		//
+		// File file = ParserUtils.stream2file(pattern_url);
+		// grok = OpenSOCGrok.create(file.getPath());
+		//
+		// grok.compile("%{FIREEYE_BASE}");
+	}
+
+	@Override
+	public JSONObject parse(byte[] raw_message) {
+		String toParse = "";
+
+		try {
+
+			toParse = new String(raw_message, "UTF-8");
+
+			// String[] mTokens = toParse.split(" ");
+
+			String positveIntPattern = "<[1-9][0-9]*>";
+			Pattern p = Pattern.compile(positveIntPattern);
+			Matcher m = p.matcher(toParse);
+
+			String delimiter = "";
+
+			while (m.find()) {
+				delimiter = m.group();
+
+			}
+
+			if (!StringUtils.isBlank(delimiter)) {
+				String[] tokens = toParse.split(delimiter);
+
+				if (tokens.length > 1)
+					toParse = delimiter + tokens[1];
+
+			}
+
+			JSONObject toReturn = parseMessage(toParse);
+
+			toReturn.put("timestamp", getTimeStamp(toParse,delimiter));
+
+			return toReturn;
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			return null;
+		}
+
+	}
+
+	public static Long convertToEpoch(String m, String d, String ts,
+			boolean adjust_timezone) throws ParseException {
+		d = d.trim();
+
+		if (d.length() <= 2)
+			d = "0" + d;
+
+		Date date = new SimpleDateFormat("MMM", Locale.ENGLISH).parse(m);
+		Calendar cal = Calendar.getInstance();
+		cal.setTime(date);
+		String month = String.valueOf(cal.get(Calendar.MONTH));
+		int year = Calendar.getInstance().get(Calendar.YEAR);
+
+		if (month.length() <= 2)
+			month = "0" + month;
+
+		String coglomerated_ts = year + "-" + month + "-" + d + " " + ts;
+
+		System.out.println(coglomerated_ts);
+
+		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+		if (adjust_timezone)
+			sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+		date = sdf.parse(coglomerated_ts);
+		long timeInMillisSinceEpoch = date.getTime();
+
+		return timeInMillisSinceEpoch;
+	}
+
+	private long getTimeStamp(String toParse,String delimiter) throws ParseException {
+		
+		long ts = 0;
+		String month = null;
+		String day = null;
+		String time = null;
+		Matcher tsMatcher = tsPattern.matcher(toParse);
+		if (tsMatcher.find()) {
+			month = tsMatcher.group(1);
+			day = tsMatcher.group(2);
+			time = tsMatcher.group(3);
+	
+				} else {
+			_LOG.warn("Unable to find timestamp in message: " + toParse);
+			ts = convertToEpoch(month, day, time, true);
+		}
+
+			return ts;
+	
+	}
+
+	private JSONObject parseMessage(String toParse) {
+
+		// System.out.println("Received message: " + toParse);
+
+		// OpenSOCMatch gm = grok.match(toParse);
+		// gm.captures();
+
+		JSONObject toReturn = new JSONObject();
+		//toParse = toParse.replaceAll("  ", " ");
+		String[] mTokens = toParse.split("\\s+");
+	 //mTokens = toParse.split(" ");
+
+		// toReturn.putAll(gm.toMap());
+
+		String id = mTokens[4];
+
+		// We are not parsing the fedata for multi part message as we cannot
+		// determine how we can split the message and how many multi part
+		// messages can there be.
+		// The message itself will be stored in the response.
+
+		String[] tokens = id.split("\\.");
+		if (tokens.length == 2) {
+
+			String[] array = Arrays.copyOfRange(mTokens, 1, mTokens.length - 1);
+			String syslog = Joiner.on(" ").join(array);
+
+			Multimap<String, String> multiMap = formatMain(syslog);
+
+			for (String key : multiMap.keySet()) {
+
+				String value = Joiner.on(",").join(multiMap.get(key));
+				toReturn.put(key, value.trim());
+			}
+
+		}
+
+		toReturn.put("original_string", toParse);
+
+		String ip_src_addr = (String) toReturn.get("dvc");
+		String ip_src_port = (String) toReturn.get("src_port");
+		String ip_dst_addr = (String) toReturn.get("dst_ip");
+		String ip_dst_port = (String) toReturn.get("dst_port");
+
+		if (ip_src_addr != null)
+			toReturn.put("ip_src_addr", ip_src_addr);
+		if (ip_src_port != null)
+			toReturn.put("ip_src_port", ip_src_port);
+		if (ip_dst_addr != null)
+			toReturn.put("ip_dst_addr", ip_dst_addr);
+		if (ip_dst_port != null)
+			toReturn.put("ip_dst_port", ip_dst_port);
+
+		System.out.println(toReturn);
+
+		return toReturn;
+	}
+
+	private Multimap<String, String> formatMain(String in) {
+		Multimap<String, String> multiMap = ArrayListMultimap.create();
+		String input = in.replaceAll("cn3", "dst_port")
+				.replaceAll("cs5", "cncHost").replaceAll("proto", "protocol")
+				.replaceAll("rt=", "timestamp=").replaceAll("cs1", "malware")
+				.replaceAll("dst=", "dst_ip=")
+				.replaceAll("shost", "src_hostname")
+				.replaceAll("dmac", "dst_mac").replaceAll("smac", "src_mac")
+				.replaceAll("spt", "src_port")
+				.replaceAll("\\bsrc\\b", "src_ip");
+		String[] tokens = input.split("\\|");
+
+		if (tokens.length > 0) {
+			String message = tokens[tokens.length - 1];
+
+			String pattern = "([\\w\\d]+)=([^=]*)(?=\\s*\\w+=|\\s*$) ";
+			Pattern p = Pattern.compile(pattern);
+			Matcher m = p.matcher(message);
+
+			while (m.find()) {
+				String[] str = m.group().split("=");
+				multiMap.put(str[0], str[1]);
+
+			}
+
+		}
+		return multiMap;
+	}
+
+	
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/BasicIseParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/BasicIseParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/BasicIseParser.java
new file mode 100644
index 0000000..6990273
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/BasicIseParser.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.parsing.parsers;
+
+import java.io.StringReader;
+
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.minlog.Log;
+import com.opensoc.ise.parser.ISEParser;
+
+@SuppressWarnings("serial")
+public class BasicIseParser extends AbstractParser {
+
+	protected static final Logger _LOG = LoggerFactory
+			.getLogger(BasicIseParser.class);
+	static final transient ISEParser _parser = new ISEParser("header=");
+
+	@SuppressWarnings("unchecked")
+	public JSONObject parse(byte[] msg) {
+	
+		String raw_message = "";
+
+		try {
+
+			raw_message = new String(msg, "UTF-8");
+			_LOG.debug("Received message: " + raw_message);
+			
+			/*
+			 * Reinitialize Parser. It has the effect of calling the constructor again. 
+			 */
+			_parser.ReInit(new StringReader("header=" + raw_message.trim()));
+
+			JSONObject payload = _parser.parseObject();
+
+			String ip_src_addr = (String) payload.get("Device IP Address");
+			String ip_src_port = (String) payload.get("Device Port");
+			String ip_dst_addr = (String) payload.get("DestinationIPAddress");
+			String ip_dst_port = (String) payload.get("DestinationPort");
+			
+			/*
+			 * Standard Fields for OpenSoc.
+			 */
+
+			if(ip_src_addr != null)
+				payload.put("ip_src_addr", ip_src_addr);
+			if(ip_src_port != null)
+				payload.put("ip_src_port", ip_src_port);
+			if(ip_dst_addr != null)
+				payload.put("ip_dst_addr", ip_dst_addr);
+			if(ip_dst_port != null)
+				payload.put("ip_dst_port", ip_dst_port);
+
+			JSONObject message = new JSONObject();
+			//message.put("message", payload);
+
+			return payload;
+
+		} catch (Exception e) {
+			Log.error(e.toString());
+			e.printStackTrace();
+		}
+		return null;
+	}
+
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/BasicLancopeParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/BasicLancopeParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/BasicLancopeParser.java
new file mode 100644
index 0000000..73682ea
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/BasicLancopeParser.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.parsing.parsers;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
+@SuppressWarnings("serial")
+public class BasicLancopeParser extends AbstractParser {
+	// Sample Lancope Message
+	// {"message":"<131>Jul 17 15:59:01 smc-01 StealthWatch[12365]: 2014-07-17T15:58:30Z 10.40.10.254 0.0.0.0 Minor High Concern Index The host's concern index has either exceeded the CI threshold or rapidly increased. Observed 36.55M points. Policy maximum allows up to 20M points.","@version":"1","@timestamp":"2014-07-17T15:56:05.992Z","type":"syslog","host":"10.122.196.201"}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public JSONObject parse(byte[] msg) {
+
+		JSONObject payload = null;
+
+		try {
+			
+			String raw_message = new String(msg, "UTF-8");
+			
+			payload = (JSONObject) JSONValue.parse(raw_message);
+			
+			
+
+			String message = payload.get("message").toString();
+			String[] parts = message.split(" ");
+			payload.put("ip_src_addr", parts[6]);
+			payload.put("ip_dst_addr", parts[7]);
+
+			String fixed_date = parts[5].replace('T', ' ');
+			fixed_date = fixed_date.replace('Z', ' ').trim();
+
+			SimpleDateFormat formatter = new SimpleDateFormat(
+					"yyyy-MM-dd HH:mm:ss");
+
+			Date date;
+
+			date = formatter.parse(fixed_date);
+			payload.put("timestamp", date.getTime());
+
+			payload.remove("@timestamp");
+			payload.remove("message");
+			payload.put("original_string", message);
+
+			return payload;
+		} catch (Exception e) {
+
+			_LOG.error("Unable to parse message: " + payload.toJSONString());
+			return null;
+		}
+	}
+
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/BasicLogstashParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/BasicLogstashParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/BasicLogstashParser.java
new file mode 100644
index 0000000..10bfcd2
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/BasicLogstashParser.java
@@ -0,0 +1,65 @@
+package com.opensoc.parsing.parsers;
+
+import java.io.UnsupportedEncodingException;
+import java.text.SimpleDateFormat;
+
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+
+public class BasicLogstashParser extends AbstractParser {
+
+	@Override
+	public JSONObject parse(byte[] raw_message) {
+		
+		try {
+			
+			/*
+			 * We need to create a new JSONParser each time because its 
+			 * not serializable and the parser is created on the storm nimbus
+			 * node, then transfered to the workers.
+			 */
+			JSONParser jsonParser = new JSONParser();
+			String rawString = new String(raw_message, "UTF-8");
+			JSONObject rawJson = (JSONObject) jsonParser.parse(rawString);
+			
+			// remove logstash meta fields
+			rawJson.remove("@version");
+			rawJson.remove("type");
+			rawJson.remove("host");
+			rawJson.remove("tags");
+			
+			// rename other keys
+			rawJson = mutate(rawJson, "message", "original_string");
+			rawJson = mutate(rawJson, "src_ip", "ip_src_addr");
+			rawJson = mutate(rawJson, "dst_ip", "ip_dst_addr");
+			rawJson = mutate(rawJson, "src_port", "ip_src_port");
+			rawJson = mutate(rawJson, "dst_port", "ip_dst_port");
+			rawJson = mutate(rawJson, "src_ip", "ip_src_addr");
+			
+			// convert timestamp to milli since epoch
+			rawJson.put("timestamp", LogstashToEpoch((String) rawJson.remove("@timestamp")));
+
+			return rawJson;
+		} catch (Exception e) {
+			e.printStackTrace();
+			return null;
+		}	
+	}
+	
+	private JSONObject mutate(JSONObject json, String oldKey, String newKey) {
+		if (json.containsKey(oldKey)) {
+			json.put(newKey, json.remove(oldKey));
+		}	
+		return json;
+	}
+	
+	private long LogstashToEpoch(String timestamp) throws java.text.ParseException {
+		SimpleDateFormat logstashDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+		return logstashDateFormat.parse(timestamp).getTime();
+		
+	}
+
+	
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/BasicPaloAltoFirewallParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/BasicPaloAltoFirewallParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/BasicPaloAltoFirewallParser.java
new file mode 100644
index 0000000..315ca3d
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/BasicPaloAltoFirewallParser.java
@@ -0,0 +1,184 @@
+package com.opensoc.parsing.parsers;
+
+
+import org.json.simple.JSONObject;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import com.opensoc.parser.interfaces.MessageParser;
+
+public class BasicPaloAltoFirewallParser extends AbstractParser implements MessageParser{
+
+	private static final long serialVersionUID = 3147090149725343999L;
+	public static final String PaloAltoDomain  = "palo_alto_domain";
+	public static final String ReceiveTime  = "receive_time";
+	public static final String SerialNum  = "serial_num";
+	public static final String Type  = "type";
+	public static final String ThreatContentType  = "threat_content_type";
+	public static final String ConfigVersion  = "config_version";
+	public static final String GenerateTime  = "generate_time";
+	public static final String SourceAddress  = "source_address";
+	public static final String DestinationAddress  = "destination_address";
+	public static final String NATSourceIP  = "nat_source_ip";
+	public static final String NATDestinationIP  = "nat_destination_ip";
+	public static final String Rule  = "rule";
+	public static final String SourceUser  = "source_user";
+	public static final String DestinationUser  = "destination_user";
+	public static final String Application  = "application";
+	public static final String VirtualSystem  = "virtual_system";
+	public static final String SourceZone  = "source_zone";
+	public static final String DestinationZone  = "destination_zone";
+	public static final String InboundInterface  = "inbound_interface";
+	public static final String OutboundInterface  = "outbound_interface";
+	public static final String LogAction  = "log_action";
+	public static final String TimeLogged  = "time_logged";
+	public static final String SessionID  = "session_id";
+	public static final String RepeatCount  = "repeat_count";
+	public static final String SourcePort  = "source_port";
+	public static final String DestinationPort  = "destination_port";
+	public static final String NATSourcePort  = "nats_source_port";
+	public static final String NATDestinationPort  = "nats_destination_port";
+	public static final String Flags  = "flags";
+	public static final String IPProtocol  = "ip_protocol";
+	public static final String Action  = "action";
+	
+	//Threat
+	public static final String URL  = "url";
+	public static final String HOST  = "host";
+	public static final String ThreatContentName  = "threat_content_name";
+	public static final String Category  = "category";
+	public static final String Direction  = "direction";
+	public static final String Seqno  = "seqno";
+	public static final String ActionFlags  = "action_flags";
+	public static final String SourceCountry  = "source_country";
+	public static final String DestinationCountry  = "destination_country";
+	public static final String Cpadding  = "cpadding";
+	public static final String ContentType = "content_type";
+	
+	//Traffic
+	public static final String Bytes = "content_type";
+	public static final String BytesSent = "content_type";
+	public static final String BytesReceived = "content_type";
+	public static final String Packets = "content_type";
+	public static final String StartTime = "content_type";
+	public static final String ElapsedTimeInSec = "content_type";
+	public static final String Padding = "content_type";
+	public static final String PktsSent = "pkts_sent";
+	public static final String PktsReceived = "pkts_received";
+	
+
+	@SuppressWarnings({ "unchecked", "unused" })
+	public JSONObject parse(byte[] msg) {
+
+		JSONObject outputMessage = new JSONObject();
+		String toParse = "";
+
+		try {
+
+			toParse = new String(msg, "UTF-8");
+			_LOG.debug("Received message: " + toParse);
+			
+			
+			parseMessage(toParse,outputMessage);
+			
+				outputMessage.put("timestamp", System.currentTimeMillis());
+				outputMessage.put("ip_src_addr", outputMessage.remove("source_address"));
+				outputMessage.put("ip_src_port", outputMessage.remove("source_port"));
+				outputMessage.put("ip_dst_addr", outputMessage.remove("destination_address"));
+				outputMessage.put("ip_dst_port", outputMessage.remove("destination_port"));
+				outputMessage.put("protocol", outputMessage.remove("ip_protocol"));
+				
+				outputMessage.put("original_string", toParse);
+			return outputMessage;
+		} catch (Exception e) {
+			e.printStackTrace();
+			_LOG.error("Failed to parse: " + toParse);
+			return null;
+		}
+	}
+		
+		@SuppressWarnings("unchecked")
+		private void parseMessage(String message,JSONObject outputMessage) {
+			
+			String[] tokens = message.split(",");
+			
+			String type = tokens[3].trim();
+			
+			//populate common objects
+			outputMessage.put(PaloAltoDomain, tokens[0].trim());
+			outputMessage.put(ReceiveTime, tokens[1].trim());
+			outputMessage.put(SerialNum, tokens[2].trim());
+			outputMessage.put(Type, type);
+			outputMessage.put(ThreatContentType, tokens[4].trim());
+			outputMessage.put(ConfigVersion, tokens[5].trim());
+			outputMessage.put(GenerateTime, tokens[6].trim());
+			outputMessage.put(SourceAddress, tokens[7].trim());
+			outputMessage.put(DestinationAddress, tokens[8].trim());
+			outputMessage.put(NATSourceIP, tokens[9].trim());
+			outputMessage.put(NATDestinationIP, tokens[10].trim());
+			outputMessage.put(Rule, tokens[11].trim());
+			outputMessage.put(SourceUser, tokens[12].trim());
+			outputMessage.put(DestinationUser, tokens[13].trim());
+			outputMessage.put(Application, tokens[14].trim());
+			outputMessage.put(VirtualSystem, tokens[15].trim());
+			outputMessage.put(SourceZone, tokens[16].trim());
+			outputMessage.put(DestinationZone, tokens[17].trim());
+			outputMessage.put(InboundInterface, tokens[18].trim());
+			outputMessage.put(OutboundInterface, tokens[19].trim());
+			outputMessage.put(LogAction, tokens[20].trim());
+			outputMessage.put(TimeLogged, tokens[21].trim());
+			outputMessage.put(SessionID, tokens[22].trim());
+			outputMessage.put(RepeatCount, tokens[23].trim());
+			outputMessage.put(SourcePort, tokens[24].trim());
+			outputMessage.put(DestinationPort, tokens[25].trim());
+			outputMessage.put(NATSourcePort, tokens[26].trim());
+			outputMessage.put(NATDestinationPort, tokens[27].trim());
+			outputMessage.put(Flags, tokens[28].trim());
+			outputMessage.put(IPProtocol, tokens[29].trim());
+			outputMessage.put(Action, tokens[30].trim());
+			
+			
+			if("THREAT".equals(type.toUpperCase())) {
+				outputMessage.put(URL, tokens[31].trim());
+				try {
+					URL url = new URL(tokens[31].trim());
+					outputMessage.put(HOST, url.getHost());
+				} catch (MalformedURLException e) {
+				}
+				outputMessage.put(ThreatContentName, tokens[32].trim());
+				outputMessage.put(Category, tokens[33].trim());
+				outputMessage.put(Direction, tokens[34].trim());
+				outputMessage.put(Seqno, tokens[35].trim());
+				outputMessage.put(ActionFlags, tokens[36].trim());
+				outputMessage.put(SourceCountry, tokens[37].trim());
+				outputMessage.put(DestinationCountry, tokens[38].trim());
+				outputMessage.put(Cpadding, tokens[39].trim());
+				outputMessage.put(ContentType, tokens[40].trim());
+				
+			}
+			else
+			{
+				outputMessage.put(Bytes, tokens[31].trim());
+				outputMessage.put(BytesSent, tokens[32].trim());
+				outputMessage.put(BytesReceived, tokens[33].trim());
+				outputMessage.put(Packets, tokens[34].trim());
+				outputMessage.put(StartTime, tokens[35].trim());
+				outputMessage.put(ElapsedTimeInSec, tokens[36].trim());
+				outputMessage.put(Category, tokens[37].trim());
+				outputMessage.put(Padding, tokens[38].trim());
+				outputMessage.put(Seqno, tokens[39].trim());
+				outputMessage.put(ActionFlags, tokens[40].trim());
+				outputMessage.put(SourceCountry, tokens[41].trim());
+				outputMessage.put(DestinationCountry, tokens[42].trim());
+				outputMessage.put(Cpadding, tokens[43].trim());
+				outputMessage.put(PktsSent, tokens[44].trim());
+				outputMessage.put(PktsReceived, tokens[45].trim());
+			}
+			
+		}
+
+		
+		
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/BasicSourcefireParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/BasicSourcefireParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/BasicSourcefireParser.java
new file mode 100644
index 0000000..be6d8ff
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/BasicSourcefireParser.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.parsing.parsers;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.json.simple.JSONObject;
+
+import com.opensoc.parser.interfaces.MessageParser;
+
+@SuppressWarnings("serial")
+public class BasicSourcefireParser extends AbstractParser implements MessageParser{
+
+	public static final String hostkey = "host";
+	String domain_name_regex = "([^\\.]+)\\.([a-z]{2}|[a-z]{3}|([a-z]{2}\\.[a-z]{2}))$";
+	String sidRegex = "(.*)(\\[[0-9]+:[0-9]+:[0-9]\\])(.*)$";
+	//String sidRegex = "(\\[[0-9]+:[0-9]+:[0-9]\\])(.*)$";
+	Pattern sidPattern = Pattern.compile(sidRegex);	
+	Pattern pattern = Pattern.compile(domain_name_regex);
+
+	@SuppressWarnings({ "unchecked", "unused" })
+	public JSONObject parse(byte[] msg) {
+
+		JSONObject payload = new JSONObject();
+		String toParse = "";
+
+		try {
+
+			toParse = new String(msg, "UTF-8");
+			_LOG.debug("Received message: " + toParse);
+
+			String tmp = toParse.substring(toParse.lastIndexOf("{"));
+			payload.put("key", tmp);
+
+			String protocol = tmp.substring(tmp.indexOf("{") + 1,
+					tmp.indexOf("}")).toLowerCase();
+			String source = tmp.substring(tmp.indexOf("}") + 1,
+					tmp.indexOf("->")).trim();
+			String dest = tmp.substring(tmp.indexOf("->") + 2, tmp.length())
+					.trim();
+
+			payload.put("protocol", protocol);
+
+			String source_ip = "";
+			String dest_ip = "";
+
+			if (source.contains(":")) {
+				String parts[] = source.split(":");
+				payload.put("ip_src_addr", parts[0]);
+				payload.put("ip_src_port", parts[1]);
+				source_ip = parts[0];
+			} else {
+				payload.put("ip_src_addr", source);
+				source_ip = source;
+
+			}
+
+			if (dest.contains(":")) {
+				String parts[] = dest.split(":");
+				payload.put("ip_dst_addr", parts[0]);
+				payload.put("ip_dst_port", parts[1]);
+				dest_ip = parts[0];
+			} else {
+				payload.put("ip_dst_addr", dest);
+				dest_ip = dest;
+			}
+
+			payload.put("timestamp", System.currentTimeMillis());
+			
+			Matcher sidMatcher = sidPattern.matcher(toParse);
+			String originalString = null;
+			String signatureId = "";
+			if (sidMatcher.find()) {
+				signatureId = sidMatcher.group(2);
+				originalString = sidMatcher.group(1) +" "+ sidMatcher.group(2) + " " + sidMatcher.group(3);
+			} else {
+				_LOG.warn("Unable to find SID in message: " + toParse);
+				originalString = toParse;
+			}
+			payload.put("original_string", originalString);
+			payload.put("signature_id", signatureId);
+
+			return payload;
+		} catch (Exception e) {
+			e.printStackTrace();
+			_LOG.error("Failed to parse: " + toParse);
+			return null;
+		}
+	}
+
+	
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/GrokAsaParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/GrokAsaParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/GrokAsaParser.java
new file mode 100644
index 0000000..ff75313
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/GrokAsaParser.java
@@ -0,0 +1,269 @@
+package com.opensoc.parsing.parsers;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TimeZone;
+
+import oi.thekraken.grok.api.Grok;
+import oi.thekraken.grok.api.Match;
+import oi.thekraken.grok.api.exception.GrokException;
+
+import org.apache.commons.io.IOUtils;
+import org.json.simple.JSONObject;
+
+public class GrokAsaParser extends AbstractParser implements Serializable {
+
+	private static final long serialVersionUID = 945353287115350798L;
+	private transient  Grok  grok;
+	Map<String, String> patternMap;
+	private transient  Map<String, Grok> grokMap;
+	private transient  InputStream pattern_url;
+
+	public static final String PREFIX = "stream2file";
+	public static final String SUFFIX = ".tmp";
+
+	public static File stream2file(InputStream in) throws IOException {
+		final File tempFile = File.createTempFile(PREFIX, SUFFIX);
+		tempFile.deleteOnExit();
+		try (FileOutputStream out = new FileOutputStream(tempFile)) {
+			IOUtils.copy(in, out);
+		}
+		return tempFile;
+	}
+
+	public GrokAsaParser() throws Exception {
+		// pattern_url = Resources.getResource("patterns/asa");
+
+		pattern_url = getClass().getClassLoader().getResourceAsStream(
+				"patterns/asa");
+
+		File file = stream2file(pattern_url);
+		grok = Grok.create(file.getPath());
+
+		patternMap = getPatternMap();
+		grokMap = getGrokMap();
+
+		grok.compile("%{CISCO_TAGGED_SYSLOG}");
+	}
+
+	public GrokAsaParser(String filepath) throws Exception {
+
+		grok = Grok.create(filepath);
+		// grok.getNamedRegexCollection().put("ciscotag","CISCOFW302013_302014_302015_302016");
+		grok.compile("%{CISCO_TAGGED_SYSLOG}");
+
+	}
+
+	public GrokAsaParser(String filepath, String pattern) throws Exception {
+
+		grok = Grok.create(filepath);
+		grok.compile("%{" + pattern + "}");
+	}
+
+	private Map<String, Object> getMap(String pattern, String text)
+			throws GrokException {
+
+		Grok g = grokMap.get(pattern);
+		if (g != null) {
+			Match gm = g.match(text);
+			gm.captures();
+			return gm.toMap();
+		} else {
+			return new HashMap<String, Object>();
+		}
+
+	}
+
+	private Map<String, Grok> getGrokMap() throws GrokException, IOException {
+		Map<String, Grok> map = new HashMap<String, Grok>();
+
+		for (Map.Entry<String, String> entry : patternMap.entrySet()) {
+			File file = stream2file(pattern_url);
+			Grok grok = Grok.create(file.getPath());
+			grok.compile("%{" + entry.getValue() + "}");
+
+			map.put(entry.getValue(), grok);
+
+		}
+
+		return map;
+	}
+
+	private Map<String, String> getPatternMap() {
+		Map<String, String> map = new HashMap<String, String>();
+
+		map.put("ASA-2-106001", "CISCOFW106001");
+		map.put("ASA-2-106006", "CISCOFW106006_106007_106010");
+		map.put("ASA-2-106007", "CISCOFW106006_106007_106010");
+		map.put("ASA-2-106010", "CISCOFW106006_106007_106010");
+		map.put("ASA-3-106014", "CISCOFW106014");
+		map.put("ASA-6-106015", "CISCOFW106015");
+		map.put("ASA-1-106021", "CISCOFW106021");
+		map.put("ASA-4-106023", "CISCOFW106023");
+		map.put("ASA-5-106100", "CISCOFW106100");
+		map.put("ASA-6-110002", "CISCOFW110002");
+		map.put("ASA-6-302010", "CISCOFW302010");
+		map.put("ASA-6-302013", "CISCOFW302013_302014_302015_302016");
+		map.put("ASA-6-302014", "CISCOFW302013_302014_302015_302016");
+		map.put("ASA-6-302015", "CISCOFW302013_302014_302015_302016");
+		map.put("ASA-6-302016", "CISCOFW302013_302014_302015_302016");
+		map.put("ASA-6-302020", "CISCOFW302020_302021");
+		map.put("ASA-6-302021", "CISCOFW302020_302021");
+		map.put("ASA-6-305011", "CISCOFW305011");
+		map.put("ASA-3-313001", "CISCOFW313001_313004_313008");
+		map.put("ASA-3-313004", "CISCOFW313001_313004_313008");
+		map.put("ASA-3-313008", "CISCOFW313001_313004_313008");
+		map.put("ASA-4-313005", "CISCOFW313005");
+		map.put("ASA-4-402117", "CISCOFW402117");
+		map.put("ASA-4-402119", "CISCOFW402119");
+		map.put("ASA-4-419001", "CISCOFW419001");
+		map.put("ASA-4-419002", "CISCOFW419002");
+		map.put("ASA-4-500004", "CISCOFW500004");
+		map.put("ASA-6-602303", "CISCOFW602303_602304");
+		map.put("ASA-6-602304", "CISCOFW602303_602304");
+		map.put("ASA-7-710001", "CISCOFW710001_710002_710003_710005_710006");
+		map.put("ASA-7-710002", "CISCOFW710001_710002_710003_710005_710006");
+		map.put("ASA-7-710003", "CISCOFW710001_710002_710003_710005_710006");
+		map.put("ASA-7-710005", "CISCOFW710001_710002_710003_710005_710006");
+		map.put("ASA-7-710006", "CISCOFW710001_710002_710003_710005_710006");
+		map.put("ASA-6-713172", "CISCOFW713172");
+		map.put("ASA-4-733100", "CISCOFW733100");
+		map.put("ASA-6-305012", "CISCOFW305012");
+		map.put("ASA-7-609001", "CISCOFW609001");
+		map.put("ASA-7-609002", "CISCOFW609002");
+
+		return map;
+	}
+
+	public static Long convertToEpoch(String m, String d, String ts,
+			boolean adjust_timezone) throws ParseException {
+		d = d.trim();
+
+		if (d.length() <= 2)
+			d = "0" + d;
+
+		Date date = new SimpleDateFormat("MMM", Locale.ENGLISH).parse(m);
+		Calendar cal = Calendar.getInstance();
+		cal.setTime(date);
+		String month = String.valueOf(cal.get(Calendar.MONTH));
+		int year = Calendar.getInstance().get(Calendar.YEAR);
+
+		if (month.length() <= 2)
+			month = "0" + month;
+
+		String coglomerated_ts = year + "-" + month + "-" + d + " " + ts;
+
+		System.out.println(coglomerated_ts);
+
+		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+		if (adjust_timezone)
+			sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+		date = sdf.parse(coglomerated_ts);
+		long timeInMillisSinceEpoch = date.getTime();
+
+		return timeInMillisSinceEpoch;
+	}
+	
+	@Override
+	public void init() {
+		// pattern_url = Resources.getResource("patterns/asa");
+
+				pattern_url = getClass().getClassLoader().getResourceAsStream(
+						"patterns/asa");
+
+				File file = null;
+				try {
+					file = stream2file(pattern_url);
+				} catch (IOException e) {
+					// TODO Auto-generated catch block
+					e.printStackTrace();
+				}
+				try {
+					grok = Grok.create(file.getPath());
+				} catch (GrokException e) {
+					// TODO Auto-generated catch block
+					e.printStackTrace();
+				}
+
+				patternMap = getPatternMap();
+				try {
+					grokMap = getGrokMap();
+				} catch (GrokException | IOException e1) {
+					// TODO Auto-generated catch block
+					e1.printStackTrace();
+				}
+
+				try {
+					grok.compile("%{CISCO_TAGGED_SYSLOG}");
+				} catch (GrokException e) {
+					// TODO Auto-generated catch block
+					e.printStackTrace();
+				}
+	}
+
+	@Override
+	public JSONObject parse(byte[] raw_message) {
+
+		String toParse = "";
+		JSONObject toReturn;
+
+		try {
+
+			toParse = new String(raw_message, "UTF-8");
+
+			System.out.println("Received message: " + toParse);
+
+			Match gm = grok.match(toParse);
+			gm.captures();
+
+			toReturn = new JSONObject();
+
+			toReturn.putAll(gm.toMap());
+
+			String str = toReturn.get("ciscotag").toString();
+			String pattern = patternMap.get(str);
+
+			Map<String, Object> response = getMap(pattern, toParse);
+
+			toReturn.putAll(response);
+
+			//System.out.println("*******I MAPPED: " + toReturn);
+
+			toReturn.put("timestamp", convertToEpoch(toReturn.get("MONTH").toString(), toReturn
+					.get("MONTHDAY").toString(), 
+					toReturn.get("TIME").toString(),
+					true));
+			
+			toReturn.remove("MONTHDAY");
+			toReturn.remove("TIME");
+			toReturn.remove("MINUTE");
+			toReturn.remove("HOUR");
+			toReturn.remove("YEAR");
+			toReturn.remove("SECOND");
+			
+			toReturn.put("ip_src_addr", toReturn.remove("IPORHOST"));
+			toReturn.put("original_string", toParse);
+
+			return toReturn;
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			return null;
+		}
+
+	}
+
+	
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/GrokSourcefireParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/GrokSourcefireParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/GrokSourcefireParser.java
new file mode 100644
index 0000000..a5eabcd
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/GrokSourcefireParser.java
@@ -0,0 +1,76 @@
+package com.opensoc.parsing.parsers;
+
+import java.net.URL;
+
+import oi.thekraken.grok.api.Match;
+import oi.thekraken.grok.api.Grok;
+import oi.thekraken.grok.api.exception.GrokException;
+
+import org.json.simple.JSONObject;
+
+
+public class GrokSourcefireParser extends AbstractParser{
+	
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = 1L;
+	Grok grok;
+	
+	public GrokSourcefireParser() throws GrokException
+	{
+		URL pattern_url = getClass().getClassLoader().getResource(
+				"pattarns/sourcefire");
+		grok = Grok.create(pattern_url.getFile());
+		grok.compile("%{SOURCEFIRE}");
+	}
+
+	public GrokSourcefireParser(String filepath) throws GrokException
+	{
+
+		grok = Grok.create(filepath);
+		grok.compile("%{SOURCEFIRE}");
+	}
+	
+	public GrokSourcefireParser(String filepath, String pattern) throws GrokException
+	{
+
+		grok = Grok.create(filepath);
+		grok.compile("%{"+pattern+"}");
+	}
+	
+	@Override
+	public JSONObject parse(byte[] raw_message) {
+		JSONObject payload = new JSONObject();
+		String toParse = "";
+		JSONObject toReturn;
+		
+
+		try {
+
+			toParse = new String(raw_message, "UTF-8");
+			Match gm = grok.match(toParse);
+			gm.captures();
+			
+			toReturn = new JSONObject();
+			
+			toReturn.putAll(gm.toMap());
+			toReturn.remove("SOURCEFIRE");
+			String proto = toReturn.get("protocol").toString();
+			proto = proto.replace("{", "");
+			proto = proto.replace("}", "");
+			toReturn.put("protocol", proto);
+			return toReturn;
+			
+		}
+		catch(Exception e)
+		{
+			e.printStackTrace();
+			return null;
+		}
+		
+	}
+
+	
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/GrokUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/GrokUtils.java b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/GrokUtils.java
new file mode 100644
index 0000000..de2ba54
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/GrokUtils.java
@@ -0,0 +1,26 @@
+package com.opensoc.parsing.parsers;
+import java.io.Serializable;
+
+import com.google.code.regexp.Pattern;
+
+public class GrokUtils implements Serializable {
+
+	private static final long serialVersionUID = 7465176887422419286L;
+	/**
+	   * Extract Grok patter like %{FOO} to FOO, Also Grok pattern with semantic.
+	   */
+	  public static final Pattern GROK_PATTERN = Pattern.compile(
+	      "%\\{" +
+	      "(?<name>" +
+	        "(?<pattern>[A-z0-9]+)" +
+	          "(?::(?<subname>[A-z0-9_:;\\/\\s\\.]+))?" +
+	          ")" +
+	          "(?:=(?<definition>" +
+	            "(?:" +
+	            "(?:[^{}]+|\\.+)+" +
+	            ")+" +
+	            ")" +
+	      ")?" +
+	      "\\}");
+
+	}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/JSONCleaner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/JSONCleaner.java b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/JSONCleaner.java
new file mode 100644
index 0000000..8ce04dc
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/JSONCleaner.java
@@ -0,0 +1,78 @@
+package com.opensoc.parsing.parsers;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+
+/**
+ * @author kiran
+ *
+ */
+public class JSONCleaner implements Serializable {
+
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = 1L;
+
+
+	/**
+	 * @param jsonString
+	 * @return
+	 * @throws ParseException
+	 * Takes a json String as input and removes any Special Chars (^ a-z A-Z 0-9) in the keys
+	 */
+	@SuppressWarnings({"unchecked","rawtypes"})
+	public JSONObject Clean(String jsonString) throws ParseException
+	{
+		JSONParser parser = new JSONParser();
+		
+		
+		Map json = (Map) parser.parse(jsonString);
+		JSONObject output = new JSONObject();
+	    Iterator iter = json.entrySet().iterator();
+
+		 while(iter.hasNext()){
+		      Map.Entry entry = (Map.Entry)iter.next();
+		      
+		      String key = ((String)entry.getKey()).replaceAll("[^\\._a-zA-Z0-9]+","");
+		      output.put(key, entry.getValue());
+		    }
+
+		return output;
+	}
+	
+	
+	@SuppressWarnings({ "unchecked", "rawtypes", "unused" })
+	public static void main(String args[])
+	{
+		String jsonText = "{\"first_1\": 123, \"second\": [4, 5, 6], \"third\": 789}";
+		JSONCleaner cleaner = new JSONCleaner();
+		try {
+			//cleaner.Clean(jsonText);
+			Map obj=new HashMap();
+			  obj.put("name","foo");
+			  obj.put("num",new Integer(100));
+			  obj.put("balance",new Double(1000.21));
+			  obj.put("is_vip",new Boolean(true));
+			  obj.put("nickname",null);
+			Map obj1 = new HashMap();
+			obj1.put("sourcefile", obj);
+			
+			JSONObject json = new JSONObject(obj1);
+			System.out.println(json);
+			  
+			  
+			  
+			  System.out.print(jsonText);
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/MetronConverter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/MetronConverter.java b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/MetronConverter.java
new file mode 100644
index 0000000..5d495a6
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/MetronConverter.java
@@ -0,0 +1,183 @@
+package com.opensoc.parsing.parsers;
+
+import java.io.Serializable;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+public class OpenSOCConverter implements Serializable {
+	
+	private static final long serialVersionUID = 4319897815285922962L;
+	public static Map<String, IConverter<?>> _converters = new HashMap<String, IConverter<?>>();
+
+	static {
+		_converters.put("byte", new ByteConverter());
+		_converters.put("boolean", new BooleanConverter());
+		_converters.put("short", new ShortConverter());
+		_converters.put("int", new IntegerConverter());
+		_converters.put("long", new LongConverter());
+		_converters.put("float", new FloatConverter());
+		_converters.put("double", new DoubleConverter());
+		_converters.put("date", new DateConverter());
+		_converters.put("datetime", new DateConverter());
+		_converters.put("string", new StringConverter());
+
+	}
+	
+	private static IConverter getConverter(String key) throws Exception {
+		IConverter converter = _converters.get(key);
+		if (converter == null) {
+			throw new Exception("Invalid data type :" + key);
+		}
+		return converter;
+	}
+	
+	public static KeyValue convert(String key, Object value) {
+		String[] spec = key.split(";");
+		try {
+			if (spec.length == 1) {
+				return new KeyValue(spec[0], value);
+			} else if (spec.length == 2) {
+				return new KeyValue(spec[0], getConverter(spec[1]).convert(String.valueOf(value)));
+			} else if (spec.length == 3) {
+				return new KeyValue(spec[0], getConverter(spec[1]).convert(String.valueOf(value), spec[2]));
+			} else {
+				return new KeyValue(spec[0], value, "Unsupported spec :" + key);
+			}
+		} catch (Exception e) {
+			return new KeyValue(spec[0], value, e.toString());
+		}
+	}
+}
+
+
+//
+// KeyValue
+//
+
+class KeyValue {
+
+	private String key = null;
+	private Object value = null;
+	private String grokFailure = null;
+	
+	public KeyValue(String key, Object value) {
+		this.key = key;
+		this.value = value;
+	}
+	
+	public KeyValue(String key, Object value, String grokFailure) {
+		this.key = key;
+		this.value = value;
+		this.grokFailure = grokFailure;
+	}
+
+	public boolean hasGrokFailure() {
+		return grokFailure != null;
+	}
+
+	public String getGrokFailure() {
+		return this.grokFailure;
+	}
+
+	public String getKey() {
+		return key;
+	}
+
+	public void setKey(String key) {
+		this.key = key;
+	}
+
+	public Object getValue() {
+		return value;
+	}
+
+	public void setValue(Object value) {
+		this.value = value;
+	}
+}
+
+
+//
+// Converters
+//
+abstract class IConverter<T> {
+	
+	public T convert(String value, String informat) throws Exception {
+		return null;
+	}
+	
+	public abstract T convert(String value) throws Exception;
+}
+
+class ByteConverter extends IConverter<Byte> {
+	@Override
+	public Byte convert(String value) throws Exception {
+		return Byte.parseByte(value);
+	}
+}
+
+class BooleanConverter extends IConverter<Boolean> {
+	@Override
+	public Boolean convert(String value) throws Exception {
+		return Boolean.parseBoolean(value);
+	}
+}
+
+class ShortConverter extends IConverter<Short> {
+	@Override
+	public Short convert(String value) throws Exception {
+		return Short.parseShort(value);
+	}
+}
+
+class IntegerConverter extends IConverter<Integer> {
+	@Override
+	public Integer convert(String value) throws Exception {
+		return Integer.parseInt(value);
+	}
+}
+
+class LongConverter extends IConverter<Long> {
+	@Override
+	public Long convert(String value) throws Exception {
+		return Long.parseLong(value);
+	}
+}
+
+class FloatConverter extends IConverter<Float> {
+	@Override
+	public Float convert(String value) throws Exception {
+		return Float.parseFloat(value);
+	}
+}
+
+class DoubleConverter extends IConverter<Double> {
+	@Override
+	public Double convert(String value) throws Exception {
+		return Double.parseDouble(value);
+	}
+}
+
+class StringConverter extends IConverter<String> {
+	@Override
+	public String convert(String value) throws Exception {
+		return value;
+	}
+}
+
+class DateConverter extends IConverter<Date> {
+	@Override
+	public Date convert(String value) throws Exception {
+		return DateFormat.getInstance().parse(value);
+	}
+	
+	@Override
+	public Date convert(String value, String informat) throws Exception {
+		SimpleDateFormat formatter =  new SimpleDateFormat(informat);
+		return formatter.parse(value);
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/MetronGarbage.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/MetronGarbage.java b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/MetronGarbage.java
new file mode 100644
index 0000000..1f7f3c8
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/MetronGarbage.java
@@ -0,0 +1,130 @@
+package com.opensoc.parsing.parsers;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class OpenSOCGarbage implements Serializable {
+
+	private static final long serialVersionUID = -7158895945268018603L;
+	private List<String> toRemove;
+	  private Map<String, Object> toRename;
+
+	  /**
+	   * Create a new {@code Garbage} object.
+	   */
+	  public OpenSOCGarbage() {
+
+	    toRemove = new ArrayList<String>();
+	    toRename = new TreeMap<String, Object>();
+	    /** this is a default value to remove */
+	    toRemove.add("UNWANTED");
+	  }
+
+	  /**
+	   * Set a new name to be change when exporting the final output.
+	   *
+	   * @param origin : original field name
+	   * @param value : New field name to apply
+	   */
+	  public void addToRename(String origin, Object value) {
+	    if (origin == null || value == null) {
+	      return;
+	    }
+
+	    if (!origin.isEmpty() && !value.toString().isEmpty()) {
+	      toRename.put(origin, value);
+	    }
+	  }
+
+	  /**
+	   * Set a field to be remove when exporting the final output.
+	   *
+	   * @param name of the field to remove
+	   */
+	  public void addToRemove(String name) {
+	    if (name == null) {
+	      return;
+	    }
+
+	    if (!name.isEmpty()) {
+	      toRemove.add(name);
+	    }
+	  }
+
+	  /**
+	   * Set a list of field name to be remove when exporting the final output.
+	   *
+	   * @param lst
+	   */
+	  public void addToRemove(List<String> lst) {
+	    if (lst == null) {
+	      return;
+	    }
+
+	    if (!lst.isEmpty()) {
+	      toRemove.addAll(lst);
+	    }
+	  }
+
+	  /**
+	   * Remove from the map the unwilling items.
+	   *
+	   * @param map to clean
+	   * @return nb of deleted item
+	   */
+	  public int remove(Map<String, Object> map) {
+	    int item = 0;
+
+	    if (map == null) {
+	      return item;
+	    }
+
+	    if (map.isEmpty()) {
+	      return item;
+	    }
+
+	    for (Iterator<Map.Entry<String, Object>> it = map.entrySet().iterator(); it.hasNext();) {
+	      Map.Entry<String, Object> entry = it.next();
+	      for (int i = 0; i < toRemove.size(); i++) {
+	        if (entry.getKey().equals(toRemove.get(i))) {
+	          it.remove();
+	          item++;
+	        }
+	      }
+	    }
+	    return item;
+	  }
+
+	  /**
+	   * Rename the item from the map.
+	   *
+	   * @param map
+	   * @return nb of renamed items
+	   */
+	  public int rename(Map<String, Object> map) {
+	    int item = 0;
+
+	    if (map == null) {
+	      return item;
+	    }
+
+	    if (map.isEmpty() || toRename.isEmpty()) {
+	      return item;
+	    }
+
+	    for (Iterator<Map.Entry<String, Object>> it = toRename.entrySet().iterator(); it.hasNext();) {
+	      Map.Entry<String, Object> entry = it.next();
+	      if (map.containsKey(entry.getKey())) {
+	        Object obj = map.remove(entry.getKey());
+	        map.put(entry.getValue().toString(), obj);
+	        item++;
+	      }
+	    }
+	    return item;
+	  }
+
+	}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/MetronGrok.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/MetronGrok.java b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/MetronGrok.java
new file mode 100644
index 0000000..0cf998e
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/MetronGrok.java
@@ -0,0 +1,367 @@
+package com.opensoc.parsing.parsers;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Reader;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.code.regexp.Matcher;
+import com.google.code.regexp.Pattern;
+
+public class OpenSOCGrok implements Serializable {
+
+	private static final long serialVersionUID = 2002441320075020721L;
+	private static final Logger LOG = LoggerFactory.getLogger(OpenSOCGrok.class);
+	  /**
+	   * Named regex of the originalGrokPattern.
+	   */
+	  private String namedRegex;
+	  /**
+	   * Map of the named regex of the originalGrokPattern
+	   * with id = namedregexid and value = namedregex.
+	   */
+	  private Map<String, String> namedRegexCollection;
+	  /**
+	   * Original {@code Grok} pattern (expl: %{IP}).
+	   */
+	  private String originalGrokPattern;
+	  /**
+	   * Pattern of the namedRegex.
+	   */
+	  private Pattern compiledNamedRegex;
+	  /**
+	   * {@code Grok} discovery.
+	   */
+	  private Map<String, String> grokPatternDefinition;
+
+	  /** only use in grok discovery. */
+	  private String savedPattern;
+
+	  /**
+	   * Create Empty {@code Grok}.
+	   */
+	  public static final OpenSOCGrok EMPTY = new OpenSOCGrok();
+
+	  /**
+	   * Create a new <i>empty</i>{@code Grok} object.
+	   */
+	  public OpenSOCGrok() {
+	    originalGrokPattern = StringUtils.EMPTY;
+	    namedRegex = StringUtils.EMPTY;
+	    compiledNamedRegex = null;
+	    grokPatternDefinition = new TreeMap<String, String>();
+	    namedRegexCollection = new TreeMap<String, String>();
+	    savedPattern = StringUtils.EMPTY;
+	  }
+
+	  public String getSaved_pattern() {
+	    return savedPattern;
+	  }
+
+	  public void setSaved_pattern(String savedpattern) {
+	    this.savedPattern = savedpattern;
+	  }
+
+	  /**
+	   * Create a {@code Grok} instance with the given patterns file and
+	   * a {@code Grok} pattern.
+	   *
+	   * @param grokPatternPath Path to the pattern file
+	   * @param grokExpression  - <b>OPTIONAL</b> - Grok pattern to compile ex: %{APACHELOG}
+	   * @return {@code Grok} instance
+	   * @throws Exception
+	   */
+	  public static OpenSOCGrok create(String grokPatternPath, String grokExpression)
+	      throws Exception {
+	    if (StringUtils.isBlank(grokPatternPath)) {
+	      throw new Exception("{grokPatternPath} should not be empty or null");
+	    }
+	    OpenSOCGrok g = new OpenSOCGrok();
+	    g.addPatternFromFile(grokPatternPath);
+	    if (StringUtils.isNotBlank(grokExpression)) {
+	      g.compile(grokExpression);
+	    }
+	    return g;
+	  }
+
+	  /**
+	   * Create a {@code Grok} instance with the given grok patterns file.
+	   *
+	   * @param  grokPatternPath : Path to the pattern file
+	   * @return Grok
+	   * @throws Exception
+	   */
+	  public static OpenSOCGrok create(String grokPatternPath) throws Exception {
+	    return create(grokPatternPath, null);
+	  }
+
+	  /**
+	   * Add custom pattern to grok in the runtime.
+	   *
+	   * @param name : Pattern Name
+	   * @param pattern : Regular expression Or {@code Grok} pattern
+	   * @throws Exception
+	   **/
+	  public void addPattern(String name, String pattern) throws Exception {
+	    if (StringUtils.isBlank(name)) {
+	      throw new Exception("Invalid Pattern name");
+	    }
+	    if (StringUtils.isBlank(name)) {
+	      throw new Exception("Invalid Pattern");
+	    }
+	    grokPatternDefinition.put(name, pattern);
+	  }
+
+	  /**
+	   * Copy the given Map of patterns (pattern name, regular expression) to {@code Grok},
+	   * duplicate element will be override.
+	   *
+	   * @param cpy : Map to copy
+	   * @throws Exception
+	   **/
+	  public void copyPatterns(Map<String, String> cpy) throws Exception {
+	    if (cpy == null) {
+	      throw new Exception("Invalid Patterns");
+	    }
+
+	    if (cpy.isEmpty()) {
+	      throw new Exception("Invalid Patterns");
+	    }
+	    for (Map.Entry<String, String> entry : cpy.entrySet()) {
+	      grokPatternDefinition.put(entry.getKey().toString(), entry.getValue().toString());
+	    }
+	  }
+
+	  /**
+	   * Get the current map of {@code Grok} pattern.
+	   *
+	   * @return Patterns (name, regular expression)
+	   */
+	  public Map<String, String> getPatterns() {
+	    return grokPatternDefinition;
+	  }
+
+	  /**
+	   * Get the named regex from the {@code Grok} pattern. <p></p>
+	   * See {@link #compile(String)} for more detail.
+	   * @return named regex
+	   */
+	  public String getNamedRegex() {
+	    return namedRegex;
+	  }
+
+	  /**
+	   * Add patterns to {@code Grok} from the given file.
+	   *
+	   * @param file : Path of the grok pattern
+	   * @throws Exception
+	   */
+	  public void addPatternFromFile(String file) throws Exception {
+
+	    File f = new File(file);
+	    if (!f.exists()) {
+	      throw new Exception("Pattern not found");
+	    }
+
+	    if (!f.canRead()) {
+	      throw new Exception("Pattern cannot be read");
+	    }
+
+	    FileReader r = null;
+	    try {
+	      r = new FileReader(f);
+	      addPatternFromReader(r);
+	    } catch (FileNotFoundException e) {
+	      throw new Exception(e.getMessage());
+	    } catch (@SuppressWarnings("hiding") IOException e) {
+	      throw new Exception(e.getMessage());
+	    } finally {
+	      try {
+	        if (r != null) {
+	          r.close();
+	        }
+	      } catch (IOException io) {
+	        // TODO(anthony) : log the error
+	      }
+	    }
+	  }
+
+	  /**
+	   * Add patterns to {@code Grok} from a Reader.
+	   *
+	   * @param r : Reader with {@code Grok} patterns
+	   * @throws Exception
+	   */
+	  public void addPatternFromReader(Reader r) throws Exception {
+	    BufferedReader br = new BufferedReader(r);
+	    String line;
+	    // We dont want \n and commented line
+	    Pattern pattern = Pattern.compile("^([A-z0-9_]+)\\s+(.*)$");
+	    try {
+	      while ((line = br.readLine()) != null) {
+	        Matcher m = pattern.matcher(line);
+	        if (m.matches()) {
+	          this.addPattern(m.group(1), m.group(2));
+	        }
+	      }
+	      br.close();
+	    } catch (IOException e) {
+	      throw new Exception(e.getMessage());
+	    } catch (Exception e) {
+	      throw new Exception(e.getMessage());
+	    }
+
+	  }
+
+	  /**
+	   * Match the given <tt>log</tt> with the named regex.
+	   * And return the json representation of the matched element
+	   *
+	   * @param log : log to match
+	   * @return json representation og the log
+	   */
+	  public String capture(String log){
+		  OpenSOCMatch match = match(log);
+	    match.captures();
+	    return match.toJson();
+	  }
+
+	  /**
+	   * Match the given list of <tt>log</tt> with the named regex
+	   * and return the list of json representation of the matched elements.
+	   *
+	   * @param logs : list of log
+	   * @return list of json representation of the log
+	   */
+	  public List<String> captures(List<String> logs){
+	    List<String> matched = new ArrayList<String>();
+	    for (String log : logs) {
+	    	OpenSOCMatch match = match(log);
+	      match.captures();
+	      matched.add(match.toJson());
+	    }
+	    return matched;
+	  }
+
+	  /**
+	   * Match the given <tt>text</tt> with the named regex
+	   * {@code Grok} will extract data from the string and get an extence of {@link Match}.
+	   *
+	   * @param text : Single line of log
+	   * @return Grok Match
+	   */
+	  public OpenSOCMatch match(String text) {
+	    if (compiledNamedRegex == null || StringUtils.isBlank(text)) {
+	      return OpenSOCMatch.EMPTY;
+	    }
+
+	    Matcher m = compiledNamedRegex.matcher(text);
+	    OpenSOCMatch match = new OpenSOCMatch();
+	    if (m.find()) {
+	      match.setSubject(text);
+	      match.setGrok(this);
+	      match.setMatch(m);
+	      match.setStart(m.start(0));
+	      match.setEnd(m.end(0));
+	    }
+	    return match;
+	  }
+
+	  /**
+	   * Compile the {@code Grok} pattern to named regex pattern.
+	   *
+	   * @param pattern : Grok pattern (ex: %{IP})
+	   * @throws Exception
+	   */
+	  public void compile(String pattern) throws Exception {
+
+	    if (StringUtils.isBlank(pattern)) {
+	      throw new Exception("{pattern} should not be empty or null");
+	    }
+
+	    namedRegex = pattern;
+	    originalGrokPattern = pattern;
+	    int index = 0;
+	    /** flag for infinite recurtion */
+	    int iterationLeft = 1000;
+	    Boolean continueIteration = true;
+
+	    // Replace %{foo} with the regex (mostly groupname regex)
+	    // and then compile the regex
+	    while (continueIteration) {
+	      continueIteration = false;
+	      if (iterationLeft <= 0) {
+	        throw new Exception("Deep recursion pattern compilation of " + originalGrokPattern);
+	      }
+	      iterationLeft--;
+
+	      Matcher m = GrokUtils.GROK_PATTERN.matcher(namedRegex);
+	      // Match %{Foo:bar} -> pattern name and subname
+	      // Match %{Foo=regex} -> add new regex definition
+	      if (m.find()) {
+	        continueIteration = true;
+	        Map<String, String> group = m.namedGroups();
+	        if (group.get("definition") != null) {
+	          try {
+	            addPattern(group.get("pattern"), group.get("definition"));
+	            group.put("name", group.get("name") + "=" + group.get("definition"));
+	          } catch (Exception e) {
+	            // Log the exeception
+	          }
+	        }
+	        namedRegexCollection.put("name" + index,
+	            (group.get("subname") != null ? group.get("subname") : group.get("name")));
+	        namedRegex =
+	            StringUtils.replace(namedRegex, "%{" + group.get("name") + "}", "(?<name" + index + ">"
+	                + grokPatternDefinition.get(group.get("pattern")) + ")");
+	        // System.out.println(_expanded_pattern);
+	        index++;
+	      }
+	    }
+
+	    if (namedRegex.isEmpty()) {
+	      throw new Exception("Pattern not fount");
+	    }
+	    // Compile the regex
+	    compiledNamedRegex = Pattern.compile(namedRegex);
+	  }
+
+	 	  /**
+	   * Original grok pattern used to compile to the named regex.
+	   *
+	   * @return String Original Grok pattern
+	   */
+	  public String getOriginalGrokPattern(){
+	    return originalGrokPattern;
+	  }
+
+	  /**
+	   * Get the named regex from the given id.
+	   *
+	   * @param id : named regex id
+	   * @return String of the named regex
+	   */
+	  public String getNamedRegexCollectionById(String id) {
+	    return namedRegexCollection.get(id);
+	  }
+
+	  /**
+	   * Get the full collection of the named regex.
+	   *
+	   * @return named RegexCollection
+	   */
+	  public Map<String, String> getNamedRegexCollection() {
+	    return namedRegexCollection;
+	  }
+	}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/MetronMatch.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/MetronMatch.java b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/MetronMatch.java
new file mode 100644
index 0000000..bd4f0ad
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/MetronMatch.java
@@ -0,0 +1,280 @@
+package com.opensoc.parsing.parsers;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import com.google.code.regexp.Matcher;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+public class OpenSOCMatch implements Serializable {
+
+	private static final long serialVersionUID = -1129245286587945311L;
+	private String subject; // texte
+	  private Map<String, Object> capture;
+	  private OpenSOCGarbage garbage;
+	  private OpenSOCGrok grok;
+	  private Matcher match;
+	  private int start;
+	  private int end;
+
+	  /**
+	   * For thread safety
+	   */
+	  private static ThreadLocal<OpenSOCMatch> matchHolder = new ThreadLocal<OpenSOCMatch>() {
+		  @Override
+		  protected OpenSOCMatch initialValue() {
+			  return new OpenSOCMatch();
+		  }
+	  };
+
+	  /**
+	   *Create a new {@code Match} object.
+	   */
+	  public OpenSOCMatch() {
+	    subject = "Nothing";
+	    grok = null;
+	    match = null;
+	    capture = new TreeMap<String, Object>();
+	    garbage = new OpenSOCGarbage();
+	    start = 0;
+	    end = 0;
+	  }
+
+	  /**
+	   * Create Empty grok matcher
+	   */
+	  public static final OpenSOCMatch EMPTY = new OpenSOCMatch();
+
+	  public void setGrok(OpenSOCGrok grok){
+	    if (grok != null) {
+	      this.grok = grok;
+	    }
+	  }
+
+	  public Matcher getMatch() {
+	    return match;
+	  }
+
+	  public void setMatch(Matcher match) {
+	    this.match = match;
+	  }
+
+	  public int getStart() {
+	    return start;
+	  }
+
+	  public void setStart(int start) {
+	    this.start = start;
+	  }
+
+	  public int getEnd() {
+	    return end;
+	  }
+
+	  public void setEnd(int end) {
+	    this.end = end;
+	  }
+
+	  /**
+	   * Singleton.
+	   *
+	   * @return instance of Match
+	   */
+	  public static OpenSOCMatch getInstance() {
+		 return matchHolder.get();
+	  }
+
+	  /**
+	   *  Set the single line of log to parse.
+	   *
+	   * @param text : single line of log
+	   */
+	  public void setSubject(String text) {
+	    if (text == null) {
+	      return;
+	    }
+	    if (text.isEmpty()) {
+	      return;
+	    }
+	    subject = text;
+	  }
+
+	  /**
+	   * Retrurn the single line of log.
+	   *
+	   * @return the single line of log
+	   */
+	  public String getSubject() {
+	    return subject;
+	  }
+
+	  /**
+	   * Match to the <tt>subject</tt> the <tt>regex</tt> and save the matched element into a map.
+	   *
+	   */
+	  public void captures() {
+	    if (match == null) {
+	      return;
+	    }
+	    capture.clear();
+
+	    // _capture.put("LINE", this.line);
+	    // _capture.put("LENGTH", this.line.length() +"");
+
+	    Map<String, String> mappedw = this.match.namedGroups();
+	    Iterator<Entry<String, String>> it = mappedw.entrySet().iterator();
+	    while (it.hasNext()) {
+
+	      @SuppressWarnings("rawtypes")
+	      Map.Entry pairs = (Map.Entry) it.next();
+	      String key = null;
+	      Object value = null;
+	      if (this.grok.getNamedRegexCollectionById(pairs.getKey().toString()) == null) {
+	        key = pairs.getKey().toString();
+	      } else if (!this.grok.getNamedRegexCollectionById(pairs.getKey().toString()).isEmpty()) {
+	        key = this.grok.getNamedRegexCollectionById(pairs.getKey().toString());
+	      }
+	      if (pairs.getValue() != null) {
+	        value = pairs.getValue().toString();
+	        
+	        KeyValue keyValue = OpenSOCConverter.convert(key, value);
+	        
+	        //get validated key
+	        key = keyValue.getKey();
+	        
+	        //resolve value
+	        if (keyValue.getValue() instanceof String) {
+	        	 value = cleanString((String)keyValue.getValue());
+	        } else {
+	        	value = keyValue.getValue();
+	        }
+	        
+	        //set if grok failure
+	        if (keyValue.hasGrokFailure()) {
+	        	capture.put(key + "_grokfailure", keyValue.getGrokFailure());
+	        }
+	      }
+
+	      capture.put(key, value);
+	      it.remove(); // avoids a ConcurrentModificationException
+	    }
+	  }
+
+
+	  /**
+	   * remove from the string the quote and double quote.
+	   *
+	   * @param string to pure: "my/text"
+	   * @return unquoted string: my/text
+	   */
+	  private String cleanString(String value) {
+	    if (value == null) {
+	      return value;
+	    }
+	    if (value.isEmpty()) {
+	      return value;
+	    }
+	    char[] tmp = value.toCharArray();
+	    if ((tmp[0] == '"' && tmp[value.length() - 1] == '"')
+	        || (tmp[0] == '\'' && tmp[value.length() - 1] == '\'')) {
+	      value = value.substring(1, value.length() - 1);
+	    }
+	    return value;
+	  }
+
+
+	  /**
+	   * Get the json representation of the matched element.
+	   * <p>
+	   * example:
+	   * map [ {IP: 127.0.0.1}, {status:200}]
+	   * will return
+	   * {"IP":"127.0.0.1", "status":200}
+	   * </p>
+	   * If pretty is set to true, json will return prettyprint json string.
+	   *
+	   * @return Json of the matched element in the text
+	   */
+	  public String toJson(Boolean pretty) {
+	    if (capture == null) {
+	      return "{}";
+	    }
+	    if (capture.isEmpty()) {
+	      return "{}";
+	    }
+
+	    this.cleanMap();
+	    Gson gs;
+	    if (pretty) {
+	     gs = new GsonBuilder().setPrettyPrinting().create();
+	    } else {
+	      gs = new Gson();
+	    }
+	    return gs.toJson(/* cleanMap( */capture/* ) */);
+	  }
+
+	  /**
+	   * Get the json representation of the matched element.
+	   * <p>
+	   * example:
+	   * map [ {IP: 127.0.0.1}, {status:200}]
+	   * will return
+	   * {"IP":"127.0.0.1", "status":200}
+	   * </p>
+	   *
+	   * @return Json of the matched element in the text
+	   */
+	  public String toJson() {
+	    return toJson(false);
+	  }
+
+	  /**
+	   * Get the map representation of the matched element in the text.
+	   *
+	   * @return map object from the matched element in the text
+	   */
+	  public Map<String, Object> toMap() {
+	    this.cleanMap();
+	    return capture;
+	  }
+
+	  /**
+	   * Remove and rename the unwanted elelents in the matched map.
+	   */
+	  private void cleanMap() {
+	    garbage.rename(capture);
+	    garbage.remove(capture);
+	  }
+
+	  /**
+	   * Util fct.
+	   *
+	   * @return boolean
+	   */
+	  public Boolean isNull() {
+	    if (this.match == null) {
+	      return true;
+	    }
+	    return false;
+	  }
+
+	  /**
+	   * Util fct.
+	   *
+	   * @param s
+	   * @return boolean
+	   */
+	  private boolean isInteger(String s) {
+	    try {
+	      Integer.parseInt(s);
+	    } catch (NumberFormatException e) {
+	      return false;
+	    }
+	    return true;
+	  }
+	  
+	}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/ParserUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/ParserUtils.java b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/ParserUtils.java
new file mode 100644
index 0000000..b986cae
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/ParserUtils.java
@@ -0,0 +1,23 @@
+package com.opensoc.parsing.parsers;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.io.IOUtils;
+
+public class ParserUtils {
+	
+	public static final String PREFIX = "stream2file";
+	public static final String SUFFIX = ".tmp";
+
+	public static File stream2file(InputStream in) throws IOException {
+		final File tempFile = File.createTempFile(PREFIX, SUFFIX);
+		tempFile.deleteOnExit();
+		try (FileOutputStream out = new FileOutputStream(tempFile)) {
+			IOUtils.copy(in, out);
+		}
+		return tempFile;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/90cda3ff/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/PcapParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/PcapParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/PcapParser.java
new file mode 100644
index 0000000..5740977
--- /dev/null
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/com/apache/metron/parsing/parsers/PcapParser.java
@@ -0,0 +1,185 @@
+package com.opensoc.parsing.parsers;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+import org.krakenapps.pcap.decoder.ethernet.EthernetDecoder;
+import org.krakenapps.pcap.decoder.ethernet.EthernetType;
+import org.krakenapps.pcap.decoder.ip.IpDecoder;
+import org.krakenapps.pcap.decoder.ip.Ipv4Packet;
+import org.krakenapps.pcap.decoder.tcp.TcpPacket;
+import org.krakenapps.pcap.decoder.udp.UdpPacket;
+import org.krakenapps.pcap.file.GlobalHeader;
+import org.krakenapps.pcap.packet.PacketHeader;
+import org.krakenapps.pcap.packet.PcapPacket;
+import org.krakenapps.pcap.util.Buffer;
+
+import com.opensoc.pcap.Constants;
+import com.opensoc.pcap.OpenSocEthernetDecoder;
+import com.opensoc.pcap.PacketInfo;
+import com.opensoc.pcap.PcapByteInputStream;
+
+/**
+ * The Class PcapParser.
+ * 
+ * @author sheetal
+ * @version $Revision: 1.0 $
+ */
+public final class PcapParser {
+
+  /** The Constant LOG. */
+  private static final Logger LOG = Logger.getLogger(PcapParser.class);
+
+  /** The ETHERNET_DECODER. */
+  private static final EthernetDecoder ETHERNET_DECODER = new OpenSocEthernetDecoder();
+
+  /** The ip decoder. */
+  private static final IpDecoder IP_DECODER = new IpDecoder();
+
+  // /** The tcp decoder. */
+  // private static final TcpDecoder TCP_DECODER = new TcpDecoder(new
+  // TcpPortProtocolMapper());
+  //
+  // /** The udp decoder. */
+  // private static final UdpDecoder UDP_DECODER = new UdpDecoder(new
+  // UdpPortProtocolMapper());
+
+  static {
+    // IP_DECODER.register(InternetProtocol.TCP, TCP_DECODER);
+    // IP_DECODER.register(InternetProtocol.UDP, UDP_DECODER);
+    ETHERNET_DECODER.register(EthernetType.IPV4, IP_DECODER);
+  }
+
+  /**
+   * Instantiates a new pcap parser.
+   */
+  private PcapParser() { // $codepro.audit.disable emptyMethod
+
+  }
+
+  /**
+   * Parses the.
+   * 
+   * @param pcap
+   *          the pcap
+   * @return the list * @throws IOException Signals that an I/O exception has
+   *         occurred. * @throws IOException * @throws IOException * @throws
+   *         IOException
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  public static List<PacketInfo> parse(byte[] pcap) throws IOException {
+    List<PacketInfo> packetInfoList = new ArrayList<PacketInfo>();
+
+    PcapByteInputStream pcapByteInputStream = new PcapByteInputStream(pcap);
+
+    GlobalHeader globalHeader = pcapByteInputStream.getGlobalHeader();
+    while (true) {
+      try
+
+      {
+        PcapPacket packet = pcapByteInputStream.getPacket();
+        // int packetCounter = 0;
+        // PacketHeader packetHeader = null;
+        // Ipv4Packet ipv4Packet = null;
+        TcpPacket tcpPacket = null;
+        UdpPacket udpPacket = null;
+        // Buffer packetDataBuffer = null;
+        int sourcePort = 0;
+        int destinationPort = 0;
+
+        // LOG.trace("Got packet # " + ++packetCounter);
+
+        // LOG.trace(packet.getPacketData());
+        ETHERNET_DECODER.decode(packet);
+
+        PacketHeader packetHeader = packet.getPacketHeader();
+        Ipv4Packet ipv4Packet = Ipv4Packet.parse(packet.getPacketData());
+
+        if (ipv4Packet.getProtocol() == Constants.PROTOCOL_TCP) {
+          tcpPacket = TcpPacket.parse(ipv4Packet);
+
+        }
+
+        if (ipv4Packet.getProtocol() == Constants.PROTOCOL_UDP) {
+
+          Buffer packetDataBuffer = ipv4Packet.getData();
+          sourcePort = packetDataBuffer.getUnsignedShort();
+          destinationPort = packetDataBuffer.getUnsignedShort();
+
+          udpPacket = new UdpPacket(ipv4Packet, sourcePort, destinationPort);
+
+          udpPacket.setLength(packetDataBuffer.getUnsignedShort());
+          udpPacket.setChecksum(packetDataBuffer.getUnsignedShort());
+          packetDataBuffer.discardReadBytes();
+          udpPacket.setData(packetDataBuffer);
+        }
+
+        packetInfoList.add(new PacketInfo(globalHeader, packetHeader, packet,
+            ipv4Packet, tcpPacket, udpPacket));
+      } catch (NegativeArraySizeException ignored) {
+        LOG.debug("Ignorable exception while parsing packet.", ignored);
+      } catch (EOFException eof) { // $codepro.audit.disable logExceptions
+        // Ignore exception and break
+        break;
+      }
+    }
+    return packetInfoList;
+  }
+
+  /**
+   * The main method.
+   * 
+   * @param args
+   *          the arguments
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   * @throws InterruptedException
+   *           the interrupted exception
+   */
+  public static void main(String[] args) throws IOException,
+      InterruptedException {
+
+    double totalIterations = 1000000;
+    double parallelism = 64;
+    double targetEvents = 1000000;
+
+    File fin = new File("/Users/sheetal/Downloads/bad_packets/bad_packet_1405988125427.pcap");
+    File fout = new File(fin.getAbsolutePath() + ".parsed");
+    byte[] pcapBytes = FileUtils.readFileToByteArray(fin);
+    long startTime = System.currentTimeMillis();
+    for (int i = 0; i < totalIterations; i++) {
+      List<PacketInfo> list = parse(pcapBytes);
+
+      for (PacketInfo packetInfo : list) {
+        System.out.println(packetInfo.getJsonIndexDoc());
+      }
+    }
+    long endTime = System.currentTimeMillis();
+
+    System.out.println("Time taken to process " + totalIterations + " events :"
+        + (endTime - startTime) + " milliseconds");
+
+    System.out
+        .println("With parallelism of "
+            + parallelism
+            + " estimated time to process "
+            + targetEvents
+            + " events: "
+            + (((((endTime - startTime) / totalIterations) * targetEvents) / parallelism) / 1000)
+            + " seconds");
+    System.out.println("With parallelism of " + parallelism
+        + " estimated # of events per second: "
+        + ((parallelism * 1000 * totalIterations) / (endTime - startTime))
+        + " events");
+    System.out.println("Expected Parallelism to process " + targetEvents
+        + " events in a second: "
+        + (targetEvents / ((1000 * totalIterations) / (endTime - startTime))));
+  }
+
+}
\ No newline at end of file


Mime
View raw message