eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ralp...@apache.org
Subject [1/2] incubator-eagle git commit: EAGLE-576: dedup enhancements
Date Wed, 28 Sep 2016 08:05:00 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/master afb897940 -> a1285351d


EAGLE-576: dedup enhancements

1. one dedup cache per publishment, instead of a single global cache
2. support dedup without a state field
3. emit the raw alert into the configured namespace


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/eb262861
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/eb262861
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/eb262861

Branch: refs/heads/master
Commit: eb26286126c34272967d1159d07b0d645157c61a
Parents: 1fa490e
Author: Xiancheng Li <xiancheng.li@ebay.com>
Authored: Wed Sep 28 08:54:43 2016 +0800
Committer: Xiancheng Li <xiancheng.li@ebay.com>
Committed: Wed Sep 28 14:25:01 2016 +0800

----------------------------------------------------------------------
 .../engine/publisher/PublishConstants.java      |   3 +
 .../engine/publisher/dedup/DedupCache.java      |  63 ++++---
 .../publisher/impl/AbstractPublishPlugin.java   |   3 +-
 .../publisher/impl/AlertKafkaPublisher.java     |  29 ++-
 .../publisher/impl/DefaultDeduplicator.java     |  32 +++-
 .../publisher/dedup/DedupCacheStoreTest.java    |   2 +-
 .../engine/publisher/dedup/DedupCacheTest.java  |   2 +-
 .../dedup/DefaultDedupWithoutStateTest.java     | 183 +++++++++++++++++++
 .../dedup/DefaultDeduplicatorTest.java          |   2 +-
 9 files changed, 270 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eb262861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
