eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yonzhang2...@apache.org
Subject [2/2] incubator-eagle git commit: EAGLE-99 policy distribution reporter report policy distribution stats into local log or eagle service https://issues.apache.org/jira/browse/EAGLE-99 Author: @yonzhang yonzhang2012@apache.org Reviewer: @ralphsu suliangfe
Date Wed, 30 Dec 2015 04:54:45 GMT
EAGLE-99 policy distribution reporter
report policy distribution stats into local log or eagle service
https://issues.apache.org/jira/browse/EAGLE-99
Author: @yonzhang yonzhang2012@apache.org
Reviewer: @ralphsu suliangfei@gmail.com
Closes: #38


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/feaeabc1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/feaeabc1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/feaeabc1

Branch: refs/heads/master
Commit: feaeabc18454bc760cbc247bba6a1ba80047b118
Parents: 89f788a
Author: yonzhang <yonzhang@ebay.com>
Authored: Tue Dec 29 20:54:33 2015 -0800
Committer: yonzhang <yonzhang@ebay.com>
Committed: Tue Dec 29 20:54:33 2015 -0800

----------------------------------------------------------------------
 38.patch.1                                      | 1011 ++++++++++++++++++
 .../eagle/alert/policy/DynamicPolicyLoader.java |   24 +
 .../policy/PolicyDistStatsDAOLogReporter.java   |   47 +
 .../policy/PolicyDistributionReportMethods.java |   27 +
 .../alert/policy/PolicyDistributionStats.java   |   74 ++
 .../policy/PolicyDistributionStatsDAO.java      |   26 +
 .../policy/PolicyDistroStatsLogReporter.java    |   50 +
 .../policy/PolicyEvaluatorServiceProvider.java  |    2 +-
 .../apache/eagle/executor/AlertExecutor.java    |   28 +-
 .../policy/TestPolicyDistributionUpdater.java   |   93 ++
 .../alert/policy/TestPolicyPartitioner.java     |   29 +
 .../eagle/alert/state/TestAggregation.java      |   98 ++
 .../TestSiddhiStateSnapshotAndRestore.java      |  152 ++-
 .../org/apache/eagle/service/hbase/Tables.java  |    1 +
 14 files changed, 1653 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/feaeabc1/38.patch.1
