eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [13/84] [partial] eagle git commit: Clean repo for eagle site
Date Mon, 03 Apr 2017 11:54:21 GMT
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowBenchmarkTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowBenchmarkTest.java
deleted file mode 100644
index d6d4f22..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowBenchmarkTest.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter;
-
-import com.codahale.metrics.ConsoleReporter;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.ScheduledReporter;
-import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
-import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
-import org.apache.commons.lang.time.StopWatch;
-import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.common.DateTimeUtil;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
-
-@Ignore("Ignore automatic heavy benchmark test")
-public class StreamWindowBenchmarkTest {
-    private final static Logger LOGGER = LoggerFactory.getLogger(StreamWindowBenchmarkTest.class);
-
-    public void sendDESCOrderedEventsToWindow(StreamWindow window, StreamWindowRepository.StorageType storageType, int num) {
-        LOGGER.info("Sending {} events to {} ({})", num, window.getClass().getSimpleName(), storageType);
-        StopWatch stopWatch = new StopWatch();
-        stopWatch.start();
-        int i = 0;
-        while (i < num) {
-            PartitionedEvent event = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream_1", (window.startTime() + i));
-            window.add(event);
-            i++;
-        }
-        stopWatch.stop();
-        performanceReport.put(num + "\tInsertTime\t" + storageType, stopWatch.getTime());
-        LOGGER.info("Inserted {} events in {} ms", num, stopWatch.getTime());
-        stopWatch.reset();
-        stopWatch.start();
-        window.flush();
-        stopWatch.stop();
-        performanceReport.put(num + "\tReadTime\t" + storageType, stopWatch.getTime());
-    }
-
-    private ScheduledReporter metricReporter;
-    private Map<String, Long> performanceReport;
-
-    @Before
-    public void setUp() {
-        final MetricRegistry metrics = new MetricRegistry();
-        metrics.registerAll(new MemoryUsageGaugeSet());
-        metrics.registerAll(new GarbageCollectorMetricSet());
-        metricReporter = ConsoleReporter.forRegistry(metrics)
-            .filter((name, metric) -> name.matches("(.*heap|total).(usage|used)"))
-//                .withLoggingLevel(Slf4jReporter.LoggingLevel.DEBUG)
-            .convertRatesTo(TimeUnit.SECONDS)
-            .convertDurationsTo(TimeUnit.MILLISECONDS)
-            .build();
-        metricReporter.start(60, TimeUnit.SECONDS);
-        performanceReport = new TreeMap<>();
-    }
-
-    @After
-    public void after() {
-        StringBuilder sb = new StringBuilder();
-        for (Map.Entry<String, Long> entry : performanceReport.entrySet()) {
-            sb.append(String.format("%-40s\t%s\n", entry.getKey(), entry.getValue()));
-        }
-        LOGGER.info("\n===== Benchmark Result Report =====\n\n{}", sb.toString());
-    }
-
-    private final long start = DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000");
-    private final long stop = DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-05 00:00:00,000");
-    private final long margin = (stop - start) / 3;
-
-    private void benchmarkTest(StreamWindow window, StreamWindowRepository.StorageType storageType) {
-        StopWatch stopWatch = new StopWatch();
-        stopWatch.start();
-        LOGGER.info("\n===== Benchmark Test for {} ({}) =====", window.getClass().getSimpleName(), storageType);
-        metricReporter.report();
-        sendDESCOrderedEventsToWindow(window, storageType, 1000);
-        metricReporter.report();
-        sendDESCOrderedEventsToWindow(window, storageType, 10000);
-        metricReporter.report();
-        sendDESCOrderedEventsToWindow(window, storageType, 100000);
-        metricReporter.report();
-        sendDESCOrderedEventsToWindow(window, storageType, 1000000);
-        metricReporter.report();
-        stopWatch.stop();
-        LOGGER.info("\n===== Finished in total {} ms =====\n", stopWatch.getTime());
-    }
-
-    @Test
-    @Ignore
-    public void testStreamWindowBenchmarkMain() {
-        testStreamSortedWindowOnHeap();
-        testStreamSortedWindowInSerializedMemory();
-        testStreamSortedWindowOffHeap();
-        testStreamSortedWindowFile();
-    }
-
-    @Test
-    @Ignore
-    public void testStreamSortedWindowOnHeap() {
-        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.ONHEAP);
-        benchmarkTest(window, StreamWindowRepository.StorageType.ONHEAP);
-        window.close();
-    }
-
-    @Test
-    @Ignore
-    public void testStreamSortedWindowInSerializedMemory() {
-        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.MEMORY);
-        benchmarkTest(window, StreamWindowRepository.StorageType.MEMORY);
-        window.close();
-    }
-
-    @Test
-    @Ignore
-    public void testStreamSortedWindowOffHeap() {
-        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.DIRECT_MEMORY);
-        benchmarkTest(window, StreamWindowRepository.StorageType.DIRECT_MEMORY);
-        window.close();
-    }
-
-    @Test
-    @Ignore
-    public void testStreamSortedWindowFile() {
-        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.FILE_RAF);
-        benchmarkTest(window, StreamWindowRepository.StorageType.FILE_RAF);
-        window.close();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowTestSuite.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowTestSuite.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowTestSuite.java
deleted file mode 100644
index 57e577c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowTestSuite.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter;
-
-import com.google.common.collect.Ordering;
-import org.apache.eagle.alert.engine.mock.MockPartitionedCollector;
-import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.sorter.impl.PartitionedEventTimeOrderingComparator;
-import org.apache.eagle.alert.engine.sorter.impl.StreamTimeClockInLocalMemory;
-import org.apache.eagle.common.DateTimeUtil;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-@SuppressWarnings("unused")
-public class StreamWindowTestSuite {
-    private final static Logger LOGGER = LoggerFactory.getLogger(StreamWindowTestSuite.class);
-
-    private final long start = DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000");
-    private final long stop = DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:00,000");
-    private final long margin = (stop - start) / 3;
-
-    @Test
-    public void testStreamSortedWindowOnHeap() {
-        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.ONHEAP);
-        streamSortedWindowMustTest(window);
-    }
-
-    @Test
-    public void testStreamSortedWindowInSerializedMemory() {
-        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.MEMORY);
-        streamSortedWindowMustTest(window);
-    }
-
-    @Test
-    public void testStreamSortedWindowOffHeap() {
-        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.DIRECT_MEMORY);
-        streamSortedWindowMustTest(window);
-    }
-
-    @Test
-    public void testStreamSortedWindowFile() {
-        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.FILE_RAF);
-        streamSortedWindowMustTest(window);
-    }
-
-    @SuppressWarnings( {"unchecked", "rawtypes"})
-    private void streamSortedWindowMustTest(StreamWindow window) {
-        MockPartitionedCollector collector = new MockPartitionedCollector();
-        window.register(collector);
-
-        StreamTimeClock clock = new StreamTimeClockInLocalMemory("sampleStream_1");
-        clock.moveForward(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:30,000"));
-
-        // Current time is: "2016-05-04 00:00:30"
-        window.onTick(clock, System.currentTimeMillis());
-
-        Assert.assertTrue(window.alive());
-        Assert.assertFalse(window.expired());
-
-        // Accepted
-        Assert.assertTrue(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000")));
-        Assert.assertTrue(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:01,000")));
-        Assert.assertTrue(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:30,000")));
-        Assert.assertTrue(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:40,000")));
-        Assert.assertTrue(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:50,000")));
-
-
-        // Rejected
-        Assert.assertFalse(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-03 23:59:59,000")));
-        Assert.assertFalse(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:00,000")));
-        Assert.assertFalse(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:01,000")));
-
-        // Accepted
-        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"))));
-
-        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:01,000"))));
-        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:50,000"))));
-        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:40,000"))));
-        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:30,000"))));
-
-        // Should accept Duplicated
-        Assert.assertTrue("Should support duplicated timestamp", window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"))));
-
-        Assert.assertEquals(6, window.size());
-
-        // Rejected
-        Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-03 23:59:59,000"))));
-        Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:00,000"))));
-        Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:01,000"))));
-
-        Assert.assertEquals(6, window.size());
-
-        // Now is: "2016-05-04 00:00:55"
-        clock.moveForward(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:55,000"));
-        window.onTick(clock, System.currentTimeMillis());
-        Assert.assertTrue(window.alive());
-        Assert.assertFalse(window.expired());
-        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:35,000"))));
-        Assert.assertEquals(7, window.size());
-
-        // Flush when stream time delay too much after system time but window will still be alive
-        window.onTick(clock, System.currentTimeMillis() + 1 + stop - start + margin);
-        Assert.assertTrue(window.alive());
-        Assert.assertFalse(window.expired());
-        Assert.assertEquals(0, window.size());
-        Assert.assertEquals(7, collector.size());
-
-        Assert.assertFalse("Because window has flushed but not expired, window should reject future events < last flush stream time",
-            window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:54,000"))));
-        Assert.assertTrue("Because window has flushed but not expired, window should still accept future events >= last flush stream time",
-            window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:56,000"))));
-        Assert.assertEquals(1, window.size());
-        Assert.assertEquals(7, collector.size());
-
-        // Now is: "2016-05-04 00:01:10", not expire,
-        clock.moveForward(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:10,000"));
-        window.onTick(clock, System.currentTimeMillis() + 2 * (1 + stop - start + margin));
-        Assert.assertEquals(8, collector.size());
-
-        // Now is: "2016-05-04 00:01:20", expire
-        clock.moveForward(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:20,000"));
-        window.onTick(clock, System.currentTimeMillis());
-        Assert.assertFalse(window.alive());
-        Assert.assertTrue(window.expired());
-        Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:35,000"))));
-        Assert.assertEquals(0, window.size());
-
-        Assert.assertEquals(8, collector.size());
-
-        Ordering ordering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
-        Assert.assertTrue(ordering.isOrdered(collector.get()));
-
-        List<PartitionedEvent> list = collector.get();
-        Assert.assertEquals(8, list.size());
-        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"), list.get(0).getTimestamp());
-        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"), list.get(1).getTimestamp());
-        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:01,000"), list.get(2).getTimestamp());
-        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:30,000"), list.get(3).getTimestamp());
-        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:35,000"), list.get(4).getTimestamp());
-        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:40,000"), list.get(5).getTimestamp());
-        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:50,000"), list.get(6).getTimestamp());
-        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:56,000"), list.get(7).getTimestamp());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/TreeMultisetComparatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/TreeMultisetComparatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/TreeMultisetComparatorTest.java
deleted file mode 100644
index c59f0de..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/TreeMultisetComparatorTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter;
-
-import com.google.common.collect.TreeMultiset;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.sorter.impl.PartitionedEventTimeOrderingComparator;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Iterator;
-
-/**
- * Since 5/10/16.
- */
-public class TreeMultisetComparatorTest {
-    /**
-     * if 2 different events have the same timestamp, and comparator return 0 when timestamp is same,
-     * when they are added into TreeMultiset, the second event will be replaced by the first event
-     */
-    @Test
-    public void testComparator() {
-        TreeMultiset<PartitionedEvent> set = TreeMultiset.create(PartitionedEventTimeOrderingComparator.INSTANCE);
-
-        // construct PartitionEvent1
-        PartitionedEvent event1 = new PartitionedEvent();
-        StreamPartition sp = new StreamPartition();
-        sp.setColumns(Arrays.asList("host"));
-        sp.setSortSpec(null);
-        sp.setStreamId("testStreamId");
-        sp.setType(StreamPartition.Type.GROUPBY);
-        event1.setPartition(sp);
-        event1.setPartitionKey(1000);
-        StreamEvent e1 = new StreamEvent();
-        e1.setData(new Object[] {18.4});
-        e1.setStreamId("testStreamId");
-        e1.setTimestamp(1462909984000L);
-        event1.setEvent(e1);
-
-        // construct PartitionEvent2 with same timestamp but different value
-        PartitionedEvent event2 = new PartitionedEvent();
-        event2.setPartition(sp);
-        event2.setPartitionKey(1000);
-        StreamEvent e2 = new StreamEvent();
-        e2.setData(new Object[] {16.3});
-        e2.setStreamId("testStreamId");
-        e2.setTimestamp(1462909984000L);
-        event2.setEvent(e2);
-
-        // construct PartitionEvent2 with same timestamp but different value
-        PartitionedEvent event3 = new PartitionedEvent();
-        event3.setPartition(sp);
-        event3.setPartitionKey(1000);
-        StreamEvent e3 = new StreamEvent();
-        e3.setData(new Object[] {14.3});
-        e3.setStreamId("testStreamId");
-        e3.setTimestamp(1462909984001L);
-        event3.setEvent(e3);
-
-        PartitionedEvent event4 = new PartitionedEvent();
-        event4.setPartition(sp);
-        event4.setPartitionKey(1000);
-        StreamEvent e4 = new StreamEvent();
-        e4.setData(new Object[] {14.3});
-        e4.setStreamId("testStreamId");
-        e4.setTimestamp(1462909984001L);
-        event4.setEvent(e4);
-
-        Assert.assertNotEquals(event2, event3);
-        Assert.assertEquals(event3, event4);
-
-        // check content in set
-        set.add(event1);
-        set.add(event2);
-        set.add(event3);
-        set.add(event4);
-        Assert.assertEquals(4, set.size());
-        set.forEach(System.out::println);
-
-
-        Assert.assertEquals(-1, PartitionedEventTimeOrderingComparator.INSTANCE.compare(event1, event2));
-        Assert.assertEquals(-1, PartitionedEventTimeOrderingComparator.INSTANCE.compare(event1, event3));
-        Assert.assertEquals(-1, PartitionedEventTimeOrderingComparator.INSTANCE.compare(event2, event3));
-        Assert.assertEquals(0, PartitionedEventTimeOrderingComparator.INSTANCE.compare(event3, event4));
-
-        Iterator<PartitionedEvent> it = set.iterator();
-        Assert.assertEquals(16.3, it.next().getData()[0]);
-        Assert.assertEquals(18.4, it.next().getData()[0]);
-        Assert.assertEquals(14.3, it.next().getData()[0]);
-        Assert.assertEquals(14.3, it.next().getData()[0]);
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java
deleted file mode 100644
index d4fe01a..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.statecheck;
-
-import backtype.storm.task.GeneralTopologyContext;
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.TupleImpl;
-import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.PublishPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.router.TestAlertBolt;
-import org.apache.eagle.alert.engine.runner.AlertBolt;
-import org.apache.eagle.alert.utils.AlertConstants;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.core.type.TypeReference;
-import org.jetbrains.annotations.NotNull;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-
-/**
- * Created on 8/4/16.
- */
-public class TestStateCheckPolicy {
-
-    @Test
-    public void testStateCheck() throws Exception {
-        PolicyGroupEvaluatorImpl impl = new PolicyGroupEvaluatorImpl("test-statecheck-poicyevaluator");
-        AtomicBoolean verified = new AtomicBoolean(false);
-        OutputCollector collector = new OutputCollector(new IOutputCollector() {
-            @Override
-            public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-                verified.set(true);
-                Assert.assertEquals("perfmon_latency_check_output2", ((PublishPartition) tuple.get(0)).getStreamId());
-                AlertStreamEvent event = (AlertStreamEvent) tuple.get(1);
-                System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", ((PublishPartition) tuple.get(0)).getStreamId(), tuple));
-                return null;
-            }
-
-            @Override
-            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-            }
-
-            @Override
-            public void ack(Tuple input) {
-            }
-
-            @Override
-            public void fail(Tuple input) {
-            }
-
-            @Override
-            public void reportError(Throwable error) {
-            }
-        });
-
-        AlertBolt alertBolt = TestAlertBolt.createAlertBolt(collector);
-        AlertBoltSpec spec = createAlertSpec();
-        Map<String, StreamDefinition> definitionMap = createStreamMap();
-        
-        
-        List<PolicyDefinition> policies = mapper.readValue(TestStateCheckPolicy.class.getResourceAsStream("/statecheck/policies.json"),
-                new TypeReference<List<PolicyDefinition>>() {
-                });
-        List<StreamDefinition> streams = mapper.readValue(TestStateCheckPolicy.class.getResourceAsStream("/statecheck/streamdefinitions.json"),
-                new TypeReference<List<StreamDefinition>>() {
-                });
-        spec.addPublishPartition("perfmon_latency_check_output2", policies.get(0).getName(), "testPublishBolt", null);
-        
-        alertBolt.onAlertBoltSpecChange(spec, definitionMap);
-
-        // send data now
-        sendData(alertBolt, definitionMap, spec.getBoltPoliciesMap().values().iterator().next().get(0));
-
-        Thread.sleep(3000);
-        Assert.assertTrue(verified.get());
-    }
-
-    private void sendData(AlertBolt alertBolt, Map<String, StreamDefinition> definitionMap, PolicyDefinition policyDefinition) {
-        StreamDefinition definition = definitionMap.get("perfmon_latency_stream");
-        long base = System.currentTimeMillis();
-        for (int i = 0; i < 2; i++) {
-            long time = base + i * 1000;
-
-            Map<String, Object> mapdata = new HashMap<>();
-            mapdata.put("host", "host-1");
-            mapdata.put("timestamp", time);
-            mapdata.put("metric", "perfmon_latency");
-            mapdata.put("pool", "raptor");
-            mapdata.put("value", 1000.0 + i * 1000.0);
-            mapdata.put("colo", "phx");
-
-            StreamEvent event = StreamEvent.builder().timestamep(time).attributes(mapdata, definition).build();
-            PartitionedEvent pEvent = new PartitionedEvent(event, policyDefinition.getPartitionSpec().get(0), 1);
-
-            GeneralTopologyContext mock = Mockito.mock(GeneralTopologyContext.class);
-
-            Mockito.when(mock.getComponentId(1)).thenReturn("taskId");
-            Mockito.when(mock.getComponentOutputFields("taskId", "test-stream-id")).thenReturn(new Fields(AlertConstants.FIELD_0));
-
-            TupleImpl ti = new TupleImpl(mock, Collections.singletonList(pEvent), 1, "test-stream-id");
-            alertBolt.execute(ti);
-        }
-    }
-
-    @NotNull
-    private Map<String, StreamDefinition> createStreamMap() throws Exception {
-        List<StreamDefinition> streams = mapper.readValue(TestStateCheckPolicy.class.getResourceAsStream("/statecheck/streamdefinitions.json"),
-            new TypeReference<List<StreamDefinition>>() {
-            });
-        return streams.stream().collect(Collectors.toMap(StreamDefinition::getStreamId, item -> item));
-    }
-
-    private static ObjectMapper mapper = new ObjectMapper();
-
-    static {
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-    }
-
-    private AlertBoltSpec createAlertSpec() throws Exception {
-        AlertBoltSpec spec = new AlertBoltSpec();
-
-        spec.setVersion("version1");
-        spec.setTopologyName("testTopology");
-
-        List<PolicyDefinition> policies = mapper.readValue(TestStateCheckPolicy.class.getResourceAsStream("/statecheck/policies.json"),
-            new TypeReference<List<PolicyDefinition>>() {
-            });
-        Assert.assertTrue(policies.size() > 0);
-        spec.addBoltPolicy("alertBolt1", policies.get(0).getName());
-        spec.getBoltPoliciesMap().put("alertBolt1", new ArrayList<>(policies));
-
-        return spec;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java
deleted file mode 100644
index 7212785..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.topology;
-
-import backtype.storm.LocalCluster;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.SpoutDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.engine.spout.CorrelationSpout;
-import org.apache.eagle.alert.engine.spout.CreateTopicUtils;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Properties;
-
-@SuppressWarnings( {"serial", "unused"})
-public class AlertTopologyTest implements Serializable {
-    private static final Logger LOG = LoggerFactory.getLogger(AlertTopologyTest.class);
-    char[] alphabets = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z'};
-
-    @Ignore
-    @Test
-    public void testMultipleTopics() throws Exception {
-        final String topoId = "myTopology";
-        int numGroupbyBolts = 2;
-        int numTotalGroupbyBolts = 3;
-        System.setProperty("eagle.correlation.numGroupbyBolts", String.valueOf(numGroupbyBolts));
-        System.setProperty("eagle.correlation.topologyName", topoId);
-        System.setProperty("eagle.correlation.mode", "local");
-        System.setProperty("eagle.correlation.zkHosts", "localhost:2181");
-        final String topicName1 = "testTopic3";
-        final String topicName2 = "testTopic4";
-        // ensure topic ready
-        LogManager.getLogger(CorrelationSpout.class).setLevel(Level.DEBUG);
-        Config config = ConfigFactory.load();
-
-        CreateTopicUtils.ensureTopicReady(System.getProperty("eagle.correlation.zkHosts"), topicName1);
-        CreateTopicUtils.ensureTopicReady(System.getProperty("eagle.correlation.zkHosts"), topicName2);
-
-        TopologyBuilder topoBuilder = new TopologyBuilder();
-
-        int numBolts = config.getInt("eagle.correlation.numGroupbyBolts");
-        CorrelationSpout spout = new CorrelationSpout(config, topoId, null, numBolts);
-        String spoutId = "correlation-spout";
-        SpoutDeclarer declarer = topoBuilder.setSpout(spoutId, spout);
-        for (int i = 0; i < numBolts; i++) {
-            TestBolt bolt = new TestBolt();
-            BoltDeclarer boltDecl = topoBuilder.setBolt("engineBolt" + i, bolt);
-            boltDecl.fieldsGrouping(spoutId, "stream_" + i, new Fields());
-        }
-
-        String topoName = config.getString("eagle.correlation.topologyName");
-        LOG.info("start topology in local mode");
-        LocalCluster cluster = new LocalCluster();
-        cluster.submitTopology(topoName, new HashMap<>(), topoBuilder.createTopology());
-
-        while (true) {
-            try {
-                Thread.sleep(1000);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    @SuppressWarnings( {"rawtypes", "unchecked"})
-    @Ignore
-    @Test
-    public void generateRandomStringsToKafka() {
-        String topic = "testTopic3";
-        int max = 1000;
-        Properties configMap = new Properties();
-        configMap.put("bootstrap.servers", "localhost:6667");
-        configMap.put("metadata.broker.list", "localhost:6667");
-        configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        configMap.put("request.required.acks", "1");
-        configMap.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        configMap.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        KafkaProducer<String, Object> producer = new KafkaProducer<>(configMap);
-
-        int i = 0;
-        while (i++ < max) {
-            String randomString = generateRandomString();
-            System.out.println("sending string : " + randomString);
-            ProducerRecord record = new ProducerRecord(topic, randomString);
-            producer.send(record);
-            if (i % 10 == 0) {
-                try {
-                    Thread.sleep(10);
-                } catch (Exception ex) {
-                }
-            }
-        }
-        producer.close();
-    }
-
-    private String generateRandomString() {
-        long count = Math.round(Math.random() * 10);
-        if (count == 0) {
-            count = 1;
-        }
-        StringBuilder sb = new StringBuilder();
-        while (count-- > 0) {
-            int index = (int) (Math.floor(Math.random() * 26));
-            sb.append(alphabets[index]);
-        }
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CoordinatorSpoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CoordinatorSpoutIntegrationTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CoordinatorSpoutIntegrationTest.java
deleted file mode 100644
index d011770..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CoordinatorSpoutIntegrationTest.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.topology;
-
-import backtype.storm.LocalCluster;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.SpoutDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.engine.spout.CorrelationSpout;
-import org.apache.eagle.alert.engine.spout.CreateTopicUtils;
-import org.apache.eagle.alert.utils.AlertConstants;
-import org.apache.eagle.alert.utils.StreamIdConversion;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.HashMap;
-
-/**
- * Since 4/28/16.
- */
-@SuppressWarnings( {"serial", "unused", "rawtypes"})
-public class CoordinatorSpoutIntegrationTest implements Serializable {
-    private static final Logger LOG = LoggerFactory.getLogger(CoordinatorSpoutIntegrationTest.class);
-
-    @Ignore  // this test need zookeeper
-    @Test
-    public void testConfigNotify() throws Exception {
-        final String topoId = "myTopology";
-        int numGroupbyBolts = 2;
-        int numTotalGroupbyBolts = 3;
-        System.setProperty("correlation.serviceHost", "sandbox.hortonworks.com");
-        System.setProperty("correlation.servicePort", "58080");
-        System.setProperty("withMetadataChangeNotifyService.zkConfig.zkQuorum", "sandbox.hortonworks.com:2181");
-        System.setProperty("correlation.numGroupbyBolts", String.valueOf(numGroupbyBolts));
-        System.setProperty("correlation.topologyName", topoId);
-        System.setProperty("correlation.mode", "local");
-        System.setProperty("correlation.zkHosts", "sandbox.hortonworks.com:2181");
-        final String topicName1 = "testTopic3";
-        final String topicName2 = "testTopic4";
-        // ensure topic ready
-        LogManager.getLogger(CorrelationSpout.class).setLevel(Level.DEBUG);
-        Config config = ConfigFactory.load();
-
-        CreateTopicUtils.ensureTopicReady(System.getProperty("withMetadataChangeNotifyService.zkConfig.zkQuorum"), topicName1);
-        CreateTopicUtils.ensureTopicReady(System.getProperty("withMetadataChangeNotifyService.zkConfig.zkQuorum"), topicName2);
-
-        TopologyBuilder topoBuilder = new TopologyBuilder();
-
-        int numBolts = config.getInt("correlation.numGroupbyBolts");
-        String spoutId = "correlation-spout";
-        CorrelationSpout spout = new CorrelationSpout(config, topoId,
-            new MockMetadataChangeNotifyService(topoId, spoutId), numBolts);
-        SpoutDeclarer declarer = topoBuilder.setSpout(spoutId, spout);
-        declarer.setNumTasks(2);
-        for (int i = 0; i < numBolts; i++) {
-            TestBolt bolt = new TestBolt();
-            BoltDeclarer boltDecl = topoBuilder.setBolt("engineBolt" + i, bolt);
-            boltDecl.fieldsGrouping(spoutId,
-                StreamIdConversion.generateStreamIdBetween(AlertConstants.DEFAULT_SPOUT_NAME, AlertConstants.DEFAULT_ROUTERBOLT_NAME + i), new Fields());
-        }
-
-        String topoName = config.getString("correlation.topologyName");
-        LOG.info("start topology in local mode");
-        LocalCluster cluster = new LocalCluster();
-        StormTopology topology = topoBuilder.createTopology();
-        cluster.submitTopology(topoName, new HashMap(), topology);
-
-
-        Utils.sleep(Long.MAX_VALUE);
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
deleted file mode 100644
index 5a86cd2..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.topology;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.SpoutSpec;
-import org.apache.eagle.alert.coordination.model.StreamRepartitionMetadata;
-import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.spout.CorrelationSpout;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import storm.kafka.KafkaSpoutWrapper;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-@Ignore
-public class CorrelationSpoutTest {
-    @SuppressWarnings("unused")
-    private static final String TEST_SPOUT_ID = "spout-id-1";
-    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(CorrelationSpoutTest.class);
-    private String dataSourceName = "ds-name";
-
-    @SuppressWarnings( {"serial", "rawtypes"})
-    @Test
-    public void testMetadataInjestion_emptyMetadata() throws Exception {
-        String topoId = "testMetadataInjection";
-        Config config = ConfigFactory.load();
-        AtomicBoolean validated = new AtomicBoolean(false);
-        CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1) {
-            @Override
-            protected KafkaSpoutWrapper createKafkaSpout(Config config, Map conf, TopologyContext context,
-                                                         SpoutOutputCollector collector, String topic, String schemeClsName, SpoutSpec streamMetadatas, Map<String, StreamDefinition> sds)
-                throws Exception {
-                validated.set(true);
-                return null;
-            }
-        };
-        Kafka2TupleMetadata ds = new Kafka2TupleMetadata();
-        ds.setName("ds-name");
-        ds.setType("KAFKA");
-        ds.setProperties(new HashMap<String, String>());
-        ds.setTopic("name-of-topic1");
-        ds.setSchemeCls("PlainStringScheme");
-        ds.setCodec(new Tuple2StreamMetadata());
-        Map<String, Kafka2TupleMetadata> dsMap = new HashMap<String, Kafka2TupleMetadata>();
-        dsMap.put(ds.getName(), ds);
-
-        StreamRepartitionMetadata m1 = new StreamRepartitionMetadata(ds.getName(), "s1");
-
-        Map<String, List<StreamRepartitionMetadata>> dataSources = new HashMap<String, List<StreamRepartitionMetadata>>();
-        dataSources.put(ds.getName(), Arrays.asList(m1));
-
-        SpoutSpec newMetadata = new SpoutSpec(topoId, dataSources, null, dsMap);
-
-        spout.onReload(newMetadata, null);
-        Assert.assertTrue(validated.get());
-    }
-
-    @SuppressWarnings( {"serial", "rawtypes"})
-    @Test
-    public void testMetadataInjestion_oneNewTopic2Streams() throws Exception {
-        String topoId = "testMetadataInjection";
-        final String topicName = "testTopic";
-
-        Config config = ConfigFactory.load();
-        final AtomicBoolean verified = new AtomicBoolean(false);
-        CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1) {
-            @Override
-            protected KafkaSpoutWrapper createKafkaSpout(Config config,
-                                                         Map conf,
-                                                         TopologyContext context,
-                                                         SpoutOutputCollector collector,
-                                                         String topic,
-                                                         String topic2SchemeClsName,
-                                                         SpoutSpec streamMetadatas,
-                                                         Map<String, StreamDefinition> sds) {
-                Assert.assertEquals(1, streamMetadatas.getStreamRepartitionMetadataMap().size());
-                Assert.assertTrue(streamMetadatas.getStream("s1") != null);
-                Assert.assertTrue(streamMetadatas.getStream("s2") != null);
-                Assert.assertEquals(topicName, streamMetadatas.getStream("s1").getTopicName());
-                Assert.assertEquals(topicName, streamMetadatas.getStream("s2").getTopicName());
-                LOG.info("successfully verified new topic and streams");
-                verified.set(true);
-                return null;
-            }
-        };
-
-        Map<String, Kafka2TupleMetadata> dsMap = createDatasource(topicName, dataSourceName);
-
-        StreamRepartitionMetadata m1 = new StreamRepartitionMetadata(dataSourceName, "s1");
-        StreamRepartitionMetadata m2 = new StreamRepartitionMetadata(dataSourceName, "s2");
-        Map<String, List<StreamRepartitionMetadata>> dataSources = new HashMap<String, List<StreamRepartitionMetadata>>();
-        dataSources.put(dataSourceName, Arrays.asList(m1, m2));
-
-        SpoutSpec newMetadata = new SpoutSpec(topoId, dataSources, null, dsMap);
-        spout.onReload(newMetadata, null);
-        Assert.assertTrue(verified.get());
-    }
-
-    private Map<String, Kafka2TupleMetadata> createDatasource(final String topicName, final String dataSourceName) {
-        Kafka2TupleMetadata ds = new Kafka2TupleMetadata();
-
-        ds.setName(dataSourceName);
-        ds.setType("KAFKA");
-        ds.setProperties(new HashMap<String, String>());
-        ds.setTopic(topicName);
-        ds.setSchemeCls("PlainStringScheme");
-        ds.setCodec(new Tuple2StreamMetadata());
-        Map<String, Kafka2TupleMetadata> dsMap = new HashMap<String, Kafka2TupleMetadata>();
-        dsMap.put(ds.getName(), ds);
-        return dsMap;
-    }
-
-    @SuppressWarnings( {"serial", "rawtypes"})
-    @Test
-    public void testMetadataInjestion_deleteOneTopic() throws Exception {
-        String topoId = "testMetadataInjection";
-        final String topicName = "testTopic";
-        Config config = ConfigFactory.load();
-        final AtomicBoolean verified = new AtomicBoolean(false);
-        CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1) {
-            @Override
-            protected KafkaSpoutWrapper createKafkaSpout(Config config,
-                                                         Map conf, TopologyContext context, SpoutOutputCollector collector, final String topic,
-                                                         String schemeClsName, SpoutSpec streamMetadatas,
-                                                         Map<String, StreamDefinition> sds) {
-                return new KafkaSpoutWrapper(null, null);
-            }
-
-            @Override
-            protected void removeKafkaSpout(KafkaSpoutWrapper wrapper) {
-                LOG.info("successfully verified removed topic and streams");
-                verified.set(true);
-            }
-        };
-
-        Map<String, Kafka2TupleMetadata> dsMap = createDatasource(topicName, dataSourceName);
-
-        StreamRepartitionMetadata m1 = new StreamRepartitionMetadata(dataSourceName, "s1");
-
-        Map<String, List<StreamRepartitionMetadata>> streamMetadatas = new HashMap<String, List<StreamRepartitionMetadata>>();
-        streamMetadatas.put(dataSourceName, Arrays.asList(m1));
-
-        SpoutSpec cachedMetadata = new SpoutSpec(topoId, streamMetadatas, null, dsMap);
-        // add new topic
-        spout.onReload(cachedMetadata, null);
-        // delete new topic
-        try {
-            spout.onReload(new SpoutSpec(topoId, new HashMap<String, List<StreamRepartitionMetadata>>(), new HashMap<>(), new HashMap<String, Kafka2TupleMetadata>()),
-                null);
-        } catch (Exception ex) {
-            LOG.error("error reloading spout metadata", ex);
-            throw ex;
-        }
-        Assert.assertTrue(verified.get());
-    }
-
-//    @Ignore
-//    @SuppressWarnings("rawtypes")
-//    @Test
-//    public void testSpout() {
-//        String topoId = "testMetadataInjection";
-//        final AtomicBoolean verified = new AtomicBoolean(false);
-//        Config config = ConfigFactory.load();
-//        CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1);
-//
-//        TopologyBuilder builder = new TopologyBuilder();
-//        // only one spout
-//        builder.setSpout("cc-spout", spout);
-//        builder.setBolt("recv-bolt", new RecvBolt()).globalGrouping("cc-spout");
-//
-//        StormTopology topology = builder.createTopology();
-//        LocalCluster cluster = new LocalCluster();
-//        cluster.submitTopology(topoId, new HashMap(), topology);
-//
-//        while (true) {
-//            try {
-//                Thread.sleep(1000);
-//            } catch (Exception e) {
-//                e.printStackTrace();
-//            }
-//        }
-//    }
-//    
-//    @SuppressWarnings("serial")
-//    private static class RecvBolt extends BaseRichBolt {
-//
-//        @Override
-//        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-//        }
-//
-//        @Override
-//        public void execute(Tuple input) {
-//            
-//        }
-//
-//        @Override
-//        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-//        }
-//        
-//    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/FastWordCountTopology.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/FastWordCountTopology.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/FastWordCountTopology.java
deleted file mode 100644
index 5aff5c8..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/FastWordCountTopology.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.topology;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.generated.*;
-import backtype.storm.metric.LoggingMetricsConsumer;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
-
-/**
- * WordCount but the spout does not stop, and the bolts are implemented in
- * java.  This can show how fast the word count can run.
- */
-@SuppressWarnings("serial")
-public class FastWordCountTopology {
-    public static class FastRandomSentenceSpout extends BaseRichSpout {
-        private final static Logger LOG = LoggerFactory.getLogger(FastRandomSentenceSpout.class);
-        SpoutOutputCollector _collector;
-        Random _rand;
-        private static final String[] CHOICES = {
-            "marry had a little lamb whos fleese was white as snow",
-            "and every where that marry went the lamb was sure to go",
-            "one two three four five six seven eight nine ten",
-            "this is a test of the emergency broadcast system this is only a test",
-            "peter piper picked a peck of pickeled peppers"
-        };
-
-        @SuppressWarnings("rawtypes")
-        @Override
-        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-            _collector = collector;
-            _rand = ThreadLocalRandom.current();
-        }
-
-        @Override
-        public void nextTuple() {
-            String sentence = CHOICES[_rand.nextInt(CHOICES.length)];
-            _collector.emit(new Values(sentence), sentence);
-            LOG.debug("Emit tuple: {}, id:{}", new Values(sentence), sentence);
-        }
-
-        @Override
-        public void ack(Object id) {
-            //Ignored
-        }
-
-        @Override
-        public void fail(Object id) {
-            _collector.emit(new Values(id), id);
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("sentence"));
-        }
-    }
-
-    public static class SplitSentence extends BaseBasicBolt {
-        @Override
-        public void execute(Tuple tuple, BasicOutputCollector collector) {
-            String sentence = tuple.getString(0);
-            for (String word : sentence.split("\\s+")) {
-                collector.emit(new Values(word, 1));
-            }
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("word", "count"));
-        }
-    }
-
-
-    public static class WordCount extends BaseBasicBolt {
-        Map<String, Integer> counts = new HashMap<String, Integer>();
-
-        @Override
-        public void execute(Tuple tuple, BasicOutputCollector collector) {
-            String word = tuple.getString(0);
-            Integer count = counts.get(word);
-            if (count == null) {
-                count = 0;
-            }
-            count++;
-            counts.put(word, count);
-            collector.emit(new Values(word, count));
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("word", "count"));
-        }
-    }
-
-    public static void printMetrics(Nimbus.Client client, String name) throws Exception {
-        ClusterSummary summary = client.getClusterInfo();
-        String id = null;
-        for (TopologySummary ts : summary.get_topologies()) {
-            if (name.equals(ts.get_name())) {
-                id = ts.get_id();
-            }
-        }
-        if (id == null) {
-            throw new Exception("Could not find a topology named " + name);
-        }
-        TopologyInfo info = client.getTopologyInfo(id);
-        int uptime = info.get_uptime_secs();
-        long acked = 0;
-        long failed = 0;
-        double weightedAvgTotal = 0.0;
-        for (ExecutorSummary exec : info.get_executors()) {
-            if ("spout".equals(exec.get_component_id())) {
-                SpoutStats stats = exec.get_stats().get_specific().get_spout();
-                Map<String, Long> failedMap = stats.get_failed().get(":all-time");
-                Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
-                Map<String, Double> avgLatMap = stats.get_complete_ms_avg().get(":all-time");
-                for (String key : ackedMap.keySet()) {
-                    if (failedMap != null) {
-                        Long tmp = failedMap.get(key);
-                        if (tmp != null) {
-                            failed += tmp;
-                        }
-                    }
-                    long ackVal = ackedMap.get(key);
-                    double latVal = avgLatMap.get(key) * ackVal;
-                    acked += ackVal;
-                    weightedAvgTotal += latVal;
-                }
-            }
-        }
-        double avgLatency = weightedAvgTotal / acked;
-        System.out.println("uptime: " + uptime + " acked: " + acked + " avgLatency: " + avgLatency + " acked/sec: " + (((double) acked) / uptime + " failed: " + failed));
-    }
-
-    public static void kill(Nimbus.Client client, String name) throws Exception {
-        KillOptions opts = new KillOptions();
-        opts.set_wait_secs(0);
-        client.killTopologyWithOpts(name, opts);
-    }
-
-    public static void main(String[] args) throws Exception {
-
-        TopologyBuilder builder = new TopologyBuilder();
-
-        builder.setSpout("spout", new FastRandomSentenceSpout(), 4);
-
-        builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout");
-        builder.setBolt("count", new WordCount(), 4).fieldsGrouping("split", new Fields("word"));
-
-        Config conf = new Config();
-        conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
-
-        String name = "wc-test";
-        if (args != null && args.length > 0) {
-            name = args[0];
-        }
-
-//        conf.setNumWorkers(1);
-
-        LocalCluster cluster = new LocalCluster();
-        cluster.submitTopology(name, conf, builder.createTopology());
-
-        Utils.sleep(Long.MAX_VALUE);
-//
-//        Map clusterConf = Utils.readStormConfig();
-//        clusterConf.putAll(Utils.readCommandLineOpts());
-//        Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient();
-//
-//        //Sleep for 5 mins
-//        for (int i = 0; i < 10; i++) {
-//            Thread.sleep(30 * 1000);
-//            printMetrics(client, name);
-//        }
-//        kill(client, name);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/MockMetadataChangeNotifyService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/MockMetadataChangeNotifyService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/MockMetadataChangeNotifyService.java
deleted file mode 100644
index b7d32ab..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/MockMetadataChangeNotifyService.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.topology;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
-import org.apache.eagle.alert.coordination.model.PublishSpec;
-import org.apache.eagle.alert.coordination.model.RouterSpec;
-import org.apache.eagle.alert.coordination.model.SpoutSpec;
-import org.apache.eagle.alert.engine.coordinator.MetadataType;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.impl.AbstractMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.utils.MetadataSerDeser;
-import com.fasterxml.jackson.core.type.TypeReference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Since 5/4/16.
- */
-@SuppressWarnings( {"serial"})
-public class MockMetadataChangeNotifyService extends AbstractMetadataChangeNotifyService implements Runnable {
-    private final static Logger LOG = LoggerFactory.getLogger(MockMetadataChangeNotifyService.class);
-    @SuppressWarnings("unused")
-    private static final String[] topics = new String[] {"testTopic3", "testTopic4", "testTopic5"};
-    @SuppressWarnings("unused")
-    private String topologyName;
-    @SuppressWarnings("unused")
-    private String spoutId;
-    private Map<String, StreamDefinition> sds;
-
-    public MockMetadataChangeNotifyService(String topologyName, String spoutId) {
-        this.topologyName = topologyName;
-        this.spoutId = spoutId;
-    }
-
-    @Override
-    public void init(Config config, MetadataType type) {
-        super.init(config, type);
-        this.sds = defineStreamDefinitions();
-        new Thread(this).start();
-    }
-
-    @Override
-    public void run() {
-        switch (type) {
-            case SPOUT:
-                notifySpout(Arrays.asList("testTopic3", "testTopic4"), Arrays.asList("testTopic5"));
-                break;
-            case STREAM_ROUTER_BOLT:
-                populateRouterMetadata();
-                break;
-            case ALERT_BOLT:
-                populateAlertBoltSpec();
-                break;
-            case ALERT_PUBLISH_BOLT:
-                notifyAlertPublishBolt();
-                break;
-            default:
-                LOG.error("that is not possible man!");
-        }
-    }
-
-    private Map<String, StreamDefinition> defineStreamDefinitions() {
-        Map<String, StreamDefinition> sds = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testStreamDefinitionsSpec.json"),
-            new TypeReference<Map<String, StreamDefinition>>() {
-            });
-        return sds;
-    }
-
-    private void notifySpout(List<String> plainStringTopics, List<String> jsonStringTopics) {
-        SpoutSpec newSpec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testSpoutSpec.json"), SpoutSpec.class);
-        notifySpout(newSpec, sds);
-    }
-
-    private void populateRouterMetadata() {
-        RouterSpec boltSpec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testStreamRouterBoltSpec.json"), RouterSpec.class);
-        notifyStreamRouterBolt(boltSpec, sds);
-    }
-
-    private void populateAlertBoltSpec() {
-        AlertBoltSpec spec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testAlertBoltSpec.json"), AlertBoltSpec.class);
-        notifyAlertBolt(spec, sds);
-    }
-
-    private void notifyAlertPublishBolt() {
-        PublishSpec spec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec.json"), PublishSpec.class);
-        notifyAlertPublishBolt(spec, sds);
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/SendData2KafkaTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/SendData2KafkaTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/SendData2KafkaTest.java
deleted file mode 100644
index 0ade06a..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/SendData2KafkaTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.topology;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.io.FileWriter;
-import java.io.PrintWriter;
-import java.io.Serializable;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * Created on 3/12/16.
- */
-@SuppressWarnings( {"serial", "unchecked", "rawtypes", "resource"})
-public class SendData2KafkaTest implements Serializable {
-    /**
-     * {"timestamp": 10000, "metric": "esErrorLogEvent", "instanceUuid": "vm-InstanceId1", "host":"test-host1", "type":"nova", "stack":"NullPointException-..........."}
-     * {"timestamp": 10000, "metric": "instanceFailureLogEvent", "instanceUuid": "vm-InstanceId1", "message":"instance boot failure for user liasu!"}
-     */
-    @Test
-    @Ignore
-    public void testOutput() throws Exception {
-        String s1 = "{\"timestamp\": %d, \"metric\": \"esErrorLogEvent\", \"instanceUuid\": \"vm-InstanceId%d\", \"host\":\"test-host1\", \"type\":\"nova\", \"stack\":\"NullPointException-...........\"}";
-        String s2 = "{\"timestamp\": %d, \"metric\": \"instanceFailureLogEvent\", \"instanceUuid\": \"vm-InstanceId%d\", \"message\":\"instance boot failure for user liasu!\"}";
-
-        PrintWriter espw = new PrintWriter(new FileWriter("src/test/resources/es.log"));
-        PrintWriter ifpw = new PrintWriter(new FileWriter("src/test/resources/if.log"));
-
-        long base = System.currentTimeMillis();
-        long timestamp = 10000;
-
-        for (int i = 0; i < 10; i++) {
-
-            timestamp = base + i * 10000;
-            for (int j = 0; j < 10; j++) {
-                timestamp = timestamp + j * 1000;
-
-                espw.println(String.format(s1, timestamp, i));
-            }
-
-            ifpw.println(String.format(s2, timestamp, i));
-        }
-
-        espw.flush();
-        ifpw.flush();
-    }
-
-    @Test
-    @Ignore
-    public void sendKakfa() throws Exception {
-        List<String> eslogs = Files.readAllLines(Paths.get(SendData2KafkaTest.class.getResource("/es.log").toURI()), Charset.defaultCharset());
-        List<String> iflogs = Files.readAllLines(Paths.get(SendData2KafkaTest.class.getResource("/if.log").toURI()), Charset.defaultCharset());
-
-        Properties props = new Properties();
-        props.put("bootstrap.servers", "localhost:9092");
-        props.put("value.serializer", StringSerializer.class.getCanonicalName());
-        props.put("key.serializer", StringSerializer.class.getCanonicalName());
-
-        KafkaProducer producer = new KafkaProducer<>(props);
-        while (true) {
-            for (String s : eslogs) {
-                ProducerRecord<String, String> record = new ProducerRecord<>("nn_jmx_metric_sandbox", s);
-                producer.send(record);
-            }
-
-            for (String s : iflogs) {
-                ProducerRecord<String, String> record = new ProducerRecord<>("nn_jmx_metric_sandbox", s);
-                producer.send(record);
-            }
-
-            Thread.sleep(5000);
-        }
-
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java
deleted file mode 100644
index dc9c9b3..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.topology;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Tuple;
-import org.junit.Ignore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-@Ignore
-@SuppressWarnings( {"rawtypes", "serial"})
-public class TestBolt extends BaseRichBolt {
-    private static final Logger LOG = LoggerFactory.getLogger(TestBolt.class);
-    private OutputCollector collector;
-    private long count;
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        this.collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        LOG.info("data is coming: " + input);
-        count++;
-        if (count % 10 == 0) {
-            LOG.info("count = " + count);
-        }
-        collector.ack(input);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestByteBuffer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestByteBuffer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestByteBuffer.java
deleted file mode 100644
index 29986f1..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestByteBuffer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.topology;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-
-/**
- * Storm 1.0.0 uses ByteBuffer, we need test it
- */
-public class TestByteBuffer {
-    @Test
-    public void testBB() {
-        ByteBuffer bb = ByteBuffer.allocate(100);
-        bb.put((byte) 12);
-        Assert.assertTrue(bb.hasArray());
-        bb.rewind();
-        Assert.assertEquals(12, bb.get());
-    }
-
-    @Test
-    public void testMultipleStrings() throws Exception {
-        ByteBuffer bb = ByteBuffer.allocate(100);
-        bb.put("abc".getBytes("UTF-8"));
-        bb.put("xyz".getBytes("UTF-8"));
-        bb.put("1234".getBytes("UTF-8"));
-        bb.rewind();
-        if (bb.hasArray()) {
-            int base = bb.arrayOffset();
-            String ret = new String(bb.array(), base + bb.position(), bb.remaining());
-            System.out.println("string is: " + ret);
-        }
-    }
-}


Mime
View raw message