eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [46/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46] Rename package name as "org.apache.eagle"
Date Thu, 19 Nov 2015 10:47:53 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/DeduplicatorConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/DeduplicatorConfig.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/DeduplicatorConfig.java
new file mode 100644
index 0000000..584abc7
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/DeduplicatorConfig.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.config;
+
+import java.io.Serializable;
+
+/**
+ * Serializable bean holding the two deduplication intervals of an alert
+ * definition: one for alert entities and one for emails.
+ */
+public class DeduplicatorConfig implements Serializable {
+	private static final long serialVersionUID = 1L;
+
+	private int alertDedupIntervalMin;
+	private int emailDedupIntervalMin;
+
+	/**
+	 * @return the alert dedup interval (presumably minutes, per the name)
+	 */
+	public int getAlertDedupIntervalMin() {
+		return this.alertDedupIntervalMin;
+	}
+
+	/**
+	 * @param alertDedupIntervalMin the alert dedup interval to store
+	 */
+	public void setAlertDedupIntervalMin(int alertDedupIntervalMin) {
+		this.alertDedupIntervalMin = alertDedupIntervalMin;
+	}
+
+	/**
+	 * @return the email dedup interval (presumably minutes, per the name)
+	 */
+	public int getEmailDedupIntervalMin() {
+		return this.emailDedupIntervalMin;
+	}
+
+	/**
+	 * @param emailDedupIntervalMin the email dedup interval to store
+	 */
+	public void setEmailDedupIntervalMin(int emailDedupIntervalMin) {
+		this.emailDedupIntervalMin = emailDedupIntervalMin;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/EmailNotificationConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/EmailNotificationConfig.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/EmailNotificationConfig.java
new file mode 100644
index 0000000..dd18af7
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/EmailNotificationConfig.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.config;
+
+/**
+ * Notification settings for the email channel: sender, recipients, subject
+ * and the name of the template file, on top of the fields inherited from
+ * {@link NotificationConfig}.
+ */
+public class EmailNotificationConfig extends NotificationConfig {
+	private static final long serialVersionUID = 1L;
+
+	private String sender;
+	private String recipients;
+	private String tplFileName;
+	private String subject;
+
+	/** @return the configured sender */
+	public String getSender() {
+		return this.sender;
+	}
+
+	/** @param sender the sender to store */
+	public void setSender(String sender) {
+		this.sender = sender;
+	}
+
+	/** @return the configured recipients, kept as a single string */
+	public String getRecipients() {
+		return this.recipients;
+	}
+
+	/** @param recipients the recipients string to store */
+	public void setRecipients(String recipients) {
+		this.recipients = recipients;
+	}
+
+	/** @return the template file name */
+	public String getTplFileName() {
+		return this.tplFileName;
+	}
+
+	/** @param tplFileName the template file name to store */
+	public void setTplFileName(String tplFileName) {
+		this.tplFileName = tplFileName;
+	}
+
+	/** @return the email subject */
+	public String getSubject() {
+		return this.subject;
+	}
+
+	/** @param subject the email subject to store */
+	public void setSubject(String subject) {
+		this.subject = subject;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/NotificationConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/NotificationConfig.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/NotificationConfig.java
new file mode 100644
index 0000000..76f2d56
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/NotificationConfig.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.config;
+
+import java.io.Serializable;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+/**
+ * Base notification settings. The Jackson annotation selects the type by
+ * logical name through the {@code flavor} property and keeps that property
+ * visible, so it is also bound to {@link #setFlavor(String)}.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "flavor", visible=true)
+public class NotificationConfig implements Serializable {
+	private static final long serialVersionUID = 1L;
+
+	private String id;
+	private String flavor;
+
+	/** @return the identifier of this notification configuration */
+	public String getId() {
+		return this.id;
+	}
+
+	/** @param id the identifier to store */
+	public void setId(String id) {
+		this.id = id;
+	}
+
+	/** @return the flavor, i.e. the value of the type-discriminating property */
+	public String getFlavor() {
+		return this.flavor;
+	}
+
+	/** @param flavor the flavor to store */
+	public void setFlavor(String flavor) {
+		this.flavor = flavor;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/Remediation.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/Remediation.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/Remediation.java
new file mode 100644
index 0000000..ec52508
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/Remediation.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.config;
+
+import java.io.Serializable;
+
+/**
+ * Serializable bean that carries only the identifier of a remediation.
+ */
+public class Remediation implements Serializable {
+	private static final long serialVersionUID = 1L;
+
+	private String id;
+
+	/** @return the remediation identifier, or {@code null} when never set */
+	public String getId() {
+		return this.id;
+	}
+
+	/** @param id the remediation identifier to store */
+	public void setId(String id) {
+		this.id = id;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertDeduplicationExecutorBase.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
new file mode 100644
index 0000000..b846418
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.dedup;
+
+import org.apache.eagle.alert.common.AlertConstants;
+import org.apache.eagle.alert.config.DeduplicatorConfig;
+import org.apache.eagle.alert.dao.AlertDefinitionDAO;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.alert.policy.DynamicPolicyLoader;
+import org.apache.eagle.alert.policy.PolicyLifecycleMethods;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.datastream.JavaStormStreamExecutor2;
+import org.apache.eagle.datastream.Tuple2;
+import com.sun.jersey.client.impl.CopyOnWriteHashMap;
+import com.typesafe.config.Config;
+import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Stream executor that filters duplicated alerts per policy.
+ *
+ * <p>One {@link DefaultDeduplicator} is kept per policy id. The map is built
+ * in {@link #init()} from the active alert definitions and then maintained
+ * through the policy lifecycle callbacks. Which interval of the
+ * {@link DeduplicatorConfig} is used depends on the {@link DEDUP_TYPE}
+ * supplied by the subclass.
+ */
+public abstract class AlertDeduplicationExecutorBase extends JavaStormStreamExecutor2<String, AlertAPIEntity> implements PolicyLifecycleMethods {
+	private static final long serialVersionUID = 1L;
+	private static final Logger LOG = LoggerFactory.getLogger(AlertDeduplicationExecutorBase.class);
+	protected Config config;
+	protected DEDUP_TYPE dedupType;
+
+	private List<String> alertExecutorIdList;
+	// policy id -> deduplicator; every access in this class synchronizes on the map instance
+	private volatile CopyOnWriteHashMap<String, DefaultDeduplicator<AlertAPIEntity>> alertDedups;
+	private AlertDefinitionDAO dao;
+
+	/** Selects which interval of the dedup configuration applies. */
+	public enum DEDUP_TYPE {
+		ENTITY,
+		EMAIL
+	}
+
+	/**
+	 * @param alertExecutorIdList ids of the alert executors whose definitions are loaded and watched
+	 * @param dedupType           which interval of the dedup configuration to use
+	 * @param dao                 source of the active alert definitions
+	 */
+	public AlertDeduplicationExecutorBase(List<String> alertExecutorIdList, DEDUP_TYPE dedupType, AlertDefinitionDAO dao){
+		this.alertExecutorIdList = alertExecutorIdList;
+		this.dedupType = dedupType;
+		this.dao = dao;
+	}
+
+	@Override
+	public void prepareConfig(Config config) {
+		this.config = config;
+	}
+
+	/**
+	 * Builds the deduplicator for one alert definition.
+	 *
+	 * @param alertDef definition whose dedup definition is deserialized into a {@link DeduplicatorConfig}
+	 * @return a deduplicator using the interval matching {@link #dedupType}, or {@code null} when
+	 *         the configuration cannot be deserialized, is absent, or the type is not handled
+	 */
+	public DefaultDeduplicator<AlertAPIEntity> createAlertDedup(AlertDefinitionAPIEntity alertDef) {
+		DeduplicatorConfig dedupConfig;
+		try {
+			dedupConfig = JsonSerDeserUtils.deserialize(alertDef.getDedupeDef(), DeduplicatorConfig.class);
+		} catch (Exception ex) {
+			// Best effort: a broken dedup definition disables dedup for this policy only.
+			// The exception is passed along so the stack trace is not lost.
+			LOG.warn("Initial dedupConfig error, " + ex.getMessage(), ex);
+			return null;
+		}
+
+		if (dedupConfig == null) {
+			return null;
+		}
+		if (dedupType == DEDUP_TYPE.ENTITY) {
+			return new DefaultDeduplicator<>(dedupConfig.getAlertDedupIntervalMin());
+		}
+		if (dedupType == DEDUP_TYPE.EMAIL) {
+			return new DefaultDeduplicator<>(dedupConfig.getEmailDedupIntervalMin());
+		}
+		return null;
+	}
+
+	/**
+	 * Loads the active alert definitions for the configured site and data source, creates one
+	 * deduplicator per policy, and registers this executor as a policy change listener.
+	 *
+	 * @throws IllegalStateException when the alert definitions cannot be loaded
+	 */
+	@Override
+	public void init() {
+		String site = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE);
+		String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE);
+		Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs;
+		try {
+			initialAlertDefs = dao.findActiveAlertDefsGroupbyAlertExecutorId(site, dataSource);
+		} catch (Exception ex) {
+			LOG.error("fail to initialize initialAlertDefs: ", ex);
+			throw new IllegalStateException("fail to initialize initialAlertDefs: ", ex);
+		}
+
+		Map<String, DefaultDeduplicator<AlertAPIEntity>> tmpDeduplicators = new HashMap<>();
+		if (initialAlertDefs == null || initialAlertDefs.isEmpty()) {
+			LOG.warn("No alert definitions was found for site: "+site+", dataSource: "+dataSource);
+		} else {
+			for (String alertExecutorId : alertExecutorIdList) {
+				if (!initialAlertDefs.containsKey(alertExecutorId)) {
+					LOG.info(String.format("No alert definitions found for site: %s, dataSource: %s, alertExecutorId: %s",site,dataSource,alertExecutorId));
+					continue;
+				}
+				for (AlertDefinitionAPIEntity alertDef : initialAlertDefs.get(alertExecutorId).values()) {
+					try {
+						DefaultDeduplicator<AlertAPIEntity> deduplicator = createAlertDedup(alertDef);
+						if (deduplicator != null) {
+							tmpDeduplicators.put(alertDef.getTags().get(AlertConstants.POLICY_ID), deduplicator);
+						} else {
+							LOG.warn("The dedup interval is not set, alertDef: " + alertDef);
+						}
+					} catch (Throwable t) {
+						// One bad definition must not prevent the others from loading; keep the stack trace.
+						LOG.error("Got an exception when initial dedup config, probably dedup config is not set: " + t.getMessage() + "," + alertDef, t);
+					}
+				}
+			}
+		}
+
+		alertDedups = new CopyOnWriteHashMap<>();
+		alertDedups.putAll(tmpDeduplicators);
+		DynamicPolicyLoader policyLoader = DynamicPolicyLoader.getInstance();
+		policyLoader.init(initialAlertDefs, dao, config);
+		for (String alertExecutorId : alertExecutorIdList) {
+			policyLoader.addPolicyChangeListener(alertExecutorId, this);
+		}
+	}
+
+	/**
+	 * Deduplicates one incoming alert and emits it when it is not a duplicate.
+	 *
+	 * <p>{@code input.get(0)} is read as the policy id and {@code input.get(1)} as the alert.
+	 * When no deduplicator is registered for the policy the alert is emitted unchanged; when the
+	 * registered interval is {@code -1} nothing is emitted.
+	 *
+	 * @param input           tuple holding the policy id and the alert entity
+	 * @param outputCollector receives {@code (policyId, alert)} pairs that passed deduplication
+	 */
+	@Override
+	public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, AlertAPIEntity>> outputCollector){
+		String policyId = (String) input.get(0);
+		AlertAPIEntity alertEntity = (AlertAPIEntity) input.get(1);
+		DefaultDeduplicator<AlertAPIEntity> dedup;
+		synchronized(alertDedups) {
+			dedup = alertDedups.get(policyId);
+		}
+
+		List<AlertAPIEntity> ret = Arrays.asList(alertEntity);
+		if (dedup == null) {
+			LOG.warn("Dedup config for policyId " + policyId + " is not set or is not a valid config");
+		} else {
+			if (dedup.getDedupIntervalMin() == -1) {
+				LOG.warn("the dedup interval is set as -1, which mean all alerts should be deduped(skipped)");
+				return;
+			}
+			ret = dedup.dedup(ret);
+		}
+		for (AlertAPIEntity entity : ret) {
+			outputCollector.collect(new Tuple2<String, AlertAPIEntity>(policyId, entity));
+		}
+	}
+
+	/**
+	 * Registers deduplicators for newly created policies.
+	 *
+	 * @param added created alert definitions
+	 */
+	public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Alert dedup config to be added : " + added);
+		}
+		upsertDedups(added, "Alert dedup config really added ");
+	}
+
+	/**
+	 * Replaces the deduplicators of changed policies.
+	 *
+	 * @param changed changed alert definitions
+	 */
+	public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> changed) {
+		LOG.info("Alert dedup config changed : " + changed);
+		upsertDedups(changed, "Alert dedup config really changed ");
+	}
+
+	/**
+	 * Drops the deduplicators of deleted policies.
+	 *
+	 * @param deleted deleted alert definitions
+	 */
+	public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> deleted) {
+		LOG.info("alert dedup config deleted : " + deleted);
+		for (AlertDefinitionAPIEntity alertDef : deleted.values()) {
+			LOG.info("alert dedup config deleted " + alertDef);
+			// no cleanup to do, just remove it
+			synchronized(alertDedups) {
+				alertDedups.remove(alertDef.getTags().get(AlertConstants.POLICY_ID));
+			}
+		}
+	}
+
+	/**
+	 * Creates a deduplicator for each definition and stores it under the definition's policy id.
+	 * Definitions without a usable dedup configuration are left untouched.
+	 */
+	private void upsertDedups(Map<String, AlertDefinitionAPIEntity> alertDefs, String logPrefix) {
+		for (AlertDefinitionAPIEntity alertDef : alertDefs.values()) {
+			LOG.info(logPrefix + alertDef);
+			DefaultDeduplicator<AlertAPIEntity> dedup = createAlertDedup(alertDef);
+			if (dedup != null) {
+				synchronized(alertDedups) {
+					alertDedups.put(alertDef.getTags().get(AlertConstants.POLICY_ID), dedup);
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java
new file mode 100644
index 0000000..81cd78f
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.dedup;
+
+import org.apache.eagle.alert.dao.AlertDefinitionDAO;
+
+import java.util.List;
+
+/**
+ * Deduplication executor bound to {@code DEDUP_TYPE.EMAIL}.
+ */
+public class AlertEmailDeduplicationExecutor extends AlertDeduplicationExecutorBase {
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * @param alertExecutorIdList alert executor ids forwarded to the base executor
+	 * @param dao                 alert definition DAO forwarded to the base executor
+	 */
+	public AlertEmailDeduplicationExecutor(List<String> alertExecutorIdList, AlertDefinitionDAO dao) {
+		super(alertExecutorIdList, DEDUP_TYPE.EMAIL, dao);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java
new file mode 100644
index 0000000..2974801
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.dedup;
+
+import org.apache.eagle.alert.dao.AlertDefinitionDAO;
+
+import java.util.List;
+
+/**
+ * Deduplication executor bound to {@code DEDUP_TYPE.ENTITY}.
+ */
+public class AlertEntityDeduplicationExecutor extends AlertDeduplicationExecutorBase {
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * @param alertExecutorIdList alert executor ids forwarded to the base executor
+	 * @param dao                 alert definition DAO forwarded to the base executor
+	 */
+	public AlertEntityDeduplicationExecutor(List<String> alertExecutorIdList, AlertDefinitionDAO dao) {
+		super(alertExecutorIdList, DEDUP_TYPE.ENTITY, dao);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/DefaultDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/DefaultDeduplicator.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/DefaultDeduplicator.java
new file mode 100644
index 0000000..b968e38
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/DefaultDeduplicator.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.dedup;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang.time.DateUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+
+/**
+ * Deduplicates entities by their tags within a configurable interval.
+ *
+ * <p>For every distinct tag set (see {@link EntityTagsUniq}) the timestamp of the last accepted
+ * entity is remembered. An entity with the same tags is accepted again only once at least
+ * {@code dedupIntervalMin} minutes of entity time have elapsed since that timestamp.
+ *
+ * <p>The cache is a plain {@link HashMap}; this class does no synchronization of its own.
+ *
+ * @param <T> entity type being deduplicated
+ */
+public class DefaultDeduplicator<T extends TaggedLogAPIEntity> implements EntityDeduplicator<T>{
+	protected long dedupIntervalMin;
+	// tag set -> timestamp of the last entity that was let through
+	protected Map<EntityTagsUniq, Long> entites = new HashMap<EntityTagsUniq, Long>();
+	public static Logger LOG = LoggerFactory.getLogger(DefaultDeduplicator.class);
+
+	/** Outcome of a single dedup check. */
+	public static enum AlertDeduplicationStatus{
+		/** The tag set has not been seen before. */
+		NEW,
+		/** The tag set was seen before, but the dedup interval has elapsed. */
+		DUPLICATED,
+		/** The tag set was seen within the dedup interval; the entity is dropped. */
+		IGNORED
+	}
+
+	/** Creates a deduplicator with an interval of 0 minutes. */
+	public DefaultDeduplicator() {
+		this.dedupIntervalMin = 0;
+	}
+
+	/**
+	 * @param intervalMin dedup interval in minutes
+	 */
+	public DefaultDeduplicator(long intervalMin) {
+		this.dedupIntervalMin = intervalMin;
+	}
+
+	/**
+	 * Evicts every cache entry whose key was created more than seven days ago.
+	 */
+	public void clearOldCache() {
+		// Read the clock once so all entries are judged against the same cutoff.
+		long cutoff = System.currentTimeMillis() - 7 * DateUtils.MILLIS_PER_DAY;
+		java.util.Iterator<EntityTagsUniq> keys = entites.keySet().iterator();
+		while (keys.hasNext()) {
+			if (cutoff > keys.next().createdTime) {
+				keys.remove();
+			}
+		}
+	}
+
+	/**
+	 * Classifies one key against the cache and records its timestamp when it is accepted.
+	 *
+	 * @param key tag set plus entity timestamp to check
+	 * @return {@code NEW} for an unseen tag set, {@code DUPLICATED} for a known tag set whose
+	 *         interval has elapsed, {@code IGNORED} otherwise
+	 */
+	public AlertDeduplicationStatus checkDedup(EntityTagsUniq key){
+		long current = key.timestamp;
+		Long last = entites.get(key);
+		if (last == null) {
+			entites.put(key, current);
+			return AlertDeduplicationStatus.NEW;
+		}
+
+		if (current - last >= dedupIntervalMin * DateUtils.MILLIS_PER_MINUTE) {
+			entites.put(key, current);
+			return AlertDeduplicationStatus.DUPLICATED;
+		}
+
+		return AlertDeduplicationStatus.IGNORED;
+	}
+
+	/**
+	 * Returns the entities of {@code list} that are not duplicates, in their original order.
+	 * Entities whose tags are {@code null} cannot be keyed and are not part of the result.
+	 *
+	 * @param list entities to filter
+	 * @return a new list with the entities that passed deduplication
+	 */
+	public List<T> dedup(List<T> list) {
+		clearOldCache();
+		List<T> dedupList = new ArrayList<T>();
+		int totalCount = list.size();
+		int dedupedCount = 0;
+		for (T entity : list) {
+			if (entity.getTags() == null) {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Tags is null, don't know how to deduplicate, do nothing");
+				}
+				continue;
+			}
+			AlertDeduplicationStatus status = checkDedup(new EntityTagsUniq(entity.getTags(), entity.getTimestamp()));
+			if (status != AlertDeduplicationStatus.IGNORED) {
+				dedupList.add(entity);
+			} else {
+				dedupedCount++;
+				if (LOG.isDebugEnabled()) {
+					// The entity text must not be used as a format string: a '%' in it would throw.
+					LOG.debug("Entity is skipped because it's duplicated: " + entity);
+				}
+			}
+		}
+
+		if (dedupedCount > 0) {
+			LOG.info(String.format("Skipped %s of %s alerts because they are duplicated",dedupedCount,totalCount));
+		} else if (LOG.isDebugEnabled()) {
+			LOG.debug(String.format("Skipped %s of %s duplicated alerts",dedupedCount,totalCount));
+		}
+
+		return dedupList;
+	}
+
+	/**
+	 * @param dedupIntervalMin new dedup interval in minutes
+	 * @return this deduplicator, to allow chaining
+	 */
+	public EntityDeduplicator<T> setDedupIntervalMin(long dedupIntervalMin) {
+		this.dedupIntervalMin = dedupIntervalMin;
+		return this;
+	}
+
+	/**
+	 * @return the dedup interval in minutes
+	 */
+	public long getDedupIntervalMin() {
+		return dedupIntervalMin;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDeduplicator.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDeduplicator.java
new file mode 100644
index 0000000..e108b94
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDeduplicator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.dedup;
+
+import java.util.List;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+
+/**
+ * Contract for components that remove duplicated Eagle entities from a list,
+ * parameterized by a dedup interval.
+ *
+ * @param <T> Eagle entity
+ */
+public interface EntityDeduplicator<T extends TaggedLogAPIEntity> {
+	
+	/**
+	 * Sets the dedup interval.
+	 *
+	 * @param intervalMin the interval (presumably minutes, per the name)
+	 * @return a deduplicator, which lets calls be chained
+	 */
+	EntityDeduplicator<T> setDedupIntervalMin(long intervalMin);
+	
+	/**
+	 * @return the dedup interval currently configured
+	 */
+	long getDedupIntervalMin();
+	
+	/**
+	 * Deduplicates the given entities.
+	 *
+	 * @param list entities to deduplicate
+	 * @return the entities kept after deduplication
+	 */
+	List<T> dedup(List<T> list);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityTagsUniq.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityTagsUniq.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityTagsUniq.java
new file mode 100644
index 0000000..81c8ba6
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityTagsUniq.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * 
+ */
+package org.apache.eagle.alert.dedup;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Map key that identifies an entity by its tags.
+ *
+ * <p>Two instances are equal when their tag maps hold exactly the same keys and values;
+ * {@link #timestamp} and {@link #createdTime} do not take part in equality or hashing.
+ *
+ * @since Mar 19, 2015
+ */
+public class EntityTagsUniq {
+	public Map<String, String> tags;
+	public Long timestamp;	 // entity's timestamp
+	public long createdTime; // entityTagsUniq's created time, for cache removal;
+
+	private static final Logger LOG = LoggerFactory.getLogger(EntityTagsUniq.class);
+
+	/**
+	 * @param tags      tags of the entity; copied so later changes to the caller's map do not alter this key
+	 * @param timestamp timestamp of the entity
+	 */
+	public EntityTagsUniq(Map<String, String> tags, long timestamp) {
+		this.tags = new HashMap<String, String>(tags);
+		this.timestamp = timestamp;
+		this.createdTime = System.currentTimeMillis();
+	}
+
+	/**
+	 * Compares the tag maps entry by entry, treating {@code null} values as ordinary values.
+	 *
+	 * <p>A {@code null} tag value used to short-circuit the comparison to {@code true}, which
+	 * made unrelated tag sets equal and disagreed with {@link #hashCode()}.
+	 */
+	@Override
+	public boolean equals(Object obj) {
+		if (this == obj) {
+			return true;
+		}
+		if (!(obj instanceof EntityTagsUniq)) {
+			return false;
+		}
+		// Map.equals checks size, key presence and null-safe value equality.
+		return tags.equals(((EntityTagsUniq) obj).tags);
+	}
+
+	/**
+	 * XORs the hash codes of all non-null tag values. Equal tag maps hold the same values and
+	 * therefore produce the same result.
+	 */
+	@Override
+	public int hashCode() {
+		int hashCode = 0;
+		for (Entry<String, String> entry : tags.entrySet()) {
+			String value = entry.getValue();
+			if (value == null) {
+				LOG.warn("Tag value for key ["+entry.getKey()+"] is null, skipped for hash code");
+				continue;
+			}
+			hashCode ^= value.hashCode();
+		}
+		return hashCode;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java
new file mode 100644
index 0000000..2a61d76
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * 
+ */
+package org.apache.eagle.alert.notification;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
+import org.apache.eagle.alert.common.AlertConstants;
+import org.apache.eagle.alert.common.AlertEmailSender;
+import org.apache.eagle.alert.email.AlertEmailComponent;
+import org.apache.eagle.alert.email.AlertEmailContext;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import com.typesafe.config.ConfigObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AlertEmailGenerator{
+	private String tplFile;
+	private String sender;
+	private String recipients;
+	private String subject;
+	private ConfigObject eagleProps;
+
+    private ThreadPoolExecutor executorPool;
+
+    private final static Logger LOG = LoggerFactory.getLogger(AlertEmailGenerator.class);
+
+    private final static long MAX_TIMEOUT_MS =60000;
+
+    public void sendAlertEmail(AlertAPIEntity entity) {
+		sendAlertEmail(entity, recipients, null);
+	}
+	
+	public void sendAlertEmail(AlertAPIEntity entity, String recipients) {
+		sendAlertEmail(entity, recipients, null);	
+	}
+	
+	public void sendAlertEmail(AlertAPIEntity entity, String recipients, String cc) {
+		AlertEmailContext email = new AlertEmailContext();
+		
+		AlertEmailComponent component = new AlertEmailComponent();
+		component.setAlertContext(entity.getAlertContext());
+		List<AlertEmailComponent> components = new ArrayList<AlertEmailComponent>();
+		components.add(component);		
+		email.setComponents(components);
+		if (entity.getAlertContext().getProperty(AlertConstants.SUBJECT) != null) {
+			email.setSubject(entity.getAlertContext().getProperty(AlertConstants.SUBJECT));
+		}
+		else email.setSubject(subject);
+		email.setVelocityTplFile(tplFile);
+		email.setRecipients(recipients);
+		email.setCc(cc);
+		email.setSender(sender);
+		
+		/** asynchronized email sending */
+		@SuppressWarnings("rawtypes")
+        AlertEmailSender thread = new AlertEmailSender(email, eagleProps);
+
+        if(this.executorPool == null) throw new IllegalStateException("Invoking thread executor pool but it's is not set yet");
+
+        LOG.info("Sending email  in asynchronous to: "+recipients+", cc: "+cc);
+        Future future = this.executorPool.submit(thread);
+        try {
+            future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            LOG.info(String.format("Successfully send email to %s", recipients));
+        } catch (InterruptedException | ExecutionException  e) {
+            LOG.error(String.format("Failed to send email to %s, due to:%s",recipients,e),e);
+        } catch (TimeoutException e) {
+            LOG.error(String.format("Failed to send email to %s due to timeout exception, max timeout: %s ms ",recipients, MAX_TIMEOUT_MS),e);
+        }
+    }
+	
+	public String getTplFile() {
+		return tplFile;
+	}
+	
+	public void setTplFile(String tplFile) {
+		this.tplFile = tplFile;
+	}
+
+	public String getSender() {
+		return sender;
+	}
+
+	public void setSender(String sender) {
+		this.sender = sender;
+	}
+
+	public String getRecipients() {
+		return recipients;
+	}
+
+	public void setRecipients(String recipients) {
+		this.recipients = recipients;
+	}
+
+	public String getSubject() {
+		return subject;
+	}
+
+	public void setSubject(String subject) {
+		this.subject = subject;
+	}
+
+	public ConfigObject getEagleProps() {
+		return eagleProps;
+	}
+
+	public void setEagleProps(ConfigObject eagleProps) {
+		this.eagleProps = eagleProps;
+	}
+
+    public void setExecutorPool(ThreadPoolExecutor executorPool) {
+        this.executorPool = executorPool;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGeneratorBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGeneratorBuilder.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGeneratorBuilder.java
new file mode 100644
index 0000000..0303476
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGeneratorBuilder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.notification;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+import com.typesafe.config.ConfigObject;
+
+/**
+ * Fluent builder for {@link AlertEmailGenerator}. Each with* method sets the
+ * corresponding property on one generator instance created together with
+ * the builder; {@link #build()} hands that instance back.
+ */
+public class AlertEmailGeneratorBuilder {
+	/** Generator being configured; created once per builder. */
+	private final AlertEmailGenerator generator = new AlertEmailGenerator();
+
+	/** Instances are obtained through {@link #newBuilder()}. */
+	private AlertEmailGeneratorBuilder() {
+	}
+
+	/** @return a fresh builder wrapping a new generator */
+	public static AlertEmailGeneratorBuilder newBuilder() {
+		return new AlertEmailGeneratorBuilder();
+	}
+
+	public AlertEmailGeneratorBuilder withSubject(String subject) {
+		this.generator.setSubject(subject);
+		return this;
+	}
+
+	public AlertEmailGeneratorBuilder withSender(String sender) {
+		this.generator.setSender(sender);
+		return this;
+	}
+
+	public AlertEmailGeneratorBuilder withRecipients(String recipients) {
+		this.generator.setRecipients(recipients);
+		return this;
+	}
+
+	public AlertEmailGeneratorBuilder withTplFile(String tplFile) {
+		this.generator.setTplFile(tplFile);
+		return this;
+	}
+
+	public AlertEmailGeneratorBuilder withEagleProps(ConfigObject eagleProps) {
+		this.generator.setEagleProps(eagleProps);
+		return this;
+	}
+
+	public AlertEmailGeneratorBuilder withExecutorPool(ThreadPoolExecutor threadPoolExecutor) {
+		this.generator.setExecutorPool(threadPoolExecutor);
+		return this;
+	}
+
+	/** @return the generator configured so far (the same instance on every call) */
+	public AlertEmailGenerator build() {
+		return generator;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java
new file mode 100644
index 0000000..4723dec
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.notification;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.sun.jersey.client.impl.CopyOnWriteHashMap;
+import com.typesafe.config.Config;
+import org.apache.eagle.alert.config.EmailNotificationConfig;
+import org.apache.eagle.alert.dao.AlertDefinitionDAO;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.alert.policy.DynamicPolicyLoader;
+import org.apache.eagle.alert.policy.PolicyLifecycleMethods;
+import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.datastream.JavaStormStreamExecutor1;
+import org.apache.eagle.datastream.Tuple1;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Notifies alerts by email, sms or other means; currently only email
+ * notification is implemented.
+ *
+ * Keeps one list of {@link AlertEmailGenerator}s per policy id, built from the
+ * notification definition of each alert definition, and keeps that map up to
+ * date through the {@link PolicyLifecycleMethods} callbacks.
+ */
+public class AlertNotificationExecutor extends JavaStormStreamExecutor1<String> implements PolicyLifecycleMethods {
+
+	private static final long serialVersionUID = 1690354365435407034L;
+	private static final Logger LOG = LoggerFactory.getLogger(AlertNotificationExecutor.class);
+	private Config config;
+
+	private List<String> alertExecutorIdList;
+	/** policyId -> email generators for that policy; access is guarded by synchronizing on the map. */
+	private volatile CopyOnWriteHashMap<String, List<AlertEmailGenerator>> alertEmailGeneratorsMap;
+	private AlertDefinitionDAO dao;
+
+	private final static int DEFAULT_THREAD_POOL_CORE_SIZE = 4;
+	private final static int DEFAULT_THREAD_POOL_MAX_SIZE = 8;
+	private final static long DEFAULT_THREAD_POOL_SHRINK_TIME = 60000L; // 1 minute
+
+	/** Pool shared by all generators for sending emails; created in {@link #init()}. */
+	private transient ThreadPoolExecutor executorPool;
+
+	public AlertNotificationExecutor(List<String> alertExecutorIdList, AlertDefinitionDAO dao){
+		this.alertExecutorIdList = alertExecutorIdList;
+		this.dao = dao;
+	}
+
+	/**
+	 * Deserializes the notification definition of the given alert definition
+	 * into email configurations and builds one generator per configuration.
+	 * A definition that cannot be deserialized is logged and yields an empty list.
+	 *
+	 * @param alertDef alert definition whose notificationDef is read
+	 * @return generators for the definition, never null
+	 */
+	public List<AlertEmailGenerator> createAlertEmailGenerator(AlertDefinitionAPIEntity alertDef) {
+		Module module = new SimpleModule("notification").registerSubtypes(new NamedType(EmailNotificationConfig.class, "email"));
+		EmailNotificationConfig[] emailConfigs = new EmailNotificationConfig[0];
+		try {
+			emailConfigs = JsonSerDeserUtils.deserialize(alertDef.getNotificationDef(), EmailNotificationConfig[].class, Arrays.asList(module));
+		}
+		catch (Exception ex) {
+			// Keep the throwable so the stack trace is not lost.
+			LOG.warn("Initial emailConfig error, wrong format or it's error " + ex.getMessage(), ex);
+		}
+		List<AlertEmailGenerator> gens = new ArrayList<AlertEmailGenerator>();
+		if (emailConfigs == null) {
+			return gens;
+		}
+		for(EmailNotificationConfig emailConfig : emailConfigs) {
+			String tplFileName = emailConfig.getTplFileName();
+			if (tplFileName == null || tplFileName.equals("")) { // empty tplFileName, use default tpl file name
+				tplFileName = "ALERT_DEFAULT.vm";
+			}
+			AlertEmailGenerator gen = AlertEmailGeneratorBuilder.newBuilder().
+					withEagleProps(config.getObject("eagleProps")).
+					withSubject(emailConfig.getSubject()).
+					withSender(emailConfig.getSender()).
+					withRecipients(emailConfig.getRecipients()).
+					withTplFile(tplFileName).
+					withExecutorPool(executorPool).
+					build();
+			gens.add(gen);
+		}
+		return gens;
+	}
+
+	/**
+	 * 1. creates the email thread pool
+	 * 2. loads the active alert definitions for the configured site and data source
+	 * 3. creates email generators for the definitions of this executor's alert executor ids
+	 * 4. registers this executor as policy change listener
+	 *
+	 * @throws IllegalStateException if the alert definitions cannot be loaded
+	 */
+	@Override
+	public void init(){
+		// NOTE: with an unbounded LinkedBlockingQueue a ThreadPoolExecutor queues tasks instead of
+		// growing past its core size, so DEFAULT_THREAD_POOL_MAX_SIZE is effectively never reached.
+		executorPool = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_CORE_SIZE, DEFAULT_THREAD_POOL_MAX_SIZE, DEFAULT_THREAD_POOL_SHRINK_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+
+		Map<String, List<AlertEmailGenerator>> tmpEmailGenerators = new HashMap<String, List<AlertEmailGenerator>> ();
+
+		String site = config.getString("eagleProps.site");
+		String dataSource = config.getString("eagleProps.dataSource");
+		Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs;
+		try {
+			initialAlertDefs = dao.findActiveAlertDefsGroupbyAlertExecutorId(site, dataSource);
+		}
+		catch (Exception ex) {
+			LOG.error("fail to initialize initialAlertDefs: ", ex);
+			throw new IllegalStateException("fail to initialize initialAlertDefs: ", ex);
+		}
+
+		if(initialAlertDefs == null || initialAlertDefs.isEmpty()){
+			LOG.warn("No alert definitions found for site: "+site+", dataSource: "+dataSource);
+		}
+		else {
+			for (String alertExecutorId: alertExecutorIdList) {
+				if(initialAlertDefs.containsKey(alertExecutorId)) {
+					for (AlertDefinitionAPIEntity alertDef : initialAlertDefs.get(alertExecutorId).values()) {
+						List<AlertEmailGenerator> gens = createAlertEmailGenerator(alertDef);
+						tmpEmailGenerators.put(alertDef.getTags().get("policyId"), gens);
+					}
+				}else{
+					LOG.info(String.format("No alert definitions found for site: %s, dataSource: %s, alertExecutorId: %s",site,dataSource,alertExecutorId));
+				}
+			}
+		}
+
+		alertEmailGeneratorsMap = new CopyOnWriteHashMap<String, List<AlertEmailGenerator>>();
+		alertEmailGeneratorsMap.putAll(tmpEmailGenerators);
+		DynamicPolicyLoader policyLoader = DynamicPolicyLoader.getInstance();
+		policyLoader.init(initialAlertDefs, dao, config);
+		for (String alertExecutorId : alertExecutorIdList) {
+			policyLoader.addPolicyChangeListener(alertExecutorId, this);
+		}
+	}
+
+	@Override
+	public void prepareConfig(Config config) {
+		this.config = config;
+	}
+
+	/**
+	 * Expects the policy id at index 0 and the alert entity at index 1 of the
+	 * input and sends the notification emails for that alert. Nothing is
+	 * emitted to the output collector.
+	 */
+	@Override
+	public void flatMap(java.util.List<Object> input, Collector<Tuple1<String>> outputCollector){
+		String policyId = (String) input.get(0);
+		AlertAPIEntity alertEntity = (AlertAPIEntity) input.get(1);
+		processAlerts(policyId, Arrays.asList(alertEntity));
+	}
+
+	/** Sends every alert in the list through every generator registered for the policy. */
+	private void processAlerts(String policyId, List<AlertAPIEntity> list) {
+		List<AlertEmailGenerator> generators;
+		synchronized(alertEmailGeneratorsMap) {
+			generators = alertEmailGeneratorsMap.get(policyId);
+		}
+		if (generators == null) {
+			LOG.warn("Notification config of policyId " + policyId + " has been deleted");
+			return;
+		}
+		for (AlertAPIEntity entity : list) {
+			for(AlertEmailGenerator generator : generators){
+				generator.sendAlertEmail(entity);
+			}
+		}
+	}
+
+	/** Registers generators for newly created policies. */
+	@Override
+	public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) {
+		// Log texts were previously swapped with onPolicyChanged.
+		if(LOG.isDebugEnabled()) LOG.debug("alert notification config to be added : " + added);
+		for(AlertDefinitionAPIEntity alertDef : added.values()){
+			LOG.info("alert notification config really added " + alertDef);
+			List<AlertEmailGenerator> gens = createAlertEmailGenerator(alertDef);
+			synchronized(alertEmailGeneratorsMap) {
+				alertEmailGeneratorsMap.put(alertDef.getTags().get("policyId"), gens);
+			}
+		}
+	}
+
+	/** Replaces the generators of policies whose definition changed. */
+	@Override
+	public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> changed) {
+		// Log texts were previously swapped with onPolicyCreated.
+		if(LOG.isDebugEnabled()) LOG.debug(" alert notification config changed : " + changed);
+		for(AlertDefinitionAPIEntity alertDef : changed.values()){
+			LOG.info("alert notification config really changed " + alertDef);
+			List<AlertEmailGenerator> gens = createAlertEmailGenerator(alertDef);
+			synchronized(alertEmailGeneratorsMap) {
+				alertEmailGeneratorsMap.put(alertDef.getTags().get("policyId"), gens);
+			}
+		}
+	}
+
+	/** Drops the generators of deleted policies. */
+	@Override
+	public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> deleted) {
+		if(LOG.isDebugEnabled()) LOG.debug("alert notification config to be deleted : " + deleted);
+		for(AlertDefinitionAPIEntity alertDef : deleted.values()){
+			LOG.info("alert notification config really deleted " + alertDef);
+			synchronized(alertEmailGeneratorsMap) {
+				alertEmailGeneratorsMap.remove(alertDef.getTags().get("policyId"));
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/UrlBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/UrlBuilder.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/UrlBuilder.java
new file mode 100644
index 0000000..f4e248f
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/UrlBuilder.java
@@ -0,0 +1,47 @@
+package org.apache.eagle.alert.notification;
+
+import org.apache.eagle.alert.common.AlertConstants;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.common.EagleBase64Wrapper;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.log.entity.HBaseInternalLogHelper;
+import org.apache.eagle.log.entity.InternalLog;
+import org.apache.eagle.log.entity.RowkeyBuilder;
+import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
+import org.mortbay.util.UrlEncoded;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static helpers that build eagle-service web UI links (alert detail and
+ * policy detail pages) for use in notifications.
+ */
+public class UrlBuilder {
+
+    private static final Logger logger = LoggerFactory.getLogger(UrlBuilder.class);
+
+    /**
+     * Converts the entity to its internal log form, builds its rowkey and
+     * returns the rowkey as a URL-safe base64 string.
+     *
+     * @param entity alert entity to encode
+     * @return URL-safe base64 encoded rowkey
+     * @throws Exception if conversion or rowkey building fails
+     */
+    public static String getEncodedRowkey(AlertAPIEntity entity) throws Exception {
+        InternalLog log = HBaseInternalLogHelper.convertToInternalLog(entity, EntityDefinitionManager.getEntityDefinitionByEntityClass(entity.getClass()));
+        return EagleBase64Wrapper.encodeByteArray2URLSafeString(RowkeyBuilder.buildRowkey(log));
+    }
+
+    /**
+     * Builds the alert detail page URL for the given entity.
+     *
+     * @return the URL, or "N/A" when the rowkey cannot be computed
+     */
+    public static String buildAlertDetailUrl(String host, int port, AlertAPIEntity entity) {
+        String baseUrl = "http://" + host + ":" + String.valueOf(port) + "/eagle-service/#/dam/alertDetail/";
+        try {
+            return baseUrl + UrlEncoded.encodeString(getEncodedRowkey(entity));
+        }
+        catch (Exception ex) {
+            // Log the cause as well; string concatenation is null-safe where entity.toString() is not.
+            logger.error("Fail to populate encodedRowkey for alert Entity " + entity, ex);
+            return "N/A";
+        }
+    }
+
+    /**
+     * Builds the policy detail page URL from the policy id, site and alert
+     * executor id found in the tags.
+     *
+     * @return the URL, or "N/A" when any of the three tags is missing
+     */
+    public static String buildPolicyDetailUrl(String host, int port, Map<String, String> tags) {
+        String baseUrl = "http://" + host + ":" + String.valueOf(port) + "/eagle-service/#/dam/policyDetail?";
+        String format = "policy=%s&site=%s&executor=%s";
+        String policy = tags.get(AlertConstants.POLICY_ID);
+        String site = tags.get(EagleConfigConstants.SITE);
+        String alertExecutorID = tags.get(AlertConstants.ALERT_EXECUTOR_ID);
+        if (policy != null && site != null && alertExecutorID != null) {
+            return baseUrl + String.format(format, policy, site, alertExecutorID);
+        }
+        return "N/A";
+    }
+
+    /**
+     * Misspelled legacy name kept so existing callers keep compiling;
+     * behaves exactly like {@link #buildPolicyDetailUrl(String, int, Map)}.
+     */
+    public static String buiildPolicyDetailUrl(String host, int port, Map<String, String> tags) {
+        return buildPolicyDetailUrl(host, port, tags);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/AlertPersistExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/AlertPersistExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/AlertPersistExecutor.java
new file mode 100644
index 0000000..d6be567
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/AlertPersistExecutor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.persist;
+
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import com.typesafe.config.Config;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.datastream.JavaStormStreamExecutor1;
+import org.apache.eagle.datastream.Tuple1;
+
+import java.util.Arrays;
+
+/**
+ * Stream executor that hands each incoming alert entity to an
+ * {@link EaglePersist} configured from the eagle service settings.
+ */
+public class AlertPersistExecutor extends JavaStormStreamExecutor1<String> {
+
+	private static final long serialVersionUID = 1L;
+	private Config config;
+	private EaglePersist persist;
+
+	public AlertPersistExecutor(){
+	}
+
+	@Override
+	public void prepareConfig(Config config) {
+		this.config = config;
+	}
+
+	/**
+	 * Reads host and port (required) plus username and password (optional,
+	 * null when the path is absent) of the eagle service from the config and
+	 * creates the persister.
+	 */
+	@Override
+	public void init() {
+		final String servicePrefix = EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + ".";
+
+		String host = config.getString(servicePrefix + EagleConfigConstants.HOST);
+		int port = config.getInt(servicePrefix + EagleConfigConstants.PORT);
+
+		final String usernamePath = servicePrefix + EagleConfigConstants.USERNAME;
+		String username = null;
+		if (config.hasPath(usernamePath)) {
+			username = config.getString(usernamePath);
+		}
+
+		final String passwordPath = servicePrefix + EagleConfigConstants.PASSWORD;
+		String password = null;
+		if (config.hasPath(passwordPath)) {
+			password = config.getString(passwordPath);
+		}
+
+		this.persist = new EaglePersist(host, port, username, password);
+	}
+
+	/**
+	 * Persists the alert entity found at index 1 of the input; nothing is
+	 * emitted to the output collector.
+	 */
+	@Override
+	public void flatMap(java.util.List<Object> input, Collector<Tuple1<String>> outputCollector){
+		AlertAPIEntity alert = (AlertAPIEntity) input.get(1);
+		persist.doPersist(Arrays.asList(alert));
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/EaglePersist.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/EaglePersist.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/EaglePersist.java
new file mode 100644
index 0000000..ebba518
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/EaglePersist.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * 
+ */
+package org.apache.eagle.alert.persist;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Writes tagged log entities to the eagle service through a short-lived
+ * {@link IEagleServiceClient} created for every call.
+ */
+public class EaglePersist {
+
+	private static final Logger LOG = LoggerFactory.getLogger(EaglePersist.class);
+	private String eagleServiceHost;
+	private int eagleServicePort;
+	private String username;
+	private String password;
+
+	/** Creates a persister that connects without credentials. */
+	public EaglePersist(String eagleServiceHost, int eagleServicePort) {
+		this(eagleServiceHost, eagleServicePort, null, null);
+	}
+
+	public EaglePersist(String eagleServiceHost, int eagleServicePort, String username, String password) {
+		this.eagleServiceHost = eagleServiceHost;
+		this.eagleServicePort = eagleServicePort;
+		this.username = username;
+		this.password = password;
+	}
+
+	/**
+	 * Creates the given entities in the eagle service.
+	 *
+	 * @param list entities to persist
+	 * @return true when the service reports success; false for an empty list,
+	 *         an unsuccessful response or any exception (which is logged)
+	 */
+	public boolean doPersist(List<? extends TaggedLogAPIEntity> list) {
+		if (list.isEmpty()) return false;
+		LOG.info("Going to persist entities, type: " + " " + list.get(0).getClass().getSimpleName() + ", list size: " + list.size());
+		try {
+			IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password);
+			GenericServiceAPIResponseEntity<String> response;
+			try {
+				response = client.create(list);
+			}
+			finally {
+				// Close the client even when create() throws, so it is not leaked.
+				client.close();
+			}
+			if (response.isSuccess()) {
+				LOG.info("Successfully create entities " + list.toString());
+				return true;
+			}
+			else {
+				LOG.error("Fail to create entities");
+				return false;
+			}
+		}
+		catch (Exception ex) {
+			LOG.error("Got an exception in persisting entities" + ex.getMessage(), ex);
+			return false;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DefaultPolicyPartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DefaultPolicyPartitioner.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DefaultPolicyPartitioner.java
new file mode 100644
index 0000000..3d7b4fb
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DefaultPolicyPartitioner.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.policy;
+
+
+/**
+ * Default partitioner: derives the partition from the hash codes of the
+ * policy type and the policy id.
+ */
+public class DefaultPolicyPartitioner implements PolicyPartitioner{
+	/**
+	 * @param numTotalPartitions number of partitions to spread policies over
+	 * @param policyType policy type, must not be null
+	 * @param policyId policy id, must not be null
+	 * @return partition index in the range [0, numTotalPartitions)
+	 */
+	@Override
+	public int partition(int numTotalPartitions, String policyType,
+			String policyId) {
+		final int prime = 31;
+		int result = 1;
+		result = result * prime + policyType.hashCode();
+		result = result < 0 ? result*-1 : result;
+		result = result * prime + policyId.hashCode();
+		result = result < 0 ? result*-1 : result;
+		// Integer.MIN_VALUE * -1 overflows and stays negative; the remainder is
+		// always smaller in magnitude than the divisor, so abs() is safe here
+		// and leaves every non-negative result unchanged.
+		return Math.abs(result % numTotalPartitions);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DynamicPolicyLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DynamicPolicyLoader.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DynamicPolicyLoader.java
new file mode 100644
index 0000000..707cd30
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DynamicPolicyLoader.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.policy;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.alert.dao.AlertDefinitionDAO;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import com.netflix.config.AbstractPollingScheduler;
+import com.netflix.config.ConcurrentCompositeConfiguration;
+import com.netflix.config.DynamicConfiguration;
+import com.netflix.config.FixedDelayPollingScheduler;
+import com.netflix.config.PollListener;
+import com.netflix.config.PollResult;
+import com.netflix.config.PolledConfigurationSource;
+import com.sun.jersey.client.impl.CopyOnWriteHashMap;
+import com.typesafe.config.Config;
+
+public class DynamicPolicyLoader {
+	private static final Logger LOG = LoggerFactory.getLogger(DynamicPolicyLoader.class);
+	
+	private final int defaultInitialDelayMillis = 30*1000;
+	private final int defaultDelayMillis = 60*1000;
+	private final boolean defaultIgnoreDeleteFromSource = true;
+	private volatile CopyOnWriteHashMap<String, List<PolicyLifecycleMethods>> policyChangeListeners = new CopyOnWriteHashMap<String, List<PolicyLifecycleMethods>>();
+	private static DynamicPolicyLoader instance = new DynamicPolicyLoader();
+	private volatile boolean initialized = false;
+	
+	public void addPolicyChangeListener(String alertExecutorId, PolicyLifecycleMethods alertExecutor){
+		synchronized(policyChangeListeners) {
+			if (policyChangeListeners.get(alertExecutorId) == null) {
+				policyChangeListeners.put(alertExecutorId, new ArrayList<PolicyLifecycleMethods>());
+			}
+			policyChangeListeners.get(alertExecutorId).add(alertExecutor);
+		}
+	}
+	
+	public static DynamicPolicyLoader getInstance(){
+		return instance;
+	}
+	
+	/**
+	 * singleton with init would be good for unit test as well, and it ensures that
+	 * initialization happens only once before you use it.  
+	 * @param config
+	 * @param dao
+	 */
+	public void init(Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs, 
+			AlertDefinitionDAO dao, Config config){
+		if(!initialized){
+			synchronized(this){
+				if(!initialized){
+					internalInit(initialAlertDefs, dao, config);
+					initialized = true;
+				}
+			}
+		}
+	}
+	
+	/**
+	 * map from alertExecutorId+partitionId to AlertExecutor which implements PolicyLifecycleMethods
+	 * @param initialAlertDefs
+	 * @param dao
+	 * @param config
+	 */
+	private void internalInit(Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs,
+			AlertDefinitionDAO dao, Config config){
+		if(!config.getBoolean("dynamicConfigSource.enabled")) {
+            return;
+        }
+		AbstractPollingScheduler scheduler = new FixedDelayPollingScheduler(
+                config.getInt("dynamicConfigSource.initDelayMillis"),
+                config.getInt("dynamicConfigSource.delayMillis"),
+                false
+        );
+
+		scheduler.addPollListener(new PollListener(){
+			@Override
+			public void handleEvent(EventType eventType, PollResult lastResult,
+					Throwable exception) {
+				if (lastResult == null) {
+					LOG.error("The lastResult is null, something must be wrong, probably the eagle service is dead!");
+					throw new RuntimeException("The lastResult is null, probably the eagle service is dead! ", exception);
+				}
+				Map<String, Object> added = lastResult.getAdded();
+				Map<String, Object> changed = lastResult.getChanged();
+				Map<String, Object> deleted = lastResult.getDeleted();
+				for(Map.Entry<String, List<PolicyLifecycleMethods>> entry : policyChangeListeners.entrySet()){
+					String alertExecutorId = entry.getKey();
+					for (PolicyLifecycleMethods policyLifecycleMethod : entry.getValue()) {
+						Map<String, AlertDefinitionAPIEntity> addedPolicies = (Map<String, AlertDefinitionAPIEntity>)added.get(trimPartitionNum(alertExecutorId));
+						if(addedPolicies != null && addedPolicies.size() > 0){
+							policyLifecycleMethod.onPolicyCreated(addedPolicies);
+						}
+						Map<String, AlertDefinitionAPIEntity> changedPolicies = (Map<String, AlertDefinitionAPIEntity>)changed.get(trimPartitionNum(alertExecutorId));
+						if(changedPolicies != null && changedPolicies.size() > 0){
+							policyLifecycleMethod.onPolicyChanged(changedPolicies);
+						}
+						Map<String, AlertDefinitionAPIEntity> deletedPolicies = (Map<String, AlertDefinitionAPIEntity>)deleted.get(trimPartitionNum(alertExecutorId));
+						if(deletedPolicies != null && deletedPolicies.size() > 0){
+							policyLifecycleMethod.onPolicyDeleted(deletedPolicies);
+						}
+					}
+				}
+			}
+			private String trimPartitionNum(String alertExecutorId){
+				int i = alertExecutorId.lastIndexOf('_');
+				if(i != -1){
+					return alertExecutorId.substring(0, i);
+				}
+				return alertExecutorId;
+			}
+		});
+		
+		ConcurrentCompositeConfiguration finalConfig = new ConcurrentCompositeConfiguration();
+		      
+		PolledConfigurationSource source = new DynamicPolicySource(initialAlertDefs, dao, config);
+
+		try{
+			DynamicConfiguration dbSourcedConfiguration = new DynamicConfiguration(source, scheduler);
+			finalConfig.addConfiguration(dbSourcedConfiguration);
+		}catch(Exception ex){
+			LOG.warn("Fail loading from DB, continue without DB sourced configuration", ex);
+		}
+	}
+	
+	public static class DynamicPolicySource implements PolledConfigurationSource{
+		private static Logger LOG = LoggerFactory.getLogger(DynamicPolicySource.class);
+		private Config config;
+		private AlertDefinitionDAO dao;
+		/**
+		 * mapping from alertExecutorId to list of policies 
+		 */
+		private Map<String, Map<String, AlertDefinitionAPIEntity>> cachedAlertDefs;
+		
+		public DynamicPolicySource(Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs, AlertDefinitionDAO dao, Config config){
+			this.cachedAlertDefs = initialAlertDefs;
+			this.dao = dao;
+			this.config = config;
+		}
+
+		public PollResult poll(boolean initial, Object checkPoint) throws Exception {
+			LOG.info("Poll policy from eagle service " +  config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST) +
+					":" + config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT) );
+			Map<String, Map<String, AlertDefinitionAPIEntity>> newAlertDefs = 
+					dao.findActiveAlertDefsGroupbyAlertExecutorId(config.getString("eagleProps.site"),
+                            config.getString("eagleProps.dataSource"));
+			
+			// compare runtime alertDefs with cachedAlertDefs and figure out what are added/deleted/updated
+			Map<String, Object> added = new HashMap<String, Object>();
+			Map<String, Object> changed = new HashMap<String, Object>();
+			Map<String, Object> deleted = new HashMap<String, Object>();
+			
+			Set<String> newAlertExecutorIds = newAlertDefs.keySet();
+			Set<String> cachedAlertExecutorIds = cachedAlertDefs.keySet();
+			
+			// dynamically adding new alert executor is not supported, because alert executor is pre-built while program starts up
+			Collection<String> addedAlertExecutorIds = CollectionUtils.subtract(newAlertExecutorIds, cachedAlertExecutorIds);
+			if(addedAlertExecutorIds != null && addedAlertExecutorIds.size() > 0){
+				LOG.warn("New alertExecutorIds are found : " + addedAlertExecutorIds);
+			}
+			
+			// if one alert executor is missing, it means all policy under that alert executor should be removed
+			Collection<String> deletedAlertExecutorIds = CollectionUtils.subtract(cachedAlertExecutorIds, newAlertExecutorIds);
+			if(deletedAlertExecutorIds != null && deletedAlertExecutorIds.size() > 0){
+				LOG.warn("Some alertExecutorIds are deleted : " + deletedAlertExecutorIds);
+				for(String deletedAlertExecutorId : deletedAlertExecutorIds){
+					deleted.put(deletedAlertExecutorId, cachedAlertDefs.get(deletedAlertExecutorId));
+				}
+			}
+			
+			// we need calculate added/updated/deleted policy for all executors which are not deleted
+//			Collection<String> updatedAlertExecutorIds = CollectionUtils.intersection(newAlertExecutorIds, cachedAlertExecutorIds);
+            Collection<String> updatedAlertExecutorIds = newAlertExecutorIds;
+			for(String updatedAlertExecutorId : updatedAlertExecutorIds){
+				Map<String, AlertDefinitionAPIEntity> newPolicies = newAlertDefs.get(updatedAlertExecutorId);
+				Map<String, AlertDefinitionAPIEntity> cachedPolicies = cachedAlertDefs.get(updatedAlertExecutorId);
+				PolicyComparator.compare(updatedAlertExecutorId, newPolicies, cachedPolicies, added, changed, deleted);
+			}
+			
+			cachedAlertDefs = newAlertDefs;
+			return PollResult.createIncremental(added, changed, deleted, new Date().getTime());
+		}
+	}
+	
+	public static class PolicyComparator{
+		public static void compare(String alertExecutorId, Map<String, AlertDefinitionAPIEntity> newPolicies, Map<String, AlertDefinitionAPIEntity> cachedPolicies, 
+				Map<String, Object> added, Map<String, Object> changed, Map<String, Object> deleted){
+			Set<String> newPolicyIds = newPolicies.keySet();
+            Set<String> cachedPolicyIds = cachedPolicies != null ? cachedPolicies.keySet() : new HashSet<String>();
+			Collection<String> addedPolicyIds = CollectionUtils.subtract(newPolicyIds, cachedPolicyIds);
+			Collection<String> deletedPolicyIds = CollectionUtils.subtract(cachedPolicyIds, newPolicyIds);
+			Collection<String> changedPolicyIds = CollectionUtils.intersection(cachedPolicyIds, newPolicyIds);
+			if(addedPolicyIds != null && addedPolicyIds.size() > 0){
+				Map<String, AlertDefinitionAPIEntity> tmp = new HashMap<String, AlertDefinitionAPIEntity>();
+				for(String addedPolicyId : addedPolicyIds){
+					tmp.put(addedPolicyId, newPolicies.get(addedPolicyId));
+				}
+				added.put(alertExecutorId, tmp);
+			}
+			if(deletedPolicyIds != null && deletedPolicyIds.size() > 0){
+				Map<String, AlertDefinitionAPIEntity> tmp = new HashMap<String, AlertDefinitionAPIEntity>();
+				for(String deletedPolicyId : deletedPolicyIds){
+					tmp.put(deletedPolicyId, cachedPolicies.get(deletedPolicyId));
+				}
+				deleted.put(alertExecutorId, tmp);
+			}
+			if(changedPolicyIds != null && changedPolicyIds.size() > 0){
+				Map<String, AlertDefinitionAPIEntity> tmp = new HashMap<String, AlertDefinitionAPIEntity>();
+				for(String changedPolicyId : changedPolicyIds){
+					// check if policy is really changed
+					if(!newPolicies.get(changedPolicyId).equals(cachedPolicies.get(changedPolicyId))){
+						tmp.put(changedPolicyId, newPolicies.get(changedPolicyId));
+					}
+				}
+				changed.put(alertExecutorId, tmp);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PartitionUtils.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PartitionUtils.java
new file mode 100644
index 0000000..5bf546c
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PartitionUtils.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.policy;
+
+import org.apache.eagle.alert.common.AlertConstants;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+
+public class PartitionUtils {
+	
+	public static boolean accept(AlertDefinitionAPIEntity alertDef, PolicyPartitioner partitioner, int numPartitions, int partitionSeq){
+		int targetPartitionSeq = partitioner.partition(numPartitions, alertDef.getTags().get(AlertConstants.POLICY_TYPE), alertDef.getTags().get(AlertConstants.POLICY_ID));
+		if(targetPartitionSeq == partitionSeq)
+			return true;
+		return false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyEvaluator.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyEvaluator.java
new file mode 100644
index 0000000..63588b8
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyEvaluator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.policy;
+
+import java.util.Map;
+
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.dataproc.core.ValuesArray;
+
+/**
+ * Evaluates incoming events against one policy and reacts to updates of that policy.
+ */
+public interface PolicyEvaluator {
+	/**
+	 * Takes the input and evaluates the policy expression against it.
+	 * The input has 3 fields: the siddhiAlertContext, the streamName, and a map of
+	 * attribute name/value.
+	 * @param input the event to evaluate
+	 * @throws Exception when the evaluation fails
+	 */
+	void evaluate(ValuesArray input) throws Exception;
+
+	/**
+	 * Notifies the evaluator that its policy was updated.
+	 * @param newAlertDef the updated alert definition
+	 */
+	void onPolicyUpdate(AlertDefinitionAPIEntity newAlertDef);
+
+	/**
+	 * Notifies the evaluator that its policy was deleted; cleanup work for the evaluator belongs here.
+	 */
+	void onPolicyDelete();
+
+	/**
+	 * @return additional context of this evaluator
+	 */
+	Map<String, String> getAdditionalContext();
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyEvaluatorServiceProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyEvaluatorServiceProvider.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyEvaluatorServiceProvider.java
new file mode 100644
index 0000000..412010c
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyEvaluatorServiceProvider.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.policy;
+
+import java.util.List;
+
+import com.fasterxml.jackson.databind.Module;
+
+/**
+ * Extension point that separates the policy evaluation framework from provider logic.
+ *
+ * <p>Responsibilities of the policy evaluator framework:
+ * <ul>
+ *   <li>connect to the eagle data source and read all policy definitions</li>
+ *   <li>compare them with the cached policy definitions to find out whether a policy
+ *       was created, deleted or updated, and invoke onPolicyCreated, onPolicyDeleted
+ *       or onPolicyUpdated accordingly</li>
+ *   <li>for a policy update, replace the old evaluator engine with a new one created
+ *       by the policy evaluator provider</li>
+ *   <li>specify the number of executors for an alert executor id</li>
+ *   <li>dynamically balance the number of policies evaluated by each alert executor,
+ *       using zookeeper: eaglePolicies/${alertExecutorId}/${alertExecutorInstanceId} => list of policies</li>
+ * </ul>
+ *
+ * <p>Business features of a policy evaluator provider:
+ * <ul>
+ *   <li>register the mapping between a policy type and its PolicyEvaluator</li>
+ *   <li>create the evaluator engine runtime when the configuration is changed</li>
+ * </ul>
+ */
+public interface PolicyEvaluatorServiceProvider {
+	/**
+	 * @return the policy type this provider is registered for
+	 */
+	String getPolicyType();
+
+	/**
+	 * @return the evaluator class for this provider's policy type
+	 */
+	Class<? extends PolicyEvaluator> getPolicyEvaluator();
+
+	/**
+	 * @return the Jackson modules belonging to this provider's policy type
+	 */
+	List<Module> getBindingModules();
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyLifecycleMethods.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyLifecycleMethods.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyLifecycleMethods.java
new file mode 100644
index 0000000..66ecd6b
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyLifecycleMethods.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.policy;
+
+import java.util.Map;
+
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+
+/**
+ * Callbacks through which a listener is told about created, changed and deleted policies.
+ */
+public interface PolicyLifecycleMethods {
+	/**
+	 * Called with the policies that were newly created.
+	 * @param added the created alert definitions
+	 */
+	void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added);
+
+	/**
+	 * Called with the policies whose definition changed.
+	 * @param changed the changed alert definitions
+	 */
+	void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> changed);
+
+	/**
+	 * Called with the policies that were deleted.
+	 * @param deleted the deleted alert definitions
+	 */
+	void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> deleted);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyManager.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyManager.java
new file mode 100644
index 0000000..5e5dc8d
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyManager.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.policy;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.Module;
+
+/**
+ * Singleton registry of the {@link PolicyEvaluatorServiceProvider} implementations found
+ * through {@link ServiceLoader}, indexed by policy type.
+ */
+public class PolicyManager {
+	private final static Logger LOG = LoggerFactory.getLogger(PolicyManager.class);
+	private static PolicyManager instance = new PolicyManager();
+
+	private ServiceLoader<PolicyEvaluatorServiceProvider> loader;
+
+	/** evaluator class per policy type */
+	private Map<String, Class<? extends PolicyEvaluator>> policyEvaluators = new HashMap<String, Class<? extends PolicyEvaluator>>();
+	/** Jackson modules per policy type */
+	private Map<String, List<Module>> policyModules = new HashMap<String, List<Module>>();
+
+	/**
+	 * Loads every available provider once and records its evaluator class and modules.
+	 */
+	private PolicyManager(){
+		loader = ServiceLoader.load(PolicyEvaluatorServiceProvider.class);
+		for(PolicyEvaluatorServiceProvider provider : loader){
+			LOG.info("Supported policy type : " + provider.getPolicyType());
+			policyEvaluators.put(provider.getPolicyType(), provider.getPolicyEvaluator());
+			policyModules.put(provider.getPolicyType(), provider.getBindingModules());
+		}
+	}
+
+	/**
+	 * @return the shared manager instance
+	 */
+	public static PolicyManager getInstance(){
+		return instance;
+	}
+
+	/**
+	 * @param policyType policy type to look up
+	 * @return the evaluator class registered for the type, or null when none is registered
+	 */
+	public Class<? extends PolicyEvaluator> getPolicyEvaluator(String policyType){
+		return policyEvaluators.get(policyType);
+	}
+
+	/**
+	 * @param policyType policy type to look up
+	 * @return the modules registered for the type, or null when none are registered
+	 */
+	public List<Module> getPolicyModules(String policyType){
+		return policyModules.get(policyType);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyPartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyPartitioner.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyPartitioner.java
new file mode 100644
index 0000000..2047256
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyPartitioner.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.policy;
+
+import java.io.Serializable;
+
+/**
+ * Strategy that assigns each policy to a partition, so that policies can be
+ * distributed over different alert evaluators.
+ */
+public interface PolicyPartitioner extends Serializable {
+	/**
+	 * Picks the partition for one policy.
+	 * @param numTotalPartitions total number of partitions
+	 * @param policyType type of the policy
+	 * @param policyId id of the policy
+	 * @return the partition the policy is assigned to
+	 */
+	int partition(int numTotalPartitions, String policyType, String policyId);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/AttributeType.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/AttributeType.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/AttributeType.java
new file mode 100644
index 0000000..36d8281
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/AttributeType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.siddhi;
+
+/**
+ * Attribute types available in this module; the referenced Siddhi type is the counterpart.
+ * OBJECT is currently commented out and therefore not available.
+ *
+ * @see org.wso2.siddhi.query.api.definition.Attribute.Type
+ */
+public enum AttributeType {
+	STRING, LONG, INTEGER, BOOL, FLOAT, DOUBLE
+//    , OBJECT
+}



Mime
View raw message