eagle-commits mailing list archives

From: h..@apache.org
Subject: [16/84] [partial] eagle git commit: Clean repo for eagle site
Date: Mon, 03 Apr 2017 11:54:24 GMT
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockSampleMetadataFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockSampleMetadataFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockSampleMetadataFactory.java
deleted file mode 100644
index 21872b9..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockSampleMetadataFactory.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.mock;
-
-import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
-import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.engine.coordinator.*;
-import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandlers;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-
-import java.util.*;
-import java.util.concurrent.ThreadLocalRandom;
-
-@SuppressWarnings("serial")
-public class MockSampleMetadataFactory {
-    private static MockStreamMetadataService mockStreamMetadataServiceInstance = null;
-
-    public static MockStreamMetadataService createSingletonMetadataServiceWithSample() {
-        if (mockStreamMetadataServiceInstance != null) {
-            return mockStreamMetadataServiceInstance;
-        }
-        mockStreamMetadataServiceInstance = new MockStreamMetadataService();
-        mockStreamMetadataServiceInstance.registerStream("sampleStream", createSampleStreamDefinition("sampleStream"));
-        mockStreamMetadataServiceInstance.registerStream("sampleStream_1", createSampleStreamDefinition("sampleStream_1"));
-        mockStreamMetadataServiceInstance.registerStream("sampleStream_2", createSampleStreamDefinition("sampleStream_2"));
-        mockStreamMetadataServiceInstance.registerStream("sampleStream_3", createSampleStreamDefinition("sampleStream_3"));
-        mockStreamMetadataServiceInstance.registerStream("sampleStream_4", createSampleStreamDefinition("sampleStream_4"));
-        return mockStreamMetadataServiceInstance;
-    }
-
-    public static StreamDefinition createSampleStreamDefinition(String streamId) {
-        StreamDefinition sampleStreamDefinition = new StreamDefinition();
-        sampleStreamDefinition.setStreamId(streamId);
-        sampleStreamDefinition.setTimeseries(true);
-        sampleStreamDefinition.setValidate(true);
-        sampleStreamDefinition.setDescription("Schema for " + streamId);
-        List<StreamColumn> streamColumns = new ArrayList<>();
-
-        streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
-        streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
-        streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
-        streamColumns.add(new StreamColumn.Builder().name("timestamp").type(StreamColumn.Type.LONG).build());
-        streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
-        sampleStreamDefinition.setColumns(streamColumns);
-        return sampleStreamDefinition;
-    }
-
-    /**
-     * By default, the window period is PT1m.
-     *
-     * @param streamId
-     * @return
-     */
-    public static StreamSortSpec createSampleStreamSortSpec(String streamId) {
-        StreamSortSpec streamSortSpec = new StreamSortSpec();
-//        streamSortSpec.setColumn("timestamp");
-//        streamSortSpec.setStreamId(streamId);
-        streamSortSpec.setWindowMargin(1000);
-        streamSortSpec.setWindowPeriod("PT1m");
-        return streamSortSpec;
-    }
-
-    public static StreamSortSpec createSampleStreamSortSpec(String streamId, String period, int margin) {
-        StreamSortSpec streamSortSpec = new StreamSortSpec();
-//        streamSortSpec.setColumn("timestamp");
-//        streamSortSpec.setStreamId(streamId);
-        streamSortSpec.setWindowMargin(margin);
-        streamSortSpec.setWindowPeriod(period);
-        return streamSortSpec;
-    }
-
-    /**
-     * Policy: from sampleStream_1[name == "cpu" and value > 50.0] select name, host, flag, value insert into outputStream;
-     *
-     * @return PolicyDefinition[from sampleStream_1[name == "cpu" and value > 50.0] select name, host, flag, value insert into outputStream;]
-     */
-    public static PolicyDefinition createSingleMetricSamplePolicy() {
-        String definePolicy = "from sampleStream_1[name == \"cpu\" and value > 50.0] select name, host, flag, value insert into outputStream;";
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("SamplePolicyForTest");
-        policyDefinition.setInputStreams(Arrays.asList("sampleStream_1"));
-        policyDefinition.setOutputStreams(Arrays.asList("outputStream"));
-        policyDefinition.setDefinition(new PolicyDefinition.Definition(
-            PolicyStreamHandlers.SIDDHI_ENGINE,
-            definePolicy
-        ));
-        policyDefinition.setPartitionSpec(Arrays.asList(createSampleStreamGroupbyPartition("sampleStream_1", Arrays.asList("name"))));
-        return policyDefinition;
-    }
-
-    public static StreamPartition createSampleStreamGroupbyPartition(String streamId, List<String> groupByField) {
-        StreamPartition streamPartition = new StreamPartition();
-        streamPartition.setStreamId(streamId);
-        streamPartition.setColumns(new ArrayList<>(groupByField));
-        streamPartition.setType(StreamPartition.Type.GROUPBY);
-        StreamSortSpec streamSortSpec = new StreamSortSpec();
-        streamSortSpec.setWindowPeriod("PT30m");
-        streamSortSpec.setWindowMargin(10000);
-        streamPartition.setSortSpec(streamSortSpec);
-        return streamPartition;
-    }
-
-    public static StreamRouterSpec createSampleStreamRouteSpec(String streamId, String groupByField, List<String> targetEvaluatorIds) {
-        List<WorkSlot> slots = Arrays.asList(targetEvaluatorIds.stream().map((t) -> {
-            return new WorkSlot("sampleTopology", t);
-        }).toArray(WorkSlot[]::new));
-        StreamRouterSpec streamRouteSpec = new StreamRouterSpec();
-        streamRouteSpec.setStreamId(streamId);
-        streamRouteSpec.setPartition(createSampleStreamGroupbyPartition(streamId, Arrays.asList(groupByField)));
-        streamRouteSpec.setTargetQueue(Arrays.asList(new PolicyWorkerQueue(slots)));
-        return streamRouteSpec;
-    }
-
-    public static StreamRouterSpec createSampleStreamRouteSpec(List<String> targetEvaluatorIds) {
-        return createSampleStreamRouteSpec("sampleStream_1", "name", targetEvaluatorIds);
-    }
-
-    /**
-     * GROUPBY_sampleStream_1_ON_name
-     *
-     * @param targetEvaluatorIds
-     * @return
-     */
-    public static StreamRouterSpec createRouteSpec_GROUP_sampleStream_1_BY_name(List<String> targetEvaluatorIds) {
-        return createSampleStreamRouteSpec("sampleStream_1", "name", targetEvaluatorIds);
-    }
-
-    public static StreamRouterSpec createRouteSpec_GROUP_sampleStream_2_BY_name(List<String> targetEvaluatorIds) {
-        return createSampleStreamRouteSpec("sampleStream_2", "name", targetEvaluatorIds);
-    }
-
-    public static PartitionedEvent createSimpleStreamEvent() {
-        StreamEvent event = null;
-        try {
-            event = StreamEvent.builder()
-                .schema(MockSampleMetadataFactory.createSingletonMetadataServiceWithSample().getStreamDefinition("sampleStream_1"))
-                .streamId("sampleStream_1")
-                .timestamep(System.currentTimeMillis())
-                .attributes(new HashMap<String, Object>() {{
-                    put("name", "cpu");
-                    put("value", 60.0);
-                    put("unknown", "unknown column value");
-                }}).build();
-        } catch (StreamNotDefinedException e) {
-            e.printStackTrace();
-        }
-        PartitionedEvent pEvent = new PartitionedEvent();
-        pEvent.setEvent(event);
-        return pEvent;
-    }
-
-    private final static String[] SAMPLE_STREAM_NAME_OPTIONS = new String[] {
-        "cpu", "memory", "disk", "network"
-    };
-
-    private final static String[] SAMPLE_STREAM_HOST_OPTIONS = new String[] {
-        "localhost_1", "localhost_2", "localhost_3", "localhost_4"
-    };
-
-    private final static Boolean[] SAMPLE_STREAM_FLAG_OPTIONS = new Boolean[] {
-        true, false
-    };
-
-    private final static Double[] SAMPLE_STREAM_VALUE_OPTIONS = new Double[] {
-        -0.20, 40.4, 50.5, 60.6, 10000.1
-    };
-    private final static String[] SAMPLE_STREAM_ID_OPTIONS = new String[] {
-        "sampleStream_1", "sampleStream_2", "sampleStream_3", "sampleStream_4",
-    };
-    private final static Random RANDOM = ThreadLocalRandom.current();
-
-    public static StreamEvent createRandomStreamEvent() {
-        return createRandomStreamEvent(SAMPLE_STREAM_ID_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_ID_OPTIONS.length)]);
-    }
-
-    public static StreamEvent createRandomStreamEvent(String streamId) {
-        return createRandomStreamEvent(streamId, System.currentTimeMillis());
-    }
-
-    private final static Long[] TIME_DELTA_OPTIONS = new Long[] {
-        -30000L, -10000L, -5000L, -1000L, 0L, 1000L, 5000L, 10000L, 30000L
-    };
-
-    public static StreamEvent createRandomOutOfTimeOrderStreamEvent(String streamId) {
-        StreamEvent event = createRandomStreamEvent(streamId);
-        event.setTimestamp(System.currentTimeMillis() + TIME_DELTA_OPTIONS[RANDOM.nextInt(TIME_DELTA_OPTIONS.length)]);
-        return event;
-    }
-
-
-    public static PartitionedEvent createRandomOutOfTimeOrderEventGroupedByName(String streamId) {
-        StreamEvent event = createRandomStreamEvent(streamId);
-        event.setTimestamp(System.currentTimeMillis() + TIME_DELTA_OPTIONS[RANDOM.nextInt(TIME_DELTA_OPTIONS.length)]);
-        return new PartitionedEvent(event, createSampleStreamGroupbyPartition(streamId, Arrays.asList("name")), event.getData()[0].hashCode());
-    }
-
-    public static PartitionedEvent createPartitionedEventGroupedByName(String streamId, long timestamp) {
-        StreamEvent event = createRandomStreamEvent(streamId);
-        event.setTimestamp(timestamp);
-        return new PartitionedEvent(event, createSampleStreamGroupbyPartition(streamId, Arrays.asList("name")), event.getData()[0].hashCode());
-    }
-
-    public static PartitionedEvent createRandomSortedEventGroupedByName(String streamId) {
-        StreamEvent event = createRandomStreamEvent(streamId);
-        event.setTimestamp(System.currentTimeMillis());
-        return new PartitionedEvent(event, createSampleStreamGroupbyPartition(streamId, Arrays.asList("name")), event.getData()[0].hashCode());
-    }
-
-    public static StreamEvent createRandomStreamEvent(String streamId, long timestamp) {
-        StreamEvent event;
-        try {
-            event = StreamEvent.builder()
-                .schema(MockSampleMetadataFactory.createSingletonMetadataServiceWithSample().getStreamDefinition(streamId))
-                .streamId(streamId)
-                .timestamep(timestamp)
-                .attributes(new HashMap<String, Object>() {{
-                    put("name", SAMPLE_STREAM_NAME_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_NAME_OPTIONS.length)]);
-                    put("value", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
-                    put("host", SAMPLE_STREAM_HOST_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_HOST_OPTIONS.length)]);
-                    put("flag", SAMPLE_STREAM_FLAG_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_FLAG_OPTIONS.length)]);
-//                        put("value1", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
-//                        put("value2", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
-//                        put("value3", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
-//                        put("value4", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
-//                        put("value5", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
-                    put("unknown", "unknown column value");
-                }}).build();
-        } catch (StreamNotDefinedException e) {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
-        return event;
-    }
-
-    public static PartitionedEvent createRandomPartitionedEvent(String streamId, long timestamp) {
-        StreamEvent event = createRandomStreamEvent(streamId, timestamp);
-        PartitionedEvent partitionedEvent = new PartitionedEvent(event, createSampleStreamGroupbyPartition(streamId, Arrays.asList("name")), event.getData()[0].hashCode());
-        return partitionedEvent;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamCollector.java
deleted file mode 100755
index b865422..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamCollector.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.mock;
-
-import org.apache.eagle.alert.engine.Collector;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.LinkedList;
-import java.util.List;
-
-public class MockStreamCollector implements Collector<AlertStreamEvent> {
-    @SuppressWarnings("unused")
-    private final static Logger LOG = LoggerFactory.getLogger(MockStreamCollector.class);
-    private List<AlertStreamEvent> cache;
-
-    public MockStreamCollector() {
-        cache = new LinkedList<>();
-    }
-
-    public void emit(AlertStreamEvent event) {
-        cache.add(event);
-        // LOG.info("PartitionedEventCollector received: {}",event);
-    }
-
-    public void clear() {
-        cache.clear();
-    }
-
-    public List<AlertStreamEvent> get() {
-        return cache;
-    }
-
-    public int size() {
-        return cache.size();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamMetadataService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamMetadataService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamMetadataService.java
deleted file mode 100644
index 73c39c4..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamMetadataService.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.mock;
-
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamNotDefinedException;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class MockStreamMetadataService {
-    private final Map<String, StreamDefinition> streamSchemaMap = new HashMap<>();
-
-    public StreamDefinition getStreamDefinition(String streamId) throws StreamNotDefinedException {
-        if (streamSchemaMap.containsKey(streamId)) {
-            return streamSchemaMap.get(streamId);
-        } else {
-            throw new StreamNotDefinedException(streamId);
-        }
-    }
-
-    public void registerStream(String streamId, StreamDefinition schema) {
-        streamSchemaMap.put(streamId, schema);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamReceiver.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamReceiver.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamReceiver.java
deleted file mode 100644
index 9ab4c24..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamReceiver.java
+++ /dev/null
@@ -1,80 +0,0 @@
-package org.apache.eagle.alert.engine.mock;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.utils.AlertConstants;
-import org.apache.eagle.alert.utils.StreamIdConversion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-@SuppressWarnings("serial")
-public class MockStreamReceiver extends BaseRichSpout {
-    private final static Logger LOG = LoggerFactory.getLogger(MockStreamReceiver.class);
-    private SpoutOutputCollector collector;
-    private List<String> outputStreamIds;
-
-    public MockStreamReceiver(int partition) {
-        outputStreamIds = new ArrayList<>(partition);
-        for (int i = 0; i < partition; i++) {
-            outputStreamIds.add(StreamIdConversion.generateStreamIdByPartition(i));
-        }
-    }
-
-    @SuppressWarnings("rawtypes")
-    @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-        this.collector = collector;
-    }
-
-    @Override
-    public void close() {
-    }
-
-    /**
-     * This spout does not mock the end-to-end logic of the correlation spout;
-     * it simply generates sample data for testing the downstream bolts.
-     */
-    @Override
-    public void nextTuple() {
-        PartitionedEvent event = MockSampleMetadataFactory.createRandomOutOfTimeOrderEventGroupedByName("sampleStream_1");
-        LOG.info("Receive {}", event);
-        collector.emit(outputStreamIds.get(
-            // group by the first field in event i.e. name
-            (int) (event.getPartitionKey() % outputStreamIds.size())),
-            Collections.singletonList(event));
-        Utils.sleep(500);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        for (String streamId : outputStreamIds) {
-            declarer.declareStream(streamId, new Fields(AlertConstants.FIELD_0));
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeBatchWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeBatchWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeBatchWindow.java
deleted file mode 100644
index 0446b5e..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeBatchWindow.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.nodata;
-
-import org.apache.eagle.alert.engine.evaluator.nodata.DistinctValuesInTimeBatchWindow;
-import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyTimeBatchHandler;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.*;
-
-public class TestDistinctValuesInTimeBatchWindow {
-
-    private static final String inputStream = "testInputStream";
-
-    private NoDataPolicyTimeBatchHandler handler;
-
-    @Before
-    public void setup() {
-        handler = mock(NoDataPolicyTimeBatchHandler.class);
-    }
-
-    @After
-    public void teardown() {
-    }
-
-    @Test @Ignore
-    public void testNormal() throws Exception {
-        // wisb is null since it is dynamic mode
-        DistinctValuesInTimeBatchWindow window = new DistinctValuesInTimeBatchWindow(handler, 5 * 1000, null);
-
-        long now = System.currentTimeMillis();
-
-        // handler.compareAndEmit(anyObject(), anyObject(), anyObject());
-
-        // event time
-        sendEventToWindow(window, now, "host1", 95.5);
-
-        Thread.sleep(6000);
-
-        sendEventToWindow(window, now, "host1", 91.0);
-        sendEventToWindow(window, now, "host2", 95.5);
-        sendEventToWindow(window, now, "host2", 97.1);
-
-        Thread.sleep(3000);
-
-        sendEventToWindow(window, now, "host1", 90.7);
-
-        Thread.sleep(4000);
-
-        sendEventToWindow(window, now, "host1", 90.7);
-
-        Thread.sleep(3000);
-
-        verify(handler, times(3)).compareAndEmit(anyObject(), anyObject(), anyObject());
-    }
-
-    private void sendEventToWindow(DistinctValuesInTimeBatchWindow window, long ts, String host, double value) {
-        window.send(buildStreamEvent(ts, host, value), host, ts);
-    }
-
-    private StreamEvent buildStreamEvent(long ts, String host, double value) {
-        StreamEvent e = new StreamEvent();
-        e.setData(new Object[] {ts, host, value});
-        e.setStreamId(inputStream);
-        e.setTimestamp(ts);
-        return e;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java
deleted file mode 100644
index d23abcb..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.nodata;
-
-import org.apache.eagle.alert.engine.evaluator.nodata.DistinctValuesInTimeWindow;
-import org.junit.Test;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-/**
- * Since 6/28/16.
- */
-public class TestDistinctValuesInTimeWindow {
-    @Test
-    public void test() {
-        DistinctValuesInTimeWindow window = new DistinctValuesInTimeWindow(60 * 1000);
-        window.send("1", 0);
-        window.send("2", 1000);
-        window.send("3", 1000);
-        window.send("1", 30000);
-        window.send("2", 50000);
-        window.send("1", 62000);
-        Map<Object, Long> values = window.distinctValues();
-        System.out.println(values);
-    }
-
-    @Test
-    public void testSort() {
-        SortedMap<DistinctValuesInTimeWindow.ValueAndTime, DistinctValuesInTimeWindow.ValueAndTime> timeSortedMap =
-            new TreeMap<>(new DistinctValuesInTimeWindow.ValueAndTimeComparator());
-        DistinctValuesInTimeWindow.ValueAndTime vt1 = new DistinctValuesInTimeWindow.ValueAndTime("1", 0);
-        timeSortedMap.put(vt1, vt1);
-        DistinctValuesInTimeWindow.ValueAndTime vt2 = new DistinctValuesInTimeWindow.ValueAndTime("2", 1000);
-        timeSortedMap.put(vt2, vt2);
-        DistinctValuesInTimeWindow.ValueAndTime vt3 = new DistinctValuesInTimeWindow.ValueAndTime("3", 1000);
-        timeSortedMap.put(vt3, vt3);
-        timeSortedMap.remove(new DistinctValuesInTimeWindow.ValueAndTime("1", 0));
-        DistinctValuesInTimeWindow.ValueAndTime vt4 = new DistinctValuesInTimeWindow.ValueAndTime("1", 30000);
-        timeSortedMap.put(vt4, vt4);
-        Iterator<?> it = timeSortedMap.entrySet().iterator();
-        while (it.hasNext()) {
-            System.out.println(it.next());
-        }
-        timeSortedMap.remove(new DistinctValuesInTimeWindow.ValueAndTime("2", 1000));
-        DistinctValuesInTimeWindow.ValueAndTime vt5 = new DistinctValuesInTimeWindow.ValueAndTime("2", 50000);
-        timeSortedMap.put(vt5, vt5);
-        DistinctValuesInTimeWindow.ValueAndTime vt6 = new DistinctValuesInTimeWindow.ValueAndTime("1", 62000);
-        timeSortedMap.put(vt6, vt6);
-        it = timeSortedMap.entrySet().iterator();
-        while (it.hasNext()) {
-            System.out.println(it.next());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestEventTable.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestEventTable.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestEventTable.java
deleted file mode 100644
index 79b939c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestEventTable.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.nodata;
-
-import org.junit.Test;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
-import org.wso2.siddhi.core.util.EventPrinter;
-
-/**
- * Since 6/27/16.
- */
-public class TestEventTable {
-    @Test
-    public void test() throws Exception {
-        ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime(
-            "define stream expectStream (key string, src string);" +
-                "define stream appearStream (key string, src string);" +
-                "define table expectTable (key string, src string);" +
-                "from expectStream insert into expectTable;" +
-                "from appearStream[(expectTable.key==key) in expectTable] insert into outputStream;"
-        );
-
-        runtime.addCallback("outputStream", new StreamCallback() {
-            @Override
-            public void receive(Event[] events) {
-                EventPrinter.print(events);
-            }
-        });
-
-        runtime.start();
-        runtime.getInputHandler("expectStream").send(System.currentTimeMillis(), new Object[] {"host1", "expectStream"});
-        Thread.sleep(2000);
-        runtime.getInputHandler("appearStream").send(System.currentTimeMillis(), new Object[] {"host2", "expectStream"});
-        Thread.sleep(2000);
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
deleted file mode 100644
index fe70630..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.nodata;
-
-import org.junit.Test;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
-import org.wso2.siddhi.core.util.EventPrinter;
-
-/**
- * Since 6/27/16.
- */
-public class TestNoDataAlert {
-    @Test
-    public void test() throws Exception {
-        String[] expectHosts = new String[] {"host_1", "host_2", "host_3", "host_4", "host_5", "host_6", "host_7", "host_8"};
-//        String[] appearHosts = new String[]{"host_6","host_7","host_8"};
-//        String[] noDataHosts = new String[]{"host_1","host_2","host_3","host_4","host_5"};
-
-        ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime(
-            "define stream appearStream (key string, src string);" +
-                "define stream expectStream (key string, src string);" +
-                "define table expectTable (key string, src string);" +
-                "define trigger fiveSecTriggerStream at every 1 sec;" +
-                "define trigger initAppearTriggerStream at 'start';" +
-                "from expectStream insert into expectTable;" +
-                "from fiveSecTriggerStream join expectTable insert into triggerExpectStream;" +
-                "from initAppearTriggerStream join expectTable insert into initAppearStream;"
-//                        "from triggerExpectStream as l left outer join appearStream#window.time(5 sec) as r on l.key == r.key select l.key as k1,r.key as k2 insert current events into joinStream;" +
-//                        "from joinStream[k2 is null] select k1 insert current events into missingStream;"
-        );
-
-//        ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime(
-//                "define stream appearStream (key string, src string);"+
-//                        "define stream expectStream (key string, src string);"+
-//                        "define table expectTable (key string, src string);"+
-//                        "from expectStream insert into expectTable;"+
-//                        "from appearStream#window.time(10 sec)  as l right outer join expectTable as r on l.key == r.key select r.key as k2, l.key as k1 insert current events into joinStream;" +
-//                        "from joinStream[k1 is null] select k2 insert current events into missingStream;"
-////                "from joinStream insert into missingStream;"
-//
-//        );
-
-        runtime.addCallback("initAppearStream", new StreamCallback() {
-            @Override
-            public void receive(Event[] events) {
-                EventPrinter.print(events);
-            }
-        });
-
-        runtime.start();
-        for (String host : expectHosts) {
-            runtime.getInputHandler("expectStream").send(System.currentTimeMillis(), new Object[] {host, "expectStream"});
-        }
-
-//        for(String host:appearHosts) {
-//            runtime.getInputHandler("appearStream").send(System.currentTimeMillis(), new Object[]{host,"inStream"});
-//        }
-
-        Thread.sleep(5000);
-
-//        for(String host:appearHosts) {
-//            runtime.getInputHandler("appearStream").send(System.currentTimeMillis(), new Object[]{host,"inStream"});
-//        }
-//        Thread.sleep(10000);
-    }
-
-    /**
-     * Alert only when two successive events show an increase in the number of missing blocks:
-     * from every a = hadoopJmxMetricEventStream[ component=="namenode" and metric == "hadoop.namenode.dfs.missingblocks"] -> b = hadoopJmxMetricEventStream[b.component==a.component and b.metric==a.metric and b.host==a.host and convert(b.value, "long") > convert(a.value, "long") ] select b.metric as metric, b.host as host, b.value as newNumOfMissingBlocks, a.value as oldNumOfMissingBlocks, b.timestamp as timestamp, b.component as component, b.site as site insert into tmp;
-     */
-    @Test
-    public void testMissingBlock() throws Exception {
-        ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime(
-            "define stream hadoopJmxMetricEventStream (component string, metric string, host string, site string, value double, timestamp long);" +
-                "from every a = hadoopJmxMetricEventStream[ component==\"namenode\" and metric == \"hadoop.namenode.dfs.missingblocks\"] -> " +
-                "b = hadoopJmxMetricEventStream[b.component==a.component and b.metric==a.metric and b.host==a.host and " +
-                "convert(b.value, \"long\") > convert(a.value, \"long\") ] select b.metric as metric, b.host as host, " +
-                "b.value as newNumOfMissingBlocks, a.value as oldNumOfMissingBlocks, b.timestamp as timestamp, b.component as component, " +
-                "b.site as site insert into outputStream;"
-        );
-
-        runtime.addCallback("outputStream", new StreamCallback() {
-            @Override
-            public void receive(Event[] events) {
-                EventPrinter.print(events);
-            }
-        });
-
-        runtime.start();
-        runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[] {"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 12.0, 123000L});
-        runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[] {"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 13.0, 123100L});
-        runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[] {"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 16.0, 123200L});
-
-
-        Thread.sleep(5000);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
deleted file mode 100644
index 5564b90..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.nodata;
-
-import org.apache.eagle.alert.engine.Collector;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
-import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyHandler;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Since 6/29/16.
- */
-public class TestNoDataPolicyHandler {
-    private static final Logger LOG = LoggerFactory.getLogger(TestNoDataPolicyHandler.class);
-    private static final String inputStream = "testInputStream";
-    private static final String outputStream = "testOutputStream";
-
-    @Test
-    public void test() throws Exception {
-        test(buildPolicyDef_provided());
-        test(buildPolicyDef_dynamic());
-    }
-
-    @SuppressWarnings("unchecked")
-    public void test(PolicyDefinition pd) throws Exception {
-        Map<String, StreamDefinition> sds = new HashMap<>();
-        StreamDefinition sd = buildStreamDef();
-        sds.put("testInputStream", sd);
-        NoDataPolicyHandler handler = new NoDataPolicyHandler(sds);
-
-        PolicyHandlerContext context = new PolicyHandlerContext();
-        context.setPolicyDefinition(pd);
-        handler.prepare(new TestCollector(), context);
-
-        handler.send(buildStreamEvt(0, "host1", 12.5));
-        handler.send(buildStreamEvt(0, "host2", 12.6));
-        handler.send(buildStreamEvt(100, "host1", 20.9));
-        handler.send(buildStreamEvt(120, "host2", 22.1));
-        handler.send(buildStreamEvt(4000, "host2", 22.1));
-        handler.send(buildStreamEvt(50000, "host2", 22.1));
-        handler.send(buildStreamEvt(60150, "host2", 22.3));
-        handler.send(buildStreamEvt(60450, "host2", 22.9));
-        handler.send(buildStreamEvt(75000, "host1", 41.6));
-        handler.send(buildStreamEvt(85000, "host2", 45.6));
-    }
-
-    @SuppressWarnings("rawtypes")
-    private static class TestCollector implements Collector {
-        @Override
-        public void emit(Object o) {
-            AlertStreamEvent e = (AlertStreamEvent) o;
-            Object[] data = e.getData();
-            Assert.assertEquals("host2", data[1]);
-            LOG.info(e.toString());
-        }
-    }
-
-    private PolicyDefinition buildPolicyDef_provided() {
-        PolicyDefinition pd = new PolicyDefinition();
-        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
-        def.setValue("PT1M,provided,1,host,host1,host2");
-        def.setType("nodataalert");
-        pd.setDefinition(def);
-        pd.setInputStreams(Arrays.asList(inputStream));
-        pd.setOutputStreams(Arrays.asList(outputStream));
-        pd.setName("nodataalert-test");
-        return pd;
-    }
-
-    private PolicyDefinition buildPolicyDef_dynamic() {
-        PolicyDefinition pd = new PolicyDefinition();
-        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
-        def.setValue("PT1M,dynamic,1,host");
-        def.setType("nodataalert");
-        pd.setDefinition(def);
-        pd.setInputStreams(Arrays.asList(inputStream));
-        pd.setOutputStreams(Arrays.asList(outputStream));
-        pd.setName("nodataalert-test");
-        return pd;
-    }
-
-    private StreamDefinition buildStreamDef() {
-        StreamDefinition sd = new StreamDefinition();
-        StreamColumn tsColumn = new StreamColumn();
-        tsColumn.setName("timestamp");
-        tsColumn.setType(StreamColumn.Type.LONG);
-
-        StreamColumn hostColumn = new StreamColumn();
-        hostColumn.setName("host");
-        hostColumn.setType(StreamColumn.Type.STRING);
-
-        StreamColumn valueColumn = new StreamColumn();
-        valueColumn.setName("value");
-        valueColumn.setType(StreamColumn.Type.DOUBLE);
-
-        sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn));
-        sd.setDataSource("testDataSource");
-        sd.setStreamId("testStreamId");
-        return sd;
-    }
-
-    private StreamEvent buildStreamEvt(long ts, String host, double value) {
-        StreamEvent e = new StreamEvent();
-        e.setData(new Object[] {ts, host, value});
-        e.setStreamId(inputStream);
-        e.setTimestamp(ts);
-        return e;
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java
deleted file mode 100644
index 334db29..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.nodata;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.eagle.alert.engine.Collector;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
-import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyTimeBatchHandler;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestNoDataPolicyTimeBatchHandler {
-
-    private static final Logger LOG = LoggerFactory.getLogger(TestNoDataPolicyTimeBatchHandler.class);
-
-    private static final String inputStream = "testInputStream";
-    private static final String outputStream = "testOutputStream";
-
-    @Before
-    public void setup() {
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testDynamic1() throws Exception {
-        Map<String, StreamDefinition> sds = new HashMap<>();
-        sds.put("testInputStream", buildStreamDef());
-        sds.put("testOutputStream", buildOutputStreamDef());
-        NoDataPolicyTimeBatchHandler handler = new NoDataPolicyTimeBatchHandler(sds);
-
-        PolicyHandlerContext context = new PolicyHandlerContext();
-        context.setPolicyDefinition(buildPolicyDef_dynamic());
-        handler.prepare(new TestCollector(), context);
-
-        long now = System.currentTimeMillis();
-
-        handler.send(buildStreamEvt(now, "host1", 12.5));
-
-        Thread.sleep(2000);
-
-        handler.send(buildStreamEvt(now, "host2", 12.6));
-        handler.send(buildStreamEvt(now, "host1", 20.9));
-        handler.send(buildStreamEvt(now, "host2", 22.1));
-        handler.send(buildStreamEvt(now, "host2", 22.1));
-
-        Thread.sleep(5000);
-
-        handler.send(buildStreamEvt(now, "host2", 22.1));
-        handler.send(buildStreamEvt(now, "host2", 22.3));
-
-        Thread.sleep(5000);
-
-        handler.send(buildStreamEvt(now, "host2", 22.9));
-        handler.send(buildStreamEvt(now, "host1", 41.6));
-        handler.send(buildStreamEvt(now, "host2", 45.6));
-
-        Thread.sleep(1000);
-    }
-
-    @SuppressWarnings("rawtypes")
-    private static class TestCollector implements Collector {
-        @Override
-        public void emit(Object o) {
-            AlertStreamEvent e = (AlertStreamEvent) o;
-            Object[] data = e.getData();
-
-            LOG.info("alert data: {}, {}", data[1], data[0]);
-        }
-    }
-
-    private PolicyDefinition buildPolicyDef_dynamic() {
-        PolicyDefinition pd = new PolicyDefinition();
-        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
-        def.setValue("PT5S,dynamic");
-        def.setType("nodataalert");
-        Map<String, Object> properties = new HashMap<String, Object>();
-        properties.put("nodataColumnName", "host");
-        def.setProperties(properties);
-        pd.setDefinition(def);
-        pd.setInputStreams(Arrays.asList(inputStream));
-        pd.setOutputStreams(Arrays.asList(outputStream));
-        pd.setName("nodataalert-test");
-        return pd;
-    }
-
-    private StreamDefinition buildStreamDef() {
-        StreamDefinition sd = new StreamDefinition();
-        StreamColumn tsColumn = new StreamColumn();
-        tsColumn.setName("timestamp");
-        tsColumn.setType(StreamColumn.Type.LONG);
-
-        StreamColumn hostColumn = new StreamColumn();
-        hostColumn.setName("host");
-        hostColumn.setType(StreamColumn.Type.STRING);
-
-        StreamColumn valueColumn = new StreamColumn();
-        valueColumn.setName("value");
-        valueColumn.setType(StreamColumn.Type.DOUBLE);
-
-        sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn));
-        sd.setDataSource("testDataSource");
-        sd.setStreamId("testInputStream");
-        return sd;
-    }
-
-    private StreamDefinition buildOutputStreamDef() {
-        StreamDefinition sd = new StreamDefinition();
-        StreamColumn tsColumn = new StreamColumn();
-        tsColumn.setName("timestamp");
-        tsColumn.setType(StreamColumn.Type.LONG);
-
-        StreamColumn hostColumn = new StreamColumn();
-        hostColumn.setName("host");
-        hostColumn.setType(StreamColumn.Type.STRING);
-
-        StreamColumn valueColumn = new StreamColumn();
-        valueColumn.setName("originalStreamName");
-        valueColumn.setType(StreamColumn.Type.STRING);
-
-        sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn));
-        sd.setDataSource("testDataSource");
-        sd.setStreamId("testOutputStream");
-        return sd;
-    }
-
-    private StreamEvent buildStreamEvt(long ts, String host, double value) {
-        StreamEvent e = new StreamEvent();
-        e.setData(new Object[] {ts, host, value});
-        e.setStreamId(inputStream);
-        e.setTimestamp(ts);
-        return e;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java
deleted file mode 100644
index 82d8c99..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.perf;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Since 5/13/16.
- */
-public class TestSerDeserPer {
-    @Rule
-    public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-    Object[] data = null;
-
-    @Before
-    public void before() {
-        int max = 100;
-        StringBuilder sb = new StringBuilder();
-        for (int i = 0; i < max; i++) {
-            sb.append("a");
-        }
-        data = new Object[] {sb.toString()};
-    }
-
-    @Test
-    public void testSerDeserPerf() throws Exception {
-        Kryo kryo = new Kryo();
-        String outputPath = temporaryFolder.newFile().toString();
-        Output output = new Output(new FileOutputStream(outputPath));
-        for (int i = 0; i < 1000; i++) {
-            kryo.writeObject(output, constructPE());
-        }
-        output.close();
-        Input input = new Input(new FileInputStream(outputPath));
-        PartitionedEvent someObject = kryo.readObject(input, PartitionedEvent.class);
-        input.close();
-        Assert.assertTrue(someObject.getData().length == 1);
-    }
-
-    private PartitionedEvent constructPE() {
-        StreamEvent e = new StreamEvent();
-        e.setStreamId("testStreamId");
-        e.setTimestamp(1463159382000L);
-        e.setData(data);
-        StreamPartition sp = new StreamPartition();
-        List<String> col = new ArrayList<>();
-        col.add("host");
-        sp.setColumns(col);
-        StreamSortSpec sortSpec = new StreamSortSpec();
-        sortSpec.setWindowMargin(30000);
-        sortSpec.setWindowPeriod("PT1M");
-        sp.setSortSpec(sortSpec);
-        sp.setStreamId("testStreamId");
-        sp.setType(StreamPartition.Type.GROUPBY);
-        PartitionedEvent pe = new PartitionedEvent();
-        pe.setEvent(e);
-        pe.setPartition(sp);
-        pe.setPartitionKey(1000);
-        return pe;
-    }
-
-    @Test
-    public void testSerDeserPerf2() throws Exception {
-        Kryo kryo = new Kryo();
-        String outputPath = temporaryFolder.newFile().toString();
-        Output output = new Output(new FileOutputStream(outputPath));
-        for (int i = 0; i < 1000; i++) {
-            kryo.writeObject(output, constructNewPE());
-        }
-        output.close();
-        Input input = new Input(new FileInputStream(outputPath));
-        NewPartitionedEvent someObject = kryo.readObject(input, NewPartitionedEvent.class);
-        input.close();
-        Assert.assertTrue(someObject.getData().length == 1);
-    }
-
-    private NewPartitionedEvent constructNewPE() {
-        NewPartitionedEvent pe = new NewPartitionedEvent();
-        pe.setStreamId("testStreamId");
-        pe.setTimestamp(1463159382000L);
-        pe.setData(data);
-
-        pe.setType(StreamPartition.Type.GROUPBY);
-        List<String> col = new ArrayList<>();
-        col.add("host");
-        pe.setColumns(col);
-        pe.setPartitionKey(1000);
-
-        pe.setWindowMargin(30000);
-        pe.setWindowPeriod("PT1M");
-        return pe;
-    }
-
-    @Test
-    public void testSerDeserPerf3() throws Exception {
-        Kryo kryo = new Kryo();
-        String outputPath = temporaryFolder.newFile().toString();
-        Output output = new Output(new FileOutputStream(outputPath));
-        for (int i = 0; i < 1000; i++) {
-            kryo.writeObject(output, constructNewPE2());
-        }
-        output.close();
-        Input input = new Input(new FileInputStream(outputPath));
-        NewPartitionedEvent2 someObject = kryo.readObject(input, NewPartitionedEvent2.class);
-        input.close();
-        Assert.assertTrue(someObject.getData().length == 1);
-    }
-
-    private NewPartitionedEvent2 constructNewPE2() {
-        NewPartitionedEvent2 pe = new NewPartitionedEvent2();
-        pe.setStreamId(100);
-        pe.setTimestamp(1463159382000L);
-        pe.setData(data);
-
-        pe.setType(1);
-        int[] col = new int[1];
-        col[0] = 1;
-        pe.setColumns(col);
-        pe.setPartitionKey(1000);
-
-        pe.setWindowMargin(30000);
-        pe.setWindowPeriod(60);
-        return pe;
-    }
-
-    public static class NewPartitionedEvent implements Serializable {
-        private static final long serialVersionUID = -3840016190614238593L;
-        // basic
-        private String streamId;
-        private long timestamp;
-        private Object[] data;
-
-        // stream partition
-        private StreamPartition.Type type;
-        private List<String> columns = new ArrayList<>();
-        private long partitionKey;
-
-        // sort spec
-        private String windowPeriod = "";
-        private long windowMargin = 30 * 1000;
-
-        public NewPartitionedEvent() {
-        }
-
-        public String getStreamId() {
-            return streamId;
-        }
-
-        public void setStreamId(String streamId) {
-            this.streamId = streamId;
-        }
-
-        public long getTimestamp() {
-            return timestamp;
-        }
-
-        public void setTimestamp(long timestamp) {
-            this.timestamp = timestamp;
-        }
-
-        public Object[] getData() {
-            return data;
-        }
-
-        public void setData(Object[] data) {
-            this.data = data;
-        }
-
-        public StreamPartition.Type getType() {
-            return type;
-        }
-
-        public void setType(StreamPartition.Type type) {
-            this.type = type;
-        }
-
-        public List<String> getColumns() {
-            return columns;
-        }
-
-        public void setColumns(List<String> columns) {
-            this.columns = columns;
-        }
-
-        public long getPartitionKey() {
-            return partitionKey;
-        }
-
-        public void setPartitionKey(long partitionKey) {
-            this.partitionKey = partitionKey;
-        }
-
-        public String getWindowPeriod() {
-            return windowPeriod;
-        }
-
-        public void setWindowPeriod(String windowPeriod) {
-            this.windowPeriod = windowPeriod;
-        }
-
-        public long getWindowMargin() {
-            return windowMargin;
-        }
-
-        public void setWindowMargin(long windowMargin) {
-            this.windowMargin = windowMargin;
-        }
-    }
-
-    public static class NewPartitionedEvent2 implements Serializable {
-        private static final long serialVersionUID = -3840016190614238593L;
-        // basic
-        private int streamId;
-        private long timestamp;
-        private Object[] data;
-
-        // stream partition
-        private int type;
-        private int[] columns;
-        private long partitionKey;
-
-        // sort spec
-        private long windowPeriod;
-        private long windowMargin = 30 * 1000;
-
-        public NewPartitionedEvent2() {
-        }
-
-        public int getStreamId() {
-            return streamId;
-        }
-
-        public void setStreamId(int streamId) {
-            this.streamId = streamId;
-        }
-
-        public int getType() {
-            return type;
-        }
-
-        public void setType(int type) {
-            this.type = type;
-        }
-
-        public int[] getColumns() {
-            return columns;
-        }
-
-        public void setColumns(int[] columns) {
-            this.columns = columns;
-        }
-
-        public long getPartitionKey() {
-            return partitionKey;
-        }
-
-        public void setPartitionKey(long partitionKey) {
-            this.partitionKey = partitionKey;
-        }
-
-        public long getWindowPeriod() {
-            return windowPeriod;
-        }
-
-        public void setWindowPeriod(long windowPeriod) {
-            this.windowPeriod = windowPeriod;
-        }
-
-        public long getTimestamp() {
-            return timestamp;
-        }
-
-        public void setTimestamp(long timestamp) {
-            this.timestamp = timestamp;
-        }
-
-        public Object[] getData() {
-            return data;
-        }
-
-        public void setData(Object[] data) {
-            this.data = data;
-        }
-
-        public long getWindowMargin() {
-            return windowMargin;
-        }
-
-        public void setWindowMargin(long windowMargin) {
-            this.windowMargin = windowMargin;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertEmailPublisherTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertEmailPublisherTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertEmailPublisherTest.java
deleted file mode 100644
index 50fb07d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertEmailPublisherTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher;
-
-import com.dumbster.smtp.SimpleSmtpServer;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.publisher.email.AlertEmailConstants;
-import org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher;
-import org.junit.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-public class AlertEmailPublisherTest {
-    private static final String EMAIL_PUBLISHER_TEST_POLICY = "Test Policy Alert";
-    private static final Logger LOG = LoggerFactory.getLogger(AlertEmailPublisherTest.class);
-    private static final int SMTP_PORT = 5025;
-    private Config config;
-    private SimpleSmtpServer server;
-
-    @Before
-    public void setUp() {
-        config = ConfigFactory.load("application-test.conf");
-        server = SimpleSmtpServer.start(SMTP_PORT);
-    }
-
-    @After
-    public void clear() {
-        if (server != null) {
-            server.stop();
-        }
-    }
-
-    @Test
-    public void testAlertEmailPublisher() throws Exception {
-        AlertEmailPublisher publisher = new AlertEmailPublisher();
-        Map<String, Object> properties = new HashMap<>();
-        properties.put(PublishConstants.SUBJECT, EMAIL_PUBLISHER_TEST_POLICY);
-        properties.put(PublishConstants.SENDER, "eagle@localhost");
-        properties.put(PublishConstants.RECIPIENTS, "somebody@localhost");
-        Publishment publishment = new Publishment();
-        publishment.setName("testEmailPublishment");
-        publishment.setType(org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher.class.getName());
-        publishment.setPolicyIds(Collections.singletonList(EMAIL_PUBLISHER_TEST_POLICY));
-        publishment.setDedupIntervalMin("PT0M");
-        publishment.setSerializer(org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer.class.getName());
-        publishment.setProperties(properties);
-        Map<String, String> conf = new HashMap<>();
-        publisher.init(config, publishment, conf);
-        publisher.onAlert(AlertPublisherTestHelper.mockEvent(EMAIL_PUBLISHER_TEST_POLICY));
-        Assert.assertEquals(1, server.getReceivedEmailSize());
-        Assert.assertTrue(server.getReceivedEmail().hasNext());
-        LOG.info("EMAIL:\n {}", server.getReceivedEmail().next());
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertFilePublisherTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertFilePublisherTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertFilePublisherTest.java
deleted file mode 100644
index 33cb103..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertFilePublisherTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License.  You may obtain a copy of the License at
- *  <p/>
- *  http://www.apache.org/licenses/LICENSE-2.0
- *  <p/>
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.publisher;
-
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.impl.AlertFilePublisher;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-public class AlertFilePublisherTest {
-    private static final String TEST_POLICY_ID = "testPolicy";
-
-    @Test
-    public void testAlertFilePublisher() throws Exception {
-        Map<String, Object> properties = new HashMap<>();
-        properties.put(PublishConstants.ROTATE_EVERY_KB, 1);
-        properties.put(PublishConstants.NUMBER_OF_FILES, 1);
-
-        String property = "java.io.tmpdir";
-        String tempDir = System.getProperty(property);
-        System.out.println("OS current temporary directory is " + tempDir);
-
-        //properties.put(PublishConstants.FILE_NAME, tempDir+"eagle-alert.log");
-
-        Publishment publishment = new Publishment();
-        publishment.setName("testFilePublishment");
-        publishment.setType(org.apache.eagle.alert.engine.publisher.impl.AlertFilePublisher.class.getName());
-        publishment.setPolicyIds(Arrays.asList(TEST_POLICY_ID));
-        publishment.setDedupIntervalMin("PT0M");
-        publishment.setSerializer(org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer.class.getName());
-        publishment.setProperties(properties);
-
-        AlertStreamEvent event = AlertPublisherTestHelper.mockEvent(TEST_POLICY_ID);
-
-        AlertFilePublisher publisher = new AlertFilePublisher();
-        publisher.init(null, publishment, null);
-
-        publisher.onAlert(event);
-        publisher.close();
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertKafkaPublisherTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertKafkaPublisherTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertKafkaPublisherTest.java
deleted file mode 100644
index ddf2001..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertKafkaPublisherTest.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher;
-
-import java.util.*;
-
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.dedup.DedupCache;
-import org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher;
-import org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer;
-import org.apache.eagle.alert.utils.KafkaEmbedded;
-import org.junit.*;
-
-import com.google.common.collect.ImmutableMap;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
-
-public class AlertKafkaPublisherTest {
-
-    private static final String TEST_TOPIC_NAME = "test";
-    private static final String TEST_POLICY_ID = "testPolicy";
-    private static final int TEST_KAFKA_BROKER_PORT = 59092;
-    private static final int TEST_KAFKA_ZOOKEEPER_PORT = 52181;
-    private static KafkaEmbedded kafka;
-    private static Config config;
-
-    private static List<String> outputMessages = new ArrayList<String>();
-
-    @BeforeClass
-    public static void setup() {
-        kafka = new KafkaEmbedded(TEST_KAFKA_BROKER_PORT, TEST_KAFKA_ZOOKEEPER_PORT);
-        System.setProperty("config.resource", "/simple/application-integration.conf");
-        config = ConfigFactory.load();
-        consumeWithOutput(outputMessages);
-    }
-
-    @AfterClass
-    public static void end() {
-        if (kafka != null) {
-            kafka.shutdown();
-        }
-    }
-
-    @Test @Ignore
-    public void testAsync() throws Exception {
-        AlertKafkaPublisher publisher = new AlertKafkaPublisher();
-        Map<String, Object> properties = new HashMap<>();
-        properties.put(PublishConstants.BROKER_LIST, "localhost:" + TEST_KAFKA_BROKER_PORT);
-        properties.put(PublishConstants.TOPIC, TEST_TOPIC_NAME);
-        
-        List<Map<String, Object>> kafkaClientConfig = new ArrayList<Map<String, Object>>();
-        kafkaClientConfig.add(ImmutableMap.of("name", "producer.type", "value", "async"));
-        kafkaClientConfig.add(ImmutableMap.of("name", "batch.num.messages", "value", 3000));
-        kafkaClientConfig.add(ImmutableMap.of("name", "queue.buffering.max.ms", "value", 5000));
-        kafkaClientConfig.add(ImmutableMap.of("name", "queue.buffering.max.messages", "value", 10000));
-        properties.put("kafka_client_config", kafkaClientConfig);
-
-        Publishment publishment = new Publishment();
-        publishment.setName("testAsyncPublishment");
-        publishment.setType(org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher.class.getName());
-        publishment.setPolicyIds(Arrays.asList(TEST_POLICY_ID));
-        publishment.setDedupIntervalMin("PT0M");
-        publishment.setSerializer(org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer.class.getName());
-        publishment.setProperties(properties);
-        
-        Map<String, String> conf = new HashMap<String, String>();
-        publisher.init(config, publishment, conf);
-
-        AlertStreamEvent event = AlertPublisherTestHelper.mockEvent(TEST_POLICY_ID);
-
-        outputMessages.clear();
-
-        publisher.onAlert(event);
-        Thread.sleep(3000);
-        Assert.assertEquals(1, outputMessages.size());
-        publisher.close();
-    }
-
-    @Test @Ignore
-    public void testSync() throws Exception {
-        AlertKafkaPublisher publisher = new AlertKafkaPublisher();
-        Map<String, Object> properties = new HashMap<>();
-        properties.put(PublishConstants.BROKER_LIST, "localhost:" + TEST_KAFKA_BROKER_PORT);
-        properties.put(PublishConstants.TOPIC, TEST_TOPIC_NAME);
-        List<Map<String, Object>> kafkaClientConfig = new ArrayList<Map<String, Object>>();
-        kafkaClientConfig.add(ImmutableMap.of("name", "producer.type", "value", "sync"));
-        properties.put("kafka_client_config", kafkaClientConfig);
-        Publishment publishment = new Publishment();
-        publishment.setName("testAsyncPublishment");
-        publishment.setType(org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher.class.getName());
-        publishment.setPolicyIds(Collections.singletonList(TEST_POLICY_ID));
-        publishment.setDedupIntervalMin("PT0M");
-        publishment.setSerializer(JsonEventSerializer.class.getName());
-        publishment.setProperties(properties);
-        Map<String, String> conf = new HashMap<>();
-        publisher.init(config, publishment, conf);
-        AlertStreamEvent event = AlertPublisherTestHelper.mockEvent(TEST_POLICY_ID);
-        outputMessages.clear();
-        publisher.onAlert(event);
-        Thread.sleep(3000);
-        Assert.assertEquals(1, outputMessages.size());
-        publisher.close();
-    }
-
-    private static void consumeWithOutput(final List<String> outputMessages) {
-        Thread t = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                Properties props = new Properties();
-                props.put("group.id", "B");
-                props.put("zookeeper.connect", "127.0.0.1:" + + TEST_KAFKA_ZOOKEEPER_PORT);
-                props.put("zookeeper.session.timeout.ms", "4000");
-                props.put("zookeeper.sync.time.ms", "2000");
-                props.put("auto.commit.interval.ms", "1000");
-                props.put("auto.offset.reset", "smallest");
-
-                ConsumerConnector jcc = null;
-                try {
-                    ConsumerConfig ccfg = new ConsumerConfig(props);
-                    jcc = Consumer.createJavaConsumerConnector(ccfg);
-                    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
-                    topicCountMap.put(TEST_TOPIC_NAME, 1);
-                    Map<String, List<KafkaStream<byte[], byte[]>>> topicMap = jcc.createMessageStreams(topicCountMap);
-                    KafkaStream<byte[], byte[]> cstrm = topicMap.get(TEST_TOPIC_NAME).get(0);
-                    for (MessageAndMetadata<byte[], byte[]> mm : cstrm) {
-                        String message = new String(mm.message());
-                        outputMessages.add(message);
-
-                        try {
-                            Thread.sleep(5000);
-                        } catch (InterruptedException e) {
-                        }
-                    }
-                } finally {
-                    if (jcc != null) {
-                        jcc.shutdown();
-                    }
-                }
-            }
-        });
-        t.start();
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertPublisherTestHelper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertPublisherTestHelper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertPublisherTestHelper.java
deleted file mode 100644
index bd06c9b..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertPublisherTestHelper.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher;
-
-import org.apache.eagle.alert.engine.coordinator.*;
-import org.apache.eagle.alert.engine.model.AlertPublishEvent;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.dedup.DedupCache;
-import org.junit.Assert;
-
-import java.util.Arrays;
-import java.util.HashMap;
-
-public class AlertPublisherTestHelper {
-
-    public static AlertStreamEvent mockEvent(String policyId){
-        StreamDefinition stream = createStream();
-        PolicyDefinition policy = createPolicyGroupByStreamId(stream.getStreamId(), policyId);
-        return  createEvent(stream, policy,
-            new Object[] {System.currentTimeMillis(), "host1", "testPolicy-host1-01", "open", 0, 0});
-    }
-
-    public static AlertStreamEvent createEvent(StreamDefinition stream, PolicyDefinition policy, Object[] data) {
-        AlertStreamEvent event = new AlertStreamEvent();
-        event.setPolicyId(policy.getName());
-        event.setSchema(stream);
-        event.setStreamId(stream.getStreamId());
-        event.setTimestamp(System.currentTimeMillis());
-        event.setCreatedTime(System.currentTimeMillis());
-        event.setSubject("Namenode Disk Used 98%");
-        event.setBody("Disk Usage of Test cluster's name node (<a href=\"#\">namenode.hostname.domain</a>) is <strong style=\"color: red\">98%</strong> at <strong>2016-11-30 12:30:45</strong>, exceeding alert threshold <strong>90</strong>%");
-        event.setData(data);
-        event.ensureAlertId();
-        event.setSeverity(AlertSeverity.CRITICAL);
-        event.setCategory("HDFS");
-        event.setContext(new HashMap<String,Object>(){{
-            put(AlertPublishEvent.SITE_ID_KEY,"TestCluster");
-        }});
-        Assert.assertNotNull(event.getAlertId());
-        return event;
-    }
-
-    public static StreamDefinition createStream() {
-        StreamDefinition sd = new StreamDefinition();
-        StreamColumn tsColumn = new StreamColumn();
-        tsColumn.setName("timestamp");
-        tsColumn.setType(StreamColumn.Type.LONG);
-
-        StreamColumn hostColumn = new StreamColumn();
-        hostColumn.setName("host");
-        hostColumn.setType(StreamColumn.Type.STRING);
-
-        StreamColumn alertKeyColumn = new StreamColumn();
-        alertKeyColumn.setName("alertKey");
-        alertKeyColumn.setType(StreamColumn.Type.STRING);
-
-        StreamColumn stateColumn = new StreamColumn();
-        stateColumn.setName("state");
-        stateColumn.setType(StreamColumn.Type.STRING);
-
-        // dedupCount, dedupFirstOccurrence
-
-        StreamColumn dedupCountColumn = new StreamColumn();
-        dedupCountColumn.setName("dedupCount");
-        dedupCountColumn.setType(StreamColumn.Type.LONG);
-
-        StreamColumn dedupFirstOccurrenceColumn = new StreamColumn();
-        dedupFirstOccurrenceColumn.setName(DedupCache.DEDUP_FIRST_OCCURRENCE);
-        dedupFirstOccurrenceColumn.setType(StreamColumn.Type.LONG);
-
-        sd.setColumns(Arrays.asList(tsColumn, hostColumn, alertKeyColumn, stateColumn, dedupCountColumn,
-            dedupFirstOccurrenceColumn));
-        sd.setDataSource("testDatasource");
-        sd.setStreamId("testStream");
-        sd.setDescription("test stream");
-        return sd;
-    }
-
-    public static  PolicyDefinition createPolicyGroupByStreamId(String streamName, String policyName) {
-        PolicyDefinition pd = new PolicyDefinition();
-        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
-        // expression, something like "PT5S,dynamic,1,host"
-        def.setValue("test");
-        def.setType("siddhi");
-        pd.setDefinition(def);
-        pd.setInputStreams(Arrays.asList("inputStream"));
-        pd.setOutputStreams(Arrays.asList("outputStream"));
-        pd.setName(policyName);
-        pd.setDescription(String.format("Test policy for stream %s", streamName));
-
-        StreamPartition sp = new StreamPartition();
-        sp.setStreamId(streamName);
-        sp.setColumns(Arrays.asList("host"));
-        sp.setType(StreamPartition.Type.GROUPBY);
-        pd.addPartition(sp);
-        return pd;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoaderTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoaderTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoaderTest.java
deleted file mode 100644
index 3df5fc8..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoaderTest.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.engine.publisher;
-
-import org.junit.Test;
-
-public class PublishementTypeLoaderTest {
-    @Test
-    public void testPublishmentTypeLoader() {
-        PublishementTypeLoader.loadPublishmentTypes();
-    }
-}

