eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From qingwz...@apache.org
Subject eagle git commit: [EAGLE-947] Publishers with same policy but different schema could produce duplicate alerts
Date Tue, 14 Mar 2017 07:22:10 GMT
Repository: eagle
Updated Branches:
  refs/heads/master 7e41ed0b9 -> 906692058


[EAGLE-947] Publishers with same policy but different schema could produce duplicate alerts

Assume that we have policy1 which have 2 kinds of output streams, one is stream1 and another
is stream2. If publisher1 is configured for policy1 and stream1, and publisher2 is configured
for policy1 and stream2, current code will produce 2 alerts for either stream1 or stream2.

Author: Xiancheng Li <xiancheng.li@ebay.com>

Closes #864 from garrettlish/master.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/90669205
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/90669205
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/90669205

Branch: refs/heads/master
Commit: 9066920586174c2ba967fbd2434a21fa5b8353fa
Parents: 7e41ed0
Author: Xiancheng Li <xiancheng.li@ebay.com>
Authored: Tue Mar 14 15:22:03 2017 +0800
Committer: Zhao, Qingwen <qingwzhao@apache.org>
Committed: Tue Mar 14 15:22:03 2017 +0800

----------------------------------------------------------------------
 .../impl/AlertBoltOutputCollectorWrapper.java   |  44 +++----
 .../AlertBoltOutputCollectorWrapperTest.java    | 120 +++++++++++++++++++
 .../alert/engine/router/CustomizedHandler.java  |   7 +-
 .../alert/engine/router/TestAlertBolt.java      |   4 +-
 .../engine/statecheck/TestStateCheckPolicy.java |   4 +-
 5 files changed, 151 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/90669205/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
index cffb706..606ddce 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
@@ -16,19 +16,17 @@
  */
 package org.apache.eagle.alert.engine.evaluator.impl;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-
 import org.apache.eagle.alert.engine.AlertStreamCollector;
 import org.apache.eagle.alert.engine.StreamContext;
 import org.apache.eagle.alert.engine.coordinator.PublishPartition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.router.StreamOutputCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.*;
