eagle-commits mailing list archives

From ralp...@apache.org
Subject [04/18] incubator-eagle git commit: EAGLE-324 Init branch-v0.5
Date Wed, 01 Jun 2016 06:00:04 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowBenchmarkTest.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowBenchmarkTest.java
new file mode 100644
index 0000000..dc9782e
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowBenchmarkTest.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.time.StopWatch;
+import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.utils.DateTimeUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
+import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
+
+@Ignore
+public class StreamWindowBenchmarkTest {
+    private final static Logger LOGGER = LoggerFactory.getLogger(StreamWindowBenchmarkTest.class);
+    public void sendDESCOrderedEventsToWindow(StreamWindow window, StreamWindowRepository.StorageType storageType, int num) {
+        LOGGER.info("Sending {} events to {} ({})",num,window.getClass().getSimpleName(),storageType);
+        StopWatch stopWatch = new StopWatch();
+        stopWatch.start();
+        int i=0;
+        while(i<num) {
+            PartitionedEvent event = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream_1",(window.startTime()+i));
+            window.add(event);
+            i++;
+        }
+        stopWatch.stop();
+        performanceReport.put(num+"\tInsertTime\t"+storageType,stopWatch.getTime());
+        LOGGER.info("Inserted {} events in {} ms",num,stopWatch.getTime());
+        stopWatch.reset();
+        stopWatch.start();
+        window.flush();
+        stopWatch.stop();
+        performanceReport.put(num+"\tReadTime\t"+storageType,stopWatch.getTime());
+    }
+
+    private ScheduledReporter metricReporter;
+    private Map<String,Long> performanceReport;
+    @Before
+    public void setUp(){
+        final MetricRegistry metrics = new MetricRegistry();
+        metrics.registerAll(new MemoryUsageGaugeSet());
+        metrics.registerAll(new GarbageCollectorMetricSet());
+        metricReporter = ConsoleReporter.forRegistry(metrics)
+                .filter((name, metric) -> name.matches("(.*heap|total).(usage|used)"))
+//                .withLoggingLevel(Slf4jReporter.LoggingLevel.DEBUG)
+                .convertRatesTo(TimeUnit.SECONDS)
+                .convertDurationsTo(TimeUnit.MILLISECONDS)
+                .build();
+        metricReporter.start(60,TimeUnit.SECONDS);
+        performanceReport = new TreeMap<>();
+    }
+
+    @After
+    public void after(){
+        StringBuilder sb = new StringBuilder();
+        for(Map.Entry<String,Long> entry:performanceReport.entrySet()){
+            sb.append(String.format("%-40s\t%s\n",entry.getKey(),entry.getValue()));
+        }
+        LOGGER.info("\n===== Benchmark Result Report =====\n\n{}",sb.toString());
+    }
+
+    private final long start = DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000");
+    private final long stop = DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-05 00:00:00,000");
+    private final long margin = (stop - start)/3;
+
+    private void benchmarkTest(StreamWindow window, StreamWindowRepository.StorageType storageType){
+        StopWatch stopWatch = new StopWatch();
+        stopWatch.start();
+        LOGGER.info("\n===== Benchmark Test for {} ({}) =====",window.getClass().getSimpleName(),storageType);
+        metricReporter.report();
+        sendDESCOrderedEventsToWindow(window,storageType,1000);
+        metricReporter.report();
+        sendDESCOrderedEventsToWindow(window,storageType,10000);
+        metricReporter.report();
+        sendDESCOrderedEventsToWindow(window,storageType,100000);
+        metricReporter.report();
+        sendDESCOrderedEventsToWindow(window,storageType,1000000);
+        metricReporter.report();
+        stopWatch.stop();
+        LOGGER.info("\n===== Finished in total {} ms =====\n",stopWatch.getTime());
+    }
+
+    @Test @Ignore
+    public void testStreamWindowBenchmarkMain(){
+        testStreamSortedWindowOnHeap();
+        testStreamSortedWindowInSerializedMemory();
+        testStreamSortedWindowOffHeap();
+        testStreamSortedWindowFile();
+    }
+
+    @Test @Ignore
+    public void testStreamSortedWindowOnHeap() {
+        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.ONHEAP);
+        benchmarkTest(window,StreamWindowRepository.StorageType.ONHEAP);
+        window.close();
+    }
+
+    @Test @Ignore
+    public void testStreamSortedWindowInSerializedMemory() {
+        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.MEMORY);
+        benchmarkTest(window,StreamWindowRepository.StorageType.MEMORY);
+        window.close();
+    }
+
+    @Test @Ignore
+    public void testStreamSortedWindowOffHeap() {
+        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.DIRECT_MEMORY);
+        benchmarkTest(window,StreamWindowRepository.StorageType.DIRECT_MEMORY);
+        window.close();
+    }
+
+    @Test @Ignore
+    public void testStreamSortedWindowFile() {
+        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.FILE_RAF);
+        benchmarkTest(window,StreamWindowRepository.StorageType.FILE_RAF);
+        window.close();
+    }
+}
\ No newline at end of file

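For context on the measurement pattern above: the test pairs commons-lang's StopWatch (wall-clock timing of insert and flush) with Dropwizard Metrics' JVM gauge sets (heap usage reported around each run). A minimal standalone sketch of the same pattern, using only the libraries the test already imports (the BenchmarkSketch class name and the summing loop are illustrative stand-ins for window.add):

    import java.util.concurrent.TimeUnit;

    import org.apache.commons.lang.time.StopWatch;

    import com.codahale.metrics.ConsoleReporter;
    import com.codahale.metrics.MetricRegistry;
    import com.codahale.metrics.jvm.MemoryUsageGaugeSet;

    public class BenchmarkSketch {
        public static void main(String[] args) {
            MetricRegistry metrics = new MetricRegistry();
            metrics.registerAll(new MemoryUsageGaugeSet());          // heap gauges
            ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics)
                    .convertDurationsTo(TimeUnit.MILLISECONDS)
                    .build();

            StopWatch stopWatch = new StopWatch();
            stopWatch.start();
            long sum = 0;                                            // stand-in workload
            for (int i = 0; i < 1_000_000; i++) {
                sum += i;
            }
            stopWatch.stop();

            System.out.println("workload took " + stopWatch.getTime() + " ms (" + sum + ")");
            reporter.report();                                       // dump heap usage once
        }
    }
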
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowTestSuite.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowTestSuite.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowTestSuite.java
new file mode 100644
index 0000000..950aa34
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowTestSuite.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter;
+
+import java.util.List;
+
+import org.apache.eagle.alert.engine.mock.MockPartitionedCollector;
+import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.sorter.impl.PartitionedEventTimeOrderingComparator;
+import org.apache.eagle.alert.engine.sorter.impl.StreamTimeClockInLocalMemory;
+import org.apache.eagle.alert.utils.DateTimeUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Ordering;
+
+@SuppressWarnings("unused")
+public class StreamWindowTestSuite {
+    private final static Logger LOGGER = LoggerFactory.getLogger(StreamWindowTestSuite.class);
+
+    private final long start = DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000");
+    private final long stop = DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:00,000");
+    private final long margin = (stop - start)/3;
+
+    @Test
+    public void testStreamSortedWindowOnHeap() {
+        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.ONHEAP);
+        streamSortedWindowMustTest(window);
+    }
+
+    @Test
+    public void testStreamSortedWindowInSerializedMemory() {
+        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.MEMORY);
+        streamSortedWindowMustTest(window);
+    }
+
+    @Test
+    public void testStreamSortedWindowOffHeap() {
+        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.DIRECT_MEMORY);
+        streamSortedWindowMustTest(window);
+    }
+
+    @Test
+    public void testStreamSortedWindowFile() {
+        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.FILE_RAF);
+        streamSortedWindowMustTest(window);
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    private void streamSortedWindowMustTest(StreamWindow window){
+        MockPartitionedCollector collector = new MockPartitionedCollector();
+        window.register(collector);
+
+        StreamTimeClock clock = new StreamTimeClockInLocalMemory("sampleStream_1");
+        clock.moveForward(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:30,000"));
+
+        // Current time is: "2016-05-04 00:00:30"
+        window.onTick(clock,System.currentTimeMillis());
+
+        Assert.assertTrue(window.alive());
+        Assert.assertFalse(window.expired());
+
+        // Accepted
+        Assert.assertTrue(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000")));
+        Assert.assertTrue(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:01,000")));
+        Assert.assertTrue(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:30,000")));
+        Assert.assertTrue(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:40,000")));
+        Assert.assertTrue(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:50,000")));
+
+
+        // Rejected
+        Assert.assertFalse(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-03 23:59:59,000")));
+        Assert.assertFalse(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:00,000")));
+        Assert.assertFalse(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:01,000")));
+
+        // Accepted
+        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"))));
+
+        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:01,000"))));
+        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:50,000"))));
+        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:40,000"))));
+        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:30,000"))));
+
+        // Should accept duplicated timestamps
+        Assert.assertTrue("Should support duplicated timestamp",window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"))));
+
+        Assert.assertEquals(6,window.size());
+
+        // Rejected
+        Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-03 23:59:59,000"))));
+        Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:00,000"))));
+        Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:01,000"))));
+
+        Assert.assertEquals(6,window.size());
+
+        // Now is: "2016-05-04 00:00:55"
+        clock.moveForward(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:55,000"));
+        window.onTick(clock,System.currentTimeMillis());
+        Assert.assertTrue(window.alive());
+        Assert.assertFalse(window.expired());
+        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:35,000"))));
+        Assert.assertEquals(7,window.size());
+
+        // Flush when stream time lags too far behind system time; the window stays alive
+        window.onTick(clock,System.currentTimeMillis() + 1 + stop - start + margin);
+        Assert.assertTrue(window.alive());
+        Assert.assertFalse(window.expired());
+        Assert.assertEquals(0,window.size());
+        Assert.assertEquals(7,collector.size());
+
+        Assert.assertFalse("Because the window has flushed but not expired, it should reject events with timestamps before the last flush stream time",
+                window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:54,000"))));
+        Assert.assertTrue("Because the window has flushed but not expired, it should still accept events with timestamps at or after the last flush stream time",
+                window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:56,000"))));
+        Assert.assertEquals(1,window.size());
+        Assert.assertEquals(7,collector.size());
+
+        // Now is: "2016-05-04 00:01:10", not yet expired
+        clock.moveForward(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:10,000"));
+        window.onTick(clock,System.currentTimeMillis() + 2 * (1+ stop - start + margin));
+        Assert.assertEquals(8,collector.size());
+
+        // Now is: "2016-05-04 00:01:20", expired
+        clock.moveForward(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:20,000"));
+        window.onTick(clock,System.currentTimeMillis());
+        Assert.assertFalse(window.alive());
+        Assert.assertTrue(window.expired());
+        Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:35,000"))));
+        Assert.assertEquals(0,window.size());
+
+        Assert.assertEquals(8,collector.size());
+
+        Ordering ordering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
+        Assert.assertTrue(ordering.isOrdered(collector.get()));
+
+        List<PartitionedEvent> list = collector.get();
+        Assert.assertEquals(8,list.size());
+        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"),list.get(0).getTimestamp());
+        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"),list.get(1).getTimestamp());
+        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:01,000"),list.get(2).getTimestamp());
+        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:30,000"),list.get(3).getTimestamp());
+        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:35,000"),list.get(4).getTimestamp());
+        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:40,000"),list.get(5).getTimestamp());
+        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:50,000"),list.get(6).getTimestamp());
+        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:56,000"),list.get(7).getTimestamp());
+    }
+}
\ No newline at end of file

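A quick check of the timing constants above: the window spans start = 00:00:00 to stop = 00:01:00, so margin = (stop - start)/3 = 20 seconds. The window therefore expires only once stream time reaches stop + margin = 00:01:20, which is why the tick at 00:01:10 merely flushes the remaining event (the collector grows to 8) while the tick at 00:01:20 flips alive()/expired().
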
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/TreeMultisetComparatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/TreeMultisetComparatorTest.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/TreeMultisetComparatorTest.java
new file mode 100644
index 0000000..deffed5
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/TreeMultisetComparatorTest.java
@@ -0,0 +1,94 @@
+package org.apache.eagle.alert.engine.sorter;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.engine.sorter.impl.PartitionedEventTimeOrderingComparator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.TreeMultiset;
+
+/**
+ * Since 5/10/16.
+ */
+public class TreeMultisetComparatorTest {
+    /**
+     * If two different events carry the same timestamp and the comparator returns 0
+     * whenever timestamps are equal, then adding both to a TreeMultiset collapses the
+     * second event into the first (see the sketch after this diff).
+     */
+    @Test
+    public void testComparator(){
+        TreeMultiset<PartitionedEvent> set = TreeMultiset.create(PartitionedEventTimeOrderingComparator.INSTANCE);
+
+        // construct PartitionedEvent1
+        PartitionedEvent event1 = new PartitionedEvent();
+        StreamPartition sp = new StreamPartition();
+        sp.setColumns(Arrays.asList("host"));
+        sp.setSortSpec(null);
+        sp.setStreamId("testStreamId");
+        sp.setType(StreamPartition.Type.GROUPBY);
+        event1.setPartition(sp);
+        event1.setPartitionKey(1000);
+        StreamEvent e1 = new StreamEvent();
+        e1.setData(new Object[]{18.4});
+        e1.setStreamId("testStreamId");
+        e1.setTimestamp(1462909984000L);
+        event1.setEvent(e1);
+
+        // construct PartitionedEvent2 with the same timestamp but a different value
+        PartitionedEvent event2 = new PartitionedEvent();
+        event2.setPartition(sp);
+        event2.setPartitionKey(1000);
+        StreamEvent e2 = new StreamEvent();
+        e2.setData(new Object[]{16.3});
+        e2.setStreamId("testStreamId");
+        e2.setTimestamp(1462909984000L);
+        event2.setEvent(e2);
+
+        // construct PartitionedEvent3 with a later timestamp and a different value
+        PartitionedEvent event3 = new PartitionedEvent();
+        event3.setPartition(sp);
+        event3.setPartitionKey(1000);
+        StreamEvent e3 = new StreamEvent();
+        e3.setData(new Object[]{14.3});
+        e3.setStreamId("testStreamId");
+        e3.setTimestamp(1462909984001L);
+        event3.setEvent(e3);
+
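+        // construct PartitionedEvent4 as an exact duplicate of event3 (same timestamp and value)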
+        PartitionedEvent event4 = new PartitionedEvent();
+        event4.setPartition(sp);
+        event4.setPartitionKey(1000);
+        StreamEvent e4 = new StreamEvent();
+        e4.setData(new Object[]{14.3});
+        e4.setStreamId("testStreamId");
+        e4.setTimestamp(1462909984001L);
+        event4.setEvent(e4);
+
+        Assert.assertNotEquals(event2,event3);
+        Assert.assertEquals(event3,event4);
+
+        // check content in set
+        set.add(event1);
+        set.add(event2);
+        set.add(event3);
+        set.add(event4);
+        Assert.assertEquals(4, set.size());
+        set.forEach(System.out::println);
+
+
+        Assert.assertEquals(-1,PartitionedEventTimeOrderingComparator.INSTANCE.compare(event1,event2));
+        Assert.assertEquals(-1,PartitionedEventTimeOrderingComparator.INSTANCE.compare(event1,event3));
+        Assert.assertEquals(-1,PartitionedEventTimeOrderingComparator.INSTANCE.compare(event2,event3));
+        Assert.assertEquals(0,PartitionedEventTimeOrderingComparator.INSTANCE.compare(event3,event4));
+
+        Iterator<PartitionedEvent> it = set.iterator();
+        Assert.assertEquals(16.3,it.next().getData()[0]);
+        Assert.assertEquals(18.4,it.next().getData()[0]);
+        Assert.assertEquals(14.3,it.next().getData()[0]);
+        Assert.assertEquals(14.3,it.next().getData()[0]);
+    }
+}

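To illustrate the Javadoc at the top of the test: with a comparator that returns 0 for equal timestamps, Guava's TreeMultiset treats distinct events as duplicates of whichever was inserted first. A minimal sketch of that failure mode (CollapseDemo and the long[] {timestamp, value} payloads are illustrative, not part of the patch):

    import java.util.Comparator;

    import com.google.common.collect.TreeMultiset;

    public class CollapseDemo {
        public static void main(String[] args) {
            // Timestamp-only comparator: distinct payloads compare as equal
            TreeMultiset<long[]> set =
                    TreeMultiset.create(Comparator.comparingLong((long[] e) -> e[0]));
            set.add(new long[]{1000L, 18L});
            set.add(new long[]{1000L, 16L}); // compares equal to the first element
            // size() == 2, but iteration yields the FIRST payload twice; the
            // value 16 is silently lost. PartitionedEventTimeOrderingComparator
            // avoids this by returning nonzero for distinct events that share
            // a timestamp, as the assertions above verify.
            set.forEach(e -> System.out.println(e[0] + " -> " + e[1]));
        }
    }
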
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java
new file mode 100644
index 0000000..62427e0
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java
@@ -0,0 +1,138 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.topology;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Properties;
+
+import org.apache.eagle.alert.engine.spout.CorrelationSpout;
+import org.apache.eagle.alert.engine.spout.CreateTopicUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.LocalCluster;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.SpoutDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+@SuppressWarnings({"serial", "unused"})
+public class AlertTopologyTest implements Serializable{
+    private static final Logger LOG = LoggerFactory.getLogger(AlertTopologyTest.class);
+    char[] alphabets = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z'};
+    @Ignore
+    @Test
+    public void testMultipleTopics() throws Exception {
+        final String topoId = "myTopology";
+        int numGroupbyBolts = 2;
+        int numTotalGroupbyBolts = 3;
+        System.setProperty("eagle.correlation.numGroupbyBolts", String.valueOf(numGroupbyBolts));
+        System.setProperty("eagle.correlation.topologyName", topoId);
+        System.setProperty("eagle.correlation.mode", "local");
+        System.setProperty("eagle.correlation.zkHosts", "localhost:2181");
+        final String topicName1 = "testTopic3";
+        final String topicName2 = "testTopic4";
+        // ensure topic ready
+        LogManager.getLogger(CorrelationSpout.class).setLevel(Level.DEBUG);
+        Config config = ConfigFactory.load();
+
+        CreateTopicUtils.ensureTopicReady(System.getProperty("eagle.correlation.zkHosts"), topicName1);
+        CreateTopicUtils.ensureTopicReady(System.getProperty("eagle.correlation.zkHosts"), topicName2);
+
+        TopologyBuilder topoBuilder = new TopologyBuilder();
+
+        int numBolts = config.getInt("eagle.correlation.numGroupbyBolts");
+        CorrelationSpout spout = new CorrelationSpout(config, topoId, null, numBolts);
+        String spoutId = "correlation-spout";
+        SpoutDeclarer declarer = topoBuilder.setSpout(spoutId, spout);
+        for (int i = 0; i < numBolts; i++) {
+            TestBolt bolt = new TestBolt();
+            BoltDeclarer boltDecl = topoBuilder.setBolt("engineBolt" + i, bolt);
+            boltDecl.fieldsGrouping(spoutId, "stream_" + i, new Fields());
+        }
+
+        String topoName = config.getString("eagle.correlation.topologyName");
+        LOG.info("start topology in local mode");
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology(topoName, new HashMap<>(), topoBuilder.createTopology());
+
+        while(true) {
+            try {
+                Thread.sleep(1000);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Ignore
+    @Test
+    public void generateRandomStringsToKafka(){
+        String topic = "testTopic3";
+        int max = 1000;
+        Properties configMap = new Properties();
+        configMap.put("bootstrap.servers", "sandbox.hortonworks.com:6667");
+        configMap.put("metadata.broker.list", "sandbox.hortonworks.com:6667");
+        configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        configMap.put("request.required.acks", "1");
+        configMap.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
+        configMap.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
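+        // the deserializer and metadata.broker.list entries above are consumer/legacy settings; the new-API KafkaProducer ignores them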
+        KafkaProducer<String, Object> producer = new KafkaProducer<>(configMap);
+
+        int i = 0;
+        while(i++ < max) {
+            String randomString = generateRandomString();
+            System.out.println("sending string : " + randomString);
+            ProducerRecord record = new ProducerRecord(topic, randomString);
+            producer.send(record);
+            if(i % 10 == 0){
+                try {
+                    Thread.sleep(10);
+                }catch(Exception ex){
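+                    // ignore interruption during the brief throttle pause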
+                }
+            }
+        }
+        producer.close();
+    }
+
+    private String generateRandomString(){
+        long count = Math.round(Math.random() * 10);
+        if(count == 0)
+            count = 1;
+        StringBuilder sb = new StringBuilder();
+        while(count-- > 0) {
+            int index = (int)(Math.floor(Math.random()*26));
+            sb.append(alphabets[index]);
+        }
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/CoordinatorSpoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/CoordinatorSpoutIntegrationTest.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/CoordinatorSpoutIntegrationTest.java
new file mode 100644
index 0000000..deca9b4
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/CoordinatorSpoutIntegrationTest.java
@@ -0,0 +1,101 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.topology;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+import org.apache.eagle.alert.engine.spout.CorrelationSpout;
+import org.apache.eagle.alert.engine.spout.CreateTopicUtils;
+import org.apache.eagle.alert.utils.AlertConstants;
+import org.apache.eagle.alert.utils.StreamIdConversion;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.SpoutDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Since 4/28/16.
+ */
+@SuppressWarnings({"serial", "unused", "rawtypes"})
+public class CoordinatorSpoutIntegrationTest implements Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(CoordinatorSpoutIntegrationTest.class);
+    @Ignore  // this test needs ZooKeeper
+    @Test
+    public void testConfigNotify() throws Exception{
+        final String topoId = "myTopology";
+        int numGroupbyBolts = 2;
+        int numTotalGroupbyBolts = 3;
+        System.setProperty("correlation.serviceHost", "sandbox.hortonworks.com");
+        System.setProperty("correlation.servicePort", "58080");
+        System.setProperty("withMetadataChangeNotifyService.zkConfig.zkQuorum", "sandbox.hortonworks.com:2181");
+        System.setProperty("correlation.numGroupbyBolts", String.valueOf(numGroupbyBolts));
+        System.setProperty("correlation.topologyName", topoId);
+        System.setProperty("correlation.mode", "local");
+        System.setProperty("correlation.zkHosts", "sandbox.hortonworks.com:2181");
+        final String topicName1 = "testTopic3";
+        final String topicName2 = "testTopic4";
+        // ensure topic ready
+        LogManager.getLogger(CorrelationSpout.class).setLevel(Level.DEBUG);
+        Config config = ConfigFactory.load();
+
+        CreateTopicUtils.ensureTopicReady(System.getProperty("withMetadataChangeNotifyService.zkConfig.zkQuorum"), topicName1);
+        CreateTopicUtils.ensureTopicReady(System.getProperty("withMetadataChangeNotifyService.zkConfig.zkQuorum"), topicName2);
+
+        TopologyBuilder topoBuilder = new TopologyBuilder();
+
+        int numBolts = config.getInt("correlation.numGroupbyBolts");
+        String spoutId = "correlation-spout";
+        CorrelationSpout spout = new CorrelationSpout(config, topoId,
+                new MockMetadataChangeNotifyService(topoId, spoutId), numBolts);
+        SpoutDeclarer declarer = topoBuilder.setSpout(spoutId, spout);
+        declarer.setNumTasks(2);
+        for (int i = 0; i < numBolts; i++) {
+            TestBolt bolt = new TestBolt();
+            BoltDeclarer boltDecl = topoBuilder.setBolt("engineBolt" + i, bolt);
+            boltDecl.fieldsGrouping(spoutId,
+                    StreamIdConversion.generateStreamIdBetween(AlertConstants.DEFAULT_SPOUT_NAME, AlertConstants.DEFAULT_ROUTERBOLT_NAME+i), new Fields());
+        }
+
+        String topoName = config.getString("correlation.topologyName");
+        LOG.info("start topology in local mode");
+        LocalCluster cluster = new LocalCluster();
+        StormTopology topology = topoBuilder.createTopology();
+        cluster.submitTopology(topoName, new HashMap(), topology);
+
+
+        Utils.sleep(Long.MAX_VALUE);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
new file mode 100644
index 0000000..6960537
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
@@ -0,0 +1,184 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.topology;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.SpoutSpec;
+import org.apache.eagle.alert.coordination.model.StreamRepartitionMetadata;
+import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.spout.CorrelationSpout;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+
+import storm.kafka.KafkaSpoutWrapper;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+@Ignore
+public class CorrelationSpoutTest {
+    @SuppressWarnings("unused")
+    private static final String TEST_SPOUT_ID = "spout-id-1";
+    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(CorrelationSpoutTest.class);
+    private String dataSourceName = "ds-name";
+
+    @SuppressWarnings({ "serial", "rawtypes" })
+    @Test
+    public void testMetadataIngestion_emptyMetadata() throws Exception {
+        String topoId = "testMetadataInjection";
+        Config config = ConfigFactory.load();
+        AtomicBoolean validated = new AtomicBoolean(false);
+        CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1) {
+            @Override
+            protected KafkaSpoutWrapper createKafkaSpout(Map conf, TopologyContext context,
+                    SpoutOutputCollector collector, String topic, String schemeClsName, SpoutSpec streamMetadatas, Map<String, StreamDefinition> sds)
+                    throws Exception {
+                validated.set(true);
+                return null;
+            }
+        };
+        Kafka2TupleMetadata ds = new Kafka2TupleMetadata();
+        ds.setName("ds-name");
+        ds.setType("KAFKA");
+        ds.setProperties(new HashMap<String, String>());
+        ds.setTopic("name-of-topic1");
+        ds.setSchemeCls("PlainStringScheme");
+        ds.setCodec(new Tuple2StreamMetadata());
+        Map<String,Kafka2TupleMetadata> dsMap = new HashMap<String, Kafka2TupleMetadata>();
+        dsMap.put(ds.getName(), ds);
+
+        StreamRepartitionMetadata m1 = new StreamRepartitionMetadata(ds.getName(), "s1");
+
+        Map<String, List<StreamRepartitionMetadata>> dataSources = new HashMap<String, List<StreamRepartitionMetadata>>();
+        dataSources.put(ds.getName(), Arrays.asList(m1));
+
+        SpoutSpec newMetadata = new SpoutSpec(topoId, dataSources, null, dsMap);
+        
+        spout.onReload(newMetadata, null);
+        Assert.assertTrue(validated.get());
+    }
+
+    @SuppressWarnings({ "serial", "rawtypes" })
+    @Test
+    public void testMetadataIngestion_oneNewTopic2Streams() throws Exception {
+        String topoId = "testMetadataInjection";
+        final String topicName = "testTopic";
+
+        Config config = ConfigFactory.load();
+        final AtomicBoolean verified = new AtomicBoolean(false);
+        CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1)  {
+            @Override
+            protected KafkaSpoutWrapper createKafkaSpout(Map conf, 
+                    TopologyContext context,
+                    SpoutOutputCollector collector, 
+                    String topic, 
+                    String topic2SchemeClsName,
+                    SpoutSpec streamMetadatas,
+                    Map<String, StreamDefinition> sds) {
+                Assert.assertEquals(1, streamMetadatas.getStreamRepartitionMetadataMap().size());
+                Assert.assertTrue(streamMetadatas.getStream("s1") != null);
+                Assert.assertTrue(streamMetadatas.getStream("s2") != null);
+                Assert.assertEquals(dataSourceName, streamMetadatas.getStream("s1").getTopicName());
+                Assert.assertEquals(dataSourceName, streamMetadatas.getStream("s2").getTopicName());
+                LOG.info("successfully verified new topic and streams");
+                verified.set(true);
+                return null;
+            }
+        };
+
+        Map<String, Kafka2TupleMetadata> dsMap = createDatasource(topicName, dataSourceName);
+
+        StreamRepartitionMetadata m1 = new StreamRepartitionMetadata(dataSourceName, "s1");
+        StreamRepartitionMetadata m2 = new StreamRepartitionMetadata(dataSourceName, "s2");
+        Map<String, List<StreamRepartitionMetadata>> dataSources = new HashMap<String, List<StreamRepartitionMetadata>>();
+        dataSources.put(dataSourceName, Arrays.asList(m1, m2));
+
+        SpoutSpec newMetadata = new SpoutSpec(topoId, dataSources, null, dsMap);
+        spout.onReload(newMetadata, null);
+        Assert.assertTrue(verified.get());
+    }
+
+    private Map<String, Kafka2TupleMetadata> createDatasource(final String topicName, final String dataSourceName) {
+        Kafka2TupleMetadata ds = new Kafka2TupleMetadata();
+        
+        ds.setName(dataSourceName);
+        ds.setType("KAFKA");
+        ds.setProperties(new HashMap<String, String>());
+        ds.setTopic(topicName);
+        ds.setSchemeCls("PlainStringScheme");
+        ds.setCodec(new Tuple2StreamMetadata());
+        Map<String, Kafka2TupleMetadata> dsMap = new HashMap<String, Kafka2TupleMetadata>();
+        dsMap.put(ds.getName(), ds);
+        return dsMap;
+    }
+
+    @SuppressWarnings({ "serial", "rawtypes" })
+    @Test
+    public void testMetadataIngestion_deleteOneTopic() throws Exception {
+        String topoId = "testMetadataInjection";
+        final String topicName = "testTopic";
+        Config config = ConfigFactory.load();
+        final AtomicBoolean verified = new AtomicBoolean(false);
+        CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1) {
+            @Override
+            protected KafkaSpoutWrapper createKafkaSpout(Map conf, TopologyContext context, SpoutOutputCollector collector, final String topic,
+                                                         String schemeClsName, SpoutSpec streamMetadatas,
+                                                         Map<String, StreamDefinition> sds){
+                return new KafkaSpoutWrapper(null, null);
+            }
+            @Override
+            protected void removeKafkaSpout(KafkaSpoutWrapper wrapper){
+                LOG.info("successfully verified removed topic and streams");
+                verified.set(true);
+            }
+        };
+
+        Map<String, Kafka2TupleMetadata> dsMap = createDatasource(topicName, dataSourceName);
+
+        StreamRepartitionMetadata m1 = new StreamRepartitionMetadata(dataSourceName, "s1");
+
+        Map<String, List<StreamRepartitionMetadata>> streamMetadatas = new HashMap<String, List<StreamRepartitionMetadata>>();
+        streamMetadatas.put(dataSourceName, Arrays.asList(m1));
+
+        SpoutSpec cachedMetadata = new SpoutSpec(topoId, streamMetadatas, null, dsMap);
+        // add new topic
+        spout.onReload(cachedMetadata, null);
+        // delete the topic again by reloading with empty metadata
+        try {
+            spout.onReload(new SpoutSpec(topoId, new HashMap<String, List<StreamRepartitionMetadata>>(), new HashMap<>(), new HashMap<String, Kafka2TupleMetadata>()),
+                    null);
+        }catch(Exception ex){
+            LOG.error("error reloading spout metadata", ex);
+            throw ex;
+        }
+        Assert.assertTrue(verified.get());
+    }
+}
\ No newline at end of file

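All three tests above exercise the same seam: CorrelationSpout keeps createKafkaSpout/removeKafkaSpout protected, so a test can subclass the spout and record the calls instead of touching Kafka or Storm. A generic sketch of that pattern (WidgetPool/createWidget are hypothetical names, not Eagle APIs):

    import java.util.concurrent.atomic.AtomicBoolean;

    class WidgetPool {
        // protected factory: production code creates a real resource here
        protected Object createWidget(String name) {
            throw new UnsupportedOperationException("real resource creation");
        }
        public void reload(String name) {
            createWidget(name);
        }
    }

    public class WidgetPoolTest {
        public static void main(String[] args) {
            AtomicBoolean created = new AtomicBoolean(false);
            WidgetPool pool = new WidgetPool() {
                @Override
                protected Object createWidget(String name) {
                    created.set(true); // observe the call instead of doing real work
                    return null;
                }
            };
            pool.reload("topic-1");
            System.out.println("factory invoked: " + created.get());
        }
    }
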
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/FastWordCountTopology.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/FastWordCountTopology.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/FastWordCountTopology.java
new file mode 100644
index 0000000..4779fac
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/FastWordCountTopology.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.topology;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.ClusterSummary;
+import backtype.storm.generated.ExecutorSummary;
+import backtype.storm.generated.KillOptions;
+import backtype.storm.generated.Nimbus;
+import backtype.storm.generated.SpoutStats;
+import backtype.storm.generated.TopologyInfo;
+import backtype.storm.generated.TopologySummary;
+import backtype.storm.metric.LoggingMetricsConsumer;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseBasicBolt;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+/**
+ * WordCount, but the spout does not stop and the bolts are implemented in
+ * Java. This shows how fast the word count can run.
+ */
+@SuppressWarnings("serial")
+public class FastWordCountTopology {
+    public static class FastRandomSentenceSpout extends BaseRichSpout {
+        private final static Logger LOG = LoggerFactory.getLogger(FastRandomSentenceSpout.class);
+        SpoutOutputCollector _collector;
+        Random _rand;
+        private static final String[] CHOICES = {
+                "marry had a little lamb whos fleese was white as snow",
+                "and every where that marry went the lamb was sure to go",
+                "one two three four five six seven eight nine ten",
+                "this is a test of the emergency broadcast system this is only a test",
+                "peter piper picked a peck of pickeled peppers"
+        };
+
+        @SuppressWarnings("rawtypes")
+        @Override
+        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+            _collector = collector;
+            _rand = ThreadLocalRandom.current();
+        }
+
+        @Override
+        public void nextTuple() {
+            String sentence = CHOICES[_rand.nextInt(CHOICES.length)];
+            _collector.emit(new Values(sentence), sentence);
+            LOG.debug("Emit tuple: {}, id:{}",new Values(sentence),sentence);
+        }
+
+        @Override
+        public void ack(Object id) {
+            //Ignored
+        }
+
+        @Override
+        public void fail(Object id) {
+            _collector.emit(new Values(id), id);
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("sentence"));
+        }
+    }
+
+    public static class SplitSentence extends BaseBasicBolt {
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            String sentence = tuple.getString(0);
+            for (String word: sentence.split("\\s+")) {
+                collector.emit(new Values(word, 1));
+            }
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word", "count"));
+        }
+    }
+
+
+    public static class WordCount extends BaseBasicBolt {
+        Map<String, Integer> counts = new HashMap<String, Integer>();
+
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            String word = tuple.getString(0);
+            Integer count = counts.get(word);
+            if (count == null)
+                count = 0;
+            count++;
+            counts.put(word, count);
+            collector.emit(new Values(word, count));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word", "count"));
+        }
+    }
+
+    public static void printMetrics(Nimbus.Client client, String name) throws Exception {
+        ClusterSummary summary = client.getClusterInfo();
+        String id = null;
+        for (TopologySummary ts: summary.get_topologies()) {
+            if (name.equals(ts.get_name())) {
+                id = ts.get_id();
+            }
+        }
+        if (id == null) {
+            throw new Exception("Could not find a topology named "+name);
+        }
+        TopologyInfo info = client.getTopologyInfo(id);
+        int uptime = info.get_uptime_secs();
+        long acked = 0;
+        long failed = 0;
+        double weightedAvgTotal = 0.0;
+        for (ExecutorSummary exec: info.get_executors()) {
+            if ("spout".equals(exec.get_component_id())) {
+                SpoutStats stats = exec.get_stats().get_specific().get_spout();
+                Map<String, Long> failedMap = stats.get_failed().get(":all-time");
+                Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
+                Map<String, Double> avgLatMap = stats.get_complete_ms_avg().get(":all-time");
+                for (String key: ackedMap.keySet()) {
+                    if (failedMap != null) {
+                        Long tmp = failedMap.get(key);
+                        if (tmp != null) {
+                            failed += tmp;
+                        }
+                    }
+                    long ackVal = ackedMap.get(key);
+                    double latVal = avgLatMap.get(key) * ackVal;
+                    acked += ackVal;
+                    weightedAvgTotal += latVal;
+                }
+            }
+        }
+        double avgLatency = weightedAvgTotal/acked;
+        System.out.println("uptime: " + uptime + " acked: " + acked + " avgLatency: " + avgLatency + " acked/sec: " + (((double) acked) / uptime) + " failed: " + failed);
+    }
+
+    public static void kill(Nimbus.Client client, String name) throws Exception {
+        KillOptions opts = new KillOptions();
+        opts.set_wait_secs(0);
+        client.killTopologyWithOpts(name, opts);
+    }
+
+    public static void main(String[] args) throws Exception {
+
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.setSpout("spout", new FastRandomSentenceSpout(), 4);
+
+        builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout");
+        builder.setBolt("count", new WordCount(), 4).fieldsGrouping("split", new Fields("word"));
+
+        Config conf = new Config();
+        conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
+
+        String name = "wc-test";
+        if (args != null && args.length > 0) {
+            name = args[0];
+        }
+
+//        conf.setNumWorkers(1);
+
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology(name, conf, builder.createTopology());
+
+        Utils.sleep(Long.MAX_VALUE);
+//
+//        Map clusterConf = Utils.readStormConfig();
+//        clusterConf.putAll(Utils.readCommandLineOpts());
+//        Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient();
+//
+//        //Sleep for 5 mins
+//        for (int i = 0; i < 10; i++) {
+//            Thread.sleep(30 * 1000);
+//            printMetrics(client, name);
+//        }
+//        kill(client, name);
+    }
+}
\ No newline at end of file

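Note on printMetrics above: it computes the spout's complete latency as an acked-weighted average across stream keys, i.e.

    avgLatency = sum_k(latency_k * acked_k) / sum_k(acked_k)

so streams that acknowledged more tuples contribute proportionally more to the reported latency.
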
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/MockMetadataChangeNotifyService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/MockMetadataChangeNotifyService.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/MockMetadataChangeNotifyService.java
new file mode 100644
index 0000000..a8fb6d0
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/MockMetadataChangeNotifyService.java
@@ -0,0 +1,111 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.topology;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
+import org.apache.eagle.alert.coordination.model.PublishSpec;
+import org.apache.eagle.alert.coordination.model.RouterSpec;
+import org.apache.eagle.alert.coordination.model.SpoutSpec;
+import org.apache.eagle.alert.engine.coordinator.MetadataType;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.impl.AbstractMetadataChangeNotifyService;
+import org.apache.eagle.alert.engine.utils.MetadataSerDeser;
+import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+/**
+ * Since 5/4/16.
+ */
+@SuppressWarnings({"serial"})
+public class MockMetadataChangeNotifyService extends AbstractMetadataChangeNotifyService implements Runnable {
+    private final static Logger LOG = LoggerFactory.getLogger(MockMetadataChangeNotifyService.class);
+    @SuppressWarnings("unused")
+    private static final String[] topics = new String[]{"testTopic3", "testTopic4", "testTopic5"};
+    @SuppressWarnings("unused")
+    private String topologyName;
+    @SuppressWarnings("unused")
+    private String spoutId;
+    private Map<String, StreamDefinition> sds;
+
+    public MockMetadataChangeNotifyService(String topologyName, String spoutId){
+        this.topologyName = topologyName;
+        this.spoutId = spoutId;
+    }
+
+    @Override
+    public void init(Config config, MetadataType type) {
+        super.init(config, type);
+        this.sds = defineStreamDefinitions();
+        new Thread(this).start();
+    }
+
+    @Override
+    public void run() {
+        switch (type) {
+            case SPOUT:
+                notifySpout(Arrays.asList("testTopic3", "testTopic4"), Arrays.asList("testTopic5"));
+                break;
+            case STREAM_ROUTER_BOLT:
+                populateRouterMetadata();
+                break;
+            case ALERT_BOLT:
+                populateAlertBoltSpec();
+                break;
+            case ALERT_PUBLISH_BOLT:
+                notifyAlertPublishBolt();
+                break;
+            default:
+                LOG.error("Unexpected metadata type: {}", type);
+        }
+    }
+
+    private Map<String, StreamDefinition> defineStreamDefinitions(){
+        return MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testStreamDefinitionsSpec.json"),
+                new TypeReference<Map<String, StreamDefinition>>() {});
+    }
+
+    private void notifySpout(List<String> plainStringTopics, List<String> jsonStringTopics){
+        // the topic lists are informational only; the actual spec is predefined in the JSON resource
+        SpoutSpec newSpec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testSpoutSpec.json"), SpoutSpec.class);
+        notifySpout(newSpec, sds);
+    }
+
+    private void populateRouterMetadata(){
+        RouterSpec boltSpec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testStreamRouterBoltSpec.json"), RouterSpec.class);
+        notifyStreamRouterBolt(boltSpec, sds);
+    }
+
+    private void populateAlertBoltSpec(){
+        AlertBoltSpec spec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testAlertBoltSpec.json"), AlertBoltSpec.class);
+        notifyAlertBolt(spec, sds);
+    }
+
+    private void notifyAlertPublishBolt(){
+        PublishSpec spec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec.json"), PublishSpec.class);
+        notifyAlertPublishBolt(spec, sds);
+    }
+}
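
Usage-wise, the entry point of this mock is init(Config, MetadataType): it loads the stream definitions and starts the notify thread. A minimal wiring sketch (the config comes from the stock Typesafe ConfigFactory.load(); the topology and spout names are placeholders):

    Config config = ConfigFactory.load();
    MockMetadataChangeNotifyService notifyService =
            new MockMetadataChangeNotifyService("testTopology", "alertEngineSpout");
    // init() loads /testStreamDefinitionsSpec.json and spawns the thread that
    // pushes the spec matching the given MetadataType (here: the alert bolt spec).
    notifyService.init(config, MetadataType.ALERT_BOLT);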

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/SendData2KafkaTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/SendData2KafkaTest.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/SendData2KafkaTest.java
new file mode 100644
index 0000000..fa2f0a6
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/SendData2KafkaTest.java
@@ -0,0 +1,103 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.eagle.alert.engine.topology;
+
+import java.io.FileWriter;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Created on 3/12/16.
+ */
+@SuppressWarnings({"serial", "unchecked", "rawtypes", "resource"})
+public class SendData2KafkaTest implements Serializable{
+    /**
+     * Sample events produced by testOutput():
+     *
+     * {"timestamp": 10000, "metric": "esErrorLogEvent", "instanceUuid": "vm-InstanceId1", "host":"test-host1", "type":"nova", "stack":"NullPointException-..........."}
+     * {"timestamp": 10000, "metric": "instanceFailureLogEvent", "instanceUuid": "vm-InstanceId1", "message":"instance boot failure for user liasu!"}
+     */
+    @Test
+    @Ignore
+    public void testOutput() throws Exception {
+        String s1 = "{\"timestamp\": %d, \"metric\": \"esErrorLogEvent\", \"instanceUuid\": \"vm-InstanceId%d\", \"host\":\"test-host1\", \"type\":\"nova\", \"stack\":\"NullPointException-...........\"}";
+        String s2 = "{\"timestamp\": %d, \"metric\": \"instanceFailureLogEvent\", \"instanceUuid\": \"vm-InstanceId%d\", \"message\":\"instance boot failure for user liasu!\"}";
+
+        PrintWriter espw = new PrintWriter(new FileWriter("src/test/resources/es.log"));
+        PrintWriter ifpw = new PrintWriter(new FileWriter("src/test/resources/if.log"));
+
+        long base = System.currentTimeMillis();
+        long timestamp = 10000;
+
+        // 10 batches spaced ~10s apart: each batch writes 10 esError events at
+        // increasing offsets, then one instanceFailure event
+        for (int i = 0; i < 10; i++) {
+
+            timestamp = base + i * 10000;
+            for (int j = 0; j < 10; j++) {
+                timestamp = timestamp + j * 1000;
+
+                espw.println(String.format(s1, timestamp, i));
+            }
+
+            ifpw.println(String.format(s2, timestamp, i));
+        }
+
+        espw.close(); // close() flushes and releases the file handle
+        ifpw.close();
+    }
+
+    @Test
+    @Ignore
+    public void sendKafka() throws Exception {
+        List<String> eslogs = Files.readAllLines(Paths.get(SendData2KafkaTest.class.getResource("/es.log").toURI()), Charset.defaultCharset());
+        List<String> iflogs = Files.readAllLines(Paths.get(SendData2KafkaTest.class.getResource("/if.log").toURI()), Charset.defaultCharset());
+
+        Properties props = new Properties();
+        props.put("bootstrap.servers", "localhost:9092");
+        props.put("value.serializer", StringSerializer.class.getCanonicalName());
+        props.put("key.serializer", StringSerializer.class.getCanonicalName());
+
+        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
+        while (true) {
+            for (String s : eslogs) {
+                ProducerRecord<String, String> record = new ProducerRecord<>("nn_jmx_metric_sandbox", s);
+                producer.send(record);
+            }
+
+            for (String s : iflogs) {
+                ProducerRecord<String, String> record = new ProducerRecord<>("nn_jmx_metric_sandbox", s);
+                producer.send(record);
+            }
+
+            Thread.sleep(5000);
+        }
+
+    }
+
+}
\ No newline at end of file
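
The producer above is intentionally left running in an endless loop (hence the "resource" suppression). For a bounded send, a typed try-with-resources variant along these lines would also work (same kafka-clients API and topic):

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", StringSerializer.class.getCanonicalName());
    props.put("value.serializer", StringSerializer.class.getCanonicalName());
    // KafkaProducer implements Closeable; close() flushes in-flight records.
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
        for (String line : eslogs) {
            producer.send(new ProducerRecord<>("nn_jmx_metric_sandbox", line));
        }
    }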

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java
new file mode 100644
index 0000000..493d30e
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java
@@ -0,0 +1,61 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.topology;
+
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+
+/**
+ * Created by yonzhang on 4/7/16.
+ */
+@SuppressWarnings({"rawtypes", "serial"})
+public class TestBolt extends BaseRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(TestBolt.class);
+    private OutputCollector collector;
+    private long count;
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        LOG.info("data is coming: " + input);
+        count++;
+        if(count % 10 == 0){
+            LOG.info("count = " + count);
+        }
+        collector.ack(input);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+    }
+}
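
TestBolt is a plain counting sink, so dropping it at the end of a topology under test is one line of wiring. A sketch (SomeTestSpout is hypothetical; any IRichSpout emitting tuples will do):

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("testSpout", new SomeTestSpout()); // hypothetical spout
    builder.setBolt("sink", new TestBolt())             // logs, counts and acks every tuple
           .shuffleGrouping("testSpout");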

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestByteBuffer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestByteBuffer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestByteBuffer.java
new file mode 100644
index 0000000..666d167
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestByteBuffer.java
@@ -0,0 +1,53 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.topology;
+
+import java.nio.ByteBuffer;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Storm 1.0.0 uses ByteBuffer, so we need to verify its read/write semantics here.
+ */
+public class TestByteBuffer {
+    @Test
+    public void testBB(){
+        ByteBuffer bb = ByteBuffer.allocate(100);
+        bb.put((byte)12);
+        Assert.assertTrue(bb.hasArray());
+        bb.rewind();
+        Assert.assertEquals(12, bb.get());
+    }
+
+    @Test
+    public void testMultipleStrings() throws Exception{
+        ByteBuffer bb = ByteBuffer.allocate(100);
+        bb.put("abc".getBytes("UTF-8"));
+        bb.put("xyz".getBytes("UTF-8"));
+        bb.put("1234".getBytes("UTF-8"));
+        bb.flip(); // flip() (not rewind()) sets limit=position, so remaining() covers only the written bytes
+        if (bb.hasArray()) {
+            int base = bb.arrayOffset();
+            String ret = new String(bb.array(), base + bb.position(), bb.remaining());
+            System.out.println("string is: " + ret);
+        }
+    }
+}
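
The distinction these tests exercise: rewind() resets the position but leaves the limit at capacity, while flip() sets the limit to the bytes actually written, which is what a read-back needs. A plain java.nio illustration:

    ByteBuffer bb = ByteBuffer.allocate(100);
    bb.put("abcxyz1234".getBytes(StandardCharsets.UTF_8)); // position=10, limit=100
    bb.flip();                                             // position=0,  limit=10
    // remaining() now covers only the written bytes; after rewind() it would be
    // 100, and decoding would drag in 90 trailing NUL bytes.
    String s = new String(bb.array(), bb.arrayOffset() + bb.position(),
            bb.remaining(), StandardCharsets.UTF_8);       // "abcxyz1234"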

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestMetadataSpecSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestMetadataSpecSerDeser.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestMetadataSpecSerDeser.java
new file mode 100644
index 0000000..65b16d9
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestMetadataSpecSerDeser.java
@@ -0,0 +1,266 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.topology;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
+import org.apache.eagle.alert.coordination.model.RouterSpec;
+import org.apache.eagle.alert.coordination.model.SpoutSpec;
+import org.apache.eagle.alert.coordination.model.StreamRepartitionMetadata;
+import org.apache.eagle.alert.coordination.model.StreamRepartitionStrategy;
+import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
+import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandlers;
+import org.apache.eagle.alert.engine.utils.MetadataSerDeser;
+import org.codehaus.jackson.type.TypeReference;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Since 5/6/16.
+ */
+public class TestMetadataSpecSerDeser {
+    private String getStreamNameByTopic(String topic){
+        return topic + "Stream";
+    }
+
+    @Test
+    public void testStreamDefinitions(){
+        Map<String, StreamDefinition> sds = new HashMap<>();
+        List<String> topics = Arrays.asList("testTopic3", "testTopic4", "testTopic5");
+        for(String topic : topics) {
+            // all three test topics share the same single-column ("value": STRING) schema
+            StreamDefinition schema = new StreamDefinition();
+            schema.setStreamId(getStreamNameByTopic(topic));
+            StreamColumn column = new StreamColumn();
+            column.setName("value");
+            column.setType(StreamColumn.Type.STRING);
+            schema.setColumns(Collections.singletonList(column));
+            sds.put(schema.getStreamId(), schema);
+        }
+
+        String json = MetadataSerDeser.serialize(sds);
+        System.out.println(json);
+
+        Map<String, StreamDefinition> deserializedSpec = MetadataSerDeser.deserialize(json, new TypeReference<Map<String, StreamDefinition>>(){});
+        Assert.assertEquals(3, deserializedSpec.size());
+    }
+
+    @SuppressWarnings("unused")
+    @Test
+    public void testSpoutSpec(){
+        String topologyName = "testTopology";
+        String spoutId = "alertEngineSpout";
+        List<String> plainStringTopics = Arrays.asList("testTopic3", "testTopic4");
+        List<String> jsonStringTopics = Arrays.asList("testTopic5");
+        Map<String, Kafka2TupleMetadata> kafka2TupleMetadataMap = new HashMap<>();
+        for(String topic : plainStringTopics) {
+            Kafka2TupleMetadata kafka2TupleMetadata = new Kafka2TupleMetadata();
+            kafka2TupleMetadata.setName(topic);
+            kafka2TupleMetadata.setTopic(topic);
+            kafka2TupleMetadata.setSchemeCls("org.apache.eagle.alert.engine.scheme.PlainStringScheme");
+            kafka2TupleMetadataMap.put(topic, kafka2TupleMetadata);
+        }
+        for(String topic : jsonStringTopics){
+            Kafka2TupleMetadata kafka2TupleMetadata = new Kafka2TupleMetadata();
+            kafka2TupleMetadata.setName(topic);
+            kafka2TupleMetadata.setTopic(topic);
+            kafka2TupleMetadata.setSchemeCls("org.apache.eagle.alert.engine.scheme.JsonScheme");
+            kafka2TupleMetadataMap.put(topic, kafka2TupleMetadata);
+        }
+
+        // construct Tuple2StreamMetadata
+        Map<String, Tuple2StreamMetadata> tuple2StreamMetadataMap = new HashMap<>();
+        for(String topic : plainStringTopics) {
+            String streamId = getStreamNameByTopic(topic);
+            Tuple2StreamMetadata tuple2StreamMetadata = new Tuple2StreamMetadata();
+            Set<String> activeStreamNames = new HashSet<>();
+            activeStreamNames.add(streamId);
+            tuple2StreamMetadata.setStreamNameSelectorCls("org.apache.eagle.alert.engine.scheme.PlainStringStreamNameSelector");
+            tuple2StreamMetadata.setStreamNameSelectorProp(new Properties());
+            tuple2StreamMetadata.getStreamNameSelectorProp().put("userProvidedStreamName", streamId);
+            tuple2StreamMetadata.setActiveStreamNames(activeStreamNames);
+            tuple2StreamMetadata.setTimestampColumn("timestamp");
+            tuple2StreamMetadataMap.put(topic, tuple2StreamMetadata);
+        }
+
+        for(String topic : jsonStringTopics) {
+            String streamId = getStreamNameByTopic(topic);
+            Tuple2StreamMetadata tuple2StreamMetadata = new Tuple2StreamMetadata();
+            Set<String> activeStreamNames = new HashSet<>();
+            activeStreamNames.add(streamId);
+            tuple2StreamMetadata.setStreamNameSelectorCls("org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector");
+            tuple2StreamMetadata.setStreamNameSelectorProp(new Properties());
+            tuple2StreamMetadata.getStreamNameSelectorProp().put("userProvidedStreamName", streamId);
+            tuple2StreamMetadata.setActiveStreamNames(activeStreamNames);
+            tuple2StreamMetadata.setTimestampColumn("timestamp");
+            tuple2StreamMetadataMap.put(topic, tuple2StreamMetadata);
+        }
+
+        // construct StreamRepartitionMetadata
+        Map<String, List<StreamRepartitionMetadata>> streamRepartitionMetadataMap = new HashMap<>();
+        for(String topic : plainStringTopics) {
+            String streamId = getStreamNameByTopic(topic);
+            StreamRepartitionMetadata streamRepartitionMetadata = new StreamRepartitionMetadata(topic, "defaultStringStream");
+            StreamRepartitionStrategy gs = new StreamRepartitionStrategy();
+            // StreamPartition: group by the "value" column of this topic's stream
+            StreamPartition sp = new StreamPartition();
+            sp.setStreamId(streamId);
+            sp.setColumns(Arrays.asList("value"));
+            sp.setType(StreamPartition.Type.GROUPBY);
+            StreamSortSpec sortSpec = new StreamSortSpec();
+            sortSpec.setWindowMargin(1000);
+            sortSpec.setWindowPeriod2(Period.seconds(10));
+            sp.setSortSpec(sortSpec);
+
+            gs.partition = sp;
+            gs.numTotalParticipatingRouterBolts = 1;
+            gs.startSequence = 0;
+            streamRepartitionMetadata.addGroupStrategy(gs);
+            streamRepartitionMetadataMap.put(topic, Arrays.asList(streamRepartitionMetadata));
+        }
+
+        for(String topic : jsonStringTopics) {
+            String streamId = getStreamNameByTopic(topic);
+            StreamRepartitionMetadata streamRepartitionMetadata = new StreamRepartitionMetadata(topic, "defaultStringStream");
+            StreamRepartitionStrategy gs = new StreamRepartitionStrategy();
+            // StreamPartition: group by the "value" column of this topic's stream
+            StreamPartition sp = new StreamPartition();
+            sp.setStreamId(streamId);
+            sp.setColumns(Arrays.asList("value"));
+            sp.setType(StreamPartition.Type.GROUPBY);
+            StreamSortSpec sortSpec = new StreamSortSpec();
+            sortSpec.setWindowMargin(1000);
+            sortSpec.setWindowPeriod2(Period.seconds(10));
+            sp.setSortSpec(sortSpec);
+
+            gs.partition = sp;
+            gs.numTotalParticipatingRouterBolts = 1;
+            gs.startSequence = 0;
+            streamRepartitionMetadata.addGroupStrategy(gs);
+            streamRepartitionMetadataMap.put(topic, Arrays.asList(streamRepartitionMetadata));
+        }
+
+        SpoutSpec newSpec = new SpoutSpec(topologyName, streamRepartitionMetadataMap, tuple2StreamMetadataMap, kafka2TupleMetadataMap);
+        String json = MetadataSerDeser.serialize(newSpec);
+        System.out.println(json);
+
+        SpoutSpec deserializedSpec = MetadataSerDeser.deserialize(json, SpoutSpec.class);
+        Assert.assertNotNull(deserializedSpec);
+//        Assert.assertEquals(spoutId, deserializedSpec.getSpoutId());
+    }
+
+    @Test
+    public void testRouterBoltSpec(){
+        List<String> topics = Arrays.asList("testTopic3", "testTopic4", "testTopic5");
+        RouterSpec boltSpec = new RouterSpec();
+        for(String topic : topics) {
+            String streamId = getStreamNameByTopic(topic);
+            // StreamPartition: group by the "value" column of this topic's stream
+            StreamPartition sp = new StreamPartition();
+            sp.setStreamId(streamId);
+            sp.setColumns(Arrays.asList("value"));
+            sp.setType(StreamPartition.Type.GROUPBY);
+
+            StreamSortSpec sortSpec = new StreamSortSpec();
+            sortSpec.setWindowMargin(1000);
+            sortSpec.setWindowPeriod2(Period.seconds(10));
+            sp.setSortSpec(sortSpec);
+
+            // set StreamRouterSpec to have two WorkSlots
+            StreamRouterSpec routerSpec = new StreamRouterSpec();
+            routerSpec.setPartition(sp);
+            routerSpec.setStreamId(streamId);
+            PolicyWorkerQueue queue = new PolicyWorkerQueue();
+            queue.setPartition(sp);
+            queue.setWorkers(Arrays.asList(new WorkSlot("testTopology", "alertBolt0"), new WorkSlot("testTopology", "alertBolt1")));
+            routerSpec.setTargetQueue(Arrays.asList(queue));
+            boltSpec.addRouterSpec(routerSpec);
+        }
+
+        String json = MetadataSerDeser.serialize(boltSpec);
+        System.out.println(json);
+        RouterSpec deserializedSpec = MetadataSerDeser.deserialize(json, RouterSpec.class);
+        Assert.assertEquals(3, deserializedSpec.getRouterSpecs().size());
+    }
+
+    @Test
+    public void testAlertBoltSpec(){
+        String topologyName = "testTopology";
+        AlertBoltSpec spec = new AlertBoltSpec();
+        List<String> topics = Arrays.asList("testTopic3", "testTopic4", "testTopic5");
+        for(String topic : topics) {
+            String streamId = getStreamNameByTopic(topic);
+
+            // construct StreamPartition
+            StreamPartition sp = new StreamPartition();
+            sp.setColumns(Collections.singletonList("value"));
+            sp.setStreamId(streamId);
+            sp.setType(StreamPartition.Type.GROUPBY);
+            StreamSortSpec sortSpec = new StreamSortSpec();
+            sortSpec.setWindowMargin(1000);
+            sortSpec.setWindowPeriod2(Period.seconds(10));
+            sp.setSortSpec(sortSpec);
+
+            spec.setVersion("version1");
+            spec.setTopologyName(topologyName);
+            PolicyDefinition pd = new PolicyDefinition();
+            pd.setName("policy1");
+            pd.setPartitionSpec(Collections.singletonList(sp));
+            pd.setOutputStreams(Collections.singletonList("testAlertStream"));
+            pd.setInputStreams(Collections.singletonList(streamId));
+            pd.setDefinition(new PolicyDefinition.Definition());
+            pd.getDefinition().type = PolicyStreamHandlers.SIDDHI_ENGINE;
+            pd.getDefinition().value = String.format("from %s[value=='xyz'] select value insert into testAlertStream;", streamId);
+            spec.addBoltPolicy("alertBolt0", pd.getName());
+        }
+        String json = MetadataSerDeser.serialize(spec);
+        System.out.println(json);
+        AlertBoltSpec deserializedSpec = MetadataSerDeser.deserialize(json, AlertBoltSpec.class);
+        Assert.assertEquals(topologyName, deserializedSpec.getTopologyName());
+    }
+}
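
Every test above follows the same serialize/deserialize round trip through MetadataSerDeser; the only wrinkle is that generic targets need the Jackson TypeReference overload so the map's value type survives. Condensed, using only calls that appear in this file:

    // Concrete class: deserialize(json, Class<T>)
    StreamSortSpec sortSpec = new StreamSortSpec();
    sortSpec.setWindowMargin(1000);
    sortSpec.setWindowPeriod2(Period.seconds(10));
    StreamSortSpec copy = MetadataSerDeser.deserialize(
            MetadataSerDeser.serialize(sortSpec), StreamSortSpec.class);

    // Generic type: deserialize(json, TypeReference<T>)
    Map<String, StreamDefinition> sds = new HashMap<>();
    Map<String, StreamDefinition> round = MetadataSerDeser.deserialize(
            MetadataSerDeser.serialize(sds),
            new TypeReference<Map<String, StreamDefinition>>() {});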

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestStormCustomGroupingRouting.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestStormCustomGroupingRouting.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestStormCustomGroupingRouting.java
new file mode 100644
index 0000000..0466ed3
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestStormCustomGroupingRouting.java
@@ -0,0 +1,144 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.topology;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.grouping.CustomStreamGrouping;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.task.WorkerTopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+
+/**
+ * Since 4/29/16.
+ */
+@SuppressWarnings({ "rawtypes", "serial" })
+public class TestStormCustomGroupingRouting implements Serializable {
+    @Ignore
+    @Test
+    public void testRoutingByCustomGrouping() throws Exception{
+        Config conf = new Config();
+        conf.setNumWorkers(2); // use two worker processes
+        TopologyBuilder topologyBuilder = new TopologyBuilder();
+        topologyBuilder.setSpout("blue-spout", new BlueSpout()); // parallelism hint
+
+        topologyBuilder.setBolt("green-bolt-1", new GreenBolt(0)).setNumTasks(2)
+                .customGrouping("blue-spout", new CustomStreamGrouping(){
+                    int count = 0;
+                    List<Integer> targetTask;
+                    @Override
+                    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+                        this.targetTask = targetTasks;
+                    }
+
+                    @Override
+                    public List<Integer> chooseTasks(int taskId, List<Object> values) {
+                        // alternate between the two target tasks
+                        return Arrays.asList(targetTask.get(count++ % 2));
+                    }
+                });
+
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology("mytopology", new HashMap(), topologyBuilder.createTopology());
+
+        // the test is @Ignore'd and meant for manual runs; keep the local cluster alive
+        while(true) {
+            try {
+                Thread.sleep(1000);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private static class BlueSpout extends BaseRichSpout {
+        int count = 0;
+        private SpoutOutputCollector collector;
+        public BlueSpout(){
+        }
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("a"));
+        }
+
+        @Override
+        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void nextTuple() {
+            // emit one test tuple every 10 seconds
+            this.collector.emit(Arrays.asList("testdata" + count));
+            count++;
+            try {
+                Thread.sleep(10000);
+            } catch (InterruptedException ex) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    private static class GreenBolt extends BaseRichBolt {
+        private int id;
+        public GreenBolt(int id){
+            this.id = id;
+        }
+        @Override
+        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        }
+
+        @Override
+        public void execute(Tuple input) {
+            System.out.println("bolt " + id + " received data " + input.getString(0));
+            System.out.flush();
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("a"));
+        }
+    }
+}
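
The anonymous grouping above hard-codes two targets; the same idea generalizes to whatever task list Storm hands over in prepare(). A standalone sketch against the same backtype.storm interfaces:

    // Round-robin over however many target tasks Storm assigns.
    public static class RoundRobinGrouping implements CustomStreamGrouping {
        private List<Integer> targetTasks;
        private int count = 0;

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
            this.targetTasks = targetTasks;
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            return Arrays.asList(targetTasks.get(count++ % targetTasks.size()));
        }
    }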


