eagle-commits mailing list archives

From h..@apache.org
Subject [28/46] incubator-eagle git commit: [EAGLE-325] Initialize next-gen alert engine code on branch-0.5
Date Thu, 02 Jun 2016 07:08:07 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java
new file mode 100755
index 0000000..ab19801
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.eagle.alert.engine.Collector;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyHandler;
+import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
+import org.apache.eagle.alert.engine.mock.MockStreamCollector;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.metric.api.MultiCountMetric;
+
+public class SiddhiCEPPolicyEventHandlerTest {
+    private final static Logger LOG = LoggerFactory.getLogger(SiddhiCEPPolicyEventHandlerTest.class);
+
+    private Map<String, StreamDefinition> createDefinition(String... streamIds) {
+        Map<String, StreamDefinition> sds = new HashMap<>();
+        for(String streamId:streamIds) {
+            // construct StreamDefinition
+            StreamDefinition sd = MockSampleMetadataFactory.createSampleStreamDefinition(streamId);
+            sds.put(streamId, sd);
+        }
+        return sds;
+    }
+
+    @SuppressWarnings("serial")
+    @Test
+    public void testBySendSimpleEvent() throws Exception {
+        SiddhiPolicyHandler handler;
+        MockStreamCollector collector;
+
+        handler = new SiddhiPolicyHandler(createDefinition("sampleStream_1","sampleStream_2"));
+        collector = new MockStreamCollector();
+        PolicyDefinition policyDefinition = MockSampleMetadataFactory.createSingleMetricSamplePolicy();
+        PolicyHandlerContext context = new PolicyHandlerContext();
+        context.setPolicyDefinition(policyDefinition);
+        context.setPolicyCounter(new MultiCountMetric());
+        handler.prepare(collector,context);
+        StreamEvent event = StreamEvent.Builder()
+                .schema(MockSampleMetadataFactory.createSampleStreamDefinition("sampleStream_1"))
+                .streamId("sampleStream_1")
+                .timestamep(System.currentTimeMillis())
+                .attributes(new HashMap<String,Object>(){{
+                    put("name","cpu");
+                    put("value",60.0);
+                    put("bad","bad column value");
+                }}).build();
+        handler.send(event);
+        handler.close();
+    }
+
+    @SuppressWarnings("serial")
+    @Test
+    public void testWithTwoStreamJoinPolicy() throws Exception {
+        Map<String,StreamDefinition> ssd = createDefinition("sampleStream_1","sampleStream_2");
+
+        PolicyDefinition policyDefinition = new PolicyDefinition();
+        policyDefinition.setName("SampleJoinPolicyForTest");
+        policyDefinition.setInputStreams(Arrays.asList("sampleStream_1","sampleStream_2"));
+        policyDefinition.setOutputStreams(Collections.singletonList("joinedStream"));
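+        // Siddhi join: pair events from the two streams on equal (name, value) within sliding windows of the last 10 events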
+        policyDefinition.setDefinition(new PolicyDefinition.Definition(PolicyStreamHandlers.SIDDHI_ENGINE,
+                "from sampleStream_1#window.length(10) as left " +
+                "join sampleStream_2#window.length(10) as right " +
+                "on left.name == right.name and left.value == right.value " +
+                "select left.timestamp,left.name,left.value "+
+                "insert into joinedStream"));
+        policyDefinition.setPartitionSpec(Collections.singletonList(MockSampleMetadataFactory.createSampleStreamGroupbyPartition("sampleStream_1", Collections.singletonList("name"))));
+        SiddhiPolicyHandler handler;
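+        // The collector below releases this semaphore for each alert, so the test can wait for asynchronous evaluation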
+        Semaphore mutex = new Semaphore(0);
+        List<AlertStreamEvent> alerts = new ArrayList<>(0);
+        Collector<AlertStreamEvent> collector = (event) -> {
+            LOG.info("Collected {}",event);
+            Assert.assertNotNull(event);
+            alerts.add(event);
+            mutex.release();
+        };
+
+        handler = new SiddhiPolicyHandler(ssd);
+        PolicyHandlerContext context = new PolicyHandlerContext();
+        context.setPolicyDefinition(policyDefinition);
+        context.setPolicyCounter(new MultiCountMetric());
+        handler.prepare(collector,context);
+
+        long ts_1 = System.currentTimeMillis();
+        long ts_2 = System.currentTimeMillis()+1;
+
+        handler.send(StreamEvent.Builder()
+                .schema(ssd.get("sampleStream_1"))
+                .streamId("sampleStream_1")
+                .timestamep(ts_1)
+                .attributes(new HashMap<String,Object>(){{
+                    put("name","cpu");
+                    put("value",60.0);
+                    put("bad","bad column value");
+                }}).build());
+
+        handler.send(StreamEvent.Builder()
+                .schema(ssd.get("sampleStream_2"))
+                .streamId("sampleStream_2")
+                .timestamep(ts_2)
+                .attributes(new HashMap<String,Object>(){{
+                    put("name","cpu");
+                    put("value",61.0);
+                }}).build());
+
+        handler.send(StreamEvent.Builder()
+                .schema(ssd.get("sampleStream_2"))
+                .streamId("sampleStream_2")
+                .timestamep(ts_2)
+                .attributes(new HashMap<String,Object>(){{
+                    put("name","disk");
+                    put("value",60.0);
+                }}).build());
+
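+        // Only this event joins with the sampleStream_1 event (same name "cpu" and value 60.0), so exactly one alert is expected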
+        handler.send(StreamEvent.Builder()
+                .schema(ssd.get("sampleStream_2"))
+                .streamId("sampleStream_2")
+                .timestamep(ts_2)
+                .attributes(new HashMap<String,Object>(){{
+                    put("name","cpu");
+                    put("value",60.0);
+                }}).build());
+
+        handler.close();
+
+        Assert.assertTrue("Should get result in 5 s",mutex.tryAcquire(5, TimeUnit.SECONDS));
+        Assert.assertEquals(1,alerts.size());
+        Assert.assertEquals("joinedStream",alerts.get(0).getStreamId());
+        Assert.assertEquals("cpu",alerts.get(0).getData()[1]);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java
new file mode 100644
index 0000000..d887451
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.integration;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordination.model.SpoutSpec;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
+import org.apache.eagle.alert.service.IMetadataServiceClient;
+
+@SuppressWarnings("serial")
+public class MockMetadataServiceClient implements IMetadataServiceClient {
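+    // No-op stub: every method returns null or does nothing, just enough to satisfy IMetadataServiceClient in tests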
+
+    @Override
+    public List<SpoutSpec> listSpoutMetadata() {
+        return null;
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public ScheduleState getVersionedSpec(String version) {
+        return null;
+    }
+
+    @Override
+    public List<StreamingCluster> listClusters() {
+        return null;
+    }
+
+    @Override
+    public List<PolicyDefinition> listPolicies() {
+        return null;
+    }
+
+    @Override
+    public List<StreamDefinition> listStreams() {
+        return null;
+    }
+
+    @Override
+    public List<Kafka2TupleMetadata> listDataSources() {
+        return null;
+    }
+
+    @Override
+    public List<Publishment> listPublishment() {
+        return null;
+    }
+
+    @Override
+    public ScheduleState getVersionedSpec() {
+        return null;
+    }
+
+    @Override
+    public void addScheduleState(ScheduleState state) {
+        
+    }
+
+    @Override
+    public List<Topology> listTopologies() {
+        return null;
+    }
+
+    @Override
+    public void addStreamingCluster(StreamingCluster cluster) {
+        
+    }
+
+    @Override
+    public void addTopology(Topology t) {
+        
+    }
+
+    @Override
+    public void addPolicy(PolicyDefinition policy) {
+        
+    }
+
+    @Override
+    public void addStreamDefinition(StreamDefinition streamDef) {
+        
+    }
+
+    @Override
+    public void addDataSource(Kafka2TupleMetadata k2t) {
+        
+    }
+
+    @Override
+    public void addPublishment(Publishment pub) {
+        
+    }
+
+    @Override
+    public void clear() {
+        
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/metric/MemoryUsageGaugeSetTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/metric/MemoryUsageGaugeSetTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/metric/MemoryUsageGaugeSetTest.java
new file mode 100644
index 0000000..76aef7e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/metric/MemoryUsageGaugeSetTest.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.metric;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
+
+public class MemoryUsageGaugeSetTest {
+    private final Logger LOG = LoggerFactory.getLogger(MemoryUsageGaugeSetTest.class);
+
+    @Test
+    public void testJVMMetrics() throws InterruptedException {
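+        // Registers the JVM memory gauge set plus one constant sample gauge and prints them to the console once per second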
+        LOG.info("Starting testJVMMetrics");
+        final MetricRegistry metrics = new MetricRegistry();
+        ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics)
+                .convertRatesTo(TimeUnit.SECONDS)
+                .convertDurationsTo(TimeUnit.MILLISECONDS)
+                .build();
+        metrics.registerAll(new MemoryUsageGaugeSet());
+        metrics.register("sample", (Gauge<Double>) () -> 0.1234);
+        reporter.start(1, TimeUnit.SECONDS);
+        Thread.sleep(5000);
+        reporter.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockPartitionedCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockPartitionedCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockPartitionedCollector.java
new file mode 100644
index 0000000..74d11d2
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockPartitionedCollector.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.mock;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.eagle.alert.engine.PartitionedEventCollector;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MockPartitionedCollector implements PartitionedEventCollector {
+    @SuppressWarnings("unused")
+    private final static Logger LOG = LoggerFactory.getLogger(MockPartitionedCollector.class);
+    private List<PartitionedEvent> cache;
+    public MockPartitionedCollector(){
+        cache = new LinkedList<>();
+    }
+
+    public void emit(PartitionedEvent event) {
+        cache.add(event);
+    }
+
+    public void clear(){
+        cache.clear();
+    }
+
+    public List<PartitionedEvent> get(){
+        return cache;
+    }
+
+    public int size(){
+        return cache.size();
+    }
+
+    @Override
+    public void drop(PartitionedEvent event) {
+
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockSampleMetadataFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockSampleMetadataFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockSampleMetadataFactory.java
new file mode 100644
index 0000000..97e6310
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockSampleMetadataFactory.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.mock;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
+import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinitionNotFoundException;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandlers;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+
+@SuppressWarnings("serial")
+public class MockSampleMetadataFactory {
+    private static MockStreamMetadataService mockStreamMetadataServiceInstance = null;
+    public static MockStreamMetadataService createSingletonMetadataServiceWithSample(){
+        if(mockStreamMetadataServiceInstance!=null) return mockStreamMetadataServiceInstance;
+        mockStreamMetadataServiceInstance = new MockStreamMetadataService();
+        mockStreamMetadataServiceInstance.registerStream("sampleStream",createSampleStreamDefinition("sampleStream"));
+        mockStreamMetadataServiceInstance.registerStream("sampleStream_1",createSampleStreamDefinition("sampleStream_1"));
+        mockStreamMetadataServiceInstance.registerStream("sampleStream_2",createSampleStreamDefinition("sampleStream_2"));
+        mockStreamMetadataServiceInstance.registerStream("sampleStream_3",createSampleStreamDefinition("sampleStream_3"));
+        mockStreamMetadataServiceInstance.registerStream("sampleStream_4",createSampleStreamDefinition("sampleStream_4"));
+        return mockStreamMetadataServiceInstance;
+    }
+
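+    // Sample stream schema shared by the tests: name, host, flag, timestamp and value columns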
+    public static StreamDefinition createSampleStreamDefinition(String streamId){
+        StreamDefinition sampleStreamDefinition = new StreamDefinition();
+        sampleStreamDefinition.setStreamId(streamId);
+        sampleStreamDefinition.setTimeseries(true);
+        sampleStreamDefinition.setValidate(true);
+        sampleStreamDefinition.setDescription("Schema for "+streamId);
+        List<StreamColumn> streamColumns = new ArrayList<>();
+
+        streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
+        streamColumns.add(new StreamColumn.Builder().name("timestamp").type(StreamColumn.Type.LONG).build());
+        streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
+//        streamColumns.add(new StreamColumn.Builder().name("value1").type(StreamColumn.Type.DOUBLE).build());
+//        streamColumns.add(new StreamColumn.Builder().name("value2").type(StreamColumn.Type.DOUBLE).build());
+//        streamColumns.add(new StreamColumn.Builder().name("value3").type(StreamColumn.Type.DOUBLE).build());
+//        streamColumns.add(new StreamColumn.Builder().name("value4").type(StreamColumn.Type.DOUBLE).build());
+//        streamColumns.add(new StreamColumn.Builder().name("value5").type(StreamColumn.Type.DOUBLE).build());
+        sampleStreamDefinition.setColumns(streamColumns);
+        return sampleStreamDefinition;
+    }
+
+    /**
+     * Creates a sample sort spec; the default window period is PT1m (one minute).
+     *
+     * @param streamId stream the sort spec is created for
+     * @return sort spec with a 1000 ms window margin and a PT1m window period
+     */
+    public static StreamSortSpec createSampleStreamSortSpec(String streamId){
+        StreamSortSpec streamSortSpec = new StreamSortSpec();
+//        streamSortSpec.setColumn("timestamp");
+//        streamSortSpec.setStreamId(streamId);
+        streamSortSpec.setWindowMargin(1000);
+        streamSortSpec.setWindowPeriod("PT1m");
+        return streamSortSpec;
+    }
+
+    public static StreamSortSpec createSampleStreamSortSpec(String streamId,String period,int margin){
+        StreamSortSpec streamSortSpec = new StreamSortSpec();
+//        streamSortSpec.setColumn("timestamp");
+//        streamSortSpec.setStreamId(streamId);
+        streamSortSpec.setWindowMargin(margin);
+        streamSortSpec.setWindowPeriod(period);
+        return streamSortSpec;
+    }
+
+    /**
+     * Policy: from sampleStream_1[name == "cpu" and value > 50.0] select name, host, flag, value insert into outputStream;
+     *
+     * @return a sample single-metric PolicyDefinition backed by the Siddhi query above
+     */
+    public static PolicyDefinition createSingleMetricSamplePolicy(){
+        String definePolicy = "from sampleStream_1[name == \"cpu\" and value > 50.0] select name, host, flag, value insert into outputStream;";
+        PolicyDefinition policyDefinition = new PolicyDefinition();
+        policyDefinition.setName("SamplePolicyForTest");
+        policyDefinition.setInputStreams(Arrays.asList("sampleStream_1"));
+        policyDefinition.setOutputStreams(Arrays.asList("outputStream"));
+        policyDefinition.setDefinition(new PolicyDefinition.Definition(
+                PolicyStreamHandlers.SIDDHI_ENGINE,
+                definePolicy
+        ));
+        policyDefinition.setPartitionSpec(Arrays.asList(createSampleStreamGroupbyPartition("sampleStream_1",Arrays.asList("name"))));
+        return policyDefinition;
+    }
+
+    public static StreamPartition createSampleStreamGroupbyPartition(String streamId, List<String> groupByField){
+        StreamPartition streamPartition = new StreamPartition();
+        streamPartition.setStreamId(streamId);
+        streamPartition.setColumns(new ArrayList<>(groupByField));
+        streamPartition.setType(StreamPartition.Type.GROUPBY);
+        StreamSortSpec streamSortSpec = new StreamSortSpec();
+        streamSortSpec.setWindowPeriod("PT30m");
+        streamSortSpec.setWindowMargin(10000);
+        streamPartition.setSortSpec(streamSortSpec);
+        return streamPartition;
+    }
+
+    public static StreamRouterSpec createSampleStreamRouteSpec(String streamId, String groupByField, List<String> targetEvaluatorIds){
+        List<WorkSlot> slots = Arrays.asList(targetEvaluatorIds.stream()
+                .map(t -> new WorkSlot("sampleTopology", t))
+                .toArray(WorkSlot[]::new));
+        StreamRouterSpec streamRouteSpec = new StreamRouterSpec();
+        streamRouteSpec.setStreamId(streamId);
+        streamRouteSpec.setPartition(createSampleStreamGroupbyPartition(streamId,Arrays.asList(groupByField)));
+        streamRouteSpec.setTargetQueue(Arrays.asList(new PolicyWorkerQueue(slots)));
+        return streamRouteSpec;
+    }
+
+    public static StreamRouterSpec createSampleStreamRouteSpec(List<String> targetEvaluatorIds){
+        return createSampleStreamRouteSpec("sampleStream_1","name",targetEvaluatorIds);
+    }
+
+    /**
+     * Route spec for GROUPBY_sampleStream_1_ON_name.
+     *
+     * @param targetEvaluatorIds evaluator ids to route the partitioned stream to
+     * @return router spec that groups sampleStream_1 by name
+     */
+    public static StreamRouterSpec createRouteSpec_GROUP_sampleStream_1_BY_name(List<String> targetEvaluatorIds){
+        return createSampleStreamRouteSpec("sampleStream_1","name",targetEvaluatorIds);
+    }
+
+    public static StreamRouterSpec createRouteSpec_GROUP_sampleStream_2_BY_name(List<String> targetEvaluatorIds){
+        return createSampleStreamRouteSpec("sampleStream_2","name",targetEvaluatorIds);
+    }
+
+    public static PartitionedEvent createSimpleStreamEvent()  {
+        StreamEvent event = null;
+        try {
+            event = StreamEvent.Builder()
+                .schema(MockSampleMetadataFactory.createSingletonMetadataServiceWithSample().getStreamDefinition("sampleStream_1"))
+                .streamId("sampleStream_1")
+                .timestamep(System.currentTimeMillis())
+                .attributes(new HashMap<String,Object>(){{
+                    put("name","cpu");
+                    put("value",60.0);
+                    put("unknown","unknown column value");
+                }}).build();
+        } catch (StreamDefinitionNotFoundException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+        PartitionedEvent pEvent = new PartitionedEvent();
+        pEvent.setEvent(event);
+        return pEvent;
+    }
+
+    private final static String[] SAMPLE_STREAM_NAME_OPTIONS=new String[]{
+            "cpu","memory","disk","network"
+    };
+
+    private final static String[] SAMPLE_STREAM_HOST_OPTIONS =new String[]{
+            "localhost_1","localhost_2","localhost_3","localhost_4"
+    };
+
+    private final static Boolean[] SAMPLE_STREAM_FLAG_OPTIONS=new Boolean[]{
+            true,false
+    };
+
+    private final static Double[] SAMPLE_STREAM_VALUE_OPTIONS=new Double[]{
+            -0.20, 40.4,50.5,60.6,10000.1
+    };
+    private final static String[] SAMPLE_STREAM_ID_OPTIONS=new String[]{
+            "sampleStream_1","sampleStream_2","sampleStream_3","sampleStream_4",
+    };
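+    // NOTE: ThreadLocalRandom.current() is bound to the thread that calls it; caching it in a static field assumes these helpers run on a single thread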
+    private final static Random RANDOM = ThreadLocalRandom.current();
+
+    public static StreamEvent createRandomStreamEvent()  {
+        return createRandomStreamEvent(SAMPLE_STREAM_ID_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_ID_OPTIONS.length)]);
+    }
+
+    public static StreamEvent createRandomStreamEvent(String streamId)  {
+        return createRandomStreamEvent(streamId,System.currentTimeMillis());
+    }
+
+    private final static Long[] TIME_DELTA_OPTIONS = new Long[]{
+            -30000L, -10000L, -5000L, -1000L, 0L, 1000L, 5000L, 10000L, 30000L
+    };
+
+    public static StreamEvent createRandomOutOfTimeOrderStreamEvent(String streamId)  {
+        StreamEvent event = createRandomStreamEvent(streamId);
+        event.setTimestamp(System.currentTimeMillis()+TIME_DELTA_OPTIONS[RANDOM.nextInt(TIME_DELTA_OPTIONS.length)]);
+        return event;
+    }
+
+
+    public static PartitionedEvent createRandomOutOfTimeOrderEventGroupedByName(String streamId)  {
+        StreamEvent event = createRandomStreamEvent(streamId);
+        event.setTimestamp(System.currentTimeMillis()+TIME_DELTA_OPTIONS[RANDOM.nextInt(TIME_DELTA_OPTIONS.length)]);
+        return new PartitionedEvent(event,createSampleStreamGroupbyPartition(streamId,Arrays.asList("name")),event.getData()[0].hashCode());
+    }
+
+    public static PartitionedEvent createPartitionedEventGroupedByName(String streamId,long timestamp)  {
+        StreamEvent event = createRandomStreamEvent(streamId);
+        event.setTimestamp(timestamp);
+        return new PartitionedEvent(event,createSampleStreamGroupbyPartition(streamId,Arrays.asList("name")),event.getData()[0].hashCode());
+    }
+
+    public static PartitionedEvent createRandomSortedEventGroupedByName(String streamId)  {
+        StreamEvent event = createRandomStreamEvent(streamId);
+        event.setTimestamp(System.currentTimeMillis());
+        return new PartitionedEvent(event,createSampleStreamGroupbyPartition(streamId,Arrays.asList("name")),event.getData()[0].hashCode());
+    }
+
+    public static StreamEvent createRandomStreamEvent(String streamId, long timestamp)  {
+        StreamEvent event;
+        try {
+            event = StreamEvent.Builder()
+                    .schema(MockSampleMetadataFactory.createSingletonMetadataServiceWithSample().getStreamDefinition(streamId))
+                    .streamId(streamId)
+                    .timestamep(timestamp)
+                    .attributes(new HashMap<String,Object>(){{
+                        put("name",SAMPLE_STREAM_NAME_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_NAME_OPTIONS.length)]);
+                        put("value", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
+                        put("host", SAMPLE_STREAM_HOST_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_HOST_OPTIONS.length)]);
+                        put("flag",SAMPLE_STREAM_FLAG_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_FLAG_OPTIONS.length)]);
+//                        put("value1", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
+//                        put("value2", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
+//                        put("value3", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
+//                        put("value4", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
+//                        put("value5", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
+                        put("unknown","unknown column value");
+                    }}).build();
+        } catch (StreamDefinitionNotFoundException e) {
+            throw new IllegalStateException(e.getMessage(),e);
+        }
+        return event;
+    }
+
+    public static PartitionedEvent createRandomPartitionedEvent(String streamId, long timestamp)  {
+        StreamEvent event = createRandomStreamEvent(streamId,timestamp);
+        PartitionedEvent partitionedEvent = new PartitionedEvent(event,createSampleStreamGroupbyPartition(streamId,Arrays.asList("name")),event.getData()[0].hashCode());
+        return partitionedEvent;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamCollector.java
new file mode 100755
index 0000000..fa07701
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamCollector.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.mock;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.eagle.alert.engine.Collector;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MockStreamCollector implements Collector<AlertStreamEvent> {
+    @SuppressWarnings("unused")
+    private final static Logger LOG = LoggerFactory.getLogger(MockStreamCollector.class);
+    private List<AlertStreamEvent> cache;
+    public MockStreamCollector(){
+        cache = new LinkedList<>();
+    }
+
+    public void emit(AlertStreamEvent event) {
+        cache.add(event);
+        // LOG.info("PartitionedEventCollector received: {}",event);
+    }
+
+    public void clear(){
+        cache.clear();
+    }
+
+    public List<AlertStreamEvent> get(){
+        return cache;
+    }
+
+    public int size(){
+        return cache.size();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamMetadataService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamMetadataService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamMetadataService.java
new file mode 100644
index 0000000..2119ecd
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamMetadataService.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.mock;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinitionNotFoundException;
+
+public class MockStreamMetadataService{
+    private final Map<String,StreamDefinition> streamSchemaMap = new HashMap<>();
+
+    public StreamDefinition getStreamDefinition(String streamId) throws StreamDefinitionNotFoundException {
+        if(streamSchemaMap.containsKey(streamId)) {
+            return streamSchemaMap.get(streamId);
+        }else {
+            throw new StreamDefinitionNotFoundException(streamId);
+        }
+    }
+
+    public void registerStream(String streamId, StreamDefinition schema){
+        streamSchemaMap.put(streamId,schema);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamReceiver.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamReceiver.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamReceiver.java
new file mode 100644
index 0000000..1e44c2a
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamReceiver.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.mock;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.utils.AlertConstants;
+import org.apache.eagle.alert.utils.StreamIdConversion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+@SuppressWarnings("serial")
+public class MockStreamReceiver extends BaseRichSpout {
+    private final static Logger LOG = LoggerFactory.getLogger(MockStreamReceiver.class);
+    private SpoutOutputCollector collector;
+    private List<String> outputStreamIds;
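+    // One output stream id per partition, so emitted events can be routed by partition key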
+    public MockStreamReceiver(int partition){
+        outputStreamIds = new ArrayList<>(partition);
+        for(int i=0;i<partition;i++){
+            outputStreamIds.add(StreamIdConversion.generateStreamIdByPartition(i));
+        }
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.collector = collector;
+    }
+
+    @Override
+    public void close() {}
+
+    /**
+     * This spout does not mock the end-to-end logic of the correlation spout;
+     * it simply generates sample data for testing the downstream bolts.
+     */
+    @Override
+    public void nextTuple() {
+        PartitionedEvent event = MockSampleMetadataFactory.createRandomOutOfTimeOrderEventGroupedByName("sampleStream_1");
+        LOG.info("Receive {}",event);
+        collector.emit(outputStreamIds.get(
+                // group by the first field in event i.e. name
+                (int) (event.getPartitionKey() % outputStreamIds.size())),
+                Collections.singletonList(event));
+        Utils.sleep(500);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        for(String streamId:outputStreamIds) {
+            declarer.declareStream(streamId,new Fields(AlertConstants.FIELD_0));
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java
new file mode 100644
index 0000000..251e47f
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.perf;
+
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Since 5/13/16.
+ */
+public class TestSerDeserPer {
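+    // Writes 1000 events per layout to compare Kryo serialization of the nested PartitionedEvent with two flattened variants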
+    Object[] data = null;
+    @Before
+    public void before(){
+        int max = 100;
+        StringBuilder sb = new StringBuilder();
+        for(int i=0; i<max; i++){
+            sb.append("a");
+        }
+        data = new Object[]{sb.toString()};
+    }
+
+    @Test
+    public void testSerDeserPerf() throws Exception{
+        Kryo kryo = new Kryo();
+        Output output = new Output(new FileOutputStream("/tmp/file.bin"));
+        for(int i=0; i<1000; i++){
+            kryo.writeObject(output, constructPE());
+        }
+        output.close();
+        Input input = new Input(new FileInputStream("/tmp/file.bin"));
+        PartitionedEvent someObject = kryo.readObject(input, PartitionedEvent.class);
+        input.close();
+        Assert.assertEquals(1, someObject.getData().length);
+    }
+
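+    // A representative event: one string field, a GROUPBY partition on "host", and a PT1M sort window with a 30s margin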
+    private PartitionedEvent constructPE(){
+        StreamEvent e = new StreamEvent();
+        e.setStreamId("testStreamId");
+        e.setTimestamp(1463159382000L);
+        e.setData(data);
+        StreamPartition sp = new StreamPartition();
+        List<String> col = new ArrayList<>();
+        col.add("host");
+        sp.setColumns(col);
+        StreamSortSpec sortSpec = new StreamSortSpec();
+        sortSpec.setWindowMargin(30000);
+        sortSpec.setWindowPeriod("PT1M");
+        sp.setSortSpec(sortSpec);
+        sp.setStreamId("testStreamId");
+        sp.setType(StreamPartition.Type.GROUPBY);
+        PartitionedEvent pe = new PartitionedEvent();
+        pe.setEvent(e);
+        pe.setPartition(sp);
+        pe.setPartitionKey(1000);
+        return pe;
+    }
+
+    @Test
+    public void testSerDeserPerf2() throws Exception{
+        Kryo kryo = new Kryo();
+        Output output = new Output(new FileOutputStream("/tmp/file2.bin"));
+        for(int i=0; i<1000; i++){
+            kryo.writeObject(output, constructNewPE());
+        }
+        output.close();
+        Input input = new Input(new FileInputStream("/tmp/file2.bin"));
+        NewPartitionedEvent someObject = kryo.readObject(input, NewPartitionedEvent.class);
+        input.close();
+        Assert.assertEquals(1, someObject.getData().length);
+    }
+
+    private NewPartitionedEvent constructNewPE(){
+        NewPartitionedEvent pe = new NewPartitionedEvent();
+        pe.setStreamId("testStreamId");
+        pe.setTimestamp(1463159382000L);
+        pe.setData(data);
+
+        pe.setType(StreamPartition.Type.GROUPBY);
+        List<String> col = new ArrayList<>();
+        col.add("host");
+        pe.setColumns(col);
+        pe.setPartitionKey(1000);
+
+        pe.setWindowMargin(30000);
+        pe.setWindowPeriod("PT1M");
+        return pe;
+    }
+
+    @Test
+    public void testSerDeserPerf3() throws Exception{
+        Kryo kryo = new Kryo();
+        Output output = new Output(new FileOutputStream("/tmp/file3.bin"));
+        for(int i=0; i<1000; i++){
+            kryo.writeObject(output, constructNewPE2());
+        }
+        output.close();
+        Input input = new Input(new FileInputStream("/tmp/file3.bin"));
+        NewPartitionedEvent2 someObject = kryo.readObject(input, NewPartitionedEvent2.class);
+        input.close();
+        Assert.assertEquals(1, someObject.getData().length);
+    }
+
+    private NewPartitionedEvent2 constructNewPE2(){
+        NewPartitionedEvent2 pe = new NewPartitionedEvent2();
+        pe.setStreamId(100);
+        pe.setTimestamp(1463159382000L);
+        pe.setData(data);
+
+        pe.setType(1);
+        int[] col = new int[1];
+        col[0] = 1;
+        pe.setColumns(col);
+        pe.setPartitionKey(1000);
+
+        pe.setWindowMargin(30000);
+        pe.setWindowPeriod(60);
+        return pe;
+    }
+
+    public static class NewPartitionedEvent implements Serializable {
+        private static final long serialVersionUID = -3840016190614238593L;
+        // basic
+        private String streamId;
+        private long timestamp;
+        private Object[] data;
+
+        // stream partition
+        private StreamPartition.Type type;
+        private List<String> columns = new ArrayList<>();
+        private long partitionKey;
+
+        // sort spec
+        private String windowPeriod="";
+        private long windowMargin = 30 * 1000;
+
+        public NewPartitionedEvent(){
+        }
+
+        public String getStreamId() {
+            return streamId;
+        }
+
+        public void setStreamId(String streamId) {
+            this.streamId = streamId;
+        }
+
+        public long getTimestamp() {
+            return timestamp;
+        }
+
+        public void setTimestamp(long timestamp) {
+            this.timestamp = timestamp;
+        }
+
+        public Object[] getData() {
+            return data;
+        }
+
+        public void setData(Object[] data) {
+            this.data = data;
+        }
+
+        public StreamPartition.Type getType() {
+            return type;
+        }
+
+        public void setType(StreamPartition.Type type) {
+            this.type = type;
+        }
+
+        public List<String> getColumns() {
+            return columns;
+        }
+
+        public void setColumns(List<String> columns) {
+            this.columns = columns;
+        }
+
+        public long getPartitionKey() {
+            return partitionKey;
+        }
+
+        public void setPartitionKey(long partitionKey) {
+            this.partitionKey = partitionKey;
+        }
+
+        public String getWindowPeriod() {
+            return windowPeriod;
+        }
+
+        public void setWindowPeriod(String windowPeriod) {
+            this.windowPeriod = windowPeriod;
+        }
+
+        public long getWindowMargin() {
+            return windowMargin;
+        }
+
+        public void setWindowMargin(long windowMargin) {
+            this.windowMargin = windowMargin;
+        }
+    }
+
+    public static class NewPartitionedEvent2 implements Serializable {
+        private static final long serialVersionUID = -3840016190614238593L;
+        // basic
+        private int streamId;
+        private long timestamp;
+        private Object[] data;
+
+        // stream partition
+        private int type;
+        private int[] columns;
+        private long partitionKey;
+
+        // sort spec
+        private long windowPeriod;
+        private long windowMargin = 30 * 1000;
+
+        public NewPartitionedEvent2(){
+        }
+
+        public int getStreamId() {
+            return streamId;
+        }
+
+        public void setStreamId(int streamId) {
+            this.streamId = streamId;
+        }
+
+        public int getType() {
+            return type;
+        }
+
+        public void setType(int type) {
+            this.type = type;
+        }
+
+        public int[] getColumns() {
+            return columns;
+        }
+
+        public void setColumns(int[] columns) {
+            this.columns = columns;
+        }
+
+        public long getPartitionKey() {
+            return partitionKey;
+        }
+
+        public void setPartitionKey(long partitionKey) {
+            this.partitionKey = partitionKey;
+        }
+
+        public long getWindowPeriod() {
+            return windowPeriod;
+        }
+
+        public void setWindowPeriod(long windowPeriod) {
+            this.windowPeriod = windowPeriod;
+        }
+
+        public long getTimestamp() {
+            return timestamp;
+        }
+
+        public void setTimestamp(long timestamp) {
+            this.timestamp = timestamp;
+        }
+
+        public Object[] getData() {
+            return data;
+        }
+
+        public void setData(Object[] data) {
+            this.data = data;
+        }
+
+        public long getWindowMargin() {
+            return windowMargin;
+        }
+
+        public void setWindowMargin(long windowMargin) {
+            this.windowMargin = windowMargin;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
new file mode 100755
index 0000000..e322099
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.router;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.evaluator.PolicyGroupEvaluator;
+import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandlers;
+import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.engine.runner.AlertBolt;
+import org.apache.eagle.alert.engine.runner.TestStreamRouterBolt;
+import org.apache.eagle.alert.utils.DateTimeUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import backtype.storm.metric.api.MultiCountMetric;
+import backtype.storm.task.GeneralTopologyContext;
+import backtype.storm.task.IOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Since 5/2/16.
+ */
+@SuppressWarnings({"rawtypes", "unused"})
+public class TestAlertBolt {
+    /**
+     * The following invariant is guaranteed in
+     * {@link org.apache.eagle.alert.engine.runner.AlertBolt#execute}:
+     * <pre>
+     *    if(!routedStreamEvent.getRoute().getTargetComponentId().equals(this.policyGroupEvaluator.getName())){
+     *      throw new IllegalStateException("Got event targeted to "+ routedStreamEvent.getRoute().getTargetComponentId()+" in "+this.policyGroupEvaluator.getName());
+     *    }
+     * </pre>
+     * This test also verifies that 2 alerts are generated even when their timestamps are very close to each other.
+     *
+     * @throws Exception on unexpected test failure
+     */
+    @Test
+    public void testAlertBolt() throws Exception{
+        final AtomicInteger alertCount = new AtomicInteger();
+        final Semaphore mutex = new Semaphore(0);
+        Config config = ConfigFactory.load();
+        PolicyGroupEvaluator policyGroupEvaluator = new PolicyGroupEvaluatorImpl("testPolicyGroupEvaluatorImpl");
+        TestStreamRouterBolt.MockChangeService mockChangeService = new TestStreamRouterBolt.MockChangeService();
+        AlertBolt bolt = new AlertBolt("alertBolt1", policyGroupEvaluator, config, mockChangeService);
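+        // Output collector stub: counts emitted alerts and releases the mutex so the test thread can assert on them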
+        OutputCollector collector = new OutputCollector(new IOutputCollector(){
+            int count = 0;
+            @Override
+            public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+                alertCount.incrementAndGet();
+                mutex.release();
+                Assert.assertEquals("testAlertStream", tuple.get(0));
+                AlertStreamEvent event = (AlertStreamEvent) tuple.get(1);
+                System.out.println(String.format("collector received: [streamId=%s, tuple=%s]", streamId, tuple));
+                return null;
+            }
+            @Override
+            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { }
+            @Override
+            public void ack(Tuple input) { }
+            @Override
+            public void fail(Tuple input) { }
+            @Override
+            public void reportError(Throwable error) { }
+        });
+        Map stormConf = new HashMap<>();
+        TopologyContext topologyContext = mock(TopologyContext.class);
+        when(topologyContext.registerMetric(any(String.class), any(MultiCountMetric.class), any(int.class))).thenReturn(new MultiCountMetric());
+        bolt.prepare(stormConf, topologyContext, collector);
+
+        String streamId = "cpuUsageStream";
+
+        // construct StreamDefinition
+        StreamDefinition schema = new StreamDefinition();
+        schema.setStreamId(streamId);
+        StreamColumn column = new StreamColumn();
+        column.setName("col1");
+        column.setType(StreamColumn.Type.STRING);
+        schema.setColumns(Collections.singletonList(column));
+        Map<String, StreamDefinition> sds = new HashMap<>();
+        sds.put(schema.getStreamId(), schema);
+
+        // construct StreamPartition
+        StreamPartition sp = new StreamPartition();
+        sp.setColumns(Collections.singletonList("col1"));
+        sp.setStreamId(streamId);
+        sp.setType(StreamPartition.Type.GROUPBY);
+
+        AlertBoltSpec spec = new AlertBoltSpec();
+        spec.setVersion("version1");
+        spec.setTopologyName("testTopology");
+        PolicyDefinition pd = new PolicyDefinition();
+        pd.setName("policy1");
+        pd.setPartitionSpec(Collections.singletonList(sp));
+        pd.setOutputStreams(Collections.singletonList("testAlertStream"));
+        pd.setInputStreams(Collections.singletonList(streamId));
+        pd.setDefinition(new PolicyDefinition.Definition());
+        pd.getDefinition().type = PolicyStreamHandlers.SIDDHI_ENGINE;
+        pd.getDefinition().value = "from cpuUsageStream[col1=='value1' OR col1=='value2'] select col1 insert into testAlertStream;";
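+        // the filter matches col1 == 'value1' or col1 == 'value2' and routes matches to testAlertStream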
+        spec.addBoltPolicy("alertBolt1", pd.getName());
+        spec.getBoltPoliciesMap().put("alertBolt1", new ArrayList<PolicyDefinition>(Arrays.asList(pd)));
+        bolt.onAlertBoltSpecChange(spec, sds);
+
+        // construct GeneralTopologyContext
+        GeneralTopologyContext context = mock(GeneralTopologyContext.class);
+        int taskId = 1;
+        when(context.getComponentId(taskId)).thenReturn("comp1");
+        when(context.getComponentOutputFields("comp1", "default")).thenReturn(new Fields("f0"));
+
+        // construct event with "value1"
+        StreamEvent event1 = new StreamEvent();
+        event1.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00")*1000);
+        Object[] data = new Object[]{"value1"};
+        event1.setData(data);
+        event1.setStreamId(streamId);
+        PartitionedEvent partitionedEvent1 = new PartitionedEvent(event1, sp,1001);
+
+        // construct another event with "value1"
+        StreamEvent event2 = new StreamEvent();
+        event2.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00")*1000);
+        data = new Object[]{"value2"};
+        event2.setData(data);
+        event2.setStreamId(streamId);
+        PartitionedEvent partitionedEvent2 = new PartitionedEvent(event2, sp,1001);
+
+        Tuple input = new TupleImpl(context, Collections.singletonList(partitionedEvent1), taskId, "default");
+        Tuple input2 = new TupleImpl(context, Collections.singletonList(partitionedEvent2), taskId, "default");
+        bolt.execute(input);
+        bolt.execute(input2);
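+        // both events match the policy filter, so wait for two emit() callbacks (2 semaphore permits)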
+        Assert.assertTrue("Timeout to acquire mutex in 5s",mutex.tryAcquire(2, 5, TimeUnit.SECONDS));
+        Assert.assertEquals(2, alertCount.get());
+        bolt.cleanup();
+    }
+}
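
For context, the Siddhi policy wired into the bolt above can be exercised standalone. The following is a minimal sketch assuming the Siddhi 3.x API (SiddhiManager, ExecutionPlanRuntime, StreamCallback) that this alert engine builds on; class names and packages may differ across Siddhi versions.

    import org.wso2.siddhi.core.ExecutionPlanRuntime;
    import org.wso2.siddhi.core.SiddhiManager;
    import org.wso2.siddhi.core.event.Event;
    import org.wso2.siddhi.core.stream.input.InputHandler;
    import org.wso2.siddhi.core.stream.output.StreamCallback;

    public class SiddhiPolicySketch {
        public static void main(String[] args) throws Exception {
            SiddhiManager manager = new SiddhiManager();
            // the same query as the policy under test, preceded by the input stream definition
            String plan = "define stream cpuUsageStream (col1 string); "
                    + "from cpuUsageStream[col1=='value1' OR col1=='value2'] "
                    + "select col1 insert into testAlertStream;";
            ExecutionPlanRuntime runtime = manager.createExecutionPlanRuntime(plan);
            runtime.addCallback("testAlertStream", new StreamCallback() {
                @Override
                public void receive(Event[] events) {
                    // both "value1" and "value2" match the filter, so this fires twice in total
                    System.out.println("alerts received: " + events.length);
                }
            });
            runtime.start();
            InputHandler input = runtime.getInputHandler("cpuUsageStream");
            input.send(new Object[]{"value1"});
            input.send(new Object[]{"value2"});
            runtime.shutdown();
        }
    }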

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
new file mode 100644
index 0000000..af79f96
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.router;
+
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.CollectionType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+import org.apache.eagle.alert.coordination.model.PublishSpec;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.AlertPublisher;
+import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
+import org.apache.eagle.alert.engine.runner.AlertPublisherBolt;
+import org.apache.eagle.alert.engine.runner.MapComparator;
+import org.apache.eagle.alert.engine.utils.MetadataSerDeser;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Since 5/14/16.
+ */
+public class TestAlertPublisherBolt {
+
+    @Ignore
+    @Test
+    public void test() {
+        Config config = ConfigFactory.load("application-test.conf");
+        AlertPublisher publisher = new AlertPublisherImpl("alertPublishBolt");
+        publisher.init(config);
+        PublishSpec spec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec.json"), PublishSpec.class);
+        publisher.onPublishChange(spec.getPublishments(), null, null, null);
+        AlertStreamEvent event = create("testAlertStream");
+        publisher.nextEvent(event);
+        AlertStreamEvent event1 = create("testAlertStream");
+        publisher.nextEvent(event1);
+    }
+
+    private AlertStreamEvent create(String streamId){
+        AlertStreamEvent alert = new AlertStreamEvent();
+        PolicyDefinition policy = new PolicyDefinition();
+        policy.setName("policy1");
+        alert.setPolicy(policy);
+        alert.setCreatedTime(System.currentTimeMillis());
+        alert.setData(new Object[]{"field_1", 2, "field_3"});
+        alert.setStreamId(streamId);
+        alert.setCreatedBy(this.toString());
+        return alert;
+    }
+
+    @Test
+    public void testMapComparator() {
+        PublishSpec spec1 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec.json"), PublishSpec.class);
+        PublishSpec spec2 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec2.json"), PublishSpec.class);
+        Map<String, Publishment> map1 = new HashMap<>();
+        Map<String, Publishment> map2 = new HashMap<>();
+        spec1.getPublishments().forEach(p -> map1.put(p.getName(), p));
+        spec2.getPublishments().forEach(p -> map2.put(p.getName(), p));
+
+        MapComparator<String, Publishment> comparator = new MapComparator<>(map1, map2);
+        comparator.compare();
+        Assert.assertEquals(1, comparator.getModified().size());
+
+        AlertPublisher alertPublisher = new AlertPublisherImpl("alert-publisher-test");
+        AlertPublisherBolt publisherBolt = new AlertPublisherBolt(alertPublisher, null, null);
+        publisherBolt.onAlertPublishSpecChange(spec1, null);
+        publisherBolt.onAlertPublishSpecChange(spec2, null);
+    }
+
+    @Test
+    public void testAlertPublisher() throws Exception {
+        AlertPublisher alertPublisher = new AlertPublisherImpl("alert-publisher-test");
+        List<Publishment> oldPubs = loadEntities("/publishments.json", Publishment.class);
+        List<Publishment> newPubs = loadEntities("/publishments2.json", Publishment.class);
+        alertPublisher.onPublishChange(oldPubs, null, null, null);
+        alertPublisher.onPublishChange(null, null, newPubs, oldPubs);
+    }
+
+    private <T> List<T> loadEntities(String path, Class<T> tClz) throws Exception {
+        ObjectMapper objectMapper = new ObjectMapper();
+        JavaType type = CollectionType.construct(List.class, SimpleType.construct(tClz));
+        List<T> l = objectMapper.readValue(TestAlertPublisherBolt.class.getResourceAsStream(path), type);
+        return l;
+    }
+
+}
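
MapComparator's implementation is not included in this diff; as a rough illustration of the bookkeeping that testMapComparator asserts on (getModified() returning exactly one entry), a keyed diff between the old and new publishment maps can be computed like this. The class and method names here are hypothetical.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    public class MapDiffSketch {
        // returns values present under the same key in both maps but no longer equal
        public static <K, V> List<V> modified(Map<K, V> before, Map<K, V> after) {
            List<V> changed = new ArrayList<>();
            for (Map.Entry<K, V> entry : after.entrySet()) {
                V old = before.get(entry.getKey());
                if (old != null && !old.equals(entry.getValue())) {
                    changed.add(entry.getValue());
                }
            }
            return changed;
        }
    }

With the two testPublishSpec JSON fixtures, exactly one publishment shares a name but differs in content, which is why the modified list is asserted to have size 1.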

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
new file mode 100644
index 0000000..8c048cb
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.runner;
+
+import backtype.storm.metric.api.MultiCountMetric;
+import backtype.storm.task.GeneralTopologyContext;
+import backtype.storm.task.IOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
+import org.apache.eagle.alert.coordination.model.RouterSpec;
+import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.apache.eagle.alert.engine.coordinator.impl.AbstractMetadataChangeNotifyService;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.engine.router.impl.StreamRouterImpl;
+import org.apache.eagle.alert.utils.DateTimeUtil;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestStreamRouterBolt {
+    private final static Logger LOG = LoggerFactory.getLogger(TestStreamRouterBolt.class);
+
+    /**
+     * Mocks 5 events:
+     *
+     * 1. Sent in random order:
+     * "value1","value2","value3","value4","value5"
+     *
+     * 2. Received in correct time order; "value5" is dropped because it arrives too late: "value2","value1","value3","value4"
+     *
+     * @throws Exception
+     */
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void testRouterWithSortAndRouteSpec() throws Exception{
+        Config config = ConfigFactory.load();
+        StreamRouterImpl routerImpl = new StreamRouterImpl("testStreamRouterImpl");
+        MockChangeService mockChangeService = new MockChangeService();
+        StreamRouterBolt bolt = new StreamRouterBolt(routerImpl, config, mockChangeService);
+
+        final Map<String,List<PartitionedEvent>> streamCollected = new HashMap<>();
+        final List<PartitionedEvent> orderCollected = new ArrayList<>();
+
+        OutputCollector collector = new OutputCollector(new IOutputCollector(){
+            int count = 0;
+            @Override
+            public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+                PartitionedEvent event;
+                try {
+                    event = bolt.deserialize(tuple.get(0));
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+                if(count == 0) {
+                    count++;
+                }
+                LOG.info(String.format("Collector received: [streamId=%s, tuple=%s]", streamId, tuple));
+                if(!streamCollected.containsKey(streamId)){
+                    streamCollected.put(streamId,new ArrayList<>());
+                }
+                streamCollected.get(streamId).add(event);
+                orderCollected.add(event);
+                return null;
+            }
+            @Override
+            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { }
+            @Override
+            public void ack(Tuple input) { }
+            @Override
+            public void fail(Tuple input) { }
+            @SuppressWarnings("unused")
+            public void resetTimeout(Tuple input) { }
+            @Override
+            public void reportError(Throwable error) { }
+        });
+
+        Map stormConf = new HashMap<>();
+        TopologyContext topologyContext = mock(TopologyContext.class);
+        when(topologyContext.registerMetric(any(String.class), any(MultiCountMetric.class), any(int.class))).thenReturn(new MultiCountMetric());
+        bolt.prepare(stormConf, topologyContext, collector);
+
+        String streamId = "cpuUsageStream";
+        // StreamPartition, groupby col1 for stream cpuUsageStream
+        StreamPartition sp = new StreamPartition();
+        sp.setStreamId(streamId);
+        sp.setColumns(Collections.singletonList("col1"));
+        sp.setType(StreamPartition.Type.GROUPBY);
+
+        StreamSortSpec sortSpec = new StreamSortSpec();
+//        sortSpec.setColumn("timestamp");
+//        sortSpec.setOrder("asc");
+        sortSpec.setWindowPeriod2(Period.minutes(1));
+        sortSpec.setWindowMargin(1000);
+        sp.setSortSpec(sortSpec);
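+        // 1-minute event-time sort window with a 1000ms lateness margin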
+
+        RouterSpec boltSpec = new RouterSpec();
+
+        // set StreamRouterSpec to have 2 WorkSlot
+        StreamRouterSpec routerSpec = new StreamRouterSpec();
+        routerSpec.setPartition(sp);
+        routerSpec.setStreamId(streamId);
+        PolicyWorkerQueue queue = new PolicyWorkerQueue();
+        queue.setPartition(sp);
+        queue.setWorkers(Arrays.asList(new WorkSlot("testTopology","alertBolt1"), new WorkSlot("testTopology","alertBolt2")));
+        routerSpec.setTargetQueue(Collections.singletonList(queue));
+        boltSpec.addRouterSpec(routerSpec);
+
+        // construct StreamDefinition
+        StreamDefinition schema = new StreamDefinition();
+        schema.setStreamId(streamId);
+        StreamColumn column = new StreamColumn();
+        column.setName("col1");
+        column.setType(StreamColumn.Type.STRING);
+        schema.setColumns(Collections.singletonList(column));
+        Map<String, StreamDefinition> sds = new HashMap<>();
+        sds.put(schema.getStreamId(), schema);
+
+        bolt.declareOutputStreams(Arrays.asList("alertBolt1", "alertBolt2"));
+        bolt.onStreamRouteBoltSpecChange(boltSpec, sds);
+        GeneralTopologyContext context = mock(GeneralTopologyContext.class);
+        int taskId = 1;
+        when(context.getComponentId(taskId)).thenReturn("comp1");
+        when(context.getComponentOutputFields("comp1", "default")).thenReturn(new Fields("f0"));
+
+        // =======================================
+        // Mock 5 Events
+        //
+        // 1. Sent in random order:
+        // "value1","value2","value3","value4","value5"
+        //
+        // 2. Received in correct time order; "value5" is dropped because it arrives too late:
+        // "value2","value1","value3","value4"
+        // =======================================
+
+        // construct event with "value1"
+        StreamEvent event = new StreamEvent();
+        event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:30")*1000);
+        Object[] data = new Object[]{"value1"};
+        event.setData(data);
+        event.setStreamId(streamId);
+        PartitionedEvent pEvent = new PartitionedEvent();
+        pEvent.setEvent(event);
+        pEvent.setPartition(sp);
+        Tuple input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
+        bolt.execute(input);
+
+        // construct another event with "value2"
+        event = new StreamEvent();
+        event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:10")*1000);
+        data = new Object[]{"value2"};
+        event.setData(data);
+        event.setStreamId(streamId);
+        pEvent = new PartitionedEvent();
+        pEvent.setPartition(sp);
+        pEvent.setEvent(event);
+        input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
+        bolt.execute(input);
+
+        // construct another event with "value3"
+        event = new StreamEvent();
+        event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:40")*1000);
+        data = new Object[]{"value3"};
+        event.setData(data);
+        event.setStreamId(streamId);
+        pEvent = new PartitionedEvent();
+        pEvent.setPartition(sp);
+        pEvent.setEvent(event);
+        input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
+        bolt.execute(input);
+
+        // construct another event with "value4"
+        event = new StreamEvent();
+        event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:02:10")*1000);
+        data = new Object[]{"value4"};
+        event.setData(data);
+        event.setStreamId(streamId);
+        pEvent = new PartitionedEvent();
+        pEvent.setPartition(sp);
+        pEvent.setEvent(event);
+        input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
+        bolt.execute(input);
+
+        // construct another event with "value5", which will be thrown because two late
+        event = new StreamEvent();
+        event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:10")*1000);
+        data = new Object[]{"value5"};
+        event.setData(data);
+        event.setStreamId(streamId);
+        pEvent = new PartitionedEvent();
+        pEvent.setPartition(sp);
+        pEvent.setEvent(event);
+        input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
+        bolt.execute(input);
+
+        Assert.assertEquals("Should finally collect two streams",2,streamCollected.size());
+        Assert.assertTrue("Should collect stream stream_testStreamRouterImpl_to_alertBolt1",streamCollected.keySet().contains("stream_testStreamRouterImpl_to_alertBolt1"));
+        Assert.assertTrue("Should collect stream stream_testStreamRouterImpl_to_alertBolt2",streamCollected.keySet().contains("stream_testStreamRouterImpl_to_alertBolt2"));
+
+        Assert.assertEquals("Should finally collect 3 events",3,orderCollected.size());
+        Assert.assertArrayEquals("Should sort 3 events in ASC order",new String[]{"value2","value1","value3"},orderCollected.stream().map((d)->d.getData()[0]).toArray());
+
+        // The first 3 events are flushed automatically by the window's clock tick
+
+        bolt.cleanup();
+
+        // cleanup() flushes all events still held in memory, so the collector also receives the 4th event,
+        // whose window had not yet expired according to the clock.
+        // The 5th event was dropped because it arrived too late, outside the window margin.
+
+        Assert.assertEquals("Should finally collect two streams",2,streamCollected.size());
+        Assert.assertEquals("Should finally collect 3 events",4,orderCollected.size());
+        Assert.assertArrayEquals("Should sort 4 events in ASC-ordered timestamp",new String[]{"value2","value1","value3","value4"},orderCollected.stream().map((d)->d.getData()[0]).toArray());
+
+    }
+
+    @SuppressWarnings("serial")
+    public static class MockChangeService extends AbstractMetadataChangeNotifyService{
+        private final static Logger LOG = LoggerFactory.getLogger(MockChangeService.class);
+
+        @Override
+        public void close() throws IOException {
+            LOG.info("Closing");
+        }
+    }
+}
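
For intuition, the flush/drop behavior asserted above can be reproduced with a small event-time reorder buffer. This is a hypothetical sketch under the test's settings (1-minute tumbling window, 1000ms margin), not StreamRouterImpl's actual implementation:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.PriorityQueue;

    class SortWindowSketch {
        private final long windowMs;  // 60_000 for Period.minutes(1)
        private final long marginMs;  // 1000
        private long expiredBefore = Long.MIN_VALUE; // start of the earliest window still accepted
        private final PriorityQueue<Long> buffer = new PriorityQueue<>();

        SortWindowSketch(long windowMs, long marginMs) {
            this.windowMs = windowMs;
            this.marginMs = marginMs;
        }

        /** Buffers one event timestamp; returns timestamps flushed in ascending order. */
        List<Long> accept(long ts) {
            List<Long> flushed = new ArrayList<>();
            long windowStart = ts - (ts % windowMs); // tumbling window aligned to the period
            if (windowStart < expiredBefore) {
                return flushed; // its window already expired: dropped, the fate of "value5"
            }
            // once ts passes a window's end plus the margin, that window expires
            long newExpiredBefore = ts - marginMs - ((ts - marginMs) % windowMs);
            if (newExpiredBefore > expiredBefore) {
                expiredBefore = newExpiredBefore;
                while (!buffer.isEmpty() && buffer.peek() < expiredBefore) {
                    flushed.add(buffer.poll()); // emitted in timestamp order
                }
            }
            buffer.add(ts);
            return flushed;
        }

        /** Mirrors cleanup(): flush whatever is still buffered, in order. */
        List<Long> close() {
            List<Long> rest = new ArrayList<>();
            while (!buffer.isEmpty()) {
                rest.add(buffer.poll());
            }
            return rest;
        }
    }

Feeding it the five mocked timestamps in arrival order flushes 00:01:10, 00:01:30 and 00:01:40 when the 00:02:10 event arrives, drops 00:00:10 as expired, and emits 00:02:10 from close(), matching the assertions at the end of the test.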

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java
new file mode 100644
index 0000000..13d1015
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.serialization;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.utils.ByteUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JavaSerializationTest {
+    private final static Logger LOG = LoggerFactory.getLogger(JavaSerializationTest.class);
+
+    @Test
+    public void testJavaSerialization(){
+        PartitionedEvent partitionedEvent = new PartitionedEvent();
+        partitionedEvent.setPartitionKey(partitionedEvent.hashCode());
+        partitionedEvent.setPartition(createSampleStreamGroupbyPartition("sampleStream",Arrays.asList("name","host")));
+        StreamEvent event = new StreamEvent();
+        event.setStreamId("sampleStream");
+        event.setTimestamp(System.currentTimeMillis());
+        event.setData(new Object[]{"CPU","LOCALHOST",true,Long.MAX_VALUE,60.0});
+        partitionedEvent.setEvent(event);
+
+        int javaSerializationLength = SerializationUtils.serialize(partitionedEvent).length;
+        LOG.info("Java serialization length: {}, event: {}",javaSerializationLength,partitionedEvent);
+
+        int compactLength = 0;
+        compactLength += "sampleStream".getBytes().length;
+        compactLength += ByteUtils.intToBytes(partitionedEvent.getPartition().hashCode()).length;
+        compactLength += ByteUtils.longToBytes(partitionedEvent.getTimestamp()).length;
+        compactLength += "CPU".getBytes().length;
+        compactLength += "LOCALHOST".getBytes().length;
+        compactLength += 1;
+        compactLength += ByteUtils.longToBytes(Long.MAX_VALUE).length;
+        compactLength += ByteUtils.doubleToBytes(60.0).length;
+
+        LOG.info("Compact serialization length: {}, event: {}",compactLength,partitionedEvent);
+        Assert.assertTrue(compactLength * 20 < javaSerializationLength);
+    }
+
+
+    public static StreamDefinition createSampleStreamDefinition(String streamId){
+        StreamDefinition sampleStreamDefinition = new StreamDefinition();
+        sampleStreamDefinition.setStreamId(streamId);
+        sampleStreamDefinition.setTimeseries(true);
+        sampleStreamDefinition.setValidate(true);
+        sampleStreamDefinition.setDescription("Schema for "+streamId);
+        List<StreamColumn> streamColumns = new ArrayList<>();
+
+        streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
+        streamColumns.add(new StreamColumn.Builder().name("data").type(StreamColumn.Type.LONG).build());
+        streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
+        sampleStreamDefinition.setColumns(streamColumns);
+        return sampleStreamDefinition;
+    }
+
+    public static StreamPartition createSampleStreamGroupbyPartition(String streamId, List<String> groupByField){
+        StreamPartition streamPartition = new StreamPartition();
+        streamPartition.setStreamId(streamId);
+        streamPartition.setColumns(groupByField);
+        streamPartition.setType(StreamPartition.Type.GROUPBY);
+        return streamPartition;
+    }
+
+    @SuppressWarnings("serial")
+    public static PartitionedEvent createSimpleStreamEvent()  {
+        StreamEvent event = StreamEvent.Builder()
+                    .schema(createSampleStreamDefinition("sampleStream_1"))
+                    .streamId("sampleStream_1")
+                    .timestamep(System.currentTimeMillis())
+                    .attributes(new HashMap<String,Object>(){{
+                        put("name","cpu");
+                        put("host","localhost");
+                        put("flag",true);
+                        put("value",60.0);
+                        put("data",Long.MAX_VALUE);
+                        put("unknown","unknown column value");
+                    }}).build();
+        PartitionedEvent pEvent = new PartitionedEvent();
+        pEvent.setEvent(event);
+        pEvent.setPartition(createSampleStreamGroupbyPartition("sampleStream_1", Arrays.asList("name","host")));
+        return pEvent;
+    }
+}
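
The compactLength computed in testJavaSerialization sums to 53 bytes: 12 for "sampleStream", 4 for the partition hash, 8 for the timestamp, 3 and 9 for the two string values, 1 for the boolean, and 8 each for the long and double. A minimal sketch of such a fixed layout using java.nio.ByteBuffer (illustrative only, not Eagle's actual serializer):

    import java.nio.ByteBuffer;

    class CompactLayoutSketch {
        static byte[] pack(int partitionHash, long timestamp) {
            // 12 + 4 + 8 + 3 + 9 + 1 + 8 + 8 = 53 bytes
            ByteBuffer buf = ByteBuffer.allocate(53);
            buf.put("sampleStream".getBytes()); // stream id, 12 bytes
            buf.putInt(partitionHash);          // 4 bytes
            buf.putLong(timestamp);             // 8 bytes
            buf.put("CPU".getBytes());          // 3 bytes
            buf.put("LOCALHOST".getBytes());    // 9 bytes
            buf.put((byte) 1);                  // boolean flag, 1 byte
            buf.putLong(Long.MAX_VALUE);        // 8 bytes
            buf.putDouble(60.0);                // 8 bytes
            return buf.array();
        }
    }

The assertion compactLength * 20 < javaSerializationLength therefore expects default Java serialization of the same event graph, with its class descriptors and object headers, to exceed roughly one kilobyte.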

