eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ralp...@apache.org
Subject incubator-eagle git commit: [EAGEL-587] : AlertEngine : simplify state-based dedup to have only deupvalue for given dedup key
Date Tue, 04 Oct 2016 16:07:53 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/master c5279cab1 -> 9339c2c90


[EAGEL-587] : AlertEngine : simplify state-based dedup to have only deupvalue for given dedup
key

Author: ralphsu

This closes #472


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/9339c2c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/9339c2c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/9339c2c9

Branch: refs/heads/master
Commit: 9339c2c90932d9bc9708f68939d0d5528f163303
Parents: c5279ca
Author: Ralph, Su <suliangfei@gmail.com>
Authored: Wed Oct 5 00:07:24 2016 +0800
Committer: Ralph, Su <suliangfei@gmail.com>
Committed: Wed Oct 5 00:07:24 2016 +0800

----------------------------------------------------------------------
 .../alert/engine/coordinator/Publishment.java   |  9 +++
 .../alert/engine/model/AlertStreamEvent.java    | 18 +----
 .../eagle/alert/engine/model/StreamEvent.java   | 13 ++-
 .../engine/publisher/dedup/DedupCache.java      | 84 +++++++++++++-------
 .../engine/publisher/dedup/DedupValue.java      | 26 +++++-
 .../publisher/dedup/MongoDedupEventsStore.java  | 25 +++---
 .../publisher/dedup/TransformerUtils.java       | 27 +++----
 .../publisher/impl/AbstractPublishPlugin.java   | 13 ++-
 .../publisher/impl/AlertKafkaPublisher.java     | 23 +++---
 .../publisher/impl/DefaultDeduplicator.java     | 23 +++---
 .../publisher/dedup/DedupCacheStoreTest.java    |  2 +-
 .../engine/publisher/dedup/DedupCacheTest.java  |  2 +-
 .../dedup/DefaultDedupWithoutStateTest.java     |  2 +-
 .../dedup/DefaultDeduplicatorTest.java          |  2 +-
 .../publisher/dedup/MongoDedupStoreTest.java    | 10 ++-
 .../publisher/dedup/TestDeduplicator.java       |  2 +-
 16 files changed, 160 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9339c2c9/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