----------------------------------------------------------------------
diff --git a/38.patch.1 b/38.patch.1
new file mode 100644
index 0000000..2e1dbf4
--- /dev/null
+++ b/38.patch.1
@@ -0,0 +1,1011 @@
+From dccd015e7e2c22d19360b552745e8524cd55b294 Mon Sep 17 00:00:00 2001
+From: yonzhang <yonzhang@ebay.com>
+Date: Tue, 22 Dec 2015 14:57:47 -0800
+Subject: [PATCH 1/3] test siddhi aggregation and snapshot and restore
+
+---
+ .../apache/eagle/alert/state/TestAggregation.java  |  98 +++++++++++++
+ .../state/TestSiddhiStateSnapshotAndRestore.java   | 152 ++++++++++++++++++++-
+ 2 files changed, 245 insertions(+), 5 deletions(-)
+ create mode 100644 eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestAggregation.java
+
+diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestAggregation.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestAggregation.java
+new file mode 100644
+index 0000000..adcd728
+--- /dev/null
++++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestAggregation.java
+@@ -0,0 +1,98 @@
++/*
++ *
++ *  * Licensed to the Apache Software Foundation (ASF) under one or more
++ *  * contributor license agreements.  See the NOTICE file distributed with
++ *  * this work for additional information regarding copyright ownership.
++ *  * The ASF licenses this file to You under the Apache License, Version 2.0
++ *  * (the "License"); you may not use this file except in compliance with
++ *  * the License.  You may obtain a copy of the License at
++ *  *
++ *  *    http://www.apache.org/licenses/LICENSE-2.0
++ *  *
++ *  * Unless required by applicable law or agreed to in writing, software
++ *  * distributed under the License is distributed on an "AS IS" BASIS,
++ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ *  * See the License for the specific language governing permissions and
++ *  * limitations under the License.
++ *
++ */
++
++package org.apache.eagle.alert.state;
++
++import java.util.concurrent.atomic.AtomicInteger;
++
++import org.junit.Test;
++import org.wso2.siddhi.core.ExecutionPlanRuntime;
++import org.wso2.siddhi.core.SiddhiManager;
++import org.wso2.siddhi.core.event.Event;
++import org.wso2.siddhi.core.query.output.callback.QueryCallback;
++import org.wso2.siddhi.core.stream.input.InputHandler;
++
++import junit.framework.Assert;
++import org.wso2.siddhi.core.util.EventPrinter;
++
++public class TestAggregation {
++    @Test
++    public void test01DownSampling() throws Exception {
++        String stream = "define stream jmxMetric(cpu double, memory int, bytesIn int, bytesOut long, timestamp long);";
++        String query = "@info(name = 'downSample') "
++                + "from jmxMetric#window.timeBatch(1 sec) "
++                + "select "
++                + " min(cpu) as minCpu, max(cpu) as maxCpu, avg(cpu) as avgCpu, "
++                + " min(memory) as minMem, max(memory) as maxMem, avg(memory) as avgMem, "
++                + " min(bytesIn) as minBytesIn, max(bytesIn) as maxBytesIn, avg(bytesIn) as avgBytesIn, sum(bytesIn) as totalBytesIn, "
++                + " min(bytesOut) as minBytesOut, max(bytesOut) as maxBytesOut, avg(bytesOut) as avgBytesOut, sum(bytesOut) as totalBytesOut, "
++                + " timestamp as timeWindowEnds "
++                + " INSERT  INTO tmp;";
++
++        SiddhiManager sm = new SiddhiManager();
++        ExecutionPlanRuntime plan = sm.createExecutionPlanRuntime(stream + query);
++
++        final AtomicInteger counter = new AtomicInteger();
++        plan.addCallback("downSample", new QueryCallback() {
++            @Override
++            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
++                EventPrinter.print(timeStamp, inEvents, removeEvents);
++                int count = counter.incrementAndGet();
++                if (count == 1) {
++                    Assert.assertEquals(6000L, inEvents[0].getData(9));
++                } else if(count == 2) {
++                    Assert.assertEquals(6000L, inEvents[0].getData(9));
++                }
++            }
++        });
++        InputHandler input = plan.getInputHandler("jmxMetric");
++
++        plan.start();
++        sendEvent(input);
++        Thread.sleep(100);
++        sendEvent(input);
++        Thread.sleep(1000);
++        sendEvent(input);
++        Thread.sleep(1000);
++        sendEvent(input);
++        Thread.sleep(200);
++        plan.shutdown();
++    }
++
++    // send 3 events
++    private void sendEvent(InputHandler input) throws Exception {
++        int len = 3;
++        Event[] events = new Event[len];
++        for (int i = 0; i < len; i++) {
++            long externalTs = System.currentTimeMillis();
++            // cpu int, memory int, bytesIn long, bytesOut long, timestamp long
++            events[i] = new Event(externalTs + i, new Object[] {
++                    15.0,
++                    15,
++                    1000,
++                    2000L,
++                    externalTs + i
++            });
++        }
++
++        for (Event e : events) {
++            input.send(e);
++        }
++    }
++}
+diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java
+index 7c01a90..3dd43ac 100644
+--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java
++++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java
+@@ -21,6 +21,7 @@
+ 
+ import org.apache.eagle.common.DateTimeUtil;
+ import org.junit.Assert;
++import org.junit.Before;
+ import org.junit.Test;
+ import org.wso2.siddhi.core.ExecutionPlanRuntime;
+ import org.wso2.siddhi.core.SiddhiManager;
+@@ -28,6 +29,8 @@
+ import org.wso2.siddhi.core.query.output.callback.QueryCallback;
+ import org.wso2.siddhi.core.stream.input.InputHandler;
+ import org.wso2.siddhi.core.util.EventPrinter;
++import org.wso2.siddhi.core.util.persistence.InMemoryPersistenceStore;
++import org.wso2.siddhi.core.util.persistence.PersistenceStore;
+ 
+ import java.io.FileInputStream;
+ import java.io.FileOutputStream;
+@@ -241,10 +244,10 @@ public void testTimeSlideWindow() throws Exception{
+     private ExecutionPlanRuntime setupRuntimeForExternalTimeSlideWindowWithGroupby(){
+         SiddhiManager siddhiManager = new SiddhiManager();
+         String cseEventStream = "define stream testStream (timeStamp long, user string, cmd string);";
+-        String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,3 sec)"
++        String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,30 sec)"
+                 + " select user, timeStamp, count(user) as cnt"
+-//                + " group by user"
+-//                + " having cnt > 2"
++                + " group by user"
++                + " having cnt > 2"
+                + " insert all events into outputStream;";
+ 
+         ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
+@@ -295,14 +298,61 @@ public void testExternalTimeSlideWindowWithGroupby() throws Exception{
+         restoredRuntime.shutdown();
+     }
+ 
++    private ExecutionPlanRuntime setupRuntimeForExternalTimeSlideWindowWithGroupby_2(SiddhiManager siddhiManager){
++        String cseEventStream = "define stream testStream (timeStamp long, user string, cmd string);";
++        String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,300 sec)"
++                + " select user, timeStamp, count(user) as cnt"
++                + " group by user"
++                + " insert all events into outputStream;";
++
++        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
++
++        executionPlanRuntime.addCallback("query1", new QueryCallback() {
++            @Override
++            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
++                EventPrinter.print(timeStamp, inEvents, removeEvents);
++            }
++        });
++        return executionPlanRuntime;
++    }
++
++    @Test
++    public void testExternalTimeSlideWindowWithGroupby_2() throws Exception{
++        SiddhiManager siddhiManager = new SiddhiManager();
++        PersistenceStore persistenceStore = new InMemoryPersistenceStore();
++        siddhiManager.setPersistenceStore(persistenceStore);
++
++        ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForExternalTimeSlideWindowWithGroupby_2(siddhiManager);
++        InputHandler inputHandler = executionPlanRuntime.getInputHandler("testStream");
++        executionPlanRuntime.start();
++        long curTime = DateTimeUtil.humanDateToMilliseconds("2015-09-17 00:00:00,000");
++        inputHandler.send(new Object[]{curTime, "user", "open"});
++        inputHandler.send(new Object[]{curTime + 1000, "user", "open"});
++        inputHandler.send(new Object[]{curTime + 2000, "user", "open"});
++        inputHandler.send(new Object[]{curTime + 3000, "user", "open"});
++        Thread.sleep(100);
++        executionPlanRuntime.persist();
++        executionPlanRuntime.shutdown();
++        ExecutionPlanRuntime restoredRuntime = setupRuntimeForExternalTimeSlideWindowWithGroupby_2(siddhiManager);
++        inputHandler = restoredRuntime.getInputHandler("testStream");
++        restoredRuntime.start();
++        restoredRuntime.restoreLastRevision();
++        inputHandler.send(new Object[]{curTime + 4000, "user", "open"});
++        inputHandler.send(new Object[]{curTime + 5000, "user", "open"});
++        inputHandler.send(new Object[]{curTime + 6000, "user", "open"});
++        inputHandler.send(new Object[]{curTime + 7000, "user", "open"});
++        Thread.sleep(1000);
++        restoredRuntime.shutdown();
++    }
++
+     private ExecutionPlanRuntime setupRuntimeForInternalTimeSlideWindowWithGroupby(){
+         SiddhiManager siddhiManager = new SiddhiManager();
+         String cseEventStream = "define stream testStream (user string, cmd string);";
+-        String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.time(100 sec)"
++        String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.time(5 sec)"
+                 + " select user, count(user) as cnt"
+                 + " group by user"
+                 + " having cnt > 2"
+-                + " insert all events into outputStream;";
++                + " insert events into outputStream;";
+ 
+         ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
+ 
+@@ -356,4 +406,96 @@ public void testInternalTimeSlideWindowWithGroupby() throws Exception{
+         input.close();
+         restoredRuntime.shutdown();
+     }
++
++    private int count;
++    private boolean eventArrived;
++
++    @Before
++    public void init() {
++        count = 0;
++        eventArrived = false;
++    }
++
++    @Test
++    public void persistenceTest7() throws InterruptedException {
++        PersistenceStore persistenceStore = new InMemoryPersistenceStore();
++
++        SiddhiManager siddhiManager = new SiddhiManager();
++        siddhiManager.setPersistenceStore(persistenceStore);
++
++        String executionPlan = "" +
++                "@plan:name('Test') " +
++                "" +
++                "define stream StockStream (symbol string, price float, volume int, timestamp long);" +
++                "" +
++                "@info(name = 'query1')" +
++                "from StockStream#window.externalTime(timestamp,30 sec) " +
++                "select symbol, price, sum(volume) as totalVol, count(symbol) as cnt " +
++                "group by symbol " +
++                "insert into OutStream ";
++
++        QueryCallback queryCallback = new QueryCallback() {
++            @Override
++            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
++                EventPrinter.print(timeStamp, inEvents, removeEvents);
++                eventArrived = true;
++                for (Event inEvent : inEvents) {
++                    count++;
++                    Assert.assertTrue("IBM".equals(inEvent.getData(0)) || "WSO2".equals(inEvent.getData(0)));
++                    if (count == 5) {
++                        Assert.assertEquals(400l, inEvent.getData(2));
++                    }
++                    if (count == 6) {
++                        Assert.assertEquals(200l, inEvent.getData(2));
++                    }
++                }
++            }
++        };
++
++        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
++        executionPlanRuntime.addCallback("query1", queryCallback);
++
++        InputHandler inputHandler = executionPlanRuntime.getInputHandler("StockStream");
++        executionPlanRuntime.start();
++        long currentTime = 0;
++
++        inputHandler.send(new Object[]{"IBM", 75.1f, 100, currentTime + 1000});
++        Thread.sleep(100);
++        inputHandler.send(new Object[]{"WSO2", 75.2f, 100, currentTime + 2000});
++        Thread.sleep(100);
++        inputHandler.send(new Object[]{"IBM", 75.3f, 100, currentTime + 3000});
++
++        Thread.sleep(500);
++        Assert.assertTrue(eventArrived);
++        Assert.assertEquals(3, count);
++
++        //persisting
++        Thread.sleep(500);
++        executionPlanRuntime.persist();
++
++        //restarting execution plan
++        Thread.sleep(500);
++        executionPlanRuntime.shutdown();
++        executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
++        executionPlanRuntime.addCallback("query1", queryCallback);
++        inputHandler = executionPlanRuntime.getInputHandler("StockStream");
++        executionPlanRuntime.start();
++
++        //loading
++        executionPlanRuntime.restoreLastRevision();
++
++        inputHandler.send(new Object[]{"IBM", 75.4f, 100, currentTime + 4000});
++        Thread.sleep(100);
++        inputHandler.send(new Object[]{"IBM", 75.5f, 100, currentTime + 5000});
++        Thread.sleep(100);
++        inputHandler.send(new Object[]{"WSO2", 75.6f, 100, currentTime + 6000});
++
++        //shutdown execution plan
++        Thread.sleep(500);
++        executionPlanRuntime.shutdown();
++
++        Assert.assertEquals(count, 6);
++        Assert.assertEquals(true, eventArrived);
++
++    }
+ }
+
+From f711c8a472375fc07e4ccec9985f08670df299a0 Mon Sep 17 00:00:00 2001
+From: yonzhang <yonzhang@ebay.com>
+Date: Tue, 22 Dec 2015 23:08:58 -0800
+Subject: [PATCH 2/3] policy distribution reporter
+
+---
+ .../eagle/alert/policy/DynamicPolicyLoader.java    | 25 ++++++
+ .../policy/PolicyDistStatsDAOLogReporter.java      | 47 +++++++++++
+ .../policy/PolicyDistributionReportMethods.java    | 27 +++++++
+ .../alert/policy/PolicyDistributionStats.java      | 74 +++++++++++++++++
+ .../alert/policy/PolicyDistributionStatsDAO.java   | 26 ++++++
+ .../policy/PolicyEvaluatorServiceProvider.java     |  2 +-
+ .../org/apache/eagle/executor/AlertExecutor.java   | 28 ++++++-
+ .../policy/TestPolicyDistributionUpdater.java      | 93 ++++++++++++++++++++++
+ .../eagle/alert/policy/TestPolicyPartitioner.java  | 29 +++++++
+ 9 files changed, 347 insertions(+), 4 deletions(-)
+ create mode 100644 eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistStatsDAOLogReporter.java
+ create mode 100644 eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionReportMethods.java
+ create mode 100644 eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStats.java
+ create mode 100644 eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStatsDAO.java
+ create mode 100644 eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java
+ create mode 100644 eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyPartitioner.java
+
+diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DynamicPolicyLoader.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DynamicPolicyLoader.java
+index 707cd30..b9c74db 100644
+--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DynamicPolicyLoader.java
++++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DynamicPolicyLoader.java
+@@ -42,13 +42,20 @@
+ import com.sun.jersey.client.impl.CopyOnWriteHashMap;
+ import com.typesafe.config.Config;
+ 
++/**
++ * JVM level singleton, so multiple alert executor may share the same policy loader
++ */
+ public class DynamicPolicyLoader {
+ 	private static final Logger LOG = LoggerFactory.getLogger(DynamicPolicyLoader.class);
+ 	
+ 	private final int defaultInitialDelayMillis = 30*1000;
+ 	private final int defaultDelayMillis = 60*1000;
+ 	private final boolean defaultIgnoreDeleteFromSource = true;
++    /**
++     * one alertExecutorId may be paralleled by data, that is why there is a list of PolicyLifecycleMethods
++     */
+ 	private volatile CopyOnWriteHashMap<String, List<PolicyLifecycleMethods>> policyChangeListeners = new CopyOnWriteHashMap<String, List<PolicyLifecycleMethods>>();
++    private volatile CopyOnWriteHashMap<String, List<PolicyDistributionReportMethods>> policyDistributionUpdaters = new CopyOnWriteHashMap<String, List<PolicyDistributionReportMethods>>();
+ 	private static DynamicPolicyLoader instance = new DynamicPolicyLoader();
+ 	private volatile boolean initialized = false;
+ 	
+@@ -60,6 +67,15 @@ public void addPolicyChangeListener(String alertExecutorId, PolicyLifecycleMetho
+ 			policyChangeListeners.get(alertExecutorId).add(alertExecutor);
+ 		}
+ 	}
++
++    public void addPolicyDistributionUpdateListener(String alertExecutorId, PolicyDistributionReportMethods policyDistUpdater){
++        synchronized(policyDistributionUpdaters) {
++            if(policyDistributionUpdaters.get(alertExecutorId) == null) {
++                policyDistributionUpdaters.put(alertExecutorId, new ArrayList<PolicyDistributionReportMethods>());
++            }
++            policyDistributionUpdaters.get(alertExecutorId).add(policyDistUpdater);
++        }
++    }
+ 	
+ 	public static DynamicPolicyLoader getInstance(){
+ 		return instance;
+@@ -128,6 +144,14 @@ public void handleEvent(EventType eventType, PollResult lastResult,
+ 						}
+ 					}
+ 				}
++
++                // notify policyDistributionUpdaters
++                for(Map.Entry<String, List<PolicyDistributionReportMethods>> entry : policyDistributionUpdaters.entrySet()){
++                    String alertExecutorId = entry.getKey();
++                    for(PolicyDistributionReportMethods policyDistributionUpdateMethod : entry.getValue()){
++                        policyDistributionUpdateMethod.report();
++                    }
++                }
+ 			}
+ 			private String trimPartitionNum(String alertExecutorId){
+ 				int i = alertExecutorId.lastIndexOf('_');
+@@ -205,6 +229,7 @@ public PollResult poll(boolean initial, Object checkPoint) throws Exception {
+ 			}
+ 			
+ 			cachedAlertDefs = newAlertDefs;
++
+ 			return PollResult.createIncremental(added, changed, deleted, new Date().getTime());
+ 		}
+ 	}
+diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistStatsDAOLogReporter.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistStatsDAOLogReporter.java
+new file mode 100644
+index 0000000..47bfea2
+--- /dev/null
++++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistStatsDAOLogReporter.java
+@@ -0,0 +1,47 @@
++/*
++ *
++ *  * Licensed to the Apache Software Foundation (ASF) under one or more
++ *  * contributor license agreements.  See the NOTICE file distributed with
++ *  * this work for additional information regarding copyright ownership.
++ *  * The ASF licenses this file to You under the Apache License, Version 2.0
++ *  * (the "License"); you may not use this file except in compliance with
++ *  * the License.  You may obtain a copy of the License at
++ *  *
++ *  *    http://www.apache.org/licenses/LICENSE-2.0
++ *  *
++ *  * Unless required by applicable law or agreed to in writing, software
++ *  * distributed under the License is distributed on an "AS IS" BASIS,
++ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ *  * See the License for the specific language governing permissions and
++ *  * limitations under the License.
++ *
++ */
++
++package org.apache.eagle.alert.policy;
++
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import java.util.Set;
++
++/**
++ * just append log
++ */
++public class PolicyDistStatsDAOLogReporter implements PolicyDistributionStatsDAO{
++    private static Logger LOG = LoggerFactory.getLogger(PolicyDistStatsDAOLogReporter.class);
++
++    @Override
++    public void reportPolicyMembership(String policyGroupId, Set<String> policyIds) {
++        if(policyIds != null){
++            StringBuilder sb = new StringBuilder();
++            sb.append("policyDistirbutionStats for " + policyGroupId + "[" + "total: " + policyIds.size() + ", ");
++            for(String policyId : policyIds){
++                sb.append(policyId + ",");
++            }
++            sb.append("]");
++            LOG.info(sb.toString());
++        }else{
++            LOG.warn("No policies are assigned to " + policyGroupId);
++        }
++    }
++}
+diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionReportMethods.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionReportMethods.java
+new file mode 100644
+index 0000000..1cc14cc
+--- /dev/null
++++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionReportMethods.java
+@@ -0,0 +1,27 @@
++/*
++ *
++ *  * Licensed to the Apache Software Foundation (ASF) under one or more
++ *  * contributor license agreements.  See the NOTICE file distributed with
++ *  * this work for additional information regarding copyright ownership.
++ *  * The ASF licenses this file to You under the Apache License, Version 2.0
++ *  * (the "License"); you may not use this file except in compliance with
++ *  * the License.  You may obtain a copy of the License at
++ *  *
++ *  *    http://www.apache.org/licenses/LICENSE-2.0
++ *  *
++ *  * Unless required by applicable law or agreed to in writing, software
++ *  * distributed under the License is distributed on an "AS IS" BASIS,
++ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ *  * See the License for the specific language governing permissions and
++ *  * limitations under the License.
++ *
++ */
++
++package org.apache.eagle.alert.policy;
++
++/**
++ * framework will call report method, it is AlertExecutor's responsibility to report policy distribution information
++ */
++public interface PolicyDistributionReportMethods {
++    public void report();
++}
+diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStats.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStats.java
+new file mode 100644
+index 0000000..d6bbec9
+--- /dev/null
++++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStats.java
+@@ -0,0 +1,74 @@
++/*
++ *
++ *  * Licensed to the Apache Software Foundation (ASF) under one or more
++ *  * contributor license agreements.  See the NOTICE file distributed with
++ *  * this work for additional information regarding copyright ownership.
++ *  * The ASF licenses this file to You under the Apache License, Version 2.0
++ *  * (the "License"); you may not use this file except in compliance with
++ *  * the License.  You may obtain a copy of the License at
++ *  *
++ *  *    http://www.apache.org/licenses/LICENSE-2.0
++ *  *
++ *  * Unless required by applicable law or agreed to in writing, software
++ *  * distributed under the License is distributed on an "AS IS" BASIS,
++ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ *  * See the License for the specific language governing permissions and
++ *  * limitations under the License.
++ *
++ */
++
++package org.apache.eagle.alert.policy;
++
++/**
++ * fields for a policy distribution statistics
++ */
++public class PolicyDistributionStats {
++    private String policyGroupId;   // normally groupId is alertExecutorId
++    private String policyId;
++    private boolean markDown;       // true if this policy is marked down, false otherwise
++    private double weight;          // comprehensive factors for policy overhead
++
++    public String getPolicyId() {
++        return policyId;
++    }
++
++    public void setPolicyId(String policyId) {
++        this.policyId = policyId;
++    }
++
++    public boolean isMarkDown() {
++        return markDown;
++    }
++
++    public void setMarkDown(boolean markDown) {
++        this.markDown = markDown;
++    }
++
++    public double getWeight() {
++        return weight;
++    }
++
++    public void setWeight(double weight) {
++        this.weight = weight;
++    }
++
++    public String getPolicyGroupId() {
++        return policyGroupId;
++    }
++
++    public void setPolicyGroupId(String policyGroupId) {
++        this.policyGroupId = policyGroupId;
++    }
++
++    public String toString(){
++        StringBuilder sb = new StringBuilder();
++        sb.append("policyId:");
++        sb.append(policyId);
++        sb.append(", markDown:");
++        sb.append(markDown);
++        sb.append(", weight:");
++        sb.append(weight);
++
++        return sb.toString();
++    }
++}
+diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStatsDAO.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStatsDAO.java
+new file mode 100644
+index 0000000..5abe303
+--- /dev/null
++++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStatsDAO.java
+@@ -0,0 +1,26 @@
++/*
++ *
++ *  * Licensed to the Apache Software Foundation (ASF) under one or more
++ *  * contributor license agreements.  See the NOTICE file distributed with
++ *  * this work for additional information regarding copyright ownership.
++ *  * The ASF licenses this file to You under the Apache License, Version 2.0
++ *  * (the "License"); you may not use this file except in compliance with
++ *  * the License.  You may obtain a copy of the License at
++ *  *
++ *  *    http://www.apache.org/licenses/LICENSE-2.0
++ *  *
++ *  * Unless required by applicable law or agreed to in writing, software
++ *  * distributed under the License is distributed on an "AS IS" BASIS,
++ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ *  * See the License for the specific language governing permissions and
++ *  * limitations under the License.
++ *
++ */
++
++package org.apache.eagle.alert.policy;
++
++import java.util.Set;
++
++public interface PolicyDistributionStatsDAO {
++    public void reportPolicyMembership(String policyGroupId, Set<String> policyIds);
++}
+diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyEvaluatorServiceProvider.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyEvaluatorServiceProvider.java
+index 412010c..03060b9 100644
+--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyEvaluatorServiceProvider.java
++++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyEvaluatorServiceProvider.java
+@@ -30,7 +30,7 @@
+  *   - if policy is created, then invoke onPolicyCreated
+  *   - if policy is deleted, then invoke onPolicyDeleted
+  *   - if policy is updated, then invoke onPolicyUpdated
+- * - for policy update, replace old evaluator engine with new evaluator engine which is created by policy evaluator provider
++ * - for policy report, replace old evaluator engine with new evaluator engine which is created by policy evaluator provider
+  * - specify # of executors for this alert executor id
+  * - dynamically balance # of policies evaluated by each alert executor
+  *   - use zookeeper to balance. eaglePolicies/${alertExecutorId}/${alertExecutorInstanceId} => list of policies
+diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
+index 8b928c3..887a881 100644
+--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
++++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
+@@ -23,6 +23,7 @@
+ import org.apache.eagle.alert.common.AlertConstants;
+ import org.apache.eagle.alert.config.AbstractPolicyDefinition;
+ import org.apache.eagle.alert.dao.AlertDefinitionDAO;
++import org.apache.eagle.alert.dao.AlertStreamSchemaDAO;
+ import org.apache.eagle.alert.dao.AlertStreamSchemaDAOImpl;
+ import org.apache.eagle.alert.entity.AlertAPIEntity;
+ import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+@@ -49,7 +50,7 @@
+ import java.util.Map;
+ import java.util.Map.Entry;
+ 
+-public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEntity> implements PolicyLifecycleMethods, SiddhiAlertHandler {
++public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEntity> implements PolicyLifecycleMethods, SiddhiAlertHandler, PolicyDistributionReportMethods {
+ 	private static final long serialVersionUID = 1L;
+ 
+ 	private static final Logger LOG = LoggerFactory.getLogger(AlertExecutor.class);
+@@ -107,6 +108,10 @@ public PolicyPartitioner getPolicyPartitioner() {
+ 	public AlertDefinitionDAO getAlertDefinitionDao() {
+ 		return alertDefinitionDao;
+ 	}
++
++    public Map<String, PolicyEvaluator> getPolicyEvaluators(){
++        return policyEvaluators;
++    }
+ 	
+ 	@Override
+ 	public void prepareConfig(Config config) {
+@@ -135,10 +140,19 @@ public void initMetricReportor() {
+ 		dimensionsMap = new HashMap<>();
+ 	}
+ 
++    /**
++     * for unit test purpose only
++     * @param config
++     * @return
++     */
++    public AlertStreamSchemaDAO getAlertStreamSchemaDAO(Config config){
++        return new AlertStreamSchemaDAOImpl(config);
++    }
++
+ 	@Override
+ 	public void init() {
+ 		// initialize StreamMetadataManager before it is used
+-		StreamMetadataManager.getInstance().init(config, new AlertStreamSchemaDAOImpl(config));
++		StreamMetadataManager.getInstance().init(config, getAlertStreamSchemaDAO(config));
+ 		// for each AlertDefinition, to create a PolicyEvaluator
+ 		Map<String, PolicyEvaluator> tmpPolicyEvaluators = new HashMap<String, PolicyEvaluator>();
+ 		
+@@ -169,7 +183,9 @@ else if (initialAlertDefs.get(alertExecutorId) != null) {
+ 		DynamicPolicyLoader policyLoader = DynamicPolicyLoader.getInstance();
+ 		
+ 		policyLoader.init(initialAlertDefs, alertDefinitionDao, config);
+-		policyLoader.addPolicyChangeListener(alertExecutorId + "_" + partitionSeq, this);
++        String fullQualifiedAlertExecutorId = alertExecutorId + "_" + partitionSeq;
++		policyLoader.addPolicyChangeListener(fullQualifiedAlertExecutorId, this);
++        policyLoader.addPolicyDistributionUpdateListener(fullQualifiedAlertExecutorId, this);
+ 		LOG.info("Alert Executor created, partitionSeq: " + partitionSeq + " , numPartitions: " + numPartitions);
+         LOG.info("All policy evaluators: " + policyEvaluators);
+ 		
+@@ -370,4 +386,10 @@ public void onAlerts(EagleAlertContext context, List<AlertAPIEntity> alerts) {
+ 			}
+ 		}
+ 	}
++
++    @Override
++    public void report() {
++        PolicyDistStatsDAOLogReporter appender = new PolicyDistStatsDAOLogReporter();
++        appender.reportPolicyMembership(alertExecutorId, policyEvaluators.keySet());
++    }
+ }
+\ No newline at end of file
+diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java
+new file mode 100644
+index 0000000..579b9f0
+--- /dev/null
++++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java
+@@ -0,0 +1,93 @@
++/*
++ *
++ *  * Licensed to the Apache Software Foundation (ASF) under one or more
++ *  * contributor license agreements.  See the NOTICE file distributed with
++ *  * this work for additional information regarding copyright ownership.
++ *  * The ASF licenses this file to You under the Apache License, Version 2.0
++ *  * (the "License"); you may not use this file except in compliance with
++ *  * the License.  You may obtain a copy of the License at
++ *  *
++ *  *    http://www.apache.org/licenses/LICENSE-2.0
++ *  *
++ *  * Unless required by applicable law or agreed to in writing, software
++ *  * distributed under the License is distributed on an "AS IS" BASIS,
++ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ *  * See the License for the specific language governing permissions and
++ *  * limitations under the License.
++ *
++ */
++
++package org.apache.eagle.alert.policy;
++
++import com.typesafe.config.Config;
++import com.typesafe.config.ConfigFactory;
++import junit.framework.Assert;
++import org.apache.eagle.alert.common.AlertConstants;
++import org.apache.eagle.alert.dao.AlertDefinitionDAO;
++import org.apache.eagle.alert.dao.AlertDefinitionDAOImpl;
++import org.apache.eagle.alert.dao.AlertStreamSchemaDAO;
++import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
++import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
++import org.apache.eagle.executor.AlertExecutor;
++import org.apache.eagle.service.client.EagleServiceConnector;
++import org.junit.Test;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import java.util.Arrays;
++import java.util.HashMap;
++import java.util.List;
++import java.util.Map;
++
++public class TestPolicyDistributionUpdater {
++    private static Logger LOG = LoggerFactory.getLogger(TestPolicyDistributionUpdater.class);
++
++    @Test
++    public void testPolicyDistributionReporter() throws Exception{
++        AlertDefinitionDAO alertDao = new AlertDefinitionDAOImpl(new EagleServiceConnector(null, null)) {
++            @Override
++            public Map<String, Map<String, AlertDefinitionAPIEntity>> findActiveAlertDefsGroupbyAlertExecutorId(String site, String dataSource) throws Exception {
++                final AlertDefinitionAPIEntity entity = new AlertDefinitionAPIEntity();
++                entity.setTags(new HashMap<String, String>() {{
++                    put(AlertConstants.POLICY_TYPE, "siddhiCEPEngine");
++                    put(AlertConstants.POLICY_ID, "policyId_1");
++                }});
++                Map<String, Map<String, AlertDefinitionAPIEntity>> map = new HashMap<String, Map<String, AlertDefinitionAPIEntity>>();
++                map.put("alertExecutorId_1", new HashMap<String, AlertDefinitionAPIEntity>() {{
++                    put("policyId_1", entity);
++                }});
++                entity.setPolicyDef("{\"type\":\"siddhiCEPEngine\",\"expression\":\"from testStream select name insert into outputStream ;\"}");
++                return map;
++            }
++        };
++
++        AlertExecutor alertExecutor = new AlertExecutor("alertExecutorId_1", new DefaultPolicyPartitioner(), 1, 0, alertDao, new String[]{"testStream"}){
++            public AlertStreamSchemaDAO getAlertStreamSchemaDAO(Config config){
++                return new AlertStreamSchemaDAO(){
++                    @Override
++                    public List<AlertStreamSchemaEntity> findAlertStreamSchemaByDataSource(String dataSource) throws Exception {
++                        AlertStreamSchemaEntity entity = new AlertStreamSchemaEntity();
++                        entity.setTags(new HashMap<String, String>(){{
++                            put("dataSource", "UnitTest");
++                            put("streamName", "testStream");
++                            put("attrName", "name");
++                        }});
++                        entity.setAttrType("string");
++                        return Arrays.asList(entity);
++                    }
++                };
++            }
++
++            @Override
++            public void report() {
++                Assert.assertEquals(1, getPolicyEvaluators().size());
++                LOG.info("successuflly reported");
++            }
++        };
++
++        Config config = ConfigFactory.load();
++        alertExecutor.prepareConfig(config);
++        alertExecutor.init();
++        Thread.sleep(100);
++    }
++}
+diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyPartitioner.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyPartitioner.java
+new file mode 100644
+index 0000000..b04d206
+--- /dev/null
++++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyPartitioner.java
+@@ -0,0 +1,29 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *    http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.eagle.alert.policy;
++
++import org.junit.Test;
++
++public class TestPolicyPartitioner {
++    @Test
++    public void test(){
++        DefaultPolicyPartitioner p = new DefaultPolicyPartitioner();
++        System.out.println(p.partition(2, "siddhiCEPEngine", "readtmpPII"));
++        System.out.println(p.partition(2, "siddhiCEPEngine", "readtmpPII2"));
++        System.out.println(p.partition(2, "siddhiCEPEngine", "tmpPIIpii"));
++    }
++}
+
+From 68a4e0be8e6dc07bbb57f7c95141d4824a4ba1f7 Mon Sep 17 00:00:00 2001
+From: yonzhang <yonzhang@ebay.com>
+Date: Wed, 23 Dec 2015 12:01:17 -0800
+Subject: [PATCH 3/3] fine tune policy distro
+
+---
+ .../eagle/alert/policy/DynamicPolicyLoader.java    |  5 +--
+ .../policy/PolicyDistStatsDAOLogReporter.java      | 47 --------------------
+ .../alert/policy/PolicyDistroStatsLogReporter.java | 50 ++++++++++++++++++++++
+ .../org/apache/eagle/executor/AlertExecutor.java   |  6 +--
+ .../org/apache/eagle/service/hbase/Tables.java     |  1 +
+ 5 files changed, 56 insertions(+), 53 deletions(-)
+ delete mode 100644 eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistStatsDAOLogReporter.java
+ create mode 100644 eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistroStatsLogReporter.java
+
+diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DynamicPolicyLoader.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DynamicPolicyLoader.java
+index b9c74db..0e76deb 100644
+--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DynamicPolicyLoader.java
++++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DynamicPolicyLoader.java
+@@ -52,7 +52,7 @@
+ 	private final int defaultDelayMillis = 60*1000;
+ 	private final boolean defaultIgnoreDeleteFromSource = true;
+     /**
+-     * one alertExecutorId may be paralleled by data, that is why there is a list of PolicyLifecycleMethods
++     * one alertExecutor may have multiple instances, that is why there is a list of PolicyLifecycleMethods
+      */
+ 	private volatile CopyOnWriteHashMap<String, List<PolicyLifecycleMethods>> policyChangeListeners = new CopyOnWriteHashMap<String, List<PolicyLifecycleMethods>>();
+     private volatile CopyOnWriteHashMap<String, List<PolicyDistributionReportMethods>> policyDistributionUpdaters = new CopyOnWriteHashMap<String, List<PolicyDistributionReportMethods>>();
+@@ -68,7 +68,7 @@ public void addPolicyChangeListener(String alertExecutorId, PolicyLifecycleMetho
+ 		}
+ 	}
+ 
+-    public void addPolicyDistributionUpdateListener(String alertExecutorId, PolicyDistributionReportMethods policyDistUpdater){
++    public void addPolicyDistributionReporter(String alertExecutorId, PolicyDistributionReportMethods policyDistUpdater){
+         synchronized(policyDistributionUpdaters) {
+             if(policyDistributionUpdaters.get(alertExecutorId) == null) {
+                 policyDistributionUpdaters.put(alertExecutorId, new ArrayList<PolicyDistributionReportMethods>());
+@@ -147,7 +147,6 @@ public void handleEvent(EventType eventType, PollResult lastResult,
+ 
+                 // notify policyDistributionUpdaters
+                 for(Map.Entry<String, List<PolicyDistributionReportMethods>> entry : policyDistributionUpdaters.entrySet()){
+-                    String alertExecutorId = entry.getKey();
+                     for(PolicyDistributionReportMethods policyDistributionUpdateMethod : entry.getValue()){
+                         policyDistributionUpdateMethod.report();
+                     }
+diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistStatsDAOLogReporter.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistStatsDAOLogReporter.java
+deleted file mode 100644
+index 47bfea2..0000000
+--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistStatsDAOLogReporter.java
++++ /dev/null
+@@ -1,47 +0,0 @@
+-/*
+- *
+- *  * Licensed to the Apache Software Foundation (ASF) under one or more
+- *  * contributor license agreements.  See the NOTICE file distributed with
+- *  * this work for additional information regarding copyright ownership.
+- *  * The ASF licenses this file to You under the Apache License, Version 2.0
+- *  * (the "License"); you may not use this file except in compliance with
+- *  * the License.  You may obtain a copy of the License at
+- *  *
+- *  *    http://www.apache.org/licenses/LICENSE-2.0
+- *  *
+- *  * Unless required by applicable law or agreed to in writing, software
+- *  * distributed under the License is distributed on an "AS IS" BASIS,
+- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- *  * See the License for the specific language governing permissions and
+- *  * limitations under the License.
+- *
+- */
+-
+-package org.apache.eagle.alert.policy;
+-
+-import org.slf4j.Logger;
+-import org.slf4j.LoggerFactory;
+-
+-import java.util.Set;
+-
+-/**
+- * just append log
+- */
+-public class PolicyDistStatsDAOLogReporter implements PolicyDistributionStatsDAO{
+-    private static Logger LOG = LoggerFactory.getLogger(PolicyDistStatsDAOLogReporter.class);
+-
+-    @Override
+-    public void reportPolicyMembership(String policyGroupId, Set<String> policyIds) {
+-        if(policyIds != null){
+-            StringBuilder sb = new StringBuilder();
+-            sb.append("policyDistirbutionStats for " + policyGroupId + "[" + "total: " + policyIds.size() + ", ");
+-            for(String policyId : policyIds){
+-                sb.append(policyId + ",");
+-            }
+-            sb.append("]");
+-            LOG.info(sb.toString());
+-        }else{
+-            LOG.warn("No policies are assigned to " + policyGroupId);
+-        }
+-    }
+-}
+diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistroStatsLogReporter.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistroStatsLogReporter.java
+new file mode 100644
+index 0000000..96a28f1
+--- /dev/null
++++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistroStatsLogReporter.java
+@@ -0,0 +1,50 @@
++/*
++ *
++ *  * Licensed to the Apache Software Foundation (ASF) under one or more
++ *  * contributor license agreements.  See the NOTICE file distributed with
++ *  * this work for additional information regarding copyright ownership.
++ *  * The ASF licenses this file to You under the Apache License, Version 2.0
++ *  * (the "License"); you may not use this file except in compliance with
++ *  * the License.  You may obtain a copy of the License at
++ *  *
++ *  *    http://www.apache.org/licenses/LICENSE-2.0
++ *  *
++ *  * Unless required by applicable law or agreed to in writing, software
++ *  * distributed under the License is distributed on an "AS IS" BASIS,
++ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ *  * See the License for the specific language governing permissions and
++ *  * limitations under the License.
++ *
++ */
++
++package org.apache.eagle.alert.policy;
++
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import java.util.Set;
++
++/**
++ * just append log
++ */
++public class PolicyDistroStatsLogReporter implements PolicyDistributionStatsDAO{
++    private static Logger LOG = LoggerFactory.getLogger(PolicyDistroStatsLogReporter.class);
++
++    @Override
++    public void reportPolicyMembership(String policyGroupId, Set<String> policyIds) {
++        if(policyIds != null){
++            StringBuilder sb = new StringBuilder();
++            sb.append("policyDistributionStats for " + policyGroupId +", total: " + policyIds.size() + ", [");
++            for(String policyId : policyIds){
++                sb.append(policyId + ",");
++            }
++            if(policyIds.size() > 0){
++                sb.deleteCharAt(sb.length()-1);
++            }
++            sb.append("]");
++            LOG.info(sb.toString());
++        }else{
++            LOG.warn("No policies are assigned to " + policyGroupId);
++        }
++    }
++}
+diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
+index 887a881..124633c 100644
+--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
++++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
+@@ -185,7 +185,7 @@ else if (initialAlertDefs.get(alertExecutorId) != null) {
+ 		policyLoader.init(initialAlertDefs, alertDefinitionDao, config);
+         String fullQualifiedAlertExecutorId = alertExecutorId + "_" + partitionSeq;
+ 		policyLoader.addPolicyChangeListener(fullQualifiedAlertExecutorId, this);
+-        policyLoader.addPolicyDistributionUpdateListener(fullQualifiedAlertExecutorId, this);
++        policyLoader.addPolicyDistributionReporter(fullQualifiedAlertExecutorId, this);
+ 		LOG.info("Alert Executor created, partitionSeq: " + partitionSeq + " , numPartitions: " + numPartitions);
+         LOG.info("All policy evaluators: " + policyEvaluators);
+ 		
+@@ -389,7 +389,7 @@ public void onAlerts(EagleAlertContext context, List<AlertAPIEntity> alerts) {
+ 
+     @Override
+     public void report() {
+-        PolicyDistStatsDAOLogReporter appender = new PolicyDistStatsDAOLogReporter();
+-        appender.reportPolicyMembership(alertExecutorId, policyEvaluators.keySet());
++        PolicyDistroStatsLogReporter appender = new PolicyDistroStatsLogReporter();
++        appender.reportPolicyMembership(alertExecutorId + "_" + partitionSeq, policyEvaluators.keySet());
+     }
+ }
+\ No newline at end of file
+diff --git a/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/Tables.java b/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/Tables.java
+index a31e27e..f8941bc 100644
+--- a/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/Tables.java
++++ b/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/Tables.java
+@@ -40,6 +40,7 @@ public Tables(){
+         // for security
+         tables.add("hiveResourceSensitivity");
+         tables.add("fileSensitivity");
++        tables.add("ipzone");
+         tables.add("mlmodel");
+         tables.add("userprofile");
+     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/feaeabc1/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DynamicPolicyLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DynamicPolicyLoader.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DynamicPolicyLoader.java
index 707cd30..0e76deb 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DynamicPolicyLoader.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DynamicPolicyLoader.java
@@ -42,13 +42,20 @@ import com.netflix.config.PolledConfigurationSource;
 import com.sun.jersey.client.impl.CopyOnWriteHashMap;
 import com.typesafe.config.Config;
 
+/**
+ * JVM level singleton, so multiple alert executor may share the same policy loader
+ */
 public class DynamicPolicyLoader {
 	private static final Logger LOG = LoggerFactory.getLogger(DynamicPolicyLoader.class);
 	
 	private final int defaultInitialDelayMillis = 30*1000;
 	private final int defaultDelayMillis = 60*1000;
 	private final boolean defaultIgnoreDeleteFromSource = true;
+    /**
+     * one alertExecutor may have multiple instances, that is why there is a list of PolicyLifecycleMethods
+     */
 	private volatile CopyOnWriteHashMap<String, List<PolicyLifecycleMethods>> policyChangeListeners = new CopyOnWriteHashMap<String, List<PolicyLifecycleMethods>>();
+    private volatile CopyOnWriteHashMap<String, List<PolicyDistributionReportMethods>> policyDistributionUpdaters = new CopyOnWriteHashMap<String, List<PolicyDistributionReportMethods>>();
 	private static DynamicPolicyLoader instance = new DynamicPolicyLoader();
 	private volatile boolean initialized = false;
 	
@@ -60,6 +67,15 @@ public class DynamicPolicyLoader {
 			policyChangeListeners.get(alertExecutorId).add(alertExecutor);
 		}
 	}
+
+    public void addPolicyDistributionReporter(String alertExecutorId, PolicyDistributionReportMethods policyDistUpdater){
+        synchronized(policyDistributionUpdaters) {
+            if(policyDistributionUpdaters.get(alertExecutorId) == null) {
+                policyDistributionUpdaters.put(alertExecutorId, new ArrayList<PolicyDistributionReportMethods>());
+            }
+            policyDistributionUpdaters.get(alertExecutorId).add(policyDistUpdater);
+        }
+    }
 	
 	public static DynamicPolicyLoader getInstance(){
 		return instance;
@@ -128,6 +144,13 @@ public class DynamicPolicyLoader {
 						}
 					}
 				}
+
+                // notify policyDistributionUpdaters
+                for(Map.Entry<String, List<PolicyDistributionReportMethods>> entry : policyDistributionUpdaters.entrySet()){
+                    for(PolicyDistributionReportMethods policyDistributionUpdateMethod : entry.getValue()){
+                        policyDistributionUpdateMethod.report();
+                    }
+                }
 			}
 			private String trimPartitionNum(String alertExecutorId){
 				int i = alertExecutorId.lastIndexOf('_');
@@ -205,6 +228,7 @@ public class DynamicPolicyLoader {
 			}
 			
 			cachedAlertDefs = newAlertDefs;
+
 			return PollResult.createIncremental(added, changed, deleted, new Date().getTime());
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/feaeabc1/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistStatsDAOLogReporter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistStatsDAOLogReporter.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistStatsDAOLogReporter.java
new file mode 100644
index 0000000..47bfea2
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistStatsDAOLogReporter.java
@@ -0,0 +1,47 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.policy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+
+/**
+ * just append log
+ */
+public class PolicyDistStatsDAOLogReporter implements PolicyDistributionStatsDAO{
+    private static Logger LOG = LoggerFactory.getLogger(PolicyDistStatsDAOLogReporter.class);
+
+    @Override
+    public void reportPolicyMembership(String policyGroupId, Set<String> policyIds) {
+        if(policyIds != null){
+            StringBuilder sb = new StringBuilder();
+            sb.append("policyDistirbutionStats for " + policyGroupId + "[" + "total: " + policyIds.size() + ", ");
+            for(String policyId : policyIds){
+                sb.append(policyId + ",");
+            }
+            sb.append("]");
+            LOG.info(sb.toString());
+        }else{
+            LOG.warn("No policies are assigned to " + policyGroupId);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/feaeabc1/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionReportMethods.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionReportMethods.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionReportMethods.java
new file mode 100644
index 0000000..1cc14cc
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionReportMethods.java
@@ -0,0 +1,27 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.policy;
+
+/**
+ * framework will call report method, it is AlertExecutor's responsibility to report policy distribution information
+ */
+public interface PolicyDistributionReportMethods {
+    public void report();
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/feaeabc1/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStats.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStats.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStats.java
new file mode 100644
index 0000000..d6bbec9
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStats.java
@@ -0,0 +1,74 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.policy;
+
+/**
+ * fields for a policy distribution statistics
+ */
+public class PolicyDistributionStats {
+    private String policyGroupId;   // normally groupId is alertExecutorId
+    private String policyId;
+    private boolean markDown;       // true if this policy is marked down, false otherwise
+    private double weight;          // comprehensive factors for policy overhead
+
+    public String getPolicyId() {
+        return policyId;
+    }
+
+    public void setPolicyId(String policyId) {
+        this.policyId = policyId;
+    }
+
+    public boolean isMarkDown() {
+        return markDown;
+    }
+
+    public void setMarkDown(boolean markDown) {
+        this.markDown = markDown;
+    }
+
+    public double getWeight() {
+        return weight;
+    }
+
+    public void setWeight(double weight) {
+        this.weight = weight;
+    }
+
+    public String getPolicyGroupId() {
+        return policyGroupId;
+    }
+
+    public void setPolicyGroupId(String policyGroupId) {
+        this.policyGroupId = policyGroupId;
+    }
+
+    public String toString(){
+        StringBuilder sb = new StringBuilder();
+        sb.append("policyId:");
+        sb.append(policyId);
+        sb.append(", markDown:");
+        sb.append(markDown);
+        sb.append(", weight:");
+        sb.append(weight);
+
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/feaeabc1/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStatsDAO.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStatsDAO.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStatsDAO.java
new file mode 100644
index 0000000..5abe303
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStatsDAO.java
@@ -0,0 +1,26 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.policy;
+
+import java.util.Set;
+
+public interface PolicyDistributionStatsDAO {
+    public void reportPolicyMembership(String policyGroupId, Set<String> policyIds);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/feaeabc1/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistroStatsLogReporter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistroStatsLogReporter.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistroStatsLogReporter.java
new file mode 100644
index 0000000..96a28f1
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistroStatsLogReporter.java
@@ -0,0 +1,50 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.policy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+
+/**
+ * just append log
+ */
+public class PolicyDistroStatsLogReporter implements PolicyDistributionStatsDAO{
+    private static Logger LOG = LoggerFactory.getLogger(PolicyDistroStatsLogReporter.class);
+
+    @Override
+    public void reportPolicyMembership(String policyGroupId, Set<String> policyIds) {
+        if(policyIds != null){
+            StringBuilder sb = new StringBuilder();
+            sb.append("policyDistributionStats for " + policyGroupId +", total: " + policyIds.size() + ", [");
+            for(String policyId : policyIds){
+                sb.append(policyId + ",");
+            }
+            if(policyIds.size() > 0){
+                sb.deleteCharAt(sb.length()-1);
+            }
+            sb.append("]");
+            LOG.info(sb.toString());
+        }else{
+            LOG.warn("No policies are assigned to " + policyGroupId);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/feaeabc1/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyEvaluatorServiceProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyEvaluatorServiceProvider.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyEvaluatorServiceProvider.java
index 412010c..03060b9 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyEvaluatorServiceProvider.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyEvaluatorServiceProvider.java
@@ -30,7 +30,7 @@ import com.fasterxml.jackson.databind.Module;
  *   - if policy is created, then invoke onPolicyCreated
  *   - if policy is deleted, then invoke onPolicyDeleted
  *   - if policy is updated, then invoke onPolicyUpdated
- * - for policy update, replace old evaluator engine with new evaluator engine which is created by policy evaluator provider
+ * - for policy report, replace old evaluator engine with new evaluator engine which is created by policy evaluator provider
  * - specify # of executors for this alert executor id
  * - dynamically balance # of policies evaluated by each alert executor
  *   - use zookeeper to balance. eaglePolicies/${alertExecutorId}/${alertExecutorInstanceId} => list of policies

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/feaeabc1/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
index 8b928c3..124633c 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
@@ -23,6 +23,7 @@ import org.apache.commons.lang3.time.DateUtils;
 import org.apache.eagle.alert.common.AlertConstants;
 import org.apache.eagle.alert.config.AbstractPolicyDefinition;
 import org.apache.eagle.alert.dao.AlertDefinitionDAO;
+import org.apache.eagle.alert.dao.AlertStreamSchemaDAO;
 import org.apache.eagle.alert.dao.AlertStreamSchemaDAOImpl;
 import org.apache.eagle.alert.entity.AlertAPIEntity;
 import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
@@ -49,7 +50,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
-public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEntity> implements PolicyLifecycleMethods, SiddhiAlertHandler {
+public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEntity> implements PolicyLifecycleMethods, SiddhiAlertHandler, PolicyDistributionReportMethods {
 	private static final long serialVersionUID = 1L;
 
 	private static final Logger LOG = LoggerFactory.getLogger(AlertExecutor.class);
@@ -107,6 +108,10 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti
 	public AlertDefinitionDAO getAlertDefinitionDao() {
 		return alertDefinitionDao;
 	}
+
+    public Map<String, PolicyEvaluator> getPolicyEvaluators(){
+        return policyEvaluators;
+    }
 	
 	@Override
 	public void prepareConfig(Config config) {
@@ -135,10 +140,19 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti
 		dimensionsMap = new HashMap<>();
 	}
 
+    /**
+     * for unit test purpose only
+     * @param config
+     * @return
+     */
+    public AlertStreamSchemaDAO getAlertStreamSchemaDAO(Config config){
+        return new AlertStreamSchemaDAOImpl(config);
+    }
+
 	@Override
 	public void init() {
 		// initialize StreamMetadataManager before it is used
-		StreamMetadataManager.getInstance().init(config, new AlertStreamSchemaDAOImpl(config));
+		StreamMetadataManager.getInstance().init(config, getAlertStreamSchemaDAO(config));
 		// for each AlertDefinition, to create a PolicyEvaluator
 		Map<String, PolicyEvaluator> tmpPolicyEvaluators = new HashMap<String, PolicyEvaluator>();
 		
@@ -169,7 +183,9 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti
 		DynamicPolicyLoader policyLoader = DynamicPolicyLoader.getInstance();
 		
 		policyLoader.init(initialAlertDefs, alertDefinitionDao, config);
-		policyLoader.addPolicyChangeListener(alertExecutorId + "_" + partitionSeq, this);
+        String fullQualifiedAlertExecutorId = alertExecutorId + "_" + partitionSeq;
+		policyLoader.addPolicyChangeListener(fullQualifiedAlertExecutorId, this);
+        policyLoader.addPolicyDistributionReporter(fullQualifiedAlertExecutorId, this);
 		LOG.info("Alert Executor created, partitionSeq: " + partitionSeq + " , numPartitions: " + numPartitions);
         LOG.info("All policy evaluators: " + policyEvaluators);
 		
@@ -370,4 +386,10 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti
 			}
 		}
 	}
+
+    @Override
+    public void report() {
+        PolicyDistroStatsLogReporter appender = new PolicyDistroStatsLogReporter();
+        appender.reportPolicyMembership(alertExecutorId + "_" + partitionSeq, policyEvaluators.keySet());
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/feaeabc1/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java
new file mode 100644
index 0000000..579b9f0
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java
@@ -0,0 +1,93 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.policy;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import junit.framework.Assert;
+import org.apache.eagle.alert.common.AlertConstants;
+import org.apache.eagle.alert.dao.AlertDefinitionDAO;
+import org.apache.eagle.alert.dao.AlertDefinitionDAOImpl;
+import org.apache.eagle.alert.dao.AlertStreamSchemaDAO;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
+import org.apache.eagle.executor.AlertExecutor;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestPolicyDistributionUpdater {
+    private static Logger LOG = LoggerFactory.getLogger(TestPolicyDistributionUpdater.class);
+
+    @Test
+    public void testPolicyDistributionReporter() throws Exception{
+        AlertDefinitionDAO alertDao = new AlertDefinitionDAOImpl(new EagleServiceConnector(null, null)) {
+            @Override
+            public Map<String, Map<String, AlertDefinitionAPIEntity>> findActiveAlertDefsGroupbyAlertExecutorId(String site, String dataSource) throws Exception {
+                final AlertDefinitionAPIEntity entity = new AlertDefinitionAPIEntity();
+                entity.setTags(new HashMap<String, String>() {{
+                    put(AlertConstants.POLICY_TYPE, "siddhiCEPEngine");
+                    put(AlertConstants.POLICY_ID, "policyId_1");
+                }});
+                Map<String, Map<String, AlertDefinitionAPIEntity>> map = new HashMap<String, Map<String, AlertDefinitionAPIEntity>>();
+                map.put("alertExecutorId_1", new HashMap<String, AlertDefinitionAPIEntity>() {{
+                    put("policyId_1", entity);
+                }});
+                entity.setPolicyDef("{\"type\":\"siddhiCEPEngine\",\"expression\":\"from testStream select name insert into outputStream ;\"}");
+                return map;
+            }
+        };
+
+        AlertExecutor alertExecutor = new AlertExecutor("alertExecutorId_1", new DefaultPolicyPartitioner(), 1, 0, alertDao, new String[]{"testStream"}){
+            public AlertStreamSchemaDAO getAlertStreamSchemaDAO(Config config){
+                return new AlertStreamSchemaDAO(){
+                    @Override
+                    public List<AlertStreamSchemaEntity> findAlertStreamSchemaByDataSource(String dataSource) throws Exception {
+                        AlertStreamSchemaEntity entity = new AlertStreamSchemaEntity();
+                        entity.setTags(new HashMap<String, String>(){{
+                            put("dataSource", "UnitTest");
+                            put("streamName", "testStream");
+                            put("attrName", "name");
+                        }});
+                        entity.setAttrType("string");
+                        return Arrays.asList(entity);
+                    }
+                };
+            }
+
+            @Override
+            public void report() {
+                Assert.assertEquals(1, getPolicyEvaluators().size());
+                LOG.info("successuflly reported");
+            }
+        };
+
+        Config config = ConfigFactory.load();
+        alertExecutor.prepareConfig(config);
+        alertExecutor.init();
+        Thread.sleep(100);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/feaeabc1/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyPartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyPartitioner.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyPartitioner.java
new file mode 100644
index 0000000..b04d206
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyPartitioner.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.policy;
+
+import org.junit.Test;
+
+public class TestPolicyPartitioner {
+    @Test
+    public void test(){
+        DefaultPolicyPartitioner p = new DefaultPolicyPartitioner();
+        System.out.println(p.partition(2, "siddhiCEPEngine", "readtmpPII"));
+        System.out.println(p.partition(2, "siddhiCEPEngine", "readtmpPII2"));
+        System.out.println(p.partition(2, "siddhiCEPEngine", "tmpPIIpii"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/feaeabc1/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestAggregation.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestAggregation.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestAggregation.java
new file mode 100644
index 0000000..adcd728
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestAggregation.java
@@ -0,0 +1,98 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.state;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.query.output.callback.QueryCallback;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+
+import junit.framework.Assert;
+import org.wso2.siddhi.core.util.EventPrinter;
+
+public class TestAggregation {
+    @Test
+    public void test01DownSampling() throws Exception {
+        String stream = "define stream jmxMetric(cpu double, memory int, bytesIn int, bytesOut long, timestamp long);";
+        String query = "@info(name = 'downSample') "
+                + "from jmxMetric#window.timeBatch(1 sec) "
+                + "select "
+                + " min(cpu) as minCpu, max(cpu) as maxCpu, avg(cpu) as avgCpu, "
+                + " min(memory) as minMem, max(memory) as maxMem, avg(memory) as avgMem, "
+                + " min(bytesIn) as minBytesIn, max(bytesIn) as maxBytesIn, avg(bytesIn) as avgBytesIn, sum(bytesIn) as totalBytesIn, "
+                + " min(bytesOut) as minBytesOut, max(bytesOut) as maxBytesOut, avg(bytesOut) as avgBytesOut, sum(bytesOut) as totalBytesOut, "
+                + " timestamp as timeWindowEnds "
+                + " INSERT  INTO tmp;";
+
+        SiddhiManager sm = new SiddhiManager();
+        ExecutionPlanRuntime plan = sm.createExecutionPlanRuntime(stream + query);
+
+        final AtomicInteger counter = new AtomicInteger();
+        plan.addCallback("downSample", new QueryCallback() {
+            @Override
+            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                EventPrinter.print(timeStamp, inEvents, removeEvents);
+                int count = counter.incrementAndGet();
+                if (count == 1) {
+                    Assert.assertEquals(6000L, inEvents[0].getData(9));
+                } else if(count == 2) {
+                    Assert.assertEquals(6000L, inEvents[0].getData(9));
+                }
+            }
+        });
+        InputHandler input = plan.getInputHandler("jmxMetric");
+
+        plan.start();
+        sendEvent(input);
+        Thread.sleep(100);
+        sendEvent(input);
+        Thread.sleep(1000);
+        sendEvent(input);
+        Thread.sleep(1000);
+        sendEvent(input);
+        Thread.sleep(200);
+        plan.shutdown();
+    }
+
+    // send 3 events
+    private void sendEvent(InputHandler input) throws Exception {
+        int len = 3;
+        Event[] events = new Event[len];
+        for (int i = 0; i < len; i++) {
+            long externalTs = System.currentTimeMillis();
+            // cpu int, memory int, bytesIn long, bytesOut long, timestamp long
+            events[i] = new Event(externalTs + i, new Object[] {
+                    15.0,
+                    15,
+                    1000,
+                    2000L,
+                    externalTs + i
+            });
+        }
+
+        for (Event e : events) {
+            input.send(e);
+        }
+    }
+}


Mime
View raw message