index ce57a6e..7408779 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
@@ -50,5 +50,8 @@ public class PublishConstants {
     public static final String ALERT_EMAIL_TIMESTAMP = "alertTime";
     public static final String ALERT_EMAIL_POLICY = "policyId";
     public static final String ALERT_EMAIL_CREATOR = "creator";
+    
+    public static final String RAW_ALERT_NAMESPACE_LABEL = "rawAlertNamespaceLabel";
+    public static final String RAW_ALERT_NAMESPACE_VALUE = "rawAlertNamespaceValue";
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eb262861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
index fc2d6e6..503a1ce 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
@@ -48,38 +48,45 @@ public class DedupCache {
     private long lastUpdated = -1;
     private Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> events = new ConcurrentHashMap<EventUniq,
ConcurrentLinkedDeque<DedupValue>>();
 
+    private static final ConcurrentLinkedDeque<DedupCache> caches = new ConcurrentLinkedDeque<DedupCache>();
+
     private Config config;
 
-    private static DedupCache INSTANCE;
-
-    public static synchronized DedupCache getInstance(Config config) {
-        if (INSTANCE == null) {
-            INSTANCE = new DedupCache();
-            INSTANCE.config = config;
-
-            // create daemon to clean up old removable events periodically
-            ScheduledExecutorService scheduleSrv = Executors.newScheduledThreadPool(1, new
ThreadFactory() {
-                @Override
-                public Thread newThread(Runnable r) {
-                    Thread t = new Thread(r);
-                    t.setDaemon(true);
-                    return t;
-                }
-            });
-            scheduleSrv.scheduleAtFixedRate(new Runnable() {
-                @Override
-                public void run() {
-                    HashSet<EventUniq> eventUniqs = new HashSet<EventUniq>(INSTANCE.getEvents().keySet());
-                    for (EventUniq one : eventUniqs) {
-                        if (one.removable && one.createdTime < System.currentTimeMillis()
- 3600000 * 24) {
-                            INSTANCE.removeEvent(one);
-                            LOG.info("Remove dedup key {} from cache & db", one);
+    public DedupCache(Config config) {
+        this.config = config;
+        // only happens during startup, won't introduce perf issue here
+        synchronized (caches) {
+            if (caches.size() == 0) {
+                // create daemon to clean up old removable events periodically
+                ScheduledExecutorService scheduleSrv = Executors.newScheduledThreadPool(1,
new ThreadFactory() {
+                    @Override
+                    public Thread newThread(Runnable r) {
+                        Thread t = new Thread(r);
+                        t.setDaemon(true);
+                        return t;
+                    }
+                });
+                scheduleSrv.scheduleAtFixedRate(new Runnable() {
+                    @Override
+                    public void run() {
+                        for (DedupCache cache : caches) {
+                            if (cache == null || cache.getEvents() == null) {
+                                continue;
+                            }
+                            HashSet<EventUniq> eventUniqs = new HashSet<EventUniq>(cache.getEvents().keySet());
+                            for (EventUniq one : eventUniqs) {
+                                if (one.removable && one.createdTime < System.currentTimeMillis()
- 3600000 * 24) {
+                                    cache.removeEvent(one);
+                                    LOG.info("Remove dedup key {} from cache & db", one);
+                                }
+                            }
                         }
                     }
-                }
-            }, 5, 60, TimeUnit.MINUTES);
+                }, 5, 60, TimeUnit.MINUTES);
+            }
+            caches.add(this);
         }
-        return INSTANCE;
+        LOG.info("Create daemon to clean up old removable events periodicall");
     }
 
     public Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> getEvents() {
@@ -201,7 +208,7 @@ public class DedupCache {
             DedupValue dedupValue = dedupValues.getLast();
             dedupValue.setCount(dedupValue.getCount() + 1);
             String updateMsg = String.format(
-                "Update count for dedup key {}, value %s and count %s", eventEniq,
+                "Update count for dedup key %s, value %s and count %s", eventEniq,
                 dedupValue.getStateFieldValue(), dedupValue.getCount());
             if (dedupValue.getCount() > 0 && dedupValue.getCount() % 100 == 0)
{
                 LOG.info(updateMsg);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eb262861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
index 743ce91..3e293cd 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
@@ -47,8 +47,7 @@ public abstract class AbstractPublishPlugin implements AlertPublishPlugin
{
     @SuppressWarnings("rawtypes")
     @Override
     public void init(Config config, Publishment publishment, Map conf) throws Exception {
-        DedupCache dedupCache = DedupCache.getInstance(config);
-
+        DedupCache dedupCache = new DedupCache(config);
         OverrideDeduplicatorSpec spec = publishment.getOverrideDeduplicator();
         if (spec != null && StringUtils.isNotBlank(spec.getClassName())) {
             try {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eb262861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
index 27314bf..e5c351b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
@@ -18,6 +18,7 @@
 
 package org.apache.eagle.alert.engine.publisher.impl;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -28,17 +29,12 @@ import java.util.concurrent.TimeUnit;
 import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.PublishConstants;
-import com.typesafe.config.Config;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import com.typesafe.config.Config;
 
 public class AlertKafkaPublisher extends AbstractPublishPlugin {
 
@@ -49,6 +45,8 @@ public class AlertKafkaPublisher extends AbstractPublishPlugin {
     private KafkaProducer producer;
     private String brokerList;
     private String topic;
+    private String namespaceLabel;
+    private String namespaceValue;
 
     @Override
     @SuppressWarnings("rawtypes")
@@ -60,6 +58,8 @@ public class AlertKafkaPublisher extends AbstractPublishPlugin {
             brokerList = kafkaConfig.get(PublishConstants.BROKER_LIST).trim();
             producer = KafkaProducerManager.INSTANCE.getProducer(brokerList, kafkaConfig);
             topic = kafkaConfig.get(PublishConstants.TOPIC).trim();
+            namespaceLabel = kafkaConfig.getOrDefault(PublishConstants.RAW_ALERT_NAMESPACE_LABEL,
"namespace");
+            namespaceValue = kafkaConfig.getOrDefault(PublishConstants.RAW_ALERT_NAMESPACE_VALUE,
"network");
         }
     }
 
@@ -70,9 +70,20 @@ public class AlertKafkaPublisher extends AbstractPublishPlugin {
             LOG.warn("KafkaProducer is null due to the incorrect configurations");
             return;
         }
-        List<AlertStreamEvent> outputEvents = dedup(event);
-        if (outputEvents == null) {
-            return;
+        List<AlertStreamEvent> outputEvents = new ArrayList<AlertStreamEvent>();
+
+        int namespaceColumnIndex = event.getSchema().getColumnIndex(namespaceLabel);
+        if (namespaceColumnIndex < 0 || namespaceColumnIndex >= event.getData().length)
{
+            LOG.warn("Namespace column {} is not found, the found index {} is invalid",
+                namespaceLabel, namespaceColumnIndex);
+        } else {
+            event.getData()[namespaceColumnIndex] = namespaceValue;
+            outputEvents.add(event);
+        }
+
+        List<AlertStreamEvent> dedupResults = dedup(event);
+        if (dedupResults != null) {
+            outputEvents.addAll(dedupResults);
         }
         PublishStatus status = new PublishStatus();
         try {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eb262861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
index d284da5..54ff346 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
@@ -31,19 +32,23 @@ import org.joda.time.Period;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
 public class DefaultDeduplicator implements AlertDeduplicator {
 
     private static Logger LOG = LoggerFactory.getLogger(DefaultDeduplicator.class);
 
-    @SuppressWarnings("unused")
-    private long dedupIntervalMin;
+    private long dedupIntervalSec;
     private List<String> customDedupFields = new ArrayList<>();
     private String dedupStateField;
 
     private DedupCache dedupCache;
 
+    private Cache<EventUniq, String> withoutStatesCache;
+
     public DefaultDeduplicator() {
-        this.dedupIntervalMin = 0;
+        this.dedupIntervalSec = 0;
     }
 
     public DefaultDeduplicator(String intervalMin) {
@@ -51,7 +56,7 @@ public class DefaultDeduplicator implements AlertDeduplicator {
     }
 
     public DefaultDeduplicator(long intervalMin) {
-        this.dedupIntervalMin = intervalMin;
+        this.dedupIntervalSec = intervalMin;
     }
 
     public DefaultDeduplicator(String intervalMin, List<String> customDedupFields,
@@ -64,6 +69,9 @@ public class DefaultDeduplicator implements AlertDeduplicator {
             this.dedupStateField = dedupStateField;
         }
         this.dedupCache = dedupCache;
+
+        withoutStatesCache = CacheBuilder.newBuilder().expireAfterWrite(
+            this.dedupIntervalSec, TimeUnit.SECONDS).build();
     }
 
     /*
@@ -74,6 +82,16 @@ public class DefaultDeduplicator implements AlertDeduplicator {
         if (StringUtils.isBlank(stateFiledValue)) {
             // without state field, we cannot determine whether it is duplicated
             // without custom filed values, we cannot determine whether it is duplicated
+            synchronized (withoutStatesCache) {
+                if (withoutStatesCache != null && withoutStatesCache.getIfPresent(key)
!= null) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Alert event {} with key {} is skipped since it is duplicated",
event, key);
+                    }
+                    return null;
+                } else if (withoutStatesCache != null) {
+                    withoutStatesCache.put(key, "");
+                }
+            }
             return Arrays.asList(event);
         }
         return dedupCache.dedup(event, key, dedupStateField, stateFiledValue);
@@ -126,15 +144,15 @@ public class DefaultDeduplicator implements AlertDeduplicator {
     @Override
     public void setDedupIntervalMin(String newDedupIntervalMin) {
         if (newDedupIntervalMin == null || newDedupIntervalMin.isEmpty()) {
-            dedupIntervalMin = 0;
+            dedupIntervalSec = 0;
             return;
         }
         try {
             Period period = Period.parse(newDedupIntervalMin);
-            this.dedupIntervalMin = period.toStandardMinutes().getMinutes();
+            this.dedupIntervalSec = period.toStandardSeconds().getSeconds();
         } catch (Exception e) {
             LOG.warn("Fail to pares deDupIntervalMin, will disable deduplication instead",
e);
-            this.dedupIntervalMin = 0;
+            this.dedupIntervalSec = 0;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eb262861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
index c830aca..5e56bb7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
@@ -50,7 +50,7 @@ public class DedupCacheStoreTest extends MongoDependencyBaseTest {
 		
 		System.setProperty("config.resource", "/application-mongo-statestore.conf");
 		Config config = ConfigFactory.load();
-		DedupCache cache = DedupCache.getInstance(config);
+		DedupCache cache = new DedupCache(config);
 		cache.addOrUpdate(eventUniq, (String) event.getData()[event.getSchema().getColumnIndex("state")]);
 		
 		DedupEventsStore accessor = DedupEventsStoreFactory.getStore(DedupEventsStoreType.Mongo,
config);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eb262861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
index 779abb0..e996376 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
@@ -55,7 +55,7 @@ public class DedupCacheTest {
 	@Test
 	public void testNormal() throws Exception {
 		Config config = ConfigFactory.load();
-		DedupCache dedupCache = DedupCache.getInstance(config);
+		DedupCache dedupCache = new DedupCache(config);
 		
 		StreamDefinition stream = createStream();
 		PolicyDefinition policy = createPolicy(stream.getStreamId(), "testPolicy");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eb262861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
new file mode 100644
index 0000000..57c113b
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.impl.DefaultDeduplicator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.base.Joiner;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+public class DefaultDedupWithoutStateTest {
+
+	@Test
+	public void testNormal() throws Exception {
+		//String intervalMin, List<String> customDedupFields, String dedupStateField, String
dedupStateCloseValue
+		// assume state: OPEN, WARN, CLOSE
+		System.setProperty("config.resource", "/application-mongo-statestore.conf");
+		Config config = ConfigFactory.load();
+		DedupCache dedupCache = new DedupCache(config);
+		DefaultDeduplicator deduplicator = new DefaultDeduplicator(
+				"PT10S", Arrays.asList(new String[] { "alertKey" }), null, dedupCache);
+		
+		StreamDefinition stream = createStream();
+		PolicyDefinition policy = createPolicy(stream.getStreamId(), "testPolicy");
+		
+		int[] hostIndex = new int[] { 1, 2, 3 };
+		String[] states = new String[] { "OPEN", "WARN", "CLOSE" };
+		Random random = new Random();
+		
+		final ConcurrentLinkedDeque<AlertStreamEvent> nonDedupResult = new ConcurrentLinkedDeque<AlertStreamEvent>();
+		
+		for (int i = 0; i < 100; i ++) {
+			new Thread(new Runnable() {
+
+				@Override
+				public void run() {
+					int index = hostIndex[random.nextInt(3)];
+					AlertStreamEvent e1 = createEvent(stream, policy, new Object[] {
+							System.currentTimeMillis(), "host" + index, 
+							String.format("testPolicy-host%s-01", index), 
+							states[random.nextInt(3)], 0, 0
+					});
+					List<AlertStreamEvent> result = deduplicator.dedup(e1);
+					if (result != null) {
+						System.out.println(">>>" + Joiner.on(",").join(result));
+						nonDedupResult.addAll(result);
+					} else {
+						System.out.println(">>>" + result);
+					}
+				}
+				
+			}).start();
+		}
+		
+		Thread.sleep(1000);
+		
+		System.out.println("old size: " + nonDedupResult.size());
+		Assert.assertTrue(nonDedupResult.size() > 0 && nonDedupResult.size() <= 3);
+		
+		Thread.sleep(15000);
+		
+		for (int i = 0; i < 100; i ++) {
+			new Thread(new Runnable() {
+
+				@Override
+				public void run() {
+					int index = hostIndex[random.nextInt(3)];
+					AlertStreamEvent e1 = createEvent(stream, policy, new Object[] {
+							System.currentTimeMillis(), "host" + index, 
+							String.format("testPolicy-host%s-01", index), 
+							states[random.nextInt(3)], 0, 0
+					});
+					List<AlertStreamEvent> result = deduplicator.dedup(e1);
+					if (result != null) {
+						System.out.println(">>>" + Joiner.on(",").join(result));
+						nonDedupResult.addAll(result);
+					} else {
+						System.out.println(">>>" + result);
+					}
+				}
+				
+			}).start();
+		}
+		
+		Thread.sleep(1000);
+		
+		System.out.println("new size: " + nonDedupResult.size());
+		Assert.assertTrue(nonDedupResult.size() > 3 && nonDedupResult.size() <= 6);
+	}
+	
+	private AlertStreamEvent createEvent(StreamDefinition stream, PolicyDefinition policy, Object[]
data) {
+		AlertStreamEvent event = new AlertStreamEvent();
+		event.setPolicyId(policy.getName());
+		event.setSchema(stream);
+		event.setStreamId(stream.getStreamId());
+		event.setTimestamp(System.currentTimeMillis());
+		event.setCreatedTime(System.currentTimeMillis());
+		event.setData(data);
+		return event;
+	}
+	
+	private StreamDefinition createStream() {
+		StreamDefinition sd = new StreamDefinition();
+		StreamColumn tsColumn = new StreamColumn();
+		tsColumn.setName("timestamp");
+		tsColumn.setType(StreamColumn.Type.LONG);
+		
+		StreamColumn hostColumn = new StreamColumn();
+		hostColumn.setName("host");
+		hostColumn.setType(StreamColumn.Type.STRING);
+		
+		StreamColumn alertKeyColumn = new StreamColumn();
+		alertKeyColumn.setName("alertKey");
+		alertKeyColumn.setType(StreamColumn.Type.STRING);
+
+		StreamColumn stateColumn = new StreamColumn();
+		stateColumn.setName("state");
+		stateColumn.setType(StreamColumn.Type.STRING);
+		
+		// dedupCount, dedupFirstOccurrence
+		
+		StreamColumn dedupCountColumn = new StreamColumn();
+		dedupCountColumn.setName("dedupCount");
+		dedupCountColumn.setType(StreamColumn.Type.LONG);
+		
+		StreamColumn dedupFirstOccurrenceColumn = new StreamColumn();
+		dedupFirstOccurrenceColumn.setName(DedupCache.DEDUP_FIRST_OCCURRENCE);
+		dedupFirstOccurrenceColumn.setType(StreamColumn.Type.LONG);
+		
+		sd.setColumns(Arrays.asList(tsColumn, hostColumn, alertKeyColumn, stateColumn, dedupCountColumn,
dedupFirstOccurrenceColumn));
+		sd.setDataSource("testDatasource");
+		sd.setStreamId("testStream");
+		sd.setDescription("test stream");
+		return sd;
+	}
+	
+	private PolicyDefinition createPolicy(String streamName, String policyName) {
+		PolicyDefinition pd = new PolicyDefinition();
+		PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+		//expression, something like "PT5S,dynamic,1,host"
+		def.setValue("test");
+		def.setType("siddhi");
+		pd.setDefinition(def);
+		pd.setInputStreams(Arrays.asList("inputStream"));
+		pd.setOutputStreams(Arrays.asList("outputStream"));
+		pd.setName(policyName);
+		pd.setDescription(String.format("Test policy for stream %s", streamName));
+		
+		StreamPartition sp = new StreamPartition();
+		sp.setStreamId(streamName);
+		sp.setColumns(Arrays.asList("host"));
+		sp.setType(StreamPartition.Type.GROUPBY);
+		pd.addPartition(sp);
+		return pd;
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eb262861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
index 81ba35e..3a17e53 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
@@ -40,7 +40,7 @@ public class DefaultDeduplicatorTest extends MongoDependencyBaseTest {
 		// assume state: OPEN, WARN, CLOSE
 		System.setProperty("config.resource", "/application-mongo-statestore.conf");
 		Config config = ConfigFactory.load();
-		DedupCache dedupCache = DedupCache.getInstance(config);
+		DedupCache dedupCache = new DedupCache(config);
 		DefaultDeduplicator deduplicator = new DefaultDeduplicator(
 				"PT1M", Arrays.asList(new String[] { "alertKey" }), "state", dedupCache);
 		


Mime
View raw message