eagle-commits mailing list archives

From h..@apache.org
Subject [34/84] [partial] eagle git commit: Clean repo for eagle site
Date Mon, 03 Apr 2017 11:54:42 GMT
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/metric/MetricSystemTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/metric/MetricSystemTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/metric/MetricSystemTest.java
deleted file mode 100644
index 688e321..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/metric/MetricSystemTest.java
+++ /dev/null
@@ -1,449 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.metric;
-
-import com.codahale.metrics.*;
-import com.codahale.metrics.jvm.FileDescriptorRatioGauge;
-import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import kafka.admin.AdminUtils;
-import kafka.api.FetchRequest;
-import kafka.api.FetchRequestBuilder;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.OffsetRequest;
-import kafka.javaapi.OffsetResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.message.MessageAndOffset;
-import kafka.utils.ZKStringSerializer$;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.commons.io.FileUtils;
-import org.apache.eagle.alert.metric.entity.MetricEvent;
-import org.apache.eagle.alert.metric.sink.ConsoleSink;
-import org.apache.eagle.alert.metric.sink.Slf4jSink;
-import org.apache.eagle.alert.metric.source.JVMMetricSource;
-import org.apache.eagle.alert.metric.source.MetricSource;
-import org.apache.eagle.alert.utils.JsonUtils;
-import org.apache.eagle.alert.utils.KafkaEmbedded;
-import org.junit.*;
-import org.junit.rules.TemporaryFolder;
-import org.mockito.Mockito;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Scanner;
-
-public class MetricSystemTest {
-
-    public static final String END_LINE = System.getProperty("line.separator");
-    @Rule
-    public TemporaryFolder tempFolder = new TemporaryFolder();
-    private static final int DATA_BEGIN_INDEX = 55;
-    private static final String TOPIC = "alert_metric_test";
-    private static final String clientName = "test";
-
-    @Test
-    public void testMetricEvent() {
-        MetricEvent metricEvent = MetricEvent.of("test").build();
-        Assert.assertEquals(metricEvent.get("name"), "test");
-        Assert.assertNotNull(metricEvent.get("timestamp"));
-
-        metricEvent = MetricEvent.of("test1").build();
-        metricEvent.put("timestamp", 1);
-        Assert.assertEquals(metricEvent.get("name"), "test1");
-        Assert.assertEquals(metricEvent.get("timestamp"), 1);
-
-        Counter counter = new Counter();
-        counter.inc(10);
-        metricEvent = MetricEvent.of("testcount").from(counter).build();
-        Assert.assertEquals(metricEvent.get("count"), 10l);
-
-        Gauge gauge = Mockito.mock(FileDescriptorRatioGauge.class);
-        Mockito.when(gauge.getValue()).thenReturn(new Double("0.4"));
-        metricEvent = MetricEvent.of("testGauge").from(gauge).build();
-        Assert.assertEquals(metricEvent.get("value"), 0.4);
-
-        //Histogram
-        Histogram histogram = Mockito.mock(Histogram.class);
-        Snapshot snapshot = Mockito.mock(Snapshot.class);
-        Mockito.when(histogram.getCount()).thenReturn(11l);
-        Mockito.when(histogram.getSnapshot()).thenReturn(snapshot);
-        Mockito.when(snapshot.getMin()).thenReturn(1l);
-        Mockito.when(snapshot.getMax()).thenReturn(2l);
-        Mockito.when(snapshot.getMean()).thenReturn(3d);
-        Mockito.when(snapshot.getStdDev()).thenReturn(4d);
-        Mockito.when(snapshot.getMedian()).thenReturn(5d);
-        Mockito.when(snapshot.get75thPercentile()).thenReturn(6d);
-        Mockito.when(snapshot.get95thPercentile()).thenReturn(7d);
-        Mockito.when(snapshot.get98thPercentile()).thenReturn(8d);
-        Mockito.when(snapshot.get99thPercentile()).thenReturn(9d);
-        Mockito.when(snapshot.get999thPercentile()).thenReturn(10d);
-        metricEvent = MetricEvent.of("testHistogram").from(histogram).build();
-
-        Assert.assertEquals(metricEvent.get("count"), 11l);
-        Assert.assertEquals(metricEvent.get("min"), 1l);
-        Assert.assertEquals(metricEvent.get("max"), 2l);
-        Assert.assertEquals(metricEvent.get("mean"), 3d);
-        Assert.assertEquals(metricEvent.get("stddev"), 4d);
-        Assert.assertEquals(metricEvent.get("median"), 5d);
-        Assert.assertEquals(metricEvent.get("75thPercentile"), 6d);
-        Assert.assertEquals(metricEvent.get("95thPercentile"), 7d);
-        Assert.assertEquals(metricEvent.get("98thPercentile"), 8d);
-        Assert.assertEquals(metricEvent.get("99thPercentile"), 9d);
-        Assert.assertEquals(metricEvent.get("999thPercentile"), 10d);
-
-        //Meter
-        Meter meter = Mockito.mock(Meter.class);
-        Mockito.when(meter.getCount()).thenReturn(1l);
-        Mockito.when(meter.getOneMinuteRate()).thenReturn(2d);
-        Mockito.when(meter.getFiveMinuteRate()).thenReturn(3d);
-        Mockito.when(meter.getFifteenMinuteRate()).thenReturn(4d);
-        Mockito.when(meter.getMeanRate()).thenReturn(5d);
-        metricEvent = MetricEvent.of("testMeter").from(meter).build();
-
-        Assert.assertEquals(metricEvent.get("value"), 1l);
-        Assert.assertEquals(metricEvent.get("1MinRate"), 2d);
-        Assert.assertEquals(metricEvent.get("5MinRate"), 3d);
-        Assert.assertEquals(metricEvent.get("15MinRate"), 4d);
-        Assert.assertEquals(metricEvent.get("mean"), 5d);
-
-        //Timer
-        Timer value = Mockito.mock(Timer.class);
-        Mockito.when(value.getCount()).thenReturn(1l);
-        Mockito.when(value.getOneMinuteRate()).thenReturn(2d);
-        Mockito.when(value.getFiveMinuteRate()).thenReturn(3d);
-        Mockito.when(value.getFifteenMinuteRate()).thenReturn(4d);
-        Mockito.when(value.getMeanRate()).thenReturn(5d);
-        metricEvent = MetricEvent.of("testTimer").from(value).build();
-
-        Assert.assertEquals(metricEvent.get("value"), 1l);
-        Assert.assertEquals(metricEvent.get("1MinRate"), 2d);
-        Assert.assertEquals(metricEvent.get("5MinRate"), 3d);
-        Assert.assertEquals(metricEvent.get("15MinRate"), 4d);
-        Assert.assertEquals(metricEvent.get("mean"), 5d);
-
-    }
-
-    @Test
-    public void testMetricSystemWithKafkaSink() throws IOException {
-
-        JVMMetricSource jvmMetricSource = mockMetricRegistry();
-        //setup kafka
-        KafkaEmbedded kafkaEmbedded = new KafkaEmbedded();
-        makeSureTopic(kafkaEmbedded.getZkConnectionString());
-        //setup metric system
-        File file = genKafkaSinkConfig(kafkaEmbedded.getBrokerConnectionString());
-        Config config = ConfigFactory.parseFile(file);
-        MetricSystem system = MetricSystem.load(config);
-        system.register(jvmMetricSource);
-        system.start();
-        system.report();
-
-        SimpleConsumer consumer = assertMsgFromKafka(kafkaEmbedded);
-        system.stop();
-        consumer.close();
-        kafkaEmbedded.shutdown();
-    }
-
-    @Test
-    public void testConsoleSink() throws IOException {
-        PrintStream console = System.out;
-        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
-        System.setOut(new PrintStream(bytes));
-
-        ConsoleSink sink = new ConsoleSink();
-        MetricRegistry registry = new MetricRegistry();
-        JvmAttributeGaugeSet jvm = Mockito.mock(JvmAttributeGaugeSet.class);
-        Map<String, Metric> metrics = new HashMap<>();
-        metrics.put("name", (Gauge) () -> "testname");
-        metrics.put("uptime", (Gauge) () -> "testuptime");
-        metrics.put("vendor", (Gauge) () -> "testvendor");
-        Mockito.when(jvm.getMetrics()).thenReturn(metrics);
-        registry.registerAll(jvm);
-        File file = genConsoleSinkConfig();
-        Config config = ConfigFactory.parseFile(file);
-        sink.prepare(config, registry);
-        sink.report();
-        sink.stop();
-        String result = bytes.toString();
-        result = result.substring(result.indexOf(END_LINE) + END_LINE.length());//remove first line
-        Assert.assertEquals("" + END_LINE + "" +
-                "-- Gauges ----------------------------------------------------------------------" + END_LINE + "" +
-                "name" + END_LINE + "" +
-                "             value = testname" + END_LINE + "" +
-                "uptime" + END_LINE + "" +
-                "             value = testuptime" + END_LINE + "" +
-                "vendor" + END_LINE + "" +
-                "             value = testvendor" + END_LINE + "" +
-                "" + END_LINE + "" +
-                "" + END_LINE + "", result);
-        System.setOut(console);
-    }
-
-    @Test
-    public void testSlf4jSink() throws IOException {
-        PrintStream console = System.out;
-        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
-        System.setOut(new PrintStream(bytes));
-
-        Slf4jSink sink = new Slf4jSink();
-        MetricRegistry registry = new MetricRegistry();
-        JvmAttributeGaugeSet jvm = Mockito.mock(JvmAttributeGaugeSet.class);
-        Map<String, Metric> metrics = new HashMap<>();
-        metrics.put("name", (Gauge) () -> "testname");
-        metrics.put("uptime", (Gauge) () -> "testuptime");
-        metrics.put("vendor", (Gauge) () -> "testvendor");
-        Mockito.when(jvm.getMetrics()).thenReturn(metrics);
-        registry.registerAll(jvm);
-        File file = genSlf4jSinkConfig();
-        Config config = ConfigFactory.parseFile(file);
-        sink.prepare(config, registry);
-        sink.report();
-        sink.stop();
-        String result = bytes.toString();
-        String finalResult = "";
-        Scanner scanner = new Scanner(result);
-        while (scanner.hasNext()) {
-            finalResult += scanner.nextLine().substring(DATA_BEGIN_INDEX) + END_LINE;
-        }
-        Assert.assertEquals("type=GAUGE, name=name, value=testname" + END_LINE + "" +
-                "type=GAUGE, name=uptime, value=testuptime" + END_LINE + "" +
-                "type=GAUGE, name=vendor, value=testvendor" + END_LINE + "", finalResult);
-        System.setOut(console);
-    }
-
-    private SimpleConsumer assertMsgFromKafka(KafkaEmbedded kafkaEmbedded) throws IOException {
-        SimpleConsumer consumer = new SimpleConsumer("localhost", kafkaEmbedded.getPort(), 100000, 64 * 1024, clientName);
-        long readOffset = getLastOffset(consumer, TOPIC, 0, kafka.api.OffsetRequest.EarliestTime(), clientName);
-        FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(TOPIC, 0, readOffset, 100000).build();
-        FetchResponse fetchResponse = consumer.fetch(req);
-        Map<Integer, Map<String, String>> resultCollector = new HashMap<>();
-        int count = 1;
-        for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(TOPIC, 0)) {
-            long currentOffset = messageAndOffset.offset();
-            if (currentOffset < readOffset) {
-                System.out.println("found an old offset: " + currentOffset + " expecting: " + readOffset);
-                continue;
-            }
-
-            readOffset = messageAndOffset.nextOffset();
-            ByteBuffer payload = messageAndOffset.message().payload();
-
-            byte[] bytes = new byte[payload.limit()];
-            payload.get(bytes);
-            String message = new String(bytes, "UTF-8");
-            Map<String, String> covertedMsg = JsonUtils.mapper.readValue(message, Map.class);
-            covertedMsg.remove("timestamp");
-            resultCollector.put(count, covertedMsg);
-            count++;
-        }
-        Assert.assertEquals("{1={name=heap.committed, value=175636480}, 2={name=heap.init, value=262144000}, 3={name=heap.max, value=3704094720}, 4={name=heap.usage, value=0.01570181876990446}, 5={name=heap.used, value=58491576}, 6={name=name, value=testname}, 7={name=non-heap.committed, value=36405248}, 8={name=non-heap.init, value=2555904}, 9={name=non-heap.max, value=-1}, 10={name=non-heap.usage, value=-3.5588712E7}, 11={name=non-heap.used, value=35596496}, 12={name=pools.Code-Cache.usage, value=0.020214080810546875}, 13={name=pools.Compressed-Class-Space.usage, value=0.0035556256771087646}, 14={name=pools.Metaspace.usage, value=0.9777212526244751}, 15={name=pools.PS-Eden-Space.usage, value=0.03902325058129612}, 16={name=pools.PS-Old-Gen.usage, value=0.001959359247654333}, 17={name=pools.PS-Survivor-Space.usage, value=0.0}, 18={name=total.committed, value=212107264}, 19={name=total.init, value=264699904}, 20={name=total.max, value=3704094719}, 21={name=total.used, value=94644240}
 , 22={name=uptime, value=testuptime}, 23={name=vendor, value=testvendor}}", resultCollector.toString());
-        return consumer;
-    }
-
-    private JVMMetricSource mockMetricRegistry() {
-        JvmAttributeGaugeSet jvm = Mockito.mock(JvmAttributeGaugeSet.class);
-        Map<String, Metric> metrics = new HashMap<>();
-        metrics.put("name", (Gauge) () -> "testname");
-        metrics.put("uptime", (Gauge) () -> "testuptime");
-        metrics.put("vendor", (Gauge) () -> "testvendor");
-        Mockito.when(jvm.getMetrics()).thenReturn(metrics);
-        JVMMetricSource jvmMetricSource = new JVMMetricSource();
-        Assert.assertEquals("jvm", jvmMetricSource.name());
-        MetricRegistry realRegistry = jvmMetricSource.registry();
-        Assert.assertTrue(realRegistry.remove("name"));
-        Assert.assertTrue(realRegistry.remove("uptime"));
-        Assert.assertTrue(realRegistry.remove("vendor"));
-        realRegistry.registerAll(jvm);
-
-        MemoryUsageGaugeSet mem = Mockito.mock(MemoryUsageGaugeSet.class);
-        Map<String, Metric> memMetrics = new HashMap<>();
-        Assert.assertTrue(realRegistry.remove("heap.committed"));
-        Assert.assertTrue(realRegistry.remove("heap.init"));
-        Assert.assertTrue(realRegistry.remove("heap.max"));
-        Assert.assertTrue(realRegistry.remove("heap.usage"));
-        Assert.assertTrue(realRegistry.remove("heap.used"));
-        Assert.assertTrue(realRegistry.remove("non-heap.committed"));
-        Assert.assertTrue(realRegistry.remove("non-heap.init"));
-        Assert.assertTrue(realRegistry.remove("non-heap.max"));
-        Assert.assertTrue(realRegistry.remove("non-heap.usage"));
-        Assert.assertTrue(realRegistry.remove("non-heap.used"));
-        Assert.assertTrue(realRegistry.remove("pools.Code-Cache.usage"));
-        Assert.assertTrue(realRegistry.remove("pools.Compressed-Class-Space.usage"));
-        Assert.assertTrue(realRegistry.remove("pools.Metaspace.usage"));
-        Assert.assertTrue(realRegistry.remove("pools.PS-Eden-Space.usage"));
-        Assert.assertTrue(realRegistry.remove("pools.PS-Old-Gen.usage"));
-        Assert.assertTrue(realRegistry.remove("pools.PS-Survivor-Space.usage"));
-        Assert.assertTrue(realRegistry.remove("total.committed"));
-        Assert.assertTrue(realRegistry.remove("total.init"));
-        Assert.assertTrue(realRegistry.remove("total.max"));
-        memMetrics.put("heap.committed", (Gauge) () -> 175636480);
-        memMetrics.put("heap.init", (Gauge) () -> 262144000);
-        memMetrics.put("heap.max", (Gauge) () -> 3704094720l);
-        memMetrics.put("heap.usage", (Gauge) () -> 0.01570181876990446);
-        memMetrics.put("heap.used", (Gauge) () -> 58491576);
-        memMetrics.put("non-heap.committed", (Gauge) () -> 36405248);
-        memMetrics.put("non-heap.init", (Gauge) () -> 2555904);
-        memMetrics.put("non-heap.max", (Gauge) () -> -1);
-        memMetrics.put("non-heap.usage", (Gauge) () -> -3.5588712E7);
-        memMetrics.put("non-heap.used", (Gauge) () -> 35596496);
-        memMetrics.put("pools.Code-Cache.usage", (Gauge) () -> 0.020214080810546875);
-        memMetrics.put("pools.Compressed-Class-Space.usage", (Gauge) () -> 0.0035556256771087646);
-        memMetrics.put("pools.Metaspace.usage", (Gauge) () -> 0.9777212526244751);
-        memMetrics.put("pools.PS-Eden-Space.usage", (Gauge) () -> 0.03902325058129612);
-        memMetrics.put("pools.PS-Old-Gen.usage", (Gauge) () -> 0.001959359247654333);
-        memMetrics.put("pools.PS-Survivor-Space.usage", (Gauge) () -> 0.0);
-        memMetrics.put("total.committed", (Gauge) () -> 212107264);
-        memMetrics.put("total.init", (Gauge) () -> 264699904);
-        memMetrics.put("total.max", (Gauge) () -> 3704094719l);
-        memMetrics.put("total.used", (Gauge) () -> 94644240);
-        Mockito.when(mem.getMetrics()).thenReturn(memMetrics);
-        Assert.assertTrue(realRegistry.remove("total.used"));
-        realRegistry.registerAll(mem);
-        return jvmMetricSource;
-    }
-
-    private long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
-        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
-        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
-        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
-        OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
-        OffsetResponse response = consumer.getOffsetsBefore(request);
-        if (response.hasError()) {
-            System.out.println("error fetching data offset data the broker. reason: " + response.errorCode(topic, partition));
-            return 0;
-        }
-        long[] offsets = response.offsets(topic, partition);
-        return offsets[0];
-    }
-
-    private File genKafkaSinkConfig(String brokerConnectionString) throws IOException {
-        File file = tempFolder.newFile("application.conf");
-        String fileContent = "{" + END_LINE + "" +
-                "  metric {" + END_LINE + "" +
-                "    sink {" + END_LINE + "" +
-                "       kafka {" + END_LINE + "" +
-                "        \"topic\": \"" + TOPIC + "\"" + END_LINE + "" +
-                "        \"bootstrap.servers\": \"" + brokerConnectionString + "\"" + END_LINE + "" +
-                "      }" + END_LINE + "" +
-                "    }" + END_LINE + "" +
-                "  }" + END_LINE + "" +
-                "}";
-        FileUtils.writeStringToFile(file, fileContent);
-        return file;
-    }
-
-    private File genConsoleSinkConfig() throws IOException {
-        File file = tempFolder.newFile("application-console.conf");
-        String fileContent = "{" + END_LINE + "" +
-                "  metric {" + END_LINE + "" +
-                "    sink {" + END_LINE + "" +
-                "      stdout {" + END_LINE + "" +
-                "        // console metric sink" + END_LINE + "" +
-                "      }" + END_LINE + "" +
-                "    }" + END_LINE + "" +
-                "  }" + END_LINE + "" +
-                "}";
-        FileUtils.writeStringToFile(file, fileContent);
-        return file;
-    }
-
-    private File genSlf4jSinkConfig() throws IOException {
-        File file = tempFolder.newFile("application-slf4j.conf");
-        String fileContent = "{" + END_LINE + "" +
-                "        metric {" + END_LINE + "" +
-                "        sink {" + END_LINE + "" +
-                "            logger {" + END_LINE + "" +
-                "                level = \"INFO\"" + END_LINE + "" +
-                "            }" + END_LINE + "" +
-                "        }" + END_LINE + "" +
-                "    }" + END_LINE + "" +
-                "    }";
-        FileUtils.writeStringToFile(file, fileContent);
-        return file;
-    }
-
-    public void makeSureTopic(String zkConnectionString) {
-        ZkClient zkClient = new ZkClient(zkConnectionString, 10000, 10000, ZKStringSerializer$.MODULE$);
-        Properties topicConfiguration = new Properties();
-        AdminUtils.createTopic(zkClient, TOPIC, 1, 1, topicConfiguration);
-    }
-
-
-    @Test
-    @Ignore
-    public void testMetaConflict() {
-        MetricSystem system = MetricSystem.load(ConfigFactory.load());
-        system.register(new MetaConflictMetricSource());
-        system.start();
-        system.report();
-        system.stop();
-    }
-
-    private class MetaConflictMetricSource implements MetricSource {
-        private MetricRegistry registry = new MetricRegistry();
-
-        public MetaConflictMetricSource() {
-            registry.register("meta.conflict", (Gauge<String>) () -> "meta conflict happening!");
-        }
-
-        @Override
-        public String name() {
-            return "metaConflict";
-        }
-
-        @Override
-        public MetricRegistry registry() {
-            return registry;
-        }
-    }
-
-    private class SampleMetricSource implements MetricSource {
-        private MetricRegistry registry = new MetricRegistry();
-
-        public SampleMetricSource() {
-            registry.register("sample.long", (Gauge<Long>) System::currentTimeMillis);
-            registry.register("sample.map", (Gauge<Map<String, Object>>) () -> new HashMap<String, Object>() {
-                private static final long serialVersionUID = 3948508906655117683L;
-
-                {
-                    put("int", 1234);
-                    put("str", "text");
-                    put("bool", true);
-                }
-            });
-        }
-
-        @Override
-        public String name() {
-            return "sampleSource";
-        }
-
-        @Override
-        public MetricRegistry registry() {
-            return registry;
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/service/TestMetadataServiceClientImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/service/TestMetadataServiceClientImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/service/TestMetadataServiceClientImpl.java
deleted file mode 100644
index 50b00d9..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/service/TestMetadataServiceClientImpl.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.service;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.List;
-
-public class TestMetadataServiceClientImpl {
-    @SuppressWarnings("resource")
-    @Ignore
-    @Test
-    public void test() throws Exception {
-        MetadataServiceClientImpl impl = new MetadataServiceClientImpl("localhost", 58080, "/api/metadata/policies");
-        List<PolicyDefinition> policies = impl.listPolicies();
-        ObjectMapper mapper = new ObjectMapper();
-        String ret = mapper.writeValueAsString(policies);
-        System.out.println(ret);
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/ConfigUtilsTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/ConfigUtilsTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/ConfigUtilsTest.java
deleted file mode 100644
index a97d09b..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/ConfigUtilsTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.util;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.commons.io.FileUtils;
-import org.apache.eagle.alert.utils.ConfigUtils;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Properties;
-
-public class ConfigUtilsTest {
-
-    @Rule
-    public TemporaryFolder tempFolder = new TemporaryFolder();
-    public static final String END_LINE = System.getProperty("line.separator");
-
-    @Test
-    public void testToProperties() throws IOException {
-        Config config = ConfigFactory.parseFile(genConfig());
-        Properties properties = ConfigUtils.toProperties(config);
-        System.out.print(properties);
-        Assert.assertEquals("{metric={sink={stdout={}, elasticsearch={hosts=[localhost:9200], index=alert_metric_test}, kafka={topic=alert_metric_test, bootstrap.servers=localhost:9092}, logger={level=INFO}}}, zkConfig={zkQuorum=localhost:2181, zkRoot=/alert}}", properties.toString());
-    }
-
-    private File genConfig() throws IOException {
-        File file = tempFolder.newFile("application-config.conf");
-        String fileContent = "{" + END_LINE + "" +
-                "  metric {" + END_LINE + "" +
-                "    sink {" + END_LINE + "" +
-                "      stdout {" + END_LINE + "" +
-                "        // console metric sink" + END_LINE + "" +
-                "      }" + END_LINE + "" +
-                "      kafka {" + END_LINE + "" +
-                "        \"topic\": \"alert_metric_test\"" + END_LINE + "" +
-                "        \"bootstrap.servers\": \"localhost:9092\"" + END_LINE + "" +
-                "      }" + END_LINE + "" +
-                "      logger {" + END_LINE + "" +
-                "        level = \"INFO\"" + END_LINE + "" +
-                "      }" + END_LINE + "" +
-                "      elasticsearch {" + END_LINE + "" +
-                "        hosts = [\"localhost:9200\"]" + END_LINE + "" +
-                "        index = \"alert_metric_test\"" + END_LINE + "" +
-                "      }" + END_LINE + "" +
-                "    }" + END_LINE + "" +
-                "  }" + END_LINE + "" +
-                "  zkConfig {" + END_LINE + "" +
-                "    \"zkQuorum\": \"localhost:2181\"" + END_LINE + "" +
-                "    \"zkRoot\": \"/alert\"" + END_LINE + "" +
-                "  }" + END_LINE + "" +
-                "}";
-        FileUtils.writeStringToFile(file, fileContent);
-        return file;
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/JsonUtilsTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/JsonUtilsTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/JsonUtilsTest.java
deleted file mode 100644
index 6d6244a..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/JsonUtilsTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.util;
-
-import org.apache.eagle.alert.utils.JsonUtils;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class JsonUtilsTest {
-
-    @Test
-    public void testWriteValueAsString() {
-
-        Map<String, Object> jsonMap = new HashMap<>();
-        jsonMap.put("policyId", "policyId");
-        jsonMap.put("streamId", "streamId");
-        jsonMap.put("createBy", "createBy");
-        jsonMap.put("createTime", "createTime");
-        Assert.assertEquals("{\"createBy\":\"createBy\",\"policyId\":\"policyId\",\"streamId\":\"streamId\",\"createTime\":\"createTime\"}", JsonUtils.writeValueAsString(jsonMap));
-
-        jsonMap = new HashMap<>();
-        Assert.assertEquals("{}", JsonUtils.writeValueAsString(jsonMap));
-
-        Assert.assertEquals("null", JsonUtils.writeValueAsString(null));
-
-        Assert.assertEquals("", JsonUtils.writeValueAsString(new Object()));
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/StreamIdConversionTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/StreamIdConversionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/StreamIdConversionTest.java
deleted file mode 100644
index 2ea8915..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/StreamIdConversionTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.util;
-
-import org.apache.eagle.alert.utils.StreamIdConversion;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class StreamIdConversionTest {
-    @Test
-    public void testGenerateStreamIdBetween() {
-        String result = StreamIdConversion.generateStreamIdBetween("source1", "target1");
-        Assert.assertEquals("stream_source1_to_target1", result);
-        result = StreamIdConversion.generateStreamIdBetween("", "target1");
-        Assert.assertEquals("stream__to_target1", result);
-        result = StreamIdConversion.generateStreamIdBetween("source1", null);
-        Assert.assertEquals("stream_source1_to_null", result);
-    }
-
-    @Test
-    public void testGenerateStreamIdByPartition() {
-        String result = StreamIdConversion.generateStreamIdByPartition(1);
-        Assert.assertEquals("stream_1", result);
-        result = StreamIdConversion.generateStreamIdByPartition(-1);
-        Assert.assertEquals("stream_-1", result);
-        result = StreamIdConversion.generateStreamIdByPartition(0);
-        Assert.assertEquals("stream_0", result);
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/TimePeriodUtilsTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/TimePeriodUtilsTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/TimePeriodUtilsTest.java
deleted file mode 100644
index 878f162..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/util/TimePeriodUtilsTest.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.util;
-
-import org.apache.eagle.alert.utils.TimePeriodUtils;
-import org.apache.eagle.common.DateTimeUtil;
-import org.joda.time.Period;
-import org.joda.time.Seconds;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.text.ParseException;
-
-public class TimePeriodUtilsTest {
-    @Test
-    public void testJodaTimePeriod() throws ParseException {
-        String periodText = "PT10m";
-        Period period = new Period(periodText);
-        int seconds = period.toStandardSeconds().getSeconds();
-        Assert.assertEquals(600, seconds);
-        Assert.assertEquals(60, period.toStandardSeconds().dividedBy(10).getSeconds());
-    }
-
-    @Test
-    public void testJodaTimePeriodBySeconds() throws ParseException {
-        String periodText = "PT10s";
-        Period period = new Period(periodText);
-        int seconds = period.toStandardSeconds().getSeconds();
-        Assert.assertEquals(10, seconds);
-    }
-
-    @Test
-    public void testFormatSecondsByPeriod15M() throws ParseException {
-
-        Period period = new Period("PT15m");
-        Seconds seconds = period.toStandardSeconds();
-        Assert.assertEquals(15 * 60, seconds.getSeconds());
-
-        long time = DateTimeUtil.humanDateToSeconds("2015-07-01 13:56:12");
-        long expect = DateTimeUtil.humanDateToSeconds("2015-07-01 13:45:00");
-        long result = TimePeriodUtils.formatSecondsByPeriod(time, seconds);
-        Assert.assertEquals(expect, result);
-
-        time = DateTimeUtil.humanDateToSeconds("2015-07-01 03:14:59");
-        expect = DateTimeUtil.humanDateToSeconds("2015-07-01 03:00:00");
-        result = TimePeriodUtils.formatSecondsByPeriod(time, seconds);
-        Assert.assertEquals(expect, result);
-
-        time = DateTimeUtil.humanDateToSeconds("2015-07-01 03:14:59");
-        expect = DateTimeUtil.humanDateToSeconds("2015-07-01 03:00:00");
-        result = TimePeriodUtils.formatSecondsByPeriod(time, seconds);
-        Assert.assertEquals(expect, result);
-    }
-
-    @Test
-    public void testFormatSecondsByPeriod1H() throws ParseException {
-
-        Period period = new Period("PT1h");
-        Seconds seconds = period.toStandardSeconds();
-        Assert.assertEquals(60 * 60, seconds.getSeconds());
-
-        long time = DateTimeUtil.humanDateToSeconds("2015-07-01 13:56:12");
-        long expect = DateTimeUtil.humanDateToSeconds("2015-07-01 13:00:00");
-        long result = TimePeriodUtils.formatSecondsByPeriod(time, seconds);
-        Assert.assertEquals(expect, result);
-
-        time = DateTimeUtil.humanDateToSeconds("2015-07-01 03:14:59");
-        expect = DateTimeUtil.humanDateToSeconds("2015-07-01 03:00:00");
-        result = TimePeriodUtils.formatSecondsByPeriod(time, seconds);
-        Assert.assertEquals(expect, result);
-
-        time = DateTimeUtil.humanDateToSeconds("2015-07-01 03:30:59");
-        expect = DateTimeUtil.humanDateToSeconds("2015-07-01 03:00:00");
-        result = TimePeriodUtils.formatSecondsByPeriod(time, seconds);
-        Assert.assertEquals(expect, result);
-    }
-
-
-    @Test
-    public void testPeriod() {
-        Assert.assertEquals(30 * 60 * 1000, TimePeriodUtils.getMillisecondsOfPeriod(Period.parse("PT30m")));
-        Assert.assertEquals(30 * 60 * 1000, TimePeriodUtils.getMillisecondsOfPeriod(Period.millis(30 * 60 * 1000)));
-        Assert.assertEquals("PT1800S", Period.millis(30 * 60 * 1000).toString());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/correlation/meta/JsonTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/correlation/meta/JsonTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/correlation/meta/JsonTest.java
deleted file mode 100644
index ff0a3f9..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/correlation/meta/JsonTest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.correlation.meta;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.junit.Test;
-
-/**
- * Created on 3/11/16.
- */
-public class JsonTest {
-
-    @Test
-    public void streamDefTest() throws Exception {
-
-        ObjectMapper mapper = new ObjectMapper();
-        mapper.readValue(JsonTest.class.getResourceAsStream("/streamDef.json"), StreamDefinition.class);
-
-        com.fasterxml.jackson.databind.ObjectMapper mapper2 = new com.fasterxml.jackson.databind.ObjectMapper();
-        mapper2.readValue(JsonTest.class.getResourceAsStream("/streamDef.json"), StreamDefinition.class);
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/correlation/meta/StreamPartitionTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/correlation/meta/StreamPartitionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/correlation/meta/StreamPartitionTest.java
deleted file mode 100644
index 524f76a..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/correlation/meta/StreamPartitionTest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.correlation.meta;
-
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Arrays;
-
-public class StreamPartitionTest {
-    @Test
-    public void testStreamPartitionEqual(){
-        StreamPartition partition1 = new StreamPartition();
-        partition1.setStreamId("unittest");
-        partition1.setColumns(Arrays.asList("col1","col2"));
-        StreamPartition partition2 = new StreamPartition();
-        partition2.setStreamId("unittest");
-        partition2.setColumns(Arrays.asList("col1","col2"));
-        Assert.assertTrue(partition1.equals(partition2));
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/siddhiext/StringListSizeFunctionExtensionTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/siddhiext/StringListSizeFunctionExtensionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/siddhiext/StringListSizeFunctionExtensionTest.java
deleted file mode 100644
index 6cb3696..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/siddhiext/StringListSizeFunctionExtensionTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.siddhiext;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
-import org.wso2.siddhi.core.util.EventPrinter;
-
-import java.util.concurrent.Semaphore;
-
-public class StringListSizeFunctionExtensionTest {
-    private static final Logger LOG = LoggerFactory.getLogger(StringSubtractFunctionExtensionTest.class);
-
-    @Test
-    public void testStringListSize() throws Exception {
-        Semaphore semp = new Semaphore(1);
-        String ql = " define stream log(timestamp long, switchLabel string, port string, message string); " +
-                " from log select string:listSize(switchLabel) as alertKey insert into output; ";
-        SiddhiManager manager = new SiddhiManager();
-        ExecutionPlanRuntime runtime = manager.createExecutionPlanRuntime(ql);
-        runtime.addCallback("output", new StreamCallback() {
-            @Override
-            public void receive(Event[] events) {
-                EventPrinter.print(events);
-                Assert.assertTrue(events.length == 1);
-                Assert.assertTrue(Integer.parseInt(events[0].getData(0).toString()) == 5);
-                semp.release();
-            }
-        });
-
-        runtime.start();
-
-        InputHandler logInput = runtime.getInputHandler("log");
-        semp.acquire();
-        Event e = new Event();
-        e.setTimestamp(System.currentTimeMillis());
-        String ths = "[\"a\", \"b\", \"c\", \"d\", \"e\"]";
-        String rhs = "[\"b\", \"d\"]";
-        e.setData(new Object[] {System.currentTimeMillis(), ths, "port01", rhs});
-        logInput.send(e);
-
-        semp.acquire();
-        runtime.shutdown();
-
-    }
-
-    @Test
-    public void testStringListSize2() throws Exception {
-        Semaphore semp = new Semaphore(1);
-        String ql = " define stream log(timestamp long, site string, component string, resource string, host string, value string); " +
-                " from a = log[resource == \"hadoop.namenode.namenodeinfo.corruptfiles\"],\n" +
-                "b = log[component == a.component and resource == a.resource and host == a.host and a.value != b.value]\n" +
-                "select b.site as site, b.host as host, b.component as component, b.resource as resource, " +
-                "b.timestamp as timestamp, string:listSize(b.value) as newMissingBlocksNumber, string:listSize(a.value) as oldMissingBlocksNumber, string:subtract(b.value, a.value) as missingBlocks\n" +
-                "insert into output;";
-        SiddhiManager manager = new SiddhiManager();
-        ExecutionPlanRuntime runtime = manager.createExecutionPlanRuntime(ql);
-        runtime.addCallback("output", new StreamCallback() {
-            @Override
-            public void receive(Event[] events) {
-                EventPrinter.print(events);
-                Assert.assertTrue(events.length == 1);
-                Assert.assertTrue(Integer.parseInt(events[0].getData(5).toString()) == 5);
-                Assert.assertTrue(Integer.parseInt(events[0].getData(6).toString()) == 2);
-                Assert.assertTrue(events[0].getData(7).toString().equals("a\nc\ne"));
-                semp.release();
-            }
-        });
-
-        runtime.start();
-
-        InputHandler logInput = runtime.getInputHandler("log");
-        semp.acquire();
-        Event e = new Event();
-        e.setTimestamp(System.currentTimeMillis());
-        String rhs = "[\"b\", \"d\"]";
-        e.setData(new Object[] {System.currentTimeMillis(), "a", "a", "hadoop.namenode.namenodeinfo.corruptfiles", "port01", rhs});
-        logInput.send(e);
-
-        e.setTimestamp(System.currentTimeMillis());
-        String ths = "[\"a\", \"b\", \"c\", \"d\", \"e\"]";
-        e.setData(new Object[] {System.currentTimeMillis(), "a", "a", "hadoop.namenode.namenodeinfo.corruptfiles", "port01", ths});
-        logInput.send(e);
-
-        semp.acquire();
-        runtime.shutdown();
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/siddhiext/StringSubtractFunctionExtensionTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/siddhiext/StringSubtractFunctionExtensionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/siddhiext/StringSubtractFunctionExtensionTest.java
deleted file mode 100644
index 4a31c69..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/siddhiext/StringSubtractFunctionExtensionTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.siddhiext;
-
-import org.junit.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
-import org.wso2.siddhi.core.util.EventPrinter;
-
-import java.util.concurrent.Semaphore;
-
-public class StringSubtractFunctionExtensionTest {
-    private static final Logger LOG = LoggerFactory.getLogger(StringSubtractFunctionExtensionTest.class);
-
-    @Test
-    public void testStringSubtract() throws Exception {
-        Semaphore semp = new Semaphore(1);
-        String ql = " define stream log(timestamp long, switchLabel string, port string, message string); " +
-                " from log select string:subtract(switchLabel, message) as alertKey insert into output; ";
-        SiddhiManager manager = new SiddhiManager();
-        ExecutionPlanRuntime runtime = manager.createExecutionPlanRuntime(ql);
-        runtime.addCallback("output", new StreamCallback() {
-            @Override
-            public void receive(Event[] events) {
-                EventPrinter.print(events);
-                Assert.assertTrue(events.length == 1);
-                Assert.assertTrue(events[0].getData(0).toString().equals("a\nc\ne"));
-                semp.release();
-            }
-        });
-
-        runtime.start();
-
-        InputHandler logInput = runtime.getInputHandler("log");
-        semp.acquire();
-        Event e = new Event();
-        e.setTimestamp(System.currentTimeMillis());
-        String ths = "[\"a\", \"b\", \"c\", \"d\", \"e\"]";
-        String rhs = "[\"b\", \"d\"]";
-        e.setData(new Object[] {System.currentTimeMillis(), ths, "port01", rhs});
-        logInput.send(e);
-
-        semp.acquire();
-        runtime.shutdown();
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/application.conf
deleted file mode 100644
index b763acf..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/application.conf
+++ /dev/null
@@ -1,39 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
-  metric {
-    sink {
-      stdout {
-        // console metric sink
-      }
-      kafka {
-        "topic": "alert_metric_test"
-        "bootstrap.servers": "localhost:9092"
-      }
-      logger {
-        level = "INFO"
-      }
-      elasticsearch {
-        hosts = ["localhost:9200"]
-        index = "alert_metric_test"
-      }
-    }
-  }
-  zkConfig {
-    "zkQuorum": "localhost:2181"
-    "zkRoot": "/alert"
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/log4j.properties b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/log4j.properties
deleted file mode 100644
index ba06033..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,19 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-log4j.rootLogger=DEBUG, stdout
-# standard output
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/streamDef.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/streamDef.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/streamDef.json
deleted file mode 100644
index 5a78b6a..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/streamDef.json
+++ /dev/null
@@ -1,45 +0,0 @@
-{
-  "streamId": "perfmon_cpu_stream",
-  "dataSource": "perfmon_datasource",
-  "description": "the data stream for perfmon cpu metrics",
-  "validate": false,
-  "timeseries": false,
-  "columns": [
-    {
-      "name": "host",
-      "type": "string",
-      "defaultValue": "",
-      "required": true
-    },
-    {
-      "name": "timestamp",
-      "type": "long",
-      "defaultValue": 0,
-      "required": true
-    },
-    {
-      "name": "floatField",
-      "type": "float",
-      "defaultValue": "1.2",
-      "required": true
-    },
-    {
-      "name": "intField",
-      "type": "int",
-      "defaultValue": "3",
-      "required": true
-    },
-    {
-      "name": "value",
-      "type": "double",
-      "defaultValue": 0.0,
-      "required": true
-    },
-    {
-      "name": "boolField",
-      "type": "bool",
-      "defaultValue": true,
-      "required": true
-    }
-  ]
-}
\ No newline at end of file
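
streamDef.json above is a sample stream schema for the alert-common tests: a stream id, its data source, and a typed column list with defaults and required flags. The hypothetical snippet below merely walks the file with Jackson to show its shape; presumably the tests bind it to the alert engine's own stream-definition model, which this sketch does not attempt to reproduce.

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.io.InputStream;

    public class StreamDefExample {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            // streamDef.json sits on the test classpath of this module.
            try (InputStream in = StreamDefExample.class.getResourceAsStream("/streamDef.json")) {
                JsonNode def = mapper.readTree(in);
                System.out.println("streamId = " + def.get("streamId").asText());
                for (JsonNode column : def.get("columns")) {
                    System.out.printf("column %s : %s (required=%s)%n",
                            column.get("name").asText(),
                            column.get("type").asText(),
                            column.get("required").asBoolean());
                }
            }
        }
    }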

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/string.siddhiext
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/string.siddhiext b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/string.siddhiext
deleted file mode 100644
index 7176611..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/string.siddhiext
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-empty=org.apache.eagle.alert.siddhiext.StringEmptyFunctionExtension
-subtract=org.apache.eagle.alert.siddhiext.StringSubtractFunctionExtension
-listSize=org.apache.eagle.alert.siddhiext.StringListSizeFunctionExtension
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/.gitignore
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/.gitignore b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/.gitignore
deleted file mode 100644
index b83d222..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/target/

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/pom.xml
deleted file mode 100644
index 9022843..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/pom.xml
+++ /dev/null
@@ -1,113 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!-- ~ Licensed to the Apache Software Foundation (ASF) under one or more
-	~ contributor license agreements. See the NOTICE file distributed with ~
-	this work for additional information regarding copyright ownership. ~ The
-	ASF licenses this file to You under the Apache License, Version 2.0 ~ (the
-	"License"); you may not use this file except in compliance with ~ the License.
-	You may obtain a copy of the License at ~ ~ http://www.apache.org/licenses/LICENSE-2.0
-	~ ~ Unless required by applicable law or agreed to in writing, software ~
-	distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT
-	WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the
-	License for the specific language governing permissions and ~ limitations
-	under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>org.apache.eagle</groupId>
-        <artifactId>eagle-alert</artifactId>
-        <version>0.5.0-SNAPSHOT</version>
-    </parent>
-
-    <artifactId>alert-coordinator</artifactId>
-    <packaging>jar</packaging>
-
-    <name>Eagle::Core::Alert::Coordinator</name>
-    <url>http://maven.apache.org</url>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.eagle</groupId>
-            <artifactId>alert-common</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>com.sun.jersey</groupId>
-            <artifactId>jersey-server</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.sun.jersey</groupId>
-            <artifactId>jersey-servlet</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.sun.jersey.contribs</groupId>
-            <artifactId>jersey-multipart</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.typesafe</groupId>
-            <artifactId>config</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.tomcat.embed</groupId>
-            <artifactId>tomcat-embed-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.swagger</groupId>
-            <artifactId>swagger-jaxrs</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.wso2.siddhi</groupId>
-            <artifactId>siddhi-core</artifactId>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.eclipse.jetty</groupId>
-                <artifactId>jetty-maven-plugin</artifactId>
-                <configuration>
-                    <scanIntervalSeconds>5</scanIntervalSeconds>
-                    <httpConnector>
-                        <port>9090</port>
-                    </httpConnector>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-jar-plugin</artifactId>
-                <version>2.6</version>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>test-jar</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-source-plugin</artifactId>
-                <version>2.1.2</version>
-                <executions>
-                    <execution>
-                        <id>attach-sources</id>
-                        <phase>verify</phase>
-                        <goals>
-                            <goal>jar-no-fork</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-</project>

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
deleted file mode 100644
index cccf2e3..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator;
-
-import com.google.common.base.Stopwatch;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.config.ConfigBusProducer;
-import org.apache.eagle.alert.config.ConfigValue;
-import org.apache.eagle.alert.config.ZKConfig;
-import org.apache.eagle.alert.config.ZKConfigBuilder;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordinator.impl.MetadataValdiator;
-import org.apache.eagle.alert.coordinator.provider.ScheduleContextBuilder;
-import org.apache.eagle.alert.coordinator.trigger.CoordinatorTrigger;
-import org.apache.eagle.alert.coordinator.trigger.DynamicPolicyLoader;
-import org.apache.eagle.alert.coordinator.trigger.PolicyChangeListener;
-import org.apache.eagle.alert.coordinator.trigger.ScheduleStateCleaner;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.service.IMetadataServiceClient;
-import org.apache.eagle.alert.service.MetadataServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.text.MessageFormat;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * @since Mar 24, 2016.
- */
-public class Coordinator {
-
-    private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
-
-    private static final String COORDINATOR = "coordinator";
-    
-    /**
-     * /alert/{topologyName}/spout
-     * /router
-     * /alert
-     * /publisher
-     * .
-     */
-    private static final String ZK_ALERT_CONFIG_SPOUT = "{0}/spout";
-    private static final String ZK_ALERT_CONFIG_ROUTER = "{0}/router";
-    private static final String ZK_ALERT_CONFIG_ALERT = "{0}/alert";
-    private static final String ZK_ALERT_CONFIG_PUBLISHER = "{0}/publisher";
-
-
-    private static final String METADATA_SERVICE_HOST = "metadataService.host";
-    private static final String METADATA_SERVICE_PORT = "metadataService.port";
-    private static final String METADATA_SERVICE_CONTEXT = "metadataService.context";
-    private static final String DYNAMIC_POLICY_LOADER_INIT_MILLS = "metadataDynamicCheck.initDelayMillis";
-    private static final String DYNAMIC_POLICY_LOADER_DELAY_MILLS = "metadataDynamicCheck.delayMillis";
-    private static final String DYNAMIC_SCHEDULE_STATE_CLEAR_MIN = "metadataDynamicCheck.stateClearPeriodMin";
-    private static final String DYNAMIC_SCHEDULE_STATE_RESERVE_CAPACITY = "metadataDynamicCheck.stateReservedCapacity";
-
-    private static final int DEFAULT_STATE_RESERVE_CAPACITY = 1000;
-
-    public static final String GREEDY_SCHEDULER_ZK_PATH = "/alert/greedy/leader";
-
-    private volatile ScheduleState currentState = null;
-    private ZKConfig zkConfig = null;
-    private final IMetadataServiceClient client;
-    private Config config;
-
-    // FIXME : UGLY global state
-    private static final AtomicBoolean forcePeriodicallyBuild = new AtomicBoolean(true);
-
-    public Coordinator() {
-        config = ConfigFactory.load().getConfig(COORDINATOR);
-        zkConfig = ZKConfigBuilder.getZKConfig(config);
-        client = new MetadataServiceClientImpl(config);
-    }
-
-    public Coordinator(Config config, ZKConfig zkConfig, IMetadataServiceClient client) {
-        this.config = config;
-        this.zkConfig = zkConfig;
-        this.client = client;
-    }
-
-    public synchronized ScheduleState schedule(ScheduleOption option) throws TimeoutException {
-        ExclusiveExecutor executor = new ExclusiveExecutor(zkConfig);
-        AtomicReference<ScheduleState> reference = new AtomicReference<>();
-        try {
-            executor.execute(GREEDY_SCHEDULER_ZK_PATH, () -> {
-                ScheduleState state = null;
-                Stopwatch watch = Stopwatch.createStarted();
-                IScheduleContext context = new ScheduleContextBuilder(config, client).buildContext();
-                TopologyMgmtService mgmtService = new TopologyMgmtService();
-                IPolicyScheduler scheduler = PolicySchedulerFactory.createScheduler();
-
-                scheduler.init(context, mgmtService);
-                state = scheduler.schedule(option);
-
-                long scheduleTime = watch.elapsed(TimeUnit.MILLISECONDS);
-                state.setScheduleTimeMillis((int) scheduleTime);// hardcode to integer
-                watch.reset();
-                watch.start();
-
-                // persist & notify
-                try (ConfigBusProducer producer = new ConfigBusProducer(ZKConfigBuilder.getZKConfig(config))) {
-                    postSchedule(client, state, producer);
-                }
-
-                watch.stop();
-                long postTime = watch.elapsed(TimeUnit.MILLISECONDS);
-                LOG.info("Schedule result, schedule time {} ms, post schedule time {} ms !", scheduleTime, postTime);
-                reference.set(state);
-                currentState = state;
-            });
-        } catch (TimeoutException e1) {
-            LOG.error("time out when schedule", e1);
-            throw e1;
-        } finally {
-            try {
-                executor.close();
-            } catch (IOException e) {
-                LOG.error("Exception when close exclusive executor, log and ignore!", e);
-            }
-        }
-        return reference.get();
-    }
-
-    public static void postSchedule(IMetadataServiceClient client, ScheduleState state, ConfigBusProducer producer) {
-        // persist state
-        client.addScheduleState(state);
-
-        // notify
-        ConfigValue value = new ConfigValue();
-        value.setValue(state.getVersion());
-        value.setValueVersionId(true);
-        for (String topo : state.getSpoutSpecs().keySet()) {
-            producer.send(MessageFormat.format(ZK_ALERT_CONFIG_SPOUT, topo), value);
-        }
-        for (String topo : state.getGroupSpecs().keySet()) {
-            producer.send(MessageFormat.format(ZK_ALERT_CONFIG_ROUTER, topo), value);
-        }
-        for (String topo : state.getAlertSpecs().keySet()) {
-            producer.send(MessageFormat.format(ZK_ALERT_CONFIG_ALERT, topo), value);
-        }
-        for (String topo : state.getPublishSpecs().keySet()) {
-            producer.send(MessageFormat.format(ZK_ALERT_CONFIG_PUBLISHER, topo), value);
-        }
-
-    }
-
-    public ScheduleState getState() {
-        return currentState;
-    }
-
-    public ValidateState validate() {
-        return new MetadataValdiator(client).validate();
-    }
-
-    /**
-     * shutdown background threads and release various resources.
-     */
-    private static class CoordinatorShutdownHook implements Runnable {
-        private static final Logger LOG = LoggerFactory.getLogger(CoordinatorShutdownHook.class);
-        private ScheduledExecutorService executorSrv;
-
-        public CoordinatorShutdownHook(ScheduledExecutorService executorSrv) {
-            this.executorSrv = executorSrv;
-        }
-
-        @Override
-        public void run() {
-            LOG.info("start shutdown coordinator ...");
-            LOG.info("Step 1 shutdown dynamic policy loader thread ");
-            // we should catch every exception to make best effort for clean
-            // shutdown
-            try {
-                executorSrv.shutdown();
-                executorSrv.awaitTermination(2000, TimeUnit.MILLISECONDS);
-            } catch (Throwable t) {
-                LOG.error("error shutdown dynamic policy loader", t);
-            } finally {
-                executorSrv.shutdownNow();
-            }
-        }
-    }
-
-    private static class PolicyChangeHandler implements PolicyChangeListener {
-        private static final Logger LOG = LoggerFactory.getLogger(PolicyChangeHandler.class);
-        private Config config;
-        private IMetadataServiceClient client;
-
-        public PolicyChangeHandler(Config config, IMetadataServiceClient client) {
-            this.config = config;
-            this.client = client;
-        }
-
-        @Override
-        public void onPolicyChange(List<PolicyDefinition> allPolicies, Collection<String> addedPolicies,
-                                   Collection<String> removedPolicies, Collection<String> modifiedPolicies) {
-            LOG.info("policy changed ... ");
-            LOG.info("allPolicies: " + allPolicies + ", addedPolicies: " + addedPolicies + ", removedPolicies: "
-                    + removedPolicies + ", modifiedPolicies: " + modifiedPolicies);
-
-            CoordinatorTrigger trigger = new CoordinatorTrigger(config, client);
-            trigger.run();
-
-        }
-    }
-
-    public static void startSchedule() {
-        Config config = ConfigFactory.load().getConfig(COORDINATOR);
-        String host = config.getString(METADATA_SERVICE_HOST);
-        int port = config.getInt(METADATA_SERVICE_PORT);
-        String context = config.getString(METADATA_SERVICE_CONTEXT);
-        IMetadataServiceClient client = new MetadataServiceClientImpl(host, port, context);
-
-        // schedule dynamic policy loader
-        long initDelayMillis = config.getLong(DYNAMIC_POLICY_LOADER_INIT_MILLS);
-        long delayMillis = config.getLong(DYNAMIC_POLICY_LOADER_DELAY_MILLS);
-        ScheduledExecutorService scheduleSrv = Executors.newScheduledThreadPool(2, r -> {
-            Thread t = new Thread(r);
-            t.setDaemon(true);
-            return t;
-        });
-
-        DynamicPolicyLoader loader = new DynamicPolicyLoader(client);
-        loader.addPolicyChangeListener(new PolicyChangeHandler(config, client));
-        scheduleSrv.scheduleAtFixedRate(loader, initDelayMillis, delayMillis, TimeUnit.MILLISECONDS);
-
-        if (config.hasPath(DYNAMIC_SCHEDULE_STATE_CLEAR_MIN) && config.hasPath(DYNAMIC_SCHEDULE_STATE_RESERVE_CAPACITY)) {
-            int period = config.getInt(DYNAMIC_SCHEDULE_STATE_CLEAR_MIN);
-            int capacity = config.getInt(DYNAMIC_SCHEDULE_STATE_RESERVE_CAPACITY);
-            ScheduleStateCleaner cleaner = new ScheduleStateCleaner(client, capacity);
-            scheduleSrv.scheduleAtFixedRate(cleaner, period, period, TimeUnit.MINUTES);
-        }
-
-        Runtime.getRuntime().addShutdownHook(new Thread(new CoordinatorShutdownHook(scheduleSrv)));
-        LOG.info("Eagle Coordinator started ...");
-    }
-
-    public void enforcePeriodicallyBuild() {
-        forcePeriodicallyBuild.set(true);
-    }
-
-    public void disablePeriodicallyBuild() {
-        forcePeriodicallyBuild.set(false);
-    }
-
-    public static boolean isPeriodicallyForceBuildEnable() {
-        return forcePeriodicallyBuild.get();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java
deleted file mode 100644
index c026785..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator;
-
-public class CoordinatorConstants {
-    public static final String CONFIG_ITEM_COORDINATOR = "coordinator";
-    public static final String CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND = "topologyLoadUpbound";
-    public static final String CONFIG_ITEM_BOLT_LOAD_UPBOUND = "boltLoadUpbound";
-    public static final String POLICY_DEFAULT_PARALLELISM = "policyDefaultParallelism";
-    public static final String BOLT_PARALLELISM = "boltParallelism";
-    public static final String NUM_OF_ALERT_BOLTS_PER_TOPOLOGY = "numOfAlertBoltsPerTopology";
-    public static final String POLICIES_PER_BOLT = "policiesPerBolt";
-    public static final String REUSE_BOLT_IN_STREAMS = "reuseBoltInStreams";
-    public static final String STREAMS_PER_BOLT = "streamsPerBolt";
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorListener.java
deleted file mode 100644
index 7ebf26a..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorListener.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import javax.servlet.ServletContextEvent;
-import javax.servlet.ServletContextListener;
-
-/**
- * @since Jun 16, 2016.
- */
-public class CoordinatorListener implements ServletContextListener {
-
-    private static final Logger LOG = LoggerFactory.getLogger(CoordinatorListener.class);
-
-    public CoordinatorListener() {
-    }
-
-    @Override
-    public void contextInitialized(ServletContextEvent sce) {
-        LOG.info("start coordinator background tasks..");
-        Coordinator.startSchedule();
-    }
-
-    @Override
-    public void contextDestroyed(ServletContextEvent sce) {
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java
deleted file mode 100644
index 567e1e2..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator;
-
-import com.google.common.base.Stopwatch;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.leader.LeaderSelector;
-import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
-import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.retry.RetryNTimes;
-import org.apache.curator.utils.CloseableUtils;
-import org.apache.eagle.alert.config.ZKConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class ExclusiveExecutor implements Closeable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ExclusiveExecutor.class);
-
-    private static final int ZK_RETRYPOLICY_SLEEP_TIME_MS = 1000;
-    private static final int ZK_RETRYPOLICY_MAX_RETRIES = 3;
-
-    public static final int ACQUIRE_LOCK_WAIT_INTERVAL_MS = 3000;
-    public static final int ACQUIRE_LOCK_MAX_RETRIES_TIMES = 100; //about 5 minutes
-
-    private CuratorFramework client;
-    private LeaderSelector selector;
-
-    public ExclusiveExecutor(ZKConfig zkConfig ) {
-        client = CuratorFrameworkFactory.newClient(
-            zkConfig.zkQuorum,
-            zkConfig.zkSessionTimeoutMs,
-            zkConfig.connectionTimeoutMs,
-            new RetryNTimes(ZK_RETRYPOLICY_MAX_RETRIES, ZK_RETRYPOLICY_SLEEP_TIME_MS)
-        );
-        client.start();
-    }
-
-    public void execute(String path, final Runnable r) throws TimeoutException {
-        execute(path, r, ACQUIRE_LOCK_MAX_RETRIES_TIMES * ACQUIRE_LOCK_WAIT_INTERVAL_MS);
-    }
-
-    public void execute(String path, final Runnable r, int timeoutMillis) throws TimeoutException {
-        final AtomicBoolean executed = new AtomicBoolean(false);
-        Stopwatch watch = Stopwatch.createUnstarted();
-        watch.start();
-        LeaderSelectorListener listener = new LeaderSelectorListenerAdapter() {
-
-            @Override
-            public void takeLeadership(CuratorFramework client) throws Exception {
-                // this callback will get called when you are the leader
-                // do whatever leader work you need to and only exit
-                // this method when you want to relinquish leadership
-                LOG.info("this is leader node right now..");
-                executed.set(true);
-                try {
-                    r.run();
-                } catch (Throwable t) {
-                    LOG.warn("failed to run exclusive executor", t);
-                }
-                LOG.info("leader node executed done!..");
-            }
-
-            @Override
-            public void stateChanged(CuratorFramework client, ConnectionState newState) {
-                LOG.info(String.format("leader selector state change listener, new state: %s", newState.toString()));
-            }
-
-        };
-
-        selector = new LeaderSelector(client, path, listener);
-        // selector.autoRequeue(); // not required, but this is behavior that you
-        // will probably expect
-        selector.start();
-
-        // wait for given times
-        while (watch.elapsed(TimeUnit.MILLISECONDS) < timeoutMillis) { //about 3 minutes waiting
-            if (!executed.get()) {
-                try {
-                    Thread.sleep(ACQUIRE_LOCK_WAIT_INTERVAL_MS);
-                } catch (InterruptedException e) {
-                    // ignored
-                }
-                continue;
-            } else {
-                break;
-            }
-        }
-        watch.stop();
-
-        if (!executed.get()) {
-            throw new TimeoutException(String.format("Get exclusive lock for operation on path %s failed due to wait too much time: %d ms",
-                path, watch.elapsed(TimeUnit.MILLISECONDS)));
-        }
-        LOG.info("Exclusive operation done with execution time (lock plus operation) {} ms !", watch.elapsed(TimeUnit.MILLISECONDS));
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (selector != null) {
-            CloseableUtils.closeQuietly(this.selector);
-        }
-        if (client != null) {
-            CloseableUtils.closeQuietly(this.client);
-        }
-    }
-
-}
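
ExclusiveExecutor wraps a Curator LeaderSelector so that only one coordinator instance runs a given task at a time: the caller polls in ACQUIRE_LOCK_WAIT_INTERVAL_MS steps until its Runnable has run or the timeout (about five minutes by default) expires, and throws TimeoutException if it never got to run. A minimal usage sketch, assuming a reachable ZooKeeper quorum and an application.conf with a coordinator section as the no-arg Coordinator constructor expects:

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;
    import org.apache.eagle.alert.config.ZKConfig;
    import org.apache.eagle.alert.config.ZKConfigBuilder;
    import org.apache.eagle.alert.coordinator.Coordinator;
    import org.apache.eagle.alert.coordinator.ExclusiveExecutor;

    import java.util.concurrent.TimeoutException;

    public class ExclusiveExecutorExample {
        public static void main(String[] args) throws Exception {
            Config config = ConfigFactory.load().getConfig("coordinator");
            ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);

            // try-with-resources works because ExclusiveExecutor implements Closeable.
            try (ExclusiveExecutor executor = new ExclusiveExecutor(zkConfig)) {
                // Only the instance that wins leadership on this ZooKeeper path runs the Runnable;
                // the rest wait out the timeout and land in the catch block below.
                executor.execute(Coordinator.GREEDY_SCHEDULER_ZK_PATH,
                    () -> System.out.println("doing the exclusive scheduling work"));
            } catch (TimeoutException e) {
                System.err.println("could not acquire the scheduler lock in time: " + e.getMessage());
            }
        }
    }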

