eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [09/46] incubator-eagle git commit: [EAGLE-325] Initialize next-gen alert engine code on branch-0.5
Date Thu, 02 Jun 2016 07:07:48 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
deleted file mode 100644
index 6cadba7..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter;
-
-import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.lang.time.StopWatch;
-import org.apache.eagle.alert.engine.mock.MockPartitionedCollector;
-import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.sorter.impl.PartitionedEventTimeOrderingComparator;
-import org.apache.eagle.alert.engine.sorter.impl.StreamSortWindowHandlerImpl;
-import org.apache.eagle.alert.engine.sorter.impl.StreamTimeClockInLocalMemory;
-import org.apache.eagle.alert.utils.DateTimeUtil;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.ScheduledReporter;
-import com.codahale.metrics.Slf4jReporter;
-import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
-import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
-import com.google.common.collect.Ordering;
-
-/**
- * -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+PrintGCTaskTimeStamps -XX:+PrintGCDetails -verbose:gc
- */
-public class StreamSortHandlerTest {
-    private final static Logger LOG = LoggerFactory.getLogger(StreamSortHandlerTest.class);
-
-    static {
-        LOG.info(ManagementFactory.getRuntimeMXBean().getName());
-    }
-
-    private ScheduledReporter metricReporter;
-    @Before
-    public void setUp(){
-        final MetricRegistry metrics = new MetricRegistry();
-        metrics.registerAll(new MemoryUsageGaugeSet());
-        metrics.registerAll(new GarbageCollectorMetricSet());
-        metricReporter = Slf4jReporter.forRegistry(metrics)
-                .filter((name, metric) -> name.matches("(.*heap|pools.PS.*).usage"))
-                .withLoggingLevel(Slf4jReporter.LoggingLevel.DEBUG)
-                .convertRatesTo(TimeUnit.SECONDS)
-                .convertDurationsTo(TimeUnit.MILLISECONDS)
-                .build();
-        metricReporter.start(60,TimeUnit.SECONDS);
-    }
-
-    /**
-     * Used to debug window bucket lifecycle
-     *
-     * Window period: PT1s, margin: 5s
-     *
-     * @throws InterruptedException
-     */
-    @Test
-    public void testWithUnsortedEventsIn1MinuteWindow() throws InterruptedException {
-        MockPartitionedCollector mockCollector = new MockPartitionedCollector();
-        StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
-        Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
-        StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
-        sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1m",5000),mockCollector);
-        List<PartitionedEvent> unsortedList = new LinkedList<>();
-
-        int i = 0;
-        while(i<1000) {
-            PartitionedEvent event = MockSampleMetadataFactory.createRandomOutOfTimeOrderEventGroupedByName("sampleStream_1");
-            sortHandler.nextEvent(event);
-            unsortedList.add(event);
-            if(event.getTimestamp()>timeClock.getTime()) {
-                timeClock.moveForward(event.getTimestamp());
-            }
-            sortHandler.onTick(timeClock,System.currentTimeMillis());
-            i++;
-        }
-        sortHandler.close();
-        Assert.assertFalse(timeOrdering.isOrdered(unsortedList));
-        Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
-        Assert.assertTrue(mockCollector.get().size() > 0);
-    }
-
-    @Test
-    public void testStreamSortHandlerWithUnsortedEventsIn1HourWindow() throws InterruptedException {
-        testWithUnsortedEventsIn1hWindow(1000000);
-    }
-
-    @Test
-    public void testSortedInPatient() throws InterruptedException {
-        MockPartitionedCollector mockCollector = new MockPartitionedCollector();
-        StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
-        Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
-        StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
-        sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1h",5000),mockCollector);
-        List<PartitionedEvent> sortedList = new LinkedList<>();
-
-        int i = 0;
-        while(i<1000000) {
-            PartitionedEvent event = MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",System.currentTimeMillis()+i);
-            sortHandler.nextEvent(event);
-            sortedList.add(event);
-            if(event.getTimestamp()>timeClock.getTime()) {
-                timeClock.moveForward(event.getTimestamp());
-            }
-            sortHandler.onTick(timeClock,System.currentTimeMillis());
-            i++;
-        }
-        sortHandler.close();
-        Assert.assertTrue(timeOrdering.isOrdered(sortedList));
-        Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
-        Assert.assertEquals(1000000,mockCollector.get().size());
-    }
-
-    /**
-     * -XX:+PrintGC
-     *
-     * @throws InterruptedException
-     */
-    @Test
-    public void testWithUnsortedEventsInLargeWindowBenchmark() throws InterruptedException {
-        metricReporter.report();
-        testWithUnsortedEventsIn1hWindow(1000);
-        metricReporter.report();
-        testWithUnsortedEventsIn1hWindow(10000);
-        metricReporter.report();
-        testWithUnsortedEventsIn1hWindow(100000);
-        metricReporter.report();
-        testWithUnsortedEventsIn1hWindow(1000000);
-        metricReporter.report();
-//        testWithUnsortedEventsIn1hWindow(10000000);
-//        metricReporter.report();
-    }
-
-    public void testWithUnsortedEventsIn1hWindow(int count) throws InterruptedException {
-        MockPartitionedCollector mockCollector = new MockPartitionedCollector();
-        StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
-        Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
-        StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
-        sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1h",5000),mockCollector);
-        List<PartitionedEvent> unsortedList = new LinkedList<>();
-
-        StopWatch stopWatch = new StopWatch();
-        stopWatch.start();
-        int i = 0;
-        while(i<count) {
-            PartitionedEvent event = MockSampleMetadataFactory.createRandomOutOfTimeOrderEventGroupedByName("sampleStream_1");
-            sortHandler.nextEvent(event);
-            unsortedList.add(event);
-            if(event.getEvent().getTimestamp()>timeClock.getTime()) {
-                timeClock.moveForward(event.getEvent().getTimestamp());
-            }
-            sortHandler.onTick(timeClock,System.currentTimeMillis());
-            i++;
-        }
-        stopWatch.stop();
-        LOG.info("Produced {} events in {} ms",count,stopWatch.getTime());
-        sortHandler.close();
-        Assert.assertFalse(timeOrdering.isOrdered(unsortedList));
-        Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
-        Assert.assertTrue(mockCollector.get().size()>=0);
-    }
-
-    /**
-     * Used to debug window bucket lifecycle
-     *
-     * Window period: PT1h, margin: 5s
-     *
-     * @throws InterruptedException
-     */
-    @Test
-    public void testWithSortedEvents() throws InterruptedException {
-        MockPartitionedCollector mockCollector = new MockPartitionedCollector();
-        StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
-        Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
-        StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
-        sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1h",5000),mockCollector);
-        List<PartitionedEvent> sortedList = new LinkedList<>();
-
-        int i = 0;
-        while(i<1000000) {
-            PartitionedEvent event = MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",System.currentTimeMillis()+i);
-            sortHandler.nextEvent(event);
-            sortedList.add(event);
-            if(event.getTimestamp()>timeClock.getTime()) {
-                timeClock.moveForward(event.getTimestamp());
-            }
-            sortHandler.onTick(timeClock,System.currentTimeMillis());
-            i++;
-        }
-        sortHandler.close();
-        Assert.assertTrue(timeOrdering.isOrdered(sortedList));
-        Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
-        Assert.assertEquals(1000000,mockCollector.get().size());
-    }
-
-    /**
-     * Used to debug window bucket lifecycle
-     *
-     * Window period: PT1h, margin: 5s
-     *
-     * @throws InterruptedException
-     */
-    @Test
-    public void testWithSortedEventsAndExpireBySystemTime() throws InterruptedException {
-        MockPartitionedCollector mockCollector = new MockPartitionedCollector();
-        StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
-        Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
-        StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
-        sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT10s",1000),mockCollector);
-        List<PartitionedEvent> sortedList = new LinkedList<>();
-
-        PartitionedEvent event = MockSampleMetadataFactory.createRandomSortedEventGroupedByName("sampleStream_1");
-        sortHandler.nextEvent(event);
-        sortedList.add(event);
-        timeClock.moveForward(event.getTimestamp());
-        sortHandler.onTick(timeClock,System.currentTimeMillis());
-
-        // Triggered to become expired by System time
-        sortHandler.onTick(timeClock,System.currentTimeMillis()+10*1000+1000L + 1);
-
-        Assert.assertTrue(timeOrdering.isOrdered(sortedList));
-        Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
-        Assert.assertEquals(1,mockCollector.get().size());
-
-        sortHandler.close();
-    }
-
-//    @Test
-    public void testWithTimerLock() throws InterruptedException {
-        Timer timer = new Timer();
-        List<Long> collected = new ArrayList<>();
-        timer.schedule(new TimerTask() {
-            @Override
-            public void run() {
-                synchronized (collected) {
-                    LOG.info("Ticking {}", DateTimeUtil.millisecondsToHumanDateWithMilliseconds(System.currentTimeMillis()));
-                    collected.add(System.currentTimeMillis());
-                    try {
-                        Thread.sleep(5000);
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                    }
-                }
-            }
-        },0,100);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowBenchmarkTest.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowBenchmarkTest.java
deleted file mode 100644
index dc9782e..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowBenchmarkTest.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter;
-
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.lang.time.StopWatch;
-import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.utils.DateTimeUtil;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.codahale.metrics.ConsoleReporter;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.ScheduledReporter;
-import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
-import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
-
-@Ignore
-public class StreamWindowBenchmarkTest {
-    private final static Logger LOGGER = LoggerFactory.getLogger(StreamWindowBenchmarkTest.class);
-    public void sendDESCOrderedEventsToWindow(StreamWindow window, StreamWindowRepository.StorageType storageType, int num) {
-        LOGGER.info("Sending {} events to {} ({})",num,window.getClass().getSimpleName(),storageType);
-        StopWatch stopWatch = new StopWatch();
-        stopWatch.start();
-        int i=0;
-        while(i<num) {
-            PartitionedEvent event = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream_1",(window.startTime()+i));
-            window.add(event);
-            i++;
-        }
-        stopWatch.stop();
-        performanceReport.put(num+"\tInsertTime\t"+storageType,stopWatch.getTime());
-        LOGGER.info("Inserted {} events in {} ms",num,stopWatch.getTime());
-        stopWatch.reset();
-        stopWatch.start();
-        window.flush();
-        stopWatch.stop();
-        performanceReport.put(num+"\tReadTime\t"+storageType,stopWatch.getTime());
-    }
-
-    private ScheduledReporter metricReporter;
-    private Map<String,Long> performanceReport;
-    @Before
-    public void setUp(){
-        final MetricRegistry metrics = new MetricRegistry();
-        metrics.registerAll(new MemoryUsageGaugeSet());
-        metrics.registerAll(new GarbageCollectorMetricSet());
-        metricReporter = ConsoleReporter.forRegistry(metrics)
-                .filter((name, metric) -> name.matches("(.*heap|total).(usage|used)"))
-//                .withLoggingLevel(Slf4jReporter.LoggingLevel.DEBUG)
-                .convertRatesTo(TimeUnit.SECONDS)
-                .convertDurationsTo(TimeUnit.MILLISECONDS)
-                .build();
-        metricReporter.start(60,TimeUnit.SECONDS);
-        performanceReport = new TreeMap<>();
-    }
-
-    @After
-    public void after(){
-        StringBuilder sb = new StringBuilder();
-        for(Map.Entry<String,Long> entry:performanceReport.entrySet()){
-            sb.append(String.format("%-40s\t%s\n",entry.getKey(),entry.getValue()));
-        }
-        LOGGER.info("\n===== Benchmark Result Report =====\n\n{}",sb.toString());
-    }
-
-    private final long start = DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000");
-    private final long stop = DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-05 00:00:00,000");
-    private final long margin = (stop - start)/3;
-
-    private void benchmarkTest(StreamWindow window, StreamWindowRepository.StorageType storageType){
-        StopWatch stopWatch = new StopWatch();
-        stopWatch.start();
-        LOGGER.info("\n===== Benchmark Test for {} ({}) =====",window.getClass().getSimpleName(),storageType);
-        metricReporter.report();
-        sendDESCOrderedEventsToWindow(window,storageType,1000);
-        metricReporter.report();
-        sendDESCOrderedEventsToWindow(window,storageType,10000);
-        metricReporter.report();
-        sendDESCOrderedEventsToWindow(window,storageType,100000);
-        metricReporter.report();
-        sendDESCOrderedEventsToWindow(window,storageType,1000000);
-        metricReporter.report();
-        stopWatch.stop();
-        LOGGER.info("\n===== Finished in total {} ms =====\n",stopWatch.getTime());
-    }
-
-    @Test @Ignore
-    public void testStreamWindowBenchmarkMain(){
-        testStreamSortedWindowOnHeap();
-        testStreamSortedWindowInSerializedMemory();
-        testStreamSortedWindowOffHeap();
-        testStreamSortedWindowFile();
-    }
-
-    @Test @Ignore
-    public void testStreamSortedWindowOnHeap() {
-        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.ONHEAP);
-        benchmarkTest(window,StreamWindowRepository.StorageType.ONHEAP);
-        window.close();
-    }
-
-    @Test @Ignore
-    public void testStreamSortedWindowInSerializedMemory() {
-        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.MEMORY);
-        benchmarkTest(window,StreamWindowRepository.StorageType.MEMORY);
-        window.close();
-    }
-
-    @Test @Ignore
-    public void testStreamSortedWindowOffHeap() {
-        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.DIRECT_MEMORY);
-        benchmarkTest(window,StreamWindowRepository.StorageType.DIRECT_MEMORY);
-        window.close();
-    }
-
-    @Test @Ignore
-    public void testStreamSortedWindowFile() {
-        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.FILE_RAF);
-        benchmarkTest(window,StreamWindowRepository.StorageType.FILE_RAF);
-        window.close();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowTestSuite.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowTestSuite.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowTestSuite.java
deleted file mode 100644
index 950aa34..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowTestSuite.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter;
-
-import java.util.List;
-
-import org.apache.eagle.alert.engine.mock.MockPartitionedCollector;
-import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.sorter.impl.PartitionedEventTimeOrderingComparator;
-import org.apache.eagle.alert.engine.sorter.impl.StreamTimeClockInLocalMemory;
-import org.apache.eagle.alert.utils.DateTimeUtil;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Ordering;
-
-@SuppressWarnings("unused")
-public class StreamWindowTestSuite {
-    private final static Logger LOGGER = LoggerFactory.getLogger(StreamWindowTestSuite.class);
-
-    private final long start = DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000");
-    private final long stop = DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:00,000");
-    private final long margin = (stop - start)/3;
-
-    @Test
-    public void testStreamSortedWindowOnHeap() {
-        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.ONHEAP);
-        streamSortedWindowMustTest(window);
-    }
-
-    @Test
-    public void testStreamSortedWindowInSerializedMemory() {
-        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.MEMORY);
-        streamSortedWindowMustTest(window);
-    }
-
-    @Test
-    public void testStreamSortedWindowOffHeap() {
-        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.DIRECT_MEMORY);
-        streamSortedWindowMustTest(window);
-    }
-
-    @Test
-    public void testStreamSortedWindowFile() {
-        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.FILE_RAF);
-        streamSortedWindowMustTest(window);
-    }
-
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    private void streamSortedWindowMustTest(StreamWindow window){
-        MockPartitionedCollector collector = new MockPartitionedCollector();
-        window.register(collector);
-
-        StreamTimeClock clock = new StreamTimeClockInLocalMemory("sampleStream_1");
-        clock.moveForward(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:30,000"));
-
-        // Current time is: "2016-05-04 00:00:30"
-        window.onTick(clock,System.currentTimeMillis());
-
-        Assert.assertTrue(window.alive());
-        Assert.assertFalse(window.expired());
-
-        // Accepted
-        Assert.assertTrue(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000")));
-        Assert.assertTrue(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:01,000")));
-        Assert.assertTrue(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:30,000")));
-        Assert.assertTrue(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:40,000")));
-        Assert.assertTrue(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:50,000")));
-
-
-        // Rejected
-        Assert.assertFalse(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-03 23:59:59,000")));
-        Assert.assertFalse(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:00,000")));
-        Assert.assertFalse(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:01,000")));
-
-        // Accepted
-        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"))));
-
-        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:01,000"))));
-        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:50,000"))));
-        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:40,000"))));
-        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:30,000"))));
-
-        // Should accept Duplicated
-        Assert.assertTrue("Should support duplicated timestamp",window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"))));
-
-        Assert.assertEquals(6,window.size());
-
-        // Rejected
-        Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-03 23:59:59,000"))));
-        Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:00,000"))));
-        Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:01,000"))));
-
-        Assert.assertEquals(6,window.size());
-
-        // Now is: "2016-05-04 00:00:55"
-        clock.moveForward(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:55,000"));
-        window.onTick(clock,System.currentTimeMillis());
-        Assert.assertTrue(window.alive());
-        Assert.assertFalse(window.expired());
-        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:35,000"))));
-        Assert.assertEquals(7,window.size());
-
-        // Flush when stream time delay too much after system time but window will still be alive
-        window.onTick(clock,System.currentTimeMillis() + 1 + stop - start + margin);
-        Assert.assertTrue(window.alive());
-        Assert.assertFalse(window.expired());
-        Assert.assertEquals(0,window.size());
-        Assert.assertEquals(7,collector.size());
-
-        Assert.assertFalse("Because window has flushed but not expired, window should reject future events < last flush stream time",
-                window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:54,000"))));
-        Assert.assertTrue("Because window has flushed but not expired, window should still accept future events >= last flush stream time",
-                window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:56,000"))));
-        Assert.assertEquals(1,window.size());
-        Assert.assertEquals(7,collector.size());
-
-        // Now is: "2016-05-04 00:01:10", not expire,
-        clock.moveForward(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:10,000"));
-        window.onTick(clock,System.currentTimeMillis() + 2 * (1+ stop - start + margin));
-        Assert.assertEquals(8,collector.size());
-
-        // Now is: "2016-05-04 00:01:20", expire
-        clock.moveForward(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:20,000"));
-        window.onTick(clock,System.currentTimeMillis());
-        Assert.assertFalse(window.alive());
-        Assert.assertTrue(window.expired());
-        Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:35,000"))));
-        Assert.assertEquals(0,window.size());
-
-        Assert.assertEquals(8,collector.size());
-
-        Ordering ordering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
-        Assert.assertTrue(ordering.isOrdered(collector.get()));
-
-        List<PartitionedEvent> list = collector.get();
-        Assert.assertEquals(8,list.size());
-        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"),list.get(0).getTimestamp());
-        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"),list.get(1).getTimestamp());
-        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:01,000"),list.get(2).getTimestamp());
-        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:30,000"),list.get(3).getTimestamp());
-        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:35,000"),list.get(4).getTimestamp());
-        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:40,000"),list.get(5).getTimestamp());
-        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:50,000"),list.get(6).getTimestamp());
-        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:56,000"),list.get(7).getTimestamp());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/TreeMultisetComparatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/TreeMultisetComparatorTest.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/TreeMultisetComparatorTest.java
deleted file mode 100644
index b7e76b8..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/TreeMultisetComparatorTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter;
-
-import java.util.Arrays;
-import java.util.Iterator;
-
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.sorter.impl.PartitionedEventTimeOrderingComparator;
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.collect.TreeMultiset;
-
-/**
- * Since 5/10/16.
- */
-public class TreeMultisetComparatorTest {
-    /**
-     * if 2 different events have the same timestamp, and comparator return 0 when timestamp is same,
-     * when they are added into TreeMultiset, the second event will be replaced by the first event
-     */
-    @Test
-    public void testComparator(){
-        TreeMultiset<PartitionedEvent> set = TreeMultiset.create(PartitionedEventTimeOrderingComparator.INSTANCE);
-
-        // construct PartitionEvent1
-        PartitionedEvent event1 = new PartitionedEvent();
-        StreamPartition sp = new StreamPartition();
-        sp.setColumns(Arrays.asList("host"));
-        sp.setSortSpec(null);
-        sp.setStreamId("testStreamId");
-        sp.setType(StreamPartition.Type.GROUPBY);
-        event1.setPartition(sp);
-        event1.setPartitionKey(1000);
-        StreamEvent e1 = new StreamEvent();
-        e1.setData(new Object[]{18.4});
-        e1.setStreamId("testStreamId");
-        e1.setTimestamp(1462909984000L);
-        event1.setEvent(e1);
-
-        // construct PartitionEvent2 with same timestamp but different value
-        PartitionedEvent event2 = new PartitionedEvent();
-        event2.setPartition(sp);
-        event2.setPartitionKey(1000);
-        StreamEvent e2 = new StreamEvent();
-        e2.setData(new Object[]{16.3});
-        e2.setStreamId("testStreamId");
-        e2.setTimestamp(1462909984000L);
-        event2.setEvent(e2);
-
-        // construct PartitionEvent2 with same timestamp but different value
-        PartitionedEvent event3 = new PartitionedEvent();
-        event3.setPartition(sp);
-        event3.setPartitionKey(1000);
-        StreamEvent e3 = new StreamEvent();
-        e3.setData(new Object[]{14.3});
-        e3.setStreamId("testStreamId");
-        e3.setTimestamp(1462909984001L);
-        event3.setEvent(e3);
-
-        PartitionedEvent event4 = new PartitionedEvent();
-        event4.setPartition(sp);
-        event4.setPartitionKey(1000);
-        StreamEvent e4 = new StreamEvent();
-        e4.setData(new Object[]{14.3});
-        e4.setStreamId("testStreamId");
-        e4.setTimestamp(1462909984001L);
-        event4.setEvent(e4);
-
-        Assert.assertNotEquals(event2,event3);
-        Assert.assertEquals(event3,event4);
-
-        // check content in set
-        set.add(event1);
-        set.add(event2);
-        set.add(event3);
-        set.add(event4);
-        Assert.assertEquals(4, set.size());
-        set.forEach(System.out::println);
-
-
-        Assert.assertEquals(-1,PartitionedEventTimeOrderingComparator.INSTANCE.compare(event1,event2));
-        Assert.assertEquals(-1,PartitionedEventTimeOrderingComparator.INSTANCE.compare(event1,event3));
-        Assert.assertEquals(-1,PartitionedEventTimeOrderingComparator.INSTANCE.compare(event2,event3));
-        Assert.assertEquals(0,PartitionedEventTimeOrderingComparator.INSTANCE.compare(event3,event4));
-
-        Iterator<PartitionedEvent> it = set.iterator();
-        Assert.assertEquals(16.3,it.next().getData()[0]);
-        Assert.assertEquals(18.4,it.next().getData()[0]);
-        Assert.assertEquals(14.3,it.next().getData()[0]);
-        Assert.assertEquals(14.3,it.next().getData()[0]);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java
deleted file mode 100644
index 62427e0..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.topology;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Properties;
-
-import org.apache.eagle.alert.engine.spout.CorrelationSpout;
-import org.apache.eagle.alert.engine.spout.CreateTopicUtils;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.LocalCluster;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.SpoutDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-@SuppressWarnings({"serial", "unused"})
-public class AlertTopologyTest implements Serializable{
-    private static final Logger LOG = LoggerFactory.getLogger(AlertTopologyTest.class);
-    char[] alphabets = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z'};
-    @Ignore
-    @Test
-    public void testMultipleTopics() throws Exception {
-        final String topoId = "myTopology";
-        int numGroupbyBolts = 2;
-        int numTotalGroupbyBolts = 3;
-        System.setProperty("eagle.correlation.numGroupbyBolts", String.valueOf(numGroupbyBolts));
-        System.setProperty("eagle.correlation.topologyName", topoId);
-        System.setProperty("eagle.correlation.mode", "local");
-        System.setProperty("eagle.correlation.zkHosts", "localhost:2181");
-        final String topicName1 = "testTopic3";
-        final String topicName2 = "testTopic4";
-        // ensure topic ready
-        LogManager.getLogger(CorrelationSpout.class).setLevel(Level.DEBUG);
-        Config config = ConfigFactory.load();
-
-        CreateTopicUtils.ensureTopicReady(System.getProperty("eagle.correlation.zkHosts"), topicName1);
-        CreateTopicUtils.ensureTopicReady(System.getProperty("eagle.correlation.zkHosts"), topicName2);
-
-        TopologyBuilder topoBuilder = new TopologyBuilder();
-
-        int numBolts = config.getInt("eagle.correlation.numGroupbyBolts");
-        CorrelationSpout spout = new CorrelationSpout(config, topoId, null, numBolts);
-        String spoutId = "correlation-spout";
-        SpoutDeclarer declarer = topoBuilder.setSpout(spoutId, spout);
-        for (int i = 0; i < numBolts; i++) {
-            TestBolt bolt = new TestBolt();
-            BoltDeclarer boltDecl = topoBuilder.setBolt("engineBolt" + i, bolt);
-            boltDecl.fieldsGrouping(spoutId, "stream_" + i, new Fields());
-        }
-
-        String topoName = config.getString("eagle.correlation.topologyName");
-        LOG.info("start topology in local mode");
-        LocalCluster cluster = new LocalCluster();
-        cluster.submitTopology(topoName, new HashMap<>(), topoBuilder.createTopology());
-
-        while(true) {
-            try {
-                Thread.sleep(1000);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    @Ignore
-    @Test
-    public void generateRandomStringsToKafka(){
-        String topic = "testTopic3";
-        int max = 1000;
-        Properties configMap = new Properties();
-        configMap.put("bootstrap.servers", "sandbox.hortonworks.com:6667");
-        configMap.put("metadata.broker.list", "sandbox.hortonworks.com:6667");
-        configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        configMap.put("request.required.acks", "1");
-        configMap.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
-        configMap.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
-        KafkaProducer<String, Object> producer = new KafkaProducer<>(configMap);
-
-        int i = 0;
-        while(i++ < max) {
-            String randomString = generateRandomString();
-            System.out.println("sending string : " + randomString);
-            ProducerRecord record = new ProducerRecord(topic, randomString);
-            producer.send(record);
-            if(i % 10 == 0){
-                try {
-                    Thread.sleep(10);
-                }catch(Exception ex){
-                }
-            }
-        }
-        producer.close();
-    }
-
-    private String generateRandomString(){
-        long count = Math.round(Math.random() * 10);
-        if(count == 0)
-            count = 1;
-        StringBuilder sb = new StringBuilder();
-        while(count-- > 0) {
-            int index = (int)(Math.floor(Math.random()*26));
-            sb.append(alphabets[index]);
-        }
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/CoordinatorSpoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/CoordinatorSpoutIntegrationTest.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/CoordinatorSpoutIntegrationTest.java
deleted file mode 100644
index deca9b4..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/CoordinatorSpoutIntegrationTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.topology;
-
-import java.io.Serializable;
-import java.util.HashMap;
-
-import org.apache.eagle.alert.engine.spout.CorrelationSpout;
-import org.apache.eagle.alert.engine.spout.CreateTopicUtils;
-import org.apache.eagle.alert.utils.AlertConstants;
-import org.apache.eagle.alert.utils.StreamIdConversion;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.LocalCluster;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.SpoutDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-/**
- * Since 4/28/16.
- */
-@SuppressWarnings({"serial", "unused", "rawtypes"})
-public class CoordinatorSpoutIntegrationTest implements Serializable {
-    private static final Logger LOG = LoggerFactory.getLogger(CoordinatorSpoutIntegrationTest.class);
-    @Ignore  // this test need zookeeper
-    @Test
-    public void testConfigNotify() throws Exception{
-        final String topoId = "myTopology";
-        int numGroupbyBolts = 2;
-        int numTotalGroupbyBolts = 3;
-        System.setProperty("correlation.serviceHost", "sandbox.hortonworks.com");
-        System.setProperty("correlation.servicePort", "58080");
-        System.setProperty("withMetadataChangeNotifyService.zkConfig.zkQuorum", "sandbox.hortonworks.com:2181");
-        System.setProperty("correlation.numGroupbyBolts", String.valueOf(numGroupbyBolts));
-        System.setProperty("correlation.topologyName", topoId);
-        System.setProperty("correlation.mode", "local");
-        System.setProperty("correlation.zkHosts", "sandbox.hortonworks.com:2181");
-        final String topicName1 = "testTopic3";
-        final String topicName2 = "testTopic4";
-        // ensure topic ready
-        LogManager.getLogger(CorrelationSpout.class).setLevel(Level.DEBUG);
-        Config config = ConfigFactory.load();
-
-        CreateTopicUtils.ensureTopicReady(System.getProperty("withMetadataChangeNotifyService.zkConfig.zkQuorum"), topicName1);
-        CreateTopicUtils.ensureTopicReady(System.getProperty("withMetadataChangeNotifyService.zkConfig.zkQuorum"), topicName2);
-
-        TopologyBuilder topoBuilder = new TopologyBuilder();
-
-        int numBolts = config.getInt("correlation.numGroupbyBolts");
-        String spoutId = "correlation-spout";
-        CorrelationSpout spout = new CorrelationSpout(config, topoId,
-                new MockMetadataChangeNotifyService(topoId, spoutId), numBolts);
-        SpoutDeclarer declarer = topoBuilder.setSpout(spoutId, spout);
-        declarer.setNumTasks(2);
-        for (int i = 0; i < numBolts; i++) {
-            TestBolt bolt = new TestBolt();
-            BoltDeclarer boltDecl = topoBuilder.setBolt("engineBolt" + i, bolt);
-            boltDecl.fieldsGrouping(spoutId,
-                    StreamIdConversion.generateStreamIdBetween(AlertConstants.DEFAULT_SPOUT_NAME, AlertConstants.DEFAULT_ROUTERBOLT_NAME+i), new Fields());
-        }
-
-        String topoName = config.getString("correlation.topologyName");
-        LOG.info("start topology in local mode");
-        LocalCluster cluster = new LocalCluster();
-        StormTopology topology = topoBuilder.createTopology();
-        cluster.submitTopology(topoName, new HashMap(), topology);
-
-
-        Utils.sleep(Long.MAX_VALUE);
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
deleted file mode 100644
index 6960537..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.topology;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.SpoutSpec;
-import org.apache.eagle.alert.coordination.model.StreamRepartitionMetadata;
-import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.spout.CorrelationSpout;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-
-import storm.kafka.KafkaSpoutWrapper;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-@Ignore
-public class CorrelationSpoutTest {
-    @SuppressWarnings("unused")
-    private static final String TEST_SPOUT_ID = "spout-id-1";
-    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(CorrelationSpoutTest.class);
-    private String dataSourceName = "ds-name";
-
-    @SuppressWarnings({ "serial", "rawtypes" })
-    @Test
-    public void testMetadataInjestion_emptyMetadata() throws Exception{
-        String topoId = "testMetadataInjection";
-        Config config = ConfigFactory.load();
-        AtomicBoolean validated = new AtomicBoolean(false);
-        CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1) {
-            @Override
-            protected KafkaSpoutWrapper createKafkaSpout(Map conf, TopologyContext context,
-                    SpoutOutputCollector collector, String topic, String schemeClsName, SpoutSpec streamMetadatas, Map<String, StreamDefinition> sds)
-                    throws Exception {
-                validated.set(true);
-                return null;
-            }
-        };
-        Kafka2TupleMetadata ds = new Kafka2TupleMetadata();
-        ds.setName("ds-name");
-        ds.setType("KAFKA");
-        ds.setProperties(new HashMap<String, String>());
-        ds.setTopic("name-of-topic1");
-        ds.setSchemeCls("PlainStringScheme");
-        ds.setCodec(new Tuple2StreamMetadata());
-        Map<String,Kafka2TupleMetadata> dsMap = new HashMap<String, Kafka2TupleMetadata>();
-        dsMap.put(ds.getName(), ds);
-
-        StreamRepartitionMetadata m1 = new StreamRepartitionMetadata(ds.getName(), "s1");
-
-        Map<String, List<StreamRepartitionMetadata>> dataSources = new HashMap<String, List<StreamRepartitionMetadata>>();
-        dataSources.put(ds.getName(), Arrays.asList(m1));
-
-        SpoutSpec newMetadata = new SpoutSpec(topoId, dataSources, null, dsMap);
-        
-        spout.onReload(newMetadata, null);
-        Assert.assertTrue(validated.get());
-    }
-
-    @SuppressWarnings({ "serial", "rawtypes" })
-    @Test
-    public void testMetadataInjestion_oneNewTopic2Streams() throws Exception {
-        String topoId = "testMetadataInjection";
-        final String topicName = "testTopic";
-
-        Config config = ConfigFactory.load();
-        final AtomicBoolean verified = new AtomicBoolean(false);
-        CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1)  {
-            @Override
-            protected KafkaSpoutWrapper createKafkaSpout(Map conf, 
-                    TopologyContext context,
-                    SpoutOutputCollector collector, 
-                    String topic, 
-                    String topic2SchemeClsName,
-                    SpoutSpec streamMetadatas,
-                    Map<String, StreamDefinition> sds) {
-                Assert.assertEquals(1, streamMetadatas.getStreamRepartitionMetadataMap().size());
-                Assert.assertTrue(streamMetadatas.getStream("s1") != null);
-                Assert.assertTrue(streamMetadatas.getStream("s2") != null);
-                Assert.assertEquals(dataSourceName, streamMetadatas.getStream("s1").getTopicName());
-                Assert.assertEquals(dataSourceName, streamMetadatas.getStream("s2").getTopicName());
-                LOG.info("successfully verified new topic and streams");
-                verified.set(true);
-                return null;
-            }
-        };
-
-        Map<String, Kafka2TupleMetadata> dsMap = createDatasource(topicName, dataSourceName);
-
-        StreamRepartitionMetadata m1 = new StreamRepartitionMetadata(dataSourceName, "s1");
-        StreamRepartitionMetadata m2 = new StreamRepartitionMetadata(dataSourceName, "s2");
-        Map<String, List<StreamRepartitionMetadata>> dataSources = new HashMap<String, List<StreamRepartitionMetadata>>();
-        dataSources.put(dataSourceName, Arrays.asList(m1, m2));
-
-        SpoutSpec newMetadata = new SpoutSpec(topoId, dataSources, null, dsMap);
-        spout.onReload(newMetadata, null);
-        Assert.assertTrue(verified.get());
-    }
-
-    private Map<String, Kafka2TupleMetadata> createDatasource(final String topicName, final String dataSourceName) {
-        Kafka2TupleMetadata ds = new Kafka2TupleMetadata();
-        
-        ds.setName(dataSourceName);
-        ds.setType("KAFKA");
-        ds.setProperties(new HashMap<String, String>());
-        ds.setTopic(topicName);
-        ds.setSchemeCls("PlainStringScheme");
-        ds.setCodec(new Tuple2StreamMetadata());
-        Map<String, Kafka2TupleMetadata> dsMap = new HashMap<String, Kafka2TupleMetadata>();
-        dsMap.put(ds.getName(), ds);
-        return dsMap;
-    }
-
-    @SuppressWarnings({ "serial", "rawtypes" })
-    @Test
-    public void testMetadataInjestion_deleteOneTopic() throws Exception{
-        String topoId = "testMetadataInjection";
-        final String topicName = "testTopic";
-        Config config = ConfigFactory.load();
-        final AtomicBoolean verified = new AtomicBoolean(false);
-        CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1) {
-            @Override
-            protected KafkaSpoutWrapper createKafkaSpout(Map conf, TopologyContext context, SpoutOutputCollector collector, final String topic,
-                                                         String schemeClsName, SpoutSpec streamMetadatas,
-                                                         Map<String, StreamDefinition> sds){
-                return new KafkaSpoutWrapper(null, null);
-            }
-            @Override
-            protected void removeKafkaSpout(KafkaSpoutWrapper wrapper){
-                LOG.info("successfully verified removed topic and streams");
-                verified.set(true);
-            }
-        };
-
-        Map<String, Kafka2TupleMetadata> dsMap = createDatasource(topicName, dataSourceName);
-
-        StreamRepartitionMetadata m1 = new StreamRepartitionMetadata(dataSourceName, "s1");
-
-        Map<String, List<StreamRepartitionMetadata>> streamMetadatas = new HashMap<String, List<StreamRepartitionMetadata>>();
-        streamMetadatas.put(dataSourceName, Arrays.asList(m1));
-
-        SpoutSpec cachedMetadata = new SpoutSpec(topoId, streamMetadatas, null, dsMap);
-        // add new topic
-        spout.onReload(cachedMetadata, null);
-        // delete new topic
-        try {
-            spout.onReload(new SpoutSpec(topoId, new HashMap<String, List<StreamRepartitionMetadata>>(), new HashMap<>(), new HashMap<String, Kafka2TupleMetadata>()),
-                    null);
-        }catch(Exception ex){
-            LOG.error("error reloading spout metadata", ex);
-            throw ex;
-        }
-        Assert.assertTrue(verified.get());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/FastWordCountTopology.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/FastWordCountTopology.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/FastWordCountTopology.java
deleted file mode 100644
index 4779fac..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/FastWordCountTopology.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.topology;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.generated.ClusterSummary;
-import backtype.storm.generated.ExecutorSummary;
-import backtype.storm.generated.KillOptions;
-import backtype.storm.generated.Nimbus;
-import backtype.storm.generated.SpoutStats;
-import backtype.storm.generated.TopologyInfo;
-import backtype.storm.generated.TopologySummary;
-import backtype.storm.metric.LoggingMetricsConsumer;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-
-/**
- * WordCount but the spout does not stop, and the bolts are implemented in
- * java.  This can show how fast the word count can run.
- */
-@SuppressWarnings("serial")
-public class FastWordCountTopology {
-    public static class FastRandomSentenceSpout extends BaseRichSpout {
-        private final static Logger LOG = LoggerFactory.getLogger(FastRandomSentenceSpout.class);
-        SpoutOutputCollector _collector;
-        Random _rand;
-        private static final String[] CHOICES = {
-                "marry had a little lamb whos fleese was white as snow",
-                "and every where that marry went the lamb was sure to go",
-                "one two three four five six seven eight nine ten",
-                "this is a test of the emergency broadcast system this is only a test",
-                "peter piper picked a peck of pickeled peppers"
-        };
-
-        @SuppressWarnings("rawtypes")
-        @Override
-        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-            _collector = collector;
-            _rand = ThreadLocalRandom.current();
-        }
-
-        @Override
-        public void nextTuple() {
-            String sentence = CHOICES[_rand.nextInt(CHOICES.length)];
-            _collector.emit(new Values(sentence), sentence);
-            LOG.debug("Emit tuple: {}, id:{}",new Values(sentence),sentence);
-        }
-
-        @Override
-        public void ack(Object id) {
-            //Ignored
-        }
-
-        @Override
-        public void fail(Object id) {
-            _collector.emit(new Values(id), id);
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("sentence"));
-        }
-    }
-
-    public static class SplitSentence extends BaseBasicBolt {
-        @Override
-        public void execute(Tuple tuple, BasicOutputCollector collector) {
-            String sentence = tuple.getString(0);
-            for (String word: sentence.split("\\s+")) {
-                collector.emit(new Values(word, 1));
-            }
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("word", "count"));
-        }
-    }
-
-
-    public static class WordCount extends BaseBasicBolt {
-        Map<String, Integer> counts = new HashMap<String, Integer>();
-
-        @Override
-        public void execute(Tuple tuple, BasicOutputCollector collector) {
-            String word = tuple.getString(0);
-            Integer count = counts.get(word);
-            if (count == null)
-                count = 0;
-            count++;
-            counts.put(word, count);
-            collector.emit(new Values(word, count));
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("word", "count"));
-        }
-    }
-
-    public static void printMetrics(Nimbus.Client client, String name) throws Exception {
-        ClusterSummary summary = client.getClusterInfo();
-        String id = null;
-        for (TopologySummary ts: summary.get_topologies()) {
-            if (name.equals(ts.get_name())) {
-                id = ts.get_id();
-            }
-        }
-        if (id == null) {
-            throw new Exception("Could not find a topology named "+name);
-        }
-        TopologyInfo info = client.getTopologyInfo(id);
-        int uptime = info.get_uptime_secs();
-        long acked = 0;
-        long failed = 0;
-        double weightedAvgTotal = 0.0;
-        for (ExecutorSummary exec: info.get_executors()) {
-            if ("spout".equals(exec.get_component_id())) {
-                SpoutStats stats = exec.get_stats().get_specific().get_spout();
-                Map<String, Long> failedMap = stats.get_failed().get(":all-time");
-                Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
-                Map<String, Double> avgLatMap = stats.get_complete_ms_avg().get(":all-time");
-                for (String key: ackedMap.keySet()) {
-                    if (failedMap != null) {
-                        Long tmp = failedMap.get(key);
-                        if (tmp != null) {
-                            failed += tmp;
-                        }
-                    }
-                    long ackVal = ackedMap.get(key);
-                    double latVal = avgLatMap.get(key) * ackVal;
-                    acked += ackVal;
-                    weightedAvgTotal += latVal;
-                }
-            }
-        }
-        double avgLatency = weightedAvgTotal/acked;
-        System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec: "+(((double)acked)/uptime+" failed: "+failed));
-    }
-
-    public static void kill(Nimbus.Client client, String name) throws Exception {
-        KillOptions opts = new KillOptions();
-        opts.set_wait_secs(0);
-        client.killTopologyWithOpts(name, opts);
-    }
-
-    public static void main(String[] args) throws Exception {
-
-        TopologyBuilder builder = new TopologyBuilder();
-
-        builder.setSpout("spout", new FastRandomSentenceSpout(), 4);
-
-        builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout");
-        builder.setBolt("count", new WordCount(), 4).fieldsGrouping("split", new Fields("word"));
-
-        Config conf = new Config();
-        conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
-
-        String name = "wc-test";
-        if (args != null && args.length > 0) {
-            name = args[0];
-        }
-
-//        conf.setNumWorkers(1);
-
-        LocalCluster cluster = new LocalCluster();
-        cluster.submitTopology(name, conf, builder.createTopology());
-
-        Utils.sleep(Long.MAX_VALUE);
-//
-//        Map clusterConf = Utils.readStormConfig();
-//        clusterConf.putAll(Utils.readCommandLineOpts());
-//        Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient();
-//
-//        //Sleep for 5 mins
-//        for (int i = 0; i < 10; i++) {
-//            Thread.sleep(30 * 1000);
-//            printMetrics(client, name);
-//        }
-//        kill(client, name);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/MockMetadataChangeNotifyService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/MockMetadataChangeNotifyService.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/MockMetadataChangeNotifyService.java
deleted file mode 100644
index a8fb6d0..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/MockMetadataChangeNotifyService.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.topology;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
-import org.apache.eagle.alert.coordination.model.PublishSpec;
-import org.apache.eagle.alert.coordination.model.RouterSpec;
-import org.apache.eagle.alert.coordination.model.SpoutSpec;
-import org.apache.eagle.alert.engine.coordinator.MetadataType;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.impl.AbstractMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.utils.MetadataSerDeser;
-import org.codehaus.jackson.type.TypeReference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-
/**
 * Test double for AbstractMetadataChangeNotifyService: instead of subscribing
 * to a live coordinator it loads canned specs from JSON resources on the test
 * classpath and pushes a single notification — selected by the MetadataType
 * passed to {@link #init} — from a background thread.
 *
 * @since 5/4/16
 */
@SuppressWarnings({"serial"})
public class MockMetadataChangeNotifyService extends AbstractMetadataChangeNotifyService implements Runnable {
    private final static Logger LOG = LoggerFactory.getLogger(MockMetadataChangeNotifyService.class);
    @SuppressWarnings("unused")
    private static final String[] topics = new String[]{"testTopic3", "testTopic4", "testTopic5"};
    @SuppressWarnings("unused")
    private String topologyName;
    @SuppressWarnings("unused")
    private String spoutId;
    // Stream definitions attached to every notification payload; loaded once in init().
    private Map<String, StreamDefinition> sds;

    public MockMetadataChangeNotifyService(String topologyName, String spoutId){
        this.topologyName = topologyName;
        this.spoutId = spoutId;
    }

    @Override
    public void init(Config config, MetadataType type) {
        super.init(config, type);
        this.sds = defineStreamDefinitions();
        // Deliver the notification asynchronously so init() returns before any
        // listener callback runs (mimics the real coordinator's push behavior).
        new Thread(this).start();
    }

    @Override
    public void run() {
        // 'type' is state recorded by super.init(); exactly one canned
        // notification is delivered per service instance.
        switch (type) {
            case SPOUT:
                notifySpout(Arrays.asList("testTopic3", "testTopic4"), Arrays.asList("testTopic5"));
                break;
            case STREAM_ROUTER_BOLT:
                populateRouterMetadata();
                break;
            case ALERT_BOLT:
                populateAlertBoltSpec();
                break;
            case ALERT_PUBLISH_BOLT:
                notifyAlertPublishBolt();
                break;
            default:
                LOG.error("that is not possible man!");
        }
    }

    // Loads the shared stream schema map from /testStreamDefinitionsSpec.json.
    private Map<String, StreamDefinition> defineStreamDefinitions(){
        Map<String, StreamDefinition> sds = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testStreamDefinitionsSpec.json"),
                new TypeReference<Map<String, StreamDefinition>>() {});
        return sds;
    }

    // NOTE(review): both topic-list parameters are ignored — the spout spec is
    // read wholly from /testSpoutSpec.json. Confirm whether that is intended.
    private void notifySpout(List<String> plainStringTopics, List<String> jsonStringTopics){
        SpoutSpec newSpec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testSpoutSpec.json"), SpoutSpec.class);
        notifySpout(newSpec, sds);
    }

    // Pushes the canned router spec from /testStreamRouterBoltSpec.json.
    private void populateRouterMetadata(){
        RouterSpec boltSpec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testStreamRouterBoltSpec.json"), RouterSpec.class);
        notifyStreamRouterBolt(boltSpec, sds);
    }

    // Pushes the canned alert-bolt spec from /testAlertBoltSpec.json.
    private void populateAlertBoltSpec(){
        AlertBoltSpec spec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testAlertBoltSpec.json"), AlertBoltSpec.class);
        notifyAlertBolt(spec, sds);
    }

    // Pushes the canned publish spec from /testPublishSpec.json.
    private void notifyAlertPublishBolt(){
        PublishSpec spec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec.json"), PublishSpec.class);
        notifyAlertPublishBolt(spec, sds);
    }
}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/SendData2KafkaTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/SendData2KafkaTest.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/SendData2KafkaTest.java
deleted file mode 100644
index fa2f0a6..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/SendData2KafkaTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.topology;
-
-import java.io.FileWriter;
-import java.io.PrintWriter;
-import java.io.Serializable;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.junit.Ignore;
-import org.junit.Test;
-
-/**
- * Created on 3/12/16.
- */
-@SuppressWarnings({"serial", "unchecked", "rawtypes", "resource"})
-public class SendData2KafkaTest implements Serializable{
-    /**
-     *
-     * {"timestamp": 10000, "metric": "esErrorLogEvent", "instanceUuid": "vm-InstanceId1", "host":"test-host1", "type":"nova", "stack":"NullPointException-..........."}
-     {"timestamp": 10000, "metric": "instanceFailureLogEvent", "instanceUuid": "vm-InstanceId1", "message":"instance boot failure for user liasu!"}
-     *
-     */
-    @Test
-    @Ignore
-    public void testOutput() throws Exception {
-        String s1 = "{\"timestamp\": %d, \"metric\": \"esErrorLogEvent\", \"instanceUuid\": \"vm-InstanceId%d\", \"host\":\"test-host1\", \"type\":\"nova\", \"stack\":\"NullPointException-...........\"}";
-        String s2 = "{\"timestamp\": %d, \"metric\": \"instanceFailureLogEvent\", \"instanceUuid\": \"vm-InstanceId%d\", \"message\":\"instance boot failure for user liasu!\"}";
-
-        PrintWriter espw = new PrintWriter(new FileWriter("src/test/resources/es.log"));
-        PrintWriter ifpw = new PrintWriter(new FileWriter("src/test/resources/if.log"));
-
-        long base = System.currentTimeMillis();
-        long timestamp = 10000;
-
-        for (int i = 0; i < 10; i++) {
-
-            timestamp = base + i * 10000;
-            for (int j = 0; j < 10; j++) {
-                timestamp = timestamp + j * 1000;
-
-                espw.println(String.format(s1, timestamp, i));
-            }
-
-            ifpw.println(String.format(s2, timestamp, i));
-        }
-
-        espw.flush();
-        ifpw.flush();
-    }
-
-    @Test
-    @Ignore
-    public void sendKakfa() throws Exception {
-        List<String> eslogs = Files.readAllLines(Paths.get(SendData2KafkaTest.class.getResource("/es.log").toURI()), Charset.defaultCharset());
-        List<String> iflogs = Files.readAllLines(Paths.get(SendData2KafkaTest.class.getResource("/if.log").toURI()), Charset.defaultCharset());
-
-        Properties props = new Properties();
-        props.put("bootstrap.servers", "localhost:9092");
-        props.put("value.serializer", StringSerializer.class.getCanonicalName());
-        props.put("key.serializer", StringSerializer.class.getCanonicalName());
-
-        KafkaProducer producer = new KafkaProducer<>(props);
-        while (true) {
-            for (String s : eslogs) {
-                ProducerRecord<String, String> record = new ProducerRecord<>("nn_jmx_metric_sandbox", s);
-                producer.send(record);
-            }
-
-            for (String s : iflogs) {
-                ProducerRecord<String, String> record = new ProducerRecord<>("nn_jmx_metric_sandbox", s);
-                producer.send(record);
-            }
-
-            Thread.sleep(5000);
-        }
-
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java
deleted file mode 100644
index 493d30e..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.topology;
-
-
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Tuple;
-
-/**
- * Created by yonzhang on 4/7/16.
- */
-@SuppressWarnings({"rawtypes", "serial"})
-public class TestBolt extends BaseRichBolt {
-    private static final Logger LOG = LoggerFactory.getLogger(TestBolt.class);
-    private OutputCollector collector;
-    private long count;
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        this.collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        LOG.info("data is coming: " + input);
-        count++;
-        if(count % 10 == 0){
-            LOG.info("count = " + count);
-        }
-        collector.ack(input);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestByteBuffer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestByteBuffer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestByteBuffer.java
deleted file mode 100644
index 666d167..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestByteBuffer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.topology;
-
-import java.nio.ByteBuffer;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Storm 1.0.0 uses ByteBuffer, we need test it
- */
-public class TestByteBuffer {
-    @Test
-    public void testBB(){
-        ByteBuffer bb = ByteBuffer.allocate(100);
-        bb.put((byte)12);
-        Assert.assertTrue(bb.hasArray());
-        bb.rewind();
-        Assert.assertEquals(12, bb.get());
-    }
-
-    @Test
-    public void testMultipleStrings() throws Exception{
-        ByteBuffer bb = ByteBuffer.allocate(100);
-        bb.put("abc".getBytes("UTF-8"));
-        bb.put("xyz".getBytes("UTF-8"));
-        bb.put("1234".getBytes("UTF-8"));
-        bb.rewind();
-        if (bb.hasArray()) {
-            int base = bb.arrayOffset();
-            String ret = new String(bb.array(), base + bb.position(), bb.remaining());
-            System.out.println("string is: " + ret);
-        }
-    }
-}


Mime
View raw message