metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sirs...@apache.org
Subject incubator-metron git commit: Merge branch 'bro-parser' of github.com:sirsean/bro-parser
Date Fri, 22 Jan 2016 18:06:31 GMT
Repository: incubator-metron
Updated Branches:
  refs/heads/master bfa640493 -> 36fabec88


Merge branch 'bro-parser' of github.com:sirsean/bro-parser

This fixes apache/incubator-metron#11

Squashed commit of the following:

commit b3f8096365e5f82f4e14e81a1e906cf7dd696072
Author: Sean Schulte <sirsean@gmail.com>
Date:   Thu Jan 21 15:11:11 2016 -0600

    Minor tweaks after code review.

commit 41e159f61d5e68066024493e50079f3d1ba1302b
Author: Sean Schulte <sirsean@gmail.com>
Date:   Fri Jan 15 16:32:42 2016 -0600

    Bro Parser Improvements

    The old Bro parser required that the data came wrapped in a key that
    specified the protocol. But not every Bro deployment produces data in
    that format; the parser now supports both that wrapped format and a
    flatter format in which the protocol is just another attribute of the
    message.

    This also contains some minor code cleanup in the Bro parser.


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/36fabec8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/36fabec8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/36fabec8

Branch: refs/heads/master
Commit: 36fabec887c91a5b3c3065fc0a227113daad4813
Parents: bfa6404
Author: Sean Schulte <sirsean@gmail.com>
Authored: Fri Jan 22 12:05:18 2016 -0600
Committer: Sean Schulte <sirsean@gmail.com>
Committed: Fri Jan 22 12:05:18 2016 -0600

----------------------------------------------------------------------
 .../metron/parsing/parsers/BasicBroParser.java  | 238 +++++++++----------
 .../metron/parsing/test/BasicBroParserTest.java |  21 ++
 2 files changed, 136 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/36fabec8/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicBroParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicBroParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicBroParser.java
