eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From qingwz...@apache.org
Subject eagle git commit: [EAGLE-1038] Support alertDuplication customization for each policy
Date Thu, 08 Jun 2017 02:21:12 GMT
Repository: eagle
Updated Branches:
  refs/heads/master 1db33df5a -> e2fbb8613


[EAGLE-1038] Support alertDuplication customization for each policy

https://issues.apache.org/jira/browse/EAGLE-1038

* support a de-duplication check for each outputStream of a policy
* remain compatible with the de-duplication check of old versions (performed in a publisher)

Author: Zhao, Qingwen <qingwzhao@apache.org>

Closes #944 from qingwen220/minor.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/e2fbb861
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/e2fbb861
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/e2fbb861

Branch: refs/heads/master
Commit: e2fbb861389ebfeb7d0c96743f7a79b185857e43
Parents: 1db33df
Author: Zhao, Qingwen <qingwzhao@apache.org>
Authored: Thu Jun 8 10:21:04 2017 +0800
Committer: Zhao, Qingwen <qingwzhao@apache.org>
Committed: Thu Jun 8 10:21:04 2017 +0800

----------------------------------------------------------------------
 .../engine/coordinator/AlertDeduplication.java  |  13 +-
 .../engine/coordinator/PolicyDefinition.java    |  70 ++++----
 .../alert/engine/model/AlertStreamEvent.java    |   9 +
 .../coordinator/AlertDeduplicationTest.java     |  47 +++++
 .../engine/publisher/dedup/DedupCache.java      |   1 -
 .../engine/publisher/dedup/DedupEntity.java     |   2 -
 .../alert/engine/publisher/dedup/DedupKey.java  |  71 ++++++++
 .../publisher/dedup/DefaultDeduplicator.java    | 178 +++++++++++++++++++
 .../alert/engine/publisher/dedup/EventUniq.java |  82 +++++++++
 .../publisher/impl/AbstractPublishPlugin.java   |   5 +-
 .../publisher/impl/DefaultDeduplicator.java     | 173 ------------------
 .../alert/engine/publisher/impl/EventUniq.java  |  83 ---------
 .../alert/engine/runner/AlertPublisherBolt.java |  42 +++--
 .../engine/publisher/dedup/DedupCacheTest.java  |   1 -
 .../engine/publisher/dedup/DedupKeyTest.java    |  56 ++++++
 .../dedup/DefaultDedupWithoutStateTest.java     |   1 -
 .../dedup/DefaultDeduplicatorTest.java          |   1 -
 .../publisher/dedup/TestDeduplicator.java       |   1 -
 .../engine/router/TestAlertPublisherBolt.java   |  83 ++++++++-
 19 files changed, 596 insertions(+), 323 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java