index bb00291..1843d8f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
@@ -35,6 +35,7 @@ public class Publishment {
     private String dedupIntervalMin;
     private List<String> dedupFields;
     private String dedupStateField;
+    private String dedupStateCloseValue;
     private OverrideDeduplicatorSpec overrideDeduplicator;
     private Map<String, String> properties;
     // the class name to extend the IEventSerializer interface
@@ -56,6 +57,14 @@ public class Publishment {
         this.dedupStateField = dedupStateField;
     }
 
+    public String getDedupStateCloseValue() {
+        return dedupStateCloseValue;
+    }
+
+    public void setDedupStateCloseValue(String dedupStateCloseValue) {
+        this.dedupStateCloseValue = dedupStateCloseValue;
+    }
+
     public OverrideDeduplicatorSpec getOverrideDeduplicator() {
         return overrideDeduplicator;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9339c2c9/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
index 5dc99cf..a503dcf 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
@@ -16,10 +16,9 @@
  */
 package org.apache.eagle.alert.engine.model;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.utils.DateTimeUtil;
-
+import org.apache.commons.lang3.StringUtils;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -34,21 +33,6 @@ public class AlertStreamEvent extends StreamEvent {
     private String createdBy;
     private long createdTime;
 
-    public AlertStreamEvent() {
-    }
-
-    public AlertStreamEvent(AlertStreamEvent event) {
-        this.policyId = event.policyId;
-        this.schema = event.schema;
-        this.createdBy = event.createdBy;
-        this.createdTime = event.createdTime;
-        this.setTimestamp(event.getTimestamp());
-        this.setData(new Object[event.data.length]);
-        System.arraycopy(event.data, 0, this.data, 0, event.data.length);
-        this.setStreamId(event.getStreamId());
-        this.setMetaVersion(event.getMetaVersion());
-    }
-
     public void setPolicyId(String policyId) {
         this.policyId = policyId;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9339c2c9/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
index 693050d..d91b001 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
@@ -16,11 +16,10 @@
  */
 package org.apache.eagle.alert.engine.model;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.utils.DateTimeUtil;
-
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -33,10 +32,10 @@ import java.util.Objects;
 public class StreamEvent implements Serializable {
     private static final long serialVersionUID = 2765116509856609763L;
 
-    protected String streamId;
-    protected Object[] data;
-    protected long timestamp;
-    protected String metaVersion;
+    private String streamId;
+    private Object[] data;
+    private long timestamp;
+    private String metaVersion;
 
     public StreamEvent() {
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9339c2c9/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
index b332047..2080441 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
@@ -42,6 +42,7 @@ public class DedupCache {
     private static final long CACHE_MAX_EVENT_QUEUE_SIZE = 10;
 
     public static final String DEDUP_COUNT = "dedupCount";
+    public static final String DOC_ID = "docId";
     public static final String DEDUP_FIRST_OCCURRENCE = "dedupFirstOccurrenceTime";
 
     private static final DedupEventsStoreType type = DedupEventsStoreType.Mongo;
@@ -118,8 +119,9 @@ public class DedupCache {
     }
 
     public List<AlertStreamEvent> dedup(AlertStreamEvent event, EventUniq eventEniq,
-                                        String dedupStateField, String stateFieldValue) {
-        DedupValue[] dedupValues = this.addOrUpdate(eventEniq, stateFieldValue);
+                                        String dedupStateField, String stateFieldValue,
+                                        String stateCloseValue) {
+        DedupValue[] dedupValues = this.addOrUpdate(eventEniq, event, stateFieldValue, stateCloseValue);
         if (dedupValues != null) {
             // any of dedupValues won't be null
             if (dedupValues.length == 2) {
@@ -136,14 +138,14 @@ public class DedupCache {
         return null;
     }
 
-    public synchronized DedupValue[] addOrUpdate(EventUniq eventEniq, String stateFieldValue)
{
+    public synchronized DedupValue[] addOrUpdate(EventUniq eventEniq, AlertStreamEvent event,
String stateFieldValue, String stateCloseValue) {
         Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> events = this.getEvents();
         if (!events.containsKey(eventEniq)
             || (events.containsKey(eventEniq)
-            && events.get(eventEniq).size() > 0
-            && !StringUtils.equalsIgnoreCase(stateFieldValue,
-            events.get(eventEniq).getLast().getStateFieldValue()))) {
-            DedupValue[] dedupValues = this.add(eventEniq, stateFieldValue);
+                && events.get(eventEniq).size() > 0
+                && !StringUtils.equalsIgnoreCase(stateFieldValue,
+                    events.get(eventEniq).getLast().getStateFieldValue()))) {
+            DedupValue[] dedupValues = this.add(eventEniq, event, stateFieldValue, stateCloseValue);
             return dedupValues;
         } else {
             // update count
@@ -152,34 +154,23 @@ public class DedupCache {
         }
     }
 
-    private DedupValue[] add(EventUniq eventEniq, String stateFieldValue) {
+    private DedupValue[] add(EventUniq eventEniq, AlertStreamEvent event, String stateFieldValue,
String stateCloseValue) {
         DedupValue dedupValue = null;
-        DedupValue lastDedupValue = null;
         if (!events.containsKey(eventEniq)) {
-            dedupValue = new DedupValue();
-            dedupValue.setFirstOccurrence(eventEniq.timestamp);
-            dedupValue.setStateFieldValue(stateFieldValue);
-            ConcurrentLinkedDeque<DedupValue> dedupValues = new ConcurrentLinkedDeque<DedupValue>();
+            dedupValue = createDedupValue(eventEniq, event, stateFieldValue);
+            ConcurrentLinkedDeque<DedupValue> dedupValues = new ConcurrentLinkedDeque<>();
             dedupValues.add(dedupValue);
             // skip the event which put failed due to concurrency
             events.put(eventEniq, dedupValues);
             LOG.info("{} Add new dedup key {}, and value {}", this.publishName, eventEniq,
dedupValues);
         } else if (!StringUtils.equalsIgnoreCase(stateFieldValue,
-            events.get(eventEniq).getLast().getStateFieldValue())) {
-            lastDedupValue = events.get(eventEniq).getLast();
-            dedupValue = new DedupValue();
-            dedupValue.setFirstOccurrence(eventEniq.timestamp);
-            dedupValue.setStateFieldValue(stateFieldValue);
-            ConcurrentLinkedDeque<DedupValue> dedupValues = events.get(eventEniq);
-            if (dedupValues.size() > CACHE_MAX_EVENT_QUEUE_SIZE) {
-                dedupValues = new ConcurrentLinkedDeque<DedupValue>();
-                dedupValues.add(lastDedupValue);
-                LOG.info("{} Reset dedup key {} to value {} since meets maximum {}",
-                    this.publishName, eventEniq, dedupValue, CACHE_MAX_EVENT_QUEUE_SIZE);
-            }
-            dedupValues.add(dedupValue);
+                events.get(eventEniq).getLast().getStateFieldValue())) {
+            // existing a de-dup value, try update or reset
+            DedupValue lastDedupValue = events.get(eventEniq).getLast();
+            dedupValue = updateDedupValue(lastDedupValue, eventEniq, event, stateFieldValue,
stateCloseValue);
             LOG.info("{} Update dedup key {}, and value {}", this.publishName, eventEniq,
dedupValue);
         }
+
         if (dedupValue != null) {
             DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config,
this.publishName);
             accessor.add(eventEniq, events.get(eventEniq));
@@ -189,11 +180,43 @@ public class DedupCache {
         if (dedupValue == null) {
             return null;
         }
-        if (lastDedupValue != null) {
-            return new DedupValue[] {lastDedupValue, dedupValue};
+        return new DedupValue[] {dedupValue};
+    }
+
+    private DedupValue updateDedupValue(DedupValue lastDedupValue, EventUniq eventEniq, AlertStreamEvent
event, String stateFieldValue, String stateCloseValue) {
+        if (lastDedupValue.getFirstOccurrence() >= eventEniq.timestamp) {
+            // if dedup value happens later then event, dedup state changes.
+            return null;
+        }
+
+        if (lastDedupValue.getStateFieldValue().equals(stateCloseValue)
+                && eventEniq.timestamp < lastDedupValue.getCloseTime()) {
+            DedupValue dv = createDedupValue(eventEniq, event, stateFieldValue);
+            lastDedupValue.resetTo(dv);
         } else {
-            return new DedupValue[] {dedupValue};
+            // update lastDedupValue, set closeTime when close
+            lastDedupValue.setStateFieldValue(stateFieldValue);
+            if (stateFieldValue.equals(stateCloseValue)) {
+                lastDedupValue.setCloseTime(eventEniq.timestamp); // when close an event,
set closeTime for further check
+            }
         }
+        return lastDedupValue;
+    }
+
+    private DedupValue createDedupValue(EventUniq eventEniq, AlertStreamEvent event, String
stateFieldValue) {
+        DedupValue dedupValue;
+        dedupValue = new DedupValue();
+        dedupValue.setFirstOccurrence(eventEniq.timestamp);
+        int idx = event.getSchema().getColumnIndex(DOC_ID);
+        if (idx >= 0 ) {
+            dedupValue.setDocId(event.getData()[idx].toString());
+        } else {
+            dedupValue.setDocId("");
+        }
+        dedupValue.setCount(1);
+        dedupValue.setCloseTime(0);
+        dedupValue.setStateFieldValue(stateFieldValue);
+        return dedupValue;
     }
 
     public void persistUpdatedEventUniq(EventUniq eventEniq) {
@@ -251,6 +274,9 @@ public class DedupCache {
             if (Objects.equal(colName, DEDUP_FIRST_OCCURRENCE)) {
                 event.getData()[i] = dedupValue.getFirstOccurrence();
             }
+            if (Objects.equal(colName, DOC_ID)) {
+                event.getData()[i] = dedupValue.getDocId();
+            }
         }
         return event;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9339c2c9/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupValue.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupValue.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupValue.java
index ec1af0d..c1b3573 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupValue.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupValue.java
@@ -23,12 +23,18 @@ public class DedupValue {
     private long firstOccurrence;
     private String stateFieldValue;
     private long count;
+    private long closeTime;
+    private String docId;
 
     public DedupValue() {
     }
 
-    public DedupValue(String stateFieldValue) {
-        this.stateFieldValue = stateFieldValue;
+    public void resetTo(DedupValue dv) {
+        this.docId = dv.docId;
+        this.firstOccurrence = dv.firstOccurrence;
+        this.count = dv.count;
+        this.closeTime = dv.closeTime;
+        this.stateFieldValue = dv.stateFieldValue;
     }
 
     public long getCount() {
@@ -39,6 +45,14 @@ public class DedupValue {
         this.count = count;
     }
 
+    public String getDocId() {
+        return docId;
+    }
+
+    public void setDocId(String docId) {
+        this.docId = docId;
+    }
+
     public long getFirstOccurrence() {
         return firstOccurrence;
     }
@@ -47,6 +61,14 @@ public class DedupValue {
         this.firstOccurrence = firstOccurence;
     }
 
+    public void setCloseTime(long closeTime) {
+        this.closeTime = closeTime;
+    }
+
+    public long getCloseTime() {
+        return closeTime;
+    }
+
     public String getStateFieldValue() {
         return stateFieldValue;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9339c2c9/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
index 4140793..35281bf 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
@@ -16,20 +16,6 @@
  */
 package org.apache.eagle.alert.engine.publisher.dedup;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedDeque;
-
-import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
-import org.bson.BsonDocument;
-import org.bson.BsonInt32;
-import org.bson.BsonInt64;
-import org.bson.BsonString;
-import org.bson.Document;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.mongodb.Block;
@@ -40,6 +26,15 @@ import com.mongodb.client.MongoDatabase;
 import com.mongodb.client.model.IndexOptions;
 import com.mongodb.client.model.InsertOneOptions;
 import com.typesafe.config.Config;
+import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
+import org.bson.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
 
 public class MongoDedupEventsStore implements DedupEventsStore {
 
@@ -47,6 +42,7 @@ public class MongoDedupEventsStore implements DedupEventsStore {
 
     public static final String DEDUP_ID = "dedupId";
     public static final String DEDUP_STREAM_ID = "streamId";
+    public static final String DOC_ID = "docId";
     public static final String DEDUP_POLICY_ID = "policyId";
     public static final String DEDUP_CREATE_TIME = "createdTime";
     public static final String DEDUP_TIMESTAMP = "timestamp";
@@ -56,6 +52,7 @@ public class MongoDedupEventsStore implements DedupEventsStore {
     public static final String DEDUP_STATE_FIELD_VALUE = "stateFieldValue";
     public static final String DEDUP_COUNT = "count";
     public static final String DEDUP_FIRST_OCCURRENCE = "firstOccurrence";
+    public static final String DEDUP_CLOSE_TIME = "closeTime";
     public static final String DEDUP_PUBLISH_ID = "publishId";
 
     private static final ObjectMapper mapper = new ObjectMapper();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9339c2c9/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
index 5c18867..1ce6898 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
@@ -16,19 +16,15 @@
  */
 package org.apache.eagle.alert.engine.publisher.dedup;
 
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
+import org.bson.*;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map.Entry;
 
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
-import org.bson.BsonArray;
-import org.bson.BsonBoolean;
-import org.bson.BsonDocument;
-import org.bson.BsonInt64;
-import org.bson.BsonString;
-
 public class TransformerUtils {
 
     public static final String MAP_KEY = "key";
@@ -64,6 +60,10 @@ public class TransformerUtils {
                     MongoDedupEventsStore.DEDUP_COUNT).getValue());
                 dedupValue.setFirstOccurrence(dedupValuesDoc.getInt64(
                     MongoDedupEventsStore.DEDUP_FIRST_OCCURRENCE).getValue());
+                dedupValue.setCloseTime(dedupValuesDoc.getInt64(
+                    MongoDedupEventsStore.DEDUP_CLOSE_TIME).getValue());
+                dedupValue.setDocId(dedupValuesDoc.getString(
+                    MongoDedupEventsStore.DOC_ID).getValue());
                 dedupValues.add(dedupValue);
             }
             String publishId = doc.getString(MongoDedupEventsStore.DEDUP_PUBLISH_ID).getValue();
@@ -96,12 +96,11 @@ public class TransformerUtils {
             List<BsonDocument> dedupValuesDocs = new ArrayList<BsonDocument>();
             for (DedupValue dedupValue : entity.getDedupValues()) {
                 BsonDocument dedupValuesDoc = new BsonDocument();
-                dedupValuesDoc.put(MongoDedupEventsStore.DEDUP_STATE_FIELD_VALUE,
-                    new BsonString(dedupValue.getStateFieldValue()));
-                dedupValuesDoc.put(MongoDedupEventsStore.DEDUP_COUNT,
-                    new BsonInt64(dedupValue.getCount()));
-                dedupValuesDoc.put(MongoDedupEventsStore.DEDUP_FIRST_OCCURRENCE,
-                    new BsonInt64(dedupValue.getFirstOccurrence()));
+                dedupValuesDoc.put(MongoDedupEventsStore.DEDUP_STATE_FIELD_VALUE, new BsonString(dedupValue.getStateFieldValue()));
+                dedupValuesDoc.put(MongoDedupEventsStore.DEDUP_COUNT, new BsonInt64(dedupValue.getCount()));
+                dedupValuesDoc.put(MongoDedupEventsStore.DEDUP_FIRST_OCCURRENCE,new BsonInt64(dedupValue.getFirstOccurrence()));
+                dedupValuesDoc.put(MongoDedupEventsStore.DEDUP_CLOSE_TIME, new BsonInt64(dedupValue.getCloseTime()));
+                dedupValuesDoc.put(MongoDedupEventsStore.DOC_ID, new BsonString(dedupValue.getDocId()));
                 dedupValuesDocs.add(dedupValuesDoc);
             }
             doc.put(MongoDedupEventsStore.DEDUP_VALUES, new BsonArray(dedupValuesDocs));

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9339c2c9/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
index 585d92b..f2030da 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
@@ -16,10 +16,8 @@
  */
 package org.apache.eagle.alert.engine.publisher.impl;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
+import com.google.common.base.Joiner;
+import com.typesafe.config.Config;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.alert.engine.codec.IEventSerializer;
 import org.apache.eagle.alert.engine.coordinator.OverrideDeduplicatorSpec;
@@ -31,8 +29,9 @@ import org.apache.eagle.alert.engine.publisher.dedup.DedupCache;
 import org.apache.eagle.alert.engine.publisher.dedup.ExtendedDeduplicator;
 import org.slf4j.Logger;
 
-import com.google.common.base.Joiner;
-import com.typesafe.config.Config;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 /**
  * @since Jun 3, 2016.
@@ -73,7 +72,7 @@ public abstract class AbstractPublishPlugin implements AlertPublishPlugin
{
             }
         } else {
             this.deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin(),
-                publishment.getDedupFields(), publishment.getDedupStateField(), dedupCache);
+                publishment.getDedupFields(), publishment.getDedupStateField(), publishment.getDedupStateCloseValue(),
dedupCache);
             this.pubName = publishment.getName();
         }
         String serializerClz = publishment.getSerializer();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9339c2c9/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
index cd39ab6..e5c351b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
@@ -18,7 +18,14 @@
 
 package org.apache.eagle.alert.engine.publisher.impl;
 
-import com.typesafe.config.Config;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.PublishConstants;
@@ -27,13 +34,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import com.typesafe.config.Config;
 
 public class AlertKafkaPublisher extends AbstractPublishPlugin {
 
@@ -76,10 +77,8 @@ public class AlertKafkaPublisher extends AbstractPublishPlugin {
             LOG.warn("Namespace column {} is not found, the found index {} is invalid",
                 namespaceLabel, namespaceColumnIndex);
         } else {
-            // copy raw event to be duped
-            AlertStreamEvent newEvent = new AlertStreamEvent(event);
-            newEvent.getData()[namespaceColumnIndex] = namespaceValue;
-            outputEvents.add(newEvent);
+            event.getData()[namespaceColumnIndex] = namespaceValue;
+            outputEvents.add(event);
         }
 
         List<AlertStreamEvent> dedupResults = dedup(event);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9339c2c9/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
index 54ff346..2d99208 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
@@ -17,12 +17,8 @@
  */
 package org.apache.eagle.alert.engine.publisher.impl;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
@@ -32,8 +28,11 @@ import org.joda.time.Period;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 public class DefaultDeduplicator implements AlertDeduplicator {
 
@@ -42,6 +41,7 @@ public class DefaultDeduplicator implements AlertDeduplicator {
     private long dedupIntervalSec;
     private List<String> customDedupFields = new ArrayList<>();
     private String dedupStateField;
+    private String dedupStateCloseValue;
 
     private DedupCache dedupCache;
 
@@ -60,7 +60,7 @@ public class DefaultDeduplicator implements AlertDeduplicator {
     }
 
     public DefaultDeduplicator(String intervalMin, List<String> customDedupFields,
-                               String dedupStateField, DedupCache dedupCache) {
+                               String dedupStateField, String dedupStateCloseValue, DedupCache
dedupCache) {
         setDedupIntervalMin(intervalMin);
         if (customDedupFields != null) {
             this.customDedupFields = customDedupFields;
@@ -68,6 +68,9 @@ public class DefaultDeduplicator implements AlertDeduplicator {
         if (StringUtils.isNotBlank(dedupStateField)) {
             this.dedupStateField = dedupStateField;
         }
+        if (StringUtils.isNoneBlank(dedupStateCloseValue)) {
+            this.dedupStateCloseValue = dedupStateCloseValue;
+        }
         this.dedupCache = dedupCache;
 
         withoutStatesCache = CacheBuilder.newBuilder().expireAfterWrite(
@@ -94,7 +97,7 @@ public class DefaultDeduplicator implements AlertDeduplicator {
             }
             return Arrays.asList(event);
         }
-        return dedupCache.dedup(event, key, dedupStateField, stateFiledValue);
+        return dedupCache.dedup(event, key, dedupStateField, stateFiledValue, dedupStateCloseValue);
     }
 
     public List<AlertStreamEvent> dedup(AlertStreamEvent event) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9339c2c9/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
index 54aedb8..25518de 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
@@ -53,7 +53,7 @@ public class DedupCacheStoreTest extends MongoDependencyBaseTest {
 		System.setProperty("config.resource", "/application-mongo-statestore.conf");
 		Config config = ConfigFactory.load();
 		DedupCache cache = new DedupCache(config, "testPublishment");
-		cache.addOrUpdate(eventUniq, (String) event.getData()[event.getSchema().getColumnIndex("state")]);
+		cache.addOrUpdate(eventUniq, event, (String) event.getData()[event.getSchema().getColumnIndex("state")],
"closed");
 		
 		DedupEventsStore accessor = DedupEventsStoreFactory.getStore(DedupEventsStoreType.Mongo,
config, "testPublishment");
 		Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> events = accessor.getEvents();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9339c2c9/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
index d3dc717..d1c8457 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
@@ -71,7 +71,7 @@ public class DedupCacheTest {
 			List<AlertStreamEvent> result = dedupCache.dedup(event, 
 					new EventUniq(event.getStreamId(), event.getPolicyId(), event.getCreatedTime(), dedupFieldValues),

 					"state", 
-					(String) event.getData()[event.getSchema().getColumnIndex("state")]);
+					(String) event.getData()[event.getSchema().getColumnIndex("state")], "closed");
 			System.out.println((i + 1) + " >>>> " + ToStringBuilder.reflectionToString(result));
 		}
 		

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9339c2c9/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
index 456b1ea..4d42c12 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
@@ -44,7 +44,7 @@ public class DefaultDedupWithoutStateTest {
 		Config config = ConfigFactory.load();
 		DedupCache dedupCache = new DedupCache(config, "testPublishment");
 		DefaultDeduplicator deduplicator = new DefaultDeduplicator(
-				"PT10S", Arrays.asList(new String[] { "alertKey" }), null, dedupCache);
+				"PT10S", Arrays.asList(new String[] { "alertKey" }), null, null, dedupCache);
 		
 		StreamDefinition stream = createStream();
 		PolicyDefinition policy = createPolicy(stream.getStreamId(), "testPolicy");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9339c2c9/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
index 0556e3d..72aef16 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
@@ -42,7 +42,7 @@ public class DefaultDeduplicatorTest extends MongoDependencyBaseTest {
 		Config config = ConfigFactory.load();
 		DedupCache dedupCache = new DedupCache(config, "testPublishment");
 		DefaultDeduplicator deduplicator = new DefaultDeduplicator(
-				"PT1M", Arrays.asList(new String[] { "alertKey" }), "state", dedupCache);
+				"PT1M", Arrays.asList(new String[] { "alertKey" }), "state", "close", dedupCache);
 		
 		StreamDefinition stream = createStream();
 		PolicyDefinition policy = createPolicy(stream.getStreamId(), "testPolicy");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9339c2c9/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupStoreTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupStoreTest.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupStoreTest.java
index dc05342..ed44267 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupStoreTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupStoreTest.java
@@ -16,15 +16,15 @@
  */
 package org.apache.eagle.alert.engine.publisher.dedup;
 
+import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
+import org.junit.Assert;
+import org.junit.Test;
+
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentLinkedDeque;
 
-import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
-import org.junit.Assert;
-import org.junit.Test;
-
 public class MongoDedupStoreTest extends MongoDependencyBaseTest {
 
 	@Test
@@ -44,6 +44,8 @@ public class MongoDedupStoreTest extends MongoDependencyBaseTest {
 		DedupValue one = new DedupValue();
 		one.setStateFieldValue("OPEN");
 		one.setCount(2);
+		one.setCloseTime(0);
+		one.setDocId("doc-id-...");
 		one.setFirstOccurrence(System.currentTimeMillis());
 		dedupStateValues.add(one);
 		store.add(eventEniq, dedupStateValues);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9339c2c9/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java
index e60ed2c..247f332 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java
@@ -71,7 +71,7 @@ public class TestDeduplicator extends ExtendedDeduplicator {
         LOG.info("event key: " + eventkey);
         LOG.info("dedup field: " + this.getDedupStateField());
         LOG.info("dedup value: " + stateFiledValue);
-        List<AlertStreamEvent> result = this.getDedupCache().dedup(event, eventkey,
this.getDedupStateField(), stateFiledValue);
+        List<AlertStreamEvent> result = this.getDedupCache().dedup(event, eventkey,
this.getDedupStateField(), stateFiledValue, "closed");
         return result;
     }
 



Mime
View raw message