index cf6070a..870991c 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicBroParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicBroParser.java
@@ -22,130 +22,122 @@ import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.metron.tldextractor.BasicTldExtractor;
-
 @SuppressWarnings("serial")
 public class BasicBroParser extends AbstractParser {
 
-	protected static final Logger _LOG = LoggerFactory
-			.getLogger(BasicBroParser.class);
-	private JSONCleaner cleaner = new JSONCleaner();
-	private BasicTldExtractor tldex = new BasicTldExtractor();
-
-	@SuppressWarnings("unchecked")
-	public JSONObject parse(byte[] msg) {
-
-		_LOG.trace("[Metron] Starting to parse incoming message");
-
-		String raw_message = null;
-
-		try {
-
-			raw_message = new String(msg, "UTF-8");
-			_LOG.trace("[Metron] Received message: " + raw_message);
-
-			JSONObject cleaned_message = cleaner.Clean(raw_message);
-			_LOG.debug("[Metron] Cleaned message: " + raw_message);
-
-			if (cleaned_message == null || cleaned_message.isEmpty())
-				throw new Exception("Unable to clean message: " + raw_message);
-
-			String key = cleaned_message.keySet().iterator().next().toString();
-
-			if (key == null)
-				throw new Exception("Unable to retrieve key for message: "
-						+ raw_message);
-
-			JSONObject payload = (JSONObject) cleaned_message.get(key);
-
-			String originalString = " |";
-			for (Object k : payload.keySet()) {
-				originalString = originalString + " " + k.toString() + ":"
-						+ payload.get(k).toString();
-			}
-			originalString = key.toUpperCase() + originalString;
-			payload.put("original_string", originalString);
-
-			if (payload == null)
-				throw new Exception("Unable to retrieve payload for message: "
-						+ raw_message);
-
-			if (payload.containsKey("ts")) {
-				String ts = payload.remove("ts").toString();
-				payload.put("timestamp", ts);
-				_LOG.trace("[Metron] Added ts to: " + payload);
-			}
-
-			if (payload.containsKey("id.orig_h")) {
-				String source_ip = payload.remove("id.orig_h").toString();
-				payload.put("ip_src_addr", source_ip);
-				_LOG.trace("[Metron] Added ip_src_addr to: " + payload);
-			} else if (payload.containsKey("tx_hosts")) {
-				JSONArray txHosts = (JSONArray) payload.remove("tx_hosts");
-				if (txHosts != null && !txHosts.isEmpty()) {
-					payload.put("ip_src_addr", txHosts.get(0));
-					_LOG.trace("[Metron] Added ip_src_addr to: " + payload);
-				}
-			}
-			
-			if (payload.containsKey("id.resp_h")) {
-				String source_ip = payload.remove("id.resp_h").toString();
-				payload.put("ip_dst_addr", source_ip);
-				_LOG.trace("[Metron] Added ip_dst_addr to: " + payload);
-			} else if (payload.containsKey("rx_hosts")) {
-				JSONArray rxHosts = (JSONArray) payload.remove("rx_hosts");
-				if (rxHosts != null && !rxHosts.isEmpty()) {
-					payload.put("ip_dst_addr", rxHosts.get(0));
-					_LOG.trace("[Metron] Added ip_dst_addr to: " + payload);
-				}
-			}
-			
-			if (payload.containsKey("id.orig_p")) {
-				String source_port = payload.remove("id.orig_p").toString();
-				payload.put("ip_src_port", source_port);
-				_LOG.trace("[Metron] Added ip_src_port to: " + payload);
-			}
-			if (payload.containsKey("id.resp_p")) {
-				String dest_port = payload.remove("id.resp_p").toString();
-				payload.put("ip_dst_port", dest_port);
-				_LOG.trace("[Metron] Added ip_dst_port to: " + payload);
-			}
-			
-//			if (payload.containsKey("host")) {
-//
-//				String host = payload.get("host").toString().trim();
-//				String tld = tldex.extractTLD(host);
-//
-//				payload.put("tld", tld);
-//				_LOG.trace("[Metron] Added tld to: " + payload);
-//
-//			}
-//			if (payload.containsKey("query")) {
-//				String host = payload.get("query").toString();
-//				String[] parts = host.split("\\.");
-//				int length = parts.length;
-//				if (length >= 2) {
-//					payload.put("tld", parts[length - 2] + "."
-//							+ parts[length - 1]);
-//					_LOG.trace("[Metron] Added tld to: " + payload);
-//				}
-//			}
-
-			_LOG.trace("[Metron] Inner message: " + payload);
-
-			payload.put("protocol", key);
-			_LOG.debug("[Metron] Returning parsed message: " + payload);
-
-			return payload;
-
-		} catch (Exception e) {
-
-			_LOG.error("Unable to Parse Message: " + raw_message);
-			e.printStackTrace();
-			return null;
-		}
-
-	}
-
-	
+    protected static final Logger _LOG = LoggerFactory
+            .getLogger(BasicBroParser.class);
+    private JSONCleaner cleaner = new JSONCleaner();
+
+    @SuppressWarnings("unchecked")
+    public JSONObject parse(byte[] msg) {
+
+        _LOG.trace("[Metron] Starting to parse incoming message");
+
+        String rawMessage = null;
+
+        try {
+            rawMessage = new String(msg, "UTF-8");
+            _LOG.trace("[Metron] Received message: " + rawMessage);
+
+            JSONObject cleanedMessage = cleaner.Clean(rawMessage);
+            _LOG.debug("[Metron] Cleaned message: " + cleanedMessage);
+
+            if (cleanedMessage == null || cleanedMessage.isEmpty()) {
+                throw new Exception("Unable to clean message: " + rawMessage);
+            }
+
+            String key;
+            JSONObject payload;
+            if (cleanedMessage.containsKey("type")) {
+                key = cleanedMessage.get("type").toString();
+                payload = cleanedMessage;
+            } else {
+                key = cleanedMessage.keySet().iterator().next().toString();
+
+                if (key == null) {
+                    throw new Exception("Unable to retrieve key for message: "
+                            + rawMessage);
+                }
+
+                payload = (JSONObject) cleanedMessage.get(key);
+            }
+
+            if (payload == null) {
+                throw new Exception("Unable to retrieve payload for message: "
+                    + rawMessage);
+            }
+
+            String originalString = key.toUpperCase() + " |";
+            for (Object k : payload.keySet()) {
+                String value = payload.get(k).toString();
+                originalString += " " + k.toString() + ":" + value;
+            }
+            payload.put("original_string", originalString);
+
+            replaceKey(payload, "timestamp", new String[]{ "ts" });
+
+            if (payload.containsKey("timestamp")) {
+                try {
+                    long timestamp = Long.parseLong(payload.get("timestamp").toString());
+                    payload.put("timestamp", timestamp);
+                } catch (NumberFormatException nfe) {
+                    _LOG.error(String.format("[Metron] timestamp is invalid: %s", payload.get("timestamp")));
+                    payload.put("timestamp", 0);
+                }
+            }
+
+            boolean ipSrcReplaced = replaceKey(payload, "ip_src_addr", new String[]{"source_ip", "id.orig_h"});
+            if (!ipSrcReplaced) {
+                replaceKeyArray(payload, "ip_src_addr", new String[]{ "tx_hosts" });
+            }
+
+            boolean ipDstReplaced = replaceKey(payload, "ip_dst_addr", new String[]{"dest_ip", "id.resp_h"});
+            if (!ipDstReplaced) {
+                replaceKeyArray(payload, "ip_dst_addr", new String[]{ "rx_hosts" });
+            }
+
+            replaceKey(payload, "ip_src_port", new String[]{"source_port", "id.orig_p"});
+            replaceKey(payload, "ip_dst_port", new String[]{"dest_port", "id.resp_p"});
+
+            payload.put("protocol", key);
+            _LOG.debug("[Metron] Returning parsed message: " + payload);
+
+            return payload;
+
+        } catch (Exception e) {
+
+            _LOG.error("Unable to Parse Message: " + rawMessage);
+            e.printStackTrace();
+            return null;
+        }
+
+    }
+
+    private boolean replaceKey(JSONObject payload, String toKey, String[] fromKeys) {
+        for (String fromKey : fromKeys) {
+            if (payload.containsKey(fromKey)) {
+                Object value = payload.remove(fromKey);
+                payload.put(toKey, value);
+                _LOG.trace(String.format("[Metron] Added %s to %s", toKey, payload));
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private boolean replaceKeyArray(JSONObject payload, String toKey, String[] fromKeys) {
+        for (String fromKey : fromKeys) {
+            if (payload.containsKey(fromKey)) {
+                JSONArray value = (JSONArray) payload.remove(fromKey);
+                if (value != null && !value.isEmpty()) {
+                    payload.put(toKey, value.get(0));
+                    _LOG.trace(String.format("[Metron] Added %s to %s", toKey, payload));
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/36fabec8/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicBroParserTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicBroParserTest.java b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicBroParserTest.java
index 7b05adf..2f643d7 100644
--- a/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicBroParserTest.java
+++ b/metron-streaming/Metron-MessageParsers/src/test/java/org/apache/metron/parsing/test/BasicBroParserTest.java
@@ -29,6 +29,27 @@ public class BasicBroParserTest extends TestCase {
 		jsonParser = new JSONParser();		
 	}
 
+    public void testUnwrappedBroMessage() throws ParseException {
+        String rawMessage = "{\"timestamp\":\"1449511228474\",\"uid\":\"CFgSLp4HgsGqXnNjZi\",\"source_ip\":\"104.130.172.191\",\"source_port\":33893,\"dest_ip\":\"69.20.0.164\",\"dest_port\":53,\"proto\":\"udp\",\"trans_id\":3514,\"rcode\":3,\"rcode_name\":\"NXDOMAIN\",\"AA\":false,\"TC\":false,\"RD\":false,\"RA\":false,\"Z\":0,\"rejected\":false,\"sensor\":\"cloudbro\",\"type\":\"dns\"}";
+
+        JSONObject rawJson = (JSONObject)jsonParser.parse(rawMessage);
+
+        JSONObject broJson = broParser.parse(rawMessage.getBytes());
+
+        assertEquals(broJson.get("timestamp"), Long.parseLong(rawJson.get("timestamp").toString()));
+        assertEquals(broJson.get("ip_src_addr").toString(), rawJson.get("source_ip").toString());
+        assertEquals(broJson.get("ip_dst_addr").toString(), rawJson.get("dest_ip").toString());
+        assertEquals(broJson.get("ip_src_port"), rawJson.get("source_port"));
+        assertEquals(broJson.get("ip_dst_port"), rawJson.get("dest_port"));
+        assertEquals(broJson.get("uid").toString(), rawJson.get("uid").toString());
+        assertEquals(broJson.get("trans_id").toString(), rawJson.get("trans_id").toString());
+        assertEquals(broJson.get("sensor").toString(), rawJson.get("sensor").toString());
+        assertEquals(broJson.get("protocol").toString(), rawJson.get("type").toString());
+        assertEquals(broJson.get("rcode").toString(), rawJson.get("rcode").toString());
+        assertEquals(broJson.get("rcode_name").toString(), rawJson.get("rcode_name").toString());
+        assertTrue(broJson.get("original_string").toString().startsWith("DNS"));
+    }
+
 	@SuppressWarnings("rawtypes")
 	public void testHttpBroMessage() throws ParseException {
 		String rawMessage = "{\"http\":{\"ts\":1402307733473,\"uid\":\"CTo78A11g7CYbbOHvj\",\"id.orig_h\":\"192.249.113.37\",\"id.orig_p\":58808,\"id.resp_h\":\"72.163.4.161\",\"id.resp_p\":80,\"trans_depth\":1,\"method\":\"GET\",\"host\":\"www.cisco.com\",\"uri\":\"/\",\"user_agent\":\"curl/7.22.0 (x86_64-pc-linux-gnu) libcurl/7.22.0 OpenSSL/1.0.1 zlib/1.2.3.4 libidn/1.23 librtmp/2.3\",\"request_body_len\":0,\"response_body_len\":25523,\"status_code\":200,\"status_msg\":\"OK\",\"tags\":[],\"resp_fuids\":[\"FJDyMC15lxUn5ngPfd\"],\"resp_mime_types\":[\"text/html\"]}}";


Mime
View raw message