index 78fef7a..d47d7d0 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java
@@ -24,9 +24,18 @@ import java.util.List;
 import java.util.Objects;
 
 public class AlertDeduplication {
+    private String outputStreamId;
     private String dedupIntervalMin;
     private List<String> dedupFields;
 
+    public String getOutputStreamId() {
+        return outputStreamId;
+    }
+
+    public void setOutputStreamId(String outputStreamId) {
+        this.outputStreamId = outputStreamId;
+    }
+
     public String getDedupIntervalMin() {
         return dedupIntervalMin;
     }
@@ -46,6 +55,7 @@ public class AlertDeduplication {
     @Override
     public int hashCode() {
         return new HashCodeBuilder()
+                .append(outputStreamId)
                 .append(dedupFields)
                 .append(dedupIntervalMin)
                 .build();
@@ -61,7 +71,8 @@ public class AlertDeduplication {
         }
         AlertDeduplication another = (AlertDeduplication) that;
         if (ListUtils.isEqualList(another.dedupFields, this.dedupFields)
-                && Objects.equals(another.dedupIntervalMin, this.dedupIntervalMin)) {
+                && Objects.equals(another.dedupIntervalMin, this.dedupIntervalMin)
+                && Objects.equals(another.outputStreamId, this.outputStreamId)) {
             return true;
         }
         return false;

http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
index 5004513..698605e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
@@ -43,7 +43,7 @@ public class PolicyDefinition implements Serializable {
     private Definition stateDefinition;
     private PolicyStatus policyStatus = PolicyStatus.ENABLED;
     private AlertDefinition alertDefinition;
-    private AlertDeduplication deduplication;
+    private List<AlertDeduplication> alertDeduplications = new ArrayList<>();
 
     // one stream only have one partition in one policy, since we don't support stream alias
     private List<StreamPartition> partitionSpec = new ArrayList<StreamPartition>();
@@ -136,6 +136,38 @@ public class PolicyDefinition implements Serializable {
         this.policyStatus = policyStatus;
     }
 
+    public List<AlertDeduplication> getAlertDeduplications() {
+        return alertDeduplications;
+    }
+
+    public void setAlertDeduplications(List<AlertDeduplication> alertDeduplications) {
+        this.alertDeduplications = alertDeduplications;
+    }
+
+    public AlertDefinition getAlertDefinition() {
+        return alertDefinition;
+    }
+
+    public void setAlertDefinition(AlertDefinition alertDefinition) {
+        this.alertDefinition = alertDefinition;
+    }
+
+    public AlertSeverity getAlertSeverity() {
+        return alertDefinition == null ? null : alertDefinition.getSeverity();
+    }
+
+    public String getAlertCategory() {
+        return alertDefinition == null ? null : alertDefinition.getCategory();
+    }
+
+    public String getSiteId() {
+        return siteId;
+    }
+
+    public void setSiteId(String siteId) {
+        this.siteId = siteId;
+    }
+
     @Override
     public int hashCode() {
         return new HashCodeBuilder()
@@ -148,7 +180,7 @@ public class PolicyDefinition implements Serializable {
                 .append(policyStatus)
                 .append(parallelismHint)
                 .append(alertDefinition)
-                .append(deduplication)
+                .append(alertDeduplications)
                 .build();
     }
 
@@ -175,44 +207,12 @@ public class PolicyDefinition implements Serializable {
                 && another.policyStatus.equals(this.policyStatus)
                 && another.parallelismHint == this.parallelismHint
                 && Objects.equals(another.alertDefinition, alertDefinition)
-                && Objects.equals(another.deduplication, deduplication)) {
+                && CollectionUtils.isEqualCollection(another.alertDeduplications, alertDeduplications)) {
             return true;
         }
         return false;
     }
 
-    public AlertDefinition getAlertDefinition() {
-        return alertDefinition;
-    }
-
-    public void setAlertDefinition(AlertDefinition alertDefinition) {
-        this.alertDefinition = alertDefinition;
-    }
-
-    public AlertSeverity getAlertSeverity() {
-        return alertDefinition == null ? null : alertDefinition.getSeverity();
-    }
-
-    public String getAlertCategory() {
-        return alertDefinition == null ? null : alertDefinition.getCategory();
-    }
-
-    public String getSiteId() {
-        return siteId;
-    }
-
-    public void setSiteId(String siteId) {
-        this.siteId = siteId;
-    }
-
-    public AlertDeduplication getDeduplication() {
-        return deduplication;
-    }
-
-    public void setDeduplication(AlertDeduplication deduplication) {
-        this.deduplication = deduplication;
-    }
-
     @JsonIgnoreProperties(ignoreUnknown = true)
     public static class Definition implements Serializable {
         private static final long serialVersionUID = -622366527887848346L;

http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
index 00170df..3079f77 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
@@ -39,6 +39,7 @@ public class AlertStreamEvent extends StreamEvent {
     private long createdTime;
     private String category;
     private AlertSeverity severity = AlertSeverity.WARNING;
+    private boolean duplicationChecked = false;
 
     // ----------------------
     // Lazy Alert Fields
@@ -187,4 +188,12 @@ public class AlertStreamEvent extends StreamEvent {
     public void setSiteId(String siteId) {
         this.siteId = siteId;
     }
+
+    public boolean isDuplicationChecked() {
+        return duplicationChecked;
+    }
+
+    public void setDuplicationChecked(boolean duplicationChecked) {
+        this.duplicationChecked = duplicationChecked;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplicationTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplicationTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplicationTest.java
new file mode 100644
index 0000000..59da244
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplicationTest.java
@@ -0,0 +1,47 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+
+public class AlertDeduplicationTest {
+
+    @Test
+    public void testEqual() {
+        AlertDeduplication deduplication1 = new AlertDeduplication();
+        deduplication1.setDedupIntervalMin("1");
+        deduplication1.setOutputStreamId("stream");
+
+        AlertDeduplication deduplication2 = new AlertDeduplication();
+        deduplication2.setDedupIntervalMin("1");
+        deduplication2.setOutputStreamId("stream");
+        deduplication2.setDedupFields(new ArrayList<>());
+
+        Assert.assertFalse(deduplication1.equals(deduplication2));
+
+        AlertDeduplication deduplication3 = new AlertDeduplication();
+        deduplication3.setDedupFields(new ArrayList<>());
+        deduplication3.setOutputStreamId("stream");
+        deduplication3.setDedupIntervalMin("1");
+
+        Assert.assertTrue(deduplication3.equals(deduplication2));
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
index abb83d6..96eeffd 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
@@ -26,7 +26,6 @@ import org.apache.commons.lang.time.DateUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java
index 86bc9b3..e666c64 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java
@@ -20,8 +20,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedDeque;
 
-import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
-
 public class DedupEntity {
 
     private String publishName;

http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupKey.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupKey.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupKey.java
new file mode 100644
index 0000000..f6b03a4
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupKey.java
@@ -0,0 +1,71 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import java.util.Objects;
+
+public class DedupKey {
+    private String policyId;
+    private String outputStreamId;
+
+    public DedupKey(String policyId, String outputStreamId) {
+        this.policyId = policyId;
+        this.outputStreamId = outputStreamId;
+    }
+
+    public String getPolicyId() {
+        return policyId;
+    }
+
+    public void setPolicyId(String policyId) {
+        this.policyId = policyId;
+    }
+
+    public String getOutputStreamId() {
+        return outputStreamId;
+    }
+
+    public void setOutputStreamId(String outputStreamId) {
+        this.outputStreamId = outputStreamId;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj instanceof DedupKey) {
+            DedupKey au = (DedupKey) obj;
+            return Objects.equals(au.getOutputStreamId(), this.outputStreamId)
+                    && Objects.equals(au.getPolicyId(), this.policyId);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder().append(outputStreamId).append(policyId).build();
+    }
+
+    @Override
+    public String toString() {
+        return String.format("DedupKey[outputStreamId: %s, policyId: %s]", outputStreamId, policyId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicator.java
new file mode 100644
index 0000000..2307d7a
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicator.java
@@ -0,0 +1,178 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eagle.alert.engine.coordinator.AlertDeduplication;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
+import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+public class DefaultDeduplicator implements AlertDeduplicator {
+
+    private static Logger LOG = LoggerFactory.getLogger(DefaultDeduplicator.class);
+
+    private long dedupIntervalSec;
+    private List<String> customDedupFields = new ArrayList<>();
+    private String dedupStateField;
+    private String dedupStateCloseValue;
+    private AlertDeduplication alertDeduplication = null;
+
+    private DedupCache dedupCache;
+
+    private Cache<EventUniq, String> withoutStatesCache;
+
+    public DefaultDeduplicator(AlertDeduplication alertDeduplication) {
+        this.alertDeduplication = alertDeduplication;
+        this.customDedupFields = alertDeduplication.getDedupFields();
+        try {
+            this.dedupIntervalSec = Integer.parseInt(alertDeduplication.getDedupIntervalMin()) * 60;
+        } catch (Exception e) {
+            LOG.error("de-duplication intervalSec {} parse error, use 30 min instead", alertDeduplication.getDedupIntervalMin(), e.getMessage());
+            this.dedupIntervalSec = 1800;
+        }
+        this.withoutStatesCache = CacheBuilder.newBuilder().expireAfterWrite(
+                this.dedupIntervalSec, TimeUnit.SECONDS).build();
+
+        LOG.info("initialize DefaultDeduplicator with dedupIntervalSec={}, customDedupFields={}", dedupIntervalSec, customDedupFields);
+    }
+
+    public DefaultDeduplicator(String intervalMin, List<String> customDedupFields,
+                               String dedupStateField, String dedupStateCloseValue, DedupCache dedupCache) {
+        setDedupIntervalMin(intervalMin);
+        if (customDedupFields != null) {
+            this.customDedupFields = customDedupFields;
+        }
+        if (StringUtils.isNotBlank(dedupStateField)) {
+            this.dedupStateField = dedupStateField;
+        }
+        if (StringUtils.isNotBlank(dedupStateCloseValue)) {
+            this.dedupStateCloseValue = dedupStateCloseValue;
+        }
+        this.dedupCache = dedupCache;
+
+        withoutStatesCache = CacheBuilder.newBuilder().expireAfterWrite(
+            this.dedupIntervalSec, TimeUnit.SECONDS).build();
+
+        LOG.info("initialize DefaultDeduplicator with dedupIntervalSec={}, customDedupFields={}", dedupIntervalSec, customDedupFields);
+    }
+
+    /*
+     * @param key
+     * @return
+     */
+    private List<AlertStreamEvent> checkDedup(AlertStreamEvent event, EventUniq key, String stateFiledValue) {
+        if (StringUtils.isBlank(stateFiledValue)) {
+            // without state field, we cannot determine whether it is duplicated
+            // without custom field values, we cannot determine whether it is duplicated
+            synchronized (withoutStatesCache) {
+                if (withoutStatesCache != null && withoutStatesCache.getIfPresent(key) != null) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Alert event {} with key {} is skipped since it is duplicated", event, key);
+                    }
+                    return null;
+                } else if (withoutStatesCache != null) {
+                    withoutStatesCache.put(key, "");
+                }
+            }
+            return Arrays.asList(event);
+        }
+        return dedupCache.dedup(event, key, dedupStateField, stateFiledValue, dedupStateCloseValue);
+    }
+
+    public List<AlertStreamEvent> dedup(AlertStreamEvent event) {
+        if (event == null) {
+            return null;
+        }
+        if (dedupIntervalSec <= 0) {
+            return Collections.singletonList(event);
+        }
+
+        // check custom field, and get the field values
+        StreamDefinition streamDefinition = event.getSchema();
+        HashMap<String, String> customFieldValues = new HashMap<>();
+        String stateFiledValue = null;
+        for (int i = 0; i < event.getData().length; i++) {
+            if (i > streamDefinition.getColumns().size()) {
+                if (LOG.isWarnEnabled()) {
+                    LOG.warn("output column does not found for event data, this indicate code error!");
+                }
+                continue;
+            }
+            String colName = streamDefinition.getColumns().get(i).getName();
+            Object colValue = event.getData()[i];
+
+            if (colName.equals(dedupStateField) && colValue != null) {
+                stateFiledValue = colValue.toString();
+            }
+
+            // make all of the field as unique key if no custom dedup field provided
+            if (colValue != null) {
+                if (customDedupFields == null || customDedupFields.size() <= 0) {
+                    if (streamDefinition.getColumns().get(i).getType().equals(StreamColumn.Type.STRING)) {
+                        customFieldValues.put(colName, colValue.toString());
+                    }
+                } else {
+                    for (String field : customDedupFields) {
+                        if (colName.equals(field)) {
+                            customFieldValues.put(field, colValue.toString());
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+
+        List<AlertStreamEvent> outputEvents = checkDedup(event, new EventUniq(event.getStreamId(),
+            event.getPolicyId(), event.getCreatedTime(), customFieldValues), stateFiledValue);
+        if (outputEvents != null && outputEvents.size() > 0) {
+            return outputEvents;
+        } else if (LOG.isInfoEnabled()) {
+            LOG.info("Alert event is skipped because it's duplicated: {}", event.toString());
+        }
+        return null;
+    }
+
+    @Override
+    public void setDedupIntervalMin(String newDedupIntervalMin) {
+        if (newDedupIntervalMin == null || newDedupIntervalMin.isEmpty()) {
+            dedupIntervalSec = 0;
+            return;
+        }
+        try {
+            Period period = Period.parse(newDedupIntervalMin);
+            this.dedupIntervalSec = period.toStandardSeconds().getSeconds();
+        } catch (Exception e) {
+            LOG.warn("Fail to pares deDupIntervalMin, will disable deduplication instead", e);
+            this.dedupIntervalSec = 0;
+        }
+    }
+
+    public AlertDeduplication getAlertDeduplication() {
+        return alertDeduplication;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/EventUniq.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/EventUniq.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/EventUniq.java
new file mode 100644
index 0000000..1434bb7
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/EventUniq.java
@@ -0,0 +1,82 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+/**
+ *
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import com.google.common.base.Joiner;
+
+import java.util.HashMap;
+
+/**
+ * @since Mar 19, 2015.
+ */
+public class EventUniq {
+    public String streamId;
+    public String policyId;
+    public Long timestamp;     // event's createTimestamp
+    public long createdTime; // created time, for cache removal;
+    public HashMap<String, String> customFieldValues;
+    public boolean removable = false;
+
+    public EventUniq(String streamId, String policyId, long timestamp) {
+        this.streamId = streamId;
+        this.timestamp = timestamp;
+        this.policyId = policyId;
+        this.createdTime = System.currentTimeMillis();
+    }
+
+    public EventUniq(String streamId, String policyId, long timestamp, HashMap<String, String> customFieldValues) {
+        this.streamId = streamId;
+        this.timestamp = timestamp;
+        this.policyId = policyId;
+        this.createdTime = System.currentTimeMillis();
+        this.customFieldValues = customFieldValues;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof EventUniq) {
+            EventUniq au = (EventUniq) obj;
+            boolean result = this.streamId.equalsIgnoreCase(au.streamId) & this.policyId.equalsIgnoreCase(au.policyId);
+            if (this.customFieldValues != null && au.customFieldValues != null) {
+                result = result & this.customFieldValues.equals(au.customFieldValues);
+            }
+            return result;
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        HashCodeBuilder builder = new HashCodeBuilder().append(streamId).append(policyId);
+
+        if (customFieldValues != null) {
+            builder.append(customFieldValues);
+        }
+        return builder.build();
+    }
+
+    @Override
+    public String toString() {
+        return String.format("EventUniq[streamId: %s, policyId: %s, timestamp: %s, removable: %s, customFieldValues: %s]",
+            streamId, policyId, timestamp, removable, Joiner.on(",").withKeyValueSeparator(">").join(customFieldValues));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
index c5c9e04..771f736 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
@@ -26,6 +26,7 @@ import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
 import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
 import org.apache.eagle.alert.engine.publisher.dedup.DedupCache;
+import org.apache.eagle.alert.engine.publisher.dedup.DefaultDeduplicator;
 import org.apache.eagle.alert.engine.publisher.dedup.ExtendedDeduplicator;
 import org.slf4j.Logger;
 
@@ -93,7 +94,7 @@ public abstract class AbstractPublishPlugin implements AlertPublishPlugin {
             }
             serializer = (IEventSerializer) obj;
         } catch (Exception e) {
-            getLogger().error(String.format("initialized failed, use default StringEventSerializer, failure message : {}", e.getMessage()), e);
+            getLogger().error("initialized failed, use default StringEventSerializer, failure message : {}", e.getMessage(), e);
             serializer = new StringEventSerializer(conf);
         }
     }
@@ -105,7 +106,7 @@ public abstract class AbstractPublishPlugin implements AlertPublishPlugin {
 
     @Override
     public List<AlertStreamEvent> dedup(AlertStreamEvent event) {
-        if (null != deduplicator) {
+        if (null != deduplicator && !event.isDuplicationChecked()) {
             return deduplicator.dedup(event);
         } else {
             return Collections.singletonList(event);

http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
deleted file mode 100644
index 54d551e..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.publisher.impl;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.eagle.alert.engine.coordinator.AlertDeduplication;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
-import org.apache.eagle.alert.engine.publisher.dedup.DedupCache;
-import org.joda.time.Period;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-public class DefaultDeduplicator implements AlertDeduplicator {
-
-    private static Logger LOG = LoggerFactory.getLogger(DefaultDeduplicator.class);
-
-    private long dedupIntervalSec;
-    private List<String> customDedupFields = new ArrayList<>();
-    private String dedupStateField;
-    private String dedupStateCloseValue;
-
-    private DedupCache dedupCache;
-
-    private Cache<EventUniq, String> withoutStatesCache;
-
-    public DefaultDeduplicator() {
-        this.dedupIntervalSec = 0;
-    }
-
-    public DefaultDeduplicator(String intervalMin) {
-        setDedupIntervalMin(intervalMin);
-    }
-
-    public DefaultDeduplicator(long intervalMin) {
-        this.dedupIntervalSec = intervalMin;
-    }
-
-    public DefaultDeduplicator(AlertDeduplication alertDeduplication) {
-        this.customDedupFields = alertDeduplication.getDedupFields();
-        this.dedupIntervalSec = Integer.parseInt(alertDeduplication.getDedupIntervalMin()) * 60;
-        this.withoutStatesCache = CacheBuilder.newBuilder().expireAfterWrite(
-                this.dedupIntervalSec, TimeUnit.SECONDS).build();
-    }
-
-    public DefaultDeduplicator(String intervalMin, List<String> customDedupFields,
-                               String dedupStateField, String dedupStateCloseValue, DedupCache dedupCache) {
-        setDedupIntervalMin(intervalMin);
-        if (customDedupFields != null) {
-            this.customDedupFields = customDedupFields;
-        }
-        if (StringUtils.isNotBlank(dedupStateField)) {
-            this.dedupStateField = dedupStateField;
-        }
-        if (StringUtils.isNotBlank(dedupStateCloseValue)) {
-            this.dedupStateCloseValue = dedupStateCloseValue;
-        }
-        this.dedupCache = dedupCache;
-
-        withoutStatesCache = CacheBuilder.newBuilder().expireAfterWrite(
-            this.dedupIntervalSec, TimeUnit.SECONDS).build();
-    }
-
-    /*
-     * @param key
-     * @return
-     */
-    private List<AlertStreamEvent> checkDedup(AlertStreamEvent event, EventUniq key, String stateFiledValue) {
-        if (StringUtils.isBlank(stateFiledValue)) {
-            // without state field, we cannot determine whether it is duplicated
-            // without custom filed values, we cannot determine whether it is duplicated
-            synchronized (withoutStatesCache) {
-                if (withoutStatesCache != null && withoutStatesCache.getIfPresent(key) != null) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Alert event {} with key {} is skipped since it is duplicated", event, key);
-                    }
-                    return null;
-                } else if (withoutStatesCache != null) {
-                    withoutStatesCache.put(key, "");
-                }
-            }
-            return Arrays.asList(event);
-        }
-        return dedupCache.dedup(event, key, dedupStateField, stateFiledValue, dedupStateCloseValue);
-    }
-
-    public List<AlertStreamEvent> dedup(AlertStreamEvent event) {
-        if (event == null) {
-            return null;
-        }
-        // check custom field, and get the field values
-        StreamDefinition streamDefinition = event.getSchema();
-        HashMap<String, String> customFieldValues = new HashMap<>();
-        String stateFiledValue = null;
-        for (int i = 0; i < event.getData().length; i++) {
-            if (i > streamDefinition.getColumns().size()) {
-                if (LOG.isWarnEnabled()) {
-                    LOG.warn("output column does not found for event data, this indicate code error!");
-                }
-                continue;
-            }
-            String colName = streamDefinition.getColumns().get(i).getName();
-            Object colValue = event.getData()[i];
-
-            if (colName.equals(dedupStateField) && colValue != null) {
-                stateFiledValue = colValue.toString();
-            }
-
-            // make all of the field as unique key if no custom dedup field provided
-            if (colValue != null) {
-                if (customDedupFields == null || customDedupFields.size() <= 0) {
-                    customFieldValues.put(colName, colValue.toString());
-                } else {
-                    for (String field : customDedupFields) {
-                        if (colName.equals(field)) {
-                            customFieldValues.put(field, colValue.toString());
-                            break;
-                        }
-                    }
-                }
-            }
-        }
-
-        List<AlertStreamEvent> outputEvents = checkDedup(event, new EventUniq(event.getStreamId(),
-            event.getPolicyId(), event.getCreatedTime(), customFieldValues), stateFiledValue);
-        if (outputEvents != null && outputEvents.size() > 0) {
-            return outputEvents;
-        } else if (LOG.isInfoEnabled()) {
-            LOG.info("Alert event is skipped because it's duplicated: {}", event.toString());
-        }
-        return null;
-    }
-
-    @Override
-    public void setDedupIntervalMin(String newDedupIntervalMin) {
-        if (newDedupIntervalMin == null || newDedupIntervalMin.isEmpty()) {
-            dedupIntervalSec = 0;
-            return;
-        }
-        try {
-            Period period = Period.parse(newDedupIntervalMin);
-            this.dedupIntervalSec = period.toStandardSeconds().getSeconds();
-        } catch (Exception e) {
-            LOG.warn("Fail to pares deDupIntervalMin, will disable deduplication instead", e);
-            this.dedupIntervalSec = 0;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
deleted file mode 100644
index 511abcd..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-/**
- *
- */
-package org.apache.eagle.alert.engine.publisher.impl;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-import com.google.common.base.Joiner;
-
-import java.util.HashMap;
-
-/**
- * @since Mar 19, 2015.
- */
-public class EventUniq {
-    public String streamId;
-    public String policyId;
-    public Long timestamp;     // event's createTimestamp
-    public long createdTime; // created time, for cache removal;
-    public HashMap<String, String> customFieldValues;
-    public boolean removable = false;
-
-    public EventUniq(String streamId, String policyId, long timestamp) {
-        this.streamId = streamId;
-        this.timestamp = timestamp;
-        this.policyId = policyId;
-        this.createdTime = System.currentTimeMillis();
-    }
-
-    public EventUniq(String streamId, String policyId, long timestamp, HashMap<String, String> customFieldValues) {
-        this.streamId = streamId;
-        this.timestamp = timestamp;
-        this.policyId = policyId;
-        this.createdTime = System.currentTimeMillis();
-        this.customFieldValues = customFieldValues;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (obj instanceof EventUniq) {
-            EventUniq au = (EventUniq) obj;
-            boolean result = this.streamId.equalsIgnoreCase(au.streamId) & this.policyId.equalsIgnoreCase(au.policyId);
-            if (this.customFieldValues != null && au.customFieldValues != null) {
-                result = result & this.customFieldValues.equals(au.customFieldValues);
-            }
-            return result;
-        }
-        return false;
-    }
-
-    @Override
-    public int hashCode() {
-        HashCodeBuilder builder = new HashCodeBuilder().append(streamId).append(policyId);
-
-        if (customFieldValues != null) {
-            builder.append(customFieldValues);
-        }
-        return builder.build();
-    }
-
-    @Override
-    public String toString() {
-        return String.format("EventUniq[streamId: %s, policyId: %s, timestamp: %s, removable: %s, customFieldValues: %s]",
-            streamId, policyId, timestamp, removable, Joiner.on(",").withKeyValueSeparator(">").join(customFieldValues));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
index d6829d6..39c577c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
@@ -28,22 +28,17 @@ import org.apache.eagle.alert.engine.StreamContextImpl;
 import org.apache.eagle.alert.engine.coordinator.*;
 import org.apache.eagle.alert.engine.model.AlertPublishEvent;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.AlertPublishSpecListener;
-import org.apache.eagle.alert.engine.publisher.AlertPublisher;
-import org.apache.eagle.alert.engine.publisher.AlertStreamFilter;
-import org.apache.eagle.alert.engine.publisher.PipeStreamFilter;
+import org.apache.eagle.alert.engine.publisher.*;
+import org.apache.eagle.alert.engine.publisher.dedup.DedupKey;
 import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
-import org.apache.eagle.alert.engine.publisher.impl.DefaultDeduplicator;
+import org.apache.eagle.alert.engine.publisher.dedup.DefaultDeduplicator;
 import org.apache.eagle.alert.engine.publisher.template.AlertTemplateEngine;
 import org.apache.eagle.alert.engine.publisher.template.AlertTemplateProvider;
 import org.apache.eagle.alert.utils.AlertConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
@@ -53,7 +48,7 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli
     private volatile Map<String, Publishment> cachedPublishments = new HashMap<>();
     private volatile Map<String, PolicyDefinition> policyDefinitionMap;
     private volatile Map<String, StreamDefinition> streamDefinitionMap;
-    private volatile Map<String, DefaultDeduplicator> deduplicatorMap = new ConcurrentHashMap<>();
+    private volatile Map<DedupKey, DefaultDeduplicator> deduplicatorMap = new ConcurrentHashMap<>();
     private AlertTemplateEngine alertTemplateEngine;
 
     private boolean logEventEnabled;
@@ -90,13 +85,16 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli
             if (logEventEnabled) {
                 LOG.info("Alert publish bolt {}/{} with partition {} received event: {}", this.getBoltId(), this.context.getThisTaskId(), partition, event);
             }
-            if (deduplicatorMap != null && deduplicatorMap.containsKey(event.getPolicyId())) {
-                List<AlertStreamEvent> eventList = deduplicatorMap.get(event.getPolicyId()).dedup(event);
+            DedupKey dedupKey = new DedupKey(event.getPolicyId(), event.getStreamId());
+            if (deduplicatorMap != null && deduplicatorMap.containsKey(dedupKey)) {
+                List<AlertStreamEvent> eventList = deduplicatorMap.get(dedupKey).dedup(event);
                 if (eventList == null || eventList.isEmpty()) {
                     collector.ack(input);
                     return;
                 }
+                event.setDuplicationChecked(true);
             }
+
             AlertStreamEvent filteredEvent = alertFilter.filter(event);
             if (filteredEvent != null) {
                 alertPublisher.nextEvent(partition, filteredEvent);
@@ -161,8 +159,15 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli
         for (Map.Entry<String, PolicyDefinition> entry : pds.entrySet()) {
             try {
                 this.alertTemplateEngine.register(entry.getValue());
-                if (entry.getValue().getDeduplication() != null) {
-                    this.deduplicatorMap.put(entry.getKey(), new DefaultDeduplicator(entry.getValue().getDeduplication()));
+                List<AlertDeduplication> alertDeduplications = entry.getValue().getAlertDeduplications();
+                if (alertDeduplications != null && alertDeduplications.size() > 0) {
+                    for (AlertDeduplication deduplication : alertDeduplications) {
+                        DedupKey dedupKey = new DedupKey(entry.getKey(), deduplication.getOutputStreamId());
+                        if (!deduplicatorMap.containsKey(dedupKey)
+                                || !deduplicatorMap.get(dedupKey).getAlertDeduplication().equals(deduplication)) {
+                            deduplicatorMap.put(dedupKey, new DefaultDeduplicator(deduplication));
+                        }
+                    }
                 }
             } catch (Throwable throwable) {
                 LOG.error("Failed to register policy {} in template engine", entry.getKey(), throwable);
@@ -172,8 +177,10 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli
         for (String policyId : policyToRemove) {
             try {
                 this.alertTemplateEngine.unregister(policyId);
-                if (deduplicatorMap != null && deduplicatorMap.containsKey(policyId)) {
-                    deduplicatorMap.remove(policyId);
+                for (DedupKey dedupKey : deduplicatorMap.keySet()) {
+                    if (dedupKey.getPolicyId().equals(policyId)) {
+                        deduplicatorMap.remove(dedupKey);
+                    }
                 }
             } catch (Throwable throwable) {
                 LOG.error("Failed to unregister policy {} from template engine", policyId, throwable);
@@ -231,4 +238,5 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli
             return this.alertTemplateEngine.filter(event);
         }
     }
-}
\ No newline at end of file
+
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
index 5bf0410..95e679d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
@@ -27,7 +27,6 @@ import org.apache.eagle.alert.engine.coordinator.StreamColumn;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamPartition;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupKeyTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupKeyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupKeyTest.java
new file mode 100644
index 0000000..7fc886d
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupKeyTest.java
@@ -0,0 +1,56 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class DedupKeyTest {
+
+    /** Keys built from the same policy and stream ids must address the same map entry. */
+    @Test
+    public void test() {
+        Map<DedupKey, Integer> hits = new HashMap<>();
+        DedupKey key1 = new DedupKey("policy1", "stream1");
+        update(hits, key1);
+        update(hits, key1);
+
+        DedupKey key2 = new DedupKey("policy2", "stream2");
+        update(hits, key2);
+
+        Assert.assertEquals(1, hits.get(key1).intValue());
+        Assert.assertEquals(0, hits.get(key2).intValue());
+
+        // key3 is a distinct instance carrying key1's ids, so it must update key1's counter
+        DedupKey key3 = new DedupKey("policy1", "stream1");
+        update(hits, key3);
+
+        Assert.assertEquals(2, hits.get(key3).intValue());
+        Assert.assertEquals(2, hits.size());
+    }
+
+    /** Stores 0 the first time a key is seen and adds 1 on every later sighting. */
+    private void update(Map<DedupKey, Integer> map, DedupKey key) {
+        map.put(key, map.containsKey(key) ? map.get(key) + 1 : 0);
+    }
+}
+
+

http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
index c48df9a..f839474 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
@@ -22,7 +22,6 @@ import com.typesafe.config.ConfigFactory;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.impl.DefaultDeduplicator;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
index 297b790..96da4c9 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
@@ -22,7 +22,6 @@ import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.impl.DefaultDeduplicator;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java
index 247f332..51d054c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java
@@ -18,7 +18,6 @@ package org.apache.eagle.alert.engine.publisher.dedup;
 
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
 import org.junit.Ignore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
index 46517fe..2cd2183 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
@@ -18,22 +18,23 @@
 
 package org.apache.eagle.alert.engine.router;
 
+import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.eagle.alert.coordination.model.PublishSpec;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.PublishPartition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.*;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
 import org.apache.eagle.alert.engine.publisher.AlertPublisher;
+import org.apache.eagle.alert.engine.publisher.dedup.DedupKey;
+import org.apache.eagle.alert.engine.publisher.dedup.DefaultDeduplicator;
 import org.apache.eagle.alert.engine.publisher.impl.AlertPublishPluginsFactory;
 import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
+import org.apache.eagle.alert.engine.publisher.template.AlertTemplateEngine;
+import org.apache.eagle.alert.engine.publisher.template.AlertTemplateProvider;
 import org.apache.eagle.alert.engine.runner.AlertPublisherBolt;
 import org.apache.eagle.alert.engine.runner.MapComparator;
 import org.apache.eagle.alert.engine.utils.MetadataSerDeser;
@@ -300,4 +301,76 @@ public class TestAlertPublisherBolt {
             pubs.get(0).getName(), pubs.get(0).getPartitionColumns()), event3);
 
     }
+
+    @Test
+    public void testOnAlertPolicyChange() throws IllegalAccessException, NoSuchFieldException {
+        AlertDeduplication deduplication = new AlertDeduplication();
+        deduplication.setDedupIntervalMin("1");
+        deduplication.setOutputStreamId("stream");
+
+        PolicyDefinition policy1 = new PolicyDefinition();
+        policy1.setName("policy1");
+        policy1.getAlertDeduplications().add(deduplication);
+
+        Map<String, PolicyDefinition> pds1 = new HashMap<>();
+        pds1.put("policy1", policy1);
+
+        PolicyDefinition policy2 = new PolicyDefinition();
+        policy2.setName("policy2");
+        policy2.getAlertDeduplications().add(deduplication);
+
+        Map<String, PolicyDefinition> pds2 = new HashMap<>();
+        pds2.put("policy2", policy2);
+
+        AlertPublisherBolt bolt = new AlertPublisherBolt("publisher", null, null);
+
+        Field field = AlertPublisherBolt.class.getDeclaredField("alertTemplateEngine");
+        field.setAccessible(true);
+        AlertTemplateEngine engine = AlertTemplateProvider.createAlertTemplateEngine();
+        engine.init(null);
+        field.set(bolt, engine);
+
+        DedupKey dedupKey1 = new DedupKey("policy1", "stream");
+        DedupKey dedupKey2 = new DedupKey("policy2", "stream");
+        bolt.onAlertPolicyChange(pds1, null);
+        Map<DedupKey, DefaultDeduplicator> deduplicatorMap = getAlertDeduplicator(bolt);
+        Assert.assertTrue(deduplicatorMap.containsKey(dedupKey1));
+
+        // remove policy1 and add policy2
+        bolt.onAlertPolicyChange(pds2, null);
+        deduplicatorMap = getAlertDeduplicator(bolt);
+        Assert.assertTrue(deduplicatorMap.containsKey(dedupKey2));
+        Assert.assertFalse(deduplicatorMap.containsKey(dedupKey1));
+
+        // add new policy policy1 in pds2
+        pds2.put("policy1", policy1);
+        bolt.onAlertPolicyChange(pds2, null);
+        deduplicatorMap = getAlertDeduplicator(bolt);
+        Assert.assertTrue(deduplicatorMap.containsKey(dedupKey1));
+        Assert.assertTrue(deduplicatorMap.containsKey(dedupKey2));
+        Assert.assertTrue(deduplicatorMap.get(dedupKey1).getAlertDeduplication()
+                .equals(deduplicatorMap.get(dedupKey2).getAlertDeduplication()));
+
+        // update policy1 alertDeduplication
+        AlertDeduplication deduplication1 = new AlertDeduplication();
+        deduplication1.setOutputStreamId("stream");
+        deduplication1.setDedupIntervalMin("2");
+        policy1.getAlertDeduplications().clear();
+        policy1.getAlertDeduplications().add(deduplication1);
+        pds2.put("policy1", policy1);
+        bolt.onAlertPolicyChange(pds2, null);
+        deduplicatorMap = getAlertDeduplicator(bolt);
+        Assert.assertTrue(deduplicatorMap.containsKey(new DedupKey("policy2", "stream")));
+        Assert.assertTrue(deduplicatorMap.containsKey(new DedupKey("policy1", "stream")));
+        Assert.assertFalse(deduplicatorMap.get(dedupKey1).getAlertDeduplication()
+                .equals(deduplicatorMap.get(dedupKey2).getAlertDeduplication()));
+
+    }
+
+
+    private Map<DedupKey, DefaultDeduplicator> getAlertDeduplicator(AlertPublisherBolt bolt) throws NoSuchFieldException, IllegalAccessException {
+        Field field = AlertPublisherBolt.class.getDeclaredField("deduplicatorMap");
+        field.setAccessible(true);
+        return (Map<DedupKey, DefaultDeduplicator>) field.get(bolt);
+    }
 }


Mime
View raw message