eagle-commits mailing list archives

From h..@apache.org
Subject [17/84] [partial] eagle git commit: Clean repo for eagle site
Date Mon, 03 Apr 2017 11:54:25 GMT
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient5AbsenceAlert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient5AbsenceAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient5AbsenceAlert.java
deleted file mode 100644
index 533e486..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient5AbsenceAlert.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.e2e;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Properties;
-
-/**
- * Since 6/29/16.
- */
-public class SampleClient5AbsenceAlert {
-    private static final Logger LOG = LoggerFactory.getLogger(SampleClient5AbsenceAlert.class);
-    private static long currentTimestamp = 1467240000000L;
-    private static long interval = 3000L;
-
-    public static void main(String[] args) throws Exception {
-        System.setProperty("config.resource", "/absence/application-absence.conf");
-        ConfigFactory.invalidateCaches();
-
-        Config config = ConfigFactory.load();
-        KafkaProducer&lt;String, String&gt; producer = createProducer(config);
-        ProducerRecord&lt;String, String&gt; record;
-        record = new ProducerRecord&lt;&gt;("absenceAlertTopic", createEvent("job1"));
-        producer.send(record);
-        record = new ProducerRecord&lt;&gt;("absenceAlertTopic", createEvent("job2"));
-        producer.send(record);
-        record = new ProducerRecord&lt;&gt;("absenceAlertTopic", createEvent("host3"));
-        producer.send(record);
-        producer.close();
-    }
-
-    private static class AbsenceEvent {
-        @JsonProperty
-        long timestamp;
-        @JsonProperty
-        String jobID;
-        @JsonProperty
-        String status;
-
-        public String toString() {
-            return "timestamp=" + timestamp + ",jobID=" + jobID + ",status=" + status;
-        }
-    }
-
-    private static String createEvent(String jobID) throws Exception {
-        AbsenceEvent e = new AbsenceEvent();
-        long expectTS = currentTimestamp + interval;
-        // randomly adjust the timestamp back by 0-2 ms
-        long adjust = Math.round(2 * Math.random());
-        e.timestamp = expectTS - adjust;
-        e.jobID = jobID;
-        e.status = "running";
-        LOG.info("sending event {} ", e);
-        ObjectMapper mapper = new ObjectMapper();
-        String value = mapper.writeValueAsString(e);
-        return value;
-    }
-
-
-    public static KafkaProducer<String, String> createProducer(Config config) {
-        String servers = config.getString("kafkaProducer.bootstrapServers");
-        Properties configMap = new Properties();
-        configMap.put("bootstrap.servers", servers);
-        configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        configMap.put("request.required.acks", "1");
-        configMap.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        configMap.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        KafkaProducer<String, String> proceduer = new KafkaProducer<String, String>(configMap);
-        return proceduer;
-    }
-}
\ No newline at end of file
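
Each record the deleted client above publishes is a JSON-serialized AbsenceEvent, e.g. {"timestamp":1467240002999,"jobID":"job1","status":"running"}. A minimal verification consumer for the topic — a sketch only, assuming a local broker at localhost:9092 and a made-up group id "absence-verify":

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.util.Collections;
    import java.util.Properties;

    public class AbsenceTopicVerifier {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
            props.put("group.id", "absence-verify");          // hypothetical group id
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("auto.offset.reset", "earliest");
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("absenceAlertTopic"));
                // poll once and print whatever the sample client produced
                for (ConsumerRecord<String, String> record : consumer.poll(5000L)) {
                    System.out.println(record.value());
                }
            }
        }
    }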

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java
deleted file mode 100755
index 4552417..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator;
-
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.task.OutputCollector;
-import org.apache.eagle.alert.engine.evaluator.impl.AlertBoltOutputCollectorThreadSafeWrapper;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.router.impl.StormOutputCollector;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-public class AlertBoltOutputCollectorThreadSafeWrapperTest {
-    @Test
-    public void testThreadSafeAlertBoltOutputCollector() {
-        MockedStormAlertOutputCollector stormOutputCollector = new MockedStormAlertOutputCollector(null);
-        AlertBoltOutputCollectorThreadSafeWrapper alertBoltOutputCollectorWrapper = new AlertBoltOutputCollectorThreadSafeWrapper(new StormOutputCollector(stormOutputCollector));
-        alertBoltOutputCollectorWrapper.emit(create("mockAlert_1"));
-        alertBoltOutputCollectorWrapper.emit(create("mockAlert_2"));
-        Assert.assertEquals(0, stormOutputCollector.getCollected().size());
-        Assert.assertEquals(0, stormOutputCollector.getTupleSize());
-        alertBoltOutputCollectorWrapper.flush();
-        Assert.assertEquals(2, stormOutputCollector.getCollected().size());
-        Assert.assertEquals(2, stormOutputCollector.getTupleSize());
-        alertBoltOutputCollectorWrapper.emit(create("mockAlert_3"));
-        Assert.assertEquals(2, stormOutputCollector.getCollected().size());
-        Assert.assertEquals(2, stormOutputCollector.getTupleSize());
-        alertBoltOutputCollectorWrapper.flush();
-        alertBoltOutputCollectorWrapper.flush();
-        alertBoltOutputCollectorWrapper.flush();
-        Assert.assertEquals(3, stormOutputCollector.getCollected().size());
-        Assert.assertEquals(3, stormOutputCollector.getTupleSize());
-    }
-
-    private AlertStreamEvent create(String streamId) {
-        AlertStreamEvent alert = new AlertStreamEvent();
-        alert.setCreatedBy(this.toString());
-        alert.setCreatedTime(System.currentTimeMillis());
-        alert.setData(new Object[] {"field_1", 2, "field_3"});
-        alert.setStreamId(streamId);
-        return alert;
-    }
-
-    private class MockedStormAlertOutputCollector extends OutputCollector {
-        private final Map<Object, List<Object>> collected;
-
-        MockedStormAlertOutputCollector(IOutputCollector delegate) {
-            super(delegate);
-            collected = new HashMap<>();
-        }
-
-        @Override
-        public List<Integer> emit(String streamId, List<Object> tuple) {
-            if (!collected.containsKey(tuple.get(0))) {
-                collected.put(tuple.get(0), new LinkedList<>());
-            }
-            collected.get(tuple.get(0)).add(tuple);
-            return null;
-        }
-
-        Map<Object, List<Object>> getCollected() {
-            return collected;
-        }
-
-        int getTupleSize() {
-            int size = 0;
-            for (List<Object> alerts : collected.values()) {
-                size += alerts.size();
-            }
-            return size;
-        }
-    }
-}
\ No newline at end of file
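
The assertions above pin down the wrapper's contract: emit() only buffers (the collected size stays at 0, then 2), flush() drains the buffer to the underlying collector, and repeated flushes are no-ops. A minimal sketch of that buffer-then-flush pattern; the queue-based internals here are an illustrative assumption, not the actual AlertBoltOutputCollectorThreadSafeWrapper implementation:

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.function.Consumer;

    class BufferedCollector<T> {
        private final Queue<T> buffer = new ConcurrentLinkedQueue<>();
        private final Consumer<T> delegate;

        BufferedCollector(Consumer<T> delegate) {
            this.delegate = delegate;
        }

        // emit() never touches the delegate, so it is safe from any thread
        void emit(T event) {
            buffer.offer(event);
        }

        // flush() drains the buffer on the caller's thread; extra flushes do nothing
        void flush() {
            T event;
            while ((event = buffer.poll()) != null) {
                delegate.accept(event);
            }
        }
    }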

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/PoilcyExtendedTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/PoilcyExtendedTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/PoilcyExtendedTest.java
deleted file mode 100644
index 1e65edd..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/PoilcyExtendedTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator;
-
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Created on 9/7/16.
- */
-public class PoilcyExtendedTest {
-
-    private static final ObjectMapper mapper = new ObjectMapper();
-
-    @Test
-    public void test() throws Exception {
-        ArrayNode arrayNode = (ArrayNode)
-            mapper.readTree(PoilcyExtendedTest.class.getResourceAsStream("/extend_policy.json"));
-        Assert.assertEquals(1, arrayNode.size());
-        for (JsonNode node : arrayNode) {
-            PolicyDefinition definition = mapper.treeToValue(node, PolicyDefinition.class);
-
-            Assert.assertNotNull(definition);
-            Assert.assertNotNull(definition.getName());
-            Assert.assertNotNull(definition.getDefinition());
-
-            Assert.assertEquals(PolicyStreamHandlers.CUSTOMIZED_ENGINE, definition.getDefinition().getType());
-            Assert.assertNotNull(definition.getDefinition().getProperties());
-
-            Assert.assertTrue(definition.getDefinition().getProperties().containsKey("parentKey"));
-            Map pkSetting = (Map) definition.getDefinition().getProperties().get("parentKey");
-            Assert.assertTrue(pkSetting.containsKey("syslogStream"));
-
-            Map syslogStreamSetting = (Map) pkSetting.get("syslogStream");
-            Assert.assertTrue(syslogStreamSetting.containsKey("pattern"));
-            Assert.assertEquals("%s-%s", syslogStreamSetting.get("pattern"));
-
-            Assert.assertTrue(syslogStreamSetting.containsKey("columns"));
-            Assert.assertEquals(3, ((List) syslogStreamSetting.get("columns")).size());
-
-            break;
-        }
-
-    }
-
-}
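
Working backward from the assertions, the /extend_policy.json resource plausibly has the shape below. The policy name, the concrete string behind PolicyStreamHandlers.CUSTOMIZED_ENGINE ("Custom" here), and the three column names are assumptions for illustration:

    [
      {
        "name": "extendPolicyTest",
        "definition": {
          "type": "Custom",
          "properties": {
            "parentKey": {
              "syslogStream": {
                "pattern": "%s-%s",
                "columns": ["timestamp", "host", "message"]
              }
            }
          }
        }
      }
    ]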

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java
deleted file mode 100755
index 89039f5..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator;
-
-import backtype.storm.metric.api.MultiCountMetric;
-import org.apache.eagle.alert.engine.Collector;
-import org.apache.eagle.alert.engine.StormMultiCountMetric;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl;
-import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyHandler;
-import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
-import org.apache.eagle.alert.engine.mock.MockStreamCollector;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-public class SiddhiCEPPolicyEventHandlerTest {
-    private final static Logger LOG = LoggerFactory.getLogger(SiddhiCEPPolicyEventHandlerTest.class);
-
-    private Map<String, StreamDefinition> createDefinition(String... streamIds) {
-        Map<String, StreamDefinition> sds = new HashMap<>();
-        for (String streamId : streamIds) {
-            // construct StreamDefinition
-            StreamDefinition sd = MockSampleMetadataFactory.createSampleStreamDefinition(streamId);
-            sds.put(streamId, sd);
-        }
-        return sds;
-    }
-
-    @SuppressWarnings("serial")
-    @Test
-    public void testBySendSimpleEvent() throws Exception {
-        SiddhiPolicyHandler handler;
-        MockStreamCollector collector;
-
-        handler = new SiddhiPolicyHandler(createDefinition("sampleStream_1", "sampleStream_2"), 0);
-        collector = new MockStreamCollector();
-        PolicyDefinition policyDefinition = MockSampleMetadataFactory.createSingleMetricSamplePolicy();
-        PolicyHandlerContext context = new PolicyHandlerContext();
-        context.setPolicyDefinition(policyDefinition);
-        context.setPolicyCounter(new StormMultiCountMetric(new MultiCountMetric()));
-        context.setPolicyEvaluator(new PolicyGroupEvaluatorImpl("evaluatorId"));
-        handler.prepare(collector, context);
-        StreamEvent event = StreamEvent.builder()
-            .schema(MockSampleMetadataFactory.createSampleStreamDefinition("sampleStream_1"))
-            .streamId("sampleStream_1")
-            .timestamep(System.currentTimeMillis())
-            .attributes(new HashMap<String, Object>() {{
-                put("name", "cpu");
-                put("value", 60.0);
-                put("bad", "bad column value");
-            }}).build();
-        handler.send(event);
-        handler.close();
-    }
-
-    @SuppressWarnings("serial")
-    @Test
-    public void testWithTwoStreamJoinPolicy() throws Exception {
-        Map<String, StreamDefinition> ssd = createDefinition("sampleStream_1", "sampleStream_2");
-
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("SampleJoinPolicyForTest");
-        policyDefinition.setInputStreams(Arrays.asList("sampleStream_1", "sampleStream_2"));
-        policyDefinition.setOutputStreams(Collections.singletonList("joinedStream"));
-        policyDefinition.setDefinition(new PolicyDefinition.Definition(PolicyStreamHandlers.SIDDHI_ENGINE,
-            "from sampleStream_1#window.length(10) as left " +
-                "join sampleStream_2#window.length(10) as right " +
-                "on left.name == right.name and left.value == right.value " +
-                "select left.timestamp,left.name,left.value " +
-                "insert into joinedStream"));
-        policyDefinition.setPartitionSpec(Collections.singletonList(MockSampleMetadataFactory.createSampleStreamGroupbyPartition("sampleStream_1", Collections.singletonList("name"))));
-        SiddhiPolicyHandler handler;
-        Semaphore mutex = new Semaphore(0);
-        List<AlertStreamEvent> alerts = new ArrayList<>(0);
-        Collector<AlertStreamEvent> collector = (event) -> {
-            LOG.info("Collected {}", event);
-            Assert.assertNotNull(event);
-            alerts.add(event);
-            mutex.release();
-        };
-
-        handler = new SiddhiPolicyHandler(ssd, 0);
-        PolicyHandlerContext context = new PolicyHandlerContext();
-        context.setPolicyDefinition(policyDefinition);
-        context.setPolicyCounter(new StormMultiCountMetric(new MultiCountMetric()));
-        context.setPolicyEvaluator(new PolicyGroupEvaluatorImpl("evaluatorId"));
-        handler.prepare(collector, context);
-
-
-        long ts_1 = System.currentTimeMillis();
-        long ts_2 = System.currentTimeMillis() + 1;
-
-        handler.send(StreamEvent.builder()
-            .schema(ssd.get("sampleStream_1"))
-            .streamId("sampleStream_1")
-            .timestamep(ts_1)
-            .attributes(new HashMap<String, Object>() {{
-                put("name", "cpu");
-                put("value", 60.0);
-                put("bad", "bad column value");
-            }}).build());
-
-        handler.send(StreamEvent.builder()
-            .schema(ssd.get("sampleStream_2"))
-            .streamId("sampleStream_2")
-            .timestamep(ts_2)
-            .attributes(new HashMap<String, Object>() {{
-                put("name", "cpu");
-                put("value", 61.0);
-            }}).build());
-
-        handler.send(StreamEvent.builder()
-            .schema(ssd.get("sampleStream_2"))
-            .streamId("sampleStream_2")
-            .timestamep(ts_2)
-            .attributes(new HashMap<String, Object>() {{
-                put("name", "disk");
-                put("value", 60.0);
-            }}).build());
-
-        handler.send(StreamEvent.builder()
-            .schema(ssd.get("sampleStream_2"))
-            .streamId("sampleStream_2")
-            .timestamep(ts_2)
-            .attributes(new HashMap<String, Object>() {{
-                put("name", "cpu");
-                put("value", 60.0);
-            }}).build());
-
-        handler.close();
-
-        Assert.assertTrue("Should get result in 5 s", mutex.tryAcquire(5, TimeUnit.SECONDS));
-        Assert.assertEquals(1, alerts.size());
-        Assert.assertEquals("joinedStream", alerts.get(0).getStreamId());
-        Assert.assertEquals("cpu", alerts.get(0).getData()[1]);
-    }
-}
\ No newline at end of file
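
Why the join test expects exactly one alert: the on-clause requires both name and value to match the sampleStream_1 event (cpu, 60.0), and only the third sampleStream_2 event satisfies it. A self-contained sketch of that join predicate, independent of Siddhi:

    import java.util.Arrays;
    import java.util.List;

    public class JoinConditionSketch {
        static final class Metric {
            final String name;
            final double value;
            Metric(String name, double value) { this.name = name; this.value = value; }
        }

        // the query's on-clause: left.name == right.name and left.value == right.value
        static boolean joinOn(Metric l, Metric r) {
            return l.name.equals(r.name) && l.value == r.value;
        }

        public static void main(String[] args) {
            Metric left = new Metric("cpu", 60.0);
            List<Metric> rights = Arrays.asList(
                new Metric("cpu", 61.0),  // name matches, value differs -> no join
                new Metric("disk", 60.0), // value matches, name differs -> no join
                new Metric("cpu", 60.0)); // both match -> the single joinedStream alert
            int matches = 0;
            for (Metric r : rights) {
                if (joinOn(left, r)) {
                    matches++;
                }
            }
            System.out.println(matches); // prints 1
        }
    }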

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapperTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapperTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapperTest.java
deleted file mode 100644
index 9febed5..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapperTest.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.impl;
-
-import org.apache.eagle.alert.engine.StreamContext;
-import org.apache.eagle.alert.engine.StreamCounter;
-import org.apache.eagle.alert.engine.coordinator.PublishPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.router.StreamOutputCollector;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Set;
-
-import static org.mockito.Mockito.*;
-
-public class AlertBoltOutputCollectorWrapperTest {
-
-    private AlertBoltOutputCollectorWrapper alertBoltOutputCollectorWrapper;
-
-    // mock objects
-    private StreamOutputCollector outputCollector;
-    private Object outputLock;
-    private StreamContext streamContext;
-    private StreamCounter streamCounter;
-
-    private Set<PublishPartition> publishPartitions = new HashSet<>();
-
-    private static final String samplePublishId = "samplePublishId";
-    private static final String samplePublishId2 = "samplePublishId2";
-    private static final String samplePolicyId = "samplePolicyId";
-    private static final String sampleStreamId = "sampleStreamId";
-    private static final String sampleStreamId2 = "sampleStreamId2";
-
-    @Before
-    public void setUp() throws Exception {
-        outputCollector = mock(StreamOutputCollector.class);
-        outputLock = mock(Object.class);
-        streamContext = mock(StreamContext.class);
-        streamCounter = mock(StreamCounter.class);
-        alertBoltOutputCollectorWrapper = new AlertBoltOutputCollectorWrapper(outputCollector, outputLock, streamContext);
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        alertBoltOutputCollectorWrapper.onAlertBoltSpecChange(new HashSet<>(), publishPartitions, new HashSet<>());
-        publishPartitions.clear();
-    }
-
-    @Test
-    public void testNormal() throws Exception {
-        doReturn(streamCounter).when(streamContext).counter();
-
-        publishPartitions.add(createPublishPartition(samplePublishId, samplePolicyId, sampleStreamId));
-        publishPartitions.add(createPublishPartition(samplePublishId2, samplePolicyId, sampleStreamId2));
-        alertBoltOutputCollectorWrapper.onAlertBoltSpecChange(publishPartitions, new HashSet<>(), new HashSet<>());
-
-        AlertStreamEvent event = new AlertStreamEvent();
-        event.setPolicyId(samplePolicyId);
-        StreamDefinition sd = new StreamDefinition();
-        sd.setStreamId(sampleStreamId);
-        sd.setColumns(new ArrayList<>());
-        event.setSchema(sd);
-
-        alertBoltOutputCollectorWrapper.emit(event);
-
-        verify(streamCounter, times(1)).incr(anyString());
-        verify(outputCollector, times(1)).emit(anyObject());
-    }
-
-    @Test
-    public void testExceptional() throws Exception {
-        doReturn(streamCounter).when(streamContext).counter();
-
-        publishPartitions.add(createPublishPartition(samplePublishId, samplePolicyId, sampleStreamId));
-        publishPartitions.add(createPublishPartition(samplePublishId, samplePolicyId, sampleStreamId));
-        alertBoltOutputCollectorWrapper.onAlertBoltSpecChange(publishPartitions, new HashSet<>(), new HashSet<>());
-
-        AlertStreamEvent event = new AlertStreamEvent();
-        event.setPolicyId(samplePolicyId);
-        StreamDefinition sd = new StreamDefinition();
-        sd.setStreamId(sampleStreamId);
-        sd.setColumns(new ArrayList<>());
-        event.setSchema(sd);
-
-        alertBoltOutputCollectorWrapper.emit(event);
-
-        verify(streamCounter, times(1)).incr(anyString());
-        verify(outputCollector, times(1)).emit(anyObject());
-    }
-
-    private PublishPartition createPublishPartition(String publishId, String policyId, String streamId) {
-        PublishPartition publishPartition = new PublishPartition();
-        publishPartition.setPolicyId(policyId);
-        publishPartition.setStreamId(streamId);
-        publishPartition.setPublishId(publishId);
-        publishPartition.setColumns(new HashSet<>());
-        return publishPartition;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java
deleted file mode 100644
index 2d3ee85..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.integration;
-
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.SpoutSpec;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
-import org.apache.eagle.alert.engine.model.AlertPublishEvent;
-import org.apache.eagle.alert.service.IMetadataServiceClient;
-
-import java.io.IOException;
-import java.util.List;
-
-@SuppressWarnings("serial")
-public class MockMetadataServiceClient implements IMetadataServiceClient {
-
-    @Override
-    public List<SpoutSpec> listSpoutMetadata() {
-        return null;
-    }
-
-    @Override
-    public void close() throws IOException {
-
-    }
-
-    @Override
-    public ScheduleState getVersionedSpec(String version) {
-        return null;
-    }
-
-    @Override
-    public List<StreamingCluster> listClusters() {
-        return null;
-    }
-
-    @Override
-    public List<PolicyDefinition> listPolicies() {
-        return null;
-    }
-
-    @Override
-    public List<StreamDefinition> listStreams() {
-        return null;
-    }
-
-    @Override
-    public List<Kafka2TupleMetadata> listDataSources() {
-        return null;
-    }
-
-    @Override
-    public List<Publishment> listPublishment() {
-        return null;
-    }
-
-    @Override
-    public ScheduleState getVersionedSpec() {
-        return null;
-    }
-
-    @Override
-    public void addScheduleState(ScheduleState state) {
-
-    }
-
-    @Override
-    public List<Topology> listTopologies() {
-        return null;
-    }
-
-    @Override
-    public void addStreamingCluster(StreamingCluster cluster) {
-
-    }
-
-    @Override
-    public void addStreamingClusters(List<StreamingCluster> clusters) {
-
-    }
-
-    @Override
-    public void addTopology(Topology t) {
-
-    }
-
-    @Override
-    public void addTopologies(List<Topology> topologies) {
-
-    }
-
-    @Override
-    public void addPolicy(PolicyDefinition policy) {
-
-    }
-
-    @Override
-    public void addPolicies(List<PolicyDefinition> policies) {
-
-    }
-
-    @Override
-    public void addStreamDefinition(StreamDefinition streamDef) {
-
-    }
-
-    @Override
-    public void addStreamDefinitions(List<StreamDefinition> streamDefs) {
-
-    }
-
-    @Override
-    public void addDataSource(Kafka2TupleMetadata k2t) {
-
-    }
-
-    @Override
-    public void addDataSources(List<Kafka2TupleMetadata> k2ts) {
-
-    }
-
-    @Override
-    public void addPublishment(Publishment pub) {
-
-    }
-
-    @Override
-    public void addPublishments(List<Publishment> pubs) {
-
-    }
-
-    @Override
-    public void clear() {
-
-    }
-
-    @Override
-    public void clearScheduleState(int maxCapacity) {
-
-    }
-
-    @Override
-    public List<AlertPublishEvent> listAlertPublishEvent() {
-        return null;
-    }
-
-    @Override
-    public void addAlertPublishEvent(AlertPublishEvent event) {
-
-    }
-
-    @Override
-    public void addAlertPublishEvents(List<AlertPublishEvent> events) {
-
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java
deleted file mode 100644
index 4047fc1..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java
+++ /dev/null
@@ -1,558 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.interpreter;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandlers;
-import org.junit.Assert;
-import org.junit.Test;
-import org.wso2.siddhi.core.exception.DefinitionNotExistException;
-import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
-
-import java.util.*;
-
-public class PolicyInterpreterTest {
-    // -------------------------
-    // Single Stream Test Cases
-    // -------------------------
-    @Test
-    public void testParseSingleStreamPolicyQuery() throws Exception {
-        PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan("from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 2 min) "
-            + "select cmd, user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT");
-        Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX", executionPlan.getInputStreams().keySet().toArray()[0]);
-        Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT", executionPlan.getOutputStreams().keySet().toArray()[0]);
-        Assert.assertEquals(1, executionPlan.getStreamPartitions().size());
-        Assert.assertEquals(2*60*1000,executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis());
-    }
-
-    @Test
-    public void testParseSingleStreamPolicyWithPattern() throws Exception {
-        PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan(
-            "from e1=Stream1[price >= 20] -> e2=Stream2[price >= e1.price] \n"
-                + "select e1.symbol as symbol, e2.price as price, e1.price+e2.price as total_price \n"
-                + "group by symbol, company insert into OutStream");
-        Assert.assertTrue(executionPlan.getInputStreams().containsKey("Stream1"));
-        Assert.assertTrue(executionPlan.getInputStreams().containsKey("Stream2"));
-        Assert.assertTrue(executionPlan.getOutputStreams().containsKey("OutStream"));
-        Assert.assertEquals(StreamPartition.Type.GROUPBY,executionPlan.getStreamPartitions().get(0).getType());
-        Assert.assertArrayEquals(new String[]{"symbol","company"},executionPlan.getStreamPartitions().get(0).getColumns().toArray());
-        Assert.assertEquals(StreamPartition.Type.GROUPBY,executionPlan.getStreamPartitions().get(1).getType());
-        Assert.assertArrayEquals(new String[]{"symbol","company"},executionPlan.getStreamPartitions().get(1).getColumns().toArray());
-    }
-
-    @Test
-    public void testParseSingleStreamPolicyQueryWithMultiplePartitionUsingLargerWindow() throws Exception {
-        PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan(
-            "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 1 min) "
-            + "select cmd,user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1;"
-            + "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 1 hour) "
-            + "select cmd,user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2;"
-        );
-        Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX", executionPlan.getInputStreams().keySet().toArray()[0]);
-        Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1"));
-        Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2"));
-        Assert.assertEquals(1, executionPlan.getStreamPartitions().size());
-        Assert.assertEquals(60*60*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis());
-    }
-
-    @Test(expected = ExecutionPlanValidationException.class)
-    public void testParseSingleStreamPolicyQueryWithConflictPartition() throws Exception {
-        PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan(
-            "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 5 min) "
-            + "select cmd, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1;"
-            + "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 2 min) "
-            + "select user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2;"
-        );
-        Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX", executionPlan.getInputStreams().keySet().toArray()[0]);
-        Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1"));
-        Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2"));
-        Assert.assertEquals(2, executionPlan.getStreamPartitions().size());
-        Assert.assertEquals(5*60*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis());
-    }
-
-    @Test
-    public void testValidPolicyWithExternalTimeWindow() {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test_policy");
-        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
-        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType("siddhi");
-        definition.setValue("from INPUT_STREAM_1#window.externalTime(timestamp, 2 min) select name, sum(value) as total group by name insert into OUTPUT_STREAM_1 ;");
-        definition.setInputStreams(policyDefinition.getInputStreams());
-        definition.setOutputStreams(policyDefinition.getOutputStreams());
-        policyDefinition.setDefinition(definition);
-
-        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
-            {
-                put("INPUT_STREAM_1", mockStreamDefinition("INPUT_STREAM_1"));
-                put("INPUT_STREAM_2", mockStreamDefinition("INPUT_STREAM_2"));
-                put("INPUT_STREAM_3", mockStreamDefinition("INPUT_STREAM_3"));
-                put("INPUT_STREAM_4", mockStreamDefinition("INPUT_STREAM_4"));
-            }
-        });
-        Assert.assertTrue(validation.isSuccess());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size());
-        Assert.assertNotNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec());
-    }
-
-    @Test
-    public void testValidPolicyWithTimeWindow() {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test_policy");
-        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
-        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType("siddhi");
-        definition.setValue("from INPUT_STREAM_1#window.time(2 min) select name, sum(value) as total group by name insert into OUTPUT_STREAM_1 ;");
-        definition.setInputStreams(policyDefinition.getInputStreams());
-        definition.setOutputStreams(policyDefinition.getOutputStreams());
-        policyDefinition.setDefinition(definition);
-
-        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
-            {
-                put("INPUT_STREAM_1", mockStreamDefinition("INPUT_STREAM_1"));
-            }
-        });
-        Assert.assertTrue(validation.isSuccess());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size());
-        Assert.assertNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec());
-    }
-
-    @Test
-    public void testValidPolicyWithTooManyInputStreams() {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test_policy");
-        policyDefinition.setInputStreams(Arrays.asList("INPUT_STREAM_1", "INPUT_STREAM_2"));
-        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType("siddhi");
-        definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
-        definition.setInputStreams(policyDefinition.getInputStreams());
-        definition.setOutputStreams(policyDefinition.getOutputStreams());
-        policyDefinition.setDefinition(definition);
-
-        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
-            {
-                put("INPUT_STREAM_1", mockStreamDefinition("INPUT_STREAM_1"));
-                put("INPUT_STREAM_2", mockStreamDefinition("INPUT_STREAM_2"));
-            }
-        });
-        Assert.assertTrue(validation.isSuccess());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
-    }
-
-    @Test
-    public void testValidPolicyWithTooFewOutputStreams() {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test_policy");
-        policyDefinition.setInputStreams(Arrays.asList("INPUT_STREAM_1", "INPUT_STREAM_2"));
-        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType("siddhi");
-        definition.setValue(
-            "from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;"
-                + "from INPUT_STREAM_1[value < 90.0] select * group by name insert into OUTPUT_STREAM_2;"
-        );
-        definition.setInputStreams(policyDefinition.getInputStreams());
-        definition.setOutputStreams(policyDefinition.getOutputStreams());
-        policyDefinition.setDefinition(definition);
-
-        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
-            {
-                put("INPUT_STREAM_1", mockStreamDefinition("INPUT_STREAM_1"));
-                put("INPUT_STREAM_2", mockStreamDefinition("INPUT_STREAM_2"));
-            }
-        });
-        Assert.assertTrue(validation.isSuccess());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
-        Assert.assertEquals(2, validation.getPolicyExecutionPlan().getOutputStreams().size());
-    }
-
-    @Test
-    public void testInvalidPolicyForSyntaxError() {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test_policy");
-        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM"));
-        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM"));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType("siddhi");
-        definition.setValue("from INPUT_STREAM (value > 90.0) select * group by name insert into OUTPUT_STREAM;");
-        definition.setInputStreams(policyDefinition.getInputStreams());
-        definition.setOutputStreams(policyDefinition.getOutputStreams());
-        policyDefinition.setDefinition(definition);
-
-        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
-            {
-                put("INPUT_STREAM", mockStreamDefinition("INPUT_STREAM"));
-            }
-        });
-        Assert.assertFalse(validation.isSuccess());
-    }
-
-    @Test
-    public void testInvalidPolicyForNotDefinedInputStream() {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test_policy");
-        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
-        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType("siddhi");
-        definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
-        definition.setInputStreams(policyDefinition.getInputStreams());
-        definition.setOutputStreams(policyDefinition.getOutputStreams());
-        policyDefinition.setDefinition(definition);
-
-        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
-            {
-                put("INPUT_STREAM_2", mockStreamDefinition("INPUT_STREAM_2"));
-            }
-        });
-        Assert.assertFalse(validation.isSuccess());
-    }
-
-    @Test
-    public void testInvalidPolicyForNotDefinedOutputStream() {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test_policy");
-        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
-        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_2"));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType("siddhi");
-        definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
-        definition.setInputStreams(policyDefinition.getInputStreams());
-        definition.setOutputStreams(policyDefinition.getOutputStreams());
-        policyDefinition.setDefinition(definition);
-
-        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
-            {
-                put("INPUT_STREAM_1", mockStreamDefinition("INPUT_STREAM_1"));
-            }
-        });
-        Assert.assertFalse(validation.isSuccess());
-    }
-
-    // ---------------------
-    // Two Stream Test Cases
-    // ---------------------
-
-    @Test
-    public void testParseTwoStreamPolicyQueryWithMultiplePartition() throws Exception {
-        PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan(
-            "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_1#window.externalTime(timestamp, 1 min) "
-                + "select cmd,user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1;"
-                + "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_2#window.externalTime(timestamp, 1 hour) "
-                + "select cmd,user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2;"
-        );
-        Assert.assertTrue(executionPlan.getInputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_1"));
-        Assert.assertTrue(executionPlan.getInputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_2"));
-        Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1"));
-        Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2"));
-        Assert.assertEquals(2, executionPlan.getStreamPartitions().size());
-        Assert.assertEquals(60*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis());
-        Assert.assertEquals(60*60*1000, executionPlan.getStreamPartitions().get(1).getSortSpec().getWindowPeriodMillis());
-    }
-
-    @Test
-    public void testParseTwoStreamPolicyQueryWithSinglePartition() throws Exception {
-        PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan(
-            "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_1#window.externalTime(timestamp, 1 min) "
-                + "select cmd,user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1;"
-                + "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_2 select * insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2;"
-        );
-        Assert.assertTrue(executionPlan.getInputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_1"));
-        Assert.assertTrue(executionPlan.getInputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_2"));
-        Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1"));
-        Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2"));
-        Assert.assertEquals(2, executionPlan.getStreamPartitions().size());
-        Assert.assertEquals(60*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis());
-        Assert.assertEquals(StreamPartition.Type.SHUFFLE, executionPlan.getStreamPartitions().get(1).getType());
-    }
-
-
-    @Test
-    public void testParseTwoStreamPolicyQueryInnerJoin() throws Exception {
-        PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan(
-            "from TickEvent[symbol=='EBAY']#window.length(2000) " +
-                "join NewsEvent#window.externalTime(timestamp, 1000 sec) \n" +
-                "select * insert into JoinStream"
-        );
-        Assert.assertTrue(executionPlan.getInputStreams().containsKey("TickEvent"));
-        Assert.assertTrue(executionPlan.getInputStreams().containsKey("NewsEvent"));
-        Assert.assertTrue(executionPlan.getOutputStreams().containsKey("JoinStream"));
-        Assert.assertEquals(StreamPartition.Type.SHUFFLE, executionPlan.getStreamPartitions().get(0).getType());
-        Assert.assertNotNull(executionPlan.getStreamPartitions().get(0).getSortSpec());
-        Assert.assertEquals(1000*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis());
-        Assert.assertEquals(StreamPartition.Type.SHUFFLE, executionPlan.getStreamPartitions().get(1).getType());
-        Assert.assertNull(executionPlan.getStreamPartitions().get(1).getSortSpec());
-    }
-
-    @Test
-    public void testParseTwoStreamPolicyQueryInnerJoinWithCondition() throws Exception {
-        PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan(
-            "from TickEvent[symbol=='EBAY']#window.length(2000) as t unidirectional \n" +
-                "join NewsEvent#window.externalTime(timestamp, 1000 sec) as n \n" +
-                "on TickEvent.symbol == NewsEvent.company \n" +
-                "insert into JoinStream "
-        );
-        Assert.assertTrue(executionPlan.getInputStreams().containsKey("TickEvent"));
-        Assert.assertTrue(executionPlan.getInputStreams().containsKey("NewsEvent"));
-        Assert.assertTrue(executionPlan.getOutputStreams().containsKey("JoinStream"));
-        Assert.assertEquals(StreamPartition.Type.SHUFFLE, executionPlan.getStreamPartitions().get(0).getType());
-        Assert.assertNotNull(executionPlan.getStreamPartitions().get(0).getSortSpec());
-        Assert.assertEquals(1000*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis());
-        Assert.assertEquals(StreamPartition.Type.GROUPBY, executionPlan.getStreamPartitions().get(1).getType());
-        Assert.assertNull(executionPlan.getStreamPartitions().get(1).getSortSpec());
-    }
-
-    @Test
-    public void testParseTwoStreamPolicyQueryInnerJoinWithConditionHavingAlias() throws Exception {
-        PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan(
-            "from TickEvent[symbol=='EBAY']#window.length(2000) as t unidirectional \n" +
-                "join NewsEvent#window.externalTime(timestamp, 1000 sec) as n \n" +
-                "on t.symbol == n.company \n" +
-                "insert into JoinStream "
-        );
-        Assert.assertTrue(executionPlan.getInputStreams().containsKey("TickEvent"));
-        Assert.assertTrue(executionPlan.getInputStreams().containsKey("NewsEvent"));
-        Assert.assertTrue(executionPlan.getOutputStreams().containsKey("JoinStream"));
-        Assert.assertEquals(StreamPartition.Type.SHUFFLE, executionPlan.getStreamPartitions().get(0).getType());
-        Assert.assertNotNull(executionPlan.getStreamPartitions().get(0).getSortSpec());
-        Assert.assertEquals(1000*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis());
-        Assert.assertEquals(StreamPartition.Type.GROUPBY, executionPlan.getStreamPartitions().get(1).getType());
-        Assert.assertNull(executionPlan.getStreamPartitions().get(1).getSortSpec());
-    }
-
-    @Test(expected = DefinitionNotExistException.class)
-    public void testParseTwoStreamPolicyQueryInnerJoinWithConditionHavingNotFoundAlias() throws Exception {
-        PolicyInterpreter.parseExecutionPlan(
-            "from TickEvent[symbol=='EBAY']#window.length(2000) as t unidirectional \n" +
-            "join NewsEvent#window.externalTime(timestamp, 1000 sec) as n \n" +
-            "on t.symbol == NOT_EXIST_ALIAS.company \n" +
-            "insert into JoinStream "
-        );
-    }
-
-    private static final ObjectMapper mapper = new ObjectMapper();
-
-    @Test
-    public void testLeftJoin() throws Exception {
-        PolicyDefinition def = mapper.readValue(PolicyInterpreterTest.class.getResourceAsStream("/interpreter/policy.json"), PolicyDefinition.class);
-        ArrayNode array = (ArrayNode)mapper.readTree(PolicyInterpreterTest.class.getResourceAsStream("/interpreter/streams.json"));
-        Map<String, StreamDefinition> allDefinitions = new HashMap<>();
-        for(JsonNode node : array) {
-            StreamDefinition streamDef = mapper.readValue(node.toString(), StreamDefinition.class);
-            allDefinitions.put(streamDef.getStreamId(), streamDef);
-        }
-        PolicyValidationResult result = PolicyInterpreter.validate(def, allDefinitions);
-        Assert.assertTrue(result.isSuccess());
-    }
-
-    @Test
-    public void testExtendPolicy() throws Exception {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test-extend-policy");
-        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
-        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType(PolicyStreamHandlers.CUSTOMIZED_ENGINE);
-        policyDefinition.setDefinition(definition);
-
-        Map<String, StreamDefinition> allDefinitions = new HashMap<>();
-        allDefinitions.put("INPUT_STREAM_1", mockStreamDefinition("INPUT_STREAM_1"));
-        PolicyValidationResult result = PolicyInterpreter.validate(policyDefinition, allDefinitions);
-        Assert.assertTrue(result.isSuccess());
-    }
-
-    // --------------
-    // Helper Methods
-    // --------------
-
-    private static StreamDefinition mockStreamDefinition(String streamId) {
-        StreamDefinition streamDefinition = new StreamDefinition();
-        streamDefinition.setStreamId(streamId);
-        List<StreamColumn> columns = new ArrayList<>();
-        columns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
-        columns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
-        columns.add(new StreamColumn.Builder().name("timestamp").type(StreamColumn.Type.LONG).build());
-        streamDefinition.setColumns(columns);
-        return streamDefinition;
-    }
-
-    @Test
-    public void testValidPolicyWithPattern() {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test_policy");
-        policyDefinition.setInputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM_1"));
-        policyDefinition.setOutputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM_1_MISS_BLOCKS_OUT"));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType("siddhi");
-        String policy =
-                "from every a = HADOOP_JMX_METRIC_STREAM_1[component==\"namenode\" and metric == \"hadoop.namenode.dfs.missingblocks\"] " +
-                        "-> b = HADOOP_JMX_METRIC_STREAM_1[b.component==a.component and b.metric==a.metric and b.host==a.host and convert(b.value, \"long\") > convert(a.value, \"long\") ] " +
-                        "select b.metric, b.host as host, convert(b.value, \"long\") as newNumOfMissingBlocks, convert(a.value, \"long\") as oldNumOfMissingBlocks, b.timestamp as timestamp, b.component as component, b.site as site " +
-                        "group by b.metric insert into HADOOP_JMX_METRIC_STREAM_1_MISS_BLOCKS_OUT;";
-        definition.setValue(policy);
-        definition.setInputStreams(policyDefinition.getInputStreams());
-        definition.setOutputStreams(policyDefinition.getOutputStreams());
-        policyDefinition.setDefinition(definition);
-
-        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
-            {
-                put("HADOOP_JMX_METRIC_STREAM_1", mockStreamDefinition("HADOOP_JMX_METRIC_STREAM_1"));
-            }
-        });
-        Assert.assertTrue(validation.isSuccess());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size());
-        Assert.assertNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec());
-        Assert.assertEquals(StreamPartition.Type.GROUPBY, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getType());
-        Assert.assertArrayEquals(new String[]{"metric"}, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getColumns().toArray());
-        Assert.assertEquals("HADOOP_JMX_METRIC_STREAM_1", validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getStreamId());
-    }
-
-    @Test
-    public void testValidPolicyWithPatternSort() {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test_policy");
-        policyDefinition.setInputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM_1"));
-        policyDefinition.setOutputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM_1_MISS_BLOCKS_OUT"));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType("siddhi");
-        String policy =
-                "from HADOOP_JMX_METRIC_STREAM_1[metric == \"hadoop.namenode.dfs.missingblocks\"]#window.externalTime(timestamp, 1 min) " +
-                        "select * group by site, host, component, metric insert into temp;\n" +
-                "\n" +
-                "from every a = HADOOP_JMX_METRIC_STREAM_1[metric == \"hadoop.namenode.dfs.missingblocks\"] -> b = HADOOP_JMX_METRIC_STREAM_1[b.component == a.component and b.metric == a.metric and b.host == a.host and convert(b.value, \"long\") > convert(a.value, \"long\") ] " +
-                        "select b.site, b.host, b.component, b.metric, convert(b.value, \"long\") as newNumOfMissingBlocks, convert(a.value, \"long\") as oldNumOfMissingBlocks, max(b.timestamp) as timestamp " +
-                        "group by b.site, b.host, b.component, b.metric insert into HADOOP_JMX_METRIC_STREAM_1_MISS_BLOCKS_OUT;";
-        definition.setValue(policy);
-        definition.setInputStreams(policyDefinition.getInputStreams());
-        definition.setOutputStreams(policyDefinition.getOutputStreams());
-        policyDefinition.setDefinition(definition);
-
-        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
-            {
-                put("HADOOP_JMX_METRIC_STREAM_1", mockStreamDefinition("HADOOP_JMX_METRIC_STREAM_1"));
-            }
-        });
-        Assert.assertTrue(validation.isSuccess());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
-        Assert.assertEquals(2, validation.getPolicyExecutionPlan().getOutputStreams().size());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size());
-        Assert.assertNotNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec());
-        Assert.assertEquals(60000, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis());
-        Assert.assertEquals(12000, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec().getWindowMargin());
-        Assert.assertEquals(StreamPartition.Type.GROUPBY, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getType());
-        Assert.assertArrayEquals(new String[]{"site", "host", "component", "metric"}, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getColumns().toArray());
-        Assert.assertEquals("HADOOP_JMX_METRIC_STREAM_1", validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getStreamId());
-    }
-
-    @Test
-    public void testValidPolicyWithSequence() {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test_policy");
-        policyDefinition.setInputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM_1"));
-        policyDefinition.setOutputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM_1_MISS_BLOCKS_OUT"));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType("siddhi");
-        String policy =
-                "from every a = HADOOP_JMX_METRIC_STREAM_1[component==\"namenode\" and metric == \"hadoop.namenode.dfs.missingblocks\"] " +
-                        ", b = HADOOP_JMX_METRIC_STREAM_1[b.component==a.component and b.metric==a.metric and b.host==a.host and convert(b.value, \"long\") > convert(a.value, \"long\") ] " +
-                        "select b.metric, b.host as host, convert(b.value, \"long\") as newNumOfMissingBlocks, convert(a.value, \"long\") as oldNumOfMissingBlocks, b.timestamp as timestamp, b.component as component, b.site as site " +
-                        "group by b.metric insert into HADOOP_JMX_METRIC_STREAM_1_MISS_BLOCKS_OUT;";
-        definition.setValue(policy);
-        definition.setInputStreams(policyDefinition.getInputStreams());
-        definition.setOutputStreams(policyDefinition.getOutputStreams());
-        policyDefinition.setDefinition(definition);
-
-        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
-            {
-                put("HADOOP_JMX_METRIC_STREAM_1", mockStreamDefinition("HADOOP_JMX_METRIC_STREAM_1"));
-            }
-        });
-        Assert.assertTrue(validation.isSuccess());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size());
-        Assert.assertNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec());
-        Assert.assertEquals(StreamPartition.Type.GROUPBY, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getType());
-        Assert.assertArrayEquals(new String[]{"metric"}, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getColumns().toArray());
-        Assert.assertEquals("HADOOP_JMX_METRIC_STREAM_1", validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getStreamId());
-    }
-
-    @Test
-    public void testValidPolicyWithSequenceSort() {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test_policy");
-        policyDefinition.setInputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM_1"));
-        policyDefinition.setOutputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM_1_MISS_BLOCKS_OUT"));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType("siddhi");
-        String policy =
-                "from HADOOP_JMX_METRIC_STREAM_1[metric == \"hadoop.namenode.dfs.missingblocks\"]#window.externalTime(timestamp, 1 min) " +
-                        "select * group by site, host, component, metric insert into temp;\n" +
-                        "\n" +
-                        "from every a = HADOOP_JMX_METRIC_STREAM_1[metric == \"hadoop.namenode.dfs.missingblocks\"], b = HADOOP_JMX_METRIC_STREAM_1[b.component == a.component and b.metric == a.metric and b.host == a.host and convert(b.value, \"long\") > convert(a.value, \"long\") ] " +
-                        "select b.site, b.host, b.component, b.metric, convert(b.value, \"long\") as newNumOfMissingBlocks, convert(a.value, \"long\") as oldNumOfMissingBlocks, max(b.timestamp) as timestamp " +
-                        "group by b.site, b.host, b.component, b.metric insert into HADOOP_JMX_METRIC_STREAM_1_MISS_BLOCKS_OUT;";
-        definition.setValue(policy);
-        definition.setInputStreams(policyDefinition.getInputStreams());
-        definition.setOutputStreams(policyDefinition.getOutputStreams());
-        policyDefinition.setDefinition(definition);
-
-        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
-            {
-                put("HADOOP_JMX_METRIC_STREAM_1", mockStreamDefinition("HADOOP_JMX_METRIC_STREAM_1"));
-            }
-        });
-        Assert.assertTrue(validation.isSuccess());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
-        Assert.assertEquals(2, validation.getPolicyExecutionPlan().getOutputStreams().size());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size());
-        Assert.assertNotNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec());
-        Assert.assertEquals(60000, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis());
-        Assert.assertEquals(12000, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec().getWindowMargin());
-        Assert.assertEquals(StreamPartition.Type.GROUPBY, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getType());
-        Assert.assertArrayEquals(new String[]{"site", "host", "component", "metric"}, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getColumns().toArray());
-        Assert.assertEquals("HADOOP_JMX_METRIC_STREAM_1", validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getStreamId());
-    }
-}
\ No newline at end of file

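The deleted test cases above all exercise PolicyInterpreter.parseExecutionPlan(...).
A minimal standalone sketch of that call, assuming the pre-cleanup alert-engine
classes are still on the classpath (package names follow the repo layout at this
commit; the class name ParseExecutionPlanSketch is illustrative):

    import org.apache.eagle.alert.engine.coordinator.StreamPartition;
    import org.apache.eagle.alert.engine.interpreter.PolicyExecutionPlan;
    import org.apache.eagle.alert.engine.interpreter.PolicyInterpreter;

    public class ParseExecutionPlanSketch {
        public static void main(String[] args) throws Exception {
            // Parse the same two-stream join used in the tests above.
            PolicyExecutionPlan plan = PolicyInterpreter.parseExecutionPlan(
                "from TickEvent[symbol=='EBAY']#window.length(2000) as t unidirectional "
                    + "join NewsEvent#window.externalTime(timestamp, 1000 sec) as n "
                    + "on t.symbol == n.company "
                    + "insert into JoinStream");
            // Each input stream receives a partition requirement (SHUFFLE or GROUPBY);
            // an external-time window additionally demands a sort spec.
            for (StreamPartition sp : plan.getStreamPartitions()) {
                System.out.println(sp.getStreamId() + " -> " + sp.getType()
                    + (sp.getSortSpec() == null ? ""
                        : ", sorted over " + sp.getSortSpec().getWindowPeriodMillis() + " ms"));
            }
        }
    }
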
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/metric/MemoryUsageGaugeSetTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/metric/MemoryUsageGaugeSetTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/metric/MemoryUsageGaugeSetTest.java
deleted file mode 100644
index 2305a0f..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/metric/MemoryUsageGaugeSetTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.metric;
-
-import com.codahale.metrics.ConsoleReporter;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-
-public class MemoryUsageGaugeSetTest {
-    private final Logger LOG = LoggerFactory.getLogger(MemoryUsageGaugeSetTest.class);
-
-    @Test
-    public void testJVMMetrics() throws InterruptedException {
-        LOG.info("Starting testJVMMetrics");
-        final MetricRegistry metrics = new MetricRegistry();
-        ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics)
-            .convertRatesTo(TimeUnit.SECONDS)
-            .convertDurationsTo(TimeUnit.MILLISECONDS)
-            .build();
-        metrics.registerAll(new MemoryUsageGaugeSet());
-        metrics.register("sample", (Gauge<Double>) () -> 0.1234);
-        reporter.start(1, TimeUnit.SECONDS);
-        // start() only schedules asynchronous reports; force one synchronous
-        // report so the test produces output before the reporter is closed.
-        reporter.report();
-        reporter.close();
-    }
-}

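Outside of a unit test, the usual Dropwizard Metrics pattern is to start the
reporter and let the process run for a while rather than closing it immediately.
A self-contained sketch (the class name JvmMetricsDemo is illustrative, not from
the repo):

    import com.codahale.metrics.ConsoleReporter;
    import com.codahale.metrics.Gauge;
    import com.codahale.metrics.MetricRegistry;
    import com.codahale.metrics.jvm.MemoryUsageGaugeSet;

    import java.util.concurrent.TimeUnit;

    public class JvmMetricsDemo {
        public static void main(String[] args) throws InterruptedException {
            MetricRegistry metrics = new MetricRegistry();
            metrics.registerAll(new MemoryUsageGaugeSet());           // heap, non-heap, pool gauges
            metrics.register("sample", (Gauge<Double>) () -> 0.1234); // constant demo gauge
            ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics)
                .convertRatesTo(TimeUnit.SECONDS)
                .convertDurationsTo(TimeUnit.MILLISECONDS)
                .build();
            reporter.start(1, TimeUnit.SECONDS); // print all registered metrics once per second
            Thread.sleep(3000);                  // let a few reports appear on stdout
            reporter.stop();
        }
    }
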
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockPartitionedCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockPartitionedCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockPartitionedCollector.java
deleted file mode 100644
index 09f01e4..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockPartitionedCollector.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.mock;
-
-import org.apache.eagle.alert.engine.PartitionedEventCollector;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.LinkedList;
-import java.util.List;
-
-public class MockPartitionedCollector implements PartitionedEventCollector {
-    @SuppressWarnings("unused")
-    private final static Logger LOG = LoggerFactory.getLogger(MockPartitionedCollector.class);
-    private List<PartitionedEvent> cache;
-
-    public MockPartitionedCollector() {
-        cache = new LinkedList<>();
-    }
-
-    public void emit(PartitionedEvent event) {
-        cache.add(event);
-    }
-
-    public void clear() {
-        cache.clear();
-    }
-
-    public List<PartitionedEvent> get() {
-        return cache;
-    }
-
-    public int size() {
-        return cache.size();
-    }
-
-    @Override
-    public void drop(PartitionedEvent event) {
-        // Intentional no-op: this mock only records events passed to emit().
-    }
-}
\ No newline at end of file

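MockPartitionedCollector is a simple in-memory stand-in for
PartitionedEventCollector. A hypothetical usage in a test (the no-arg
PartitionedEvent constructor is an assumption):

    // Hypothetical test usage; assumes org.junit.Assert is imported and
    // that PartitionedEvent has a no-arg constructor.
    MockPartitionedCollector collector = new MockPartitionedCollector();
    PartitionedEvent event = new PartitionedEvent();
    collector.emit(event);
    Assert.assertEquals(1, collector.size());
    Assert.assertTrue(collector.get().contains(event));
    collector.clear();
    Assert.assertEquals(0, collector.size());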
