eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [14/84] [partial] eagle git commit: Clean repo for eagle site
Date Mon, 03 Apr 2017 11:54:22 GMT
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
deleted file mode 100644
index 46517fe..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.router;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.PublishSpec;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.PublishPartition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
-import org.apache.eagle.alert.engine.publisher.AlertPublisher;
-import org.apache.eagle.alert.engine.publisher.impl.AlertPublishPluginsFactory;
-import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
-import org.apache.eagle.alert.engine.runner.AlertPublisherBolt;
-import org.apache.eagle.alert.engine.runner.MapComparator;
-import org.apache.eagle.alert.engine.utils.MetadataSerDeser;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.CollectionType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-/**
- * @since 5/14/16.
- */
-public class TestAlertPublisherBolt {
-
-    @SuppressWarnings("rawtypes")
-    @Ignore
-    @Test
-    public void test() {
-        Config config = ConfigFactory.load("application-test.conf");
-        AlertPublisher publisher = new AlertPublisherImpl("alertPublishBolt");
-        publisher.init(config, new HashMap());
-        PublishSpec spec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec.json"), PublishSpec.class);
-        publisher.onPublishChange(spec.getPublishments(), null, null, null);
-        AlertStreamEvent event = create("testAlertStream");
-        publisher.nextEvent(new PublishPartition(event.getStreamId(), event.getPolicyId(),
-            spec.getPublishments().get(0).getName(), spec.getPublishments().get(0).getPartitionColumns()), event);
-        AlertStreamEvent event1 = create("testAlertStream");
-        publisher.nextEvent(new PublishPartition(event1.getStreamId(), event1.getPolicyId(),
-            spec.getPublishments().get(0).getName(), spec.getPublishments().get(0).getPartitionColumns()), event1);
-    }
-
-    private AlertStreamEvent create(String streamId) {
-        AlertStreamEvent alert = new AlertStreamEvent();
-        PolicyDefinition policy = new PolicyDefinition();
-        policy.setName("policy1");
-        alert.setPolicyId(policy.getName());
-        alert.setCreatedTime(System.currentTimeMillis());
-        alert.setData(new Object[] {"field_1", 2, "field_3"});
-        alert.setStreamId(streamId);
-        alert.setCreatedBy(this.toString());
-        return alert;
-    }
-
-
-    @Test
-    public void testMapComparatorAdded() {
-
-        PublishSpec spec1 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishForAdd1.json"), PublishSpec.class);
-        PublishSpec spec2 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishForAdd0.json"), PublishSpec.class);
-
-        Map<String, Publishment> map1 = new HashMap<>();
-        Map<String, Publishment> map2 = new HashMap<>();
-        spec1.getPublishments().forEach(p -> map1.put(p.getName(), p));
-        spec2.getPublishments().forEach(p -> map2.put(p.getName(), p));
-
-        MapComparator<String, Publishment> comparator = new MapComparator<>(map1, map2);
-        comparator.compare();
-        Assert.assertTrue(comparator.getAdded().size() == 1);
-
-    }
-
-    @Test
-    public void testMapComparatorRemoved() {
-
-        PublishSpec spec1 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishForAdd0.json"), PublishSpec.class);
-        PublishSpec spec2 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishForAdd1.json"), PublishSpec.class);
-
-        Map<String, Publishment> map1 = new HashMap<>();
-        Map<String, Publishment> map2 = new HashMap<>();
-        spec1.getPublishments().forEach(p -> map1.put(p.getName(), p));
-        spec2.getPublishments().forEach(p -> map2.put(p.getName(), p));
-
-        MapComparator<String, Publishment> comparator = new MapComparator<>(map1, map2);
-        comparator.compare();
-        Assert.assertTrue(comparator.getRemoved().size() == 1);
-
-    }
-
-    @Test
-    public void testMapComparatorModified() {
-
-        PublishSpec spec1 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishForAdd0.json"), PublishSpec.class);
-        PublishSpec spec2 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishForMdyValue.json"), PublishSpec.class);
-
-        Map<String, Publishment> map1 = new HashMap<>();
-        Map<String, Publishment> map2 = new HashMap<>();
-        spec1.getPublishments().forEach(p -> map1.put(p.getName(), p));
-        spec2.getPublishments().forEach(p -> map2.put(p.getName(), p));
-
-        MapComparator<String, Publishment> comparator = new MapComparator<>(map1, map2);
-        comparator.compare();
-        Assert.assertTrue(comparator.getModified().size() == 1);
-
-    }
-
-
-    @Test
-    public void testMapComparator() {
-        PublishSpec spec1 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec.json"), PublishSpec.class);
-        PublishSpec spec2 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec2.json"), PublishSpec.class);
-        PublishSpec spec3 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec3.json"), PublishSpec.class);
-        Map<String, Publishment> map1 = new HashMap<>();
-        Map<String, Publishment> map2 = new HashMap<>();
-        spec1.getPublishments().forEach(p -> map1.put(p.getName(), p));
-        spec2.getPublishments().forEach(p -> map2.put(p.getName(), p));
-
-        MapComparator<String, Publishment> comparator = new MapComparator<>(map1, map2);
-        comparator.compare();
-        Assert.assertTrue(comparator.getModified().size() == 1);
-
-        AlertPublisherBolt publisherBolt = new AlertPublisherBolt("alert-publisher-test", null, null);
-        publisherBolt.onAlertPublishSpecChange(spec1, null);
-        publisherBolt.onAlertPublishSpecChange(spec2, null);
-        publisherBolt.onAlertPublishSpecChange(spec3, null);
-    }
-
-    @SuppressWarnings("rawtypes")
-    @Test
-    public void testAlertPublisher() throws Exception {
-        AlertPublisher alertPublisher = new AlertPublisherImpl("alert-publisher-test");
-        Config config = ConfigFactory.load("application-test.conf");
-        alertPublisher.init(config, new HashMap());
-        List<Publishment> oldPubs = loadEntities("/publishments1.json", Publishment.class);
-        List<Publishment> newPubs = loadEntities("/publishments2.json", Publishment.class);
-        alertPublisher.onPublishChange(oldPubs, null, null, null);
-        alertPublisher.onPublishChange(null, null, newPubs, oldPubs);
-    }
-
-    private <T> List<T> loadEntities(String path, Class<T> tClz) throws Exception {
-        ObjectMapper objectMapper = new ObjectMapper();
-        JavaType type = CollectionType.construct(List.class, SimpleType.construct(tClz));
-        List<T> l = objectMapper.readValue(TestAlertPublisherBolt.class.getResourceAsStream(path), type);
-        return l;
-    }
-
-    private AlertStreamEvent createWithStreamDef(String hostname, String appName, String state) {
-        AlertStreamEvent alert = new AlertStreamEvent();
-        PolicyDefinition policy = new PolicyDefinition();
-        policy.setName("perfmon_cpu_host_check");
-        alert.setPolicyId(policy.getName());
-        alert.setCreatedTime(System.currentTimeMillis());
-        alert.setData(new Object[] {appName, hostname, state});
-        alert.setStreamId("testAlertStream");
-        alert.setCreatedBy(this.toString());
-
-        // build stream definition
-        StreamDefinition sd = new StreamDefinition();
-        StreamColumn appColumn = new StreamColumn();
-        appColumn.setName("appname");
-        appColumn.setType(StreamColumn.Type.STRING);
-
-        StreamColumn hostColumn = new StreamColumn();
-        hostColumn.setName("hostname");
-        hostColumn.setType(StreamColumn.Type.STRING);
-
-        StreamColumn stateColumn = new StreamColumn();
-        stateColumn.setName("state");
-        stateColumn.setType(StreamColumn.Type.STRING);
-
-        sd.setColumns(Arrays.asList(appColumn, hostColumn, stateColumn));
-
-        alert.setSchema(sd);
-        return alert;
-    }
-
-    @Test
-    public void testCustomFieldDedupEvent() throws Exception {
-        List<Publishment> pubs = loadEntities("/router/publishments.json", Publishment.class);
-
-        AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(pubs.get(0), null, null);
-        AlertStreamEvent event1 = createWithStreamDef("host1", "testapp1", "OPEN");
-        AlertStreamEvent event2 = createWithStreamDef("host2", "testapp1", "OPEN");
-        AlertStreamEvent event3 = createWithStreamDef("host2", "testapp2", "CLOSE");
-
-        Assert.assertNotNull(plugin.dedup(event1));
-        Assert.assertNull(plugin.dedup(event2));
-        Assert.assertNotNull(plugin.dedup(event3));
-    }
-
-    @Test
-    public void testEmptyCustomFieldDedupEvent() throws Exception {
-        List<Publishment> pubs = loadEntities("/router/publishments-empty-dedup-field.json", Publishment.class);
-
-        AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(pubs.get(0), null, null);
-        AlertStreamEvent event1 = createWithStreamDef("host1", "testapp1", "OPEN");
-        AlertStreamEvent event2 = createWithStreamDef("host1", "testapp1", "OPEN");
-
-        Assert.assertNotNull(plugin.dedup(event1));
-        Assert.assertNull(plugin.dedup(event2));
-    }
-
-    private AlertStreamEvent createSeverityWithStreamDef(String hostname, String appName, String message, String severity, String docId, String df_device, String df_type, String colo) {
-        AlertStreamEvent alert = new AlertStreamEvent();
-        PolicyDefinition policy = new PolicyDefinition();
-        policy.setName("switch_check");
-        alert.setPolicyId(policy.getName());
-        alert.setCreatedTime(System.currentTimeMillis());
-        alert.setData(new Object[] {appName, hostname, message, severity, docId, df_device, df_type, colo});
-        alert.setStreamId("testAlertStream");
-        alert.setCreatedBy(this.toString());
-
-        // build stream definition
-        StreamDefinition sd = new StreamDefinition();
-        StreamColumn appColumn = new StreamColumn();
-        appColumn.setName("appname");
-        appColumn.setType(StreamColumn.Type.STRING);
-
-        StreamColumn hostColumn = new StreamColumn();
-        hostColumn.setName("hostname");
-        hostColumn.setType(StreamColumn.Type.STRING);
-
-        StreamColumn msgColumn = new StreamColumn();
-        msgColumn.setName("message");
-        msgColumn.setType(StreamColumn.Type.STRING);
-
-        StreamColumn severityColumn = new StreamColumn();
-        severityColumn.setName("severity");
-        severityColumn.setType(StreamColumn.Type.STRING);
-
-        StreamColumn docIdColumn = new StreamColumn();
-        docIdColumn.setName("docId");
-        docIdColumn.setType(StreamColumn.Type.STRING);
-
-        StreamColumn deviceColumn = new StreamColumn();
-        deviceColumn.setName("df_device");
-        deviceColumn.setType(StreamColumn.Type.STRING);
-
-        StreamColumn deviceTypeColumn = new StreamColumn();
-        deviceTypeColumn.setName("df_type");
-        deviceTypeColumn.setType(StreamColumn.Type.STRING);
-
-        StreamColumn coloColumn = new StreamColumn();
-        coloColumn.setName("dc");
-        coloColumn.setType(StreamColumn.Type.STRING);
-
-        sd.setColumns(Arrays.asList(appColumn, hostColumn, msgColumn, severityColumn, docIdColumn, deviceColumn, deviceTypeColumn, coloColumn));
-
-        alert.setSchema(sd);
-        return alert;
-    }
-
-    @Test
-    public void testSlackPublishment() throws Exception {
-        Config config = ConfigFactory.load("application-test.conf");
-        AlertPublisher publisher = new AlertPublisherImpl("alertPublishBolt");
-        publisher.init(config, new HashMap());
-        List<Publishment> pubs = loadEntities("/router/publishments-slack.json", Publishment.class);
-        publisher.onPublishChange(pubs, null, null, null);
-
-        AlertStreamEvent event1 = createSeverityWithStreamDef("switch1", "testapp1", "Memory 1 inconsistency detected", "WARNING", "docId1", "ed01", "distribution switch", "us");
-        AlertStreamEvent event2 = createSeverityWithStreamDef("switch2", "testapp2", "Memory 2 inconsistency detected", "CRITICAL", "docId2", "ed02", "distribution switch", "us");
-        AlertStreamEvent event3 = createSeverityWithStreamDef("switch2", "testapp2", "Memory 3 inconsistency detected", "WARNING", "docId3", "ed02", "distribution switch", "us");
-
-        publisher.nextEvent(new PublishPartition(event1.getStreamId(), event1.getPolicyId(),
-            pubs.get(0).getName(), pubs.get(0).getPartitionColumns()), event1);
-        publisher.nextEvent(new PublishPartition(event2.getStreamId(), event2.getPolicyId(),
-            pubs.get(0).getName(), pubs.get(0).getPartitionColumns()), event2);
-        publisher.nextEvent(new PublishPartition(event3.getStreamId(), event3.getPolicyId(),
-            pubs.get(0).getName(), pubs.get(0).getPartitionColumns()), event3);
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java
deleted file mode 100644
index 704857d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.router;
-
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
-import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
-import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.engine.StreamContext;
-import org.apache.eagle.alert.engine.StreamContextImpl;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.router.impl.StormOutputCollector;
-import org.apache.eagle.alert.engine.router.impl.StreamRouterBoltOutputCollector;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.text.ParseException;
-import java.util.*;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.when;
-
-public class TestStreamRouterBoltOutputCollector {
-
-    @Test
-    public void testStreamRouterCollector() throws ParseException {
-        String streamId = "HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX";
-        StreamPartition partition = new StreamPartition();
-        partition.setStreamId(streamId);
-        partition.setType(StreamPartition.Type.GROUPBY);
-        partition.setColumns(new ArrayList<String>() {{
-            add("col1");
-        }});
-
-        // begin to create two router specs
-        WorkSlot worker1 = new WorkSlot("ALERT_UNIT_TOPOLOGY_APP_SANDBOX", "alertBolt1");
-        WorkSlot worker2 = new WorkSlot("ALERT_UNIT_TOPOLOGY_APP_SANDBOX", "alertBolt2");
-
-        PolicyWorkerQueue queue1 = new PolicyWorkerQueue();
-        queue1.setPartition(partition);
-        queue1.setWorkers(new ArrayList<WorkSlot>() {
-            {
-                add(worker1);
-            }
-        });
-
-        PolicyWorkerQueue queue2 = new PolicyWorkerQueue();
-        queue2.setPartition(partition);
-        queue2.setWorkers(new ArrayList<WorkSlot>() {
-            {
-                add(worker1);
-                add(worker2);
-            }
-        });
-
-        StreamRouterSpec spec1 = new StreamRouterSpec();
-        spec1.setStreamId(streamId);
-        spec1.setPartition(partition);
-
-        spec1.setTargetQueue(new ArrayList<PolicyWorkerQueue>() {{
-            add(queue1);
-        }});
-
-        StreamRouterSpec spec2 = new StreamRouterSpec();
-        spec2.setStreamId(streamId);
-        spec2.setPartition(partition);
-
-        spec2.setTargetQueue(new ArrayList<PolicyWorkerQueue>() {{
-            add(queue2);
-        }});
-
-        // end of router spec creation
-
-        List<String> targetStreamIds = new ArrayList<>();
-        IOutputCollector delegate = new IOutputCollector() {
-
-            @Override
-            public void reportError(Throwable error) {
-
-            }
-
-            @Override
-            public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-                targetStreamIds.add(streamId);
-                return null;
-            }
-
-            @Override
-            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-            }
-
-            @Override
-            public void ack(Tuple input) {
-            }
-
-            @Override
-            public void fail(Tuple input) {
-            }
-
-        };
-
-        List<StreamRouterSpec> list1 = new ArrayList<>();
-        list1.add(spec1);
-
-        List<StreamRouterSpec> list2 = new ArrayList<>();
-        list2.add(spec2);
-
-        // construct StreamDefinition
-        StreamDefinition schema = new StreamDefinition();
-        schema.setStreamId(streamId);
-        StreamColumn column = new StreamColumn();
-        column.setName("col1");
-        column.setType(StreamColumn.Type.STRING);
-        schema.setColumns(Collections.singletonList(column));
-        Map<String, StreamDefinition> sds = new HashMap<>();
-        sds.put(schema.getStreamId(), schema);
-
-        // create two events
-        StreamEvent event1 = new StreamEvent();
-        Object[] data = new Object[]{"value1"};
-        event1.setData(data);
-        event1.setStreamId(streamId);
-        PartitionedEvent pEvent1 = new PartitionedEvent();
-        pEvent1.setEvent(event1);
-        pEvent1.setPartition(partition);
-
-        StreamEvent event2 = new StreamEvent();
-        Object[] data2 = new Object[]{"value3"};
-        event2.setData(data2);
-        event2.setStreamId(streamId);
-        PartitionedEvent pEvent2 = new PartitionedEvent();
-        pEvent2.setEvent(event2);
-        pEvent2.setPartition(partition);
-
-        TopologyContext context = Mockito.mock(TopologyContext.class);
-        when(context.registerMetric(any(String.class), any(MultiCountMetric.class), any(int.class))).thenReturn(new MultiCountMetric());
-        StreamContext streamContext = new StreamContextImpl(null, context.registerMetric("eagle.router", new MultiCountMetric(), 60), context);
-        StreamRouterBoltOutputCollector collector = new StreamRouterBoltOutputCollector("test", new StormOutputCollector(new OutputCollector(delegate), null), null, streamContext);
-
-        // add a StreamRouterSpec which has one worker
-        collector.onStreamRouterSpecChange(list1, new ArrayList<>(), new ArrayList<>(), sds);
-        collector.emit(pEvent1);
-        Assert.assertTrue(targetStreamIds.size() == 1);
-
-        // modify the StreamRouterSpec to contain two workers
-        collector.onStreamRouterSpecChange(new ArrayList<>(), new ArrayList<>(), list2, sds);
-        collector.emit(pEvent2);
-        Assert.assertTrue(targetStreamIds.size() == 2);
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
deleted file mode 100644
index a480fcf..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.runner;
-
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.GeneralTopologyContext;
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.TupleImpl;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
-import org.apache.eagle.alert.coordination.model.RouterSpec;
-import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.apache.eagle.alert.engine.coordinator.impl.AbstractMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.utils.StreamIdConversion;
-import org.apache.eagle.common.DateTimeUtil;
-
-import org.joda.time.Period;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestStreamRouterBolt {
-    private final static Logger LOG = LoggerFactory.getLogger(TestStreamRouterBolt.class);
-
-    /**
-     * Mocked 5 Events
-     * <p>
-     * 1. Sent in random order:
-     * "value1","value2","value3","value4","value5"
-     * <p>
-     * 2. Received in correct time order, and value5 is thrown away because it is too late: "value2","value1","value3","value4"
-     *
-     * @throws Exception
-     */
-    @SuppressWarnings("rawtypes")
-    @Test
-    public void testRouterWithSortAndRouteSpec() throws Exception {
-        Config config = ConfigFactory.load();
-        MockChangeService mockChangeService = new MockChangeService();
-        StreamRouterBolt routerBolt = new StreamRouterBolt("routerBolt1", config, mockChangeService);
-
-        final Map<String, List<PartitionedEvent>> streamCollected = new HashMap<>();
-        final List<PartitionedEvent> orderCollected = new ArrayList<>();
-
-        OutputCollector collector = new OutputCollector(new IOutputCollector() {
-            int count = 0;
-
-            @Override
-            public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-                PartitionedEvent event;
-                try {
-                    event = routerBolt.deserialize(tuple.get(0));
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-                if (count == 0) {
-                    count++;
-                }
-                LOG.info(String.format("Collector received: [streamId=[%s], tuple=[%s] ", streamId, tuple));
-                if (!streamCollected.containsKey(streamId)) {
-                    streamCollected.put(streamId, new ArrayList<>());
-                }
-                streamCollected.get(streamId).add(event);
-                orderCollected.add(event);
-                return null;
-            }
-
-            @Override
-            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-            }
-
-            @Override
-            public void ack(Tuple input) {
-            }
-
-            @Override
-            public void fail(Tuple input) {
-            }
-
-            @SuppressWarnings("unused")
-            public void resetTimeout(Tuple input) {
-            }
-
-            @Override
-            public void reportError(Throwable error) {
-            }
-        });
-
-        Map stormConf = new HashMap<>();
-        TopologyContext topologyContext = mock(TopologyContext.class);
-        when(topologyContext.registerMetric(any(String.class), any(MultiCountMetric.class), any(int.class))).thenReturn(new MultiCountMetric());
-        routerBolt.prepare(stormConf, topologyContext, collector);
-
-        String streamId = "cpuUsageStream";
-        // StreamPartition, groupby col1 for stream cpuUsageStream
-        StreamPartition sp = new StreamPartition();
-        sp.setStreamId(streamId);
-        sp.setColumns(Collections.singletonList("col1"));
-        sp.setType(StreamPartition.Type.GROUPBY);
-
-        StreamSortSpec sortSpec = new StreamSortSpec();
-//        sortSpec.setColumn("timestamp");
-//        sortSpec.setOrder("asc");
-        sortSpec.setWindowPeriod2(Period.minutes(1));
-        sortSpec.setWindowMargin(1000);
-        sp.setSortSpec(sortSpec);
-
-        RouterSpec boltSpec = new RouterSpec();
-
-        // set StreamRouterSpec to have 2 WorkSlots
-        StreamRouterSpec routerSpec = new StreamRouterSpec();
-        routerSpec.setPartition(sp);
-        routerSpec.setStreamId(streamId);
-        PolicyWorkerQueue queue = new PolicyWorkerQueue();
-        queue.setPartition(sp);
-        queue.setWorkers(Arrays.asList(new WorkSlot("testTopology", "alertBolt1"), new WorkSlot("testTopology", "alertBolt2")));
-        routerSpec.setTargetQueue(Collections.singletonList(queue));
-        boltSpec.addRouterSpec(routerSpec);
-        boltSpec.setVersion("version1");
-
-        // construct StreamDefinition
-        StreamDefinition schema = new StreamDefinition();
-        schema.setStreamId(streamId);
-        StreamColumn column = new StreamColumn();
-        column.setName("col1");
-        column.setType(StreamColumn.Type.STRING);
-        schema.setColumns(Collections.singletonList(column));
-        Map<String, StreamDefinition> sds = new HashMap<>();
-        sds.put(schema.getStreamId(), schema);
-
-        routerBolt.declareOutputStreams(Arrays.asList("alertBolt1", "alertBolt2"));
-        routerBolt.onStreamRouteBoltSpecChange(boltSpec, sds);
-        GeneralTopologyContext context = mock(GeneralTopologyContext.class);
-        int taskId = 1;
-        when(context.getComponentId(taskId)).thenReturn("comp1");
-        when(context.getComponentOutputFields("comp1", "default")).thenReturn(new Fields("f0"));
-
-        // =======================================
-        // Mock 5 Events
-        //
-        // 1. Sent in random order:
-        // "value1","value2","value3","value4","value5"
-        //
-        // 2. Received in correct time order, and value5 is thrown away because it is too late:
-        // "value2","value1","value3","value4"
-        // =======================================
-
-        // construct event with "value1"
-        StreamEvent event = new StreamEvent();
-        event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:30") * 1000);
-        Object[] data = new Object[] {"value1"};
-        event.setData(data);
-        event.setStreamId(streamId);
-        PartitionedEvent pEvent = new PartitionedEvent();
-        pEvent.setEvent(event);
-        pEvent.setPartition(sp);
-        Tuple input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
-        routerBolt.execute(input);
-
-        // construct another event with "value2"
-        event = new StreamEvent();
-        event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:10") * 1000);
-        data = new Object[] {"value2"};
-        event.setData(data);
-        event.setStreamId(streamId);
-        pEvent = new PartitionedEvent();
-        pEvent.setPartition(sp);
-        pEvent.setEvent(event);
-        input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
-        routerBolt.execute(input);
-
-        // construct another event with "value3"
-        event = new StreamEvent();
-        event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:40") * 1000);
-        data = new Object[] {"value3"};
-        event.setData(data);
-        event.setStreamId(streamId);
-        pEvent = new PartitionedEvent();
-        pEvent.setPartition(sp);
-        pEvent.setEvent(event);
-        input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
-        routerBolt.execute(input);
-
-        // construct another event with "value4"
-        event = new StreamEvent();
-        event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:02:10") * 1000);
-        data = new Object[] {"value4"};
-        event.setData(data);
-        event.setStreamId(streamId);
-        pEvent = new PartitionedEvent();
-        pEvent.setPartition(sp);
-        pEvent.setEvent(event);
-        input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
-        routerBolt.execute(input);
-
-        // construct another event with "value5", which will be thrown because two late
-        event = new StreamEvent();
-        event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:10") * 1000);
-        data = new Object[] {"value5"};
-        event.setData(data);
-        event.setStreamId(streamId);
-        pEvent = new PartitionedEvent();
-        pEvent.setPartition(sp);
-        pEvent.setEvent(event);
-        input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
-        routerBolt.execute(input);
-
-        Assert.assertEquals("Should finally collect two streams", 2, streamCollected.size());
-        Assert.assertTrue("Should collect stream stream_routerBolt_to_alertBolt1", streamCollected.keySet().contains(
-            String.format(StreamIdConversion.generateStreamIdBetween(routerBolt.getBoltId(), "alertBolt1"))));
-        Assert.assertTrue("Should collect stream stream_routerBolt_to_alertBolt2", streamCollected.keySet().contains(
-            String.format(StreamIdConversion.generateStreamIdBetween(routerBolt.getBoltId(), "alertBolt2"))));
-
-        Assert.assertEquals("Should finally collect 3 events", 3, orderCollected.size());
-        Assert.assertArrayEquals("Should sort 3 events in ASC order", new String[] {"value2", "value1", "value3"}, orderCollected.stream().map((d) -> d.getData()[0]).toArray());
-
-        // The first 3 events are ticked automatically by window
-
-        routerBolt.cleanup();
-
-        // Close will flush all events in memory, so will receive the last event which is still in memory as window is not expired according to clock
-        // The 5th event will be thrown because too late and out of margin
-
-        Assert.assertEquals("Should finally collect two streams", 2, streamCollected.size());
-        Assert.assertEquals("Should finally collect 3 events", 4, orderCollected.size());
-        Assert.assertArrayEquals("Should sort 4 events in ASC-ordered timestamp", new String[] {"value2", "value1", "value3", "value4"}, orderCollected.stream().map((d) -> d.getData()[0]).toArray());
-
-    }
-
-    @SuppressWarnings("serial")
-    public static class MockChangeService extends AbstractMetadataChangeNotifyService {
-        private final static Logger LOG = LoggerFactory.getLogger(MockChangeService.class);
-
-        @Override
-        public void close() throws IOException {
-            LOG.info("Closing");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java
deleted file mode 100644
index a3939cc..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-package org.apache.eagle.alert.engine.serialization;
-
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.utils.ByteUtils;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public class JavaSerializationTest {
-    private final static Logger LOG = LoggerFactory.getLogger(JavaSerializationTest.class);
-
-    @Test
-    public void testJavaSerialization() {
-        PartitionedEvent partitionedEvent = new PartitionedEvent();
-        partitionedEvent.setPartitionKey(partitionedEvent.hashCode());
-        partitionedEvent.setPartition(createSampleStreamGroupbyPartition("sampleStream", Arrays.asList("name", "host")));
-        StreamEvent event = new StreamEvent();
-        event.setStreamId("sampleStream");
-        event.setTimestamp(System.currentTimeMillis());
-        event.setData(new Object[] {"CPU", "LOCALHOST", true, Long.MAX_VALUE, 60.0});
-        partitionedEvent.setEvent(event);
-
-        int javaSerializationLength = SerializationUtils.serialize(partitionedEvent).length;
-        LOG.info("Java serialization length: {}, event: {}", javaSerializationLength, partitionedEvent);
-
-        int compactLength = 0;
-        compactLength += "sampleStream".getBytes().length;
-        compactLength += ByteUtils.intToBytes(partitionedEvent.getPartition().hashCode()).length;
-        compactLength += ByteUtils.longToBytes(partitionedEvent.getTimestamp()).length;
-        compactLength += "CPU".getBytes().length;
-        compactLength += "LOCALHOST".getBytes().length;
-        compactLength += 1;
-        compactLength += ByteUtils.longToBytes(Long.MAX_VALUE).length;
-        compactLength += ByteUtils.doubleToBytes(60.0).length;
-
-        LOG.info("Compact serialization length: {}, event: {}", compactLength, partitionedEvent);
-        Assert.assertTrue(compactLength * 20 < javaSerializationLength);
-    }
-
-
-    public static StreamDefinition createSampleStreamDefinition(String streamId) {
-        StreamDefinition sampleStreamDefinition = new StreamDefinition();
-        sampleStreamDefinition.setStreamId(streamId);
-        sampleStreamDefinition.setTimeseries(true);
-        sampleStreamDefinition.setValidate(true);
-        sampleStreamDefinition.setDescription("Schema for " + streamId);
-        List<StreamColumn> streamColumns = new ArrayList<>();
-
-        streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
-        streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
-        streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
-        streamColumns.add(new StreamColumn.Builder().name("data").type(StreamColumn.Type.LONG).build());
-        streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
-        sampleStreamDefinition.setColumns(streamColumns);
-        return sampleStreamDefinition;
-    }
-
-    public static StreamPartition createSampleStreamGroupbyPartition(String streamId, List<String> groupByField) {
-        StreamPartition streamPartition = new StreamPartition();
-        streamPartition.setStreamId(streamId);
-        streamPartition.setColumns(groupByField);
-        streamPartition.setType(StreamPartition.Type.GROUPBY);
-        return streamPartition;
-    }
-
-    @SuppressWarnings("serial")
-    public static PartitionedEvent createSimpleStreamEvent() {
-        StreamEvent event = StreamEvent.builder()
-            .schema(createSampleStreamDefinition("sampleStream_1"))
-            .streamId("sampleStream_1")
-            .timestamep(System.currentTimeMillis())
-            .attributes(new HashMap<String, Object>() {{
-                put("name", "cpu");
-                put("host", "localhost");
-                put("flag", true);
-                put("value", 60.0);
-                put("data", Long.MAX_VALUE);
-                put("unknown", "unknown column value");
-            }}).build();
-        PartitionedEvent pEvent = new PartitionedEvent();
-        pEvent.setEvent(event);
-        pEvent.setPartition(createSampleStreamGroupbyPartition("sampleStream_1", Arrays.asList("name", "host")));
-        return pEvent;
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
deleted file mode 100644
index 5a81e26..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.serialization;
-
-import backtype.storm.serialization.DefaultKryoFactory;
-import backtype.storm.serialization.DefaultSerializationDelegate;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.google.common.io.ByteArrayDataInput;
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
-import org.apache.commons.lang.time.StopWatch;
-import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.serialization.impl.PartitionedEventSerializerImpl;
-import org.apache.eagle.alert.utils.TimePeriodUtils;
-import org.joda.time.Period;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.BitSet;
-
-
-public class PartitionedEventSerializerTest {
-    private final static Logger LOG = LoggerFactory.getLogger(PartitionedEventSerializerTest.class);
-
-    @SuppressWarnings("deprecation")
-    @Test
-    public void testPartitionEventSerialization() throws IOException {
-        PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream", System.currentTimeMillis());
-        ;
-        PartitionedEventSerializerImpl serializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition);
-
-        ByteArrayDataOutput dataOutput1 = ByteStreams.newDataOutput();
-        serializer.serialize(partitionedEvent, dataOutput1);
-        byte[] serializedBytes = dataOutput1.toByteArray();
-        PartitionedEvent deserializedEvent = serializer.deserialize(ByteStreams.newDataInput(serializedBytes));
-        Assert.assertEquals(partitionedEvent, deserializedEvent);
-
-        PartitionedEventSerializerImpl compressSerializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition, true);
-
-        byte[] serializedBytesCompressed = compressSerializer.serialize(partitionedEvent);
-        PartitionedEvent deserializedEventCompressed = compressSerializer.deserialize(serializedBytesCompressed);
-        Assert.assertEquals(partitionedEvent, deserializedEventCompressed);
-
-        PartitionedEventDigestSerializer serializer2 = new PartitionedEventDigestSerializer(MockSampleMetadataFactory::createSampleStreamDefinition);
-        ByteArrayDataOutput dataOutput2 = ByteStreams.newDataOutput();
-        serializer2.serialize(partitionedEvent, dataOutput2);
-        byte[] serializedBytes2 = dataOutput2.toByteArray();
-        ByteArrayDataInput dataInput2 = ByteStreams.newDataInput(serializedBytes2);
-        PartitionedEvent deserializedEvent2 = serializer2.deserialize(dataInput2);
-        Assert.assertEquals(partitionedEvent, deserializedEvent2);
-
-        byte[] javaSerialization = new DefaultSerializationDelegate().serialize(partitionedEvent);
-        Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
-        Output output = new Output(10000);
-        kryo.writeClassAndObject(output, partitionedEvent);
-        byte[] kryoBytes = output.toBytes();
-        Input input = new Input(kryoBytes);
-        PartitionedEvent kryoDeserializedEvent = (PartitionedEvent) kryo.readClassAndObject(input);
-        Assert.assertEquals(partitionedEvent, kryoDeserializedEvent);
-        LOG.info("\nCached Stream:{}\nCompressed Cached Stream :{}\nCached Stream + Cached Partition: {}\nJava Native: {}\nKryo: {}\nKryo + Cached Stream: {}\nKryo + Cached Stream + Cached Partition: {}", serializedBytes.length, serializedBytesCompressed.length, serializedBytes2.length, javaSerialization.length, kryoBytes.length, kryoSerialize(serializedBytes).length, kryoSerialize(serializedBytes2).length);
-    }
-
-    @SuppressWarnings("deprecation")
-    @Test
-    public void testPartitionEventSerializationEfficiency() throws IOException {
-        PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream", System.currentTimeMillis());
-        ;
-        PartitionedEventSerializerImpl serializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition);
-
-        int count = 100000;
-        StopWatch stopWatch = new StopWatch();
-        stopWatch.start();
-        int i = 0;
-        while (i < count) {
-            ByteArrayDataOutput dataOutput1 = ByteStreams.newDataOutput();
-            serializer.serialize(partitionedEvent, dataOutput1);
-            byte[] serializedBytes = dataOutput1.toByteArray();
-            PartitionedEvent deserializedEvent = serializer.deserialize(ByteStreams.newDataInput(serializedBytes));
-            Assert.assertEquals(partitionedEvent, deserializedEvent);
-            i++;
-        }
-        stopWatch.stop();
-        LOG.info("Cached Stream: {} ms", stopWatch.getTime());
-        stopWatch.reset();
-        PartitionedEventSerializerImpl compressSerializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition, true);
-        i = 0;
-        stopWatch.start();
-        while (i < count) {
-            byte[] serializedBytesCompressed = compressSerializer.serialize(partitionedEvent);
-            PartitionedEvent deserializedEventCompressed = compressSerializer.deserialize(serializedBytesCompressed);
-            Assert.assertEquals(partitionedEvent, deserializedEventCompressed);
-            i++;
-        }
-        stopWatch.stop();
-        LOG.info("Compressed Cached Stream: {} ms", stopWatch.getTime());
-        stopWatch.reset();
-
-        i = 0;
-        stopWatch.start();
-        while (i < count) {
-            PartitionedEventDigestSerializer serializer2 = new PartitionedEventDigestSerializer(MockSampleMetadataFactory::createSampleStreamDefinition);
-            ByteArrayDataOutput dataOutput2 = ByteStreams.newDataOutput();
-            serializer2.serialize(partitionedEvent, dataOutput2);
-            byte[] serializedBytes2 = dataOutput2.toByteArray();
-            ByteArrayDataInput dataInput2 = ByteStreams.newDataInput(serializedBytes2);
-            PartitionedEvent deserializedEvent2 = serializer2.deserialize(dataInput2);
-            Assert.assertEquals(partitionedEvent, deserializedEvent2);
-            i++;
-        }
-        stopWatch.stop();
-        LOG.info("Cached Stream&Partition: {} ms", stopWatch.getTime());
-        stopWatch.reset();
-        i = 0;
-        stopWatch.start();
-        while (i < count) {
-            byte[] javaSerialization = new DefaultSerializationDelegate().serialize(partitionedEvent);
-            PartitionedEvent javaSerializedEvent = (PartitionedEvent) new DefaultSerializationDelegate().deserialize(javaSerialization);
-            Assert.assertEquals(partitionedEvent, javaSerializedEvent);
-            i++;
-        }
-        stopWatch.stop();
-        LOG.info("Java Native: {} ms", stopWatch.getTime());
-        stopWatch.reset();
-        i = 0;
-        stopWatch.start();
-        Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
-        while (i < count) {
-            Output output = new Output(10000);
-            kryo.writeClassAndObject(output, partitionedEvent);
-            byte[] kryoBytes = output.toBytes();
-            Input input = new Input(kryoBytes);
-            PartitionedEvent kryoDeserializedEvent = (PartitionedEvent) kryo.readClassAndObject(input);
-            Assert.assertEquals(partitionedEvent, kryoDeserializedEvent);
-            i++;
-        }
-        stopWatch.stop();
-        LOG.info("Kryo: {} ms", stopWatch.getTime());
-    }
-
-    /**
-     * Kryo Serialization Length = Length of byte[] + 2
-     */
-    @Test
-    public void testKryoByteArraySerialization() {
-        Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
-        byte[] bytes = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
-        Output output = new Output(1000);
-        kryo.writeObject(output, bytes);
-        Assert.assertEquals(bytes.length + 2, output.toBytes().length);
-    }
-
-    private byte[] kryoSerialize(Object object) {
-        Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
-        Output output = new Output(100000);
-        kryo.writeClassAndObject(output, object);
-        return output.toBytes();
-    }
-
-    @Test
-    public void testBitSet() {
-        BitSet bitSet = new BitSet();
-        bitSet.set(0, true); // 1
-        bitSet.set(1, false); // 0
-        bitSet.set(2, true); // 1
-        LOG.info("Bit Set Size: {}", bitSet.size());
-        LOG.info("Bit Set Byte[]: {}", bitSet.toByteArray());
-        LOG.info("Bit Set Byte[]: {}", bitSet.toLongArray());
-        LOG.info("BitSet[0]: {}", bitSet.get(0));
-        LOG.info("BitSet[1]: {}", bitSet.get(1));
-        LOG.info("BitSet[1]: {}", bitSet.get(2));
-
-        byte[] bytes = bitSet.toByteArray();
-
-        BitSet bitSet2 = BitSet.valueOf(bytes);
-
-        LOG.info("Bit Set Size: {}", bitSet2.size());
-        LOG.info("Bit Set Byte[]: {}", bitSet2.toByteArray());
-        LOG.info("Bit Set Byte[]: {}", bitSet2.toLongArray());
-        LOG.info("BitSet[0]: {}", bitSet2.get(0));
-        LOG.info("BitSet[1]: {}", bitSet2.get(1));
-        LOG.info("BitSet[1]: {}", bitSet2.get(2));
-
-
-        BitSet bitSet3 = new BitSet();
-        bitSet3.set(0, true);
-        Assert.assertEquals(1, bitSet3.length());
-
-        BitSet bitSet4 = new BitSet();
-        bitSet4.set(0, false);
-        Assert.assertEquals(0, bitSet4.length());
-        Assert.assertFalse(bitSet4.get(1));
-        Assert.assertFalse(bitSet4.get(2));
-    }
-
-    @Test
-    public void testPartitionType() {
-
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java
deleted file mode 100644
index 9520b62..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.siddhi;
-
-import org.junit.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
-import org.wso2.siddhi.core.util.EventPrinter;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * @since Jun 21, 2016
- */
-public class SiddhiPolicyTest {
-
-    private static final Logger LOG = LoggerFactory.getLogger(SiddhiPolicyTest.class);
-
-    private String streams = " define stream syslog_stream("
-        + "dims_facility string, "
-        + "dims_severity string, "
-        + "dims_hostname string, "
-        + "dims_msgid string, "
-        + "timestamp string, "
-        + "conn string, "
-        + "op string, "
-        + "msgId string, "
-        + "command string, "
-        + "name string, "
-        + "namespace string, "
-        + "epochMillis long); ";
-    private SiddhiManager sm;
-
-    @Before
-    public void setup() {
-        sm = new SiddhiManager();
-    }
-
-    @After
-    public void shutdown() {
-        sm.shutdown();
-    }
-
-    @Test
-    public void testPolicy_grpby() {
-        String ql = " from syslog_stream#window.time(1min) select name, namespace, timestamp, dims_hostname, count(*) as abortCount group by dims_hostname insert into syslog_severity_check_output; ";
-        StreamCallback sc = new StreamCallback() {
-            @Override
-            public void receive(Event[] arg0) {
-
-            }
-
-            ;
-        };
-
-        String executionPlan = streams + ql;
-        ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(executionPlan);
-        runtime.addCallback("syslog_severity_check_output", sc);
-        runtime.start();
-    }
-
-    @Ignore
-    @Test
-    public void testPolicy_agg() throws Exception {
-        String sql = " from syslog_stream#window.time(1min) select "
-            + "name, "
-            + "namespace, "
-            + "timestamp, "
-            + "dims_hostname, "
-            + "count(*) as abortCount "
-            + "group by dims_hostname "
-            + "having abortCount > 3 insert into syslog_severity_check_output; ";
-
-        final AtomicBoolean checked = new AtomicBoolean(false);
-        StreamCallback sc = new StreamCallback() {
-            @Override
-            public void receive(Event[] arg0) {
-                checked.set(true);
-                LOG.info("event array size: " + arg0.length);
-                Set<String> hosts = new HashSet<String>();
-                for (Event e : arg0) {
-                    hosts.add((String) e.getData()[3]);
-                }
-
-                LOG.info(" grouped hosts : " + hosts);
-                Assert.assertTrue(hosts.contains("HOSTNAME-" + 0));
-                Assert.assertTrue(hosts.contains("HOSTNAME-" + 1));
-                Assert.assertTrue(hosts.contains("HOSTNAME-" + 2));
-                Assert.assertFalse(hosts.contains("HOSTNAME-" + 3));
-            }
-
-            ;
-        };
-
-        String executionPlan = streams + sql;
-        ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(executionPlan);
-        runtime.addCallback("syslog_severity_check_output", sc);
-        runtime.start();
-        InputHandler handler = runtime.getInputHandler("syslog_stream");
-
-        sendInput(handler);
-
-        Thread.sleep(1000);
-
-        Assert.assertTrue(checked.get());
-
-        runtime.shutdown();
-    }
-
-    /*
-        + "dims_facility string, "
-        + "dims_severity string, "
-        + "dims_hostname string, "
-        + "dims_msgid string, "
-        + "timestamp string, "
-        + "conn string, "
-        + "op string, "
-        + "msgId string, "
-        + "command string, "
-        + "name string, "
-        + "namespace string, "
-        + "epochMillis long)
-     */
-    private void sendInput(InputHandler handler) throws Exception {
-        int length = 15;
-        Event[] events = new Event[length];
-        for (int i = 0; i < length; i++) {
-            Event e = new Event(12);
-            e.setTimestamp(System.currentTimeMillis());
-            e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + i % 4, "MSGID-...", "Timestamp", "conn-sss", "op-msg-Abort", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
-
-            events[i] = e;
-        }
-
-        handler.send(events);
-
-        Thread.sleep(61 * 1000);
-
-        Event e = new Event(12);
-        e.setTimestamp(System.currentTimeMillis());
-        e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 11, "MSGID-...", "Timestamp", "conn-sss", "op-msg", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
-        handler.send(e);
-    }
-
-    @Ignore
-    @Test
-    public void testPolicy_regex() throws Exception {
-        String sql = " from syslog_stream[regex:find(\"Abort\", op)]#window.time(1min) select timestamp, dims_hostname, count(*) as abortCount group by dims_hostname insert into syslog_severity_check_output; ";
-
-        AtomicBoolean checked = new AtomicBoolean();
-        StreamCallback sc = new StreamCallback() {
-            @Override
-            public void receive(Event[] arg0) {
-                checked.set(true);
-            }
-
-            ;
-        };
-
-        String executionPlan = streams + sql;
-        ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(executionPlan);
-        runtime.addCallback("syslog_severity_check_output", sc);
-        runtime.start();
-
-        InputHandler handler = runtime.getInputHandler("syslog_stream");
-
-        sendInput(handler);
-
-        Thread.sleep(1000);
-
-        Assert.assertTrue(checked.get());
-
-        runtime.shutdown();
-    }
-
-    @Ignore
-    @Test
-    public void testPolicy_seq() throws Exception {
-        String sql = ""
-            + " from every e1=syslog_stream[regex:find(\"UPDOWN\", op)] -> "
-            + " e2=syslog_stream[dims_hostname == e1.dims_hostname and regex:find(\"Abort\", op)] within 1 min "
-            + " select e1.timestamp as timestamp, e1.op as a_op, e2.op as b_op "
-            + " insert into syslog_severity_check_output; ";
-
-        AtomicBoolean checked = new AtomicBoolean();
-        StreamCallback sc = new StreamCallback() {
-            @Override
-            public void receive(Event[] arg0) {
-                checked.set(true);
-            }
-
-            ;
-        };
-
-        String executionPlan = streams + sql;
-        ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(executionPlan);
-        runtime.addCallback("syslog_severity_check_output", sc);
-        runtime.start();
-        InputHandler handler = runtime.getInputHandler("syslog_stream");
-
-        sendPatternInput(handler);
-
-        Thread.sleep(1000);
-        Assert.assertTrue(checked.get());
-
-        runtime.shutdown();
-    }
-
-    private void sendPatternInput(InputHandler handler) throws Exception {
-        // validate one
-        Event e = new Event(12);
-        e.setTimestamp(System.currentTimeMillis());
-        e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 0, "MSGID-...", "Timestamp", "conn-sss", "op-msg-UPDOWN", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
-
-        e = new Event(12);
-        e.setTimestamp(System.currentTimeMillis());
-        e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 0, "MSGID-...", "Timestamp", "conn-sss", "op-msg-nothing", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
-
-        e = new Event(12);
-        e.setTimestamp(System.currentTimeMillis());
-        e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 0, "MSGID-...", "Timestamp", "conn-sss", "op-msg-Abort", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
-
-        Thread.sleep(61 * 1000);
-
-        e = new Event(12);
-        e.setTimestamp(System.currentTimeMillis());
-        e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 11, "MSGID-...", "Timestamp", "conn-sss", "op-msg", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
-        handler.send(e);
-    }
-
-
-    @Test
-    public void testStrConcat() throws Exception {
-        String ql = " define stream log(timestamp long, switchLabel string, port string, message string); " +
-            " from log select timestamp, str:concat(switchLabel, '===', port) as alertKey, message insert into output; ";
-        SiddhiManager manager = new SiddhiManager();
-        ExecutionPlanRuntime runtime = manager.createExecutionPlanRuntime(ql);
-        runtime.addCallback("output", new StreamCallback() {
-            @Override
-            public void receive(Event[] events) {
-                EventPrinter.print(events);
-            }
-        });
-
-        runtime.start();
-
-        InputHandler logInput = runtime.getInputHandler("log");
-
-        Event e = new Event();
-        e.setTimestamp(System.currentTimeMillis());
-        e.setData(new Object[] {System.currentTimeMillis(), "switch-ra-slc-01", "port01", "log-message...."});
-        logInput.send(e);
-
-        Thread.sleep(1000);
-        runtime.shutdown();
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java
deleted file mode 100644
index 7694623..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.siddhi.extension;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
-
-/**
- * @since Apr 1, 2016
- */
-public class AttributeCollectAggregatorTest {
-
-    private static final Logger logger = LoggerFactory.getLogger(AttributeCollectAggregatorTest.class);
-
-    @Test
-    public void test() throws Exception {
-        String ql = "define stream s1(timestamp long, host string, type string);";
-        ql += " from s1#window.externalTime(timestamp, 1 sec)";
-        ql += " select eagle:collect(timestamp) as timestamps, eagle:collect(host) as hosts, type group by type insert into output;";
-
-        SiddhiManager sm = new SiddhiManager();
-        ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(ql);
-
-        InputHandler input = runtime.getInputHandler("s1");
-        runtime.addCallback("output", new StreamCallback() {
-
-            @Override
-            public void receive(Event[] arg0) {
-                logger.info("output event length:" + arg0.length);
-
-                for (Event e : arg0) {
-                    StringBuilder sb = new StringBuilder("\t - [").append(e.getData().length).append("]");
-                    for (Object o : e.getData()) {
-                        sb.append("," + o);
-                    }
-                    logger.info(sb.toString());
-                }
-                logger.info("===end===");
-            }
-        });
-//        StreamDefinition definition = (StreamDefinition) runtime.getStreamDefinitionMap().get("output");
-
-        runtime.start();
-
-        Event[] events = generateEvents();
-        for (Event e : events) {
-            input.send(e);
-        }
-
-        Thread.sleep(1000);
-
-    }
-
-    private Event[] generateEvents() {
-        List<Event> events = new LinkedList<Event>();
-
-        Random r = new Random();
-        Event e = null;
-        long base = System.currentTimeMillis();
-        {
-            e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova"});
-            base += 100;
-            events.add(e);
-            e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova"});
-            base += 100;
-            events.add(e);
-            e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova"});
-            base += 100;
-            events.add(e);
-        }
-
-        {
-            e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron"});
-            base += 100;
-            events.add(e);
-            e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron"});
-            base += 100;
-            events.add(e);
-            e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron"});
-            base += 100;
-            events.add(e);
-        }
-
-        base += 10000;
-        {
-            e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova1"});
-            base += 100;
-            events.add(e);
-            e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova1"});
-            base += 100;
-            events.add(e);
-            e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova1"});
-            base += 100;
-            events.add(e);
-        }
-
-        {
-            e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron2"});
-            base += 100;
-            events.add(e);
-            e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron2"});
-            base += 100;
-            events.add(e);
-            e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron2"});
-            base += 100;
-            events.add(e);
-        }
-        base += 10000;
-        e = new Event(base, new Object[] {base, "host" + r.nextInt(), "mq"});
-
-        return events.toArray(new Event[0]);
-    }
-
-    @Test
-    public void testQuery() {
-        String ql = "define stream perfmon_input_stream_cpu ( host string,timestamp long,metric string,pool string,value double,colo string );";
-        ql += "from perfmon_input_stream_cpu#window.length(3) select host, min(value) as min group by host having min>91.0 insert into perfmon_output_stream_cpu;";
-
-        SiddhiManager sm = new SiddhiManager();
-        sm.createExecutionPlanRuntime(ql);
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java
deleted file mode 100644
index ebec509..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.mapdb.BTreeMap;
-import org.mapdb.DB;
-import org.mapdb.DBMaker;
-import org.mapdb.Serializer;
-
-public class MapDBTestSuite {
-    @Test
-    public void testOnHeapDB() {
-        DB db = DBMaker.heapDB().make();
-        BTreeMap<Long, String> map = db.treeMap("btree").keySerializer(Serializer.LONG).valueSerializer(Serializer.STRING).create();
-        Assert.assertFalse(map.putIfAbsentBoolean(1L, "val_1"));
-        Assert.assertTrue(map.putIfAbsentBoolean(1L, "val_2"));
-        Assert.assertTrue(map.putIfAbsentBoolean(1L, "val_3"));
-        Assert.assertFalse(map.putIfAbsentBoolean(2L, "val_4"));
-
-        Assert.assertEquals("val_1", map.get(1L));
-        Assert.assertEquals("val_4", map.get(2L));
-
-        Assert.assertTrue(map.replace(2L, "val_4", "val_5"));
-        Assert.assertEquals("val_5", map.get(2L));
-
-        map.close();
-        db.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
deleted file mode 100644
index ff7b8ee..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter;
-
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.ScheduledReporter;
-import com.codahale.metrics.Slf4jReporter;
-import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
-import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
-import com.google.common.collect.Ordering;
-import org.apache.commons.lang.time.StopWatch;
-import org.apache.eagle.alert.engine.mock.MockPartitionedCollector;
-import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.sorter.impl.PartitionedEventTimeOrderingComparator;
-import org.apache.eagle.alert.engine.sorter.impl.StreamSortWindowHandlerImpl;
-import org.apache.eagle.alert.engine.sorter.impl.StreamTimeClockInLocalMemory;
-import org.apache.eagle.common.DateTimeUtil;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.management.ManagementFactory;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-
-/**
- * -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+PrintGCTaskTimeStamps -XX:+PrintGCDetails -verbose:gc
- */
-public class StreamSortHandlerTest {
-    private final static Logger LOG = LoggerFactory.getLogger(StreamSortHandlerTest.class);
-
-    static {
-        LOG.info(ManagementFactory.getRuntimeMXBean().getName());
-    }
-
-    private ScheduledReporter metricReporter;
-
-    @Before
-    public void setUp() {
-        final MetricRegistry metrics = new MetricRegistry();
-        metrics.registerAll(new MemoryUsageGaugeSet());
-        metrics.registerAll(new GarbageCollectorMetricSet());
-        metricReporter = Slf4jReporter.forRegistry(metrics)
-            .filter((name, metric) -> name.matches("(.*heap|pools.PS.*).usage"))
-            .withLoggingLevel(Slf4jReporter.LoggingLevel.DEBUG)
-            .convertRatesTo(TimeUnit.SECONDS)
-            .convertDurationsTo(TimeUnit.MILLISECONDS)
-            .build();
-        metricReporter.start(60, TimeUnit.SECONDS);
-    }
-
-    /**
-     * Used to debug window bucket lifecycle
-     * <p>
-     * Window period: PT1s, margin: 5s
-     *
-     * @throws InterruptedException
-     */
-    @Test
-    public void testWithUnsortedEventsIn1MinuteWindow() throws InterruptedException {
-        MockPartitionedCollector mockCollector = new MockPartitionedCollector();
-        StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
-        Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
-        StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
-        sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1m", 5000), mockCollector);
-        List<PartitionedEvent> unsortedList = new LinkedList<>();
-
-        int i = 0;
-        while (i < 1000) {
-            PartitionedEvent event = MockSampleMetadataFactory.createRandomOutOfTimeOrderEventGroupedByName("sampleStream_1");
-            sortHandler.nextEvent(event);
-            unsortedList.add(event);
-            if (event.getTimestamp() > timeClock.getTime()) {
-                timeClock.moveForward(event.getTimestamp());
-            }
-            sortHandler.onTick(timeClock, System.currentTimeMillis());
-            i++;
-        }
-        sortHandler.close();
-        Assert.assertFalse(timeOrdering.isOrdered(unsortedList));
-        Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
-        Assert.assertTrue(mockCollector.get().size() > 0);
-    }
-
-    @Test
-    public void testStreamSortHandlerWithUnsortedEventsIn1HourWindow() throws InterruptedException {
-        testWithUnsortedEventsIn1hWindow(1000000);
-    }
-
-    @Test
-    public void testSortedInPatient() throws InterruptedException {
-        MockPartitionedCollector mockCollector = new MockPartitionedCollector();
-        StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
-        Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
-        StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
-        sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1h", 5000), mockCollector);
-        List<PartitionedEvent> sortedList = new LinkedList<>();
-
-        int i = 0;
-        while (i < 1000000) {
-            PartitionedEvent event = MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", System.currentTimeMillis() + i);
-            sortHandler.nextEvent(event);
-            sortedList.add(event);
-            if (event.getTimestamp() > timeClock.getTime()) {
-                timeClock.moveForward(event.getTimestamp());
-            }
-            sortHandler.onTick(timeClock, System.currentTimeMillis());
-            i++;
-        }
-        sortHandler.close();
-        Assert.assertTrue(timeOrdering.isOrdered(sortedList));
-        Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
-        Assert.assertEquals(1000000, mockCollector.get().size());
-    }
-
-    /**
-     * -XX:+PrintGC
-     *
-     * @throws InterruptedException
-     */
-    @Test @Ignore("Igored heavy benchmark test in unit test")
-    public void testWithUnsortedEventsInLargeWindowBenchmark() throws InterruptedException {
-        metricReporter.report();
-        testWithUnsortedEventsIn1hWindow(1000);
-        metricReporter.report();
-        testWithUnsortedEventsIn1hWindow(10000);
-        metricReporter.report();
-        testWithUnsortedEventsIn1hWindow(100000);
-        metricReporter.report();
-        testWithUnsortedEventsIn1hWindow(1000000);
-        metricReporter.report();
-        testWithUnsortedEventsIn1hWindow(10000000);
-        metricReporter.report();
-    }
-
-    public void testWithUnsortedEventsIn1hWindow(int count) throws InterruptedException {
-        MockPartitionedCollector mockCollector = new MockPartitionedCollector();
-        StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
-        Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
-        StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
-        sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1h", 5000), mockCollector);
-        List<PartitionedEvent> unsortedList = new LinkedList<>();
-
-        StopWatch stopWatch = new StopWatch();
-        stopWatch.start();
-        int i = 0;
-        while (i < count) {
-            PartitionedEvent event = MockSampleMetadataFactory.createRandomOutOfTimeOrderEventGroupedByName("sampleStream_1");
-            sortHandler.nextEvent(event);
-            unsortedList.add(event);
-            if (event.getEvent().getTimestamp() > timeClock.getTime()) {
-                timeClock.moveForward(event.getEvent().getTimestamp());
-            }
-            sortHandler.onTick(timeClock, System.currentTimeMillis());
-            i++;
-        }
-        stopWatch.stop();
-        LOG.info("Produced {} events in {} ms", count, stopWatch.getTime());
-        sortHandler.close();
-        Assert.assertFalse(timeOrdering.isOrdered(unsortedList));
-        Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
-        Assert.assertTrue(mockCollector.get().size() >= 0);
-    }
-
-    /**
-     * Used to debug window bucket lifecycle
-     * <p>
-     * Window period: PT1h, margin: 5s
-     *
-     * @throws InterruptedException
-     */
-    @Test
-    public void testWithSortedEvents() throws InterruptedException {
-        MockPartitionedCollector mockCollector = new MockPartitionedCollector();
-        StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
-        Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
-        StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
-        sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1h", 5000), mockCollector);
-        List<PartitionedEvent> sortedList = new LinkedList<>();
-
-        int i = 0;
-        while (i < 1000000) {
-            PartitionedEvent event = MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", System.currentTimeMillis() + i);
-            sortHandler.nextEvent(event);
-            sortedList.add(event);
-            if (event.getTimestamp() > timeClock.getTime()) {
-                timeClock.moveForward(event.getTimestamp());
-            }
-            sortHandler.onTick(timeClock, System.currentTimeMillis());
-            i++;
-        }
-        sortHandler.close();
-        Assert.assertTrue(timeOrdering.isOrdered(sortedList));
-        Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
-        Assert.assertEquals(1000000, mockCollector.get().size());
-    }
-
-    /**
-     * Used to debug window bucket lifecycle
-     * <p>
-     * Window period: PT1h, margin: 5s
-     *
-     * @throws InterruptedException
-     */
-    @Test
-    public void testWithSortedEventsAndExpireBySystemTime() throws InterruptedException {
-        MockPartitionedCollector mockCollector = new MockPartitionedCollector();
-        StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
-        Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
-        StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
-        sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT10s", 1000), mockCollector);
-        List<PartitionedEvent> sortedList = new LinkedList<>();
-
-        PartitionedEvent event = MockSampleMetadataFactory.createRandomSortedEventGroupedByName("sampleStream_1");
-        sortHandler.nextEvent(event);
-        sortedList.add(event);
-        timeClock.moveForward(event.getTimestamp());
-        sortHandler.onTick(timeClock, System.currentTimeMillis());
-
-        // Triggered to become expired by System time
-        sortHandler.onTick(timeClock, System.currentTimeMillis() + 10 * 1000 + 1000L + 1);
-
-        Assert.assertTrue(timeOrdering.isOrdered(sortedList));
-        Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
-        Assert.assertEquals(1, mockCollector.get().size());
-
-        sortHandler.close();
-    }
-
-    //    @Test
-    public void testWithTimerLock() throws InterruptedException {
-        Timer timer = new Timer();
-        List<Long> collected = new ArrayList<>();
-        timer.schedule(new TimerTask() {
-            @Override
-            public void run() {
-                synchronized (collected) {
-                    LOG.info("Ticking {}", DateTimeUtil.millisecondsToHumanDateWithMilliseconds(System.currentTimeMillis()));
-                    collected.add(System.currentTimeMillis());
-                    try {
-                        Thread.sleep(5000);
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                    }
-                }
-            }
-        }, 0, 100);
-    }
-}
\ No newline at end of file


Mime
View raw message