metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject incubator-metron git commit: METRON-222: Address ConcurrentModificationException in BulkMessageWriter. This closes apache/incubator-metron#150
Date Tue, 14 Jun 2016 19:28:24 GMT
Repository: incubator-metron
Updated Branches:
  refs/heads/master bd3138832 -> 52a4924be


METRON-222:  Address ConcurrentModificationException in BulkMessageWriter.  This closes apache/incubator-metron#150


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/52a4924b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/52a4924b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/52a4924b

Branch: refs/heads/master
Commit: 52a4924be7e8682640edf7f19b97a4838f7037f2
Parents: bd31388
Author: cstella <cestella@gmail.com>
Authored: Tue Jun 14 15:28:08 2016 -0400
Committer: cstella <cestella@gmail.com>
Committed: Tue Jun 14 15:28:08 2016 -0400

----------------------------------------------------------------------
 .../enrichment/bolt/BulkMessageWriterBolt.java     | 17 +----------------
 .../metron/enrichment/bolt/EnrichmentJoinBolt.java |  2 +-
 .../enrichment/bolt/EnrichmentSplitterBolt.java    |  2 +-
 .../apache/metron/enrichment/bolt/SplitBolt.java   |  2 +-
 4 files changed, 4 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/52a4924b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBolt.java
index 3a063d3..772dfc6 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBolt.java
@@ -61,26 +61,11 @@ public class BulkMessageWriterBolt extends ConfiguredEnrichmentBolt {
     }
   }
 
-  private JSONObject cloneMessage(Tuple tuple) {
-    JSONObject ret = new JSONObject();
-    JSONObject message = (JSONObject) tuple.getValueByField("message");
-    try {
-      for (Iterator<Map.Entry<String, Object>> it = message.entrySet().iterator(); it.hasNext(); ) {
-        Map.Entry<String, Object> kv = it.next();
-        ret.put(kv.getKey(), kv.getValue());
-      }
-    }
-    catch(ConcurrentModificationException cme) {
-      LOG.error(cme.getMessage() + "\n" + ErrorUtils.generateThreadDump(), cme);
-      throw cme;
-    }
-    return ret;
-  }
 
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {
-    JSONObject message = cloneMessage(tuple);
+    JSONObject message =(JSONObject)tuple.getValueByField("message");
     String sensorType = MessageUtils.getSensorType(message);
     try
     {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/52a4924b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
index 9bb91af..8fa2b2e 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
@@ -81,7 +81,7 @@ public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
       message.remove(o);
     }
     message.put(getClass().getSimpleName().toLowerCase() + ".joiner.ts", "" + System.currentTimeMillis());
-    return (JSONObject) message.clone();
+    return  message;
   }
 
   public Map<String, List<String>> getFieldMap(String sourceType) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/52a4924b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
index b599ea9..4b5c7bb 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
@@ -90,7 +90,7 @@ public class EnrichmentSplitterBolt extends SplitBolt<JSONObject> {
             message = (JSONObject) tuple.getValueByField(messageFieldName);
             message.put(getClass().getSimpleName().toLowerCase() + ".splitter.begin.ts", "" + System.currentTimeMillis());
         }
-        return (JSONObject)message.clone();
+        return message;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/52a4924b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
index 7739edc..0515d29 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
@@ -64,7 +64,6 @@ public abstract class SplitBolt<T extends Cloneable> extends
   public void emit(Tuple tuple, T message) {
     if (message == null) return;
     String key = getKey(tuple, message);
-    collector.emit("message", tuple, new Values(key, message));
     Map<String, T> streamMessageMap = splitMessage(message);
     for (String streamId : streamMessageMap.keySet()) {
       T streamMessage = streamMessageMap.get(streamId);
@@ -73,6 +72,7 @@ public abstract class SplitBolt<T extends Cloneable> extends
       }
       collector.emit(streamId, new Values(key, streamMessage));
     }
+    collector.emit("message", tuple, new Values(key, message));
     collector.ack(tuple);
     emitOther(tuple, message);
   }


Mime
View raw message