eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [15/84] [partial] eagle git commit: Clean repo for eagle site
Date Mon, 03 Apr 2017 11:54:23 GMT
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
deleted file mode 100644
index 5bf0410..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher.dedup;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-public class DedupCacheTest {
-
-    @Test
-    public void testNormal() throws Exception {
-        Config config = ConfigFactory.load();
-        DedupCache dedupCache = new DedupCache(config, "testPublishment");
-
-        StreamDefinition stream = createStream();
-        PolicyDefinition policy = createPolicy(stream.getStreamId(), "testPolicy");
-
-        String[] states = new String[] {"OPEN", "WARN", "CLOSE"};
-        Random random = new Random();
-        for (int i = 0; i < 20; i++) {
-            AlertStreamEvent event = createEvent(stream, policy, new Object[] {
-                System.currentTimeMillis(), "host1", "testPolicy-host1-01", states[random.nextInt(3)], 0, 0
-            });
-            HashMap<String, String> dedupFieldValues = new HashMap<String, String>();
-            dedupFieldValues.put("alertKey", (String) event.getData()[event.getSchema().getColumnIndex("alertKey")]);
-            List<AlertStreamEvent> result = dedupCache.dedup(event,
-                new EventUniq(event.getStreamId(), event.getPolicyId(), event.getCreatedTime(), dedupFieldValues),
-                "state",
-                (String) event.getData()[event.getSchema().getColumnIndex("state")], "closed");
-            System.out.println((i + 1) + " >>>> " + ToStringBuilder.reflectionToString(result));
-        }
-
-        Assert.assertTrue(true);
-    }
-
-    private AlertStreamEvent createEvent(StreamDefinition stream, PolicyDefinition policy, Object[] data) {
-        AlertStreamEvent event = new AlertStreamEvent();
-        event.setPolicyId(policy.getName());
-        event.setSchema(stream);
-        event.setStreamId(stream.getStreamId());
-        event.setTimestamp(System.currentTimeMillis());
-        event.setCreatedTime(System.currentTimeMillis());
-        event.setData(data);
-        return event;
-    }
-
-    private StreamDefinition createStream() {
-        StreamDefinition sd = new StreamDefinition();
-        StreamColumn tsColumn = new StreamColumn();
-        tsColumn.setName("timestamp");
-        tsColumn.setType(StreamColumn.Type.LONG);
-
-        StreamColumn hostColumn = new StreamColumn();
-        hostColumn.setName("host");
-        hostColumn.setType(StreamColumn.Type.STRING);
-
-        StreamColumn alertKeyColumn = new StreamColumn();
-        alertKeyColumn.setName("alertKey");
-        alertKeyColumn.setType(StreamColumn.Type.STRING);
-
-        StreamColumn stateColumn = new StreamColumn();
-        stateColumn.setName("state");
-        stateColumn.setType(StreamColumn.Type.STRING);
-
-        // dedupCount, dedupFirstOccurrence
-
-        StreamColumn dedupCountColumn = new StreamColumn();
-        dedupCountColumn.setName("dedupCount");
-        dedupCountColumn.setType(StreamColumn.Type.LONG);
-
-        StreamColumn dedupFirstOccurrenceColumn = new StreamColumn();
-        dedupFirstOccurrenceColumn.setName(DedupCache.DEDUP_FIRST_OCCURRENCE);
-        dedupFirstOccurrenceColumn.setType(StreamColumn.Type.LONG);
-
-        sd.setColumns(Arrays.asList(tsColumn, hostColumn, alertKeyColumn, stateColumn, dedupCountColumn, dedupFirstOccurrenceColumn));
-        sd.setDataSource("testDatasource");
-        sd.setStreamId("testStream");
-        sd.setDescription("test stream");
-        return sd;
-    }
-
-    private PolicyDefinition createPolicy(String streamName, String policyName) {
-        PolicyDefinition pd = new PolicyDefinition();
-        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
-        //expression, something like "PT5S,dynamic,1,host"
-        def.setValue("test");
-        def.setType("siddhi");
-        pd.setDefinition(def);
-        pd.setInputStreams(Arrays.asList("inputStream"));
-        pd.setOutputStreams(Arrays.asList("outputStream"));
-        pd.setName(policyName);
-        pd.setDescription(String.format("Test policy for stream %s", streamName));
-
-        StreamPartition sp = new StreamPartition();
-        sp.setStreamId(streamName);
-        sp.setColumns(Arrays.asList("host"));
-        sp.setType(StreamPartition.Type.GROUPBY);
-        pd.addPartition(sp);
-        return pd;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
deleted file mode 100644
index c48df9a..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher.dedup;
-
-import com.google.common.base.Joiner;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.impl.DefaultDeduplicator;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ConcurrentLinkedDeque;
-
-import static org.apache.eagle.alert.engine.publisher.AlertPublisherTestHelper.*;
-
-public class DefaultDedupWithoutStateTest {
-
-    @Test
-    public void testNormal() throws Exception {
-        //String intervalMin, List<String> customDedupFields, String dedupStateField, String dedupStateCloseValue
-        // assume state: OPEN, WARN, CLOSE
-        System.setProperty("config.resource", "/application-mongo-statestore.conf");
-        Config config = ConfigFactory.load();
-        DedupCache dedupCache = new DedupCache(config, "testPublishment");
-        DefaultDeduplicator deduplicator = new DefaultDeduplicator(
-            "PT10S", Arrays.asList(new String[] {"alertKey"}), null, null, dedupCache);
-
-        StreamDefinition stream = createStream();
-        PolicyDefinition policy = createPolicyGroupByStreamId(stream.getStreamId(), "testPolicy");
-
-        int[] hostIndex = new int[] {1, 2, 3};
-        String[] states = new String[] {"OPEN", "WARN", "CLOSE"};
-        Random random = new Random();
-
-        final ConcurrentLinkedDeque<AlertStreamEvent> nonDedupResult = new ConcurrentLinkedDeque<AlertStreamEvent>();
-
-        for (int i = 0; i < 100; i++) {
-            new Thread(new Runnable() {
-
-                @Override
-                public void run() {
-                    int index = hostIndex[random.nextInt(3)];
-                    AlertStreamEvent e1 = createEvent(stream, policy, new Object[] {
-                        System.currentTimeMillis(), "host" + index,
-                        String.format("testPolicy-host%s-01", index),
-                        states[random.nextInt(3)], 0, 0
-                    });
-                    List<AlertStreamEvent> result = deduplicator.dedup(e1);
-                    if (result != null) {
-                        System.out.println(">>>" + Joiner.on(",").join(result));
-                        nonDedupResult.addAll(result);
-                    } else {
-                        System.out.println(">>>" + result);
-                    }
-                }
-
-            }).start();
-        }
-
-        Thread.sleep(1000);
-
-        System.out.println("old size: " + nonDedupResult.size());
-        Assert.assertTrue(nonDedupResult.size() > 0 && nonDedupResult.size() <= 3);
-
-        Thread.sleep(15000);
-
-        for (int i = 0; i < 100; i++) {
-            new Thread(new Runnable() {
-
-                @Override
-                public void run() {
-                    int index = hostIndex[random.nextInt(3)];
-                    AlertStreamEvent e1 = createEvent(stream, policy, new Object[] {
-                        System.currentTimeMillis(), "host" + index,
-                        String.format("testPolicy-host%s-01", index),
-                        states[random.nextInt(3)], 0, 0
-                    });
-                    List<AlertStreamEvent> result = deduplicator.dedup(e1);
-                    if (result != null) {
-                        System.out.println(">>>" + Joiner.on(",").join(result));
-                        nonDedupResult.addAll(result);
-                    } else {
-                        System.out.println(">>>" + result);
-                    }
-                }
-
-            }).start();
-        }
-
-        Thread.sleep(1000);
-
-        System.out.println("new size: " + nonDedupResult.size());
-        Assert.assertTrue(nonDedupResult.size() > 3 && nonDedupResult.size() <= 6);
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
deleted file mode 100644
index 297b790..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher.dedup;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.impl.DefaultDeduplicator;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.apache.eagle.alert.engine.publisher.AlertPublisherTestHelper.*;
-
-public class DefaultDeduplicatorTest {
-
-    @Test
-    public void testNormal() throws Exception {
-        //String intervalMin, List<String> customDedupFields, String dedupStateField, String dedupStateCloseValue
-        // assume state: OPEN, WARN, CLOSE
-        System.setProperty("config.resource", "/application-mongo-statestore.conf");
-        Config config = ConfigFactory.load();
-        DedupCache dedupCache = new DedupCache(config, "testPublishment");
-        DefaultDeduplicator deduplicator = new DefaultDeduplicator(
-            "PT1M", Arrays.asList(new String[] {"alertKey"}), "state", "close", dedupCache);
-
-        StreamDefinition stream = createStream();
-        PolicyDefinition policy = createPolicyGroupByStreamId(stream.getStreamId(), "testPolicy");
-
-        AlertStreamEvent e1 = createEvent(stream, policy, new Object[] {
-            System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0
-        });
-        AlertStreamEvent e2 = createEvent(stream, policy, new Object[] {
-            System.currentTimeMillis(), "host1", "testPolicy-host1-01", "WARN", 0, 0
-        });
-        AlertStreamEvent e3 = createEvent(stream, policy, new Object[] {
-            System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0
-        });
-        AlertStreamEvent e4 = createEvent(stream, policy, new Object[] {
-            System.currentTimeMillis(), "host1", "testPolicy-host1-01", "WARN", 0, 0
-        });
-        AlertStreamEvent e5 = createEvent(stream, policy, new Object[] {
-            System.currentTimeMillis(), "host1", "testPolicy-host1-01", "CLOSE", 0, 0
-        });
-        AlertStreamEvent e6 = createEvent(stream, policy, new Object[] {
-            System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0
-        });
-        AlertStreamEvent e7 = createEvent(stream, policy, new Object[] {
-            System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0
-        });
-        AlertStreamEvent e8 = createEvent(stream, policy, new Object[] {
-            System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0
-        });
-
-        List<AlertStreamEvent> allResults = new ArrayList<AlertStreamEvent>();
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                List<AlertStreamEvent> result = deduplicator.dedup(e1);
-                if (result != null) {
-                    allResults.addAll(result);
-                }
-                System.out.println("1 >>>> " + ToStringBuilder.reflectionToString(result));
-            }
-        }).start();
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                List<AlertStreamEvent> result = deduplicator.dedup(e2);
-                if (result != null) {
-                    allResults.addAll(result);
-                }
-                System.out.println("2 >>>> " + ToStringBuilder.reflectionToString(result));
-            }
-        }).start();
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                List<AlertStreamEvent> result = deduplicator.dedup(e3);
-                if (result != null) {
-                    allResults.addAll(result);
-                }
-                System.out.println("3 >>>> " + ToStringBuilder.reflectionToString(result));
-            }
-        }).start();
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                List<AlertStreamEvent> result = deduplicator.dedup(e4);
-                if (result != null) {
-                    allResults.addAll(result);
-                }
-                System.out.println("4 >>>> " + ToStringBuilder.reflectionToString(result));
-            }
-        }).start();
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    Thread.sleep(500);
-                } catch (InterruptedException e) {
-                }
-
-                List<AlertStreamEvent> result = deduplicator.dedup(e5);
-                if (result != null) {
-                    allResults.addAll(result);
-                }
-                System.out.println("5 >>>> " + ToStringBuilder.reflectionToString(result));
-            }
-        }).start();
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                List<AlertStreamEvent> result = deduplicator.dedup(e6);
-                if (result != null) {
-                    allResults.addAll(result);
-                }
-                System.out.println("6 >>>> " + ToStringBuilder.reflectionToString(result));
-            }
-        }).start();
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                List<AlertStreamEvent> result = deduplicator.dedup(e7);
-                if (result != null) {
-                    allResults.addAll(result);
-                }
-                System.out.println("7 >>>> " + ToStringBuilder.reflectionToString(result));
-            }
-        }).start();
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                List<AlertStreamEvent> result = deduplicator.dedup(e8);
-                if (result != null) {
-                    allResults.addAll(result);
-                }
-                System.out.println("8 >>>> " + ToStringBuilder.reflectionToString(result));
-            }
-        }).start();
-
-        Thread.sleep(2000);
-
-        long maxCount = 0;
-        for (AlertStreamEvent event : allResults) {
-            Assert.assertNotNull(event.getData()[4]);
-            Assert.assertNotNull(event.getData()[5]);
-
-            if (((Long) event.getData()[4]) > maxCount) {
-                maxCount = (Long) event.getData()[4];
-                System.out.println(String.format(">>>>>%s: %s", event, maxCount));
-            }
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicatorTest.java
deleted file mode 100644
index a788646..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicatorTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher.dedup;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
-import org.apache.eagle.alert.engine.publisher.impl.AlertPublishPluginsFactory;
-import org.apache.eagle.alert.engine.router.TestAlertPublisherBolt;
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.CollectionType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-
-public class ExtendedDeduplicatorTest {
-
-    @Test
-    public void testNormal() throws Exception {
-        List<Publishment> pubs = loadEntities("/router/publishments-extended-deduplicator.json", Publishment.class);
-
-        AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(pubs.get(0), null, null);
-        AlertStreamEvent event1 = createWithStreamDef("extended_dedup_host1", "extended_dedup_testapp1", "OPEN");
-        AlertStreamEvent event2 = createWithStreamDef("extended_dedup_host2", "extended_dedup_testapp1", "OPEN");
-        AlertStreamEvent event3 = createWithStreamDef("extended_dedup_host2", "extended_dedup_testapp2", "CLOSE");
-
-        Assert.assertNotNull(plugin.dedup(event1));
-        Assert.assertNull(plugin.dedup(event2));
-        Assert.assertNotNull(plugin.dedup(event3));
-
-    }
-
-    private <T> List<T> loadEntities(String path, Class<T> tClz) throws Exception {
-        ObjectMapper objectMapper = new ObjectMapper();
-        JavaType type = CollectionType.construct(List.class, SimpleType.construct(tClz));
-        List<T> l = objectMapper.readValue(TestAlertPublisherBolt.class.getResourceAsStream(path), type);
-        return l;
-    }
-
-    private AlertStreamEvent createWithStreamDef(String hostname, String appName, String state) {
-        AlertStreamEvent alert = new AlertStreamEvent();
-        PolicyDefinition policy = new PolicyDefinition();
-        policy.setName("perfmon_cpu_host_check");
-        alert.setPolicyId(policy.getName());
-        alert.setCreatedTime(System.currentTimeMillis());
-        alert.setData(new Object[] {appName, hostname, state});
-        alert.setStreamId("testAlertStream");
-        alert.setCreatedBy(this.toString());
-
-        // build stream definition
-        StreamDefinition sd = new StreamDefinition();
-        StreamColumn appColumn = new StreamColumn();
-        appColumn.setName("appname");
-        appColumn.setType(StreamColumn.Type.STRING);
-
-        StreamColumn hostColumn = new StreamColumn();
-        hostColumn.setName("hostname");
-        hostColumn.setType(StreamColumn.Type.STRING);
-
-        StreamColumn stateColumn = new StreamColumn();
-        stateColumn.setName("state");
-        stateColumn.setType(StreamColumn.Type.STRING);
-
-        sd.setColumns(Arrays.asList(appColumn, hostColumn, stateColumn));
-
-        alert.setSchema(sd);
-        return alert;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/SimpleEmbedMongo.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/SimpleEmbedMongo.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/SimpleEmbedMongo.java
deleted file mode 100644
index 31f744e..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/SimpleEmbedMongo.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher.dedup;
-
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.mongodb.MongoClient;
-
-import de.flapdoodle.embed.mongo.MongodExecutable;
-import de.flapdoodle.embed.mongo.MongodProcess;
-import de.flapdoodle.embed.mongo.MongodStarter;
-import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
-import de.flapdoodle.embed.mongo.config.Net;
-import de.flapdoodle.embed.mongo.distribution.Version;
-import de.flapdoodle.embed.process.runtime.Network;
-
-public class SimpleEmbedMongo {
-
-	private static final Logger LOG = LoggerFactory.getLogger(SimpleEmbedMongo.class);
-    
-    private MongoClient client;
-    private MongodExecutable mongodExe;
-    private MongodProcess mongod;
-
-    public void start() throws Exception {
-        MongodStarter starter = MongodStarter.getDefaultInstance();
-        mongodExe = starter.prepare(new MongodConfigBuilder().version(Version.V3_2_1)
-                .net(new Net(27017, Network.localhostIsIPv6())).build());
-        mongod = mongodExe.start();
-
-        client = new MongoClient("localhost");
-    }
-
-    public void shutdown() {
-
-        if (mongod != null) {
-            try {
-                mongod.stop();
-            }
-            catch (IllegalStateException e) {
-                // catch this exception for the unstable stopping mongodb
-                // reason: the exception is usually thrown out with below message format when stop() returns null value,
-                //         but actually this should have been captured in ProcessControl.stopOrDestroyProcess() by destroying
-                //         the process ultimately
-                if (e.getMessage() != null && e.getMessage().matches("^Couldn't kill.*process!.*")) {
-                    // if matches, do nothing, just ignore the exception
-                } else {
-                    LOG.warn(String.format("Ignored error for stopping mongod process, see stack trace: %s", ExceptionUtils.getStackTrace(e)));
-                }
-            }
-            mongodExe.stop();
-        }
-    }
-
-    public MongoClient getMongoClient() {
-        return client;
-    }
-
-	
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java
deleted file mode 100644
index 247f332..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher.dedup;
-
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
-import org.junit.Ignore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-@Ignore
-public class TestDeduplicator extends ExtendedDeduplicator {
-
-    public TestDeduplicator(Config config, Map<String, String> properties, List<String> customDedupFields,
-			String dedupStateField, DedupCache dedupCache, String publishName) {
-		super(config, properties, customDedupFields, dedupStateField, dedupCache, publishName);
-	}
-
-	private static final Logger LOG = LoggerFactory.getLogger(TestDeduplicator.class);
-
-    @Override
-    public List<AlertStreamEvent> dedup(AlertStreamEvent event) {
-        StreamDefinition streamDefinition = event.getSchema();
-        HashMap<String, String> customFieldValues = new HashMap<>();
-        String stateFiledValue = null;
-        for (int i = 0; i < event.getData().length; i++) {
-            if (i > streamDefinition.getColumns().size()) {
-                continue;
-            }
-            String colName = streamDefinition.getColumns().get(i).getName();
-
-            if (colName.equals(this.getDedupStateField())) {
-                stateFiledValue = event.getData()[i].toString();
-            }
-
-            // make all of the field as unique key if no custom dedup field provided
-            if (this.getCustomDedupFields() == null || this.getCustomDedupFields().size() <= 0) {
-                customFieldValues.put(colName, event.getData()[i].toString());
-            } else {
-                for (String field : this.getCustomDedupFields()) {
-                    if (colName.equals(field)) {
-                        customFieldValues.put(field, event.getData()[i].toString());
-                        break;
-                    }
-                }
-            }
-        }
-        LOG.info("event: " + event);
-        EventUniq eventkey = new EventUniq(event.getStreamId(), event.getPolicyId(), event.getCreatedTime(), customFieldValues);
-        LOG.info("event key: " + eventkey);
-        LOG.info("dedup field: " + this.getDedupStateField());
-        LOG.info("dedup value: " + stateFiledValue);
-        List<AlertStreamEvent> result = this.getDedupCache().dedup(event, eventkey, this.getDedupStateField(), stateFiledValue, "closed");
-        return result;
-    }
-
-    @Override
-    public void setDedupIntervalMin(String intervalMin) {
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngineTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngineTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngineTest.java
deleted file mode 100644
index 7b1d494..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngineTest.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher.template;
-
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.engine.coordinator.AlertDefinition;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class VelocityAlertTemplateEngineTest {
-    @Test
-    public void testVelocityAlertTemplate () {
-        AlertTemplateEngine templateEngine = new VelocityAlertTemplateEngine();
-        templateEngine.init(ConfigFactory.load());
-        templateEngine.register(mockPolicy("testPolicy"));
-        AlertStreamEvent event = templateEngine.filter(mockAlertEvent("testPolicy"));
-        Assert.assertEquals("Alert (2016-11-30 07:31:15): cpu usage on hadoop of cluster test_cluster at localhost is 0.98, " +
-            "exceeding thread hold: 90%. (policy: testPolicy, description: Policy for monitoring cpu usage > 90%), " +
-            "definition: from HADOOP_JMX_METRIC_STREAM[site == \"test_cluster\" and metric == \"cpu.usage\" and value > 0.9] " +
-            "select site, metric, host, role, value insert into capacityUsageAlert", event.getBody());
-        Assert.assertEquals("Name Node Usage Exceed 90%, reach 98.0% now", event.getSubject());
-    }
-
-    @Test
-    public void testVelocityAlertTemplateWithoutTemplate () {
-        AlertTemplateEngine templateEngine = new VelocityAlertTemplateEngine();
-        templateEngine.init(ConfigFactory.load());
-        templateEngine.register(mockPolicyWithoutTemplate("testPolicyName"));
-        AlertStreamEvent event = templateEngine.filter(mockAlertEvent("testPolicyName"));
-        Assert.assertEquals("Message: Alert {site=test, stream=ALERT_STREAM,timestamp=2016-11-30 07:31:15,923," +
-            "data={site=test_cluster, role=hadoop, metric=cpu.usage, host=localhost, value=0.98}, " +
-            "policyId=testPolicyName, createdBy=junit, metaVersion=SAMPLE_META_VERSION} " +
-            "(Auto-generated alert message as template not defined in policy testPolicyName)", event.getBody());
-        Assert.assertEquals("testPolicyName", event.getSubject());
-    }
-
-    private static PolicyDefinition mockPolicy (String policyId) {
-        PolicyDefinition pd = new PolicyDefinition();
-        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
-        def.setValue("from HADOOP_JMX_METRIC_STREAM[site == \"test_cluster\" and metric == \"cpu.usage\" and value > 0.9] " +
-            "select site, metric, host, role, value insert into capacityUsageAlert");
-        def.setType("siddhi");
-        pd.setDefinition(def);
-        pd.setInputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM"));
-        pd.setOutputStreams(Collections.singletonList("capacityUsageAlert"));
-        pd.setName(policyId);
-        pd.setDescription("Policy for monitoring cpu usage > 90%");
-        AlertDefinition alertDefinition = new AlertDefinition();
-        alertDefinition.setSubject("Name Node Usage Exceed 90%, reach #set($usage_per = $value * 100)$usage_per% now");
-        alertDefinition.setBody("Alert ($CREATED_TIME): cpu usage on $role of cluster $site at $host is $value, exceeding thread hold: 90%. "
-                + "(policy: $POLICY_ID, description: $POLICY_DESC), definition: $POLICY_DEFINITION");
-        pd.setAlertDefinition(alertDefinition);
-        return pd;
-    }
-    private static PolicyDefinition mockPolicyWithoutTemplate (String policyId) {
-        PolicyDefinition pd = new PolicyDefinition();
-        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
-        def.setValue("from HADOOP_JMX_METRIC_STREAM[site == \"test_cluster\" and metric == \"cpu.usage\" and value > 0.9] " +
-            "select site, metric, host, role, value insert into capacityUsageAlert");
-        def.setType("siddhi");
-        pd.setDefinition(def);
-        pd.setInputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM"));
-        pd.setOutputStreams(Collections.singletonList("capacityUsageAlert"));
-        pd.setName(policyId);
-        pd.setDescription("Policy for monitoring cpu usage > 90%");
-        return pd;
-    }
-
-    private AlertStreamEvent mockAlertEvent (String policyId) {
-        AlertStreamEvent event = new AlertStreamEvent();
-        event.setSiteId("test");
-        event.setCreatedBy("junit");
-        event.setCreatedTime(1480491075923L);
-        event.setPolicyId(policyId);
-        event.setStreamId("ALERT_STREAM");
-        event.setSchema(mockAlertStreamDefinition("ALERT_STREAM"));
-        event.setMetaVersion("SAMPLE_META_VERSION");
-        event.setTimestamp(1480491075923L);
-        event.setData(new Object[]{"test_cluster", "cpu.usage", "localhost", "hadoop", 0.98});
-        event.ensureAlertId();
-        return event;
-    }
-
-    private StreamDefinition mockAlertStreamDefinition(String streamId){
-        StreamDefinition streamDefinition = new StreamDefinition();
-        streamDefinition.setStreamId(streamId);
-        streamDefinition.setSiteId("test_cluster");
-        List<StreamColumn> columns = new ArrayList<>();
-        StreamColumn column = new StreamColumn();
-        column.setName("site");
-        column.setType(StreamColumn.Type.STRING);
-        columns.add(column);
-        column = new StreamColumn();
-        column.setName("metric");
-        column.setType(StreamColumn.Type.STRING);
-        columns.add(column);
-        column = new StreamColumn();
-        column.setName("host");
-        column.setType(StreamColumn.Type.STRING);
-        columns.add(column);
-        column = new StreamColumn();
-        column.setName("role");
-        column.setType(StreamColumn.Type.STRING);
-        columns.add(column);
-        column = new StreamColumn();
-        column.setName("value");
-        column.setType(StreamColumn.Type.STRING);
-        columns.add(column);
-
-        streamDefinition.setColumns(columns);
-        return streamDefinition;
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParserTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParserTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParserTest.java
deleted file mode 100644
index 269cb71..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParserTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.engine.publisher.template;
-
-import org.apache.velocity.exception.MethodInvocationException;
-import org.apache.velocity.exception.ParseErrorException;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class VelocityTemplateParserTest {
-    @Test
-    public void testParseVelocityTemplate() {
-        String templateString = "This alert ($category) was generated because $reason and $REASON from $source at $created_time";
-        VelocityTemplateParser parser = new VelocityTemplateParser(templateString);
-        Assert.assertEquals(5, parser.getReferenceNames().size());
-        Assert.assertArrayEquals(new String[]{"category", "reason", "REASON", "source", "created_time"}, parser.getReferenceNames().toArray());
-    }
-
-
-    @Test(expected = ParseErrorException.class)
-    public void testParseInvalidVelocityTemplate() {
-        String templateString = "This alert ($category) was generated because $reason and $REASON from $source at $created_time #if() #fi";
-        VelocityTemplateParser parser = new VelocityTemplateParser(templateString);
-        Assert.assertEquals(5, parser.getReferenceNames().size());
-        Assert.assertArrayEquals(new String[]{"category", "reason", "REASON", "source", "created_time"}, parser.getReferenceNames().toArray());
-    }
-
-    @Test
-    public void testValidateVelocityContext() {
-        String templateString = "This alert ($category) was generated because $reason and $REASON from $source at $created_time";
-        VelocityTemplateParser parser = new VelocityTemplateParser(templateString);
-        Map<String,Object> context = new HashMap<>();
-        context.put("category", "UNKNOWN");
-        context.put("reason", "timeout");
-        context.put("REASON", "IO error");
-        context.put("source","localhost");
-        context.put("created_time", "2016-11-30 05:52:47,053");
-        parser.validateContext(context);
-    }
-
-    @Test(expected = MethodInvocationException.class)
-    public void testValidateInvalidVelocityContext() {
-        String templateString = "This alert ($category) was generated because $reason and $REASON from $source at $created_time";
-        VelocityTemplateParser parser = new VelocityTemplateParser(templateString);
-        parser.validateContext(new HashMap<>());
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateTest.java
deleted file mode 100644
index b67f394..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher.template;
-
-import org.apache.velocity.Template;
-import org.apache.velocity.VelocityContext;
-import org.apache.velocity.app.Velocity;
-import org.apache.velocity.app.VelocityEngine;
-import org.apache.velocity.runtime.RuntimeConstants;
-import org.apache.velocity.runtime.parser.node.ASTReference;
-import org.apache.velocity.runtime.parser.node.ASTprocess;
-import org.apache.velocity.runtime.resource.loader.StringResourceLoader;
-import org.apache.velocity.runtime.resource.util.StringResourceRepository;
-import org.apache.velocity.runtime.visitor.NodeViewMode;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-
-public class VelocityTemplateTest {
-    private static final Logger LOG = LoggerFactory.getLogger(VelocityTemplateTest.class);
-
-    @Test
-    public void testVelocityTemplate() {
-        String templateString = "This alert ($category) was generated because $reason and $REASON from $source at $created_time";
-        String resultString = "This alert ($category) was generated because timeout and IO error from localhost at 2016-11-30 05:52:47,053";
-        VelocityEngine engine = new VelocityEngine();
-        engine.setProperty(RuntimeConstants.RUNTIME_LOG_LOGSYSTEM_CLASS, "org.apache.velocity.runtime.log.Log4JLogChute");
-        engine.setProperty("runtime.log.logsystem.log4j.logger", LOG.getName());
-        engine.setProperty(Velocity.RESOURCE_LOADER, "string");
-        engine.addProperty("string.resource.loader.class", StringResourceLoader.class.getName());
-        engine.addProperty("string.resource.loader.repository.static", "false");
-        // engine.addProperty("runtime.references.strict", "true");
-        engine.init();
-
-        StringResourceRepository repo = (StringResourceRepository) engine.getApplicationAttribute(StringResourceLoader.REPOSITORY_NAME_DEFAULT);
-        repo.putStringResource("alert_template", "");
-        repo.putStringResource("alert_template", templateString);
-
-        Assert.assertEquals(templateString, repo.getStringResource("alert_template").getBody());
-
-        VelocityContext context = new VelocityContext();
-        context.put("reason", "timeout");
-        context.put("REASON", "IO error");
-        context.put("source","localhost");
-        context.put("created_time", "2016-11-30 05:52:47,053");
-
-        Template velocityTemplate = engine.getTemplate("alert_template");
-        ASTprocess data = (ASTprocess) velocityTemplate.getData();
-        ReferenceContext referenceContext = new ReferenceContext();
-        data.jjtAccept(referenceContext,null);
-        Assert.assertEquals(5, referenceContext.getReferences().size());
-        StringWriter writer = new StringWriter();
-        velocityTemplate.merge(context, writer);
-        velocityTemplate.process();
-        Assert.assertEquals(resultString, writer.toString());
-    }
-
-    private class ReferenceContext extends NodeViewMode {
-        private List<ASTReference> references = new ArrayList<>();
-
-        @Override
-        public Object visit(ASTReference node, Object data) {
-            references.add(node);
-            return super.visit(node, data);
-        }
-
-        public List<ASTReference> getReferences() {
-            return this.references;
-        }
-
-        public List<String> getReferenceNames() {
-            return this.references.stream().map(ASTReference::getRootString).collect(Collectors.toList());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java
deleted file mode 100644
index 284abc4..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.router;
-
-import org.apache.eagle.alert.engine.Collector;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
-import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-
-import java.util.Map;
-
-/**
- * Created on 8/29/16.
- */
-public class CustomizedHandler implements PolicyStreamHandler {
-    private Collector<AlertStreamEvent> collector;
-    private PolicyHandlerContext context;
-    private Map<String, StreamDefinition> sds;
-
-    public CustomizedHandler(Map<String, StreamDefinition> sds) {
-        this.sds = sds;
-    }
-
-    @Override
-    public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception {
-        this.collector = collector;
-        this.context = context;
-    }
-
-    @Override
-    public void send(StreamEvent event) throws Exception {
-        AlertStreamEvent alert = new AlertStreamEvent();
-        alert.setPolicyId(context.getPolicyDefinition().getName());
-        alert.setSchema(sds.get(event.getStreamId()));
-        this.collector.emit(alert);
-    }
-
-    @Override
-    public void close() throws Exception {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
deleted file mode 100755
index c9e09fd..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
+++ /dev/null
@@ -1,701 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.router;
-
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.GeneralTopologyContext;
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.TupleImpl;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.PublishPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.evaluator.PolicyGroupEvaluator;
-import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandlers;
-import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.runner.AlertBolt;
-import org.apache.eagle.alert.engine.runner.TestStreamRouterBolt;
-import org.apache.eagle.alert.engine.serialization.impl.PartitionedEventSerializerImpl;
-import org.apache.eagle.common.DateTimeUtil;
-
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Since 5/2/16.
- */
-@SuppressWarnings( {"rawtypes", "unused"})
-public class TestAlertBolt {
-
-    public static final String TEST_STREAM = "test-stream";
-    
-    private static final Logger LOG = LoggerFactory.getLogger(TestAlertBolt.class);
-
-    /**
-     * Following knowledge is guaranteed in
-     *
-     * @throws Exception Add test case: 2 alerts should be generated even if they are very close to each other in timestamp
-     * @see org.apache.eagle.alert.engine.runner.AlertBolt#execute{
-     * if(!routedStreamEvent.getRoute().getTargetComponentId().equals(this.policyGroupEvaluator.getName())){
-     * throw new IllegalStateException("Got event targeted to "+ routedStreamEvent.getRoute().getTargetComponentId()+" in "+this.policyGroupEvaluator.getName());
-     * }
-     * }
-     */
-    @Test
-    public void testAlertBolt() throws Exception {
-        final AtomicInteger alertCount = new AtomicInteger();
-        final Semaphore mutex = new Semaphore(0);
-        OutputCollector collector = new OutputCollector(new IOutputCollector() {
-            int count = 0;
-
-            @Override
-            public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-                alertCount.incrementAndGet();
-                mutex.release();
-                Assert.assertEquals("testAlertStream", ((PublishPartition) tuple.get(0)).getStreamId());
-                Assert.assertEquals("testAlertPublish", ((PublishPartition) tuple.get(0)).getPublishId());
-                AlertStreamEvent event = (AlertStreamEvent) tuple.get(1);
-                System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", ((PublishPartition) tuple.get(0)).getStreamId(), tuple));
-                return null;
-            }
-
-            @Override
-            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-            }
-
-            @Override
-            public void ack(Tuple input) {
-            }
-
-            @Override
-            public void fail(Tuple input) {
-            }
-
-            @Override
-            public void reportError(Throwable error) {
-            }
-            
-        });
-        AlertBolt bolt = createAlertBolt(collector);
-
-        String streamId = "cpuUsageStream";
-
-        // construct StreamDefinition
-        StreamDefinition schema = new StreamDefinition();
-        schema.setStreamId(streamId);
-        StreamColumn column = new StreamColumn();
-        column.setName("col1");
-        column.setType(StreamColumn.Type.STRING);
-        schema.setColumns(Collections.singletonList(column));
-        Map<String, StreamDefinition> sds = new HashMap<>();
-        sds.put(schema.getStreamId(), schema);
-
-        // construct StreamPartition
-        StreamPartition sp = new StreamPartition();
-        sp.setColumns(Collections.singletonList("col1"));
-        sp.setStreamId(streamId);
-        sp.setType(StreamPartition.Type.GROUPBY);
-
-        AlertBoltSpec spec = new AlertBoltSpec();
-        spec.setVersion("version1");
-        spec.setTopologyName("testTopology");
-        PolicyDefinition pd = new PolicyDefinition();
-        pd.setName("policy1");
-        pd.setPartitionSpec(Collections.singletonList(sp));
-        pd.setOutputStreams(Collections.singletonList("testAlertStream"));
-        pd.setInputStreams(Collections.singletonList(streamId));
-        pd.setDefinition(new PolicyDefinition.Definition());
-        pd.getDefinition().type = PolicyStreamHandlers.SIDDHI_ENGINE;
-        pd.getDefinition().value = "from cpuUsageStream[col1=='value1' OR col1=='value2'] select col1 insert into testAlertStream;";
-        spec.addBoltPolicy("alertBolt1", pd.getName());
-        spec.getBoltPoliciesMap().put("alertBolt1", new ArrayList<PolicyDefinition>(Arrays.asList(pd)));
-        spec.addPublishPartition("testAlertStream", "policy1", "testAlertPublish", null);
-        bolt.onAlertBoltSpecChange(spec, sds);
-
-        // contruct GeneralTopologyContext
-        GeneralTopologyContext context = mock(GeneralTopologyContext.class);
-        int taskId = 1;
-        when(context.getComponentId(taskId)).thenReturn("comp1");
-        when(context.getComponentOutputFields("comp1", "default")).thenReturn(new Fields("f0"));
-
-        // construct event with "value1"
-        StreamEvent event1 = new StreamEvent();
-        event1.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00") * 1000);
-        event1.setMetaVersion("version1");
-        Object[] data = new Object[] {"value1"};
-        event1.setData(data);
-        event1.setStreamId(streamId);
-        PartitionedEvent partitionedEvent1 = new PartitionedEvent(event1, sp, 1001);
-
-        // construct another event with "value1"
-        StreamEvent event2 = new StreamEvent();
-        event2.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00") * 1000);
-        event2.setMetaVersion("version1");
-        data = new Object[] {"value2"};
-        event2.setData(data);
-        event2.setStreamId(streamId);
-        PartitionedEvent partitionedEvent2 = new PartitionedEvent(event2, sp, 1001);
-        
-        Thread.sleep(3000);
-        Tuple input = new TupleImpl(context, Collections.singletonList(partitionedEvent1), taskId, "default");
-        Tuple input2 = new TupleImpl(context, Collections.singletonList(partitionedEvent2), taskId, "default");
-        bolt.execute(input);
-        bolt.execute(input2);
-        Assert.assertTrue("Timeout to acquire mutex in 5s", mutex.tryAcquire(2, 5, TimeUnit.SECONDS));
-        Assert.assertEquals(2, alertCount.get());
-        bolt.cleanup();
-    }
-
-    /**
-     * Creates an {@code AlertBolt} prepared for testing: default config, a mocked
-     * {@code TopologyContext} whose metric registration is stubbed out, and the
-     * supplied collector. Shared by every test in this class.
-     */
-    public static AlertBolt createAlertBolt(OutputCollector collector) {
-        Config config = ConfigFactory.load();
-        // NOTE(review): policyGroupEvaluator is created but never used in this helper.
-        PolicyGroupEvaluator policyGroupEvaluator = new PolicyGroupEvaluatorImpl("testPolicyGroupEvaluatorImpl");
-        TestStreamRouterBolt.MockChangeService mockChangeService = new TestStreamRouterBolt.MockChangeService();
-        AlertBolt bolt = new AlertBolt("alertBolt1", config, mockChangeService);
-        Map stormConf = new HashMap<>();
-        TopologyContext topologyContext = mock(TopologyContext.class);
-        // stub metric registration so prepare() does not need a live Storm runtime
-        when(topologyContext.registerMetric(any(String.class), any(MultiCountMetric.class), any(int.class))).thenReturn(new MultiCountMetric());
-        bolt.prepare(stormConf, topologyContext, collector);
-        return bolt;
-    }
-
-    /**
-     * Verifies that AlertBolt fails each incoming tuple (the collector's fail() is
-     * invoked exactly once) whenever stream metadata cannot be matched to the event:
-     * case 1 — no spec loaded at all; case 2 — an empty spec; case 3 — a spec whose
-     * stream id differs from the event's stream id.
-     */
-    @Test
-    public void testMetadataMismatch() throws Exception {
-        AtomicInteger failedCount = new AtomicInteger();
-        OutputCollector collector = new OutputCollector(new IOutputCollector() {
-            // NOTE(review): this field and the local 'event' below are unused.
-            int count = 0;
-
-            @Override
-            public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-                Assert.assertEquals("testAlertStream", tuple.get(0));
-                AlertStreamEvent event = (AlertStreamEvent) tuple.get(1);
-                System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", streamId, tuple));
-                return null;
-            }
-
-            @Override
-            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-            }
-
-            @Override
-            public void ack(Tuple input) {
-            }
-
-            @Override
-            public void fail(Tuple input) {
-                // each metadata mismatch is expected to fail the tuple exactly once
-                failedCount.incrementAndGet();
-            }
-
-            @Override
-            public void reportError(Throwable error) {
-            }
-        });
-        AlertBolt bolt = createAlertBolt(collector);
-
-        GeneralTopologyContext context = mock(GeneralTopologyContext.class);
-        int taskId = 1;
-        when(context.getComponentId(taskId)).thenReturn("comp1");
-        when(context.getComponentOutputFields("comp1", "default")).thenReturn(new Fields("f0"));
-        // case 1: bolt prepared but metadata not initialized (no bolt.onAlertBoltSpecChange)
-        PartitionedEvent pe = new PartitionedEvent();
-        pe.setPartitionKey(1);
-        pe.setPartition(createPartition());
-        StreamEvent streamEvent = new StreamEvent();
-        streamEvent.setStreamId("test-stream");
-        streamEvent.setTimestamp(System.currentTimeMillis());
-        pe.setEvent(streamEvent);
-
-        // the bolt deserializes raw bytes, so the event is serialized before emitting
-        PartitionedEventSerializerImpl peSer = new PartitionedEventSerializerImpl(bolt);
-        byte[] serializedEvent = peSer.serialize(pe);
-        Tuple input = new TupleImpl(context, Collections.singletonList(serializedEvent), taskId, "default");
-        bolt.execute(input);
-
-        Assert.assertEquals(1, failedCount.get());
-        failedCount.set(0);
-
-        {
-            // case 2: metadata loaded but empty (AlertBoltSepc)
-            bolt.onAlertBoltSpecChange(new AlertBoltSpec(), new HashMap());
-
-            bolt.execute(input);
-            Assert.assertEquals(1, failedCount.get());
-            failedCount.set(0);
-        }
-
-        // case 3: metadata loaded but mismatched
-        {
-            Map<String, StreamDefinition> sds = new HashMap();
-            StreamDefinition sdTest = new StreamDefinition();
-            String streamId = "pd-test"; // here streamId is different from the one "test-stream" (StreamEvent)
-            sdTest.setStreamId(streamId);
-            sds.put(sdTest.getStreamId(), sdTest);
-
-            AlertBoltSpec boltSpecs = new AlertBoltSpec();
-            boltSpecs.setVersion("specVersion-" + System.currentTimeMillis());
-
-            PolicyDefinition def = new PolicyDefinition();
-            def.setName("policy-definition");
-            def.setInputStreams(Arrays.asList(streamId));
-            def.setOutputStreams(Arrays.asList("output"));
-            PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-            definition.setType(PolicyStreamHandlers.NO_DATA_ALERT_ENGINE);
-            definition.setValue("PT0M,provided,1,host,host1");
-            def.setDefinition(definition);
-
-            boltSpecs.getBoltPoliciesMap().put(bolt.getBoltId(), Arrays.asList(def));
-
-            bolt.onAlertBoltSpecChange(boltSpecs, sds);
-
-            bolt.execute(input);
-            Assert.assertEquals(1, failedCount.get());
-            failedCount.set(0);
-        }
-    }
-
-    //TODO: no data alert failed, need to check when no data alert merged.
-    /**
-     * Verifies that a tuple whose event carries the same meta version as the loaded
-     * spec is NOT failed: after execute() (plus a wait for the bolt's async work),
-     * the collector's fail() must not have been invoked.
-     */
-    @Test
-    public void testMetaversionConflict() throws Exception {
-        AtomicInteger failedCount = new AtomicInteger();
-        OutputCollector collector = new OutputCollector(new IOutputCollector() {
-            // NOTE(review): this field and the local 'event' below are unused.
-            int count = 0;
-
-            @Override
-            public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-                Assert.assertEquals("testAlertStream", tuple.get(0));
-                AlertStreamEvent event = (AlertStreamEvent) tuple.get(1);
-                System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", streamId, tuple));
-                return null;
-            }
-
-            @Override
-            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-            }
-
-            @Override
-            public void ack(Tuple input) {
-            }
-
-            @Override
-            public void fail(Tuple input) {
-                failedCount.incrementAndGet();
-            }
-
-            @Override
-            public void reportError(Throwable error) {
-            }
-        });
-        AlertBolt bolt = createAlertBolt(collector);
-
-        Map<String, StreamDefinition> sds = new HashMap();
-        StreamDefinition sdTest = new StreamDefinition();
-        String streamId = "test-stream";
-        sdTest.setStreamId(streamId);
-        sds.put(sdTest.getStreamId(), sdTest);
-
-        AlertBoltSpec boltSpecs = new AlertBoltSpec();
-        boltSpecs.setVersion("spec_version_" + System.currentTimeMillis());
-        boltSpecs.setTopologyName("alertUnitTopology_1");
-
-        PolicyDefinition def = new PolicyDefinition();
-        def.setName("policy-definition");
-        def.setInputStreams(Arrays.asList(streamId));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType(PolicyStreamHandlers.NO_DATA_ALERT_ENGINE);
-        definition.setValue("PT0M,provided,1,host,host1");
-        def.setDefinition(definition);
-        def.setPartitionSpec(Arrays.asList(createPartition()));
-        def.setOutputStreams(Arrays.asList("out"));
-
-        boltSpecs.getBoltPoliciesMap().put(bolt.getBoltId(), Arrays.asList(def));
-        // NOTE(review): the bolt is re-created here, discarding the instance built
-        // above — possibly unintentional; confirm whether the first createAlertBolt
-        // call is needed at all.
-        bolt = createAlertBolt(collector);
-        bolt.onAlertBoltSpecChange(boltSpecs, sds);
-
-        // event meta version matches boltSpecs.getVersion(), so no fail is expected
-        Tuple input = createTuple(bolt, boltSpecs.getVersion());
-        bolt.execute(input);
-
-        // Sleep 10s to wait thread in bolt.execute() to finish works
-        Thread.sleep(10000);
-
-        Assert.assertEquals(0, failedCount.get());
-        failedCount.set(0);
-
-    }
-
-    /**
-     * Builds a Storm tuple carrying one serialized PartitionedEvent on stream
-     * TEST_STREAM, stamped with the given meta version and the current time, using a
-     * mocked topology context.
-     */
-    private Tuple createTuple(AlertBolt bolt, String version) throws IOException {
-        GeneralTopologyContext context = mock(GeneralTopologyContext.class);
-        int taskId = 1;
-        when(context.getComponentId(taskId)).thenReturn("comp1");
-        when(context.getComponentOutputFields("comp1", TEST_STREAM)).thenReturn(new Fields("f0"));
-        // build the event to serialize into the tuple
-        PartitionedEvent pe = new PartitionedEvent();
-        pe.setPartitionKey(1);
-        pe.setPartition(createPartition());
-        StreamEvent streamEvent = new StreamEvent();
-        streamEvent.setStreamId(TEST_STREAM);
-        streamEvent.setTimestamp(System.currentTimeMillis());
-        streamEvent.setMetaVersion(version);
-        pe.setEvent(streamEvent);
-
-        // the bolt consumes raw bytes, so serialize before wrapping in a tuple
-        PartitionedEventSerializerImpl peSer = new PartitionedEventSerializerImpl(bolt);
-        byte[] serializedEvent = peSer.serialize(pe);
-        return new TupleImpl(context, Collections.singletonList(serializedEvent), taskId, TEST_STREAM);
-    }
-
-    /** Builds a GROUPBY StreamPartition over TEST_STREAM (no partition columns set). */
-    private StreamPartition createPartition() {
-        StreamPartition sp = new StreamPartition();
-        sp.setStreamId(TEST_STREAM);
-        sp.setType(StreamPartition.Type.GROUPBY);
-        return sp;
-    }
-
-    /**
-     * Verifies that a policy using the CUSTOMIZED_ENGINE type with an external handler
-     * class is loaded and evaluated: executing one matching tuple must cause the
-     * collector's emit() to be invoked.
-     */
-    @Test
-    public void testExtendDefinition() throws IOException {
-        PolicyDefinition def = new PolicyDefinition();
-        def.setName("policy-definition");
-        def.setInputStreams(Arrays.asList(TEST_STREAM));
-
-        // customized engine delegates evaluation to the named handler class
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType(PolicyStreamHandlers.CUSTOMIZED_ENGINE);
-        definition.setHandlerClass("org.apache.eagle.alert.engine.router.CustomizedHandler");
-        definition.setValue("PT0M,plain,1,host,host1");
-        def.setDefinition(definition);
-        def.setPartitionSpec(Arrays.asList(createPartition()));
-
-        AlertBoltSpec boltSpecs = new AlertBoltSpec();
-
-        AtomicBoolean recieved = new AtomicBoolean(false);
-        OutputCollector collector = new OutputCollector(new IOutputCollector() {
-            @Override
-            public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-                // any emit means the customized handler produced an alert
-                recieved.set(true);
-                return Collections.emptyList();
-            }
-
-            @Override
-            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-            }
-
-            @Override
-            public void ack(Tuple input) {
-            }
-
-            @Override
-            public void fail(Tuple input) {
-            }
-
-            @Override
-            public void reportError(Throwable error) {
-            }
-        });
-        AlertBolt bolt = createAlertBolt(collector);
-
-        boltSpecs.getBoltPoliciesMap().put(bolt.getBoltId(), Arrays.asList(def));
-        boltSpecs.setVersion("spec_" + System.currentTimeMillis());
-        // stream def map
-        Map<String, StreamDefinition> sds = new HashMap();
-        StreamDefinition sdTest = new StreamDefinition();
-        sdTest.setStreamId(TEST_STREAM);
-        sds.put(sdTest.getStreamId(), sdTest);
-        
-        boltSpecs.addPublishPartition(TEST_STREAM, "policy-definition", "testAlertPublish", null);
-
-        bolt.onAlertBoltSpecChange(boltSpecs, sds);
-
-        // how to assert
-        Tuple t = createTuple(bolt, boltSpecs.getVersion());
-
-        bolt.execute(t);
-
-        Assert.assertTrue(recieved.get());
-    }
-    
-    /**
-     * Verifies that repeated onAlertBoltSpecChange calls — stream-only update, stream
-     * plus policy update, and a no-op update — leave the bolt in a working state:
-     * after all updates, executing a matching tuple still triggers emit().
-     */
-    @Test
-    public void testStreamDefinitionChange() throws IOException {
-        PolicyDefinition def = new PolicyDefinition();
-        def.setName("policy-definition");
-        def.setInputStreams(Arrays.asList(TEST_STREAM));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType(PolicyStreamHandlers.CUSTOMIZED_ENGINE);
-        definition.setHandlerClass("org.apache.eagle.alert.engine.router.CustomizedHandler");
-        definition.setValue("PT0M,plain,1,host,host1");
-        def.setDefinition(definition);
-        def.setPartitionSpec(Arrays.asList(createPartition()));
-
-        AlertBoltSpec boltSpecs = new AlertBoltSpec();
-
-        AtomicBoolean recieved = new AtomicBoolean(false);
-        OutputCollector collector = new OutputCollector(new IOutputCollector() {
-            @Override
-            public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-                recieved.set(true);
-                return Collections.emptyList();
-            }
-
-            @Override
-            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-            }
-
-            @Override
-            public void ack(Tuple input) {
-            }
-
-            @Override
-            public void fail(Tuple input) {
-            }
-
-            @Override
-            public void reportError(Throwable error) {
-            }
-        });
-        AlertBolt bolt = createAlertBolt(collector);
-
-        boltSpecs.getBoltPoliciesMap().put(bolt.getBoltId(), Arrays.asList(def));
-        boltSpecs.setVersion("spec_" + System.currentTimeMillis());
-        // stream def map
-        Map<String, StreamDefinition> sds = new HashMap();
-        StreamDefinition sdTest = new StreamDefinition();
-        sdTest.setStreamId(TEST_STREAM);
-        sds.put(sdTest.getStreamId(), sdTest);
-        
-        boltSpecs.addPublishPartition(TEST_STREAM, "policy-definition", "testAlertPublish", null);
-
-        bolt.onAlertBoltSpecChange(boltSpecs, sds);
-        
-        // update 1: same stream id, changed description only
-        LOG.info("Update stream");
-        sds = new HashMap();
-        sdTest = new StreamDefinition();
-        sdTest.setStreamId(TEST_STREAM);
-        sds.put(sdTest.getStreamId(), sdTest);
-        sdTest.setDescription("update the stream");
-        bolt.onAlertBoltSpecChange(boltSpecs, sds);
-        
-        // update 2: changed stream description AND a changed policy definition value
-        LOG.info("Update stream & update policy");
-        sds = new HashMap();
-        sdTest = new StreamDefinition();
-        sdTest.setStreamId(TEST_STREAM);
-        sds.put(sdTest.getStreamId(), sdTest);
-        sdTest.setDescription("update the stream & update policy");
-        
-        def = new PolicyDefinition();
-        def.setName("policy-definition");
-        def.setInputStreams(Arrays.asList(TEST_STREAM));
-
-        definition = new PolicyDefinition.Definition();
-        definition.setType(PolicyStreamHandlers.CUSTOMIZED_ENGINE);
-        definition.setHandlerClass("org.apache.eagle.alert.engine.router.CustomizedHandler");
-        definition.setValue("PT0M,plain,1,host,host2");
-        def.setDefinition(definition);
-        def.setPartitionSpec(Arrays.asList(createPartition()));
-        boltSpecs.getBoltPoliciesMap().put(bolt.getBoltId(), Arrays.asList(def));
-        
-        bolt.onAlertBoltSpecChange(boltSpecs, sds);
-        
-        // update 3: identical content re-pushed (should be a no-op for the bolt)
-        LOG.info("No any change");
-        sds = new HashMap();
-        sdTest = new StreamDefinition();
-        sdTest.setStreamId(TEST_STREAM);
-        sds.put(sdTest.getStreamId(), sdTest);
-        sdTest.setDescription("update the stream");
-        bolt.onAlertBoltSpecChange(boltSpecs, sds);
-        
-        // how to assert
-        Tuple t = createTuple(bolt, boltSpecs.getVersion());
-
-        bolt.execute(t);
-
-        Assert.assertTrue(recieved.get());
-    }
-
-    /**
-     * (Currently {@code @Ignore}d.) Exercises a Siddhi policy with one input stream and
-     * two output streams (deviceDownAlertStream and symptomaticAlertOutputStream): two
-     * correlated events for the same switch are expected to yield alerts on both outputs.
-     * NOTE(review): the assertion acquires 1 mutex permit but then expects 3 alerts —
-     * confirm the intended counts when re-enabling this test.
-     */
-    @Test @Ignore
-    public void testMultiStreamDefinition() throws Exception {
-        final AtomicInteger alertCount = new AtomicInteger();
-        final Semaphore mutex = new Semaphore(0);
-        OutputCollector collector = new OutputCollector(new IOutputCollector() {
-            // NOTE(review): this field and the local 'event' below are unused.
-            int count = 0;
-
-            @Override
-            public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-                System.out.println("=====output collector==========");
-                alertCount.incrementAndGet();
-                mutex.release();
-                // alerts must land on one of the two declared output streams
-                Assert.assertTrue("symptomaticAlertOutputStream".equals((String) tuple.get(0))
-                    || "deviceDownAlertStream".equals((String) tuple.get(0)));
-                AlertStreamEvent event = (AlertStreamEvent) tuple.get(1);
-                System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", streamId, tuple));
-
-                System.out.println("**********output collector end***********");
-                return null;
-            }
-
-            @Override
-            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-            }
-
-            @Override
-            public void ack(Tuple input) {
-            }
-
-            @Override
-            public void fail(Tuple input) {
-            }
-
-            @Override
-            public void reportError(Throwable error) {
-            }
-        });
-
-
-        AlertBolt bolt = createAlertBolt(collector);
-
-        // construct StreamPartition
-        StreamPartition sp = new StreamPartition();
-        sp.setColumns(Collections.singletonList("col1"));
-        sp.setStreamId("correlatedStream");
-        sp.setType(StreamPartition.Type.GROUPBY);
-
-        pushAlertBoltSpec(sp, bolt);
-
-        // now emit
-        // construct GeneralTopologyContext
-        GeneralTopologyContext context = mock(GeneralTopologyContext.class);
-        int taskId = 1;
-        when(context.getComponentId(taskId)).thenReturn("comp1");
-        when(context.getComponentOutputFields("comp1", "default")).thenReturn(new Fields("f0"));
-
-        long base = System.currentTimeMillis();
-        int i = 0;
-        String linkedSwitch = "lvs-ra-01";
-
-        // construct event with "value1"
-        StreamEvent event1 = new StreamEvent();
-        event1.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00") * 1000);
-        event1.setMetaVersion("version1");
-        Object[] data = new Object[] { base , "child-"+ (i++), "", linkedSwitch};
-        event1.setData(data);
-        event1.setStreamId("correlatedStream");
-        PartitionedEvent partitionedEvent1 = new PartitionedEvent(event1, sp, 1001);
-
-        // construct another event with "value1"
-        StreamEvent event2 = new StreamEvent();
-        event2.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:05:00") * 1000);
-        event2.setMetaVersion("version1");
-        data = new Object[] { base , "child-"+ (i++), "", linkedSwitch};
-        event2.setData(data);
-        event2.setStreamId("correlatedStream");
-        PartitionedEvent partitionedEvent2 = new PartitionedEvent(event2, sp, 1001);
-
-        Tuple input = new TupleImpl(context, Collections.singletonList(partitionedEvent1), taskId, "default");
-        Tuple input2 = new TupleImpl(context, Collections.singletonList(partitionedEvent2), taskId, "default");
-        bolt.execute(input);
-        bolt.execute(input2);
-        Assert.assertTrue("Timeout to acquire mutex in 10s", mutex.tryAcquire(1, 10, TimeUnit.SECONDS));
-        Assert.assertEquals(3, alertCount.get());
-        bolt.cleanup();
-    }
-
-    /**
-     * Loads the bolt with a Siddhi policy that correlates events on correlatedStream and
-     * writes into two output streams (deviceDownAlertStream, symptomaticAlertOutputStream),
-     * together with the three stream definitions the queries reference.
-     */
-    private void pushAlertBoltSpec(StreamPartition sp, AlertBolt bolt) {
-        Map<String, StreamDefinition> sds = new HashMap<>();
-        sds.put("correlatedStream", createCorrelateStream("correlatedStream"));
-        sds.put("symptomaticAlertOutputStream", createCorrelateStream("symptomaticAlertOutputStream")); // output of updated correlatedStream
-        sds.put("deviceDownAlertStream", createCorrelateStream("deviceDownAlertStream"));
-
-        PolicyDefinition pd = new PolicyDefinition();
-        pd.setName("network_symptomatic");
-        pd.setInputStreams(Arrays.asList("correlatedStream"));
-        pd.setOutputStreams(Arrays.asList("deviceDownAlertStream", "symptomaticAlertOutputStream"));
-
-        pd.setPartitionSpec(Arrays.asList(sp));
-
-        // two chained Siddhi queries: the first detects device-down per switch, the
-        // second joins events against those detections to mark symptomatic alerts
-        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
-        def.setType(PolicyStreamHandlers.SIDDHI_ENGINE);
-        def.setValue("from correlatedStream#window.externalTime(timestamp, 3 min) select UUID() as docId, linkedSwitch, '' as parentKey, timestamp group by linkedSwitch having count() > 0 insert into deviceDownAlertStream; " +
-            " from correlatedStream#window.externalTime(timestamp, 3 min) as left join deviceDownAlertStream#window.time(3 min) as right on left.linkedSwitch == right.linkedSwitch" +
-            " select left.docId, left.timestamp, left.linkedSwitch, right.docId as parentKey insert into symptomaticAlertOutputStream;");
-        pd.setDefinition(def);
-
-
-        AlertBoltSpec spec = new AlertBoltSpec();
-        spec.setVersion("version1");
-        spec.setTopologyName("testTopology");
-        spec.addBoltPolicy("alertBolt1", pd.getName());
-        spec.getBoltPoliciesMap().put("alertBolt1", new ArrayList<>(Arrays.asList(pd)));
-
-        bolt.onAlertBoltSpecChange(spec, sds);
-    }
-
-    /**
-     * Builds a StreamDefinition with the four columns shared by the correlation streams:
-     * timestamp (LONG), docId, parentKey and linkedSwitch (STRING).
-     */
-    private StreamDefinition createCorrelateStream(String streamId) {
-        // construct StreamDefinition
-        StreamDefinition schema = new StreamDefinition();
-        schema.setStreamId(streamId);
-        List<StreamColumn> columns = new LinkedList<>();
-        {
-            StreamColumn column = new StreamColumn();
-            column.setName("timestamp");
-            column.setType(StreamColumn.Type.LONG);
-            columns.add(column);
-        }
-        {
-            StreamColumn column = new StreamColumn();
-            column.setName("docId");
-            column.setType(StreamColumn.Type.STRING);
-            columns.add(column);
-        }
-        {
-            StreamColumn column = new StreamColumn();
-            column.setName("parentKey");
-            column.setType(StreamColumn.Type.STRING);
-            columns.add(column);
-        }
-        {
-            StreamColumn column = new StreamColumn();
-            column.setName("linkedSwitch");
-            column.setType(StreamColumn.Type.STRING);
-            columns.add(column);
-        }
-
-        schema.setColumns(columns);
-        return schema;
-    }
-
-}
-


Mime
View raw message