eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ralp...@apache.org
Subject incubator-eagle git commit: [EAGLE-586]: AlertEngine: multiple routing of alerts is broken because of event not copied
Date Mon, 03 Oct 2016 21:16:35 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/master 4f348e3dc -> c5279cab1


[EAGLE-586]: AlertEngine: multiple routing of alerts is broken because of event not copied

Author: ralphsu

This closes #471


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/c5279cab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/c5279cab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/c5279cab

Branch: refs/heads/master
Commit: c5279cab10f47fdf289f698faedfa1a365cbff58
Parents: 4f348e3
Author: Ralph, Su <suliangfei@gmail.com>
Authored: Tue Oct 4 05:12:37 2016 +0800
Committer: Ralph, Su <suliangfei@gmail.com>
Committed: Tue Oct 4 05:16:36 2016 +0800

----------------------------------------------------------------------
 .../alert/engine/model/AlertStreamEvent.java    | 18 ++++++++++++++-
 .../eagle/alert/engine/model/StreamEvent.java   | 13 ++++++-----
 .../publisher/impl/AlertKafkaPublisher.java     | 23 ++++++++++----------
 3 files changed, 36 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c5279cab/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
index a503dcf..5dc99cf 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
@@ -16,9 +16,10 @@
  */
 package org.apache.eagle.alert.engine.model;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.utils.DateTimeUtil;
-import org.apache.commons.lang3.StringUtils;
+
 import java.util.ArrayList;
 import java.util.List;
 
@@ -33,6 +34,21 @@ public class AlertStreamEvent extends StreamEvent {
     private String createdBy;
     private long createdTime;
 
+    public AlertStreamEvent() {
+    }
+
+    public AlertStreamEvent(AlertStreamEvent event) {
+        this.policyId = event.policyId;
+        this.schema = event.schema;
+        this.createdBy = event.createdBy;
+        this.createdTime = event.createdTime;
+        this.setTimestamp(event.getTimestamp());
+        this.setData(new Object[event.data.length]);
+        System.arraycopy(event.data, 0, this.data, 0, event.data.length);
+        this.setStreamId(event.getStreamId());
+        this.setMetaVersion(event.getMetaVersion());
+    }
+
     public void setPolicyId(String policyId) {
         this.policyId = policyId;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c5279cab/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
index d91b001..693050d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
@@ -16,10 +16,11 @@
  */
 package org.apache.eagle.alert.engine.model;
 
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.utils.DateTimeUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.utils.DateTimeUtil;
+
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -32,10 +33,10 @@ import java.util.Objects;
 public class StreamEvent implements Serializable {
     private static final long serialVersionUID = 2765116509856609763L;
 
-    private String streamId;
-    private Object[] data;
-    private long timestamp;
-    private String metaVersion;
+    protected String streamId;
+    protected Object[] data;
+    protected long timestamp;
+    protected String metaVersion;
 
     public StreamEvent() {
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c5279cab/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
index e5c351b..cd39ab6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
@@ -18,14 +18,7 @@
 
 package org.apache.eagle.alert.engine.publisher.impl;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
+import com.typesafe.config.Config;
 import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.PublishConstants;
@@ -34,7 +27,13 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.typesafe.config.Config;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 public class AlertKafkaPublisher extends AbstractPublishPlugin {
 
@@ -77,8 +76,10 @@ public class AlertKafkaPublisher extends AbstractPublishPlugin {
             LOG.warn("Namespace column {} is not found, the found index {} is invalid",
                 namespaceLabel, namespaceColumnIndex);
         } else {
-            event.getData()[namespaceColumnIndex] = namespaceValue;
-            outputEvents.add(event);
+            // copy raw event to be duped
+            AlertStreamEvent newEvent = new AlertStreamEvent(event);
+            newEvent.getData()[namespaceColumnIndex] = namespaceValue;
+            outputEvents.add(newEvent);
         }
 
         List<AlertStreamEvent> dedupResults = dedup(event);


Mime
View raw message