eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [27/46] incubator-eagle git commit: [EAGLE-325] Initialize next-gen alert engine code on branch-0.5
Date Thu, 02 Jun 2016 07:08:06 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
new file mode 100644
index 0000000..0347d50
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.serialization;
+
+import backtype.storm.serialization.DefaultKryoFactory;
+import backtype.storm.serialization.DefaultSerializationDelegate;
+import com.esotericsoftware.kryo.*;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import org.apache.commons.lang.time.StopWatch;
+import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.serialization.impl.PartitionedEventSerializerImpl;
+import org.apache.eagle.alert.utils.TimePeriodUtils;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.BitSet;
+
+
+public class PartitionedEventSerializerTest {
+    private final static Logger LOG = LoggerFactory.getLogger(PartitionedEventSerializerTest.class);
+
+    /**
+     * Round-trips a sample PartitionedEvent through every serialization strategy
+     * (cached stream, compressed cached stream, stream+partition digest, Java native
+     * serialization and Kryo), asserts each round-trip is lossless, and logs the
+     * serialized payload size of each strategy for comparison.
+     */
+    @Test
+    public void testPartitionEventSerialization() throws IOException {
+        PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream",System.currentTimeMillis());
+        PartitionedEventSerializerImpl serializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition);
+
+        // Cached-stream serializer: write to a byte stream and verify lossless round-trip.
+        ByteArrayDataOutput dataOutput1 = ByteStreams.newDataOutput();
+        serializer.serialize(partitionedEvent,dataOutput1);
+        byte[] serializedBytes = dataOutput1.toByteArray();
+        PartitionedEvent deserializedEvent = serializer.deserialize(ByteStreams.newDataInput(serializedBytes));
+        Assert.assertEquals(partitionedEvent,deserializedEvent);
+
+        // Same serializer with compression enabled (second constructor argument).
+        PartitionedEventSerializerImpl compressSerializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition,true);
+        byte[] serializedBytesCompressed = compressSerializer.serialize(partitionedEvent);
+        PartitionedEvent deserializedEventCompressed = compressSerializer.deserialize(serializedBytesCompressed);
+        Assert.assertEquals(partitionedEvent,deserializedEventCompressed);
+
+        // Digest serializer (cached stream + cached partition metadata).
+        PartitionedEventDigestSerializer serializer2 = new PartitionedEventDigestSerializer(MockSampleMetadataFactory::createSampleStreamDefinition);
+        ByteArrayDataOutput dataOutput2 = ByteStreams.newDataOutput();
+        serializer2.serialize(partitionedEvent,dataOutput2);
+        byte[] serializedBytes2 = dataOutput2.toByteArray();
+        ByteArrayDataInput dataInput2 = ByteStreams.newDataInput(serializedBytes2);
+        PartitionedEvent deserializedEvent2 = serializer2.deserialize(dataInput2);
+        Assert.assertEquals(partitionedEvent,deserializedEvent2);
+
+        // Baselines: Java native serialization and Kryo.
+        byte[] javaSerialization = new DefaultSerializationDelegate().serialize(partitionedEvent);
+        Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
+        Output output = new Output(10000);
+        kryo.writeClassAndObject(output,partitionedEvent);
+        byte[] kryoBytes = output.toBytes();
+        Input input = new Input(kryoBytes);
+        PartitionedEvent kryoDeserializedEvent = (PartitionedEvent) kryo.readClassAndObject(input);
+        Assert.assertEquals(partitionedEvent,kryoDeserializedEvent);
+        LOG.info("\nCached Stream:{}\nCompressed Cached Stream :{}\nCached Stream + Cached Partition: {}\nJava Native: {}\nKryo: {}\nKryo + Cached Stream: {}\nKryo + Cached Stream + Cached Partition: {}",serializedBytes.length,serializedBytesCompressed.length,serializedBytes2.length,javaSerialization.length,kryoBytes.length,kryoSerialize(serializedBytes).length,kryoSerialize(serializedBytes2).length);
+    }
+
+    /**
+     * Micro-benchmark: round-trips the same event {@code count} times with each
+     * strategy and logs the elapsed wall-clock time. Results are informational only;
+     * no timing assertions are made.
+     */
+    @Test
+    public void testPartitionEventSerializationEfficiency() throws IOException {
+        PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream",System.currentTimeMillis());
+        PartitionedEventSerializerImpl serializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition);
+
+        int count = 100000;
+        StopWatch stopWatch = new StopWatch();
+        stopWatch.start();
+        int i = 0;
+        while(i<count) {
+            ByteArrayDataOutput dataOutput1 = ByteStreams.newDataOutput();
+            serializer.serialize(partitionedEvent, dataOutput1);
+            byte[] serializedBytes = dataOutput1.toByteArray();
+            PartitionedEvent deserializedEvent = serializer.deserialize(ByteStreams.newDataInput(serializedBytes));
+            Assert.assertEquals(partitionedEvent, deserializedEvent);
+            i++;
+        }
+        stopWatch.stop();
+        LOG.info("Cached Stream: {} ms",stopWatch.getTime());
+        stopWatch.reset();
+        PartitionedEventSerializerImpl compressSerializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition,true);
+        i = 0;
+        stopWatch.start();
+        while(i<count) {
+            byte[] serializedBytesCompressed = compressSerializer.serialize(partitionedEvent);
+            PartitionedEvent deserializedEventCompressed = compressSerializer.deserialize(serializedBytesCompressed);
+            Assert.assertEquals(partitionedEvent, deserializedEventCompressed);
+            i++;
+        }
+        stopWatch.stop();
+        LOG.info("Compressed Cached Stream: {} ms",stopWatch.getTime());
+        stopWatch.reset();
+
+        i = 0;
+        stopWatch.start();
+        while(i<count) {
+            // NOTE(review): unlike the other loops, the serializer is constructed inside the
+            // loop, so this timing includes per-iteration construction cost — confirm intended.
+            PartitionedEventDigestSerializer serializer2 = new PartitionedEventDigestSerializer(MockSampleMetadataFactory::createSampleStreamDefinition);
+            ByteArrayDataOutput dataOutput2 = ByteStreams.newDataOutput();
+            serializer2.serialize(partitionedEvent, dataOutput2);
+            byte[] serializedBytes2 = dataOutput2.toByteArray();
+            ByteArrayDataInput dataInput2 = ByteStreams.newDataInput(serializedBytes2);
+            PartitionedEvent deserializedEvent2 = serializer2.deserialize(dataInput2);
+            Assert.assertEquals(partitionedEvent, deserializedEvent2);
+            i++;
+        }
+        stopWatch.stop();
+        LOG.info("Cached Stream&Partition: {} ms",stopWatch.getTime());
+        stopWatch.reset();
+        i = 0;
+        stopWatch.start();
+        while(i<count) {
+            byte[] javaSerialization = new DefaultSerializationDelegate().serialize(partitionedEvent);
+            PartitionedEvent javaSerializedEvent = (PartitionedEvent) new DefaultSerializationDelegate().deserialize(javaSerialization);
+            Assert.assertEquals(partitionedEvent, javaSerializedEvent);
+            i++;
+        }
+        stopWatch.stop();
+        LOG.info("Java Native: {} ms",stopWatch.getTime());
+        stopWatch.reset();
+        i = 0;
+        stopWatch.start();
+        Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
+        while(i<count) {
+            Output output = new Output(10000);
+            kryo.writeClassAndObject(output, partitionedEvent);
+            byte[] kryoBytes = output.toBytes();
+            Input input = new Input(kryoBytes);
+            PartitionedEvent kryoDeserializedEvent = (PartitionedEvent) kryo.readClassAndObject(input);
+            Assert.assertEquals(partitionedEvent, kryoDeserializedEvent);
+            i++;
+        }
+        stopWatch.stop();
+        LOG.info("Kryo: {} ms",stopWatch.getTime());
+    }
+
+    /**
+     * Documents the framing overhead observed here: Kryo byte[] serialization
+     * length = length of byte[] + 2.
+     */
+    @Test
+    public void testKryoByteArraySerialization(){
+        Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
+        byte[] bytes = new byte[]{0,1,2,3,4,5,6,7,8,9};
+        Output output = new Output(1000);
+        kryo.writeObject(output,bytes);
+        Assert.assertEquals(bytes.length + 2,output.toBytes().length);
+    }
+
+    /** Serializes any object with a default Kryo instance; used only for size comparison. */
+    private byte[] kryoSerialize(Object object){
+        Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
+        Output output = new Output(100000);
+        kryo.writeClassAndObject(output,object);
+        return output.toBytes();
+    }
+
+    /**
+     * Explores BitSet semantics relied on by the serializer: byte[] round-trip via
+     * toByteArray()/valueOf(), and length() counting only up to the highest set bit
+     * (so a bit explicitly set to false does not extend the logical length).
+     */
+    @Test
+    public void testBitSet(){
+        BitSet bitSet = new BitSet();
+        bitSet.set(0,true); // 1
+        bitSet.set(1,false); // 0
+        bitSet.set(2,true); // 1
+        LOG.info("Bit Set Size: {}",bitSet.size());
+        LOG.info("Bit Set Byte[]: {}",bitSet.toByteArray());
+        LOG.info("Bit Set Byte[]: {}",bitSet.toLongArray());
+        LOG.info("BitSet[0]: {}",bitSet.get(0));
+        LOG.info("BitSet[1]: {}",bitSet.get(1));
+        LOG.info("BitSet[2]: {}",bitSet.get(2));
+
+        byte[] bytes = bitSet.toByteArray();
+
+        BitSet bitSet2 = BitSet.valueOf(bytes);
+
+        LOG.info("Bit Set Size: {}",bitSet2.size());
+        LOG.info("Bit Set Byte[]: {}",bitSet2.toByteArray());
+        LOG.info("Bit Set Byte[]: {}",bitSet2.toLongArray());
+        LOG.info("BitSet[0]: {}",bitSet2.get(0));
+        LOG.info("BitSet[1]: {}",bitSet2.get(1));
+        LOG.info("BitSet[2]: {}",bitSet2.get(2));
+
+        BitSet bitSet3 = new BitSet();
+        bitSet3.set(0,true);
+        Assert.assertEquals(1,bitSet3.length());
+
+        BitSet bitSet4 = new BitSet();
+        bitSet4.set(0,false);
+        Assert.assertEquals(0,bitSet4.length());
+        Assert.assertFalse(bitSet4.get(1));
+        Assert.assertFalse(bitSet4.get(2));
+    }
+
+    /** Verifies Period parsing/printing used by window-period configuration. */
+    @Test
+    public void testPeriod(){
+        Assert.assertEquals(30*60*1000, TimePeriodUtils.getMillisecondsOfPeriod(Period.parse("PT30m")));
+        Assert.assertEquals(30*60*1000, TimePeriodUtils.getMillisecondsOfPeriod(Period.millis(30*60*1000)));
+        Assert.assertEquals("PT1800S", Period.millis(30*60*1000).toString());
+    }
+
+    @Test
+    public void testPartitionType(){
+        // TODO: empty placeholder test — either implement partition-type coverage or remove.
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java
new file mode 100644
index 0000000..f2f3b46
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java
@@ -0,0 +1,149 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.eagle.alert.engine.siddhi.extension;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.core.stream.output.StreamCallback;
+
+/**
+ * @since Apr 1, 2016
+ *
+ */
+public class AttributeCollectAggregatorTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(AttributeCollectAggregatorTest.class);
+
+    /**
+     * Feeds time-ordered events through an externalTime window using the custom
+     * eagle:collect aggregator and logs each emitted output batch. The test passes
+     * as long as the Siddhi runtime processes the events without throwing; output
+     * content is only logged, not asserted.
+     */
+    @Test
+    public void test() throws Exception {
+        String ql = "define stream s1(timestamp long, host string, type string);";
+        ql += " from s1#window.externalTime(timestamp, 1 sec)";
+        ql += " select eagle:collect(timestamp) as timestamps, eagle:collect(host) as hosts, type group by type insert into output;";
+
+        SiddhiManager sm = new SiddhiManager();
+        ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(ql);
+
+        InputHandler input = runtime.getInputHandler("s1");
+        runtime.addCallback("output", new StreamCallback() {
+
+            @Override
+            public void receive(Event[] arg0) {
+                logger.info("output event length:" + arg0.length);
+
+                for (Event e : arg0) {
+                    StringBuilder sb = new StringBuilder("\t - [").append(e.getData().length).append("]");
+                    for (Object o : e.getData()) {
+                        sb.append("," + o);
+                    }
+                    logger.info(sb.toString());
+                }
+                logger.info("===end===");
+            }
+        });
+
+        runtime.start();
+
+        Event[] events = generateEvents();
+        for (Event e : events) {
+            input.send(e);
+        }
+
+        // Give the async Siddhi pipeline time to flush the final window before the test exits.
+        Thread.sleep(1000);
+
+    }
+
+    /**
+     * Builds four groups of three events ("nova", "neutron", "nova1", "neutron2")
+     * spaced 100ms apart, with 10s gaps between window batches, plus a trailing
+     * single "mq" event that forces the previous window to expire.
+     */
+    private Event[] generateEvents() {
+        List<Event> events = new LinkedList<Event>();
+
+        Random r = new Random();
+        Event e = null;
+        long base = System.currentTimeMillis();
+        {
+            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova" });
+            base += 100;
+            events.add(e);
+            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova" });
+            base += 100;
+            events.add(e);
+            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova" });
+            base += 100;
+            events.add(e);
+        }
+
+        {
+            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron" });
+            base += 100;
+            events.add(e);
+            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron" });
+            base += 100;
+            events.add(e);
+            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron" });
+            base += 100;
+            events.add(e);
+        }
+
+        base += 10000;
+        {
+            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova1" });
+            base += 100;
+            events.add(e);
+            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova1" });
+            base += 100;
+            events.add(e);
+            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova1" });
+            base += 100;
+            events.add(e);
+        }
+
+        {
+            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron2" });
+            base += 100;
+            events.add(e);
+            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron2" });
+            base += 100;
+            events.add(e);
+            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron2" });
+            base += 100;
+            events.add(e);
+        }
+        base += 10000;
+        e = new Event(base, new Object[] { base, "host" + r.nextInt(), "mq" });
+        // Bug fix: the trailing "mq" event was constructed but never added, so it was
+        // silently dropped and the last window batch was never pushed out by a newer event.
+        events.add(e);
+
+        return events.toArray(new Event[0]);
+    }
+
+    /** Verifies that a length-window query with min()/having compiles into a runtime. */
+    @Test
+    public void testQuery() {
+        String ql = "define stream perfmon_input_stream_cpu ( host string,timestamp long,metric string,pool string,value double,colo string );";
+        ql += "from perfmon_input_stream_cpu#window.length(3) select host, min(value) as min group by host having min>91.0 insert into perfmon_output_stream_cpu;";
+
+        SiddhiManager sm = new SiddhiManager();
+        sm.createExecutionPlanRuntime(ql);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java
new file mode 100644
index 0000000..613be00
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mapdb.BTreeMap;
+import org.mapdb.DB;
+import org.mapdb.DBMaker;
+import org.mapdb.Serializer;
+
+public class MapDBTestSuite {
+    /**
+     * Exercises basic on-heap MapDB BTreeMap semantics: putIfAbsentBoolean returns
+     * false on a fresh insert and true when the key already holds a value, and
+     * replace() performs a compare-and-swap against the expected old value.
+     */
+    @Test
+    public void testOnHeapDB(){
+        DB heapDb = DBMaker.heapDB().make();
+        BTreeMap<Long,String> btree = heapDb.treeMap("btree")
+                .keySerializer(Serializer.LONG)
+                .valueSerializer(Serializer.STRING)
+                .create();
+
+        // First insert per key is accepted (false = key was absent); repeats are rejected.
+        Assert.assertFalse(btree.putIfAbsentBoolean(1L,"val_1"));
+        Assert.assertTrue(btree.putIfAbsentBoolean(1L,"val_2"));
+        Assert.assertTrue(btree.putIfAbsentBoolean(1L,"val_3"));
+        Assert.assertFalse(btree.putIfAbsentBoolean(2L,"val_4"));
+
+        // The originally inserted values survive the rejected attempts.
+        Assert.assertEquals("val_1",btree.get(1L));
+        Assert.assertEquals("val_4",btree.get(2L));
+
+        // Conditional replace succeeds only when the current value matches.
+        Assert.assertTrue(btree.replace(2L,"val_4","val_5"));
+        Assert.assertEquals("val_5",btree.get(2L));
+
+        btree.close();
+        heapDb.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
new file mode 100644
index 0000000..6cadba7
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter;
+
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.time.StopWatch;
+import org.apache.eagle.alert.engine.mock.MockPartitionedCollector;
+import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.sorter.impl.PartitionedEventTimeOrderingComparator;
+import org.apache.eagle.alert.engine.sorter.impl.StreamSortWindowHandlerImpl;
+import org.apache.eagle.alert.engine.sorter.impl.StreamTimeClockInLocalMemory;
+import org.apache.eagle.alert.utils.DateTimeUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.Slf4jReporter;
+import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
+import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
+import com.google.common.collect.Ordering;
+
+/**
+ * -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+PrintGCTaskTimeStamps -XX:+PrintGCDetails -verbose:gc
+ */
+public class StreamSortHandlerTest {
+    private final static Logger LOG = LoggerFactory.getLogger(StreamSortHandlerTest.class);
+
+    // Log the JVM process name once so benchmark output can be matched to a run.
+    static {
+        LOG.info(ManagementFactory.getRuntimeMXBean().getName());
+    }
+
+    // Periodically logs JVM heap/GC metrics while the sort benchmarks run.
+    // NOTE(review): the reporter is never stopped after a test — consider an @After hook.
+    private ScheduledReporter metricReporter;
+    @Before
+    public void setUp(){
+        final MetricRegistry metrics = new MetricRegistry();
+        metrics.registerAll(new MemoryUsageGaugeSet());
+        metrics.registerAll(new GarbageCollectorMetricSet());
+        metricReporter = Slf4jReporter.forRegistry(metrics)
+                .filter((name, metric) -> name.matches("(.*heap|pools.PS.*).usage"))
+                .withLoggingLevel(Slf4jReporter.LoggingLevel.DEBUG)
+                .convertRatesTo(TimeUnit.SECONDS)
+                .convertDurationsTo(TimeUnit.MILLISECONDS)
+                .build();
+        metricReporter.start(60,TimeUnit.SECONDS);
+    }
+
+    /**
+     * Used to debug window bucket lifecycle.
+     *
+     * Window period: PT1m, margin: 5s. Feeds 1000 out-of-order events, advancing the
+     * stream clock as events arrive, then verifies the input was indeed unsorted while
+     * the collector output is time-ordered and non-empty.
+     *
+     * @throws InterruptedException
+     */
+    @Test
+    public void testWithUnsortedEventsIn1MinuteWindow() throws InterruptedException {
+        MockPartitionedCollector mockCollector = new MockPartitionedCollector();
+        StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
+        Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
+        StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
+        sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1m",5000),mockCollector);
+        List<PartitionedEvent> unsortedList = new LinkedList<>();
+
+        int i = 0;
+        while(i<1000) {
+            PartitionedEvent event = MockSampleMetadataFactory.createRandomOutOfTimeOrderEventGroupedByName("sampleStream_1");
+            sortHandler.nextEvent(event);
+            unsortedList.add(event);
+            // Only move the stream clock forward; out-of-order events must not rewind it.
+            if(event.getTimestamp()>timeClock.getTime()) {
+                timeClock.moveForward(event.getTimestamp());
+            }
+            sortHandler.onTick(timeClock,System.currentTimeMillis());
+            i++;
+        }
+        // close() flushes remaining window buckets to the collector.
+        sortHandler.close();
+        Assert.assertFalse(timeOrdering.isOrdered(unsortedList));
+        Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
+        Assert.assertTrue(mockCollector.get().size() > 0);
+    }
+
+    // Runs the 1-hour-window unsorted benchmark at the 1M-event scale as its own test.
+    @Test
+    public void testStreamSortHandlerWithUnsortedEventsIn1HourWindow() throws InterruptedException {
+        testWithUnsortedEventsIn1hWindow(1000000);
+    }
+
+    /**
+     * Feeds 1M already-sorted events (monotonically increasing timestamps) through a
+     * PT1h window and expects every event to come out, still in order.
+     * NOTE(review): body is identical to testWithSortedEvents below — consider merging.
+     */
+    @Test
+    public void testSortedInPatient() throws InterruptedException {
+        MockPartitionedCollector mockCollector = new MockPartitionedCollector();
+        StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
+        Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
+        StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
+        sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1h",5000),mockCollector);
+        List<PartitionedEvent> sortedList = new LinkedList<>();
+
+        int i = 0;
+        while(i<1000000) {
+            PartitionedEvent event = MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",System.currentTimeMillis()+i);
+            sortHandler.nextEvent(event);
+            sortedList.add(event);
+            if(event.getTimestamp()>timeClock.getTime()) {
+                timeClock.moveForward(event.getTimestamp());
+            }
+            sortHandler.onTick(timeClock,System.currentTimeMillis());
+            i++;
+        }
+        sortHandler.close();
+        Assert.assertTrue(timeOrdering.isOrdered(sortedList));
+        Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
+        Assert.assertEquals(1000000,mockCollector.get().size());
+    }
+
+    /**
+     * -XX:+PrintGC
+     *
+     * Benchmark sweep: runs the 1h-window unsorted test at increasing scales,
+     * reporting JVM metrics between runs.
+     *
+     * @throws InterruptedException
+     */
+    @Test
+    public void testWithUnsortedEventsInLargeWindowBenchmark() throws InterruptedException {
+        metricReporter.report();
+        testWithUnsortedEventsIn1hWindow(1000);
+        metricReporter.report();
+        testWithUnsortedEventsIn1hWindow(10000);
+        metricReporter.report();
+        testWithUnsortedEventsIn1hWindow(100000);
+        metricReporter.report();
+        testWithUnsortedEventsIn1hWindow(1000000);
+        metricReporter.report();
+//        testWithUnsortedEventsIn1hWindow(10000000);
+//        metricReporter.report();
+    }
+
+    /**
+     * Shared driver: feeds {@code count} out-of-order events through a PT1h window
+     * (5s margin), timing ingestion, then asserts the collector output is time-ordered.
+     *
+     * @param count number of random out-of-order events to feed
+     */
+    public void testWithUnsortedEventsIn1hWindow(int count) throws InterruptedException {
+        MockPartitionedCollector mockCollector = new MockPartitionedCollector();
+        StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
+        Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
+        StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
+        sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1h",5000),mockCollector);
+        List<PartitionedEvent> unsortedList = new LinkedList<>();
+
+        StopWatch stopWatch = new StopWatch();
+        stopWatch.start();
+        int i = 0;
+        while(i<count) {
+            PartitionedEvent event = MockSampleMetadataFactory.createRandomOutOfTimeOrderEventGroupedByName("sampleStream_1");
+            sortHandler.nextEvent(event);
+            unsortedList.add(event);
+            if(event.getEvent().getTimestamp()>timeClock.getTime()) {
+                timeClock.moveForward(event.getEvent().getTimestamp());
+            }
+            sortHandler.onTick(timeClock,System.currentTimeMillis());
+            i++;
+        }
+        stopWatch.stop();
+        LOG.info("Produced {} events in {} ms",count,stopWatch.getTime());
+        sortHandler.close();
+        Assert.assertFalse(timeOrdering.isOrdered(unsortedList));
+        Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
+        // NOTE(review): size()>=0 is always true — presumably meant size()>0 as in the
+        // PT1m test above; confirm before tightening.
+        Assert.assertTrue(mockCollector.get().size()>=0);
+    }
+
+    /**
+     * Used to debug window bucket lifecycle
+     *
+     * Window period: PT1h, margin: 5s
+     *
+     * @throws InterruptedException
+     */
+    @Test
+    public void testWithSortedEvents() throws InterruptedException {
+        MockPartitionedCollector mockCollector = new MockPartitionedCollector();
+        StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
+        Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
+        StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
+        sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1h",5000),mockCollector);
+        List<PartitionedEvent> sortedList = new LinkedList<>();
+
+        int i = 0;
+        while(i<1000000) {
+            PartitionedEvent event = MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",System.currentTimeMillis()+i);
+            sortHandler.nextEvent(event);
+            sortedList.add(event);
+            if(event.getTimestamp()>timeClock.getTime()) {
+                timeClock.moveForward(event.getTimestamp());
+            }
+            sortHandler.onTick(timeClock,System.currentTimeMillis());
+            i++;
+        }
+        sortHandler.close();
+        Assert.assertTrue(timeOrdering.isOrdered(sortedList));
+        Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
+        Assert.assertEquals(1000000,mockCollector.get().size());
+    }
+
+    /**
+     * Used to debug window bucket lifecycle
+     *
+     * Window period: PT10s, margin: 1s. A single event is expired not by stream time
+     * but by advancing the *system* time passed to onTick past window end + margin.
+     *
+     * @throws InterruptedException
+     */
+    @Test
+    public void testWithSortedEventsAndExpireBySystemTime() throws InterruptedException {
+        MockPartitionedCollector mockCollector = new MockPartitionedCollector();
+        StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
+        Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
+        StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
+        sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT10s",1000),mockCollector);
+        List<PartitionedEvent> sortedList = new LinkedList<>();
+
+        PartitionedEvent event = MockSampleMetadataFactory.createRandomSortedEventGroupedByName("sampleStream_1");
+        sortHandler.nextEvent(event);
+        sortedList.add(event);
+        timeClock.moveForward(event.getTimestamp());
+        sortHandler.onTick(timeClock,System.currentTimeMillis());
+
+        // Triggered to become expired by System time
+        sortHandler.onTick(timeClock,System.currentTimeMillis()+10*1000+1000L + 1);
+
+        Assert.assertTrue(timeOrdering.isOrdered(sortedList));
+        Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
+        Assert.assertEquals(1,mockCollector.get().size());
+
+        sortHandler.close();
+    }
+
+    // Disabled scratch test (note the commented-out @Test): demonstrates Timer tasks
+    // serializing on a shared lock while each task sleeps 5s inside the monitor.
+    // NOTE(review): the Timer is never cancelled, so if re-enabled it would leak a
+    // non-daemon thread and keep the JVM alive.
+//    @Test
+    public void testWithTimerLock() throws InterruptedException {
+        Timer timer = new Timer();
+        List<Long> collected = new ArrayList<>();
+        timer.schedule(new TimerTask() {
+            @Override
+            public void run() {
+                synchronized (collected) {
+                    LOG.info("Ticking {}", DateTimeUtil.millisecondsToHumanDateWithMilliseconds(System.currentTimeMillis()));
+                    collected.add(System.currentTimeMillis());
+                    try {
+                        Thread.sleep(5000);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+        },0,100);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowBenchmarkTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowBenchmarkTest.java
new file mode 100644
index 0000000..dc9782e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowBenchmarkTest.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.time.StopWatch;
+import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.utils.DateTimeUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
+import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
+
+/**
+ * Micro-benchmark comparing event insert and flush throughput across the
+ * available {@link StreamWindowRepository.StorageType} backends (on-heap,
+ * serialized memory, direct/off-heap memory, file). Marked @Ignore because it
+ * is a long-running benchmark rather than a correctness test.
+ */
+@Ignore
+public class StreamWindowBenchmarkTest {
+    private final static Logger LOGGER = LoggerFactory.getLogger(StreamWindowBenchmarkTest.class);
+    /**
+     * Inserts {@code num} generated events into {@code window}, then flushes it,
+     * recording wall-clock insert and flush ("read") times in performanceReport.
+     *
+     * NOTE(review): despite "DESC" in the name, event timestamps are generated
+     * in ascending order (window.startTime() + i) — confirm intended naming.
+     */
+    public void sendDESCOrderedEventsToWindow(StreamWindow window, StreamWindowRepository.StorageType storageType, int num) {
+        LOGGER.info("Sending {} events to {} ({})",num,window.getClass().getSimpleName(),storageType);
+        StopWatch stopWatch = new StopWatch();
+        stopWatch.start();
+        int i=0;
+        while(i<num) {
+            PartitionedEvent event = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream_1",(window.startTime()+i));
+            window.add(event);
+            i++;
+        }
+        stopWatch.stop();
+        // Report keys are "<num>\t<metric>\t<storageType>" so the TreeMap-backed
+        // report sorts primarily by event count.
+        performanceReport.put(num+"\tInsertTime\t"+storageType,stopWatch.getTime());
+        LOGGER.info("Inserted {} events in {} ms",num,stopWatch.getTime());
+        stopWatch.reset();
+        stopWatch.start();
+        // Flush forces the window to emit everything buffered; measured as "read" time.
+        window.flush();
+        stopWatch.stop();
+        performanceReport.put(num+"\tReadTime\t"+storageType,stopWatch.getTime());
+    }
+
+    // Console reporter for JVM heap/GC metrics sampled during the benchmark.
+    private ScheduledReporter metricReporter;
+    // Accumulated timing results; TreeMap keeps the final report sorted by key.
+    private Map<String,Long> performanceReport;
+    @Before
+    public void setUp(){
+        final MetricRegistry metrics = new MetricRegistry();
+        metrics.registerAll(new MemoryUsageGaugeSet());
+        metrics.registerAll(new GarbageCollectorMetricSet());
+        metricReporter = ConsoleReporter.forRegistry(metrics)
+                // Only report heap/total usage gauges to keep console output readable.
+                .filter((name, metric) -> name.matches("(.*heap|total).(usage|used)"))
+//                .withLoggingLevel(Slf4jReporter.LoggingLevel.DEBUG)
+                .convertRatesTo(TimeUnit.SECONDS)
+                .convertDurationsTo(TimeUnit.MILLISECONDS)
+                .build();
+        metricReporter.start(60,TimeUnit.SECONDS);
+        performanceReport = new TreeMap<>();
+    }
+
+    /** Dumps the collected timing table after each test run. */
+    @After
+    public void after(){
+        StringBuilder sb = new StringBuilder();
+        for(Map.Entry<String,Long> entry:performanceReport.entrySet()){
+            sb.append(String.format("%-40s\t%s\n",entry.getKey(),entry.getValue()));
+        }
+        LOGGER.info("\n===== Benchmark Result Report =====\n\n{}",sb.toString());
+    }
+
+    // One-day window [start, stop) with a margin of one third of the span.
+    private final long start = DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000");
+    private final long stop = DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-05 00:00:00,000");
+    private final long margin = (stop - start)/3;
+
+    /**
+     * Runs the insert/flush benchmark at 1k/10k/100k/1M events for one window,
+     * reporting JVM metrics between each batch.
+     */
+    private void benchmarkTest(StreamWindow window, StreamWindowRepository.StorageType storageType){
+        StopWatch stopWatch = new StopWatch();
+        stopWatch.start();
+        LOGGER.info("\n===== Benchmark Test for {} ({}) =====",window.getClass().getSimpleName(),storageType);
+        metricReporter.report();
+        sendDESCOrderedEventsToWindow(window,storageType,1000);
+        metricReporter.report();
+        sendDESCOrderedEventsToWindow(window,storageType,10000);
+        metricReporter.report();
+        sendDESCOrderedEventsToWindow(window,storageType,100000);
+        metricReporter.report();
+        sendDESCOrderedEventsToWindow(window,storageType,1000000);
+        metricReporter.report();
+        stopWatch.stop();
+        LOGGER.info("\n===== Finished in total {} ms =====\n",stopWatch.getTime());
+    }
+
+    // Convenience entry point running every storage-type benchmark in sequence.
+    @Test @Ignore
+    public void testStreamWindowBenchmarkMain(){
+        testStreamSortedWindowOnHeap();
+        testStreamSortedWindowInSerializedMemory();
+        testStreamSortedWindowOffHeap();
+        testStreamSortedWindowFile();
+    }
+
+    @Test @Ignore
+    public void testStreamSortedWindowOnHeap() {
+        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.ONHEAP);
+        benchmarkTest(window,StreamWindowRepository.StorageType.ONHEAP);
+        window.close();
+    }
+
+    @Test @Ignore
+    public void testStreamSortedWindowInSerializedMemory() {
+        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.MEMORY);
+        benchmarkTest(window,StreamWindowRepository.StorageType.MEMORY);
+        window.close();
+    }
+
+    @Test @Ignore
+    public void testStreamSortedWindowOffHeap() {
+        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.DIRECT_MEMORY);
+        benchmarkTest(window,StreamWindowRepository.StorageType.DIRECT_MEMORY);
+        window.close();
+    }
+
+    @Test @Ignore
+    public void testStreamSortedWindowFile() {
+        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.FILE_RAF);
+        benchmarkTest(window,StreamWindowRepository.StorageType.FILE_RAF);
+        window.close();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowTestSuite.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowTestSuite.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowTestSuite.java
new file mode 100644
index 0000000..950aa34
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowTestSuite.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter;
+
+import java.util.List;
+
+import org.apache.eagle.alert.engine.mock.MockPartitionedCollector;
+import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.sorter.impl.PartitionedEventTimeOrderingComparator;
+import org.apache.eagle.alert.engine.sorter.impl.StreamTimeClockInLocalMemory;
+import org.apache.eagle.alert.utils.DateTimeUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Ordering;
+
+/**
+ * Contract test exercised against every StreamWindow storage backend: window
+ * lifecycle (alive/expired), event acceptance boundaries, duplicate handling,
+ * flush semantics on delayed system time, and time-ordered emission.
+ */
+@SuppressWarnings("unused")
+public class StreamWindowTestSuite {
+    private final static Logger LOGGER = LoggerFactory.getLogger(StreamWindowTestSuite.class);
+
+    // One-minute window [start, stop) with a 20s margin (one third of the span).
+    private final long start = DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000");
+    private final long stop = DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:00,000");
+    private final long margin = (stop - start)/3;
+
+    @Test
+    public void testStreamSortedWindowOnHeap() {
+        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.ONHEAP);
+        streamSortedWindowMustTest(window);
+    }
+
+    @Test
+    public void testStreamSortedWindowInSerializedMemory() {
+        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.MEMORY);
+        streamSortedWindowMustTest(window);
+    }
+
+    @Test
+    public void testStreamSortedWindowOffHeap() {
+        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.DIRECT_MEMORY);
+        streamSortedWindowMustTest(window);
+    }
+
+    @Test
+    public void testStreamSortedWindowFile() {
+        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.FILE_RAF);
+        streamSortedWindowMustTest(window);
+    }
+
+    /**
+     * Runs the full window contract against the given implementation. The
+     * scenario is strictly order-dependent: acceptance bounds at the window
+     * edges, duplicate timestamps, flush when system time outruns stream time,
+     * rejection of events older than the last flush, expiry, and finally a
+     * time-ordered check of everything the collector received.
+     */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    private void streamSortedWindowMustTest(StreamWindow window){
+        MockPartitionedCollector collector = new MockPartitionedCollector();
+        window.register(collector);
+
+        StreamTimeClock clock = new StreamTimeClockInLocalMemory("sampleStream_1");
+        clock.moveForward(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:30,000"));
+
+        // Current time is: "2016-05-04 00:00:30"
+        window.onTick(clock,System.currentTimeMillis());
+
+        Assert.assertTrue(window.alive());
+        Assert.assertFalse(window.expired());
+
+        // Accepted
+        Assert.assertTrue(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000")));
+        Assert.assertTrue(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:01,000")));
+        Assert.assertTrue(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:30,000")));
+        Assert.assertTrue(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:40,000")));
+        Assert.assertTrue(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:50,000")));
+
+
+        // Rejected: before window start or at/after window stop (stop is exclusive).
+        Assert.assertFalse(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-03 23:59:59,000")));
+        Assert.assertFalse(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:00,000")));
+        Assert.assertFalse(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:01,000")));
+
+        // Accepted
+        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"))));
+
+        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:01,000"))));
+        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:50,000"))));
+        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:40,000"))));
+        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:30,000"))));
+
+        // Should accept Duplicated
+        Assert.assertTrue("Should support duplicated timestamp",window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"))));
+
+        Assert.assertEquals(6,window.size());
+
+        // Rejected adds must not change the window size.
+        Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-03 23:59:59,000"))));
+        Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:00,000"))));
+        Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:01,000"))));
+
+        Assert.assertEquals(6,window.size());
+
+        // Now is: "2016-05-04 00:00:55"
+        clock.moveForward(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:55,000"));
+        window.onTick(clock,System.currentTimeMillis());
+        Assert.assertTrue(window.alive());
+        Assert.assertFalse(window.expired());
+        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:35,000"))));
+        Assert.assertEquals(7,window.size());
+
+        // Flush when stream time delay too much after system time but window will still be alive
+        window.onTick(clock,System.currentTimeMillis() + 1 + stop - start + margin);
+        Assert.assertTrue(window.alive());
+        Assert.assertFalse(window.expired());
+        Assert.assertEquals(0,window.size());
+        Assert.assertEquals(7,collector.size());
+
+        Assert.assertFalse("Because window has flushed but not expired, window should reject future events < last flush stream time",
+                window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:54,000"))));
+        Assert.assertTrue("Because window has flushed but not expired, window should still accept future events >= last flush stream time",
+                window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:56,000"))));
+        Assert.assertEquals(1,window.size());
+        Assert.assertEquals(7,collector.size());
+
+        // Now is: "2016-05-04 00:01:10", not expire,
+        clock.moveForward(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:10,000"));
+        window.onTick(clock,System.currentTimeMillis() + 2 * (1+ stop - start + margin));
+        Assert.assertEquals(8,collector.size());
+
+        // Now is: "2016-05-04 00:01:20", expire
+        clock.moveForward(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:20,000"));
+        window.onTick(clock,System.currentTimeMillis());
+        Assert.assertFalse(window.alive());
+        Assert.assertTrue(window.expired());
+        Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:35,000"))));
+        Assert.assertEquals(0,window.size());
+
+        Assert.assertEquals(8,collector.size());
+
+        // Collected events must come out in non-decreasing timestamp order.
+        Ordering ordering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
+        Assert.assertTrue(ordering.isOrdered(collector.get()));
+
+        List<PartitionedEvent> list = collector.get();
+        Assert.assertEquals(8,list.size());
+        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"),list.get(0).getTimestamp());
+        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"),list.get(1).getTimestamp());
+        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:01,000"),list.get(2).getTimestamp());
+        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:30,000"),list.get(3).getTimestamp());
+        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:35,000"),list.get(4).getTimestamp());
+        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:40,000"),list.get(5).getTimestamp());
+        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:50,000"),list.get(6).getTimestamp());
+        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:56,000"),list.get(7).getTimestamp());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/TreeMultisetComparatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/TreeMultisetComparatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/TreeMultisetComparatorTest.java
new file mode 100644
index 0000000..b7e76b8
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/TreeMultisetComparatorTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.engine.sorter.impl.PartitionedEventTimeOrderingComparator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.TreeMultiset;
+
+/**
+ * Since 5/10/16.
+ */
+public class TreeMultisetComparatorTest {
+    /**
+     * Verifies that a TreeMultiset ordered by PartitionedEventTimeOrderingComparator
+     * retains distinct events sharing a timestamp: if the comparator returned 0 for
+     * equal timestamps, the second event would silently replace the first.
+     */
+    @Test
+    public void testComparator(){
+        TreeMultiset<PartitionedEvent> set = TreeMultiset.create(PartitionedEventTimeOrderingComparator.INSTANCE);
+
+        // Partition definition shared by every event under test.
+        StreamPartition partition = new StreamPartition();
+        partition.setColumns(Arrays.asList("host"));
+        partition.setSortSpec(null);
+        partition.setStreamId("testStreamId");
+        partition.setType(StreamPartition.Type.GROUPBY);
+
+        // Two distinct events sharing one timestamp, plus two equal events one ms later.
+        PartitionedEvent event1 = newEvent(partition, 18.4, 1462909984000L);
+        PartitionedEvent event2 = newEvent(partition, 16.3, 1462909984000L);
+        PartitionedEvent event3 = newEvent(partition, 14.3, 1462909984001L);
+        PartitionedEvent event4 = newEvent(partition, 14.3, 1462909984001L);
+
+        Assert.assertNotEquals(event2,event3);
+        Assert.assertEquals(event3,event4);
+
+        // All four must be retained, including the fully-equal pair.
+        set.add(event1);
+        set.add(event2);
+        set.add(event3);
+        set.add(event4);
+        Assert.assertEquals(4, set.size());
+        for (PartitionedEvent event : set) {
+            System.out.println(event);
+        }
+
+
+        // The comparator imposes a total order even across equal timestamps.
+        Assert.assertEquals(-1,PartitionedEventTimeOrderingComparator.INSTANCE.compare(event1,event2));
+        Assert.assertEquals(-1,PartitionedEventTimeOrderingComparator.INSTANCE.compare(event1,event3));
+        Assert.assertEquals(-1,PartitionedEventTimeOrderingComparator.INSTANCE.compare(event2,event3));
+        Assert.assertEquals(0,PartitionedEventTimeOrderingComparator.INSTANCE.compare(event3,event4));
+
+        // Iteration order follows the comparator, not insertion order.
+        Iterator<PartitionedEvent> it = set.iterator();
+        Assert.assertEquals(16.3,it.next().getData()[0]);
+        Assert.assertEquals(18.4,it.next().getData()[0]);
+        Assert.assertEquals(14.3,it.next().getData()[0]);
+        Assert.assertEquals(14.3,it.next().getData()[0]);
+    }
+
+    /**
+     * Builds a PartitionedEvent on stream "testStreamId" carrying a single data
+     * value at the given timestamp (partition key fixed at 1000).
+     */
+    private PartitionedEvent newEvent(StreamPartition partition, double value, long timestamp) {
+        StreamEvent streamEvent = new StreamEvent();
+        streamEvent.setData(new Object[]{value});
+        streamEvent.setStreamId("testStreamId");
+        streamEvent.setTimestamp(timestamp);
+        PartitionedEvent partitionedEvent = new PartitionedEvent();
+        partitionedEvent.setPartition(partition);
+        partitionedEvent.setPartitionKey(1000);
+        partitionedEvent.setEvent(streamEvent);
+        return partitionedEvent;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java
new file mode 100644
index 0000000..62427e0
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java
@@ -0,0 +1,138 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.topology;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Properties;
+
+import org.apache.eagle.alert.engine.spout.CorrelationSpout;
+import org.apache.eagle.alert.engine.spout.CreateTopicUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.LocalCluster;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.SpoutDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Manual integration checks for the alert engine topology: one spins up the
+ * correlation spout against a local Storm cluster, the other pushes random
+ * strings into a Kafka topic. Both are @Ignore'd because they depend on
+ * external Kafka/ZooKeeper services and do not terminate on their own.
+ */
+@SuppressWarnings({"serial", "unused"})
+public class AlertTopologyTest implements Serializable{
+    private static final Logger LOG = LoggerFactory.getLogger(AlertTopologyTest.class);
+    // Alphabet used to build random payload strings for Kafka.
+    char[] alphabets = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z'};
+
+    /**
+     * Starts a local Storm cluster wiring the CorrelationSpout to a set of
+     * TestBolt instances across two Kafka topics, then blocks so the running
+     * topology can be observed. Requires ZooKeeper on localhost:2181.
+     */
+    @Ignore
+    @Test
+    public void testMultipleTopics() throws Exception {
+        final String topoId = "myTopology";
+        int numGroupbyBolts = 2;
+        System.setProperty("eagle.correlation.numGroupbyBolts", String.valueOf(numGroupbyBolts));
+        System.setProperty("eagle.correlation.topologyName", topoId);
+        System.setProperty("eagle.correlation.mode", "local");
+        System.setProperty("eagle.correlation.zkHosts", "localhost:2181");
+        final String topicName1 = "testTopic3";
+        final String topicName2 = "testTopic4";
+        // ensure topic ready
+        LogManager.getLogger(CorrelationSpout.class).setLevel(Level.DEBUG);
+        Config config = ConfigFactory.load();
+
+        CreateTopicUtils.ensureTopicReady(System.getProperty("eagle.correlation.zkHosts"), topicName1);
+        CreateTopicUtils.ensureTopicReady(System.getProperty("eagle.correlation.zkHosts"), topicName2);
+
+        TopologyBuilder topoBuilder = new TopologyBuilder();
+
+        int numBolts = config.getInt("eagle.correlation.numGroupbyBolts");
+        CorrelationSpout spout = new CorrelationSpout(config, topoId, null, numBolts);
+        String spoutId = "correlation-spout";
+        SpoutDeclarer declarer = topoBuilder.setSpout(spoutId, spout);
+        // One TestBolt per group-by partition, each consuming its own spout stream.
+        for (int i = 0; i < numBolts; i++) {
+            TestBolt bolt = new TestBolt();
+            BoltDeclarer boltDecl = topoBuilder.setBolt("engineBolt" + i, bolt);
+            boltDecl.fieldsGrouping(spoutId, "stream_" + i, new Fields());
+        }
+
+        String topoName = config.getString("eagle.correlation.topologyName");
+        LOG.info("start topology in local mode");
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology(topoName, new HashMap<>(), topoBuilder.createTopology());
+
+        // Keep the JVM alive so the local cluster keeps running; exit cleanly
+        // when interrupted instead of busy-spinning with the flag set.
+        while(true) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+    }
+
+    /**
+     * Sends {@code max} random lowercase strings to a Kafka topic, pausing
+     * briefly every 10 messages to avoid flooding the broker. Requires a Kafka
+     * broker at sandbox.hortonworks.com:6667.
+     */
+    @Ignore
+    @Test
+    public void generateRandomStringsToKafka(){
+        String topic = "testTopic3";
+        int max = 1000;
+        Properties configMap = new Properties();
+        configMap.put("bootstrap.servers", "sandbox.hortonworks.com:6667");
+        configMap.put("metadata.broker.list", "sandbox.hortonworks.com:6667");
+        configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        configMap.put("request.required.acks", "1");
+        configMap.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
+        configMap.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
+        // Typed producer/record instead of raw types; try-with-resources closes
+        // the producer even if send() throws (the original leaked it on error).
+        try (KafkaProducer<String, Object> producer = new KafkaProducer<>(configMap)) {
+            int i = 0;
+            while(i++ < max) {
+                String randomString = generateRandomString();
+                System.out.println("sending string : " + randomString);
+                ProducerRecord<String, Object> record = new ProducerRecord<>(topic, randomString);
+                producer.send(record);
+                if(i % 10 == 0){
+                    try {
+                        Thread.sleep(10);
+                    } catch(InterruptedException ex) {
+                        // Restore the interrupt flag instead of silently swallowing it.
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * @return a random lowercase string of 1 to 10 characters.
+     */
+    private String generateRandomString(){
+        // Math.round(Math.random()*10) yields 0..10; clamp the low end to 1.
+        long count = Math.max(1, Math.round(Math.random() * 10));
+        StringBuilder sb = new StringBuilder();
+        while(count-- > 0) {
+            int index = (int)(Math.floor(Math.random()*26));
+            sb.append(alphabets[index]);
+        }
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CoordinatorSpoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CoordinatorSpoutIntegrationTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CoordinatorSpoutIntegrationTest.java
new file mode 100644
index 0000000..deca9b4
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CoordinatorSpoutIntegrationTest.java
@@ -0,0 +1,101 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.topology;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+import org.apache.eagle.alert.engine.spout.CorrelationSpout;
+import org.apache.eagle.alert.engine.spout.CreateTopicUtils;
+import org.apache.eagle.alert.utils.AlertConstants;
+import org.apache.eagle.alert.utils.StreamIdConversion;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.SpoutDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Integration test that wires a {@link CorrelationSpout} into a local Storm
+ * cluster and lets it receive metadata via {@code MockMetadataChangeNotifyService}.
+ * Requires a reachable ZooKeeper/Kafka sandbox, hence {@code @Ignore}.
+ * Since 4/28/16.
+ */
+@SuppressWarnings({"serial", "unused", "rawtypes"})
+public class CoordinatorSpoutIntegrationTest implements Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(CoordinatorSpoutIntegrationTest.class);
+    @Ignore  // this test needs ZooKeeper
+    @Test
+    public void testConfigNotify() throws Exception{
+        final String topoId = "myTopology";
+        int numGroupbyBolts = 2;
+        int numTotalGroupbyBolts = 3;
+        // Point the test at a sandbox environment; ConfigFactory.load() below
+        // resolves these system properties into the typesafe Config.
+        System.setProperty("correlation.serviceHost", "sandbox.hortonworks.com");
+        System.setProperty("correlation.servicePort", "58080");
+        System.setProperty("withMetadataChangeNotifyService.zkConfig.zkQuorum", "sandbox.hortonworks.com:2181");
+        System.setProperty("correlation.numGroupbyBolts", String.valueOf(numGroupbyBolts));
+        System.setProperty("correlation.topologyName", topoId);
+        System.setProperty("correlation.mode", "local");
+        System.setProperty("correlation.zkHosts", "sandbox.hortonworks.com:2181");
+        final String topicName1 = "testTopic3";
+        final String topicName2 = "testTopic4";
+        // ensure topic ready
+        LogManager.getLogger(CorrelationSpout.class).setLevel(Level.DEBUG);
+        Config config = ConfigFactory.load();
+
+        CreateTopicUtils.ensureTopicReady(System.getProperty("withMetadataChangeNotifyService.zkConfig.zkQuorum"), topicName1);
+        CreateTopicUtils.ensureTopicReady(System.getProperty("withMetadataChangeNotifyService.zkConfig.zkQuorum"), topicName2);
+
+        TopologyBuilder topoBuilder = new TopologyBuilder();
+
+        int numBolts = config.getInt("correlation.numGroupbyBolts");
+        String spoutId = "correlation-spout";
+        CorrelationSpout spout = new CorrelationSpout(config, topoId,
+                new MockMetadataChangeNotifyService(topoId, spoutId), numBolts);
+        SpoutDeclarer declarer = topoBuilder.setSpout(spoutId, spout);
+        declarer.setNumTasks(2);
+        // One TestBolt per group-by partition, each consuming the stream id the
+        // spout generates between the default spout and router-bolt names.
+        for (int i = 0; i < numBolts; i++) {
+            TestBolt bolt = new TestBolt();
+            BoltDeclarer boltDecl = topoBuilder.setBolt("engineBolt" + i, bolt);
+            boltDecl.fieldsGrouping(spoutId,
+                    StreamIdConversion.generateStreamIdBetween(AlertConstants.DEFAULT_SPOUT_NAME, AlertConstants.DEFAULT_ROUTERBOLT_NAME+i), new Fields());
+        }
+
+        String topoName = config.getString("correlation.topologyName");
+        LOG.info("start topology in local mode");
+        LocalCluster cluster = new LocalCluster();
+        StormTopology topology = topoBuilder.createTopology();
+        cluster.submitTopology(topoName, new HashMap(), topology);
+
+        // Keep the local cluster alive indefinitely for manual observation.
+        Utils.sleep(Long.MAX_VALUE);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
new file mode 100644
index 0000000..6960537
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
@@ -0,0 +1,184 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.topology;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.SpoutSpec;
+import org.apache.eagle.alert.coordination.model.StreamRepartitionMetadata;
+import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.spout.CorrelationSpout;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+
+import storm.kafka.KafkaSpoutWrapper;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Unit tests for {@link CorrelationSpout} metadata ({@link SpoutSpec}) reload
+ * behavior: creating a Kafka spout for a new topic, mapping one topic to
+ * multiple streams, and tearing down a spout when its topic is removed.
+ * Kafka interaction is stubbed out by overriding createKafkaSpout/removeKafkaSpout.
+ */
+@Ignore
+public class CorrelationSpoutTest {
+    @SuppressWarnings("unused")
+    private static final String TEST_SPOUT_ID = "spout-id-1";
+    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(CorrelationSpoutTest.class);
+    private String dataSourceName = "ds-name";
+
+    // Verifies that reloading with a spec containing one new topic triggers
+    // creation of a Kafka spout for that topic.
+    @SuppressWarnings({ "serial", "rawtypes" })
+    @Test
+    public void testMetadataInjestion_emptyMetadata() throws Exception{
+        String topoId = "testMetadataInjection";
+        Config config = ConfigFactory.load();
+        AtomicBoolean validated = new AtomicBoolean(false);
+        // Anonymous subclass: record that createKafkaSpout was invoked instead
+        // of actually connecting to Kafka.
+        CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1) {
+            @Override
+            protected KafkaSpoutWrapper createKafkaSpout(Map conf, TopologyContext context,
+                    SpoutOutputCollector collector, String topic, String schemeClsName, SpoutSpec streamMetadatas, Map<String, StreamDefinition> sds)
+                    throws Exception {
+                validated.set(true);
+                return null;
+            }
+        };
+        Kafka2TupleMetadata ds = new Kafka2TupleMetadata();
+        ds.setName("ds-name");
+        ds.setType("KAFKA");
+        ds.setProperties(new HashMap<String, String>());
+        ds.setTopic("name-of-topic1");
+        ds.setSchemeCls("PlainStringScheme");
+        ds.setCodec(new Tuple2StreamMetadata());
+        Map<String,Kafka2TupleMetadata> dsMap = new HashMap<String, Kafka2TupleMetadata>();
+        dsMap.put(ds.getName(), ds);
+
+        StreamRepartitionMetadata m1 = new StreamRepartitionMetadata(ds.getName(), "s1");
+
+        Map<String, List<StreamRepartitionMetadata>> dataSources = new HashMap<String, List<StreamRepartitionMetadata>>();
+        dataSources.put(ds.getName(), Arrays.asList(m1));
+
+        SpoutSpec newMetadata = new SpoutSpec(topoId, dataSources, null, dsMap);
+        
+        spout.onReload(newMetadata, null);
+        Assert.assertTrue(validated.get());
+    }
+
+    // Verifies that a single topic feeding two streams (s1, s2) is exposed to
+    // the created Kafka spout with both stream metadata entries present.
+    @SuppressWarnings({ "serial", "rawtypes" })
+    @Test
+    public void testMetadataInjestion_oneNewTopic2Streams() throws Exception {
+        String topoId = "testMetadataInjection";
+        final String topicName = "testTopic";
+
+        Config config = ConfigFactory.load();
+        final AtomicBoolean verified = new AtomicBoolean(false);
+        CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1)  {
+            @Override
+            protected KafkaSpoutWrapper createKafkaSpout(Map conf, 
+                    TopologyContext context,
+                    SpoutOutputCollector collector, 
+                    String topic, 
+                    String topic2SchemeClsName,
+                    SpoutSpec streamMetadatas,
+                    Map<String, StreamDefinition> sds) {
+                // Assertions run inside the spout callback; flag success so the
+                // outer test can confirm the callback actually fired.
+                Assert.assertEquals(1, streamMetadatas.getStreamRepartitionMetadataMap().size());
+                Assert.assertTrue(streamMetadatas.getStream("s1") != null);
+                Assert.assertTrue(streamMetadatas.getStream("s2") != null);
+                Assert.assertEquals(dataSourceName, streamMetadatas.getStream("s1").getTopicName());
+                Assert.assertEquals(dataSourceName, streamMetadatas.getStream("s2").getTopicName());
+                LOG.info("successfully verified new topic and streams");
+                verified.set(true);
+                return null;
+            }
+        };
+
+        Map<String, Kafka2TupleMetadata> dsMap = createDatasource(topicName, dataSourceName);
+
+        StreamRepartitionMetadata m1 = new StreamRepartitionMetadata(dataSourceName, "s1");
+        StreamRepartitionMetadata m2 = new StreamRepartitionMetadata(dataSourceName, "s2");
+        Map<String, List<StreamRepartitionMetadata>> dataSources = new HashMap<String, List<StreamRepartitionMetadata>>();
+        dataSources.put(dataSourceName, Arrays.asList(m1, m2));
+
+        SpoutSpec newMetadata = new SpoutSpec(topoId, dataSources, null, dsMap);
+        spout.onReload(newMetadata, null);
+        Assert.assertTrue(verified.get());
+    }
+
+    /**
+     * Builds a single-entry datasource map (name -> Kafka2TupleMetadata) for the
+     * given topic, keyed by the datasource name.
+     */
+    private Map<String, Kafka2TupleMetadata> createDatasource(final String topicName, final String dataSourceName) {
+        Kafka2TupleMetadata ds = new Kafka2TupleMetadata();
+        
+        ds.setName(dataSourceName);
+        ds.setType("KAFKA");
+        ds.setProperties(new HashMap<String, String>());
+        ds.setTopic(topicName);
+        ds.setSchemeCls("PlainStringScheme");
+        ds.setCodec(new Tuple2StreamMetadata());
+        Map<String, Kafka2TupleMetadata> dsMap = new HashMap<String, Kafka2TupleMetadata>();
+        dsMap.put(ds.getName(), ds);
+        return dsMap;
+    }
+
+    // Verifies that reloading with an empty spec after a topic was added causes
+    // removeKafkaSpout to be called for the now-absent topic.
+    @SuppressWarnings({ "serial", "rawtypes" })
+    @Test
+    public void testMetadataInjestion_deleteOneTopic() throws Exception{
+        String topoId = "testMetadataInjection";
+        final String topicName = "testTopic";
+        Config config = ConfigFactory.load();
+        final AtomicBoolean verified = new AtomicBoolean(false);
+        CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1) {
+            @Override
+            protected KafkaSpoutWrapper createKafkaSpout(Map conf, TopologyContext context, SpoutOutputCollector collector, final String topic,
+                                                         String schemeClsName, SpoutSpec streamMetadatas,
+                                                         Map<String, StreamDefinition> sds){
+                // Return a dummy wrapper so the spout tracks the topic and can
+                // later remove it.
+                return new KafkaSpoutWrapper(null, null);
+            }
+            @Override
+            protected void removeKafkaSpout(KafkaSpoutWrapper wrapper){
+                LOG.info("successfully verified removed topic and streams");
+                verified.set(true);
+            }
+        };
+
+        Map<String, Kafka2TupleMetadata> dsMap = createDatasource(topicName, dataSourceName);
+
+        StreamRepartitionMetadata m1 = new StreamRepartitionMetadata(dataSourceName, "s1");
+
+        Map<String, List<StreamRepartitionMetadata>> streamMetadatas = new HashMap<String, List<StreamRepartitionMetadata>>();
+        streamMetadatas.put(dataSourceName, Arrays.asList(m1));
+
+        SpoutSpec cachedMetadata = new SpoutSpec(topoId, streamMetadatas, null, dsMap);
+        // add new topic
+        spout.onReload(cachedMetadata, null);
+        // delete new topic
+        try {
+            spout.onReload(new SpoutSpec(topoId, new HashMap<String, List<StreamRepartitionMetadata>>(), new HashMap<>(), new HashMap<String, Kafka2TupleMetadata>()),
+                    null);
+        }catch(Exception ex){
+            LOG.error("error reloading spout metadata", ex);
+            throw ex;
+        }
+        Assert.assertTrue(verified.get());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/FastWordCountTopology.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/FastWordCountTopology.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/FastWordCountTopology.java
new file mode 100644
index 0000000..4779fac
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/FastWordCountTopology.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.topology;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.ClusterSummary;
+import backtype.storm.generated.ExecutorSummary;
+import backtype.storm.generated.KillOptions;
+import backtype.storm.generated.Nimbus;
+import backtype.storm.generated.SpoutStats;
+import backtype.storm.generated.TopologyInfo;
+import backtype.storm.generated.TopologySummary;
+import backtype.storm.metric.LoggingMetricsConsumer;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseBasicBolt;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+/**
+ * WordCount but the spout does not stop, and the bolts are implemented in
+ * java.  This can show how fast the word count can run.
+ */
+@SuppressWarnings("serial")
+public class FastWordCountTopology {
+    /**
+     * Spout that emits random sentences from a fixed pool forever, anchoring
+     * each tuple with the sentence itself as the message id so failed tuples
+     * can be re-emitted.
+     */
+    public static class FastRandomSentenceSpout extends BaseRichSpout {
+        private final static Logger LOG = LoggerFactory.getLogger(FastRandomSentenceSpout.class);
+        SpoutOutputCollector _collector;
+        Random _rand;
+        private static final String[] CHOICES = {
+                "marry had a little lamb whos fleese was white as snow",
+                "and every where that marry went the lamb was sure to go",
+                "one two three four five six seven eight nine ten",
+                "this is a test of the emergency broadcast system this is only a test",
+                "peter piper picked a peck of pickeled peppers"
+        };
+
+        @SuppressWarnings("rawtypes")
+        @Override
+        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+            _collector = collector;
+            _rand = ThreadLocalRandom.current();
+        }
+
+        @Override
+        public void nextTuple() {
+            String sentence = CHOICES[_rand.nextInt(CHOICES.length)];
+            // The sentence doubles as the message id for ack/fail tracking.
+            _collector.emit(new Values(sentence), sentence);
+            LOG.debug("Emit tuple: {}, id:{}",new Values(sentence),sentence);
+        }
+
+        @Override
+        public void ack(Object id) {
+            //Ignored
+        }
+
+        @Override
+        public void fail(Object id) {
+            // Re-emit the failed sentence (the id IS the sentence).
+            _collector.emit(new Values(id), id);
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("sentence"));
+        }
+    }
+
+    /** Bolt that splits each sentence on whitespace, emitting (word, 1) pairs. */
+    public static class SplitSentence extends BaseBasicBolt {
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            String sentence = tuple.getString(0);
+            for (String word: sentence.split("\\s+")) {
+                collector.emit(new Values(word, 1));
+            }
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word", "count"));
+        }
+    }
+
+
+    /**
+     * Bolt that keeps a running per-word count in task-local memory and emits
+     * the updated (word, count) on every tuple.
+     */
+    public static class WordCount extends BaseBasicBolt {
+        Map<String, Integer> counts = new HashMap<String, Integer>();
+
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            String word = tuple.getString(0);
+            Integer count = counts.get(word);
+            if (count == null)
+                count = 0;
+            count++;
+            counts.put(word, count);
+            collector.emit(new Values(word, count));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word", "count"));
+        }
+    }
+
+    /**
+     * Looks up the topology named {@code name} via Nimbus and prints spout
+     * throughput stats: uptime, acked/failed counts, ack-weighted average
+     * complete latency, and acked-per-second.
+     *
+     * @throws Exception if no topology with that name is running
+     */
+    public static void printMetrics(Nimbus.Client client, String name) throws Exception {
+        ClusterSummary summary = client.getClusterInfo();
+        String id = null;
+        for (TopologySummary ts: summary.get_topologies()) {
+            if (name.equals(ts.get_name())) {
+                id = ts.get_id();
+            }
+        }
+        if (id == null) {
+            throw new Exception("Could not find a topology named "+name);
+        }
+        TopologyInfo info = client.getTopologyInfo(id);
+        int uptime = info.get_uptime_secs();
+        long acked = 0;
+        long failed = 0;
+        double weightedAvgTotal = 0.0;
+        for (ExecutorSummary exec: info.get_executors()) {
+            if ("spout".equals(exec.get_component_id())) {
+                SpoutStats stats = exec.get_stats().get_specific().get_spout();
+                Map<String, Long> failedMap = stats.get_failed().get(":all-time");
+                Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
+                Map<String, Double> avgLatMap = stats.get_complete_ms_avg().get(":all-time");
+                for (String key: ackedMap.keySet()) {
+                    if (failedMap != null) {
+                        Long tmp = failedMap.get(key);
+                        if (tmp != null) {
+                            failed += tmp;
+                        }
+                    }
+                    // Weight each stream's average latency by its ack count so the
+                    // final average reflects per-tuple latency across streams.
+                    long ackVal = ackedMap.get(key);
+                    double latVal = avgLatMap.get(key) * ackVal;
+                    acked += ackVal;
+                    weightedAvgTotal += latVal;
+                }
+            }
+        }
+        double avgLatency = weightedAvgTotal/acked;
+        System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec: "+(((double)acked)/uptime+" failed: "+failed));
+    }
+
+    /** Kills the named topology immediately (zero-second wait). */
+    public static void kill(Nimbus.Client client, String name) throws Exception {
+        KillOptions opts = new KillOptions();
+        opts.set_wait_secs(0);
+        client.killTopologyWithOpts(name, opts);
+    }
+
+    /**
+     * Builds the spout -> split -> count topology (4 tasks each), submits it to
+     * a local cluster under the name from args[0] (default "wc-test"), and
+     * sleeps forever.
+     */
+    public static void main(String[] args) throws Exception {
+
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.setSpout("spout", new FastRandomSentenceSpout(), 4);
+
+        builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout");
+        builder.setBolt("count", new WordCount(), 4).fieldsGrouping("split", new Fields("word"));
+
+        Config conf = new Config();
+        conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
+
+        String name = "wc-test";
+        if (args != null && args.length > 0) {
+            name = args[0];
+        }
+
+//        conf.setNumWorkers(1);
+
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology(name, conf, builder.createTopology());
+
+        Utils.sleep(Long.MAX_VALUE);
+//
+//        Map clusterConf = Utils.readStormConfig();
+//        clusterConf.putAll(Utils.readCommandLineOpts());
+//        Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient();
+//
+//        //Sleep for 5 mins
+//        for (int i = 0; i < 10; i++) {
+//            Thread.sleep(30 * 1000);
+//            printMetrics(client, name);
+//        }
+//        kill(client, name);
+    }
+}
\ No newline at end of file



Mime
View raw message