eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [28/84] [partial] eagle git commit: Clean repo for eagle site
Date Mon, 03 Apr 2017 11:54:36 GMT
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/kafka-server.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/kafka-server.properties b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/kafka-server.properties
deleted file mode 100644
index e2618c2..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/kafka-server.properties
+++ /dev/null
@@ -1,87 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# see kafka.server.KafkaConfig for additional details and defaults
-############################# Server Basics #############################
-# The id of the broker. This must be set to a unique integer for each broker.
-broker.id=0
-############################# Socket Server Settings #############################
-# The address the socket server listens on. It will get the value returned from
-# java.net.InetAddress.getCanonicalHostName() if not configured.
-#   FORMAT:
-#     listeners = security_protocol://host_name:port
-#   EXAMPLE:
-#     listeners = PLAINTEXT://your.host.name:9092
-#listeners=PLAINTEXT://:9092
-# Hostname and port the broker will advertise to producers and consumers. If not set,
-# it uses the value for "listeners" if configured.  Otherwise, it will use the value
-# returned from java.net.InetAddress.getCanonicalHostName().
-#advertised.listeners=PLAINTEXT://your.host.name:9092
-# The number of threads handling network requests
-num.network.threads=3
-# The number of threads doing disk I/O
-num.io.threads=8
-# The send buffer (SO_SNDBUF) used by the socket server
-socket.send.buffer.bytes=102400
-# The receive buffer (SO_RCVBUF) used by the socket server
-socket.receive.buffer.bytes=102400
-# The maximum size of a request that the socket server will accept (protection against OOM)
-socket.request.max.bytes=104857600
-############################# Log Basics #############################
-# A comma seperated list of directories under which to store log files
-log.dirs=/tmp/dev-kafka-logs
-# The default number of log partitions per topic. More partitions allow greater
-# parallelism for consumption, but this will also result in more files across
-# the brokers.
-num.partitions=1
-# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
-# This value is recommended to be increased for installations with data dirs located in RAID array.
-num.recovery.threads.per.data.dir=1
-############################# Log Flush Policy #############################
-# Messages are immediately written to the filesystem but by default we only fsync() to sync
-# the OS cache lazily. The following configurations control the flush of data to disk.
-# There are a few important trade-offs here:
-#    1. Durability: Unflushed data may be lost if you are not using replication.
-#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
-#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
-# The settings below allow one to configure the flush policy to flush data after a period of time or
-# every N messages (or both). This can be done globally and overridden on a per-topic basis.
-# The number of messages to accept before forcing a flush of data to disk
-#log.flush.interval.messages=10000
-# The maximum amount of time a message can sit in a log before we force a flush
-#log.flush.interval.ms=1000
-############################# Log Retention Policy #############################
-# The following configurations control the disposal of log segments. The policy can
-# be set to delete segments after a period of time, or after a given size has accumulated.
-# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
-# from the end of the log.
-# The minimum age of a log file to be eligible for deletion
-log.retention.hours=168
-# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
-# segments don't drop below log.retention.bytes.
-#log.retention.bytes=1073741824
-# The maximum size of a log segment file. When this size is reached a new log segment will be created.
-log.segment.bytes=1073741824
-# The interval at which log segments are checked to see if they can be deleted according
-# to the retention policies
-log.retention.check.interval.ms=300000
-############################# Zookeeper #############################
-# Zookeeper connection string (see zookeeper docs for details).
-# This is a comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
-# You can also append an optional chroot string to the urls to specify the
-# root directory for all kafka znodes.
-zookeeper.connect=localhost:2181
-# Timeout in ms for connecting to zookeeper
-zookeeper.connection.timeout.ms=6000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/log4j.properties b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/log4j.properties
deleted file mode 100644
index 9c6875d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/log4j.properties
+++ /dev/null
@@ -1,19 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-log4j.rootLogger=INFO, stdout
-# standard output
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/zookeeper-server.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/zookeeper-server.properties b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/zookeeper-server.properties
deleted file mode 100644
index dd71ffc..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/zookeeper-server.properties
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# the directory where the snapshot is stored.
-dataDir=/tmp/dev-zookeeper-data
-# the port at which the clients will connect
-clientPort=2181
-# disable the per-ip limit on the number of connections since this is a non-production config
-maxClientCnxns=0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/pom.xml
deleted file mode 100644
index 2860e52..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/pom.xml
+++ /dev/null
@@ -1,108 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!-- ~ Licensed to the Apache Software Foundation (ASF) under one or more
-	~ contributor license agreements. See the NOTICE file distributed with ~
-	this work for additional information regarding copyright ownership. ~ The
-	ASF licenses this file to You under the Apache License, Version 2.0 ~ (the
-	"License"); you may not use this file except in compliance with ~ the License.
-	You may obtain a copy of the License at ~ ~ http://www.apache.org/licenses/LICENSE-2.0
-	~ ~ Unless required by applicable law or agreed to in writing, software ~
-	distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT
-	WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the
-	License for the specific language governing permissions and ~ limitations
-	under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>org.apache.eagle</groupId>
-        <artifactId>eagle-alert</artifactId>
-        <version>0.5.0-SNAPSHOT</version>
-    </parent>
-
-    <artifactId>alert-devtools</artifactId>
-    <name>Eagle::Core::Alert::DevTools</name>
-
-    <properties>
-        <maven-scala.version>2.15.0</maven-scala.version>
-    </properties>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.eagle</groupId>
-            <artifactId>alert-common</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-kafka</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>${kafka.artifact.id}</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-databind</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <sourceDirectory>src/main/java</sourceDirectory>
-
-        <plugins>
-            <plugin>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <version>${maven-compiler.version}</version>
-            </plugin>
-            <!-- <plugin>
-                <groupId>org.scala-tools</groupId>
-                <artifactId>maven-scala-plugin</artifactId>
-                <version>${maven-scala.version}</version>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>compile</goal>
-                            <goal>testCompile</goal>
-                        </goals>
-                        <configuration>
-                            <args>
-                                <arg>-make:transitive</arg>
-                                <arg>-dependencyfile</arg>
-                                <arg>${project.build.directory}/.scala_dependencies</arg>
-                            </args>
-                            <sourceDir>src/main/scala</sourceDir>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin> -->
-
-            <plugin>
-                <artifactId>maven-dependency-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <phase>install</phase>
-                        <goals>
-                            <goal>copy-dependencies</goal>
-                        </goals>
-                        <configuration>
-                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-</project>

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffset.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffset.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffset.java
deleted file mode 100644
index 8f98d71..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffset.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.tools;
-
-import java.util.Map;
-
-public class KafkaConsumerOffset {
-    public Map<String, String> topology;
-    public Long offset;
-    public Long partition;
-    public Map<String, String> broker;
-    public String topic;
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffsetFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffsetFetcher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffsetFetcher.java
deleted file mode 100644
index 04a8cba..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffsetFetcher.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.tools;
-
-import org.apache.eagle.alert.config.ZKConfig;
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryNTimes;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class KafkaConsumerOffsetFetcher {
-    public CuratorFramework curator;
-    public String zkRoot;
-    public ObjectMapper mapper;
-    public String zkPathToPartition;
-
-    public KafkaConsumerOffsetFetcher(ZKConfig config, String... parameters) {
-        try {
-            this.curator = CuratorFrameworkFactory.newClient(config.zkQuorum, config.zkSessionTimeoutMs, 15000,
-                new RetryNTimes(config.zkRetryTimes, config.zkRetryInterval));
-            curator.start();
-            this.zkRoot = config.zkRoot;
-            mapper = new ObjectMapper();
-            Module module = new SimpleModule("offset").registerSubtypes(new NamedType(KafkaConsumerOffset.class));
-            mapper.registerModule(module);
-            zkPathToPartition = String.format(config.zkRoot, parameters);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public Map<String, Long> fetch() throws Exception {
-        Map<String, Long> map = new HashMap<String, Long>();
-        if (curator.checkExists().forPath(zkPathToPartition) != null) {
-            List<String> partitions = curator.getChildren().forPath(zkPathToPartition);
-            for (String partition : partitions) {
-                String partitionPath = zkPathToPartition + "/" + partition;
-                String data = new String(curator.getData().forPath(partitionPath));
-                KafkaConsumerOffset offset = mapper.readValue(data, KafkaConsumerOffset.class);
-                map.put(partition, offset.offset);
-            }
-        }
-        return map;
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaLatestOffsetFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaLatestOffsetFetcher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaLatestOffsetFetcher.java
deleted file mode 100644
index e547a25..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaLatestOffsetFetcher.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.tools;
-
-
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.OffsetResponse;
-import kafka.javaapi.PartitionMetadata;
-import kafka.javaapi.TopicMetadata;
-import kafka.javaapi.TopicMetadataRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
-
-import java.util.*;
-
-public class KafkaLatestOffsetFetcher {
-
-    private List<String> brokerList;
-    private int port;
-
-    public KafkaLatestOffsetFetcher(String brokerList) {
-        this.brokerList = new ArrayList<>();
-        String[] brokers = brokerList.split(",");
-        for (String broker : brokers) {
-            this.brokerList.add(broker.split(":")[0]);
-        }
-        this.port = Integer.valueOf(brokers[0].split(":")[1]);
-    }
-
-    public Map<Integer, Long> fetch(String topic, int partitionCount) {
-        Map<Integer, PartitionMetadata> metadatas = fetchPartitionMetadata(brokerList, port, topic, partitionCount);
-        Map<Integer, Long> ret = new HashMap<>();
-        for (int partition = 0; partition < partitionCount; partition++) {
-            PartitionMetadata metadata = metadatas.get(partition);
-            if (metadata == null || metadata.leader() == null) {
-                ret.put(partition, -1L);
-                //throw new RuntimeException("Can't find Leader for Topic and Partition. Exiting");
-            }
-            String leadBroker = metadata.leader().host();
-            String clientName = "Client_" + topic + "_" + partition;
-            SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
-            long latestOffset = getLatestOffset(consumer, topic, partition, clientName);
-            if (consumer != null) {
-                consumer.close();
-            }
-            ret.put(partition, latestOffset);
-        }
-        return ret;
-    }
-
-    public long getLatestOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
-        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
-        Map<TopicAndPartition, kafka.api.PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
-        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
-        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
-        OffsetResponse response = consumer.getOffsetsBefore(request);
-        if (response.hasError()) {
-            throw new RuntimeException("Error fetching data offset from the broker. Reason: " + response.errorCode(topic, partition));
-        }
-        long[] offsets = response.offsets(topic, partition);
-        return offsets[0];
-    }
-
-    private Map<Integer, PartitionMetadata> fetchPartitionMetadata(List<String> brokerList, int port, String topic, int partitionCount) {
-        Map<Integer, PartitionMetadata> partitionMetadata = new HashMap<>();
-        for (String broker : brokerList) {
-            SimpleConsumer consumer = null;
-            try {
-                consumer = new SimpleConsumer(broker, port, 100000, 64 * 1024, "leaderLookup");
-                List<String> topics = Collections.singletonList(topic);
-                TopicMetadataRequest req = new TopicMetadataRequest(topics);
-                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
-
-                List<TopicMetadata> metaData = resp.topicsMetadata();
-                for (TopicMetadata item : metaData) {
-                    for (PartitionMetadata part : item.partitionsMetadata()) {
-                        partitionMetadata.put(part.partitionId(), part);
-                    }
-                }
-                if (partitionMetadata.size() == partitionCount) {
-                    break;
-                }
-            } catch (Exception e) {
-                throw new RuntimeException("Error communicating with Broker [" + broker + "] " + "to find Leader for [" + topic + "] Reason: ", e);
-            } finally {
-                if (consumer != null) {
-                    consumer.close();
-                }
-            }
-        }
-        return partitionMetadata;
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/scala/org/apache/eagle/contrib/kafka/ProducerTool.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/scala/org/apache/eagle/contrib/kafka/ProducerTool.scala b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/scala/org/apache/eagle/contrib/kafka/ProducerTool.scala
deleted file mode 100644
index b89737a..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/scala/org/apache/eagle/contrib/kafka/ProducerTool.scala
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.contrib.kafka
-
-import java.io._
-import java.nio.charset.StandardCharsets
-import java.util.Properties
-import java.util.concurrent.Executors
-
-import joptsimple._
-import kafka.message._
-import kafka.producer.ConsoleProducer.{LineMessageReader, MessageReader}
-import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
-import kafka.serializer._
-import org.apache.commons.io.FileUtils
-
-import scala.collection.JavaConversions._
-
-object ProducerTool {
-  def main(args: Array[String]) {
-    val parser = new OptionParser
-    val topicOpt = parser.accepts("topic", "REQUIRED: The topic id to produce messages to.")
-      .withRequiredArg
-      .describedAs("topic")
-      .ofType(classOf[String])
-    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
-      .withRequiredArg
-      .describedAs("broker-list")
-      .ofType(classOf[String])
-    val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
-    val compressOpt = parser.accepts("compress", "If set, messages batches are sent compressed")
-    val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously.")
-      .withRequiredArg
-      .describedAs("size")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(200)
-    val messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retires before the producer give up and drop this message.")
-      .withRequiredArg
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(3)
-    val retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.")
-      .withRequiredArg
-      .ofType(classOf[java.lang.Long])
-      .defaultsTo(100l)
-    val sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" +
-      " a message will queue awaiting suffient batch size. The value is given in ms.")
-      .withRequiredArg
-      .describedAs("timeout_ms")
-      .ofType(classOf[java.lang.Long])
-      .defaultsTo(1000l)
-    val queueSizeOpt = parser.accepts("queue-size", "If set and the producer is running in asynchronous mode, this gives the maximum amount of " +
-      " messages will queue awaiting suffient batch size.")
-      .withRequiredArg
-      .describedAs("queue_size")
-      .ofType(classOf[java.lang.Long])
-      .defaultsTo(10000l)
-    val queueEnqueueTimeoutMsOpt = parser.accepts("queue-enqueuetimeout-ms", "Timeout for event enqueue")
-      .withRequiredArg
-      .describedAs("queue enqueuetimeout ms")
-      .ofType(classOf[java.lang.Long])
-      .defaultsTo(Int.MaxValue.toLong)
-    val requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required acks of the producer requests")
-      .withRequiredArg
-      .describedAs("request required acks")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(0)
-    val requestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. Value must be non-negative and non-zero")
-      .withRequiredArg
-      .describedAs("request timeout ms")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(1500)
-    val valueEncoderOpt = parser.accepts("value-serializer", "The class name of the message encoder implementation to use for serializing values.")
-      .withRequiredArg
-      .describedAs("encoder_class")
-      .ofType(classOf[java.lang.String])
-      .defaultsTo(classOf[StringEncoder].getName)
-    val keyEncoderOpt = parser.accepts("key-serializer", "The class name of the message encoder implementation to use for serializing keys.")
-      .withRequiredArg
-      .describedAs("encoder_class")
-      .ofType(classOf[java.lang.String])
-      .defaultsTo(classOf[StringEncoder].getName)
-    val messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " +
-      "By default each line is read as a separate message.")
-      .withRequiredArg
-      .describedAs("reader_class")
-      .ofType(classOf[java.lang.String])
-      .defaultsTo(classOf[LineMessageReader].getName)
-    val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.")
-      .withRequiredArg
-      .describedAs("size")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(1024 * 100)
-    val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. " +
-      "This allows custom configuration for a user-defined message reader.")
-      .withRequiredArg
-      .describedAs("prop")
-      .ofType(classOf[String])
-
-    val dataOpt = parser.accepts("data", "Input message data content")
-      .withRequiredArg()
-      .describedAs("input message data")
-      .ofType(classOf[String])
-
-    val fileOpt = parser.accepts("file", "Input message file name")
-      .withRequiredArg()
-      .describedAs("input message file")
-      .ofType(classOf[String])
-
-    val replicationOpt = parser.accepts("replication", "Message data replication count")
-      .withRequiredArg()
-      .describedAs("message replication count, default: 1")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(1)
-
-    val options = parser.parse(args: _*)
-    for (arg <- List(topicOpt, brokerListOpt)) {
-      if (!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
-
-    val topic = options.valueOf(topicOpt)
-    val brokerList = options.valueOf(brokerListOpt)
-    val sync = options.has(syncOpt)
-    val compress = options.has(compressOpt)
-    val batchSize = options.valueOf(batchSizeOpt)
-    val sendTimeout = options.valueOf(sendTimeoutOpt)
-    val queueSize = options.valueOf(queueSizeOpt)
-    val queueEnqueueTimeoutMs = options.valueOf(queueEnqueueTimeoutMsOpt)
-    val requestRequiredAcks = options.valueOf(requestRequiredAcksOpt)
-    val requestTimeoutMs = options.valueOf(requestTimeoutMsOpt)
-    val keyEncoderClass = options.valueOf(keyEncoderOpt)
-    val valueEncoderClass = options.valueOf(valueEncoderOpt)
-    val readerClass = options.valueOf(messageReaderOpt)
-    val socketBuffer = options.valueOf(socketBufferSizeOpt)
-    val cmdLineProps = parseLineReaderArgs(options.valuesOf(propertyOpt))
-    val messageData = options.valuesOf(dataOpt)
-    val messageFile = options.valuesOf(fileOpt)
-    val messageReplication = options.valueOf(replicationOpt)
-
-    cmdLineProps.put("topic", topic)
-    val props = new Properties()
-    props.put("metadata.broker.list", brokerList)
-    val codec = if (compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec
-    props.put("compression.codec", codec.toString)
-    props.put("producer.type", if (sync) "sync" else "async")
-    if (options.has(batchSizeOpt))
-      props.put("batch.num.messages", batchSize.toString)
-
-    props.put("message.send.max.retries", options.valueOf(messageSendMaxRetriesOpt).toString)
-    props.put("retry.backoff.ms", options.valueOf(retryBackoffMsOpt).toString)
-    props.put("queue.buffering.max.ms", sendTimeout.toString)
-    props.put("queue.buffering.max.messages", queueSize.toString)
-    props.put("queue.enqueue.timeout.ms", queueEnqueueTimeoutMs.toString)
-    props.put("request.required.acks", requestRequiredAcks.toString)
-    props.put("request.timeout.ms", requestTimeoutMs.toString)
-    props.put("key.serializer.class", keyEncoderClass)
-    props.put("serializer.class", valueEncoderClass)
-    props.put("send.buffer.bytes", socketBuffer.toString)
-
-    val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef, AnyRef]]
-
-    if (messageData.size() > 0) {
-      reader.init(new ByteArrayInputStream(messageData.get(0).getBytes(StandardCharsets.UTF_8)), cmdLineProps)
-    } else if (messageFile.size() > 0) {
-      reader.init(FileUtils.openInputStream(new File(messageFile.get(0))), cmdLineProps)
-    } else {
-      reader.init(System.in, cmdLineProps)
-    }
-
-    try {
-      val executor = Executors.newCachedThreadPool()
-      val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
-      Runtime.getRuntime.addShutdownHook(new Thread() {
-        override def run(): Unit = {
-          producer.close()
-          executor.shutdown()
-        }
-      })
-
-      var message: KeyedMessage[AnyRef, AnyRef] = null
-      do {
-        message = reader.readMessage()
-        if (message != null) {
-          var i = 0
-          while (i < messageReplication) {
-            executor.submit(new Runnable {
-              override def run(): Unit = producer.send(message)
-            }).get()
-            i += 1
-          }
-          System.out.println("Produced %d messages".format(messageReplication))
-        }
-      } while (message != null)
-    } catch {
-      case e: Exception =>
-        e.printStackTrace()
-        System.exit(1)
-    }
-    System.exit(0)
-  }
-
-  def parseLineReaderArgs(args: Iterable[String]): Properties = {
-    val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0)
-    if (!splits.forall(_.length == 2)) {
-      System.err.println("Invalid line reader properties: " + args.mkString(" "))
-      System.exit(1)
-    }
-    val props = new Properties
-    for (a <- splits)
-      props.put(a(0), a(1))
-    props
-  }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/java/org/apache/eagle/alert/tools/TestKafkaOffset.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/java/org/apache/eagle/alert/tools/TestKafkaOffset.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/java/org/apache/eagle/alert/tools/TestKafkaOffset.java
deleted file mode 100644
index 89b2921..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/java/org/apache/eagle/alert/tools/TestKafkaOffset.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package org.apache.eagle.alert.tools;
-/*
- *
- *    Licensed to the Apache Software Foundation (ASF) under one or more
- *    contributor license agreements.  See the NOTICE file distributed with
- *    this work for additional information regarding copyright ownership.
- *    The ASF licenses this file to You under the Apache License, Version 2.0
- *    (the "License"); you may not use this file except in compliance with
- *    the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *    Unless required by applicable law or agreed to in writing, software
- *    distributed under the License is distributed on an "AS IS" BASIS,
- *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *    See the License for the specific language governing permissions and
- *    limitations under the License.
- *
- */
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.config.ZKConfig;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.Map;
-
-public class TestKafkaOffset {
-    @Ignore
-    @Test
-    public void test() throws Exception {
-        System.setProperty("config.resource", "/kafka-offset-test.application.conf");
-        Config config = ConfigFactory.load();
-        ZKConfig zkConfig = new ZKConfig();
-        zkConfig.zkQuorum = config.getString("dataSourceConfig.zkQuorum");
-        zkConfig.zkRoot = config.getString("dataSourceConfig.transactionZKRoot");
-        zkConfig.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs");
-        zkConfig.connectionTimeoutMs = config.getInt("dataSourceConfig.connectionTimeoutMs");
-        zkConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes");
-        zkConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval");
-
-        String topic = "testTopic1";
-        String topology = "alertUnitTopology_1";
-
-        while (true) {
-            KafkaConsumerOffsetFetcher consumerOffsetFetcher = new KafkaConsumerOffsetFetcher(zkConfig, topic, topology);
-            String kafkaBrokerList = config.getString("dataSourceConfig.kafkaBrokerList");
-            KafkaLatestOffsetFetcher latestOffsetFetcher = new KafkaLatestOffsetFetcher(kafkaBrokerList);
-
-            Map<String, Long> consumedOffset = consumerOffsetFetcher.fetch();
-            if (consumedOffset.size() == 0) {
-                System.out.println("no any consumer offset found for this topic " + topic);
-            }
-            Map<Integer, Long> latestOffset = latestOffsetFetcher.fetch(topic, consumedOffset.size());
-            if (latestOffset.size() == 0) {
-                System.out.println("no any latest offset found for this topic " + topic);
-            }
-            for (Map.Entry<String, Long> entry : consumedOffset.entrySet()) {
-                String partition = entry.getKey();
-                Integer partitionNumber = Integer.valueOf(partition.split("_")[1]);
-                Long lag = latestOffset.get(partitionNumber) - entry.getValue();
-                System.out.println(String.format("parition %s, total: %d, consumed: %d, lag: %d",
-                    partition, latestOffset.get(partitionNumber), entry.getValue(), lag));
-            }
-            Thread.sleep(10000);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/kafka-offset-test.application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/kafka-offset-test.application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/kafka-offset-test.application.conf
deleted file mode 100644
index 1da6fb1..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/kafka-offset-test.application.conf
+++ /dev/null
@@ -1,24 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"dataSourceConfig": {
-  "kafkaBrokerList": "kafka-broker:9092",
-  "zkQuorum": "zk-admin:2181",
-  "transactionZKRoot": "/consumers/eagle_consumer/%s/%s",
-  "zkSessionTimeoutMs": 10000,
-  "connectionTimeoutMs": 10000,
-  "zkRetryTimes": 3,
-  "zkRetryInterval": 3000
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/log4j.properties b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/log4j.properties
deleted file mode 100644
index 9c6875d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,19 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-log4j.rootLogger=INFO, stdout
-# standard output
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/.gitignore
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/.gitignore b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/.gitignore
deleted file mode 100644
index b83d222..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/target/

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
deleted file mode 100644
index 78c0bc4..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
+++ /dev/null
@@ -1,180 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!-- ~ /* ~ * Licensed to the Apache Software Foundation (ASF) under one
-	or more ~ * contributor license agreements. See the NOTICE file distributed
-	with ~ * this work for additional information regarding copyright ownership.
-	~ * The ASF licenses this file to You under the Apache License, Version 2.0
-	~ * (the "License"); you may not use this file except in compliance with
-	~ * the License. You may obtain a copy of the License at ~ * ~ * http://www.apache.org/licenses/LICENSE-2.0
-	~ * ~ * Unless required by applicable law or agreed to in writing, software
-	~ * distributed under the License is distributed on an "AS IS" BASIS, ~ *
-	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-	~ * See the License for the specific language governing permissions and ~
-	* limitations under the License. ~ */ -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>org.apache.eagle</groupId>
-        <artifactId>eagle-alert</artifactId>
-        <version>0.5.0-SNAPSHOT</version>
-        <relativePath>../pom.xml</relativePath>
-    </parent>
-
-    <artifactId>alert-engine</artifactId>
-    <name>Eagle::Core::Alert::Engine</name>
-    <packaging>jar</packaging>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.eagle</groupId>
-            <artifactId>alert-common</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-kafka</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>${kafka.artifact.id}</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>com.sun.jersey</groupId>
-            <artifactId>jersey-client</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.netflix.archaius</groupId>
-            <artifactId>archaius-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.wso2.siddhi</groupId>
-            <artifactId>siddhi-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.wso2.siddhi</groupId>
-            <artifactId>siddhi-query-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.wso2.siddhi</groupId>
-            <artifactId>siddhi-query-compiler</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.wso2.siddhi</groupId>
-            <artifactId>siddhi-extension-regex</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.wso2.siddhi</groupId>
-            <artifactId>siddhi-extension-string</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>com.101tec</groupId>
-            <artifactId>zkclient</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>joda-time</groupId>
-            <artifactId>joda-time</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.typesafe</groupId>
-            <artifactId>config</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-all</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.dropwizard.metrics</groupId>
-            <artifactId>metrics-jvm</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.mapdb</groupId>
-            <artifactId>mapdb</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.velocity</groupId>
-            <artifactId>velocity</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>javax.mail</groupId>
-            <artifactId>mail</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>commons-cli</groupId>
-            <artifactId>commons-cli</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.mongodb</groupId>
-            <artifactId>mongo-java-driver</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.ullink.slack</groupId>
-            <artifactId>simpleslackapi</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>de.flapdoodle.embed</groupId>
-            <artifactId>de.flapdoodle.embed.mongo</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>dumbster</groupId>
-            <artifactId>dumbster</artifactId>
-            <version>1.6</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-jar-plugin</artifactId>
-                <version>2.6</version>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>test-jar</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-source-plugin</artifactId>
-                <version>2.1.2</version>
-                <executions>
-                    <execution>
-                        <id>attach-sources</id>
-                        <phase>verify</phase>
-                        <goals>
-                            <goal>jar-no-fork</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-</project>

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/AlertStreamCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/AlertStreamCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/AlertStreamCollector.java
deleted file mode 100755
index 4e3c275..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/AlertStreamCollector.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package org.apache.eagle.alert.engine;
-
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public interface AlertStreamCollector extends Collector<AlertStreamEvent> {
-    /**
-     * No need to be thread-safe, but should be called on in synchronous like in Storm bolt execute method.
-     */
-    void flush();
-
-    void close();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/Collector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/Collector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/Collector.java
deleted file mode 100755
index d1a2b56..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/Collector.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package org.apache.eagle.alert.engine;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-@FunctionalInterface
-public interface Collector<T> {
-    /**
-     * Must make sure thread-safe.
-     *
-     * @param t
-     */
-    void emit(T t);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/PartitionedEventCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/PartitionedEventCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/PartitionedEventCollector.java
deleted file mode 100755
index 7310d8b..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/PartitionedEventCollector.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine;
-
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-
-/**
- * Executed in thread-safe trace.
- */
-public interface PartitionedEventCollector extends Collector<PartitionedEvent> {
-    /**
-     * @param event to be dropped.
-     */
-    void drop(PartitionedEvent event);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StormMultiCountMetric.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StormMultiCountMetric.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StormMultiCountMetric.java
deleted file mode 100644
index 26760ab..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StormMultiCountMetric.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine;
-
-import backtype.storm.metric.api.MultiCountMetric;
-
-public class StormMultiCountMetric implements StreamCounter {
-    private MultiCountMetric countMetric;
-
-    public StormMultiCountMetric(MultiCountMetric counter) {
-        this.countMetric = counter;
-    }
-
-    @Override
-    public void incr(String scopeName) {
-        countMetric.scope(scopeName).incr();
-    }
-
-    @Override
-    public void incrBy(String scopeName, int length) {
-        countMetric.scope(scopeName).incrBy(length);
-    }
-
-    @Override
-    public void scope(String scopeName) {
-        countMetric.scope(scopeName);
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
deleted file mode 100644
index c2d5f2e..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine;
-
-import com.typesafe.config.Config;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public interface StreamContext {
-    StreamCounter counter();
-
-    Config config();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java
deleted file mode 100644
index e77a41b..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine;
-
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.TopologyContext;
-import com.typesafe.config.Config;
-
-public class StreamContextImpl implements StreamContext {
-    private final Config config;
-    private final StreamCounter counter;
-
-    public StreamContextImpl(Config config, MultiCountMetric counter, TopologyContext context) {
-        this.counter = new StormMultiCountMetric(counter);
-        this.config = config;
-    }
-
-    @Override
-    public StreamCounter counter() {
-        return this.counter;
-    }
-
-    @Override
-    public Config config() {
-        return this.config;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamCounter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamCounter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamCounter.java
deleted file mode 100644
index 2d9ea69..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamCounter.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine;
-
-public interface StreamCounter {
-    void incr(String scopeName);
-
-    void incrBy(String scopeName, int length);
-
-    void scope(String scopeName);
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java
deleted file mode 100644
index 055032f..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.DefaultParser;
-import org.apache.commons.cli.Options;
-import org.apache.eagle.alert.config.ZKConfig;
-import org.apache.eagle.alert.config.ZKConfigBuilder;
-import org.apache.eagle.alert.engine.coordinator.impl.ZKMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.runner.UnitTopologyRunner;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.TopologyBuilder;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-
-/**
- * Make sure unit topology can be started either from command line
- * or from remote A few parameters for starting unit topology 1. number of spout
- * tasks 2. number of router bolts 3. number of alert bolts 4. number of publish
- * bolts
- *
- * <p>Connections 1. spout and router bolt 2. router bolt and alert bolt 3. alert
- * bolt and publish bolt.</p>
- *
- * @since 5/3/16.
- *
- */
-public class UnitTopologyMain {
-
-    public static void main(String[] args) throws Exception {
-        // command line parse
-        Options options = new Options();
-        options.addOption("c", true,
-            "config URL (valid file name) - defaults application.conf according to typesafe config default behavior.");
-        CommandLineParser parser = new DefaultParser();
-        CommandLine cmd = parser.parse(options, args);
-
-        if (cmd.hasOption("c")) {
-            String fileName = cmd.getOptionValue("c", "application.conf");
-            System.setProperty("config.resource", fileName.startsWith("/") ? fileName : "/" + fileName);
-            ConfigFactory.invalidateCaches();
-        }
-        Config config = ConfigFactory.load();
-
-        // load config and start
-        String topologyId = getTopologyName(config);
-        ZKMetadataChangeNotifyService changeNotifyService = createZKNotifyService(config, topologyId);
-        new UnitTopologyRunner(changeNotifyService).run(topologyId, config);
-    }
-
-    public static void runTopology(Config config, backtype.storm.Config stormConfig) {
-        // load config and start
-        String topologyId = getTopologyName(config);
-        ZKMetadataChangeNotifyService changeNotifyService = createZKNotifyService(config, topologyId);
-        new UnitTopologyRunner(changeNotifyService, stormConfig).run(topologyId, config);
-    }
-
-    private static ZKMetadataChangeNotifyService createZKNotifyService(Config config, String topologyId) {
-        ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
-        ZKMetadataChangeNotifyService changeNotifyService = new ZKMetadataChangeNotifyService(zkConfig, topologyId);
-        return changeNotifyService;
-    }
-
-    public static StormTopology createTopology(Config config) {
-        String topologyId = getTopologyName(config);
-        ZKMetadataChangeNotifyService changeNotifyService = createZKNotifyService(config, topologyId);
-
-        return new UnitTopologyRunner(changeNotifyService).buildTopology(topologyId, config).createTopology();
-    }
-    
-    /**
-     * Returns a builder instead of topology itself. This make it possible to run storm-flink conversion.
-     * 
-     * @param config
-     * @return
-     */
-    public static TopologyBuilder createTopologyBuilder(Config config) {
-        String topologyId = getTopologyName(config);
-        ZKMetadataChangeNotifyService changeNotifyService = createZKNotifyService(config, topologyId);
-
-        return new UnitTopologyRunner(changeNotifyService).buildTopology(topologyId, config);
-    }
-
-    /**
-     * Try to get topology name from app framework .e.g "appId" or "topology.name"
-     */
-    private static String getTopologyName(Config config) {
-        if (config.hasPath("topology.name")) {
-            return config.getString("topology.name");
-        } else if (config.hasPath("appId")) {
-            return config.getString("appId");
-        } else {
-            throw new IllegalStateException("Not topology.name or appId provided from config: " + config.toString());
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java
deleted file mode 100644
index dab9f5a..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator;
-
-import org.apache.eagle.alert.engine.publisher.AlertPublishSpecListener;
-import org.apache.eagle.alert.engine.router.AlertBoltSpecListener;
-import org.apache.eagle.alert.engine.router.SpoutSpecListener;
-import org.apache.eagle.alert.engine.router.StreamRouterBoltSpecListener;
-import com.typesafe.config.Config;
-
-import java.io.Closeable;
-import java.io.Serializable;
-
-/**
- * IMetadataChangeNotifyService defines the following features
- * 1) initialization
- * 2) register metadata change listener
- *
- * <p>In distributed environment for example storm platform,
- * subclass implementing this interface should have the following lifecycle
- * 1. object is created in client machine
- * 2. object is serialized and transferred to other machine
- * 3. object is created through deserialization
- * 4. invoke init() method to do initialization
- * 5. invoke various registerListener to get notified of config change
- * 6. invoke close() to release system resource</p>
- */
-public interface IMetadataChangeNotifyService extends Closeable, Serializable {
-
-    void init(Config config, MetadataType type);
-
-    void registerListener(SpoutSpecListener listener);
-
-    void registerListener(AlertBoltSpecListener listener);
-
-    void registerListener(StreamRouterBoltSpecListener listener);
-
-    void registerListener(AlertPublishSpecListener listener);
-
-    default void activateFetchMetaData() throws Exception {
-
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/MetadataType.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/MetadataType.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/MetadataType.java
deleted file mode 100644
index ac68e26..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/MetadataType.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.coordinator;
-
-/**
- * Since 5/4/16.
- */
-public enum MetadataType {
-    SPOUT,
-    STREAM_ROUTER_BOLT,
-    ALERT_BOLT,
-    ALERT_PUBLISH_BOLT
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamNotDefinedException.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamNotDefinedException.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamNotDefinedException.java
deleted file mode 100644
index e44c630..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamNotDefinedException.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator;
-
-import java.io.IOException;
-
-public class StreamNotDefinedException extends IOException {
-    private static final long serialVersionUID = 6027811718016485808L;
-
-    public StreamNotDefinedException() {
-    }
-
-    public StreamNotDefinedException(String streamId) {
-        super("Stream definition not found: " + streamId);
-    }
-
-    public StreamNotDefinedException(String streamName, String specVersion) {
-        super(String.format("Stream '%s' not found! Current spec version '%s'. Possibly metadata not loaded or metadata mismatch between upstream and alert bolts yet!", streamName, specVersion));
-    }
-
-    public StreamNotDefinedException(String streamName, String streamMetaVersion, String specVersion) {
-        super(String.format("Stream '%s' has meta version '%s' which is different from current spec version '%s'.", streamName, streamMetaVersion, specVersion));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java
deleted file mode 100755
index 3264e61..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator.impl;
-
-import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
-import org.apache.eagle.alert.coordination.model.PublishSpec;
-import org.apache.eagle.alert.coordination.model.RouterSpec;
-import org.apache.eagle.alert.coordination.model.SpoutSpec;
-import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.coordinator.MetadataType;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.publisher.AlertPublishSpecListener;
-import org.apache.eagle.alert.engine.router.AlertBoltSpecListener;
-import org.apache.eagle.alert.engine.router.SpoutSpecListener;
-import org.apache.eagle.alert.engine.router.StreamRouterBoltSpecListener;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * notify 3 components of metadata change Spout, StreamRouterBolt and AlertBolt.
- */
-@SuppressWarnings( {"serial"})
-public abstract class AbstractMetadataChangeNotifyService implements IMetadataChangeNotifyService, Closeable, Serializable {
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractMetadataChangeNotifyService.class);
-    private final List<StreamRouterBoltSpecListener> streamRouterBoltSpecListeners = new ArrayList<>();
-    private final List<SpoutSpecListener> spoutSpecListeners = new ArrayList<>();
-    private final List<AlertBoltSpecListener> alertBoltSpecListeners = new ArrayList<>();
-    private final List<AlertPublishSpecListener> alertPublishSpecListeners = new ArrayList<>();
-    protected MetadataType type;
-
-    @Override
-    public void init(Config config, MetadataType type) {
-        this.type = type;
-    }
-
-    @Override
-    public void registerListener(AlertPublishSpecListener listener) {
-        synchronized (alertBoltSpecListeners) {
-            Preconditions.checkNotNull(alertPublishSpecListeners, "Not initialized yet");
-            LOG.info("Register {}", listener);
-            alertPublishSpecListeners.add(listener);
-        }
-    }
-
-    @Override
-    public void registerListener(StreamRouterBoltSpecListener listener) {
-        synchronized (streamRouterBoltSpecListeners) {
-            streamRouterBoltSpecListeners.add(listener);
-        }
-    }
-
-    @Override
-    public void registerListener(AlertBoltSpecListener listener) {
-        synchronized (alertBoltSpecListeners) {
-            alertBoltSpecListeners.add(listener);
-        }
-    }
-
-    @Override
-    public void registerListener(SpoutSpecListener listener) {
-        synchronized (spoutSpecListeners) {
-            spoutSpecListeners.add(listener);
-        }
-    }
-
-    protected void notifySpout(SpoutSpec spoutSpec, Map<String, StreamDefinition> sds) {
-        spoutSpecListeners.forEach(s -> s.onSpoutSpecChange(spoutSpec, sds));
-    }
-
-    protected void notifyStreamRouterBolt(RouterSpec routerSpec, Map<String, StreamDefinition> sds) {
-        streamRouterBoltSpecListeners.forEach(s -> s.onStreamRouteBoltSpecChange(routerSpec, sds));
-    }
-
-    protected void notifyAlertBolt(AlertBoltSpec alertBoltSpec, Map<String, StreamDefinition> sds) {
-        alertBoltSpecListeners.forEach(s -> s.onAlertBoltSpecChange(alertBoltSpec, sds));
-    }
-
-    protected void notifyAlertPublishBolt(PublishSpec alertPublishSpec, Map<String, StreamDefinition> sds) {
-        alertPublishSpecListeners.forEach(s -> s.onAlertPublishSpecChange(alertPublishSpec, sds));
-    }
-
-    protected void notifyAlertPublishBolt(Map<String, PolicyDefinition> pds, Map<String, StreamDefinition> sds) {
-        alertPublishSpecListeners.forEach(s -> s.onAlertPolicyChange(pds, sds));
-    }
-
-    public void close() throws IOException {
-        LOG.info("Closed");
-    }
-}
\ No newline at end of file


Mime
View raw message