+
 
 public class AlertBoltOutputCollectorWrapper implements AlertStreamCollector {
 
@@ -53,24 +51,26 @@ public class AlertBoltOutputCollectorWrapper implements AlertStreamCollector
{
     public void emit(AlertStreamEvent event) {
         Set<PublishPartition> clonedPublishPartitions = new HashSet<>(publishPartitions);
         for (PublishPartition publishPartition : clonedPublishPartitions) {
-            // skip the publish partition which is not belong to this policy
+            // skip the publish partition which is not belong to this policy and also check
streamId
             PublishPartition cloned = publishPartition.clone();
-            if (!cloned.getPolicyId().equalsIgnoreCase(event.getPolicyId())) {
-                continue;
-            }
-            for (String column : cloned.getColumns()) {
-                int columnIndex = event.getSchema().getColumnIndex(column);
-                if (columnIndex < 0) {
-                    LOG.warn("Column {} is not found in stream {}", column, cloned.getStreamId());
-                    continue;
-                }
-                cloned.getColumnValues().add(event.getData()[columnIndex]);
-            }
-
-            synchronized (outputLock) {
-                streamContext.counter().incr("alert_count");
-                delegate.emit(Arrays.asList(cloned, event));
-            }
+            Optional.ofNullable(event)
+                .filter(x -> x != null
+                    && x.getSchema() != null
+                    && cloned.getPolicyId().equalsIgnoreCase(x.getPolicyId())
+                    && (cloned.getStreamId().equalsIgnoreCase(x.getSchema().getStreamId())
+                    || cloned.getStreamId().equalsIgnoreCase(Publishment.STREAM_NAME_DEFAULT)))
+                .ifPresent(x -> {
+                    cloned.getColumns().stream()
+                        .filter(y -> event.getSchema().getColumnIndex(y) >= 0
+                            && event.getSchema().getColumnIndex(y) < event.getSchema().getColumns().size())
+                        .map(y -> event.getData()[event.getSchema().getColumnIndex(y)])
+                        .filter(y -> y != null)
+                        .forEach(y -> cloned.getColumnValues().add(y));
+                    synchronized (outputLock) {
+                        streamContext.counter().incr("alert_count");
+                        delegate.emit(Arrays.asList(cloned, event));
+                    }
+                });
         }
     }
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/90669205/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapperTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapperTest.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapperTest.java
new file mode 100644
index 0000000..9febed5
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapperTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.impl;
+
+import org.apache.eagle.alert.engine.StreamContext;
+import org.apache.eagle.alert.engine.StreamCounter;
+import org.apache.eagle.alert.engine.coordinator.PublishPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.router.StreamOutputCollector;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.mockito.Mockito.*;
+
+public class AlertBoltOutputCollectorWrapperTest {
+
+    private AlertBoltOutputCollectorWrapper alertBoltOutputCollectorWrapper;
+
+    // mock objects
+    private StreamOutputCollector outputCollector;
+    private Object outputLock;
+    private StreamContext streamContext;
+    private StreamCounter streamCounter;
+
+    private Set<PublishPartition> publishPartitions = new HashSet<>();
+
+    private static final String samplePublishId = "samplePublishId";
+    private static final String samplePublishId2 = "samplePublishId2";
+    private static final String samplePolicyId = "samplePolicyId";
+    private static final String sampleStreamId = "sampleStreamId";
+    private static final String sampleStreamId2 = "sampleStreamId2";
+
+    @Before
+    public void setUp() throws Exception {
+        outputCollector = mock(StreamOutputCollector.class);
+        outputLock = mock(Object.class);
+        streamContext = mock(StreamContext.class);
+        streamCounter = mock(StreamCounter.class);
+        alertBoltOutputCollectorWrapper = new AlertBoltOutputCollectorWrapper(outputCollector,
outputLock, streamContext);
+    }
+
+    @Before
+    public void tearDown() throws Exception {
+        alertBoltOutputCollectorWrapper.onAlertBoltSpecChange(new HashSet<>(), publishPartitions,
new HashSet<>());
+        publishPartitions.clear();
+    }
+
+    @Test
+    public void testNormal() throws Exception {
+        doReturn(streamCounter).when(streamContext).counter();
+
+        publishPartitions.add(createPublishPartition(samplePublishId, samplePolicyId, sampleStreamId));
+        publishPartitions.add(createPublishPartition(samplePublishId2, samplePolicyId, sampleStreamId2));
+        alertBoltOutputCollectorWrapper.onAlertBoltSpecChange(publishPartitions, new HashSet<>(),
new HashSet<>());
+
+        AlertStreamEvent event = new AlertStreamEvent();
+        event.setPolicyId(samplePolicyId);
+        StreamDefinition sd = new StreamDefinition();
+        sd.setStreamId(sampleStreamId);
+        sd.setColumns(new ArrayList<>());
+        event.setSchema(sd);
+
+        alertBoltOutputCollectorWrapper.emit(event);
+
+        verify(streamCounter, times(1)).incr(anyString());
+        verify(outputCollector, times(1)).emit(anyObject());
+    }
+
+    @Test
+    public void testExceptional() throws Exception {
+        doReturn(streamCounter).when(streamContext).counter();
+
+        publishPartitions.add(createPublishPartition(samplePublishId, samplePolicyId, sampleStreamId));
+        publishPartitions.add(createPublishPartition(samplePublishId, samplePolicyId, sampleStreamId));
+        alertBoltOutputCollectorWrapper.onAlertBoltSpecChange(publishPartitions, new HashSet<>(),
new HashSet<>());
+
+        AlertStreamEvent event = new AlertStreamEvent();
+        event.setPolicyId(samplePolicyId);
+        StreamDefinition sd = new StreamDefinition();
+        sd.setStreamId(sampleStreamId);
+        sd.setColumns(new ArrayList<>());
+        event.setSchema(sd);
+
+        alertBoltOutputCollectorWrapper.emit(event);
+
+        verify(streamCounter, times(1)).incr(anyString());
+        verify(outputCollector, times(1)).emit(anyObject());
+    }
+
+    private PublishPartition createPublishPartition(String publishId, String policyId, String
streamId) {
+        PublishPartition publishPartition = new PublishPartition();
+        publishPartition.setPolicyId(policyId);
+        publishPartition.setStreamId(streamId);
+        publishPartition.setPublishId(publishId);
+        publishPartition.setColumns(new HashSet<>());
+        return publishPartition;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/90669205/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java
index 4d124e1..284abc4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java
@@ -31,8 +31,10 @@ import java.util.Map;
 public class CustomizedHandler implements PolicyStreamHandler {
     private Collector<AlertStreamEvent> collector;
     private PolicyHandlerContext context;
+    private Map<String, StreamDefinition> sds;
 
     public CustomizedHandler(Map<String, StreamDefinition> sds) {
+        this.sds = sds;
     }
 
     @Override
@@ -43,8 +45,9 @@ public class CustomizedHandler implements PolicyStreamHandler {
 
     @Override
     public void send(StreamEvent event) throws Exception {
-	AlertStreamEvent alert = new AlertStreamEvent();
-	alert.setPolicyId(context.getPolicyDefinition().getName());
+        AlertStreamEvent alert = new AlertStreamEvent();
+        alert.setPolicyId(context.getPolicyDefinition().getName());
+        alert.setSchema(sds.get(event.getStreamId()));
         this.collector.emit(alert);
     }
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/90669205/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
index 440f555..c9e09fd 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
@@ -364,7 +364,7 @@ public class TestAlertBolt {
         GeneralTopologyContext context = mock(GeneralTopologyContext.class);
         int taskId = 1;
         when(context.getComponentId(taskId)).thenReturn("comp1");
-        when(context.getComponentOutputFields("comp1", "default")).thenReturn(new Fields("f0"));
+        when(context.getComponentOutputFields("comp1", TEST_STREAM)).thenReturn(new Fields("f0"));
         // case 1: bolt prepared but metadata not initialized (no bolt.onAlertBoltSpecChange)
         PartitionedEvent pe = new PartitionedEvent();
         pe.setPartitionKey(1);
@@ -377,7 +377,7 @@ public class TestAlertBolt {
 
         PartitionedEventSerializerImpl peSer = new PartitionedEventSerializerImpl(bolt);
         byte[] serializedEvent = peSer.serialize(pe);
-        return new TupleImpl(context, Collections.singletonList(serializedEvent), taskId,
"default");
+        return new TupleImpl(context, Collections.singletonList(serializedEvent), taskId,
TEST_STREAM);
     }
 
     private StreamPartition createPartition() {

http://git-wip-us.apache.org/repos/asf/eagle/blob/90669205/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java
index f4d8303..d4fe01a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java
@@ -58,7 +58,7 @@ public class TestStateCheckPolicy {
             @Override
             public List<Integer> emit(String streamId, Collection<Tuple> anchors,
List<Object> tuple) {
                 verified.set(true);
-                Assert.assertEquals("perfmon_latency_stream", ((PublishPartition) tuple.get(0)).getStreamId());
+                Assert.assertEquals("perfmon_latency_check_output2", ((PublishPartition)
tuple.get(0)).getStreamId());
                 AlertStreamEvent event = (AlertStreamEvent) tuple.get(1);
                 System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s]
", ((PublishPartition) tuple.get(0)).getStreamId(), tuple));
                 return null;
@@ -92,7 +92,7 @@ public class TestStateCheckPolicy {
         List<StreamDefinition> streams = mapper.readValue(TestStateCheckPolicy.class.getResourceAsStream("/statecheck/streamdefinitions.json"),
                 new TypeReference<List<StreamDefinition>>() {
                 });
-        spec.addPublishPartition(streams.get(0).getStreamId(), policies.get(0).getName(),
"testPublishBolt", null);
+        spec.addPublishPartition("perfmon_latency_check_output2", policies.get(0).getName(),
"testPublishBolt", null);
         
         alertBolt.onAlertBoltSpecChange(spec, definitionMap);
 


Mime
View raw message