eagle-commits mailing list archives

From ralp...@apache.org
Subject [06/19] incubator-eagle git commit: EAGLE-324: Init branch-v0.5
Date Wed, 01 Jun 2016 05:56:21 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamCollector.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamCollector.java
new file mode 100755
index 0000000..fa07701
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamCollector.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.mock;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.eagle.alert.engine.Collector;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MockStreamCollector implements Collector<AlertStreamEvent> {
+    @SuppressWarnings("unused")
+    private final static Logger LOG = LoggerFactory.getLogger(MockStreamCollector.class);
+    private List<AlertStreamEvent> cache;
+    public MockStreamCollector(){
+        cache = new LinkedList<>();
+    }
+
+    public void emit(AlertStreamEvent event) {
+        cache.add(event);
+        // LOG.info("MockStreamCollector received: {}", event);
+    }
+
+    public void clear(){
+        cache.clear();
+    }
+
+    public List<AlertStreamEvent> get(){
+        return cache;
+    }
+
+    public int size(){
+        return cache.size();
+    }
+}
\ No newline at end of file
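
For context, a minimal sketch of how a test might exercise this collector directly (illustrative only, not part of this commit):

    MockStreamCollector collector = new MockStreamCollector();
    AlertStreamEvent alert = new AlertStreamEvent();
    alert.setStreamId("testAlertStream");
    collector.emit(alert);                             // cached in the backing LinkedList
    Assert.assertEquals(1, collector.size());
    Assert.assertSame(alert, collector.get().get(0));  // get() exposes the backing list
    collector.clear();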

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamMetadataService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamMetadataService.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamMetadataService.java
new file mode 100644
index 0000000..2119ecd
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamMetadataService.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.mock;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinitionNotFoundException;
+
+public class MockStreamMetadataService{
+    private final Map<String,StreamDefinition> streamSchemaMap = new HashMap<>();
+
+    public StreamDefinition getStreamDefinition(String streamId) throws StreamDefinitionNotFoundException {
+        if(streamSchemaMap.containsKey(streamId)) {
+            return streamSchemaMap.get(streamId);
+        }else {
+            throw new StreamDefinitionNotFoundException(streamId);
+        }
+    }
+
+    public void registerStream(String streamId, StreamDefinition schema){
+        streamSchemaMap.put(streamId,schema);
+    }
+}
\ No newline at end of file
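
A similarly minimal usage sketch (streamDef is assumed to be a StreamDefinition built elsewhere, e.g. with the builders shown in JavaSerializationTest below):

    MockStreamMetadataService metadataService = new MockStreamMetadataService();
    metadataService.registerStream("sampleStream_1", streamDef);
    // returns the registered schema
    StreamDefinition def = metadataService.getStreamDefinition("sampleStream_1");
    // an unregistered id throws StreamDefinitionNotFoundException
    metadataService.getStreamDefinition("unknownStream");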

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamReceiver.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamReceiver.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamReceiver.java
new file mode 100644
index 0000000..1e44c2a
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamReceiver.java
@@ -0,0 +1,79 @@
+package org.apache.eagle.alert.engine.mock;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.utils.AlertConstants;
+import org.apache.eagle.alert.utils.StreamIdConversion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@SuppressWarnings("serial")
+public class MockStreamReceiver extends BaseRichSpout {
+    private final static Logger LOG = LoggerFactory.getLogger(MockStreamReceiver.class);
+    private SpoutOutputCollector collector;
+    private List<String> outputStreamIds;
+    public MockStreamReceiver(int partition){
+        outputStreamIds = new ArrayList<>(partition);
+        for(int i=0;i<partition;i++){
+            outputStreamIds.add(StreamIdConversion.generateStreamIdByPartition(i));
+        }
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.collector = collector;
+    }
+
+    @Override
+    public void close() {}
+
+    /**
+     * This spout does not mock the end-to-end logic of the correlation spout;
+     * it simply generates sample data for testing the downstream bolts.
+     */
+    @Override
+    public void nextTuple() {
+        PartitionedEvent event = MockSampleMetadataFactory.createRandomOutOfTimeOrderEventGroupedByName("sampleStream_1");
+        LOG.info("Receive {}",event);
+        collector.emit(outputStreamIds.get(
+                // group by the first field in event i.e. name
+                (int) (event.getPartitionKey() % outputStreamIds.size())),
+                Collections.singletonList(event));
+        Utils.sleep(500);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        for(String streamId:outputStreamIds) {
+            declarer.declareStream(streamId,new Fields(AlertConstants.FIELD_0));
+        }
+    }
+}
\ No newline at end of file
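
To make the spout's role concrete, a hedged sketch of wiring it into a Storm topology (component names are illustrative):

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("mockStreamReceiver", new MockStreamReceiver(4));
    // declareOutputFields declares one stream per partition, named via
    // StreamIdConversion.generateStreamIdByPartition(i), so a downstream bolt
    // subscribes to each partition's stream id individually, e.g.:
    //   builder.setBolt("routerBolt", routerBolt)
    //          .noneGrouping("mockStreamReceiver", StreamIdConversion.generateStreamIdByPartition(0));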

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java
new file mode 100644
index 0000000..1bccbbf
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java
@@ -0,0 +1,301 @@
+package org.apache.eagle.alert.engine.perf;
+
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Since 5/13/16.
+ */
+public class TestSerDeserPer {
+    Object[] data = null;
+    @Before
+    public void before(){
+        int max = 100;
+        StringBuilder sb = new StringBuilder();
+        for(int i=0; i<max; i++){
+            sb.append("a");
+        }
+        data = new Object[]{sb.toString()};
+    }
+
+    @Test
+    public void testSerDeserPerf() throws Exception{
+        Kryo kryo = new Kryo();
+        Output output = new Output(new FileOutputStream("/tmp/file.bin"));
+        for(int i=0; i<1000; i++){
+            kryo.writeObject(output, constructPE());
+        }
+        output.close();
+        Input input = new Input(new FileInputStream("/tmp/file.bin"));
+        PartitionedEvent someObject = kryo.readObject(input, PartitionedEvent.class);
+        input.close();
+        Assert.assertTrue(someObject.getData().length == 1);
+    }
+
+    private PartitionedEvent constructPE(){
+        StreamEvent e = new StreamEvent();
+        e.setStreamId("testStreamId");
+        e.setTimestamp(1463159382000L);
+        e.setData(data);
+        StreamPartition sp = new StreamPartition();
+        List<String> col = new ArrayList<>();
+        col.add("host");
+        sp.setColumns(col);
+        StreamSortSpec sortSpec = new StreamSortSpec();
+        sortSpec.setWindowMargin(30000);
+        sortSpec.setWindowPeriod("PT1M");
+        sp.setSortSpec(sortSpec);
+        sp.setStreamId("testStreamId");
+        sp.setType(StreamPartition.Type.GROUPBY);
+        PartitionedEvent pe = new PartitionedEvent();
+        pe.setEvent(e);
+        pe.setPartition(sp);
+        pe.setPartitionKey(1000);
+        return pe;
+    }
+
+    @Test
+    public void testSerDeserPerf2() throws Exception{
+        Kryo kryo = new Kryo();
+        Output output = new Output(new FileOutputStream("/tmp/file2.bin"));
+        for(int i=0; i<1000; i++){
+            kryo.writeObject(output, constructNewPE());
+        }
+        output.close();
+        Input input = new Input(new FileInputStream("/tmp/file2.bin"));
+        NewPartitionedEvent someObject = kryo.readObject(input, NewPartitionedEvent.class);
+        input.close();
+        Assert.assertTrue(someObject.getData().length == 1);
+    }
+
+    private NewPartitionedEvent constructNewPE(){
+        NewPartitionedEvent pe = new NewPartitionedEvent();
+        pe.setStreamId("testStreamId");
+        pe.setTimestamp(1463159382000L);
+        pe.setData(data);
+
+        pe.setType(StreamPartition.Type.GROUPBY);
+        List<String> col = new ArrayList<>();
+        col.add("host");
+        pe.setColumns(col);
+        pe.setPartitionKey(1000);
+
+        pe.setWindowMargin(30000);
+        pe.setWindowPeriod("PT1M");
+        return pe;
+    }
+
+    @Test
+    public void testSerDeserPerf3() throws Exception{
+        Kryo kryo = new Kryo();
+        Output output = new Output(new FileOutputStream("/tmp/file3.bin"));
+        for(int i=0; i<1000; i++){
+            kryo.writeObject(output, constructNewPE2());
+        }
+        output.close();
+        Input input = new Input(new FileInputStream("/tmp/file3.bin"));
+        NewPartitionedEvent2 someObject = kryo.readObject(input, NewPartitionedEvent2.class);
+        input.close();
+        Assert.assertTrue(someObject.getData().length == 1);
+    }
+
+    private NewPartitionedEvent2 constructNewPE2(){
+        NewPartitionedEvent2 pe = new NewPartitionedEvent2();
+        pe.setStreamId(100);
+        pe.setTimestamp(1463159382000L);
+        pe.setData(data);
+
+        pe.setType(1);
+        int[] col = new int[1];
+        col[0] = 1;
+        pe.setColumns(col);
+        pe.setPartitionKey(1000);
+
+        pe.setWindowMargin(30000);
+        pe.setWindowPeriod(60);
+        return pe;
+    }
+
+    public static class NewPartitionedEvent implements Serializable {
+        private static final long serialVersionUID = -3840016190614238593L;
+        // basic
+        private String streamId;
+        private long timestamp;
+        private Object[] data;
+
+        // stream partition
+        private StreamPartition.Type type;
+        private List<String> columns = new ArrayList<>();
+        private long partitionKey;
+
+        // sort spec
+        private String windowPeriod="";
+        private long windowMargin = 30 * 1000;
+
+        public NewPartitionedEvent(){
+        }
+
+        public String getStreamId() {
+            return streamId;
+        }
+
+        public void setStreamId(String streamId) {
+            this.streamId = streamId;
+        }
+
+        public long getTimestamp() {
+            return timestamp;
+        }
+
+        public void setTimestamp(long timestamp) {
+            this.timestamp = timestamp;
+        }
+
+        public Object[] getData() {
+            return data;
+        }
+
+        public void setData(Object[] data) {
+            this.data = data;
+        }
+
+        public StreamPartition.Type getType() {
+            return type;
+        }
+
+        public void setType(StreamPartition.Type type) {
+            this.type = type;
+        }
+
+        public List<String> getColumns() {
+            return columns;
+        }
+
+        public void setColumns(List<String> columns) {
+            this.columns = columns;
+        }
+
+        public long getPartitionKey() {
+            return partitionKey;
+        }
+
+        public void setPartitionKey(long partitionKey) {
+            this.partitionKey = partitionKey;
+        }
+
+        public String getWindowPeriod() {
+            return windowPeriod;
+        }
+
+        public void setWindowPeriod(String windowPeriod) {
+            this.windowPeriod = windowPeriod;
+        }
+
+        public long getWindowMargin() {
+            return windowMargin;
+        }
+
+        public void setWindowMargin(long windowMargin) {
+            this.windowMargin = windowMargin;
+        }
+    }
+
+    public static class NewPartitionedEvent2 implements Serializable {
+        private static final long serialVersionUID = -3840016190614238593L;
+        // basic
+        private int streamId;
+        private long timestamp;
+        private Object[] data;
+
+        // stream partition
+        private int type;
+        private int[] columns;
+        private long partitionKey;
+
+        // sort spec
+        private long windowPeriod;
+        private long windowMargin = 30 * 1000;
+
+        public NewPartitionedEvent2(){
+        }
+
+        public int getStreamId() {
+            return streamId;
+        }
+
+        public void setStreamId(int streamId) {
+            this.streamId = streamId;
+        }
+
+        public int getType() {
+            return type;
+        }
+
+        public void setType(int type) {
+            this.type = type;
+        }
+
+        public int[] getColumns() {
+            return columns;
+        }
+
+        public void setColumns(int[] columns) {
+            this.columns = columns;
+        }
+
+        public long getPartitionKey() {
+            return partitionKey;
+        }
+
+        public void setPartitionKey(long partitionKey) {
+            this.partitionKey = partitionKey;
+        }
+
+        public long getWindowPeriod() {
+            return windowPeriod;
+        }
+
+        public void setWindowPeriod(long windowPeriod) {
+            this.windowPeriod = windowPeriod;
+        }
+
+        public long getTimestamp() {
+            return timestamp;
+        }
+
+        public void setTimestamp(long timestamp) {
+            this.timestamp = timestamp;
+        }
+
+        public Object[] getData() {
+            return data;
+        }
+
+        public void setData(Object[] data) {
+            this.data = data;
+        }
+
+        public long getWindowMargin() {
+            return windowMargin;
+        }
+
+        public void setWindowMargin(long windowMargin) {
+            this.windowMargin = windowMargin;
+        }
+    }
+}
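
A side note on the benchmarks above: Kryo output size (and speed) can usually be improved by registering the serialized classes up front, since registered classes are recorded as small integer ids instead of fully-qualified class names wherever Kryo needs to write a type (e.g. for the elements of the Object[] data field). A sketch, not part of this commit:

    Kryo kryo = new Kryo();
    kryo.register(TestSerDeserPer.NewPartitionedEvent.class);
    kryo.register(Object[].class);
    kryo.register(String.class);
    Output output = new Output(new FileOutputStream("/tmp/file-registered.bin"));
    kryo.writeObject(output, new TestSerDeserPer.NewPartitionedEvent());
    output.close();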

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
new file mode 100755
index 0000000..e322099
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.router;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.evaluator.PolicyGroupEvaluator;
+import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandlers;
+import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.engine.runner.AlertBolt;
+import org.apache.eagle.alert.engine.runner.TestStreamRouterBolt;
+import org.apache.eagle.alert.utils.DateTimeUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import backtype.storm.metric.api.MultiCountMetric;
+import backtype.storm.task.GeneralTopologyContext;
+import backtype.storm.task.IOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Since 5/2/16.
+ */
+@SuppressWarnings({"rawtypes", "unused"})
+public class TestAlertBolt {
+    /**
+     * The following invariant is guaranteed in
+     * {@link org.apache.eagle.alert.engine.runner.AlertBolt#execute}:
+     *
+     * <pre>
+     * if(!routedStreamEvent.getRoute().getTargetComponentId().equals(this.policyGroupEvaluator.getName())){
+     *     throw new IllegalStateException("Got event targeted to "+ routedStreamEvent.getRoute().getTargetComponentId()+" in "+this.policyGroupEvaluator.getName());
+     * }
+     * </pre>
+     *
+     * Test case: 2 alerts should be generated even if their timestamps are very close to each other.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testAlertBolt() throws Exception{
+        final AtomicInteger alertCount = new AtomicInteger();
+        final Semaphore mutex = new Semaphore(0);
+        Config config = ConfigFactory.load();
+        PolicyGroupEvaluator policyGroupEvaluator = new PolicyGroupEvaluatorImpl("testPolicyGroupEvaluatorImpl");
+        TestStreamRouterBolt.MockChangeService mockChangeService = new TestStreamRouterBolt.MockChangeService();
+        AlertBolt bolt = new AlertBolt("alertBolt1", policyGroupEvaluator, config, mockChangeService);
+        OutputCollector collector = new OutputCollector(new IOutputCollector(){
+            int count = 0;
+            @Override
+            public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+                alertCount.incrementAndGet();
+                mutex.release();
+                Assert.assertEquals("testAlertStream", tuple.get(0));
+                AlertStreamEvent event = (AlertStreamEvent) tuple.get(1);
+                System.out.println(String.format("collector received: [streamId=%s, tuple=%s]", streamId, tuple));
+                return null;
+            }
+            @Override
+            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {            }
+            @Override
+            public void ack(Tuple input) {            }
+            @Override
+            public void fail(Tuple input) {            }
+            @Override
+            public void reportError(Throwable error) {            }
+        });
+        Map stormConf = new HashMap<>();
+        TopologyContext topologyContext = mock(TopologyContext.class);
+        when(topologyContext.registerMetric(any(String.class), any(MultiCountMetric.class), any(int.class))).thenReturn(new MultiCountMetric());
+        bolt.prepare(stormConf, topologyContext, collector);
+
+        String streamId = "cpuUsageStream";
+
+        // construct StreamDefinition
+        StreamDefinition schema = new StreamDefinition();
+        schema.setStreamId(streamId);
+        StreamColumn column = new StreamColumn();
+        column.setName("col1");
+        column.setType(StreamColumn.Type.STRING);
+        schema.setColumns(Collections.singletonList(column));
+        Map<String, StreamDefinition> sds = new HashMap<>();
+        sds.put(schema.getStreamId(), schema);
+
+        // construct StreamPartition
+        StreamPartition sp = new StreamPartition();
+        sp.setColumns(Collections.singletonList("col1"));
+        sp.setStreamId(streamId);
+        sp.setType(StreamPartition.Type.GROUPBY);
+
+        AlertBoltSpec spec = new AlertBoltSpec();
+        spec.setVersion("version1");
+        spec.setTopologyName("testTopology");
+        PolicyDefinition pd = new PolicyDefinition();
+        pd.setName("policy1");
+        pd.setPartitionSpec(Collections.singletonList(sp));
+        pd.setOutputStreams(Collections.singletonList("testAlertStream"));
+        pd.setInputStreams(Collections.singletonList(streamId));
+        pd.setDefinition(new PolicyDefinition.Definition());
+        pd.getDefinition().type = PolicyStreamHandlers.SIDDHI_ENGINE;
+        pd.getDefinition().value = "from cpuUsageStream[col1=='value1' OR col1=='value2'] select col1 insert into testAlertStream;";
+        spec.addBoltPolicy("alertBolt1", pd.getName());
+        spec.getBoltPoliciesMap().put("alertBolt1", new ArrayList<PolicyDefinition>(Arrays.asList(pd)));
+        bolt.onAlertBoltSpecChange(spec, sds);
+
+        // construct GeneralTopologyContext
+        GeneralTopologyContext context = mock(GeneralTopologyContext.class);
+        int taskId = 1;
+        when(context.getComponentId(taskId)).thenReturn("comp1");
+        when(context.getComponentOutputFields("comp1", "default")).thenReturn(new Fields("f0"));
+
+        // construct event with "value1"
+        StreamEvent event1 = new StreamEvent();
+        event1.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00")*1000);
+        Object[] data = new Object[]{"value1"};
+        event1.setData(data);
+        event1.setStreamId(streamId);
+        PartitionedEvent partitionedEvent1 = new PartitionedEvent(event1, sp,1001);
+
+        // construct another event with "value2"
+        StreamEvent event2 = new StreamEvent();
+        event2.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00")*1000);
+        data = new Object[]{"value2"};
+        event2.setData(data);
+        event2.setStreamId(streamId);
+        PartitionedEvent partitionedEvent2 = new PartitionedEvent(event2, sp,1001);
+
+        Tuple input = new TupleImpl(context, Collections.singletonList(partitionedEvent1), taskId, "default");
+        Tuple input2 = new TupleImpl(context, Collections.singletonList(partitionedEvent2), taskId, "default");
+        bolt.execute(input);
+        bolt.execute(input2);
+        Assert.assertTrue("Timeout to acquire mutex in 5s",mutex.tryAcquire(2, 5, TimeUnit.SECONDS));
+        Assert.assertEquals(2, alertCount.get());
+        bolt.cleanup();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
new file mode 100644
index 0000000..af79f96
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.router;
+
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.CollectionType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+import org.apache.eagle.alert.coordination.model.PublishSpec;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.AlertPublisher;
+import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
+import org.apache.eagle.alert.engine.runner.AlertPublisherBolt;
+import org.apache.eagle.alert.engine.runner.MapComparator;
+import org.apache.eagle.alert.engine.utils.MetadataSerDeser;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Since 5/14/16.
+ */
+public class TestAlertPublisherBolt {
+
+    @Ignore
+    @Test
+    public void test() {
+        Config config = ConfigFactory.load("application-test.conf");
+        AlertPublisher publisher = new AlertPublisherImpl("alertPublishBolt");
+        publisher.init(config);
+        PublishSpec spec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec.json"), PublishSpec.class);
+        publisher.onPublishChange(spec.getPublishments(), null, null, null);
+        AlertStreamEvent event = create("testAlertStream");
+        publisher.nextEvent(event);
+        AlertStreamEvent event1 = create("testAlertStream");
+        publisher.nextEvent(event1);
+    }
+
+    private AlertStreamEvent create(String streamId){
+        AlertStreamEvent alert = new AlertStreamEvent();
+        PolicyDefinition policy = new PolicyDefinition();
+        policy.setName("policy1");
+        alert.setPolicy(policy);
+        alert.setCreatedTime(System.currentTimeMillis());
+        alert.setData(new Object[]{"field_1", 2, "field_3"});
+        alert.setStreamId(streamId);
+        alert.setCreatedBy(this.toString());
+        return alert;
+    }
+
+    @Test
+    public void testMapComparator() {
+        PublishSpec spec1 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec.json"), PublishSpec.class);
+        PublishSpec spec2 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec2.json"), PublishSpec.class);
+        Map<String, Publishment> map1 = new HashMap<>();
+        Map<String, Publishment> map2 = new HashMap<>();
+        spec1.getPublishments().forEach(p -> map1.put(p.getName(), p));
+        spec2.getPublishments().forEach(p -> map2.put(p.getName(), p));
+
+        MapComparator<String, Publishment> comparator = new MapComparator<>(map1, map2);
+        comparator.compare();
+        Assert.assertTrue(comparator.getModified().size() == 1);
+
+        AlertPublisher alertPublisher = new AlertPublisherImpl("alert-publisher-test");
+        AlertPublisherBolt publisherBolt = new AlertPublisherBolt(alertPublisher, null, null);
+        publisherBolt.onAlertPublishSpecChange(spec1, null);
+        publisherBolt.onAlertPublishSpecChange(spec2, null);
+    }
+
+    @Test
+    public void testAlertPublisher() throws Exception {
+        AlertPublisher alertPublisher = new AlertPublisherImpl("alert-publisher-test");
+        List<Publishment> oldPubs = loadEntities("/publishments.json", Publishment.class);
+        List<Publishment> newPubs = loadEntities("/publishments2.json", Publishment.class);
+        alertPublisher.onPublishChange(oldPubs, null, null, null);
+        alertPublisher.onPublishChange(null, null, newPubs, oldPubs);
+    }
+
+    private <T> List<T> loadEntities(String path, Class<T> tClz) throws Exception {
+        ObjectMapper objectMapper = new ObjectMapper();
+        JavaType type = CollectionType.construct(List.class, SimpleType.construct(tClz));
+        List<T> l = objectMapper.readValue(TestAlertPublisherBolt.class.getResourceAsStream(path), type);
+        return l;
+    }
+
+}
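
A note on loadEntities: building the list type via CollectionType.construct / SimpleType.construct is deprecated in newer Jackson releases; the equivalent idiom goes through the mapper's TypeFactory (sketch, same behavior assumed):

    ObjectMapper objectMapper = new ObjectMapper();
    JavaType type = objectMapper.getTypeFactory().constructCollectionType(List.class, tClz);
    List<T> l = objectMapper.readValue(TestAlertPublisherBolt.class.getResourceAsStream(path), type);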

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
new file mode 100644
index 0000000..8c048cb
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.runner;
+
+import backtype.storm.metric.api.MultiCountMetric;
+import backtype.storm.task.GeneralTopologyContext;
+import backtype.storm.task.IOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
+import org.apache.eagle.alert.coordination.model.RouterSpec;
+import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.apache.eagle.alert.engine.coordinator.impl.AbstractMetadataChangeNotifyService;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.engine.router.impl.StreamRouterImpl;
+import org.apache.eagle.alert.utils.DateTimeUtil;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestStreamRouterBolt {
+    private final static Logger LOG = LoggerFactory.getLogger(TestStreamRouterBolt.class);
+
+    /**
+     * Mocks 5 events:
+     *
+     * 1. Sent in random order:
+     * "value1","value2","value3","value4","value5"
+     *
+     * 2. Received in correct time order; "value5" is dropped because it arrives too late:
+     * "value2","value1","value3","value4"
+     *
+     * @throws Exception
+     */
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void testRouterWithSortAndRouteSpec() throws Exception{
+        Config config = ConfigFactory.load();
+        StreamRouterImpl routerImpl = new StreamRouterImpl("testStreamRouterImpl");
+        MockChangeService mockChangeService = new MockChangeService();
+        StreamRouterBolt bolt = new StreamRouterBolt(routerImpl, config, mockChangeService);
+
+        final Map<String,List<PartitionedEvent>> streamCollected = new HashMap<>();
+        final List<PartitionedEvent> orderCollected = new ArrayList<>();
+
+        OutputCollector collector = new OutputCollector(new IOutputCollector(){
+            int count = 0;
+            @Override
+            public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+                PartitionedEvent event;
+                try {
+                    event = bolt.deserialize(tuple.get(0));
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+                if(count == 0) {
+                    count++;
+                }
+                LOG.info(String.format("Collector received: [streamId=%s, tuple=%s]", streamId, tuple));
+                if(!streamCollected.containsKey(streamId)){
+                    streamCollected.put(streamId,new ArrayList<>());
+                }
+                streamCollected.get(streamId).add(event);
+                orderCollected.add(event);
+                return null;
+            }
+            @Override
+            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {            }
+            @Override
+            public void ack(Tuple input) {            }
+            @Override
+            public void fail(Tuple input) {            }
+            @SuppressWarnings("unused")
+            public void resetTimeout(Tuple input) {            }
+            @Override
+            public void reportError(Throwable error) {            }
+        });
+
+        Map stormConf = new HashMap<>();
+        TopologyContext topologyContext = mock(TopologyContext.class);
+        when(topologyContext.registerMetric(any(String.class), any(MultiCountMetric.class), any(int.class))).thenReturn(new MultiCountMetric());
+        bolt.prepare(stormConf, topologyContext, collector);
+
+        String streamId = "cpuUsageStream";
+        // StreamPartition, groupby col1 for stream cpuUsageStream
+        StreamPartition sp = new StreamPartition();
+        sp.setStreamId(streamId);
+        sp.setColumns(Collections.singletonList("col1"));
+        sp.setType(StreamPartition.Type.GROUPBY);
+
+        StreamSortSpec sortSpec = new StreamSortSpec();
+//        sortSpec.setColumn("timestamp");
+//        sortSpec.setOrder("asc");
+        sortSpec.setWindowPeriod2(Period.minutes(1));
+        sortSpec.setWindowMargin(1000);
+        sp.setSortSpec(sortSpec);
+
+        RouterSpec boltSpec = new RouterSpec();
+
+        // set StreamRouterSpec to have 2 WorkSlot
+        StreamRouterSpec routerSpec = new StreamRouterSpec();
+        routerSpec.setPartition(sp);
+        routerSpec.setStreamId(streamId);
+        PolicyWorkerQueue queue = new PolicyWorkerQueue();
+        queue.setPartition(sp);
+        queue.setWorkers(Arrays.asList(new WorkSlot("testTopology","alertBolt1"), new WorkSlot("testTopology","alertBolt2")));
+        routerSpec.setTargetQueue(Collections.singletonList(queue));
+        boltSpec.addRouterSpec(routerSpec);
+
+        // construct StreamDefinition
+        StreamDefinition schema = new StreamDefinition();
+        schema.setStreamId(streamId);
+        StreamColumn column = new StreamColumn();
+        column.setName("col1");
+        column.setType(StreamColumn.Type.STRING);
+        schema.setColumns(Collections.singletonList(column));
+        Map<String, StreamDefinition> sds = new HashMap<>();
+        sds.put(schema.getStreamId(), schema);
+
+        bolt.declareOutputStreams(Arrays.asList("alertBolt1", "alertBolt2"));
+        bolt.onStreamRouteBoltSpecChange(boltSpec, sds);
+        GeneralTopologyContext context = mock(GeneralTopologyContext.class);
+        int taskId = 1;
+        when(context.getComponentId(taskId)).thenReturn("comp1");
+        when(context.getComponentOutputFields("comp1", "default")).thenReturn(new Fields("f0"));
+
+        // =======================================
+        // Mock 5 Events
+        //
+        // 1. Sent in random order:
+        // "value1","value2","value3","value4","value5"
+        //
+        // 2. Received in correct time order; value5 is dropped because it arrives too late:
+        // "value2","value1","value3","value4"
+        // =======================================
+
+        // construct event with "value1"
+        StreamEvent event = new StreamEvent();
+        event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:30")*1000);
+        Object[] data = new Object[]{"value1"};
+        event.setData(data);
+        event.setStreamId(streamId);
+        PartitionedEvent pEvent = new PartitionedEvent();
+        pEvent.setEvent(event);
+        pEvent.setPartition(sp);
+        Tuple input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
+        bolt.execute(input);
+
+        // construct another event with "value2"
+        event = new StreamEvent();
+        event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:10")*1000);
+        data = new Object[]{"value2"};
+        event.setData(data);
+        event.setStreamId(streamId);
+        pEvent = new PartitionedEvent();
+        pEvent.setPartition(sp);
+        pEvent.setEvent(event);
+        input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
+        bolt.execute(input);
+
+        // construct another event with "value3"
+        event = new StreamEvent();
+        event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:40")*1000);
+        data = new Object[]{"value3"};
+        event.setData(data);
+        event.setStreamId(streamId);
+        pEvent = new PartitionedEvent();
+        pEvent.setPartition(sp);
+        pEvent.setEvent(event);
+        input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
+        bolt.execute(input);
+
+        // construct another event with "value4"
+        event = new StreamEvent();
+        event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:02:10")*1000);
+        data = new Object[]{"value4"};
+        event.setData(data);
+        event.setStreamId(streamId);
+        pEvent = new PartitionedEvent();
+        pEvent.setPartition(sp);
+        pEvent.setEvent(event);
+        input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
+        bolt.execute(input);
+
+        // construct another event with "value5", which will be dropped because it is too late
+        event = new StreamEvent();
+        event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:10")*1000);
+        data = new Object[]{"value5"};
+        event.setData(data);
+        event.setStreamId(streamId);
+        pEvent = new PartitionedEvent();
+        pEvent.setPartition(sp);
+        pEvent.setEvent(event);
+        input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
+        bolt.execute(input);
+
+        Assert.assertEquals("Should finally collect two streams",2,streamCollected.size());
+        Assert.assertTrue("Should collect stream stream_testStreamRouterImpl_to_alertBolt1",streamCollected.keySet().contains("stream_testStreamRouterImpl_to_alertBolt1"));
+        Assert.assertTrue("Should collect stream stream_testStreamRouterImpl_to_alertBolt2",streamCollected.keySet().contains("stream_testStreamRouterImpl_to_alertBolt2"));
+
+        Assert.assertEquals("Should finally collect 3 events",3,orderCollected.size());
+        Assert.assertArrayEquals("Should sort 3 events in ASC order",new String[]{"value2","value1","value3"},orderCollected.stream().map((d)->d.getData()[0]).toArray());
+
+        // The first 3 events are flushed automatically as the window ticks
+
+        bolt.cleanup();
+
+        // cleanup() flushes all events still held in memory, so the last event ("value4"),
+        // whose window has not yet expired according to the clock, is received here.
+        // The 5th event ("value5") was dropped because it was too late, outside the window margin.
+
+        Assert.assertEquals("Should finally collect two streams",2,streamCollected.size());
+        Assert.assertEquals("Should finally collect 3 events",4,orderCollected.size());
+        Assert.assertArrayEquals("Should sort 4 events in ASC-ordered timestamp",new String[]{"value2","value1","value3","value4"},orderCollected.stream().map((d)->d.getData()[0]).toArray());
+
+    }
+
+    @SuppressWarnings("serial")
+    public static class MockChangeService extends AbstractMetadataChangeNotifyService{
+        private final static Logger LOG = LoggerFactory.getLogger(MockChangeService.class);
+
+        @Override
+        public void close() throws IOException {
+            LOG.info("Closing");
+        }
+    }
+}
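
For reference, the window period strings used throughout these tests ("PT1M") are ISO-8601 durations, so the setWindowPeriod2(Period.minutes(1)) call above is equivalent to parsing the string form with Joda-Time (sketch):

    Period oneMinute = Period.parse("PT1M"); // ISO-8601 duration: one minute
    sortSpec.setWindowPeriod2(oneMinute);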

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java
new file mode 100644
index 0000000..13d1015
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java
@@ -0,0 +1,112 @@
+package org.apache.eagle.alert.engine.serialization;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.utils.ByteUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class JavaSerializationTest {
+    private final static Logger LOG = LoggerFactory.getLogger(JavaSerializationTest.class);
+
+    @Test
+    public void testJavaSerialization(){
+        PartitionedEvent partitionedEvent = new PartitionedEvent();
+        partitionedEvent.setPartitionKey(partitionedEvent.hashCode());
+        partitionedEvent.setPartition(createSampleStreamGroupbyPartition("sampleStream",Arrays.asList("name","host")));
+        StreamEvent event = new StreamEvent();
+        event.setStreamId("sampleStream");
+        event.setTimestamp(System.currentTimeMillis());
+        event.setData(new Object[]{"CPU","LOCALHOST",true,Long.MAX_VALUE,60.0});
+        partitionedEvent.setEvent(event);
+
+        int javaSerializationLength = SerializationUtils.serialize(partitionedEvent).length;
+        LOG.info("Java serialization length: {}, event: {}",javaSerializationLength,partitionedEvent);
+
+        int compactLength = 0;
+        compactLength += "sampleStream".getBytes().length;
+        compactLength += ByteUtils.intToBytes(partitionedEvent.getPartition().hashCode()).length;
+        compactLength += ByteUtils.longToBytes(partitionedEvent.getTimestamp()).length;
+        compactLength += "CPU".getBytes().length;
+        compactLength += "LOCALHOST".getBytes().length;
+        compactLength += 1;
+        compactLength += ByteUtils.longToBytes(Long.MAX_VALUE).length;
+        compactLength += ByteUtils.doubleToBytes(60.0).length;
+
+        LOG.info("Compact serialization length: {}, event: {}",compactLength,partitionedEvent);
+        Assert.assertTrue(compactLength * 20 < javaSerializationLength);
+    }
+
+
+    public static StreamDefinition createSampleStreamDefinition(String streamId){
+        StreamDefinition sampleStreamDefinition = new StreamDefinition();
+        sampleStreamDefinition.setStreamId(streamId);
+        sampleStreamDefinition.setTimeseries(true);
+        sampleStreamDefinition.setValidate(true);
+        sampleStreamDefinition.setDescription("Schema for "+streamId);
+        List<StreamColumn> streamColumns = new ArrayList<>();
+
+        streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
+        streamColumns.add(new StreamColumn.Builder().name("data").type(StreamColumn.Type.LONG).build());
+        streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
+        sampleStreamDefinition.setColumns(streamColumns);
+        return sampleStreamDefinition;
+    }
+
+    public static StreamPartition createSampleStreamGroupbyPartition(String streamId, List<String> groupByField){
+        StreamPartition streamPartition = new StreamPartition();
+        streamPartition.setStreamId(streamId);
+        streamPartition.setColumns(groupByField);
+        streamPartition.setType(StreamPartition.Type.GROUPBY);
+        return streamPartition;
+    }
+
+    @SuppressWarnings("serial")
+    public static PartitionedEvent createSimpleStreamEvent()  {
+        StreamEvent event = StreamEvent.Builder()
+                    .schema(createSampleStreamDefinition("sampleStream_1"))
+                    .streamId("sampleStream_1")
+                    .timestamep(System.currentTimeMillis())
+                    .attributes(new HashMap<String,Object>(){{
+                        put("name","cpu");
+                        put("host","localhost");
+                        put("flag",true);
+                        put("value",60.0);
+                        put("data",Long.MAX_VALUE);
+                        put("unknown","unknown column value");
+                    }}).build();
+        PartitionedEvent pEvent = new PartitionedEvent();
+        pEvent.setEvent(event);
+        pEvent.setPartition(createSampleStreamGroupbyPartition("sampleStream_1", Arrays.asList("name","host")));
+        return pEvent;
+    }
+}
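
The hand-computed compactLength above can be cross-checked by actually writing the same fields with plain java.io (a sketch under the same layout assumptions; writeBytes emits one byte per character, which matches getBytes().length for ASCII):

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeBytes("sampleStream");                            // stream id
    out.writeInt(partitionedEvent.getPartition().hashCode());  // partition digest
    out.writeLong(partitionedEvent.getTimestamp());
    out.writeBytes("CPU");
    out.writeBytes("LOCALHOST");
    out.writeBoolean(true);                                    // 1 byte
    out.writeLong(Long.MAX_VALUE);
    out.writeDouble(60.0);
    out.flush();
    // bytes.size() should equal compactLength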

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
new file mode 100644
index 0000000..0347d50
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.serialization;
+
+import backtype.storm.serialization.DefaultKryoFactory;
+import backtype.storm.serialization.DefaultSerializationDelegate;
+import com.esotericsoftware.kryo.*;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import org.apache.commons.lang.time.StopWatch;
+import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.serialization.impl.PartitionedEventSerializerImpl;
+import org.apache.eagle.alert.utils.TimePeriodUtils;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.BitSet;
+
+
+public class PartitionedEventSerializerTest {
+    private final static Logger LOG = LoggerFactory.getLogger(PartitionedEventSerializerTest.class);
+    @Test
+    public void testPartitionEventSerialization() throws IOException {
+        PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream",System.currentTimeMillis());
+        PartitionedEventSerializerImpl serializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition);
+
+        ByteArrayDataOutput dataOutput1 = ByteStreams.newDataOutput();
+        serializer.serialize(partitionedEvent,dataOutput1);
+        byte[] serializedBytes = dataOutput1.toByteArray();
+        PartitionedEvent deserializedEvent = serializer.deserialize(ByteStreams.newDataInput(serializedBytes));
+        Assert.assertEquals(partitionedEvent,deserializedEvent);
+
+        PartitionedEventSerializerImpl compressSerializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition,true);
+
+        byte[] serializedBytesCompressed = compressSerializer.serialize(partitionedEvent);
+        PartitionedEvent deserializedEventCompressed = compressSerializer.deserialize(serializedBytesCompressed);
+        Assert.assertEquals(partitionedEvent,deserializedEventCompressed);
+
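+        // Digest serializer caches the partition as well ("Cached Stream + Cached Partition" below).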
+        PartitionedEventDigestSerializer serializer2 = new PartitionedEventDigestSerializer(MockSampleMetadataFactory::createSampleStreamDefinition);
+        ByteArrayDataOutput dataOutput2 = ByteStreams.newDataOutput();
+        serializer2.serialize(partitionedEvent,dataOutput2);
+        byte[] serializedBytes2 = dataOutput2.toByteArray();
+        ByteArrayDataInput dataInput2 = ByteStreams.newDataInput(serializedBytes2);
+        PartitionedEvent deserializedEvent2 = serializer2.deserialize(dataInput2);
+        Assert.assertEquals(partitionedEvent,deserializedEvent2);
+
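+        // Baselines: Java native serialization and plain Kryo, plus Kryo re-wrapping the cached payloads.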
+        byte[] javaSerialization = new DefaultSerializationDelegate().serialize(partitionedEvent);
+        Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
+        Output output = new Output(10000);
+        kryo.writeClassAndObject(output,partitionedEvent);
+        byte[] kryoBytes = output.toBytes();
+        Input input = new Input(kryoBytes);
+        PartitionedEvent kryoDeserializedEvent = (PartitionedEvent) kryo.readClassAndObject(input);
+        Assert.assertEquals(partitionedEvent,kryoDeserializedEvent);
+        LOG.info("\nCached Stream:{}\nCompressed Cached Stream :{}\nCached Stream + Cached Partition: {}\nJava Native: {}\nKryo: {}\nKryo + Cached Stream: {}\nKryo + Cached Stream + Cached Partition: {}",serializedBytes.length,serializedBytesCompressed.length,serializedBytes2.length,javaSerialization.length,kryoBytes.length,kryoSerialize(serializedBytes).length,kryoSerialize(serializedBytes2).length);
+    }
+    @Test
+    public void testPartitionEventSerializationEfficiency() throws IOException {
+        PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream",System.currentTimeMillis());
+        PartitionedEventSerializerImpl serializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition);
+
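+        // Time 100k serialize/deserialize round-trips per strategy.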
+        int count = 100000;
+        StopWatch stopWatch = new StopWatch();
+        stopWatch.start();
+        int i = 0;
+        while(i<count) {
+            ByteArrayDataOutput dataOutput1 = ByteStreams.newDataOutput();
+            serializer.serialize(partitionedEvent, dataOutput1);
+            byte[] serializedBytes = dataOutput1.toByteArray();
+            PartitionedEvent deserializedEvent = serializer.deserialize(ByteStreams.newDataInput(serializedBytes));
+            Assert.assertEquals(partitionedEvent, deserializedEvent);
+            i++;
+        }
+        stopWatch.stop();
+        LOG.info("Cached Stream: {} ms",stopWatch.getTime());
+        stopWatch.reset();
+        PartitionedEventSerializerImpl compressSerializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition,true);
+        i = 0;
+        stopWatch.start();
+        while(i<count) {
+            byte[] serializedBytesCompressed = compressSerializer.serialize(partitionedEvent);
+            PartitionedEvent deserializedEventCompressed = compressSerializer.deserialize(serializedBytesCompressed);
+            Assert.assertEquals(partitionedEvent, deserializedEventCompressed);
+            i++;
+        }
+        stopWatch.stop();
+        LOG.info("Compressed Cached Stream: {} ms",stopWatch.getTime());
+        stopWatch.reset();
+
+        i = 0;
+        stopWatch.start();
+        while(i<count) {
+            PartitionedEventDigestSerializer serializer2 = new PartitionedEventDigestSerializer(MockSampleMetadataFactory::createSampleStreamDefinition);
+            ByteArrayDataOutput dataOutput2 = ByteStreams.newDataOutput();
+            serializer2.serialize(partitionedEvent, dataOutput2);
+            byte[] serializedBytes2 = dataOutput2.toByteArray();
+            ByteArrayDataInput dataInput2 = ByteStreams.newDataInput(serializedBytes2);
+            PartitionedEvent deserializedEvent2 = serializer2.deserialize(dataInput2);
+            Assert.assertEquals(partitionedEvent, deserializedEvent2);
+            i++;
+        }
+        stopWatch.stop();
+        LOG.info("Cached Stream&Partition: {} ms",stopWatch.getTime());
+        stopWatch.reset();
+        i = 0;
+        stopWatch.start();
+        while(i<count) {
+            byte[] javaSerialization = new DefaultSerializationDelegate().serialize(partitionedEvent);
+            PartitionedEvent javaSerializedEvent = (PartitionedEvent) new DefaultSerializationDelegate().deserialize(javaSerialization);
+            Assert.assertEquals(partitionedEvent, javaSerializedEvent);
+            i++;
+        }
+        stopWatch.stop();
+        LOG.info("Java Native: {} ms",stopWatch.getTime());
+        stopWatch.reset();
+        i = 0;
+        stopWatch.start();
+        Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
+        while(i<count) {
+            Output output = new Output(10000);
+            kryo.writeClassAndObject(output, partitionedEvent);
+            byte[] kryoBytes = output.toBytes();
+            Input input = new Input(kryoBytes);
+            PartitionedEvent kryoDeserializedEvent = (PartitionedEvent) kryo.readClassAndObject(input);
+            Assert.assertEquals(partitionedEvent, kryoDeserializedEvent);
+            i++;
+        }
+        stopWatch.stop();
+        LOG.info("Kryo: {} ms",stopWatch.getTime());
+    }
+
+    /**
+     * Kryo serialization length = byte[] length + 2 bytes of overhead (as asserted below)
+     */
+    @Test
+    public void testKryoByteArraySerialization(){
+        Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
+        byte[] bytes = new byte[]{0,1,2,3,4,5,6,7,8,9};
+        Output output = new Output(1000);
+        kryo.writeObject(output,bytes);
+        Assert.assertEquals(bytes.length + 2,output.toBytes().length);
+    }
+
+    private byte[] kryoSerialize(Object object){
+        Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
+        Output output = new Output(100000);
+        kryo.writeClassAndObject(output,object);
+        return output.toBytes();
+    }
+
+    @Test
+    public void testBitSet(){
+        BitSet bitSet = new BitSet();
+        bitSet.set(0,true); // 1
+        bitSet.set(1,false); // 0
+        bitSet.set(2,true); // 1
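+        // Note: size() reports allocated space (a multiple of 64 bits), not the logical length.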
+        LOG.info("Bit Set Size: {}",bitSet.size());
+        LOG.info("Bit Set Byte[]: {}",bitSet.toByteArray());
+        LOG.info("Bit Set Byte[]: {}",bitSet.toLongArray());
+        LOG.info("BitSet[0]: {}",bitSet.get(0));
+        LOG.info("BitSet[1]: {}",bitSet.get(1));
+        LOG.info("BitSet[1]: {}",bitSet.get(2));
+
+        byte[] bytes = bitSet.toByteArray();
+
+        BitSet bitSet2 = BitSet.valueOf(bytes);
+
+        LOG.info("Bit Set Size: {}",bitSet2.size());
+        LOG.info("Bit Set Byte[]: {}",bitSet2.toByteArray());
+        LOG.info("Bit Set Byte[]: {}",bitSet2.toLongArray());
+        LOG.info("BitSet[0]: {}",bitSet2.get(0));
+        LOG.info("BitSet[1]: {}",bitSet2.get(1));
+        LOG.info("BitSet[1]: {}",bitSet2.get(2));
+
+
+        BitSet bitSet3 = new BitSet();
+        bitSet3.set(0,true);
+        Assert.assertEquals(1,bitSet3.length());
+
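+        // Setting a bit to false does not extend the logical length; unset indexes simply read false.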
+        BitSet bitSet4 = new BitSet();
+        bitSet4.set(0,false);
+        Assert.assertEquals(0,bitSet4.length());
+        Assert.assertFalse(bitSet4.get(1));
+        Assert.assertFalse(bitSet4.get(2));
+    }
+
+    @Test
+    public void testPeriod(){
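+        // As the last assertion shows, Joda's ISO formatter prints a millis-only period as whole seconds ("PT1800S").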
+        Assert.assertEquals(30*60*1000, TimePeriodUtils.getMillisecondsOfPeriod(Period.parse("PT30m")));
+        Assert.assertEquals(30*60*1000, TimePeriodUtils.getMillisecondsOfPeriod(Period.millis(30*60*1000)));
+        Assert.assertEquals("PT1800S", Period.millis(30*60*1000).toString());
+    }
+
+    @Test
+    public void testPartitionType(){
+
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java
new file mode 100644
index 0000000..f2f3b46
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java
@@ -0,0 +1,149 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.eagle.alert.engine.siddhi.extension;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.core.stream.output.StreamCallback;
+
+/**
+ * @since Apr 1, 2016
+ *
+ */
+public class AttributeCollectAggregatorTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(AttributeCollectAggregatorTest.class);
+
+    @Test
+    public void test() throws Exception {
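+        // Group (timestamp, host, type) events by type in a 1s external-time window;
+        // eagle:collect (the aggregator under test) gathers each group's values.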
+        String ql = "define stream s1(timestamp long, host string, type string);";
+        ql += " from s1#window.externalTime(timestamp, 1 sec)";
+        ql += " select eagle:collect(timestamp) as timestamps, eagle:collect(host) as hosts, type group by type insert into output;";
+
+        SiddhiManager sm = new SiddhiManager();
+        ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(ql);
+
+        InputHandler input = runtime.getInputHandler("s1");
+        runtime.addCallback("output", new StreamCallback() {
+
+            @Override
+            public void receive(Event[] arg0) {
+                logger.info("output event length:" + arg0.length);
+
+                for (Event e : arg0) {
+                    StringBuilder sb = new StringBuilder("\t - [").append(e.getData().length).append("]");
+                    for (Object o : e.getData()) {
+                        sb.append("," + o);
+                    }
+                    logger.info(sb.toString());
+                }
+                logger.info("===end===");
+            }
+        });
+//        StreamDefinition definition = (StreamDefinition) runtime.getStreamDefinitionMap().get("output");
+
+        runtime.start();
+
+        Event[] events = generateEvents();
+        for (Event e : events) {
+            input.send(e);
+        }
+
+        Thread.sleep(1000);
+
+    }
+
+    private Event[] generateEvents() {
+        List<Event> events = new LinkedList<Event>();
+
+        Random r = new Random();
+        Event e = null;
+        long base = System.currentTimeMillis();
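+        // Three events per type, 100ms apart; the 10s jumps between groups drive the external-time window past its boundary.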
+        {
+            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova" });
+            base += 100;
+            events.add(e);
+            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova" });
+            base += 100;
+            events.add(e);
+            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova" });
+            base += 100;
+            events.add(e);
+        }
+
+        {
+            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron" });
+            base += 100;
+            events.add(e);
+            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron" });
+            base += 100;
+            events.add(e);
+            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron" });
+            base += 100;
+            events.add(e);
+        }
+
+        base += 10000;
+        {
+            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova1" });
+            base += 100;
+            events.add(e);
+            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova1" });
+            base += 100;
+            events.add(e);
+            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova1" });
+            base += 100;
+            events.add(e);
+        }
+
+        {
+            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron2" });
+            base += 100;
+            events.add(e);
+            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron2" });
+            base += 100;
+            events.add(e);
+            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron2" });
+            base += 100;
+            events.add(e);
+        }
+        base += 10000;
+        e = new Event(base, new Object[] { base, "host" + r.nextInt(), "mq" });
+
+        return events.toArray(new Event[0]);
+    }
+    
+    @Test
+    public void testQuery() {
+        String ql = "define stream perfmon_input_stream_cpu ( host string,timestamp long,metric string,pool string,value double,colo string );";
+        ql += "from perfmon_input_stream_cpu#window.length(3) select host, min(value) as min group by host having min>91.0 insert into perfmon_output_stream_cpu;";
+
+        SiddhiManager sm = new SiddhiManager();
+        sm.createExecutionPlanRuntime(ql);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java
new file mode 100644
index 0000000..613be00
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mapdb.BTreeMap;
+import org.mapdb.DB;
+import org.mapdb.DBMaker;
+import org.mapdb.Serializer;
+
+public class MapDBTestSuite {
+    @Test
+    public void testOnHeapDB(){
+        DB db = DBMaker.heapDB().make();
+        BTreeMap<Long,String> map = db.treeMap("btree").keySerializer(Serializer.LONG).valueSerializer(Serializer.STRING).create();
+        Assert.assertFalse(map.putIfAbsentBoolean(1L,"val_1"));
+        Assert.assertTrue(map.putIfAbsentBoolean(1L,"val_2"));
+        Assert.assertTrue(map.putIfAbsentBoolean(1L,"val_3"));
+        Assert.assertFalse(map.putIfAbsentBoolean(2L,"val_4"));
+
+        Assert.assertEquals("val_1",map.get(1L));
+        Assert.assertEquals("val_4",map.get(2L));
+
+        Assert.assertTrue(map.replace(2L,"val_4","val_5"));
+        Assert.assertEquals("val_5",map.get(2L));
+
+        map.close();
+        db.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
new file mode 100644
index 0000000..6cadba7
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter;
+
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.time.StopWatch;
+import org.apache.eagle.alert.engine.mock.MockPartitionedCollector;
+import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.sorter.impl.PartitionedEventTimeOrderingComparator;
+import org.apache.eagle.alert.engine.sorter.impl.StreamSortWindowHandlerImpl;
+import org.apache.eagle.alert.engine.sorter.impl.StreamTimeClockInLocalMemory;
+import org.apache.eagle.alert.utils.DateTimeUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.Slf4jReporter;
+import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
+import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
+import com.google.common.collect.Ordering;
+
+/**
+ * -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+PrintGCTaskTimeStamps -XX:+PrintGCDetails -verbose:gc
+ */
+public class StreamSortHandlerTest {
+    private final static Logger LOG = LoggerFactory.getLogger(StreamSortHandlerTest.class);
+
+    static {
+        LOG.info(ManagementFactory.getRuntimeMXBean().getName());
+    }
+
+    private ScheduledReporter metricReporter;
+    @Before
+    public void setUp(){
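+        // Log JVM heap and GC usage gauges every 60 seconds while the benchmarks run.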
+        final MetricRegistry metrics = new MetricRegistry();
+        metrics.registerAll(new MemoryUsageGaugeSet());
+        metrics.registerAll(new GarbageCollectorMetricSet());
+        metricReporter = Slf4jReporter.forRegistry(metrics)
+                .filter((name, metric) -> name.matches("(.*heap|pools.PS.*).usage"))
+                .withLoggingLevel(Slf4jReporter.LoggingLevel.DEBUG)
+                .convertRatesTo(TimeUnit.SECONDS)
+                .convertDurationsTo(TimeUnit.MILLISECONDS)
+                .build();
+        metricReporter.start(60,TimeUnit.SECONDS);
+    }
+
+    /**
+     * Used to debug window bucket lifecycle
+     *
+     * Window period: PT1m, margin: 5s
+     *
+     * @throws InterruptedException
+     */
+    @Test
+    public void testWithUnsortedEventsIn1MinuteWindow() throws InterruptedException {
+        MockPartitionedCollector mockCollector = new MockPartitionedCollector();
+        StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
+        Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
+        StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
+        sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1m",5000),mockCollector);
+        List<PartitionedEvent> unsortedList = new LinkedList<>();
+
+        int i = 0;
+        while(i<1000) {
+            PartitionedEvent event = MockSampleMetadataFactory.createRandomOutOfTimeOrderEventGroupedByName("sampleStream_1");
+            sortHandler.nextEvent(event);
+            unsortedList.add(event);
+            if(event.getTimestamp()>timeClock.getTime()) {
+                timeClock.moveForward(event.getTimestamp());
+            }
+            sortHandler.onTick(timeClock,System.currentTimeMillis());
+            i++;
+        }
+        sortHandler.close();
+        Assert.assertFalse(timeOrdering.isOrdered(unsortedList));
+        Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
+        Assert.assertTrue(mockCollector.get().size() > 0);
+    }
+
+    @Test
+    public void testStreamSortHandlerWithUnsortedEventsIn1HourWindow() throws InterruptedException {
+        testWithUnsortedEventsIn1hWindow(1000000);
+    }
+
+    @Test
+    public void testSortedInPatient() throws InterruptedException {
+        MockPartitionedCollector mockCollector = new MockPartitionedCollector();
+        StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
+        Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
+        StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
+        sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1h",5000),mockCollector);
+        List<PartitionedEvent> sortedList = new LinkedList<>();
+
+        int i = 0;
+        while(i<1000000) {
+            PartitionedEvent event = MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",System.currentTimeMillis()+i);
+            sortHandler.nextEvent(event);
+            sortedList.add(event);
+            if(event.getTimestamp()>timeClock.getTime()) {
+                timeClock.moveForward(event.getTimestamp());
+            }
+            sortHandler.onTick(timeClock,System.currentTimeMillis());
+            i++;
+        }
+        sortHandler.close();
+        Assert.assertTrue(timeOrdering.isOrdered(sortedList));
+        Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
+        Assert.assertEquals(1000000,mockCollector.get().size());
+    }
+
+    /**
+     * -XX:+PrintGC
+     *
+     * @throws InterruptedException
+     */
+    @Test
+    public void testWithUnsortedEventsInLargeWindowBenchmark() throws InterruptedException {
+        metricReporter.report();
+        testWithUnsortedEventsIn1hWindow(1000);
+        metricReporter.report();
+        testWithUnsortedEventsIn1hWindow(10000);
+        metricReporter.report();
+        testWithUnsortedEventsIn1hWindow(100000);
+        metricReporter.report();
+        testWithUnsortedEventsIn1hWindow(1000000);
+        metricReporter.report();
+//        testWithUnsortedEventsIn1hWindow(10000000);
+//        metricReporter.report();
+    }
+
+    public void testWithUnsortedEventsIn1hWindow(int count) throws InterruptedException {
+        MockPartitionedCollector mockCollector = new MockPartitionedCollector();
+        StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
+        Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
+        StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
+        sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1h",5000),mockCollector);
+        List<PartitionedEvent> unsortedList = new LinkedList<>();
+
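+        // Feed out-of-order events while advancing the stream clock to the max event time;
+        // each tick lets expired windows flush downstream in sorted order.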
+        StopWatch stopWatch = new StopWatch();
+        stopWatch.start();
+        int i = 0;
+        while(i<count) {
+            PartitionedEvent event = MockSampleMetadataFactory.createRandomOutOfTimeOrderEventGroupedByName("sampleStream_1");
+            sortHandler.nextEvent(event);
+            unsortedList.add(event);
+            if(event.getEvent().getTimestamp()>timeClock.getTime()) {
+                timeClock.moveForward(event.getEvent().getTimestamp());
+            }
+            sortHandler.onTick(timeClock,System.currentTimeMillis());
+            i++;
+        }
+        stopWatch.stop();
+        LOG.info("Produced {} events in {} ms",count,stopWatch.getTime());
+        sortHandler.close();
+        Assert.assertFalse(timeOrdering.isOrdered(unsortedList));
+        Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
+        Assert.assertTrue(mockCollector.get().size()>=0);
+    }
+
+    /**
+     * Used to debug window bucket lifecycle
+     *
+     * Window period: PT1h, margin: 5s
+     *
+     * @throws InterruptedException
+     */
+    @Test
+    public void testWithSortedEvents() throws InterruptedException {
+        MockPartitionedCollector mockCollector = new MockPartitionedCollector();
+        StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
+        Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
+        StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
+        sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1h",5000),mockCollector);
+        List<PartitionedEvent> sortedList = new LinkedList<>();
+
+        int i = 0;
+        while(i<1000000) {
+            PartitionedEvent event = MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",System.currentTimeMillis()+i);
+            sortHandler.nextEvent(event);
+            sortedList.add(event);
+            if(event.getTimestamp()>timeClock.getTime()) {
+                timeClock.moveForward(event.getTimestamp());
+            }
+            sortHandler.onTick(timeClock,System.currentTimeMillis());
+            i++;
+        }
+        sortHandler.close();
+        Assert.assertTrue(timeOrdering.isOrdered(sortedList));
+        Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
+        Assert.assertEquals(1000000,mockCollector.get().size());
+    }
+
+    /**
+     * Used to debug window bucket lifecycle
+     *
+     * Window period: PT10s, margin: 1s
+     *
+     * @throws InterruptedException
+     */
+    @Test
+    public void testWithSortedEventsAndExpireBySystemTime() throws InterruptedException {
+        MockPartitionedCollector mockCollector = new MockPartitionedCollector();
+        StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
+        Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
+        StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
+        sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT10s",1000),mockCollector);
+        List<PartitionedEvent> sortedList = new LinkedList<>();
+
+        PartitionedEvent event = MockSampleMetadataFactory.createRandomSortedEventGroupedByName("sampleStream_1");
+        sortHandler.nextEvent(event);
+        sortedList.add(event);
+        timeClock.moveForward(event.getTimestamp());
+        sortHandler.onTick(timeClock,System.currentTimeMillis());
+
+        // Trigger system-time expiry: tick past the 10s window period plus the 1s margin
+        sortHandler.onTick(timeClock,System.currentTimeMillis()+10*1000+1000L + 1);
+
+        Assert.assertTrue(timeOrdering.isOrdered(sortedList));
+        Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
+        Assert.assertEquals(1,mockCollector.get().size());
+
+        sortHandler.close();
+    }
+
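+    // Disabled scratch test: TimerTask callbacks run on a single timer thread, so the 5s sleep inside the lock spaces out the ticks.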
+//    @Test
+    public void testWithTimerLock() throws InterruptedException {
+        Timer timer = new Timer();
+        List<Long> collected = new ArrayList<>();
+        timer.schedule(new TimerTask() {
+            @Override
+            public void run() {
+                synchronized (collected) {
+                    LOG.info("Ticking {}", DateTimeUtil.millisecondsToHumanDateWithMilliseconds(System.currentTimeMillis()));
+                    collected.add(System.currentTimeMillis());
+                    try {
+                        Thread.sleep(5000);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+        },0,100);
+    }
+}
\ No newline at end of file

