metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sirs...@apache.org
Subject [47/85] [partial] incubator-metron git commit: Rename all OpenSOC files to Metron
Date Thu, 14 Jan 2016 17:03:42 GMT
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/src/main/java/com/apache/metron/ise/parser/TokenMgrError.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/ise/parser/TokenMgrError.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/ise/parser/TokenMgrError.java
new file mode 100644
index 0000000..944d483
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/ise/parser/TokenMgrError.java
@@ -0,0 +1,147 @@
+/* Generated By:JavaCC: Do not edit this line. TokenMgrError.java Version 5.0 */
+/* JavaCCOptions: */
+package com.opensoc.ise.parser;
+
+/** Token Manager Error. */
+class TokenMgrError extends Error
+{
+
+  /**
+   * The version identifier for this Serializable class.
+   * Increment only if the <i>serialized</i> form of the
+   * class changes.
+   */
+  private static final long serialVersionUID = 1L;
+
+  /*
+   * Ordinals for various reasons why an Error of this type can be thrown.
+   */
+
+  /**
+   * Lexical error occurred.
+   */
+  static final int LEXICAL_ERROR = 0;
+
+  /**
+   * An attempt was made to create a second instance of a static token manager.
+   */
+  static final int STATIC_LEXER_ERROR = 1;
+
+  /**
+   * Tried to change to an invalid lexical state.
+   */
+  static final int INVALID_LEXICAL_STATE = 2;
+
+  /**
+   * Detected (and bailed out of) an infinite loop in the token manager.
+   */
+  static final int LOOP_DETECTED = 3;
+
+  /**
+   * Indicates the reason why the exception is thrown. It will have
+   * one of the above 4 values.
+   */
+  int errorCode;
+
+  /**
+   * Replaces unprintable characters by their escaped (or unicode escaped)
+   * equivalents in the given string
+   */
+  protected static final String addEscapes(String str) {
+    StringBuffer retval = new StringBuffer();
+    char ch;
+    for (int i = 0; i < str.length(); i++) {
+      switch (str.charAt(i))
+      {
+        case 0 :
+          continue;
+        case '\b':
+          retval.append("\\b");
+          continue;
+        case '\t':
+          retval.append("\\t");
+          continue;
+        case '\n':
+          retval.append("\\n");
+          continue;
+        case '\f':
+          retval.append("\\f");
+          continue;
+        case '\r':
+          retval.append("\\r");
+          continue;
+        case '\"':
+          retval.append("\\\"");
+          continue;
+        case '\'':
+          retval.append("\\\'");
+          continue;
+        case '\\':
+          retval.append("\\\\");
+          continue;
+        default:
+          if ((ch = str.charAt(i)) < 0x20 || ch > 0x7e) {
+            String s = "0000" + Integer.toString(ch, 16);
+            retval.append("\\u" + s.substring(s.length() - 4, s.length()));
+          } else {
+            retval.append(ch);
+          }
+          continue;
+      }
+    }
+    return retval.toString();
+  }
+
+  /**
+   * Returns a detailed message for the Error when it is thrown by the
+   * token manager to indicate a lexical error.
+   * Parameters :
+   *    EOFSeen     : indicates if EOF caused the lexical error
+   *    lexState    : lexical state in which this error occurred
+   *    errorLine   : line number when the error occurred
+   *    errorColumn : column number when the error occurred
+   *    errorAfter  : prefix that was seen before this error occurred
+   *    curChar     : the offending character
+   * Note: You can customize the lexical error message by modifying this method.
+   */
+  protected static String LexicalError(boolean EOFSeen, int lexState, int errorLine, int errorColumn, String errorAfter, char curChar) {
+    return("Lexical error at line " +
+          errorLine + ", column " +
+          errorColumn + ".  Encountered: " +
+          (EOFSeen ? "<EOF> " : ("\"" + addEscapes(String.valueOf(curChar)) + "\"") + " (" + (int)curChar + "), ") +
+          "after : \"" + addEscapes(errorAfter) + "\"");
+  }
+
+  /**
+   * You can also modify the body of this method to customize your error messages.
+   * For example, cases like LOOP_DETECTED and INVALID_LEXICAL_STATE are not
+   * of end-users concern, so you can return something like :
+   *
+   *     "Internal Error : Please file a bug report .... "
+   *
+   * from this method for such cases in the release version of your parser.
+   */
+  public String getMessage() {
+    return super.getMessage();
+  }
+
+  /*
+   * Constructors of various flavors follow.
+   */
+
+  /** No arg constructor. */
+  public TokenMgrError() {
+  }
+
+  /** Constructor with message and reason. */
+  public TokenMgrError(String message, int reason) {
+    super(message);
+    errorCode = reason;
+  }
+
+  /** Full Constructor. */
+  public TokenMgrError(boolean EOFSeen, int lexState, int errorLine, int errorColumn, String errorAfter, char curChar, int reason) {
+    this(LexicalError(EOFSeen, lexState, errorLine, errorColumn, errorAfter, curChar), reason);
+  }
+}
+/* JavaCC - OriginalChecksum=5fbf6813c9d6a1d713f1d4a002af1322 (do not edit this line) */

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/src/main/java/com/apache/metron/json/serialization/JSONDecoderHelper.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/json/serialization/JSONDecoderHelper.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/json/serialization/JSONDecoderHelper.java
new file mode 100644
index 0000000..1954aa7
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/json/serialization/JSONDecoderHelper.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.opensoc.json.serialization;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+/**
+ * Helper class used for decoding objects from byte arrays 
+ *
+ * @author kiran
+ * 
+ */
+public class JSONDecoderHelper {
+
+	/** Reads a length-prefixed string: an int byte count followed by the bytes. */
+	public static String getString(DataInputStream data) throws IOException {
+		int strSize = data.readInt();
+		if (strSize < 0)
+			throw new IOException("Negative string length: " + strSize);
+		byte[] bytes = new byte[strSize];
+		// readFully: a plain read() may stop short and leave the buffer half filled.
+		data.readFully(bytes);
+		return new String(bytes);
+	}
+
+	/** Reads a one-byte flag, then a double when the flag is 0, otherwise a long. */
+	public static Number getNumber(DataInputStream data) throws IOException {
+		// Treating all ints,shorts, long as long.
+		// Everything else as Double
+		int flag = data.readByte();
+		if (flag == 0)
+			return data.readDouble();
+		return data.readLong();
+	}
+
+	/** Reads a single boolean value. */
+	public static Boolean getBoolean(DataInputStream data) throws IOException {
+		return data.readBoolean();
+	}
+
+	/** Reads an int element count, then that many tagged values via getObject. */
+	@SuppressWarnings("unchecked")
+	public static JSONArray getArray(DataInputStream data) throws IOException {
+		JSONArray output = new JSONArray();
+		int size = data.readInt();
+
+		for (int i = 0; i < size; i++) {
+			Object value = getObject(data);
+			output.add(value);
+		}
+
+		return output;
+	}
+
+	/** Reads an int entry count, then that many key/value pairs via getObject. */
+	@SuppressWarnings("unchecked")
+	public static JSONObject getJSON(DataInputStream data) throws IOException {
+		JSONObject output = new JSONObject();
+		int size = data.readInt();
+
+		for (int i = 0; i < size; i++) {
+			String key = (String) getObject(data);
+			Object value = getObject(data);
+			output.put(key, value);
+		}
+
+		return output;
+	}
+
+	/**
+	 * Reads a one-byte type id and decodes the value that follows it.
+	 * Returns null for NULLID and also for any id that is not recognised.
+	 */
+	public static Object getObject(DataInputStream data) throws IOException {
+		byte objID = data.readByte();
+
+		if (objID == JSONKafkaSerializer.StringID)
+			return getString(data);
+		if (objID == JSONKafkaSerializer.JSONObjectID)
+			return getJSON(data);
+		if (objID == JSONKafkaSerializer.NumberID)
+			return getNumber(data);
+		if (objID == JSONKafkaSerializer.BooleanID)
+			return getBoolean(data);
+		if (objID == JSONKafkaSerializer.NULLID)
+			return null;
+		if (objID == JSONKafkaSerializer.JSONArrayID)
+			return getArray(data);
+
+		return null;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/src/main/java/com/apache/metron/json/serialization/JSONEncoderHelper.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/json/serialization/JSONEncoderHelper.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/json/serialization/JSONEncoderHelper.java
new file mode 100644
index 0000000..b388397
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/json/serialization/JSONEncoderHelper.java
@@ -0,0 +1,89 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.json.serialization;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.configuration.Configuration;
+import org.json.simple.JSONObject;
+
+/**
+ * Helper class used for encoding objects into byte arrays 
+ *
+ * @author kiran
+ * 
+ */
+public class JSONEncoderHelper {
+
+	/** Writes only the NULLID type id; the value argument is not used. */
+	public static void putNull(DataOutputStream data, Object value)
+			throws IOException {
+		data.writeByte(JSONKafkaSerializer.NULLID);
+	}
+
+	/** Writes the BooleanID type id followed by the boolean value. */
+	public static void putBoolean(DataOutputStream data, Boolean value)
+			throws IOException {
+		data.writeByte(JSONKafkaSerializer.BooleanID);
+		data.writeBoolean(value);
+	}
+
+	/** Writes NumberID, then flag 0 + double (Double/Float) or flag 1 + long. */
+	public static void putNumber(DataOutputStream data, Number value)
+			throws IOException {
+		data.writeByte(JSONKafkaSerializer.NumberID);
+		if (value instanceof Double || value instanceof Float) {
+			data.writeByte(0);
+			data.writeDouble(value.doubleValue());
+			return;
+		}
+		// longValue() instead of a (Long) cast, which threw ClassCastException
+		// for Integer, Short and every other non-Long Number.
+		data.writeByte(1);
+		data.writeLong(value.longValue());
+	}
+
+	/** Writes StringID, the encoded byte count, then the bytes themselves. */
+	public static void putString(DataOutputStream data, String str)
+			throws IOException {
+		// Prefix is the byte count, not str.length(): the decoder reads bytes.
+		byte[] bytes = str.getBytes();
+		data.writeByte(JSONKafkaSerializer.StringID);
+		data.writeInt(bytes.length);
+		data.write(bytes);
+	}
+
+	/** Copies every key of the configuration into a new JSONObject. */
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	public static JSONObject getJSON(Configuration config) {
+		JSONObject output = new JSONObject();
+
+		if (!config.isEmpty()) {
+			Iterator it = config.getKeys();
+			while (it.hasNext()) {
+				String k = (String) it.next();
+				// Values are cast to String; a non-String property throws here.
+				String v = (String) config.getProperty(k);
+				output.put(k, v);
+			}
+		}
+		return output;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/src/main/java/com/apache/metron/json/serialization/JSONKafkaSerializer.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/json/serialization/JSONKafkaSerializer.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/json/serialization/JSONKafkaSerializer.java
new file mode 100644
index 0000000..c08444f
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/json/serialization/JSONKafkaSerializer.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.json.serialization;
+
+import static com.opensoc.json.serialization.JSONDecoderHelper.getObject;
+import static com.opensoc.json.serialization.JSONEncoderHelper.putBoolean;
+import static com.opensoc.json.serialization.JSONEncoderHelper.putNull;
+import static com.opensoc.json.serialization.JSONEncoderHelper.putNumber;
+import static com.opensoc.json.serialization.JSONEncoderHelper.putString;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import kafka.serializer.Decoder;
+import kafka.serializer.Encoder;
+import kafka.utils.VerifiableProperties;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+
+/**
+ * JSON serialization class for Kafka; implements the Kafka Encoder and Decoder.
+ * Supported value types: String, JSONObject, Number, Boolean, null, JSONArray.
+ * 
+ * @author kiran
+ * 
+ */
+
+public class JSONKafkaSerializer implements Encoder<JSONObject>,
+		Decoder<JSONObject> {
+
+	// Wire format: every value is a one-byte type id followed by its payload.
+	// An object payload is an int entry count and then alternating keys and
+	// values; an array payload is an int element count and then the elements.
+	public static final byte StringID = 1;
+	public static final byte JSONObjectID = 2;
+	public static final byte NumberID = 3;
+	public static final byte BooleanID = 4;
+	public static final byte NULLID = 5;
+	public static final byte JSONArrayID = 6;
+
+	public JSONKafkaSerializer() {
+		// Blank constructor needed by Storm
+	}
+
+	public JSONKafkaSerializer(VerifiableProperties props) {
+		// Do Nothing. constructor needed by Storm
+	}
+
+	/**
+	 * Manual round-trip check: parses the first line of /tmp/test as JSON,
+	 * serializes and deserializes it args[0] times (default 1) and prints the
+	 * result together with whether it matches the input.
+	 */
+	public static void main(String args[]) throws IOException {
+
+		String Input = "/tmp/test";
+		int count = 1;
+		if (args.length > 0)
+			count = Integer.parseInt(args[0]);
+
+		String jsonString;
+		BufferedReader reader = new BufferedReader(new FileReader(Input));
+		try {
+			jsonString = reader.readLine();
+		} finally {
+			// Release the file handle even when reading fails.
+			reader.close();
+		}
+		if (jsonString == null) {
+			System.err.println("No input found in " + Input);
+			return;
+		}
+
+		JSONObject json;
+		try {
+			json = (JSONObject) new JSONParser().parse(jsonString);
+		} catch (ParseException e) {
+			// Nothing to round-trip; continuing would dereference a null object.
+			e.printStackTrace();
+			return;
+		}
+		System.out.println(json);
+
+		JSONKafkaSerializer ser = new JSONKafkaSerializer();
+		String jsonString2 = null;
+		for (int i = 0; i < count; i++) {
+			byte[] bytes = ser.toBytes(json);
+			jsonString2 = ((JSONObject) ser.fromBytes(bytes)).toJSONString();
+		}
+		System.out.println((jsonString2));
+		// Receiver is the non-null side so a zero iteration count prints false.
+		System.out.println(json.toJSONString().equalsIgnoreCase(jsonString2));
+	}
+
+	/**
+	 * Decodes a byte array produced by toBytes.
+	 *
+	 * @return the decoded object, or null if the data could not be read
+	 */
+	public JSONObject fromBytes(byte[] input) {
+		// Same wire format as fromBytes1, so share the implementation.
+		return fromBytes1(new DataInputStream(new ByteArrayInputStream(input)));
+	}
+
+	/**
+	 * Decodes one object from the stream: an int entry count followed by that
+	 * many key/value pairs.
+	 *
+	 * @return the decoded object, or null if an IOException occurred
+	 */
+	@SuppressWarnings("unchecked")
+	public JSONObject fromBytes1(DataInputStream data) {
+
+		JSONObject output = new JSONObject();
+
+		try {
+			int mapSize = data.readInt();
+
+			for (int i = 0; i < mapSize; i++) {
+				String key = (String) getObject(data);
+				Object val = getObject(data);
+				output.put(key, val);
+			}
+
+		} catch (IOException e) {
+			e.printStackTrace();
+			return null;
+		}
+
+		return output;
+	}
+
+	/**
+	 * Encodes the object as an int entry count followed by each key and value
+	 * written through putObject.
+	 *
+	 * @return the encoded bytes, or null if any entry could not be written
+	 */
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	public byte[] toBytes(JSONObject input) {
+
+		ByteArrayOutputStream outputBuffer = new ByteArrayOutputStream();
+		DataOutputStream data = new DataOutputStream(outputBuffer);
+
+		Iterator it = input.entrySet().iterator();
+		try {
+
+			// write num of entries into output.
+			// each KV pair is counted as an entry
+			data.writeInt(input.size());
+
+			// Write every single entry in hashmap
+			// Assuming key to be String.
+			while (it.hasNext()) {
+				Map.Entry<String, Object> entry = (Entry<String, Object>) it
+						.next();
+				putObject(data, entry.getKey());
+				putObject(data, entry.getValue());
+			}
+		} catch (Exception e) {
+			e.printStackTrace();
+			return null;
+		}
+
+		return outputBuffer.toByteArray();
+	}
+
+	/**
+	 * Writes one value with the encoder matching its runtime type.
+	 *
+	 * @throws IOException if the type is unsupported; writing nothing would
+	 *             leave the stream short of the entry count already written
+	 */
+	private void putObject(DataOutputStream data, Object value)
+			throws IOException {
+
+		if (value == null) {
+			putNull(data, value);
+		} else if (value instanceof JSONObject) {
+			putJSON(data, (JSONObject) value);
+		} else if (value instanceof String) {
+			putString(data, (String) value);
+		} else if (value instanceof Number) {
+			putNumber(data, (Number) value);
+		} else if (value instanceof Boolean) {
+			putBoolean(data, (Boolean) value);
+		} else if (value instanceof JSONArray) {
+			putArray(data, (JSONArray) value);
+		} else {
+			throw new IOException("Unsupported value type: "
+					+ value.getClass().getName());
+		}
+	}
+
+	/**
+	 * Writes the JSONObjectID type id followed by the nested object's encoding.
+	 *
+	 * @throws IOException if the nested object could not be encoded
+	 */
+	private void putJSON(DataOutputStream data, JSONObject value)
+			throws IOException {
+
+		// toBytes reports failure by returning null; passing that to write()
+		// would throw a NullPointerException.
+		byte[] nested = toBytes(value);
+		if (nested == null)
+			throw new IOException("Unable to serialize nested JSONObject");
+
+		// JSON ID is 2
+		data.writeByte(JSONKafkaSerializer.JSONObjectID);
+		data.write(nested);
+	}
+
+	/**
+	 * Writes the JSONArrayID type id, the element count and then every element
+	 * through putObject.
+	 */
+	public void putArray(DataOutputStream data, JSONArray array)
+			throws IOException {
+
+		data.writeByte(JSONKafkaSerializer.JSONArrayID);
+
+		data.writeInt(array.size());
+
+		for (Object o : array)
+			putObject(data, o);
+
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/src/main/java/com/apache/metron/json/serialization/JSONKryoSerializer.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/json/serialization/JSONKryoSerializer.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/json/serialization/JSONKryoSerializer.java
new file mode 100644
index 0000000..7b7d394
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/json/serialization/JSONKryoSerializer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.opensoc.json.serialization;
+
+import org.json.simple.JSONObject;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * @author kiran Custom Serializer to help Storm encode and decode JSONObjects
+ */
+
+public class JSONKryoSerializer extends
+		com.esotericsoftware.kryo.Serializer<JSONObject> {
+
+	// JSONKafkaSerializer object actually does the heavy lifting.
+	private JSONKafkaSerializer jsonSerde = new JSONKafkaSerializer();
+
+	/** Writes the encoded length as an int, followed by the encoded bytes. */
+	@Override
+	public void write(Kryo kryo, Output output, JSONObject json) {
+		byte[] bytes = jsonSerde.toBytes(json);
+		// toBytes signals failure with null; fail with a clear message instead
+		// of a NullPointerException on bytes.length.
+		if (bytes == null)
+			throw new IllegalArgumentException("Unable to serialize JSONObject");
+		output.writeInt(bytes.length);
+		output.write(bytes);
+	}
+
+	/** Reads the int length prefix, then decodes that many bytes. */
+	@Override
+	public JSONObject read(Kryo kryo, Input input, Class<JSONObject> type) {
+		// Length of the encoded payload, as written by write().
+		int size = input.readInt();
+		byte[] bytes = input.readBytes(size);
+		return jsonSerde.fromBytes(bytes);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/src/main/java/com/apache/metron/metrics/MetricReporter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/metrics/MetricReporter.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/metrics/MetricReporter.java
new file mode 100644
index 0000000..3d344a2
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/metrics/MetricReporter.java
@@ -0,0 +1,89 @@
+package com.opensoc.metrics;
+
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.graphite.Graphite;
+import com.codahale.metrics.graphite.GraphiteReporter;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+
+public class MetricReporter {
+
+	final MetricRegistry metrics = new MetricRegistry();
+	private ConsoleReporter consoleReporter = null;
+	private JmxReporter jmxReporter = null;
+	private GraphiteReporter graphiteReporter = null;
+
+	private Class _klas;
+	private String _topologyname = "topology";
+
+	/** The Constant LOGGER. */
+	private static final Logger _Logger = Logger
+			.getLogger(MetricReporter.class);
+
+	/**
+	 * Records the owning class and the optional "topologyname" entry (both are
+	 * used to name counters) and then starts the reporters enabled in config.
+	 */
+	public void initialize(Map config, Class klas) {
+
+		_Logger.debug("===========Initializing Reporter");
+		this._klas = klas;
+		if (config.get("topologyname")!=null)
+			_topologyname = (String) config.get("topologyname");
+
+		this.start(config);
+	}
+
+	/**
+	 * Returns the counter named from the topology name, the owning class's
+	 * canonical name and countername. initialize must have been called first
+	 * because the owning class is dereferenced here.
+	 */
+	public Counter registerCounter(String countername) {
+		return metrics.counter(MetricRegistry.name(_topologyname,_klas.getCanonicalName(), countername));
+	}
+
+	/**
+	 * Starts the JMX, console and Graphite reporters whose "reporter.jmx",
+	 * "reporter.console" or "reporter.graphite" entry equals "true". A missing
+	 * entry leaves that reporter off, and a failure in one reporter is logged
+	 * without preventing the others from starting.
+	 */
+	public void start(Map config) {
+		try {
+			if (isEnabled(config, "reporter.jmx")) {
+				jmxReporter = JmxReporter.forRegistry(metrics).build();
+				jmxReporter.start();
+			}
+		} catch (Exception e) {
+			_Logger.error("Unable to start JMX reporter", e);
+		}
+
+		try {
+			if (isEnabled(config, "reporter.console")) {
+				consoleReporter = ConsoleReporter.forRegistry(metrics).build();
+				consoleReporter.start(1, TimeUnit.SECONDS);
+			}
+		} catch (Exception e) {
+			_Logger.error("Unable to start console reporter", e);
+		}
+
+		try {
+			if (isEnabled(config, "reporter.graphite")) {
+				String address = (String) config.get("graphite.address");
+				int port = Integer.parseInt((String) config
+						.get("graphite.port"));
+
+				_Logger.debug("===========Graphite ADDRESS: " + address + ":"
+						+ port);
+
+				Graphite graphite = new Graphite(new InetSocketAddress(address,
+						port));
+				// Check if graphite connectivity works
+				graphite.connect();
+				graphite.close();
+
+				graphiteReporter = GraphiteReporter.forRegistry(metrics).build(
+						graphite);
+
+				_Logger.debug("---------******STARTING GRAPHITE*********---------");
+				graphiteReporter.start(1, TimeUnit.SECONDS);
+			}
+		} catch (IOException io) {
+			_Logger.warn("Unable to Connect to Graphite", io);
+		} catch (NumberFormatException nfe) {
+			// A missing or non-numeric "graphite.port" ends up here.
+			_Logger.warn("Invalid graphite.port: " + config.get("graphite.port"), nfe);
+		}
+	}
+
+	/** True only when the config holds the value "true" under the given key. */
+	private static boolean isEnabled(Map config, String key) {
+		// Constant-first equals() is null-safe when the key is absent.
+		return "true".equals(config.get(key));
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/src/main/java/com/apache/metron/metrics/MyMetricReporter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/metrics/MyMetricReporter.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/metrics/MyMetricReporter.java
new file mode 100644
index 0000000..fc6752f
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/metrics/MyMetricReporter.java
@@ -0,0 +1,33 @@
+package com.opensoc.metrics;
+
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.graphite.GraphiteReporter;
+
+public class MyMetricReporter extends MetricReporter {
+
+	// NOTE(review): this declares a second registry that hides the "metrics"
+	// field of MetricReporter — confirm that a separate registry is intended.
+	final MetricRegistry metrics = new MetricRegistry();
+	private ConsoleReporter consoleReporter = null;
+	private JmxReporter jmxReporter=null;
+	private GraphiteReporter graphiteReporter = null;
+
+	/**
+	 * Builds (but does not start) the reporters selected by the flags; a
+	 * reporter whose flag is false stays null.
+	 */
+	public MyMetricReporter(boolean withConsole, boolean withJMX, boolean witGraphite)
+	{
+		if (withConsole)
+			consoleReporter = ConsoleReporter.forRegistry(metrics).build();
+		if (withJMX)
+			jmxReporter = JmxReporter.forRegistry(metrics).build();
+		// NOTE(review): the Graphite reporter is built with a null target, as
+		// before — confirm where the real connection is meant to come from.
+		if (witGraphite)
+			graphiteReporter = GraphiteReporter.forRegistry(metrics).build(null);
+	}
+
+	/** Empty entry point. */
+	public static void main(String[] args) {
+	}
+
+	/** Currently does nothing. */
+	public void report() {
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/src/main/java/com/apache/metron/metrics/NullReporter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/metrics/NullReporter.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/metrics/NullReporter.java
new file mode 100644
index 0000000..6585a32
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/metrics/NullReporter.java
@@ -0,0 +1,10 @@
+package com.opensoc.metrics;
+
+public class NullReporter extends MetricReporter {
+
+	/** Intentionally empty: this reporter reports nothing. */
+	public void report() {
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/src/main/java/com/apache/metron/parser/interfaces/MessageFilter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/parser/interfaces/MessageFilter.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/parser/interfaces/MessageFilter.java
new file mode 100644
index 0000000..339a2ec
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/parser/interfaces/MessageFilter.java
@@ -0,0 +1,10 @@
+package com.opensoc.parser.interfaces;
+
+import org.json.simple.JSONObject;
+
+
+public interface MessageFilter {
+
+	/**
+	 * Evaluates a message and returns a boolean verdict.
+	 * NOTE(review): presumably true means a tuple should be emitted for the
+	 * message — confirm against the implementations.
+	 */
+	boolean emitTuple(JSONObject message);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/src/main/java/com/apache/metron/parser/interfaces/MessageParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/parser/interfaces/MessageParser.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/parser/interfaces/MessageParser.java
new file mode 100644
index 0000000..b71e4f9
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/parser/interfaces/MessageParser.java
@@ -0,0 +1,11 @@
+package com.opensoc.parser.interfaces;
+
+import org.json.simple.JSONObject;
+
+public interface MessageParser {
+
+	/**
+	 * NOTE(review): how this differs from init() is not visible from the
+	 * interface — confirm against the implementations.
+	 */
+	public void initializeParser();
+
+	/** NOTE(review): see initializeParser(); the split is unexplained here. */
+	public void init();
+
+	/** Turns the raw message bytes into a JSONObject. */
+	public JSONObject parse(byte[] raw_message);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/Constants.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/Constants.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/Constants.java
new file mode 100644
index 0000000..9192665
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/Constants.java
@@ -0,0 +1,21 @@
+package com.opensoc.pcap;
+
+
+/**
+* The Interface Constants.
+* 
+* @author sheetal
+* @version $Revision: 1.0 $
+*/
+public interface Constants {
+
+  // Interface fields are implicitly public, static and final.
+
+  /** Protocol value used for TCP. */
+  int PROTOCOL_TCP = 6;
+
+  /** Protocol value used for UDP. */
+  int PROTOCOL_UDP = 17;
+
+  /** Separator character for document keys. */
+  char DOCUMENT_KEY_SEPARATOR = '-';
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/IEEE_802_1Q.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/IEEE_802_1Q.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/IEEE_802_1Q.java
new file mode 100644
index 0000000..6375a3f
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/IEEE_802_1Q.java
@@ -0,0 +1,27 @@
+package com.opensoc.pcap;
+
+public class IEEE_802_1Q {
+
+	  // The three values held for one tag; int fields start at 0 by default.
+	  int priorityCodePoint;
+	  int dropEligibleIndicator;
+	  int vLANIdentifier;
+
+	  /** Stores the three values exactly as given. */
+	  public IEEE_802_1Q(int priorityCodePoint, int dropEligibleIndicator,
+	      int vLANIdentifier) {
+	    this.vLANIdentifier = vLANIdentifier;
+	    this.dropEligibleIndicator = dropEligibleIndicator;
+	    this.priorityCodePoint = priorityCodePoint;
+	  }
+
+	  /** @return the stored priority code point */
+	  public int getPriorityCodePoint() {
+	    return this.priorityCodePoint;
+	  }
+
+	  /** @return the stored drop eligible indicator */
+	  public int getDropEligibleIndicator() {
+	    return this.dropEligibleIndicator;
+	  }
+
+	  /** @return the stored VLAN identifier */
+	  public int getvLANIdentifier() {
+	    return this.vLANIdentifier;
+	  }
+	}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/MetronEthernetDecoder.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/MetronEthernetDecoder.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/MetronEthernetDecoder.java
new file mode 100644
index 0000000..2dee341
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/MetronEthernetDecoder.java
@@ -0,0 +1,117 @@
+package com.opensoc.pcap;
+
+import java.util.BitSet;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import org.krakenapps.pcap.decoder.ethernet.EthernetDecoder;
+import org.krakenapps.pcap.decoder.ethernet.EthernetFrame;
+import org.krakenapps.pcap.decoder.ethernet.EthernetProcessor;
+import org.krakenapps.pcap.decoder.ethernet.MacAddress;
+import org.krakenapps.pcap.packet.PcapPacket;
+import org.krakenapps.pcap.util.Buffer;
+
+/**
+ * Ethernet decoder that reads the destination address, the source address and
+ * the EtherType from the start of a packet, steps over one IEEE 802.1Q VLAN
+ * tag when the EtherType is 0x8100, and passes the resulting frame to the
+ * registered processors.
+ */
+public class OpenSocEthernetDecoder extends EthernetDecoder {
+
+  /** EtherType announcing that an IEEE 802.1Q tag follows. */
+  private static final int ETHERTYPE_802_1Q = 0x8100;
+
+  /** Processors invoked for every decoded frame. */
+  private final Set<EthernetProcessor> callbacks;
+
+  /** Processors invoked only for frames of the EtherType they are keyed by. */
+  private final ConcurrentHashMap<Integer, Set<EthernetProcessor>> typeCallbacks;
+
+  /** Creates a decoder with no registered processors. */
+  public OpenSocEthernetDecoder() {
+    callbacks = new CopyOnWriteArraySet<EthernetProcessor>();
+    typeCallbacks = new ConcurrentHashMap<Integer, Set<EthernetProcessor>>();
+  }
+
+  /**
+   * Registers a processor that receives every decoded frame.
+   *
+   * @param processor
+   *          the processor to add
+   */
+  public void register(EthernetProcessor processor) {
+    this.callbacks.add(processor);
+  }
+
+  /**
+   * Registers a processor that receives only frames of the given EtherType.
+   *
+   * @param type
+   *          the EtherType the processor is interested in
+   * @param processor
+   *          the processor to add
+   */
+  public void register(int type, EthernetProcessor processor) {
+    Set<EthernetProcessor> processors = typeCallbacks.get(type);
+    if (processors == null) {
+      // putIfAbsent keeps the set installed by a concurrent caller, so no
+      // registration is lost; the copy-on-write set matches 'callbacks' and
+      // can be iterated by dispatch() while registrations change.
+      Set<EthernetProcessor> created = new CopyOnWriteArraySet<EthernetProcessor>();
+      processors = typeCallbacks.putIfAbsent(type, created);
+      if (processors == null) {
+        processors = created;
+      }
+    }
+
+    processors.add(processor);
+  }
+
+  /**
+   * Removes a processor previously registered for every frame.
+   *
+   * @param processor
+   *          the processor to remove
+   */
+  public void unregister(EthernetProcessor processor) {
+    this.callbacks.remove(processor);
+  }
+
+  /**
+   * Removes a processor previously registered for one EtherType. Does nothing
+   * when no processor was ever registered for that type.
+   *
+   * @param type
+   *          the EtherType the processor was registered for
+   * @param processor
+   *          the processor to remove
+   */
+  public void unregister(int type, EthernetProcessor processor) {
+    Set<EthernetProcessor> processors = typeCallbacks.get(type);
+    if (processors != null) {
+      processors.remove(processor);
+    }
+  }
+
+  /**
+   * Decodes the Ethernet header of the packet and dispatches the frame.
+   *
+   * @param packet
+   *          the captured packet; its data buffer is read from its current
+   *          position and the consumed bytes are discarded
+   */
+  public void decode(PcapPacket packet) {
+    Buffer buffer = packet.getPacketData();
+
+    // do not reorder following codes (parse sequence)
+    MacAddress destination = getMacAddress(buffer);
+    MacAddress source = getMacAddress(buffer);
+    int type = getEtherType(buffer);
+
+    if (type == ETHERTYPE_802_1Q) {
+      // It is 802.1Q VLAN tag: consume it so the buffer is positioned on the
+      // real EtherType. The decoded tag values are not used here.
+      get802_1qTag(buffer);
+      // Now get the type
+      type = getEtherType(buffer);
+    }
+
+    buffer.discardReadBytes();
+
+    EthernetFrame frame = new EthernetFrame(source, destination, type, buffer);
+    frame.setPcapPacket(packet);
+    dispatch(frame);
+  }
+
+  /** Reads the next six bytes of the buffer as a MAC address. */
+  private MacAddress getMacAddress(Buffer data) {
+    byte[] mac = new byte[6];
+    data.gets(mac, 0, 6);
+    return new MacAddress(mac);
+  }
+
+  /** Reads the next short of the buffer as an unsigned 16-bit EtherType. */
+  private int getEtherType(Buffer data) {
+    return ((int) data.getShort()) & 0x0000FFFF;
+  }
+
+  /**
+   * Reads the two bytes following the 0x8100 EtherType and splits them into
+   * the 802.1Q fields.
+   *
+   * @param data
+   *          the buffer positioned on the two tag bytes
+   * @return the decoded tag
+   */
+  private IEEE_802_1Q get802_1qTag(Buffer data) {
+
+    // reference http://en.wikipedia.org/wiki/EtherType &
+    // http://en.wikipedia.org/wiki/IEEE_802.1Q
+    byte[] b802_1qTag = new byte[2];
+    data.gets(b802_1qTag, 0, 2);
+
+    // The two bytes are in network (big-endian) order: the 3 most significant
+    // bits are the PCP, the next bit is the DEI and the low 12 bits the VID.
+    // BitSet.valueOf(byte[]) numbers bits little-endian / LSB first, so it
+    // cannot be used to slice these fields.
+    int tci = ((b802_1qTag[0] & 0xFF) << 8) | (b802_1qTag[1] & 0xFF);
+    int pcp = (tci >>> 13) & 0x7;
+    int dei = (tci >>> 12) & 0x1;
+    int vid = tci & 0x0FFF;
+
+    return new IEEE_802_1Q(pcp, dei, vid);
+  }
+
+  /**
+   * Converts a bit set to an int in which bit {@code i} of the set contributes
+   * {@code 1 << i}.
+   *
+   * @param bits
+   *          the bits to convert
+   * @return the sum of {@code 1 << i} over all set bits
+   */
+  public static int convertBitToInt(BitSet bits) {
+    int value = 0;
+    for (int i = 0; i < bits.length(); ++i) {
+      value += bits.get(i) ? (1 << i) : 0;
+    }
+    return value;
+  }
+
+  /**
+   * Passes the frame to every general processor, then a duplicate of it to
+   * each processor registered for the frame's EtherType.
+   */
+  private void dispatch(EthernetFrame frame) {
+    for (EthernetProcessor processor : callbacks) {
+      processor.process(frame);
+    }
+
+    Set<EthernetProcessor> processors = typeCallbacks.get(frame.getType());
+    if (processors == null) {
+      return;
+    }
+
+    for (EthernetProcessor processor : processors) {
+      processor.process(frame.dup());
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/PacketInfo.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/PacketInfo.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/PacketInfo.java
new file mode 100644
index 0000000..804387d
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/PacketInfo.java
@@ -0,0 +1,454 @@
+package com.opensoc.pcap;
+
+import java.text.MessageFormat;
+import org.apache.log4j.Logger;
+
+import org.krakenapps.pcap.decoder.ip.Ipv4Packet;
+import org.krakenapps.pcap.decoder.tcp.TcpPacket;
+import org.krakenapps.pcap.decoder.udp.UdpPacket;
+import org.krakenapps.pcap.file.GlobalHeader;
+import org.krakenapps.pcap.packet.PacketHeader;
+import org.krakenapps.pcap.packet.PcapPacket;
+
+import com.opensoc.pcap.Constants;
+import com.opensoc.pcap.PcapUtils;
+
+/**
+ * The Class PacketInfo.
+ * 
+ * @author sheetal
+ * @version $Revision: 1.0 $
+ */
+public class PacketInfo {
+
+  /** The packetHeader. */
+  private PacketHeader packetHeader = null;
+
+  /** The packet. */
+  private PcapPacket packet = null;
+
+  /** The ipv4 packet. */
+  private Ipv4Packet ipv4Packet = null;
+
+  /** The tcp packet. */
+  private TcpPacket tcpPacket = null;
+
+  /** The udp packet. */
+  private UdpPacket udpPacket = null;
+
+  /** The global header. */
+  private GlobalHeader globalHeader = null;
+
+  /** The Constant globalHeaderJsonTemplateSB. */
+  private static final StringBuffer globalHeaderJsonTemplateSB = new StringBuffer();
+
+  /** The Constant ipv4HeaderJsonTemplateSB. */
+  private static final StringBuffer ipv4HeaderJsonTemplateSB = new StringBuffer();
+
+  /** The Constant tcpHeaderJsonTemplateSB. */
+  private static final StringBuffer tcpHeaderJsonTemplateSB = new StringBuffer();
+
+  /** The Constant udpHeaderJsonTemplateSB. */
+  private static final StringBuffer udpHeaderJsonTemplateSB = new StringBuffer();
+
+  /** The Constant LOG. */
+  private static final Logger LOG = Logger.getLogger(PacketInfo.class);
+  
+  /*
+   * Builds the MessageFormat templates for the JSON document. '<' and '>' are
+   * used in place of '{' and '}' because braces mark MessageFormat arguments;
+   * getJsonDocUsingMessageFormat() swaps them back after formatting.
+   */
+  static {
+    globalHeaderJsonTemplateSB.append("<\"global_header\":<\"pcap_id\":\"").append("{0}").append('"');
+    globalHeaderJsonTemplateSB.append(",\"inc_len\":").append("{1}");
+    globalHeaderJsonTemplateSB.append(",\"orig_len\":").append("{2}");
+    globalHeaderJsonTemplateSB.append(",\"ts_sec\":").append("{3}");
+    globalHeaderJsonTemplateSB.append(",\"ts_usec\":").append("{4}");
+    globalHeaderJsonTemplateSB.append(">,"); // NOPMD by sheetal on 1/29/14 2:37 PM
+
+    // ipv4 header
+
+    // The opening '<' is required so that the '>' appended at the end of this
+    // template closes the ipv4_header object.
+    ipv4HeaderJsonTemplateSB.append("\"ipv4_header\":<");
+
+    ipv4HeaderJsonTemplateSB.append("\"ip_dst\":").append("{0}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_dst_addr\":\"").append("{1}");
+    ipv4HeaderJsonTemplateSB.append("\",\"ip_flags\":").append("{2}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_fragment_offset\":").append("{3}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_header_checksum\":").append("{4}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_id\":").append("{5}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_header_length\":").append("{6}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_protocol\":").append("{7}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_src\":").append("{8}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_src_addr\":\"").append("{9}");
+    ipv4HeaderJsonTemplateSB.append("\",\"ip_tos\":").append("{10}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_total_length\":").append("{11}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_ttl\":").append("{12}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_version\":").append("{13}");
+    ipv4HeaderJsonTemplateSB.append('>');
+
+    // tcp header
+    tcpHeaderJsonTemplateSB.append(",\"tcp_header\":<\"ack\":").append("{0}");
+    tcpHeaderJsonTemplateSB.append(",\"checksum\":").append("{1}");
+    tcpHeaderJsonTemplateSB.append(",\"data_length\":").append("{2}");
+    tcpHeaderJsonTemplateSB.append(",\"data_offset\":").append("{3}");
+    tcpHeaderJsonTemplateSB.append(",\"dst_addr\":\"").append("{4}");
+    tcpHeaderJsonTemplateSB.append("\",\"dst_port\":").append("{5}");
+    tcpHeaderJsonTemplateSB.append(",\"direction\":").append("{6}");
+    tcpHeaderJsonTemplateSB.append(",\"flags\":").append("{7}");
+    tcpHeaderJsonTemplateSB.append(",\"reassembled_length \":").append("{8}");
+    tcpHeaderJsonTemplateSB.append(",\"relative_ack\":").append("{9}");
+    tcpHeaderJsonTemplateSB.append(",\"relative_seq\":").append("{10}");
+    tcpHeaderJsonTemplateSB.append(",\"seq\":").append("{11}");
+    tcpHeaderJsonTemplateSB.append(",\"session_key\":\"").append("{12}");
+    tcpHeaderJsonTemplateSB.append("\",\"src_addr\":\"").append("{13}");
+    tcpHeaderJsonTemplateSB.append("\",\"src_port\":").append("{14}");
+    tcpHeaderJsonTemplateSB.append(",\"total_length\":").append("{15}");
+    tcpHeaderJsonTemplateSB.append(",\"urgent_pointer\":").append("{16}");
+    tcpHeaderJsonTemplateSB.append(",\"window\":").append("{17}");
+    // closes tcp_header and the root object
+    tcpHeaderJsonTemplateSB.append(">>");
+
+    // udp headers
+    udpHeaderJsonTemplateSB.append(",\"udp_header\":<\"checksum\":").append("{0}");
+    udpHeaderJsonTemplateSB.append(",\"dst_port\":").append("{1}");
+    udpHeaderJsonTemplateSB.append(",\"length\":").append("{2}");
+    udpHeaderJsonTemplateSB.append(",\"src_port\":").append("{3}");
+    udpHeaderJsonTemplateSB.append(",\"dst_addr\":\"").append("{4}");
+    udpHeaderJsonTemplateSB.append("\",\"src_addr\":\"").append("{5}").append('"');
+    // closes udp_header and the root object; this must go to the UDP template,
+    // the TCP template has already been closed above
+    udpHeaderJsonTemplateSB.append(">>");
+
+  }
+
+  /** The Constant globalHeaderJsonTemplateString. */
+  private static final String globalHeaderJsonTemplateString = globalHeaderJsonTemplateSB.toString();
+
+  /** The Constant ipv4HeaderJsonTemplateString. */
+  private static final String ipv4HeaderJsonTemplateString = ipv4HeaderJsonTemplateSB.toString();
+
+  /** The Constant tcpHeaderJsonTemplateString. */
+  private static final String tcpHeaderJsonTemplateString = tcpHeaderJsonTemplateSB.toString();
+
+  /** The Constant udpHeaderJsonTemplateString. */
+  private static final String udpHeaderJsonTemplateString = udpHeaderJsonTemplateSB.toString();
+
+  /**
+   * Creates a packet info that bundles the decoded layers of one captured
+   * packet. The arguments are stored as given.
+   * 
+   * @param globalHeader
+   *          the global header of the capture
+   * @param packetHeader
+   *          the header of the packet record
+   * @param packet
+   *          the captured packet
+   * @param ipv4Packet
+   *          the ipv4 layer of the packet
+   * @param tcpPacket
+   *          the tcp layer of the packet
+   * @param udpPacket
+   *          the udp layer of the packet
+   */
+  public PacketInfo(GlobalHeader globalHeader, PacketHeader packetHeader, PcapPacket packet, Ipv4Packet ipv4Packet, TcpPacket tcpPacket,
+      UdpPacket udpPacket) {
+    // capture level
+    this.globalHeader = globalHeader;
+    this.packetHeader = packetHeader;
+    this.packet = packet;
+
+    // protocol layers
+    this.ipv4Packet = ipv4Packet;
+    this.tcpPacket = tcpPacket;
+    this.udpPacket = udpPacket;
+  }
+
+  /**
+   * Returns the global header given to the constructor.
+   * 
+   * @return the global header
+   */
+  public GlobalHeader getGlobalHeader() {
+    return this.globalHeader;
+  }
+
+  /**
+   * Returns the packet header given to the constructor.
+   * 
+   * @return the packet header
+   */
+  public PacketHeader getPacketHeader() {
+    return this.packetHeader;
+  }
+
+  /**
+   * Returns the captured packet given to the constructor.
+   * 
+   * @return the packet
+   */
+  public PcapPacket getPacket() {
+    return this.packet;
+  }
+
+  /**
+   * Returns the ipv4 packet given to the constructor.
+   * 
+   * @return the ipv4 packet
+   */
+  public Ipv4Packet getIpv4Packet() {
+    return this.ipv4Packet;
+  }
+
+  /**
+   * Returns the tcp packet given to the constructor.
+   * 
+   * @return the tcp packet
+   */
+  public TcpPacket getTcpPacket() {
+    return this.tcpPacket;
+  }
+
+  /**
+   * Returns the udp packet given to the constructor.
+   * 
+   * @return the udp packet
+   */
+  public UdpPacket getUdpPacket() {
+    return this.udpPacket;
+  }
+
+  /**
+   * Builds the session key of this packet from the ipv4 addresses, the
+   * protocol, the ports, the ip id and the fragment offset. The ports are
+   * taken from the udp or tcp packet according to the ipv4 protocol and are 0
+   * for any other protocol.
+   * 
+   * @return the key produced by {@code PcapUtils.getSessionKey}
+   */
+  public String getKey() {
+    final boolean udp = Constants.PROTOCOL_UDP == ipv4Packet.getProtocol();
+    final boolean tcp = !udp && Constants.PROTOCOL_TCP == ipv4Packet.getProtocol();
+
+    final int sourcePort;
+    final int destinationPort;
+    if (udp) {
+      sourcePort = udpPacket.getSourcePort();
+      destinationPort = udpPacket.getDestinationPort();
+    } else if (tcp) {
+      sourcePort = tcpPacket.getSourcePort();
+      destinationPort = tcpPacket.getDestinationPort();
+    } else {
+      sourcePort = 0;
+      destinationPort = 0;
+    }
+
+    return PcapUtils.getSessionKey(
+        ipv4Packet.getSourceAddress().getHostAddress(),
+        ipv4Packet.getDestinationAddress().getHostAddress(),
+        ipv4Packet.getProtocol(),
+        sourcePort,
+        destinationPort,
+        ipv4Packet.getId(),
+        ipv4Packet.getFragmentOffset());
+  }
+
+  /**
+   * Builds the short session key of this packet from the ipv4 addresses, the
+   * protocol and the ports. The ports are taken from the udp or tcp packet
+   * according to the ipv4 protocol and are 0 for any other protocol.
+   * 
+   * @return the key produced by {@code PcapUtils.getShortSessionKey}
+   */
+  public String getShortKey() {
+    final boolean udp = Constants.PROTOCOL_UDP == ipv4Packet.getProtocol();
+    final boolean tcp = !udp && Constants.PROTOCOL_TCP == ipv4Packet.getProtocol();
+
+    final int sourcePort;
+    final int destinationPort;
+    if (udp) {
+      sourcePort = udpPacket.getSourcePort();
+      destinationPort = udpPacket.getDestinationPort();
+    } else if (tcp) {
+      sourcePort = tcpPacket.getSourcePort();
+      destinationPort = tcpPacket.getDestinationPort();
+    } else {
+      sourcePort = 0;
+      destinationPort = 0;
+    }
+
+    return PcapUtils.getShortSessionKey(
+        ipv4Packet.getSourceAddress().getHostAddress(),
+        ipv4Packet.getDestinationAddress().getHostAddress(),
+        ipv4Packet.getProtocol(),
+        sourcePort,
+        destinationPort);
+  }
+  
+  /**
+   * Returns the full json document of this packet, as produced by
+   * {@code getJsonDocUsingSBAppend()}.
+   * 
+   * @return the json doc
+   */
+  public String getJsonDoc() {
+    final String jsonDoc = getJsonDocUsingSBAppend();
+    return jsonDoc;
+  }
+
+  /**
+   * Returns the json index document of this packet, as produced by
+   * {@code getJsonIndexDocUsingSBAppend()}.
+   * 
+   * @return the json index doc
+   */
+  public String getJsonIndexDoc() {
+    final String jsonIndexDoc = getJsonIndexDocUsingSBAppend();
+    return jsonIndexDoc;
+  }
+
+  /**
+   * Gets the json doc using sb append.
+   * 
+   * @return the json doc using sb append
+   */
+  private String getJsonDocUsingSBAppend() {
+
+	
+    StringBuffer jsonSb = new StringBuffer(1024);
+
+    // global header
+    jsonSb.append("{\"global_header\":{\"pcap_id\":\"").append(getKey());
+    jsonSb.append("\",\"inc_len\":").append(packetHeader.getInclLen());
+    jsonSb.append(",\"orig_len\":").append(packetHeader.getOrigLen());
+    jsonSb.append(",\"ts_sec\":").append(packetHeader.getTsSec());
+    jsonSb.append(",\"ts_usec\":").append(packetHeader.getTsUsec());
+    jsonSb.append("},"); // NOPMD by sheetal on 1/29/14 2:37 PM
+
+    // ipv4 header
+
+    jsonSb.append("\"ipv4_header\":{");
+
+    jsonSb.append("\"ip_dst\":").append(ipv4Packet.getDestination());
+    jsonSb.append(",\"ip_dst_addr\":\"").append(ipv4Packet.getDestinationAddress().getHostAddress());
+    jsonSb.append("\",\"ip_flags\":").append(ipv4Packet.getFlags());
+    jsonSb.append(",\"ip_fragment_offset\":").append(ipv4Packet.getFragmentOffset());
+    jsonSb.append(",\"ip_header_checksum\":").append(ipv4Packet.getHeaderChecksum());
+    jsonSb.append(",\"ip_id\":").append(ipv4Packet.getId());
+    jsonSb.append(",\"ip_header_length\":").append(ipv4Packet.getIhl());
+    jsonSb.append(",\"ip_protocol\":").append(ipv4Packet.getProtocol());
+    jsonSb.append(",\"ip_src\":").append(ipv4Packet.getSource());
+    jsonSb.append(",\"ip_src_addr\":\"").append(ipv4Packet.getSourceAddress().getHostAddress());
+    jsonSb.append("\",\"ip_tos\":").append(ipv4Packet.getTos());
+    jsonSb.append(",\"ip_total_length\":").append(ipv4Packet.getTotalLength());
+    jsonSb.append(",\"ip_ttl\":").append(ipv4Packet.getTtl());
+    jsonSb.append(",\"ip_version\":").append(ipv4Packet.getVersion());
+    jsonSb.append('}');
+
+    // tcp header
+    if (tcpPacket != null) {
+      jsonSb.append(",\"tcp_header\":{\"ack\":").append(tcpPacket.getAck());
+      jsonSb.append(",\"checksum\":").append(tcpPacket.getChecksum());
+      jsonSb.append(",\"data_length\":").append(tcpPacket.getDataLength());
+      jsonSb.append(",\"data_offset\":").append(tcpPacket.getDataOffset());
+      jsonSb.append(",\"dst_addr\":\"").append(tcpPacket.getDestinationAddress().getHostAddress());
+      jsonSb.append("\",\"dst_port\":").append(tcpPacket.getDestinationPort());
+      jsonSb.append(",\"direction\":").append(tcpPacket.getDirection());
+      jsonSb.append(",\"flags\":").append(tcpPacket.getFlags());
+      jsonSb.append(",\"reassembled_length \":").append(tcpPacket.getReassembledLength());
+      jsonSb.append(",\"relative_ack\":").append(tcpPacket.getRelativeAck());
+      jsonSb.append(",\"relative_seq\":").append(tcpPacket.getRelativeSeq());
+      jsonSb.append(",\"seq\":").append(tcpPacket.getSeq());
+      jsonSb.append(",\"session_key\":\"").append(tcpPacket.getSessionKey());
+      jsonSb.append("\",\"src_addr\":\"").append(tcpPacket.getSourceAddress().getHostAddress());
+      jsonSb.append("\",\"src_port\":").append(tcpPacket.getSourcePort());
+      jsonSb.append(",\"total_length\":").append(tcpPacket.getTotalLength());
+      jsonSb.append(",\"urgent_pointer\":").append(tcpPacket.getUrgentPointer());
+      jsonSb.append(",\"window\":").append(tcpPacket.getWindow());
+      jsonSb.append('}');
+    }
+
+    // udp headers
+    if (udpPacket != null) {
+      jsonSb.append(",\"udp_header\":{\"checksum\":").append(udpPacket.getChecksum());
+      jsonSb.append(",\"dst_port\":").append(udpPacket.getDestinationPort());
+      jsonSb.append(",\"length\":").append(udpPacket.getLength());
+      jsonSb.append(",\"src_port\":").append(udpPacket.getSourcePort());
+      jsonSb.append(",\"dst_addr\":\"").append(udpPacket.getDestination().getAddress().getHostAddress());
+      jsonSb.append("\",\"src_addr\":\"").append(udpPacket.getSource().getAddress().getHostAddress());
+      jsonSb.append("\"}");
+    }
+
+    jsonSb.append('}');
+
+    return jsonSb.toString();
+  }
+
+  /**
+   * Builds the json document by filling the MessageFormat templates with the
+   * header values. The templates use '<' and '>' in place of braces; they are
+   * replaced by '{' and '}' once all sections have been formatted. Either the
+   * tcp section or the udp section is emitted, never both; when neither packet
+   * is present only the closing brace of the root object is appended.
+   * 
+   * @return the json doc using message format
+   */
+  private String getJsonDocUsingMessageFormat() {
+
+    StringBuilder jsonSb = new StringBuilder(600);
+
+    jsonSb.append(formatPlain(globalHeaderJsonTemplateString, getKey(), packetHeader.getInclLen(), packetHeader.getOrigLen(),
+        packetHeader.getTsSec(), packetHeader.getTsUsec()));
+
+    jsonSb.append(formatPlain(ipv4HeaderJsonTemplateString, ipv4Packet.getDestination(), ipv4Packet.getDestinationAddress()
+        .getHostAddress(), ipv4Packet.getFlags(), ipv4Packet.getFragmentOffset(), ipv4Packet.getHeaderChecksum(), ipv4Packet.getId(),
+        ipv4Packet.getIhl(), ipv4Packet.getProtocol(), ipv4Packet.getSource(), ipv4Packet.getSourceAddress().getHostAddress(), ipv4Packet
+            .getTos(), ipv4Packet.getTotalLength(), ipv4Packet.getTtl(), ipv4Packet.getVersion()));
+
+    if (tcpPacket != null) {
+      // tcp header
+      jsonSb.append(formatPlain(tcpHeaderJsonTemplateString, tcpPacket.getAck(), tcpPacket.getChecksum(), tcpPacket
+          .getDataLength(), tcpPacket.getDataOffset(), tcpPacket.getDestinationAddress().getHostAddress(), tcpPacket.getDestinationPort(),
+          tcpPacket.getDirection(), tcpPacket.getFlags(), tcpPacket.getReassembledLength(), tcpPacket.getRelativeAck(), tcpPacket
+              .getRelativeSeq(), tcpPacket.getSeq(), tcpPacket.getSessionKey(), tcpPacket.getSourceAddress().getHostAddress(), tcpPacket
+              .getSourcePort(), tcpPacket.getTotalLength(), tcpPacket.getUrgentPointer(), tcpPacket.getWindow()));
+    } else if (udpPacket != null) {
+      // udp headers
+      jsonSb.append(formatPlain(udpHeaderJsonTemplateString, udpPacket.getChecksum(), udpPacket.getDestinationPort(),
+          udpPacket.getLength(), udpPacket.getSourcePort(), udpPacket.getDestination().getAddress().getHostAddress(), udpPacket.getSource()
+              .getAddress().getHostAddress()));
+    } else {
+      jsonSb.append('}');
+    }
+    return jsonSb.toString().replace('<', '{').replace('>', '}');
+  }
+
+  /**
+   * Fills a MessageFormat template after converting every value with
+   * {@link String#valueOf(Object)}. MessageFormat would otherwise render
+   * numeric arguments with the default locale's number format (for example
+   * with grouping separators), which is not a valid json number.
+   * 
+   * @param template
+   *          the MessageFormat pattern
+   * @param values
+   *          the values for the pattern's arguments, in argument order
+   * @return the formatted text
+   */
+  private static String formatPlain(String template, Object... values) {
+    Object[] plainValues = new Object[values.length];
+    for (int i = 0; i < values.length; i++) {
+      plainValues[i] = String.valueOf(values[i]);
+    }
+    return MessageFormat.format(template, plainValues);
+  }
+
+  /**
+   * Gets the json index doc using sb append.
+   * 
+   * @return the json index doc using sb append
+   */
+  private String getJsonIndexDocUsingSBAppend() {
+
+	Long ts_micro = getPacketTimeInNanos() / 1000L;
+	StringBuffer jsonSb = new StringBuffer(175);
+
+	jsonSb.append("{\"pcap_id\":\"").append(getShortKey());
+    jsonSb.append("\",\"ip_protocol\":").append(ipv4Packet.getProtocol());
+    jsonSb.append(",\"ip_id\":").append(ipv4Packet.getId());
+    jsonSb.append(",\"frag_offset\":").append(ipv4Packet.getFragmentOffset());
+    jsonSb.append(",\"ts_micro\":").append(ts_micro);
+
+
+    // tcp header
+    if (tcpPacket != null) {
+      jsonSb.append(",\"ip_src_addr\":\"").append(tcpPacket.getSourceAddress().getHostAddress());
+      jsonSb.append("\",\"ip_src_port\":").append(tcpPacket.getSourcePort());
+      jsonSb.append(",\"ip_dst_addr\":\"").append(tcpPacket.getDestinationAddress().getHostAddress());
+      jsonSb.append("\",\"ip_dst_port\":").append(tcpPacket.getDestinationPort());
+    }
+
+    // udp headers
+    if (udpPacket != null) {
+      jsonSb.append(",\"ip_src_addr\":\"").append(udpPacket.getSource().getAddress().getHostAddress());
+      jsonSb.append("\",\"ip_src_port\":").append(udpPacket.getSourcePort());
+      jsonSb.append(",\"ip_dst_addr\":\"").append(udpPacket.getDestination().getAddress().getHostAddress());
+      jsonSb.append("\",\"ip_dst_port\":").append(udpPacket.getDestinationPort());
+    }
+
+    jsonSb.append('}');
+
+    return jsonSb.toString();
+  }
+  
+  /**
+   * Computes the packet time in nanoseconds from the packet header. The magic
+   * number of the global header decides whether the sub-second field is read
+   * as microseconds or as nanoseconds; an unknown magic number is logged as a
+   * warning and treated like microseconds.
+   * 
+   * @return the packet time in nanoseconds
+   */
+  public long getPacketTimeInNanos()
+  {
+    final int magic = getGlobalHeader().getMagicNumber();
+
+    if ( magic == 0xa1b23c4d || magic == 0x4d3cb2a1 )
+    {
+      //Time is in nano assemble as nano
+      LOG.info("Times are in nano according to the magic number");
+      return getPacketHeader().getTsSec() * 1000000000L + getPacketHeader().getTsUsec() ;
+    }
+
+    if ( magic == 0xa1b2c3d4 || magic == 0xd4c3b2a1 )
+    {
+      //Time is in micro assemble as nano
+      LOG.info("Times are in micro according to the magic number");
+    }
+    else
+    {
+      //Default assume time is in micro assemble as nano
+      LOG.warn("Unknown magic number. Defaulting to micro");
+    }
+    return getPacketHeader().getTsSec() * 1000000000L + getPacketHeader().getTsUsec() * 1000L ;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/PcapByteInputStream.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/PcapByteInputStream.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/PcapByteInputStream.java
new file mode 100644
index 0000000..44f3597
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/PcapByteInputStream.java
@@ -0,0 +1,168 @@
+package com.opensoc.pcap;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.krakenapps.pcap.PcapInputStream;
+import org.krakenapps.pcap.file.GlobalHeader;
+import org.krakenapps.pcap.packet.PacketHeader;
+import org.krakenapps.pcap.packet.PcapPacket;
+import org.krakenapps.pcap.util.Buffer;
+import org.krakenapps.pcap.util.ByteOrderConverter;
+import org.krakenapps.pcap.util.ChainBuffer;
+
+/**
+ * The Class PcapByteInputStream.
+ * 
+ * @author sheetal
+ * @version $Revision: 1.0 $
+ */
+public class PcapByteInputStream implements PcapInputStream {
+
+  /**
+   * Magic number of a microsecond-resolution capture whose byte order is the
+   * reverse of the big-endian order used by {@link DataInputStream}.
+   */
+  private static final int MAGIC_MICROS_SWAPPED = 0xD4C3B2A1;
+
+  /**
+   * Magic number of a nanosecond-resolution capture whose byte order is the
+   * reverse of the big-endian order used by {@link DataInputStream}.
+   */
+  private static final int MAGIC_NANOS_SWAPPED = 0x4D3CB2A1;
+
+  /** The stream over the pcap bytes. */
+  private final DataInputStream is;
+
+  /** The global header read when the stream is opened. */
+  private GlobalHeader globalHeader;
+
+  /**
+   * Opens a pcap input stream over a byte array and reads the global header.
+   * 
+   * @param pcap
+   *          the byte array to be read
+   * 
+   * @throws IOException
+   *           if the global header cannot be read completely
+   */
+  public PcapByteInputStream(byte[] pcap) throws IOException {
+    is = new DataInputStream(new ByteArrayInputStream(pcap)); // $codepro.audit.disable closeWhereCreated
+    readGlobalHeader();
+  }
+
+  /**
+   * Reads the next packet (record header followed by packet data) from the
+   * pcap byte array.
+   * 
+   * @return the packet
+   * @throws IOException
+   *           if the packet cannot be read completely; this includes the
+   *           {@link java.io.EOFException} raised when the data is exhausted
+   * @see org.krakenapps.pcap.PcapInputStream#getPacket()
+   */
+  @Override
+  public PcapPacket getPacket() throws IOException {
+    return readPacket(globalHeader.getMagicNumber());
+  }
+
+  /**
+   * Gets the global header.
+   * 
+   * @return the global header
+   */
+  public GlobalHeader getGlobalHeader() {
+    return globalHeader;
+  }
+
+  /**
+   * Reads the global header and swaps its byte order when the magic number
+   * shows that the capture was written in the opposite byte order.
+   * 
+   * @throws IOException
+   *           if the header cannot be read completely
+   */
+  private void readGlobalHeader() throws IOException {
+    int magic = is.readInt();
+    short major = is.readShort();
+    short minor = is.readShort();
+    int tz = is.readInt();
+    int sigfigs = is.readInt();
+    int snaplen = is.readInt();
+    int network = is.readInt();
+
+    globalHeader = new GlobalHeader(magic, major, minor, tz, sigfigs, snaplen,
+        network);
+
+    if (isByteSwapped(globalHeader.getMagicNumber())) {
+      globalHeader.swapByteOrder();
+    }
+  }
+
+  /**
+   * Tells whether a magic number, as read by the big-endian
+   * {@link DataInputStream}, belongs to a capture written in the opposite
+   * byte order. Both the microsecond and the nanosecond variant are
+   * recognised.
+   * 
+   * @param magicNumber
+   *          the magic number as read from the stream
+   * @return true if multi-byte header fields must be byte-swapped
+   */
+  private static boolean isByteSwapped(int magicNumber) {
+    return magicNumber == MAGIC_MICROS_SWAPPED || magicNumber == MAGIC_NANOS_SWAPPED;
+  }
+
+  /**
+   * Reads one packet: its record header, then as many data bytes as the
+   * header's included length states.
+   * 
+   * @param magicNumber
+   *          the magic number of the capture
+   * @return the pcap packet
+   * @throws IOException
+   *           if the packet cannot be read completely
+   */
+  private PcapPacket readPacket(int magicNumber) throws IOException {
+    PacketHeader packetHeader = readPacketHeader(magicNumber);
+    Buffer packetData = readPacketData(packetHeader.getInclLen());
+    return new PcapPacket(packetHeader, packetData);
+  }
+
+  /**
+   * Reads the four 32-bit fields of a packet record header, byte-swapping
+   * them when the magic number calls for it.
+   * 
+   * @param magicNumber
+   *          the magic number of the capture
+   * @return the packet header
+   * @throws IOException
+   *           if the header cannot be read completely
+   */
+  private PacketHeader readPacketHeader(int magicNumber) throws IOException {
+    int tsSec = is.readInt();
+    int tsUsec = is.readInt();
+    int inclLen = is.readInt();
+    int origLen = is.readInt();
+
+    if (isByteSwapped(magicNumber)) {
+      tsSec = ByteOrderConverter.swap(tsSec);
+      tsUsec = ByteOrderConverter.swap(tsUsec);
+      inclLen = ByteOrderConverter.swap(inclLen);
+      origLen = ByteOrderConverter.swap(origLen);
+    }
+
+    return new PacketHeader(tsSec, tsUsec, inclLen, origLen);
+  }
+
+  /**
+   * Reads exactly {@code packetLength} bytes of packet data.
+   * 
+   * @param packetLength
+   *          the number of bytes to read
+   * @return a buffer holding the bytes read
+   * @throws IOException
+   *           if the length is negative, or if fewer than
+   *           {@code packetLength} bytes remain (reported as
+   *           {@link java.io.EOFException})
+   */
+  private Buffer readPacketData(int packetLength) throws IOException {
+    if (packetLength < 0) {
+      throw new IOException("Invalid packet length: " + packetLength);
+    }
+
+    byte[] packets = new byte[packetLength];
+    // readFully fails on truncated data instead of silently returning a
+    // packet whose tail was never read and is still zero-filled
+    is.readFully(packets);
+
+    Buffer payload = new ChainBuffer();
+    payload.addLast(packets);
+    return payload;
+  }
+
+  /**
+   * Closes the pcap stream.
+   * 
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   * @see org.krakenapps.pcap.PcapInputStream#close()
+   */
+  @Override
+  public void close() throws IOException {
+    is.close(); // $codepro.audit.disable closeInFinally
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/PcapByteOutputStream.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/PcapByteOutputStream.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/PcapByteOutputStream.java
new file mode 100644
index 0000000..8a5ad18
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/PcapByteOutputStream.java
@@ -0,0 +1,288 @@
+// $codepro.audit.disable explicitThisUsage, lossOfPrecisionInCast
+package com.opensoc.pcap;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.krakenapps.pcap.PcapOutputStream;
+import org.krakenapps.pcap.file.GlobalHeader;
+import org.krakenapps.pcap.packet.PacketHeader;
+import org.krakenapps.pcap.packet.PcapPacket;
+import org.krakenapps.pcap.util.Buffer;
+
+// TODO: Auto-generated Javadoc
+/**
+ * The Class PcapByteOutputStream.
+ * 
+ * @author sheetal
+ * @version $Revision: 1.0 $
+ */
+public class PcapByteOutputStream implements PcapOutputStream {
+
+  /** The Constant LOG. */
+  private static final Logger LOG = Logger
+      .getLogger(PcapByteOutputStream.class);
+
+  /** The Constant MAX_CACHED_PACKET_NUMBER. */
+  private static final int MAX_CACHED_PACKET_NUMBER = 1000;
+
+  /** The cached packet num. */
+  private int cachedPacketNum = 0; // NOPMD by sheetal on 1/29/14 2:34 PM
+
+  /** The baos. */
+  private ByteArrayOutputStream baos; // NOPMD by sheetal on 1/29/14 2:34 PM
+
+  /** The list. */
+  private List<Byte> list; // NOPMD by sheetal on 1/29/14 2:34 PM
+
+  /**
+   * Instantiates a new pcap byte output stream.
+   * 
+   * @param baos
+   *          the baos
+   */
+  public PcapByteOutputStream(ByteArrayOutputStream baos) {
+    this.baos = baos;
+    list = new ArrayList<Byte>();
+    createGlobalHeader();
+  }
+
+  /**
+   * Instantiates a new pcap byte output stream.
+   * 
+   * @param baos
+   *          the baos
+   * @param header
+   *          the header
+   */
+  public PcapByteOutputStream(ByteArrayOutputStream baos, GlobalHeader header) {
+    this.baos = baos;
+    list = new ArrayList<Byte>();
+    copyGlobalHeader(header);
+  }
+
+  /**
+   * Creates the global header.
+   */
+  private void createGlobalHeader() {
+    /* magic number(swapped) */
+    list.add((byte) 0xd4);
+    list.add((byte) 0xc3);
+    list.add((byte) 0xb2);
+    list.add((byte) 0xa1);
+
+    /* major version number */
+    list.add((byte) 0x02);
+    list.add((byte) 0x00);
+
+    /* minor version number */
+    list.add((byte) 0x04);
+    list.add((byte) 0x00);
+
+    /* GMT to local correction */
+    list.add((byte) 0x00);
+    list.add((byte) 0x00);
+    list.add((byte) 0x00);
+    list.add((byte) 0x00);
+
+    /* accuracy of timestamps */
+    list.add((byte) 0x00);
+    list.add((byte) 0x00);
+    list.add((byte) 0x00);
+    list.add((byte) 0x00);
+
+    /* max length of captured packets, in octets */
+    list.add((byte) 0xff);
+    list.add((byte) 0xff);
+    list.add((byte) 0x00);
+    list.add((byte) 0x00);
+
+    /* data link type(ethernet) */
+    list.add((byte) 0x01);
+    list.add((byte) 0x00);
+    list.add((byte) 0x00);
+    list.add((byte) 0x00);
+  }
+
+  /**
+   * Copy global header.
+   * 
+   * @param header
+   *          the header
+   */
+  private void copyGlobalHeader(GlobalHeader header) {
+    final byte[] magicNumber = intToByteArray(header.getMagicNumber());
+    final byte[] majorVersion = shortToByteArray(header.getMajorVersion());
+    final byte[] minorVersion = shortToByteArray(header.getMinorVersion());
+    final byte[] zone = intToByteArray(header.getThiszone());
+    final byte[] sigFigs = intToByteArray(header.getSigfigs());
+    final byte[] snapLen = intToByteArray(header.getSnaplen());
+    final byte[] network = intToByteArray(header.getNetwork());
+
+    list.add(magicNumber[0]);
+    list.add(magicNumber[1]);
+    list.add(magicNumber[2]);
+    list.add(magicNumber[3]);
+
+    list.add(majorVersion[1]);
+    list.add(majorVersion[0]);
+
+    list.add(minorVersion[1]);
+    list.add(minorVersion[0]);
+
+    list.add(zone[3]);
+    list.add(zone[2]);
+    list.add(zone[1]);
+    list.add(zone[0]);
+
+    list.add(sigFigs[3]);
+    list.add(sigFigs[2]);
+    list.add(sigFigs[1]);
+    list.add(sigFigs[0]);
+
+    list.add(snapLen[3]);
+    list.add(snapLen[2]);
+    list.add(snapLen[1]);
+    list.add(snapLen[0]);
+
+    list.add(network[3]);
+    list.add(network[2]);
+    list.add(network[1]);
+    list.add(network[0]);
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.krakenapps.pcap.PcapOutputStream#write(org.krakenapps.pcap.packet
+   * .PcapPacket)
+   */
+  /**
+   * Method write.
+   * 
+   * @param packet
+   *          PcapPacket
+   * 
+   * 
+   * @throws IOException
+   *           * @see org.krakenapps.pcap.PcapOutputStream#write(PcapPacket) * @see
+   *           org.krakenapps.pcap.PcapOutputStream#write(PcapPacket)
+   */
+ 
+  public void write(PcapPacket packet) throws IOException {
+    PacketHeader packetHeader = packet.getPacketHeader();
+
+    int tsSec = packetHeader.getTsSec();
+    int tsUsec = packetHeader.getTsUsec();
+    int inclLen = packetHeader.getInclLen();
+    int origLen = packetHeader.getOrigLen();
+
+    addInt(tsSec);
+    addInt(tsUsec);
+    addInt(inclLen);
+    addInt(origLen);
+
+    Buffer payload = packet.getPacketData();
+
+    try {
+      payload.mark();
+      while (true) {
+        list.add(payload.get());
+      }
+    } catch (BufferUnderflowException e) {
+      //LOG.debug("Ignorable exception while writing packet", e);
+      payload.reset();
+    }
+
+    cachedPacketNum++;
+    if (cachedPacketNum == MAX_CACHED_PACKET_NUMBER) {
+      flush();
+    }
+  }
+
+  /**
+   * Adds the int.
+   * 
+   * @param number
+   *          the number
+   */
+  private void addInt(int number) {
+    list.add((byte) (number & 0xff));
+    list.add((byte) ((number & 0xff00) >> 8));
+    list.add((byte) ((number & 0xff0000) >> 16));
+    list.add((byte) ((number & 0xff000000) >> 24));
+  }
+
+  /**
+   * Int to byte array.
+   * 
+   * @param number
+   *          the number
+   * 
+   * @return the byte[]
+   */
+  private byte[] intToByteArray(int number) {
+    return new byte[] { (byte) (number >>> 24), (byte) (number >>> 16),
+        (byte) (number >>> 8), (byte) number };
+  }
+
+  /**
+   * Short to byte array.
+   * 
+   * @param number
+   *          the number
+   * 
+   * @return the byte[]
+   */
+  private byte[] shortToByteArray(short number) {
+    return new byte[] { (byte) (number >>> 8), (byte) number };
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.krakenapps.pcap.PcapOutputStream#flush()
+   */
+  /**
+   * Method flush.
+   * 
+   * 
+   * @throws IOException
+   *           * @see org.krakenapps.pcap.PcapOutputStream#flush() * @see
+   *           org.krakenapps.pcap.PcapOutputStream#flush()
+   */
+ 
+  public void flush() throws IOException {
+    byte[] fileBinary = new byte[list.size()];
+    for (int i = 0; i < fileBinary.length; i++) {
+      fileBinary[i] = list.get(i);
+    }
+
+    list.clear();
+    baos.write(fileBinary);
+    cachedPacketNum = 0;
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.krakenapps.pcap.PcapOutputStream#close()
+   */
+  /**
+   * Method close.
+   * 
+   * 
+   * @throws IOException
+   *           * @see org.krakenapps.pcap.PcapOutputStream#close() * @see
+   *           org.krakenapps.pcap.PcapOutputStream#close()
+   */
+ 
+  public void close() throws IOException {
+    flush();
+    baos.close(); // $codepro.audit.disable closeInFinally
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/PcapMerger.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/PcapMerger.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/PcapMerger.java
new file mode 100644
index 0000000..392523b
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/PcapMerger.java
@@ -0,0 +1,245 @@
+ package com.opensoc.pcap;
+
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+
+import org.krakenapps.pcap.packet.PcapPacket;
+import org.krakenapps.pcap.file.GlobalHeader;
+
+/**
+ * Utility that merges several in-memory pcap dumps into a single pcap dump
+ * whose packets are ordered with {@link PcapPacketComparator}.
+ * 
+ * @author sheetal
+ * @version $Revision: 1.0 $
+ */
+public final class PcapMerger {
+
+  /** The Constant LOG. */
+  private static final Logger LOG = Logger.getLogger(PcapMerger.class);
+
+  /** The comparator for PcapPackets */
+  private static final PcapPacketComparator PCAP_PACKET_COMPARATOR = new PcapPacketComparator();
+
+  /**
+   * Instantiates a new pcap merger.
+   */
+  private PcapMerger() { // $codepro.audit.disable emptyMethod
+  }
+
+  /**
+   * Merges the given pcap byte arrays and writes the sorted result to
+   * {@code baos}. The global header of the first pcap is used for the output.
+   * The input arrays are not modified.
+   * 
+   * @param baos
+   *          the stream that receives the merged, sorted pcap
+   * @param pcaps
+   *          the pcap dumps to merge
+   * 
+   * @throws IOException
+   *           if an input cannot be read or the result cannot be written. When
+   *           reading an input fails, nothing is written to {@code baos}.
+   */
+  public static void merge(ByteArrayOutputStream baos, List<byte[]> pcaps)
+      throws IOException {
+    ByteArrayOutputStream unsortedBaos = new ByteArrayOutputStream();
+    PcapByteOutputStream os = null;
+
+    try {
+      for (byte[] pcap : pcaps) {
+        PcapByteInputStream is = new PcapByteInputStream(pcap);
+        try {
+          if (os == null) {
+            // Only the first pcap contributes its global header.
+            os = new PcapByteOutputStream(unsortedBaos, is.getGlobalHeader());
+          }
+          writePacket(is, os);
+        } finally {
+          // Close each input even when copying its packets fails.
+          closeInput(is);
+        }
+      }
+    } finally {
+      // Closing the output flushes its staged bytes into unsortedBaos.
+      closeOutput(os);
+    }
+
+    // Sort only after a successful merge; running it from a finally block
+    // could replace the original failure with a secondary one.
+    sort(baos, unsortedBaos.toByteArray());
+  }
+
+  /**
+   * Merges the given pcap byte arrays and writes the sorted result to
+   * {@code baos}. The input arrays are not modified.
+   * 
+   * @param baos
+   *          the stream that receives the merged, sorted pcap
+   * @param pcaps
+   *          the pcap dumps to merge
+   * 
+   * @throws IOException
+   *           if an input cannot be read or the result cannot be written
+   */
+  public static void merge(ByteArrayOutputStream baos, byte[]... pcaps) // $codepro.audit.disable
+                                                                        // overloadedMethods
+      throws IOException {
+    merge(baos, Arrays.asList(pcaps));
+  }
+
+  /**
+   * Reads every packet from the given pcap bytes, sorts them with
+   * {@link PcapPacketComparator} and writes them, preceded by the same global
+   * header, to {@code baos}.
+   * 
+   * @param baos
+   *          the stream that receives the pcap with packets in chronological
+   *          order
+   * @param unsortedBytes
+   *          a byte array of a pcap file
+   * 
+   * @throws IOException
+   *           if the pcap bytes cannot be read or the result cannot be
+   *           written
+   */
+  private static void sort(ByteArrayOutputStream baos, byte[] unsortedBytes) throws IOException {
+    PcapByteInputStream pcapIs = new PcapByteInputStream(unsortedBytes);
+    try {
+      PcapByteOutputStream pcapOs = new PcapByteOutputStream(baos, pcapIs.getGlobalHeader());
+      List<PcapPacket> packetList = new ArrayList<PcapPacket>();
+
+      try {
+        while (true) {
+          PcapPacket packet = pcapIs.getPacket();
+          if (packet == null) {
+            break;
+          }
+          packetList.add(packet);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Presort packet: " + packet.getPacketHeader().toString());
+          }
+        }
+      } catch (EOFException ignored) {
+        // Expected: end of the pcap data reached.
+      }
+
+      Collections.sort(packetList, PCAP_PACKET_COMPARATOR);
+      for (PcapPacket p : packetList) {
+        pcapOs.write(p);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Postsort packet: " + p.getPacketHeader().toString());
+        }
+      }
+      pcapOs.close();
+    } finally {
+      closeInput(pcapIs);
+    }
+  }
+
+  /**
+   * Copies every remaining packet from {@code is} to {@code os}.
+   * 
+   * @param is
+   *          the pcap input to read packets from
+   * @param os
+   *          the pcap output to write packets to
+   * 
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  private static void writePacket(PcapByteInputStream is,
+      PcapByteOutputStream os) throws IOException {
+    try {
+      while (true) {
+        PcapPacket packet = is.getPacket();
+        if (packet == null) {
+          break;
+        }
+        os.write(packet);
+      }
+    } catch (EOFException ignored) {
+      // Expected: end of the pcap data reached.
+    }
+  }
+
+  /**
+   * Closes the input, logging instead of propagating a failure.
+   * 
+   * @param is
+   *          the input to close; may be {@code null}
+   */
+  private static void closeInput(PcapByteInputStream is) {
+    if (is == null) {
+      return;
+    }
+    try {
+      is.close(); // $codepro.audit.disable closeInFinally
+    } catch (IOException e) {
+      LOG.error("Failed to close input stream", e);
+    }
+  }
+
+  /**
+   * Closes the output, logging instead of propagating a failure.
+   * 
+   * @param os
+   *          the output to close; may be {@code null}
+   */
+  private static void closeOutput(PcapByteOutputStream os) {
+    if (os == null) {
+      return;
+    }
+    try {
+      os.close();
+    } catch (IOException e) {
+      LOG.error("Failed to close output stream", e);
+    }
+  }
+
+  /**
+   * Developer harness: merges three pcap files from hard-coded local paths
+   * and writes the merged result to a fourth hard-coded path.
+   * 
+   * @param args
+   *          the arguments (unused)
+   * 
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  public static void main(String[] args) throws IOException {
+    byte[] b1 = FileUtils.readFileToByteArray(new File(
+        "/Users/sheetal/Downloads/constructedTcpDump.1.pcap"));
+    byte[] b2 = FileUtils.readFileToByteArray(new File(
+        "/Users/sheetal/Downloads/constructedTcpDump.2.pcap"));
+    byte[] b3 = FileUtils.readFileToByteArray(new File(
+        "/Users/sheetal/Downloads/constructedTcpDump.3.pcap"));
+
+    ByteArrayOutputStream boas = new ByteArrayOutputStream(); // $codepro.audit.disable
+                                                              // closeWhereCreated
+    PcapMerger.merge(boas, b1, b2, b3);
+
+    FileUtils.writeByteArrayToFile(new File(
+        "/Users/sheetal/Downloads/constructedTcpDump.automerged.1.2.pcap"),
+        boas.toByteArray(), false);
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/PcapPacketComparator.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/PcapPacketComparator.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/PcapPacketComparator.java
new file mode 100644
index 0000000..29a2414
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/PcapPacketComparator.java
@@ -0,0 +1,22 @@
+package com.opensoc.pcap;
+
+import java.util.Comparator;
+
+import org.apache.log4j.Logger;
+
+import org.krakenapps.pcap.packet.PcapPacket;
+
+public class PcapPacketComparator implements Comparator<PcapPacket> {
+
+	/** The Constant LOG. */
+	private static final Logger LOG = Logger.getLogger(PcapMerger.class);
+	
+	public int compare(PcapPacket p1, PcapPacket p2) {
+
+		Long p1time = new Long(p1.getPacketHeader().getTsSec()) * 1000000L + new Long(p1.getPacketHeader().getTsUsec());
+		Long p2time = new Long(p2.getPacketHeader().getTsSec()) * 1000000L + new Long(p2.getPacketHeader().getTsUsec());
+		Long delta = p1time - p2time;
+		LOG.debug("p1time: " + p1time.toString() + " p2time: " + p2time.toString() + " delta: " + delta.toString());
+		return delta.intValue();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0648a447/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/PcapParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/PcapParser.java b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/PcapParser.java
new file mode 100644
index 0000000..abc0873
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/com/apache/metron/pcap/PcapParser.java
@@ -0,0 +1,183 @@
+package com.opensoc.pcap;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+import org.krakenapps.pcap.decoder.ethernet.EthernetDecoder;
+import org.krakenapps.pcap.decoder.ethernet.EthernetType;
+import org.krakenapps.pcap.decoder.ip.IpDecoder;
+import org.krakenapps.pcap.decoder.ip.Ipv4Packet;
+import org.krakenapps.pcap.decoder.tcp.TcpPacket;
+import org.krakenapps.pcap.decoder.udp.UdpPacket;
+import org.krakenapps.pcap.file.GlobalHeader;
+import org.krakenapps.pcap.packet.PacketHeader;
+import org.krakenapps.pcap.packet.PcapPacket;
+import org.krakenapps.pcap.util.Buffer;
+
+// TODO: Auto-generated Javadoc
+/**
+ * The Class PcapParser.
+ * 
+ * @author sheetal
+ * @version $Revision: 1.0 $
+ */
+public final class PcapParser {
+
+  /** The Constant LOG. */
+  private static final Logger LOG = Logger.getLogger(PcapParser.class);
+
+  /** The ETHERNET_DECODER. */
+  private static final EthernetDecoder ETHERNET_DECODER = new EthernetDecoder();
+
+  /** The ip decoder. */
+  private static final IpDecoder IP_DECODER = new IpDecoder();
+
+  // /** The tcp decoder. */
+  // private static final TcpDecoder TCP_DECODER = new TcpDecoder(new
+  // TcpPortProtocolMapper());
+  //
+  // /** The udp decoder. */
+  // private static final UdpDecoder UDP_DECODER = new UdpDecoder(new
+  // UdpPortProtocolMapper());
+
+  static {
+    // IP_DECODER.register(InternetProtocol.TCP, TCP_DECODER);
+    // IP_DECODER.register(InternetProtocol.UDP, UDP_DECODER);
+    ETHERNET_DECODER.register(EthernetType.IPV4, IP_DECODER);
+  }
+
+  /**
+   * Instantiates a new pcap parser.
+   */
+  private PcapParser() { // $codepro.audit.disable emptyMethod
+
+  }
+
+  /**
+   * Parses the.
+   * 
+   * @param tcpdump
+   *          the tcpdump
+   * @return the list * @throws IOException Signals that an I/O exception has
+   *         occurred. * @throws IOException * @throws IOException * @throws
+   *         IOException
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  public static List<PacketInfo> parse(byte[] tcpdump) throws IOException {
+    List<PacketInfo> packetInfoList = new ArrayList<PacketInfo>();
+
+    PcapByteInputStream pcapByteInputStream = new PcapByteInputStream(tcpdump);
+
+    GlobalHeader globalHeader = pcapByteInputStream.getGlobalHeader();
+    while (true) {
+      try
+
+      {
+        PcapPacket packet = pcapByteInputStream.getPacket();
+        // int packetCounter = 0;
+        // PacketHeader packetHeader = null;
+        // Ipv4Packet ipv4Packet = null;
+        TcpPacket tcpPacket = null;
+        UdpPacket udpPacket = null;
+        // Buffer packetDataBuffer = null;
+        int sourcePort = 0;
+        int destinationPort = 0;
+
+        // LOG.trace("Got packet # " + ++packetCounter);
+
+        // LOG.trace(packet.getPacketData());
+        ETHERNET_DECODER.decode(packet);
+
+        PacketHeader packetHeader = packet.getPacketHeader();
+        Ipv4Packet ipv4Packet = Ipv4Packet.parse(packet.getPacketData());
+
+        if (ipv4Packet.getProtocol() == Constants.PROTOCOL_TCP) {
+          tcpPacket = TcpPacket.parse(ipv4Packet);
+
+        }
+
+        if (ipv4Packet.getProtocol() == Constants.PROTOCOL_UDP) {
+
+          Buffer packetDataBuffer = ipv4Packet.getData();
+          sourcePort = packetDataBuffer.getUnsignedShort();
+          destinationPort = packetDataBuffer.getUnsignedShort();
+
+          udpPacket = new UdpPacket(ipv4Packet, sourcePort, destinationPort);
+
+          udpPacket.setLength(packetDataBuffer.getUnsignedShort());
+          udpPacket.setChecksum(packetDataBuffer.getUnsignedShort());
+          packetDataBuffer.discardReadBytes();
+          udpPacket.setData(packetDataBuffer);
+        }
+
+        packetInfoList.add(new PacketInfo(globalHeader, packetHeader, packet,
+            ipv4Packet, tcpPacket, udpPacket));
+      } catch (NegativeArraySizeException ignored) {
+        LOG.debug("Ignorable exception while parsing packet.", ignored);
+      } catch (EOFException eof) { // $codepro.audit.disable logExceptions
+        // Ignore exception and break
+        break;
+      }
+    }
+    return packetInfoList;
+  }
+
+  /**
+   * The main method.
+   * 
+   * @param args
+   *          the arguments
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   * @throws InterruptedException
+   *           the interrupted exception
+   */
+  public static void main(String[] args) throws IOException,
+      InterruptedException {
+
+    double totalIterations = 1000000;
+    double parallelism = 64;
+    double targetEvents = 1000000;
+
+    File fin = new File("/Users/sheetal/Downloads/udp.pcap");
+    File fout = new File(fin.getAbsolutePath() + ".parsed");
+    byte[] pcapBytes = FileUtils.readFileToByteArray(fin);
+    long startTime = System.currentTimeMillis();
+    for (int i = 0; i < totalIterations; i++) {
+      List<PacketInfo> list = parse(pcapBytes);
+
+      for (PacketInfo packetInfo : list) {
+        // FileUtils.writeStringToFile(fout, packetInfo.getJsonDoc(), true);
+        // FileUtils.writeStringToFile(fout, "\n", true);
+        // System.out.println(packetInfo.getJsonDoc());
+      }
+    }
+    long endTime = System.currentTimeMillis();
+
+    System.out.println("Time taken to process " + totalIterations + " events :"
+        + (endTime - startTime) + " milliseconds");
+
+    System.out
+        .println("With parallelism of "
+            + parallelism
+            + " estimated time to process "
+            + targetEvents
+            + " events: "
+            + (((((endTime - startTime) / totalIterations) * targetEvents) / parallelism) / 1000)
+            + " seconds");
+    System.out.println("With parallelism of " + parallelism
+        + " estimated # of events per second: "
+        + ((parallelism * 1000 * totalIterations) / (endTime - startTime))
+        + " events");
+    System.out.println("Expected Parallelism to process " + targetEvents
+        + " events in a second: "
+        + (targetEvents / ((1000 * totalIterations) / (endTime - startTime))));
+  }
+
+}


Mime
View raw message