eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yonzhang2...@apache.org
Subject incubator-eagle git commit: remove patch file
Date Wed, 30 Dec 2015 05:05:06 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/master feaeabc18 -> dea3112d3


remove patch file


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/dea3112d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/dea3112d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/dea3112d

Branch: refs/heads/master
Commit: dea3112d3772b8242f5831d50959b19ab6981c97
Parents: feaeabc
Author: yonzhang <yonzhang@ebay.com>
Authored: Tue Dec 29 21:04:54 2015 -0800
Committer: yonzhang <yonzhang@ebay.com>
Committed: Tue Dec 29 21:04:54 2015 -0800

----------------------------------------------------------------------
 38.patch.1 | 1011 -------------------------------------------------------
 1 file changed, 1011 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dea3112d/38.patch.1
----------------------------------------------------------------------
diff --git a/38.patch.1 b/38.patch.1
deleted file mode 100644
index 2e1dbf4..0000000
--- a/38.patch.1
+++ /dev/null
@@ -1,1011 +0,0 @@
-From dccd015e7e2c22d19360b552745e8524cd55b294 Mon Sep 17 00:00:00 2001
-From: yonzhang <yonzhang@ebay.com>
-Date: Tue, 22 Dec 2015 14:57:47 -0800
-Subject: [PATCH 1/3] test siddhi aggregation and snapshot and restore
-
----
- .../apache/eagle/alert/state/TestAggregation.java  |  98 +++++++++++++
- .../state/TestSiddhiStateSnapshotAndRestore.java   | 152 ++++++++++++++++++++-
- 2 files changed, 245 insertions(+), 5 deletions(-)
- create mode 100644 eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestAggregation.java
-
-diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestAggregation.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestAggregation.java
-new file mode 100644
-index 0000000..adcd728
---- /dev/null
-+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestAggregation.java
-@@ -0,0 +1,98 @@
-+/*
-+ *
-+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
-+ *  * contributor license agreements.  See the NOTICE file distributed with
-+ *  * this work for additional information regarding copyright ownership.
-+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
-+ *  * (the "License"); you may not use this file except in compliance with
-+ *  * the License.  You may obtain a copy of the License at
-+ *  *
-+ *  *    http://www.apache.org/licenses/LICENSE-2.0
-+ *  *
-+ *  * Unless required by applicable law or agreed to in writing, software
-+ *  * distributed under the License is distributed on an "AS IS" BASIS,
-+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ *  * See the License for the specific language governing permissions and
-+ *  * limitations under the License.
-+ *
-+ */
-+
-+package org.apache.eagle.alert.state;
-+
-+import java.util.concurrent.atomic.AtomicInteger;
-+
-+import org.junit.Test;
-+import org.wso2.siddhi.core.ExecutionPlanRuntime;
-+import org.wso2.siddhi.core.SiddhiManager;
-+import org.wso2.siddhi.core.event.Event;
-+import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-+import org.wso2.siddhi.core.stream.input.InputHandler;
-+
-+import junit.framework.Assert;
-+import org.wso2.siddhi.core.util.EventPrinter;
-+
-+public class TestAggregation {
-+    @Test
-+    public void test01DownSampling() throws Exception {
-+        String stream = "define stream jmxMetric(cpu double, memory int, bytesIn int, bytesOut long, timestamp long);";
-+        String query = "@info(name = 'downSample') "
-+                + "from jmxMetric#window.timeBatch(1 sec) "
-+                + "select "
-+                + " min(cpu) as minCpu, max(cpu) as maxCpu, avg(cpu) as avgCpu, "
-+                + " min(memory) as minMem, max(memory) as maxMem, avg(memory) as avgMem, "
-+                + " min(bytesIn) as minBytesIn, max(bytesIn) as maxBytesIn, avg(bytesIn) as avgBytesIn, sum(bytesIn) as totalBytesIn, "
-+                + " min(bytesOut) as minBytesOut, max(bytesOut) as maxBytesOut, avg(bytesOut) as avgBytesOut, sum(bytesOut) as totalBytesOut, "
-+                + " timestamp as timeWindowEnds "
-+                + " INSERT  INTO tmp;";
-+
-+        SiddhiManager sm = new SiddhiManager();
-+        ExecutionPlanRuntime plan = sm.createExecutionPlanRuntime(stream + query);
-+
-+        final AtomicInteger counter = new AtomicInteger();
-+        plan.addCallback("downSample", new QueryCallback() {
-+            @Override
-+            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-+                EventPrinter.print(timeStamp, inEvents, removeEvents);
-+                int count = counter.incrementAndGet();
-+                if (count == 1) {
-+                    Assert.assertEquals(6000L, inEvents[0].getData(9));
-+                } else if(count == 2) {
-+                    Assert.assertEquals(6000L, inEvents[0].getData(9));
-+                }
-+            }
-+        });
-+        InputHandler input = plan.getInputHandler("jmxMetric");
-+
-+        plan.start();
-+        sendEvent(input);
-+        Thread.sleep(100);
-+        sendEvent(input);
-+        Thread.sleep(1000);
-+        sendEvent(input);
-+        Thread.sleep(1000);
-+        sendEvent(input);
-+        Thread.sleep(200);
-+        plan.shutdown();
-+    }
-+
-+    // send 3 events
-+    private void sendEvent(InputHandler input) throws Exception {
-+        int len = 3;
-+        Event[] events = new Event[len];
-+        for (int i = 0; i < len; i++) {
-+            long externalTs = System.currentTimeMillis();
-+            // cpu double, memory int, bytesIn int, bytesOut long, timestamp long
-+            events[i] = new Event(externalTs + i, new Object[] {
-+                    15.0,
-+                    15,
-+                    1000,
-+                    2000L,
-+                    externalTs + i
-+            });
-+        }
-+
-+        for (Event e : events) {
-+            input.send(e);
-+        }
-+    }
-+}
-diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java
-index 7c01a90..3dd43ac 100644
---- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java
-+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java
-@@ -21,6 +21,7 @@
- 
- import org.apache.eagle.common.DateTimeUtil;
- import org.junit.Assert;
-+import org.junit.Before;
- import org.junit.Test;
- import org.wso2.siddhi.core.ExecutionPlanRuntime;
- import org.wso2.siddhi.core.SiddhiManager;
-@@ -28,6 +29,8 @@
- import org.wso2.siddhi.core.query.output.callback.QueryCallback;
- import org.wso2.siddhi.core.stream.input.InputHandler;
- import org.wso2.siddhi.core.util.EventPrinter;
-+import org.wso2.siddhi.core.util.persistence.InMemoryPersistenceStore;
-+import org.wso2.siddhi.core.util.persistence.PersistenceStore;
- 
- import java.io.FileInputStream;
- import java.io.FileOutputStream;
-@@ -241,10 +244,10 @@ public void testTimeSlideWindow() throws Exception{
-     private ExecutionPlanRuntime setupRuntimeForExternalTimeSlideWindowWithGroupby(){
-         SiddhiManager siddhiManager = new SiddhiManager();
-         String cseEventStream = "define stream testStream (timeStamp long, user string, cmd string);";
--        String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,3 sec)"
-+        String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,30 sec)"
-                 + " select user, timeStamp, count(user) as cnt"
--//                + " group by user"
--//                + " having cnt > 2"
-+                + " group by user"
-+                + " having cnt > 2"
-                + " insert all events into outputStream;";
- 
-         ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
-@@ -295,14 +298,61 @@ public void testExternalTimeSlideWindowWithGroupby() throws Exception{
-         restoredRuntime.shutdown();
-     }
- 
-+    private ExecutionPlanRuntime setupRuntimeForExternalTimeSlideWindowWithGroupby_2(SiddhiManager siddhiManager){
-+        String cseEventStream = "define stream testStream (timeStamp long, user string, cmd string);";
-+        String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,300 sec)"
-+                + " select user, timeStamp, count(user) as cnt"
-+                + " group by user"
-+                + " insert all events into outputStream;";
-+
-+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
-+
-+        executionPlanRuntime.addCallback("query1", new QueryCallback() {
-+            @Override
-+            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-+                EventPrinter.print(timeStamp, inEvents, removeEvents);
-+            }
-+        });
-+        return executionPlanRuntime;
-+    }
-+
-+    @Test
-+    public void testExternalTimeSlideWindowWithGroupby_2() throws Exception{
-+        SiddhiManager siddhiManager = new SiddhiManager();
-+        PersistenceStore persistenceStore = new InMemoryPersistenceStore();
-+        siddhiManager.setPersistenceStore(persistenceStore);
-+
-+        ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForExternalTimeSlideWindowWithGroupby_2(siddhiManager);
-+        InputHandler inputHandler = executionPlanRuntime.getInputHandler("testStream");
-+        executionPlanRuntime.start();
-+        long curTime = DateTimeUtil.humanDateToMilliseconds("2015-09-17 00:00:00,000");
-+        inputHandler.send(new Object[]{curTime, "user", "open"});
-+        inputHandler.send(new Object[]{curTime + 1000, "user", "open"});
-+        inputHandler.send(new Object[]{curTime + 2000, "user", "open"});
-+        inputHandler.send(new Object[]{curTime + 3000, "user", "open"});
-+        Thread.sleep(100);
-+        executionPlanRuntime.persist();
-+        executionPlanRuntime.shutdown();
-+        ExecutionPlanRuntime restoredRuntime = setupRuntimeForExternalTimeSlideWindowWithGroupby_2(siddhiManager);
-+        inputHandler = restoredRuntime.getInputHandler("testStream");
-+        restoredRuntime.start();
-+        restoredRuntime.restoreLastRevision();
-+        inputHandler.send(new Object[]{curTime + 4000, "user", "open"});
-+        inputHandler.send(new Object[]{curTime + 5000, "user", "open"});
-+        inputHandler.send(new Object[]{curTime + 6000, "user", "open"});
-+        inputHandler.send(new Object[]{curTime + 7000, "user", "open"});
-+        Thread.sleep(1000);
-+        restoredRuntime.shutdown();
-+    }
-+
-     private ExecutionPlanRuntime setupRuntimeForInternalTimeSlideWindowWithGroupby(){
-         SiddhiManager siddhiManager = new SiddhiManager();
-         String cseEventStream = "define stream testStream (user string, cmd string);";
--        String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.time(100 sec)"
-+        String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.time(5 sec)"
-                 + " select user, count(user) as cnt"
-                 + " group by user"
-                 + " having cnt > 2"
--                + " insert all events into outputStream;";
-+                + " insert events into outputStream;";
- 
-         ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
- 
-@@ -356,4 +406,96 @@ public void testInternalTimeSlideWindowWithGroupby() throws Exception{
-         input.close();
-         restoredRuntime.shutdown();
-     }
-+
-+    private int count;
-+    private boolean eventArrived;
-+
-+    @Before
-+    public void init() {
-+        count = 0;
-+        eventArrived = false;
-+    }
-+
-+    @Test
-+    public void persistenceTest7() throws InterruptedException {
-+        PersistenceStore persistenceStore = new InMemoryPersistenceStore();
-+
-+        SiddhiManager siddhiManager = new SiddhiManager();
-+        siddhiManager.setPersistenceStore(persistenceStore);
-+
-+        String executionPlan = "" +
-+                "@plan:name('Test') " +
-+                "" +
-+                "define stream StockStream (symbol string, price float, volume int, timestamp long);" +
-+                "" +
-+                "@info(name = 'query1')" +
-+                "from StockStream#window.externalTime(timestamp,30 sec) " +
-+                "select symbol, price, sum(volume) as totalVol, count(symbol) as cnt " +
-+                "group by symbol " +
-+                "insert into OutStream ";
-+
-+        QueryCallback queryCallback = new QueryCallback() {
-+            @Override
-+            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-+                EventPrinter.print(timeStamp, inEvents, removeEvents);
-+                eventArrived = true;
-+                for (Event inEvent : inEvents) {
-+                    count++;
-+                    Assert.assertTrue("IBM".equals(inEvent.getData(0)) || "WSO2".equals(inEvent.getData(0)));
-+                    if (count == 5) {
-+                        Assert.assertEquals(400l, inEvent.getData(2));
-+                    }
-+                    if (count == 6) {
-+                        Assert.assertEquals(200l, inEvent.getData(2));
-+                    }
-+                }
-+            }
-+        };
-+
-+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
-+        executionPlanRuntime.addCallback("query1", queryCallback);
-+
-+        InputHandler inputHandler = executionPlanRuntime.getInputHandler("StockStream");
-+        executionPlanRuntime.start();
-+        long currentTime = 0;
-+
-+        inputHandler.send(new Object[]{"IBM", 75.1f, 100, currentTime + 1000});
-+        Thread.sleep(100);
-+        inputHandler.send(new Object[]{"WSO2", 75.2f, 100, currentTime + 2000});
-+        Thread.sleep(100);
-+        inputHandler.send(new Object[]{"IBM", 75.3f, 100, currentTime + 3000});
-+
-+        Thread.sleep(500);
-+        Assert.assertTrue(eventArrived);
-+        Assert.assertEquals(3, count);
-+
-+        //persisting
-+        Thread.sleep(500);
-+        executionPlanRuntime.persist();
-+
-+        //restarting execution plan
-+        Thread.sleep(500);
-+        executionPlanRuntime.shutdown();
-+        executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
-+        executionPlanRuntime.addCallback("query1", queryCallback);
-+        inputHandler = executionPlanRuntime.getInputHandler("StockStream");
-+        executionPlanRuntime.start();
-+
-+        //loading
-+        executionPlanRuntime.restoreLastRevision();
-+
-+        inputHandler.send(new Object[]{"IBM", 75.4f, 100, currentTime + 4000});
-+        Thread.sleep(100);
-+        inputHandler.send(new Object[]{"IBM", 75.5f, 100, currentTime + 5000});
-+        Thread.sleep(100);
-+        inputHandler.send(new Object[]{"WSO2", 75.6f, 100, currentTime + 6000});
-+
-+        //shutdown execution plan
-+        Thread.sleep(500);
-+        executionPlanRuntime.shutdown();
-+
-+        Assert.assertEquals(count, 6);
-+        Assert.assertEquals(true, eventArrived);
-+
-+    }
- }
-
-From f711c8a472375fc07e4ccec9985f08670df299a0 Mon Sep 17 00:00:00 2001
-From: yonzhang <yonzhang@ebay.com>
-Date: Tue, 22 Dec 2015 23:08:58 -0800
-Subject: [PATCH 2/3] policy distribution reporter
-
----
- .../eagle/alert/policy/DynamicPolicyLoader.java    | 25 ++++++
- .../policy/PolicyDistStatsDAOLogReporter.java      | 47 +++++++++++
- .../policy/PolicyDistributionReportMethods.java    | 27 +++++++
- .../alert/policy/PolicyDistributionStats.java      | 74 +++++++++++++++++
- .../alert/policy/PolicyDistributionStatsDAO.java   | 26 ++++++
- .../policy/PolicyEvaluatorServiceProvider.java     |  2 +-
- .../org/apache/eagle/executor/AlertExecutor.java   | 28 ++++++-
- .../policy/TestPolicyDistributionUpdater.java      | 93 ++++++++++++++++++++++
- .../eagle/alert/policy/TestPolicyPartitioner.java  | 29 +++++++
- 9 files changed, 347 insertions(+), 4 deletions(-)
- create mode 100644 eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistStatsDAOLogReporter.java
- create mode 100644 eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionReportMethods.java
- create mode 100644 eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStats.java
- create mode 100644 eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStatsDAO.java
- create mode 100644 eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java
- create mode 100644 eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyPartitioner.java
-
-diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DynamicPolicyLoader.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DynamicPolicyLoader.java
-index 707cd30..b9c74db 100644
---- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DynamicPolicyLoader.java
-+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DynamicPolicyLoader.java
-@@ -42,13 +42,20 @@
- import com.sun.jersey.client.impl.CopyOnWriteHashMap;
- import com.typesafe.config.Config;
- 
-+/**
-+ * JVM-level singleton, so multiple alert executors may share the same policy loader
-+ */
- public class DynamicPolicyLoader {
- 	private static final Logger LOG = LoggerFactory.getLogger(DynamicPolicyLoader.class);
- 	
- 	private final int defaultInitialDelayMillis = 30*1000;
- 	private final int defaultDelayMillis = 60*1000;
- 	private final boolean defaultIgnoreDeleteFromSource = true;
-+    /**
-+     * one alertExecutorId may be parallelized by data, which is why each id maps to a list of PolicyLifecycleMethods
-+     */
- 	private volatile CopyOnWriteHashMap<String, List<PolicyLifecycleMethods>> policyChangeListeners = new CopyOnWriteHashMap<String, List<PolicyLifecycleMethods>>();
-+    private volatile CopyOnWriteHashMap<String, List<PolicyDistributionReportMethods>> policyDistributionUpdaters = new CopyOnWriteHashMap<String, List<PolicyDistributionReportMethods>>();
- 	private static DynamicPolicyLoader instance = new DynamicPolicyLoader();
- 	private volatile boolean initialized = false;
- 	
-@@ -60,6 +67,15 @@ public void addPolicyChangeListener(String alertExecutorId, PolicyLifecycleMetho
- 			policyChangeListeners.get(alertExecutorId).add(alertExecutor);
- 		}
- 	}
-+
-+    public void addPolicyDistributionUpdateListener(String alertExecutorId, PolicyDistributionReportMethods policyDistUpdater){
-+        synchronized(policyDistributionUpdaters) {
-+            if(policyDistributionUpdaters.get(alertExecutorId) == null) {
-+                policyDistributionUpdaters.put(alertExecutorId, new ArrayList<PolicyDistributionReportMethods>());
-+            }
-+            policyDistributionUpdaters.get(alertExecutorId).add(policyDistUpdater);
-+        }
-+    }
- 	
- 	public static DynamicPolicyLoader getInstance(){
- 		return instance;
-@@ -128,6 +144,14 @@ public void handleEvent(EventType eventType, PollResult lastResult,
- 						}
- 					}
- 				}
-+
-+                // notify policyDistributionUpdaters
-+                for(Map.Entry<String, List<PolicyDistributionReportMethods>> entry : policyDistributionUpdaters.entrySet()){
-+                    String alertExecutorId = entry.getKey();
-+                    for(PolicyDistributionReportMethods policyDistributionUpdateMethod : entry.getValue()){
-+                        policyDistributionUpdateMethod.report();
-+                    }
-+                }
- 			}
- 			private String trimPartitionNum(String alertExecutorId){
- 				int i = alertExecutorId.lastIndexOf('_');
-@@ -205,6 +229,7 @@ public PollResult poll(boolean initial, Object checkPoint) throws Exception {
- 			}
- 			
- 			cachedAlertDefs = newAlertDefs;
-+
- 			return PollResult.createIncremental(added, changed, deleted, new Date().getTime());
- 		}
- 	}
-diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistStatsDAOLogReporter.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistStatsDAOLogReporter.java
-new file mode 100644
-index 0000000..47bfea2
---- /dev/null
-+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistStatsDAOLogReporter.java
-@@ -0,0 +1,47 @@
-+/*
-+ *
-+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
-+ *  * contributor license agreements.  See the NOTICE file distributed with
-+ *  * this work for additional information regarding copyright ownership.
-+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
-+ *  * (the "License"); you may not use this file except in compliance with
-+ *  * the License.  You may obtain a copy of the License at
-+ *  *
-+ *  *    http://www.apache.org/licenses/LICENSE-2.0
-+ *  *
-+ *  * Unless required by applicable law or agreed to in writing, software
-+ *  * distributed under the License is distributed on an "AS IS" BASIS,
-+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ *  * See the License for the specific language governing permissions and
-+ *  * limitations under the License.
-+ *
-+ */
-+
-+package org.apache.eagle.alert.policy;
-+
-+import org.slf4j.Logger;
-+import org.slf4j.LoggerFactory;
-+
-+import java.util.Set;
-+
-+/**
-+ * Reports policy membership by simply appending it to the log.
-+ */
-+public class PolicyDistStatsDAOLogReporter implements PolicyDistributionStatsDAO{
-+    private static Logger LOG = LoggerFactory.getLogger(PolicyDistStatsDAOLogReporter.class);
-+
-+    @Override
-+    public void reportPolicyMembership(String policyGroupId, Set<String> policyIds) {
-+        if(policyIds != null){
-+            StringBuilder sb = new StringBuilder();
-+            sb.append("policyDistirbutionStats for " + policyGroupId + "[" + "total: " + policyIds.size() + ", ");
-+            for(String policyId : policyIds){
-+                sb.append(policyId + ",");
-+            }
-+            sb.append("]");
-+            LOG.info(sb.toString());
-+        }else{
-+            LOG.warn("No policies are assigned to " + policyGroupId);
-+        }
-+    }
-+}
-diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionReportMethods.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionReportMethods.java
-new file mode 100644
-index 0000000..1cc14cc
---- /dev/null
-+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionReportMethods.java
-@@ -0,0 +1,27 @@
-+/*
-+ *
-+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
-+ *  * contributor license agreements.  See the NOTICE file distributed with
-+ *  * this work for additional information regarding copyright ownership.
-+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
-+ *  * (the "License"); you may not use this file except in compliance with
-+ *  * the License.  You may obtain a copy of the License at
-+ *  *
-+ *  *    http://www.apache.org/licenses/LICENSE-2.0
-+ *  *
-+ *  * Unless required by applicable law or agreed to in writing, software
-+ *  * distributed under the License is distributed on an "AS IS" BASIS,
-+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ *  * See the License for the specific language governing permissions and
-+ *  * limitations under the License.
-+ *
-+ */
-+
-+package org.apache.eagle.alert.policy;
-+
-+/**
-+ * The framework calls the report method; it is the AlertExecutor's responsibility to report policy distribution information.
-+ */
-+public interface PolicyDistributionReportMethods {
-+    public void report();
-+}
-diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStats.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStats.java
-new file mode 100644
-index 0000000..d6bbec9
---- /dev/null
-+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStats.java
-@@ -0,0 +1,74 @@
-+/*
-+ *
-+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
-+ *  * contributor license agreements.  See the NOTICE file distributed with
-+ *  * this work for additional information regarding copyright ownership.
-+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
-+ *  * (the "License"); you may not use this file except in compliance with
-+ *  * the License.  You may obtain a copy of the License at
-+ *  *
-+ *  *    http://www.apache.org/licenses/LICENSE-2.0
-+ *  *
-+ *  * Unless required by applicable law or agreed to in writing, software
-+ *  * distributed under the License is distributed on an "AS IS" BASIS,
-+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ *  * See the License for the specific language governing permissions and
-+ *  * limitations under the License.
-+ *
-+ */
-+
-+package org.apache.eagle.alert.policy;
-+
-+/**
-+ * fields for a policy distribution statistics
-+ */
-+public class PolicyDistributionStats {
-+    private String policyGroupId;   // normally groupId is alertExecutorId
-+    private String policyId;
-+    private boolean markDown;       // true if this policy is marked down, false otherwise
-+    private double weight;          // comprehensive factors for policy overhead
-+
-+    public String getPolicyId() {
-+        return policyId;
-+    }
-+
-+    public void setPolicyId(String policyId) {
-+        this.policyId = policyId;
-+    }
-+
-+    public boolean isMarkDown() {
-+        return markDown;
-+    }
-+
-+    public void setMarkDown(boolean markDown) {
-+        this.markDown = markDown;
-+    }
-+
-+    public double getWeight() {
-+        return weight;
-+    }
-+
-+    public void setWeight(double weight) {
-+        this.weight = weight;
-+    }
-+
-+    public String getPolicyGroupId() {
-+        return policyGroupId;
-+    }
-+
-+    public void setPolicyGroupId(String policyGroupId) {
-+        this.policyGroupId = policyGroupId;
-+    }
-+
-+    public String toString(){
-+        StringBuilder sb = new StringBuilder();
-+        sb.append("policyId:");
-+        sb.append(policyId);
-+        sb.append(", markDown:");
-+        sb.append(markDown);
-+        sb.append(", weight:");
-+        sb.append(weight);
-+
-+        return sb.toString();
-+    }
-+}
-diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStatsDAO.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStatsDAO.java
-new file mode 100644
-index 0000000..5abe303
---- /dev/null
-+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStatsDAO.java
-@@ -0,0 +1,26 @@
-+/*
-+ *
-+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
-+ *  * contributor license agreements.  See the NOTICE file distributed with
-+ *  * this work for additional information regarding copyright ownership.
-+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
-+ *  * (the "License"); you may not use this file except in compliance with
-+ *  * the License.  You may obtain a copy of the License at
-+ *  *
-+ *  *    http://www.apache.org/licenses/LICENSE-2.0
-+ *  *
-+ *  * Unless required by applicable law or agreed to in writing, software
-+ *  * distributed under the License is distributed on an "AS IS" BASIS,
-+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ *  * See the License for the specific language governing permissions and
-+ *  * limitations under the License.
-+ *
-+ */
-+
-+package org.apache.eagle.alert.policy;
-+
-+import java.util.Set;
-+
-+public interface PolicyDistributionStatsDAO {
-+    public void reportPolicyMembership(String policyGroupId, Set<String> policyIds);
-+}
-diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyEvaluatorServiceProvider.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyEvaluatorServiceProvider.java
-index 412010c..03060b9 100644
---- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyEvaluatorServiceProvider.java
-+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyEvaluatorServiceProvider.java
-@@ -30,7 +30,7 @@
-  *   - if policy is created, then invoke onPolicyCreated
-  *   - if policy is deleted, then invoke onPolicyDeleted
-  *   - if policy is updated, then invoke onPolicyUpdated
-- * - for policy update, replace old evaluator engine with new evaluator engine which is created by policy evaluator provider
-+ * - for policy report, replace old evaluator engine with new evaluator engine which is created by policy evaluator provider
-  * - specify # of executors for this alert executor id
-  * - dynamically balance # of policies evaluated by each alert executor
-  *   - use zookeeper to balance. eaglePolicies/${alertExecutorId}/${alertExecutorInstanceId} => list of policies
-diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
-index 8b928c3..887a881 100644
---- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
-+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
-@@ -23,6 +23,7 @@
- import org.apache.eagle.alert.common.AlertConstants;
- import org.apache.eagle.alert.config.AbstractPolicyDefinition;
- import org.apache.eagle.alert.dao.AlertDefinitionDAO;
-+import org.apache.eagle.alert.dao.AlertStreamSchemaDAO;
- import org.apache.eagle.alert.dao.AlertStreamSchemaDAOImpl;
- import org.apache.eagle.alert.entity.AlertAPIEntity;
- import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-@@ -49,7 +50,7 @@
- import java.util.Map;
- import java.util.Map.Entry;
- 
--public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEntity> implements PolicyLifecycleMethods, SiddhiAlertHandler {
-+public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEntity> implements PolicyLifecycleMethods, SiddhiAlertHandler, PolicyDistributionReportMethods {
- 	private static final long serialVersionUID = 1L;
- 
- 	private static final Logger LOG = LoggerFactory.getLogger(AlertExecutor.class);
-@@ -107,6 +108,10 @@ public PolicyPartitioner getPolicyPartitioner() {
- 	public AlertDefinitionDAO getAlertDefinitionDao() {
- 		return alertDefinitionDao;
- 	}
-+
-+    public Map<String, PolicyEvaluator> getPolicyEvaluators(){
-+        return policyEvaluators;
-+    }
- 	
- 	@Override
- 	public void prepareConfig(Config config) {
-@@ -135,10 +140,19 @@ public void initMetricReportor() {
- 		dimensionsMap = new HashMap<>();
- 	}
- 
-+    /**
-+     * for unit test purpose only
-+     * @param config
-+     * @return
-+     */
-+    public AlertStreamSchemaDAO getAlertStreamSchemaDAO(Config config){
-+        return new AlertStreamSchemaDAOImpl(config);
-+    }
-+
- 	@Override
- 	public void init() {
- 		// initialize StreamMetadataManager before it is used
--		StreamMetadataManager.getInstance().init(config, new AlertStreamSchemaDAOImpl(config));
-+		StreamMetadataManager.getInstance().init(config, getAlertStreamSchemaDAO(config));
- 		// for each AlertDefinition, to create a PolicyEvaluator
- 		Map<String, PolicyEvaluator> tmpPolicyEvaluators = new HashMap<String, PolicyEvaluator>();
- 		
-@@ -169,7 +183,9 @@ else if (initialAlertDefs.get(alertExecutorId) != null) {
- 		DynamicPolicyLoader policyLoader = DynamicPolicyLoader.getInstance();
- 		
- 		policyLoader.init(initialAlertDefs, alertDefinitionDao, config);
--		policyLoader.addPolicyChangeListener(alertExecutorId + "_" + partitionSeq, this);
-+        String fullQualifiedAlertExecutorId = alertExecutorId + "_" + partitionSeq;
-+		policyLoader.addPolicyChangeListener(fullQualifiedAlertExecutorId, this);
-+        policyLoader.addPolicyDistributionUpdateListener(fullQualifiedAlertExecutorId, this);
- 		LOG.info("Alert Executor created, partitionSeq: " + partitionSeq + " , numPartitions: " + numPartitions);
-         LOG.info("All policy evaluators: " + policyEvaluators);
- 		
-@@ -370,4 +386,10 @@ public void onAlerts(EagleAlertContext context, List<AlertAPIEntity> alerts) {
- 			}
- 		}
- 	}
-+
-+    @Override
-+    public void report() {
-+        PolicyDistStatsDAOLogReporter appender = new PolicyDistStatsDAOLogReporter();
-+        appender.reportPolicyMembership(alertExecutorId, policyEvaluators.keySet());
-+    }
- }
-\ No newline at end of file
-diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java
-new file mode 100644
-index 0000000..579b9f0
---- /dev/null
-+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java
-@@ -0,0 +1,93 @@
-+/*
-+ *
-+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
-+ *  * contributor license agreements.  See the NOTICE file distributed with
-+ *  * this work for additional information regarding copyright ownership.
-+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
-+ *  * (the "License"); you may not use this file except in compliance with
-+ *  * the License.  You may obtain a copy of the License at
-+ *  *
-+ *  *    http://www.apache.org/licenses/LICENSE-2.0
-+ *  *
-+ *  * Unless required by applicable law or agreed to in writing, software
-+ *  * distributed under the License is distributed on an "AS IS" BASIS,
-+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ *  * See the License for the specific language governing permissions and
-+ *  * limitations under the License.
-+ *
-+ */
-+
-+package org.apache.eagle.alert.policy;
-+
-+import com.typesafe.config.Config;
-+import com.typesafe.config.ConfigFactory;
-+import junit.framework.Assert;
-+import org.apache.eagle.alert.common.AlertConstants;
-+import org.apache.eagle.alert.dao.AlertDefinitionDAO;
-+import org.apache.eagle.alert.dao.AlertDefinitionDAOImpl;
-+import org.apache.eagle.alert.dao.AlertStreamSchemaDAO;
-+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-+import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-+import org.apache.eagle.executor.AlertExecutor;
-+import org.apache.eagle.service.client.EagleServiceConnector;
-+import org.junit.Test;
-+import org.slf4j.Logger;
-+import org.slf4j.LoggerFactory;
-+
-+import java.util.Arrays;
-+import java.util.HashMap;
-+import java.util.List;
-+import java.util.Map;
-+
-+public class TestPolicyDistributionUpdater {
-+    private static Logger LOG = LoggerFactory.getLogger(TestPolicyDistributionUpdater.class);
-+
-+    @Test
-+    public void testPolicyDistributionReporter() throws Exception{
-+        AlertDefinitionDAO alertDao = new AlertDefinitionDAOImpl(new EagleServiceConnector(null, null)) {
-+            @Override
-+            public Map<String, Map<String, AlertDefinitionAPIEntity>> findActiveAlertDefsGroupbyAlertExecutorId(String site, String dataSource) throws Exception {
-+                final AlertDefinitionAPIEntity entity = new AlertDefinitionAPIEntity();
-+                entity.setTags(new HashMap<String, String>() {{
-+                    put(AlertConstants.POLICY_TYPE, "siddhiCEPEngine");
-+                    put(AlertConstants.POLICY_ID, "policyId_1");
-+                }});
-+                Map<String, Map<String, AlertDefinitionAPIEntity>> map = new HashMap<String, Map<String, AlertDefinitionAPIEntity>>();
-+                map.put("alertExecutorId_1", new HashMap<String, AlertDefinitionAPIEntity>() {{
-+                    put("policyId_1", entity);
-+                }});
-+                entity.setPolicyDef("{\"type\":\"siddhiCEPEngine\",\"expression\":\"from testStream select name insert into outputStream ;\"}");
-+                return map;
-+            }
-+        };
-+
-+        AlertExecutor alertExecutor = new AlertExecutor("alertExecutorId_1", new DefaultPolicyPartitioner(), 1, 0, alertDao, new String[]{"testStream"}){
-+            public AlertStreamSchemaDAO getAlertStreamSchemaDAO(Config config){
-+                return new AlertStreamSchemaDAO(){
-+                    @Override
-+                    public List<AlertStreamSchemaEntity> findAlertStreamSchemaByDataSource(String dataSource) throws Exception {
-+                        AlertStreamSchemaEntity entity = new AlertStreamSchemaEntity();
-+                        entity.setTags(new HashMap<String, String>(){{
-+                            put("dataSource", "UnitTest");
-+                            put("streamName", "testStream");
-+                            put("attrName", "name");
-+                        }});
-+                        entity.setAttrType("string");
-+                        return Arrays.asList(entity);
-+                    }
-+                };
-+            }
-+
-+            @Override
-+            public void report() {
-+                Assert.assertEquals(1, getPolicyEvaluators().size());
-+                LOG.info("successuflly reported");
-+            }
-+        };
-+
-+        Config config = ConfigFactory.load();
-+        alertExecutor.prepareConfig(config);
-+        alertExecutor.init();
-+        Thread.sleep(100);
-+    }
-+}
-diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyPartitioner.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyPartitioner.java
-new file mode 100644
-index 0000000..b04d206
---- /dev/null
-+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyPartitioner.java
-@@ -0,0 +1,29 @@
-+/*
-+ * Licensed to the Apache Software Foundation (ASF) under one or more
-+ * contributor license agreements.  See the NOTICE file distributed with
-+ * this work for additional information regarding copyright ownership.
-+ * The ASF licenses this file to You under the Apache License, Version 2.0
-+ * (the "License"); you may not use this file except in compliance with
-+ * the License.  You may obtain a copy of the License at
-+ *
-+ *    http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ * See the License for the specific language governing permissions and
-+ * limitations under the License.
-+ */
-+package org.apache.eagle.alert.policy;
-+
-+import org.junit.Test;
-+
-+public class TestPolicyPartitioner {
-+    @Test
-+    public void test(){
-+        DefaultPolicyPartitioner p = new DefaultPolicyPartitioner();
-+        System.out.println(p.partition(2, "siddhiCEPEngine", "readtmpPII"));
-+        System.out.println(p.partition(2, "siddhiCEPEngine", "readtmpPII2"));
-+        System.out.println(p.partition(2, "siddhiCEPEngine", "tmpPIIpii"));
-+    }
-+}
-
-From 68a4e0be8e6dc07bbb57f7c95141d4824a4ba1f7 Mon Sep 17 00:00:00 2001
-From: yonzhang <yonzhang@ebay.com>
-Date: Wed, 23 Dec 2015 12:01:17 -0800
-Subject: [PATCH 3/3] fine tune policy distro
-
----
- .../eagle/alert/policy/DynamicPolicyLoader.java    |  5 +--
- .../policy/PolicyDistStatsDAOLogReporter.java      | 47 --------------------
- .../alert/policy/PolicyDistroStatsLogReporter.java | 50 ++++++++++++++++++++++
- .../org/apache/eagle/executor/AlertExecutor.java   |  6 +--
- .../org/apache/eagle/service/hbase/Tables.java     |  1 +
- 5 files changed, 56 insertions(+), 53 deletions(-)
- delete mode 100644 eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistStatsDAOLogReporter.java
- create mode 100644 eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistroStatsLogReporter.java
-
-diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DynamicPolicyLoader.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DynamicPolicyLoader.java
-index b9c74db..0e76deb 100644
---- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DynamicPolicyLoader.java
-+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DynamicPolicyLoader.java
-@@ -52,7 +52,7 @@
- 	private final int defaultDelayMillis = 60*1000;
- 	private final boolean defaultIgnoreDeleteFromSource = true;
-     /**
--     * one alertExecutorId may be paralleled by data, that is why there is a list of PolicyLifecycleMethods
-+     * one alertExecutor may have multiple instances, that is why there is a list of PolicyLifecycleMethods
-      */
- 	private volatile CopyOnWriteHashMap<String, List<PolicyLifecycleMethods>> policyChangeListeners = new CopyOnWriteHashMap<String, List<PolicyLifecycleMethods>>();
-     private volatile CopyOnWriteHashMap<String, List<PolicyDistributionReportMethods>> policyDistributionUpdaters = new CopyOnWriteHashMap<String, List<PolicyDistributionReportMethods>>();
-@@ -68,7 +68,7 @@ public void addPolicyChangeListener(String alertExecutorId, PolicyLifecycleMetho
- 		}
- 	}
- 
--    public void addPolicyDistributionUpdateListener(String alertExecutorId, PolicyDistributionReportMethods policyDistUpdater){
-+    public void addPolicyDistributionReporter(String alertExecutorId, PolicyDistributionReportMethods policyDistUpdater){
-         synchronized(policyDistributionUpdaters) {
-             if(policyDistributionUpdaters.get(alertExecutorId) == null) {
-                 policyDistributionUpdaters.put(alertExecutorId, new ArrayList<PolicyDistributionReportMethods>());
-@@ -147,7 +147,6 @@ public void handleEvent(EventType eventType, PollResult lastResult,
- 
-                 // notify policyDistributionUpdaters
-                 for(Map.Entry<String, List<PolicyDistributionReportMethods>> entry : policyDistributionUpdaters.entrySet()){
--                    String alertExecutorId = entry.getKey();
-                     for(PolicyDistributionReportMethods policyDistributionUpdateMethod : entry.getValue()){
-                         policyDistributionUpdateMethod.report();
-                     }
-diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistStatsDAOLogReporter.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistStatsDAOLogReporter.java
-deleted file mode 100644
-index 47bfea2..0000000
---- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistStatsDAOLogReporter.java
-+++ /dev/null
-@@ -1,47 +0,0 @@
--/*
-- *
-- *  * Licensed to the Apache Software Foundation (ASF) under one or more
-- *  * contributor license agreements.  See the NOTICE file distributed with
-- *  * this work for additional information regarding copyright ownership.
-- *  * The ASF licenses this file to You under the Apache License, Version 2.0
-- *  * (the "License"); you may not use this file except in compliance with
-- *  * the License.  You may obtain a copy of the License at
-- *  *
-- *  *    http://www.apache.org/licenses/LICENSE-2.0
-- *  *
-- *  * Unless required by applicable law or agreed to in writing, software
-- *  * distributed under the License is distributed on an "AS IS" BASIS,
-- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- *  * See the License for the specific language governing permissions and
-- *  * limitations under the License.
-- *
-- */
--
--package org.apache.eagle.alert.policy;
--
--import org.slf4j.Logger;
--import org.slf4j.LoggerFactory;
--
--import java.util.Set;
--
--/**
-- * just append log
-- */
--public class PolicyDistStatsDAOLogReporter implements PolicyDistributionStatsDAO{
--    private static Logger LOG = LoggerFactory.getLogger(PolicyDistStatsDAOLogReporter.class);
--
--    @Override
--    public void reportPolicyMembership(String policyGroupId, Set<String> policyIds) {
--        if(policyIds != null){
--            StringBuilder sb = new StringBuilder();
--            sb.append("policyDistirbutionStats for " + policyGroupId + "[" + "total: " + policyIds.size() + ", ");
--            for(String policyId : policyIds){
--                sb.append(policyId + ",");
--            }
--            sb.append("]");
--            LOG.info(sb.toString());
--        }else{
--            LOG.warn("No policies are assigned to " + policyGroupId);
--        }
--    }
--}
-diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistroStatsLogReporter.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistroStatsLogReporter.java
-new file mode 100644
-index 0000000..96a28f1
---- /dev/null
-+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistroStatsLogReporter.java
-@@ -0,0 +1,50 @@
-+/*
-+ *
-+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
-+ *  * contributor license agreements.  See the NOTICE file distributed with
-+ *  * this work for additional information regarding copyright ownership.
-+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
-+ *  * (the "License"); you may not use this file except in compliance with
-+ *  * the License.  You may obtain a copy of the License at
-+ *  *
-+ *  *    http://www.apache.org/licenses/LICENSE-2.0
-+ *  *
-+ *  * Unless required by applicable law or agreed to in writing, software
-+ *  * distributed under the License is distributed on an "AS IS" BASIS,
-+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ *  * See the License for the specific language governing permissions and
-+ *  * limitations under the License.
-+ *
-+ */
-+
-+package org.apache.eagle.alert.policy;
-+
-+import org.slf4j.Logger;
-+import org.slf4j.LoggerFactory;
-+
-+import java.util.Set;
-+
-+/**
-+ * just append log
-+ */
-+public class PolicyDistroStatsLogReporter implements PolicyDistributionStatsDAO{
-+    private static Logger LOG = LoggerFactory.getLogger(PolicyDistroStatsLogReporter.class);
-+
-+    @Override
-+    public void reportPolicyMembership(String policyGroupId, Set<String> policyIds) {
-+        if(policyIds != null){
-+            StringBuilder sb = new StringBuilder();
-+            sb.append("policyDistributionStats for " + policyGroupId +", total: " + policyIds.size() + ", [");
-+            for(String policyId : policyIds){
-+                sb.append(policyId + ",");
-+            }
-+            if(policyIds.size() > 0){
-+                sb.deleteCharAt(sb.length()-1);
-+            }
-+            sb.append("]");
-+            LOG.info(sb.toString());
-+        }else{
-+            LOG.warn("No policies are assigned to " + policyGroupId);
-+        }
-+    }
-+}
-diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
-index 887a881..124633c 100644
---- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
-+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
-@@ -185,7 +185,7 @@ else if (initialAlertDefs.get(alertExecutorId) != null) {
- 		policyLoader.init(initialAlertDefs, alertDefinitionDao, config);
-         String fullQualifiedAlertExecutorId = alertExecutorId + "_" + partitionSeq;
- 		policyLoader.addPolicyChangeListener(fullQualifiedAlertExecutorId, this);
--        policyLoader.addPolicyDistributionUpdateListener(fullQualifiedAlertExecutorId, this);
-+        policyLoader.addPolicyDistributionReporter(fullQualifiedAlertExecutorId, this);
- 		LOG.info("Alert Executor created, partitionSeq: " + partitionSeq + " , numPartitions: " + numPartitions);
-         LOG.info("All policy evaluators: " + policyEvaluators);
- 		
-@@ -389,7 +389,7 @@ public void onAlerts(EagleAlertContext context, List<AlertAPIEntity> alerts) {
- 
-     @Override
-     public void report() {
--        PolicyDistStatsDAOLogReporter appender = new PolicyDistStatsDAOLogReporter();
--        appender.reportPolicyMembership(alertExecutorId, policyEvaluators.keySet());
-+        PolicyDistroStatsLogReporter appender = new PolicyDistroStatsLogReporter();
-+        appender.reportPolicyMembership(alertExecutorId + "_" + partitionSeq, policyEvaluators.keySet());
-     }
- }
-\ No newline at end of file
-diff --git a/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/Tables.java b/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/Tables.java
-index a31e27e..f8941bc 100644
---- a/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/Tables.java
-+++ b/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/Tables.java
-@@ -40,6 +40,7 @@ public Tables(){
-         // for security
-         tables.add("hiveResourceSensitivity");
-         tables.add("fileSensitivity");
-+        tables.add("ipzone");
-         tables.add("mlmodel");
-         tables.add("userprofile");
-     }


Mime
View raw message