eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jinh...@apache.org
Subject incubator-eagle git commit: [EAGLE-824] Multiple policies in one alert bolt produces duplicated tuples
Date Tue, 06 Dec 2016 04:59:05 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/master f833e9831 -> 9f4e7633d


[EAGLE-824] Multiple policies in one alert bolt produces duplicated tuples

When multiple policies run in one alert bolt, each policy in the bolt produces the tuple
and emits it to the publisher, so the publisher receives multiple duplicated tuples.

Author: Xiancheng Li <xiancheng.li@ebay.com>

Closes #714 from garrettlish/master.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/9f4e7633
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/9f4e7633
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/9f4e7633

Branch: refs/heads/master
Commit: 9f4e7633d939580ce640d7488ec1fdeb07c00c3e
Parents: f833e98
Author: Xiancheng Li <xiancheng.li@ebay.com>
Authored: Tue Dec 6 12:58:59 2016 +0800
Committer: wujinhu <wujinhu920@126.com>
Committed: Tue Dec 6 12:58:59 2016 +0800

----------------------------------------------------------------------
 .../evaluator/impl/AlertBoltOutputCollectorWrapper.java  |  4 ++++
 .../org/apache/eagle/alert/engine/runner/AlertBolt.java  | 11 ++++++-----
 .../eagle/alert/engine/router/CustomizedHandler.java     |  6 +++++-
 .../org/apache/eagle/app/stream/CEPFunctionTest.java     |  2 +-
 4 files changed, 16 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f4e7633/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
index 3053e6e..cffb706 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
@@ -53,7 +53,11 @@ public class AlertBoltOutputCollectorWrapper implements AlertStreamCollector
{
     public void emit(AlertStreamEvent event) {
         Set<PublishPartition> clonedPublishPartitions = new HashSet<>(publishPartitions);
         for (PublishPartition publishPartition : clonedPublishPartitions) {
+            // skip the publish partition which is not belong to this policy
             PublishPartition cloned = publishPartition.clone();
+            if (!cloned.getPolicyId().equalsIgnoreCase(event.getPolicyId())) {
+                continue;
+            }
             for (String column : cloned.getColumns()) {
                 int columnIndex = event.getSchema().getColumnIndex(column);
                 if (columnIndex < 0) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f4e7633/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
index 02bc47e..7d66f47 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
@@ -198,16 +198,16 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
         policyGroupEvaluator.onPolicyChange(comparator.getAdded(), comparator.getRemoved(),
comparator.getModified(), sds);
 
         // update alert output collector
-        Set<PublishPartition> tempPublishPartitions = new HashSet<>();
+        Set<PublishPartition> newPublishPartitions = new HashSet<>();
         spec.getPublishPartitions().forEach(p -> {
             if (newPolicies.stream().filter(o -> o.getName().equals(p.getPolicyId())).count()
> 0) {
-                tempPublishPartitions.add(p);
+                newPublishPartitions.add(p);
             }
         });
 
-        Collection<PublishPartition> addedPublishPartitions = CollectionUtils.subtract(tempPublishPartitions,
cachedPublishPartitions);
-        Collection<PublishPartition> removedPublishPartitions = CollectionUtils.subtract(cachedPublishPartitions,
tempPublishPartitions);
-        Collection<PublishPartition> modifiedPublishPartitions = CollectionUtils.intersection(tempPublishPartitions,
cachedPublishPartitions);
+        Collection<PublishPartition> addedPublishPartitions = CollectionUtils.subtract(newPublishPartitions,
cachedPublishPartitions);
+        Collection<PublishPartition> removedPublishPartitions = CollectionUtils.subtract(cachedPublishPartitions,
newPublishPartitions);
+        Collection<PublishPartition> modifiedPublishPartitions = CollectionUtils.intersection(newPublishPartitions,
cachedPublishPartitions);
 
         LOG.debug("added PublishPartition " + addedPublishPartitions);
         LOG.debug("removed PublishPartition " + removedPublishPartitions);
@@ -217,6 +217,7 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
 
         // switch
         cachedPolicies = newPoliciesMap;
+        cachedPublishPartitions = newPublishPartitions;
         sdf = sds;
         specVersion = spec.getVersion();
         this.spec = spec;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f4e7633/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java
index be69ffb..4d124e1 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java
@@ -30,6 +30,7 @@ import java.util.Map;
  */
 public class CustomizedHandler implements PolicyStreamHandler {
     private Collector<AlertStreamEvent> collector;
+    private PolicyHandlerContext context;
 
     public CustomizedHandler(Map<String, StreamDefinition> sds) {
     }
@@ -37,11 +38,14 @@ public class CustomizedHandler implements PolicyStreamHandler {
     @Override
     public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext
context) throws Exception {
         this.collector = collector;
+        this.context = context;
     }
 
     @Override
     public void send(StreamEvent event) throws Exception {
-        this.collector.emit(new AlertStreamEvent());
+	AlertStreamEvent alert = new AlertStreamEvent();
+	alert.setPolicyId(context.getPolicyDefinition().getName());
+        this.collector.emit(alert);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f4e7633/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java
b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java
index a8613df..039c087 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java
@@ -54,7 +54,7 @@ public class CEPFunctionTest {
             put("name","cpu.usage");
             put("value", 0.96);
         }});
-        Assert.assertTrue("Should get result in 5 s", semaphore.tryAcquire(5, TimeUnit.SECONDS));
+        Assert.assertTrue("Should get result in 5 s", semaphore.tryAcquire(15, TimeUnit.SECONDS));
         function.close();
     }
 }


Mime
View raw message