eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [17/46] incubator-eagle git commit: [EAGLE-325] Initialize next-gen alert engine code on branch-0.5
Date Thu, 02 Jun 2016 07:07:56 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-devtools/conf/zookeeper-server.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/conf/zookeeper-server.properties b/eagle-core/eagle-alert/alert/alert-devtools/conf/zookeeper-server.properties
deleted file mode 100644
index dd71ffc..0000000
--- a/eagle-core/eagle-alert/alert/alert-devtools/conf/zookeeper-server.properties
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# the directory where the snapshot is stored.
-dataDir=/tmp/dev-zookeeper-data
-# the port at which the clients will connect
-clientPort=2181
-# disable the per-ip limit on the number of connections since this is a non-production config
-maxClientCnxns=0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-devtools/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/pom.xml b/eagle-core/eagle-alert/alert/alert-devtools/pom.xml
deleted file mode 100644
index f6f1f2b..0000000
--- a/eagle-core/eagle-alert/alert/alert-devtools/pom.xml
+++ /dev/null
@@ -1,107 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!-- ~ Licensed to the Apache Software Foundation (ASF) under one or more
-	~ contributor license agreements. See the NOTICE file distributed with ~
-	this work for additional information regarding copyright ownership. ~ The
-	ASF licenses this file to You under the Apache License, Version 2.0 ~ (the
-	"License"); you may not use this file except in compliance with ~ the License.
-	You may obtain a copy of the License at ~ ~ http://www.apache.org/licenses/LICENSE-2.0
-	~ ~ Unless required by applicable law or agreed to in writing, software ~
-	distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT
-	WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the
-	License for the specific language governing permissions and ~ limitations
-	under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>org.apache.eagle</groupId>
-        <artifactId>alert-parent</artifactId>
-        <version>0.0.1-SNAPSHOT</version>
-    </parent>
-
-    <artifactId>alert-devtools</artifactId>
-
-    <properties>
-        <maven-scala.version>2.15.0</maven-scala.version>
-    </properties>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.eagle</groupId>
-            <artifactId>alert-common</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-kafka</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>${kafka.artifact.id}</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-databind</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <sourceDirectory>src/main/java</sourceDirectory>
-
-        <plugins>
-            <plugin>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <version>${maven-compiler.version}</version>
-            </plugin>
-            <!-- <plugin>
-                <groupId>org.scala-tools</groupId>
-                <artifactId>maven-scala-plugin</artifactId>
-                <version>${maven-scala.version}</version>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>compile</goal>
-                            <goal>testCompile</goal>
-                        </goals>
-                        <configuration>
-                            <args>
-                                <arg>-make:transitive</arg>
-                                <arg>-dependencyfile</arg>
-                                <arg>${project.build.directory}/.scala_dependencies</arg>
-                            </args>
-                            <sourceDir>src/main/scala</sourceDir>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin> -->
-
-            <plugin>
-                <artifactId>maven-dependency-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <phase>install</phase>
-                        <goals>
-                            <goal>copy-dependencies</goal>
-                        </goals>
-                        <configuration>
-                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffset.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffset.java b/eagle-core/eagle-alert/alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffset.java
deleted file mode 100644
index 8f98d71..0000000
--- a/eagle-core/eagle-alert/alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffset.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.tools;
-
-import java.util.Map;
-
-public class KafkaConsumerOffset {
-    public Map<String, String> topology;
-    public Long offset;
-    public Long partition;
-    public Map<String, String> broker;
-    public String topic;
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffsetFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffsetFetcher.java b/eagle-core/eagle-alert/alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffsetFetcher.java
deleted file mode 100644
index 4fe471c..0000000
--- a/eagle-core/eagle-alert/alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffsetFetcher.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.tools;
-
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryNTimes;
-import org.apache.eagle.alert.config.ZKConfig;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class KafkaConsumerOffsetFetcher {
-    public CuratorFramework curator;
-    public String zkRoot;
-    public ObjectMapper mapper;
-    public String zkPathToPartition;
-
-    public KafkaConsumerOffsetFetcher(ZKConfig config, String ... parameters) {
-        try {
-            this.curator = CuratorFrameworkFactory.newClient(config.zkQuorum, config.zkSessionTimeoutMs, 15000,
-                    new RetryNTimes(config.zkRetryTimes, config.zkRetryInterval));
-            curator.start();
-            this.zkRoot = config.zkRoot;
-            mapper = new ObjectMapper();
-            Module module = new SimpleModule("offset").registerSubtypes(new NamedType(KafkaConsumerOffset.class));
-            mapper.registerModule(module);
-            zkPathToPartition = String.format(config.zkRoot, parameters);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public Map<String, Long> fetch() throws Exception {
-        Map<String, Long> map = new HashMap<String, Long>();
-        if (curator.checkExists().forPath(zkPathToPartition) != null) {
-            List<String> partitions = curator.getChildren().forPath(zkPathToPartition);
-            for (String partition : partitions) {
-                String partitionPath = zkPathToPartition + "/" + partition;
-                String data = new String(curator.getData().forPath(partitionPath));
-                KafkaConsumerOffset offset = mapper.readValue(data, KafkaConsumerOffset.class);
-                map.put(partition, offset.offset);
-            }
-        }
-        return map;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaLatestOffsetFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaLatestOffsetFetcher.java b/eagle-core/eagle-alert/alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaLatestOffsetFetcher.java
deleted file mode 100644
index b685e5c..0000000
--- a/eagle-core/eagle-alert/alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaLatestOffsetFetcher.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package org.apache.eagle.alert.tools;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.util.*;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.*;
-import kafka.javaapi.consumer.SimpleConsumer;
-
-public class KafkaLatestOffsetFetcher {
-
-    private List<String> brokerList;
-    private int port;
-
-    public KafkaLatestOffsetFetcher(String brokerList) {
-        this.brokerList = new ArrayList<>();
-        String[] brokers = brokerList.split(",");
-        for (String broker : brokers) {
-            this.brokerList.add(broker.split(":")[0]);
-        }
-        this.port = Integer.valueOf(brokers[0].split(":")[1]);
-    }
-
-    public Map<Integer, Long> fetch(String topic, int partitionCount) {
-        Map<Integer, PartitionMetadata> metadatas = fetchPartitionMetadata(brokerList, port, topic, partitionCount);
-        Map<Integer, Long> ret = new HashMap<>();
-        for (int partition = 0; partition < partitionCount; partition++) {
-            PartitionMetadata metadata = metadatas.get(partition);
-            if (metadata == null || metadata.leader() == null) {
-                ret.put(partition, -1L);
-                //throw new RuntimeException("Can't find Leader for Topic and Partition. Exiting");
-            }
-            String leadBroker = metadata.leader().host();
-            String clientName = "Client_" + topic + "_" + partition;
-            SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
-            long latestOffset = getLatestOffset(consumer, topic, partition, clientName);
-            if (consumer != null) consumer.close();
-            ret.put(partition, latestOffset);
-        }
-        return ret;
-    }
-
-    public long getLatestOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
-        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
-        Map<TopicAndPartition, kafka.api.PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
-        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
-        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
-        OffsetResponse response = consumer.getOffsetsBefore(request);
-        if (response.hasError()) {
-            throw new RuntimeException("Error fetching data offset from the broker. Reason: " + response.errorCode(topic, partition) );
-        }
-        long[] offsets = response.offsets(topic, partition);
-        return offsets[0];
-    }
-
-    private Map<Integer, PartitionMetadata> fetchPartitionMetadata(List<String> brokerList, int port, String topic, int partitionCount) {
-        Map<Integer, PartitionMetadata> partitionMetadata = new HashMap<>();
-        for (String broker : brokerList) {
-            SimpleConsumer consumer = null;
-            try {
-                consumer = new SimpleConsumer(broker, port, 100000, 64 * 1024, "leaderLookup");
-                List<String> topics = Collections.singletonList(topic);
-                TopicMetadataRequest req = new TopicMetadataRequest(topics);
-                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
-
-                List<TopicMetadata> metaData = resp.topicsMetadata();
-                for (TopicMetadata item : metaData) {
-                    for (PartitionMetadata part : item.partitionsMetadata()) {
-                        partitionMetadata.put(part.partitionId(), part);
-                    }
-                }
-                if (partitionMetadata.size() == partitionCount) break;
-            } catch (Exception e) {
-                throw new RuntimeException("Error communicating with Broker [" + broker + "] " + "to find Leader for [" + topic + "] Reason: ", e);
-            } finally {
-                if (consumer != null) consumer.close();
-            }
-        }
-        return partitionMetadata;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-devtools/src/main/scala/org/apache/eagle/contrib/kafka/ProducerTool.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/src/main/scala/org/apache/eagle/contrib/kafka/ProducerTool.scala b/eagle-core/eagle-alert/alert/alert-devtools/src/main/scala/org/apache/eagle/contrib/kafka/ProducerTool.scala
deleted file mode 100644
index 830e9ac..0000000
--- a/eagle-core/eagle-alert/alert/alert-devtools/src/main/scala/org/apache/eagle/contrib/kafka/ProducerTool.scala
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.contrib.kafka
-
-import java.io._
-import java.nio.charset.StandardCharsets
-import java.util.Properties
-import java.util.concurrent.Executors
-
-import joptsimple._
-import kafka.message._
-import kafka.producer.ConsoleProducer.{LineMessageReader, MessageReader}
-import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
-import kafka.serializer._
-import org.apache.commons.io.FileUtils
-
-import scala.collection.JavaConversions._
-
-object ProducerTool {
-  def main(args: Array[String]) {
-    val parser = new OptionParser
-    val topicOpt = parser.accepts("topic", "REQUIRED: The topic id to produce messages to.")
-      .withRequiredArg
-      .describedAs("topic")
-      .ofType(classOf[String])
-    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
-      .withRequiredArg
-      .describedAs("broker-list")
-      .ofType(classOf[String])
-    val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
-    val compressOpt = parser.accepts("compress", "If set, messages batches are sent compressed")
-    val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously.")
-      .withRequiredArg
-      .describedAs("size")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(200)
-    val messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retires before the producer give up and drop this message.")
-      .withRequiredArg
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(3)
-    val retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.")
-      .withRequiredArg
-      .ofType(classOf[java.lang.Long])
-      .defaultsTo(100l)
-    val sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" +
-      " a message will queue awaiting suffient batch size. The value is given in ms.")
-      .withRequiredArg
-      .describedAs("timeout_ms")
-      .ofType(classOf[java.lang.Long])
-      .defaultsTo(1000l)
-    val queueSizeOpt = parser.accepts("queue-size", "If set and the producer is running in asynchronous mode, this gives the maximum amount of " +
-      " messages will queue awaiting suffient batch size.")
-      .withRequiredArg
-      .describedAs("queue_size")
-      .ofType(classOf[java.lang.Long])
-      .defaultsTo(10000l)
-    val queueEnqueueTimeoutMsOpt = parser.accepts("queue-enqueuetimeout-ms", "Timeout for event enqueue")
-      .withRequiredArg
-      .describedAs("queue enqueuetimeout ms")
-      .ofType(classOf[java.lang.Long])
-      .defaultsTo(Int.MaxValue.toLong)
-    val requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required acks of the producer requests")
-      .withRequiredArg
-      .describedAs("request required acks")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(0)
-    val requestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. Value must be non-negative and non-zero")
-      .withRequiredArg
-      .describedAs("request timeout ms")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(1500)
-    val valueEncoderOpt = parser.accepts("value-serializer", "The class name of the message encoder implementation to use for serializing values.")
-      .withRequiredArg
-      .describedAs("encoder_class")
-      .ofType(classOf[java.lang.String])
-      .defaultsTo(classOf[StringEncoder].getName)
-    val keyEncoderOpt = parser.accepts("key-serializer", "The class name of the message encoder implementation to use for serializing keys.")
-      .withRequiredArg
-      .describedAs("encoder_class")
-      .ofType(classOf[java.lang.String])
-      .defaultsTo(classOf[StringEncoder].getName)
-    val messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " +
-      "By default each line is read as a separate message.")
-      .withRequiredArg
-      .describedAs("reader_class")
-      .ofType(classOf[java.lang.String])
-      .defaultsTo(classOf[LineMessageReader].getName)
-    val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.")
-      .withRequiredArg
-      .describedAs("size")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(1024 * 100)
-    val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. " +
-      "This allows custom configuration for a user-defined message reader.")
-      .withRequiredArg
-      .describedAs("prop")
-      .ofType(classOf[String])
-
-    val dataOpt = parser.accepts("data", "Input message data content")
-      .withRequiredArg()
-      .describedAs("input message data")
-      .ofType(classOf[String])
-
-    val fileOpt = parser.accepts("file", "Input message file name")
-      .withRequiredArg()
-      .describedAs("input message file")
-      .ofType(classOf[String])
-
-    val replicationOpt = parser.accepts("replication", "Message data replication count")
-      .withRequiredArg()
-      .describedAs("message replication count, default: 1")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(1)
-
-    val options = parser.parse(args: _*)
-    for (arg <- List(topicOpt, brokerListOpt)) {
-      if (!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
-
-    val topic = options.valueOf(topicOpt)
-    val brokerList = options.valueOf(brokerListOpt)
-    val sync = options.has(syncOpt)
-    val compress = options.has(compressOpt)
-    val batchSize = options.valueOf(batchSizeOpt)
-    val sendTimeout = options.valueOf(sendTimeoutOpt)
-    val queueSize = options.valueOf(queueSizeOpt)
-    val queueEnqueueTimeoutMs = options.valueOf(queueEnqueueTimeoutMsOpt)
-    val requestRequiredAcks = options.valueOf(requestRequiredAcksOpt)
-    val requestTimeoutMs = options.valueOf(requestTimeoutMsOpt)
-    val keyEncoderClass = options.valueOf(keyEncoderOpt)
-    val valueEncoderClass = options.valueOf(valueEncoderOpt)
-    val readerClass = options.valueOf(messageReaderOpt)
-    val socketBuffer = options.valueOf(socketBufferSizeOpt)
-    val cmdLineProps = parseLineReaderArgs(options.valuesOf(propertyOpt))
-    val messageData = options.valuesOf(dataOpt)
-    val messageFile = options.valuesOf(fileOpt)
-    val messageReplication = options.valueOf(replicationOpt)
-
-    cmdLineProps.put("topic", topic)
-    val props = new Properties()
-    props.put("metadata.broker.list", brokerList)
-    val codec = if (compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec
-    props.put("compression.codec", codec.toString)
-    props.put("producer.type", if (sync) "sync" else "async")
-    if (options.has(batchSizeOpt))
-      props.put("batch.num.messages", batchSize.toString)
-
-    props.put("message.send.max.retries", options.valueOf(messageSendMaxRetriesOpt).toString)
-    props.put("retry.backoff.ms", options.valueOf(retryBackoffMsOpt).toString)
-    props.put("queue.buffering.max.ms", sendTimeout.toString)
-    props.put("queue.buffering.max.messages", queueSize.toString)
-    props.put("queue.enqueue.timeout.ms", queueEnqueueTimeoutMs.toString)
-    props.put("request.required.acks", requestRequiredAcks.toString)
-    props.put("request.timeout.ms", requestTimeoutMs.toString)
-    props.put("key.serializer.class", keyEncoderClass)
-    props.put("serializer.class", valueEncoderClass)
-    props.put("send.buffer.bytes", socketBuffer.toString)
-
-    val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef, AnyRef]]
-
-    if (messageData.size()>0) {
-      reader.init(new ByteArrayInputStream(messageData.get(0).getBytes(StandardCharsets.UTF_8)), cmdLineProps)
-    } else if (messageFile.size()>0) {
-      reader.init(FileUtils.openInputStream(new File(messageFile.get(0))), cmdLineProps)
-    } else {
-      reader.init(System.in, cmdLineProps)
-    }
-
-    try {
-      val executor = Executors.newCachedThreadPool()
-      val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
-      Runtime.getRuntime.addShutdownHook(new Thread() {
-        override def run(): Unit = {
-          producer.close()
-          executor.shutdown()
-        }
-      })
-
-      var message: KeyedMessage[AnyRef, AnyRef] = null
-      do {
-        message = reader.readMessage()
-        if (message != null) {
-          var i = 0
-          while (i < messageReplication) {
-            executor.submit(new Runnable {
-              override def run(): Unit = producer.send(message)
-            }).get()
-            i += 1
-          }
-          System.out.println("Produced %d messages".format(messageReplication))
-        }
-      } while (message != null)
-    } catch {
-      case e: Exception =>
-        e.printStackTrace()
-        System.exit(1)
-    }
-    System.exit(0)
-  }
-
-  def parseLineReaderArgs(args: Iterable[String]): Properties = {
-    val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0)
-    if (!splits.forall(_.length == 2)) {
-      System.err.println("Invalid line reader properties: " + args.mkString(" "))
-      System.exit(1)
-    }
-    val props = new Properties
-    for (a <- splits)
-      props.put(a(0), a(1))
-    props
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-devtools/src/test/java/org/apache/eagle/alert/tools/TestKafkaOffset.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/src/test/java/org/apache/eagle/alert/tools/TestKafkaOffset.java b/eagle-core/eagle-alert/alert/alert-devtools/src/test/java/org/apache/eagle/alert/tools/TestKafkaOffset.java
deleted file mode 100644
index e4d9f8f..0000000
--- a/eagle-core/eagle-alert/alert/alert-devtools/src/test/java/org/apache/eagle/alert/tools/TestKafkaOffset.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package org.apache.eagle.alert.tools;
-/*
- *
- *    Licensed to the Apache Software Foundation (ASF) under one or more
- *    contributor license agreements.  See the NOTICE file distributed with
- *    this work for additional information regarding copyright ownership.
- *    The ASF licenses this file to You under the Apache License, Version 2.0
- *    (the "License"); you may not use this file except in compliance with
- *    the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *    Unless required by applicable law or agreed to in writing, software
- *    distributed under the License is distributed on an "AS IS" BASIS,
- *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *    See the License for the specific language governing permissions and
- *    limitations under the License.
- *
- */
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.config.ZKConfig;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.Map;
-
-public class TestKafkaOffset {
-    @Ignore
-    @Test
-    public void test() throws Exception {
-        System.setProperty("config.resource", "/kafka-offset-test.application.conf");
-        Config config = ConfigFactory.load();
-        ZKConfig zkConfig = new ZKConfig();
-        zkConfig.zkQuorum = config.getString("dataSourceConfig.zkQuorum");
-        zkConfig.zkRoot = config.getString("dataSourceConfig.transactionZKRoot");
-        zkConfig.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs");
-        zkConfig.connectionTimeoutMs = config.getInt("dataSourceConfig.connectionTimeoutMs");
-        zkConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes");
-        zkConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval");
-
-        String topic = "testTopic1";
-        String topology = "alertUnitTopology_1";
-
-        while(true) {
-            KafkaConsumerOffsetFetcher consumerOffsetFetcher = new KafkaConsumerOffsetFetcher(zkConfig, topic, topology);
-            String kafkaBrokerList = config.getString("dataSourceConfig.kafkaBrokerList");
-            KafkaLatestOffsetFetcher latestOffsetFetcher = new KafkaLatestOffsetFetcher(kafkaBrokerList);
-
-            Map<String, Long> consumedOffset = consumerOffsetFetcher.fetch();
-            if(consumedOffset.size() == 0){
-                System.out.println("no any consumer offset found for this topic " + topic);
-            }
-            Map<Integer, Long> latestOffset = latestOffsetFetcher.fetch(topic, consumedOffset.size());
-            if(latestOffset.size() == 0){
-                System.out.println("no any latest offset found for this topic " + topic);
-            }
-            for (Map.Entry<String, Long> entry : consumedOffset.entrySet()) {
-                String partition = entry.getKey();
-                Integer partitionNumber = Integer.valueOf(partition.split("_")[1]);
-                Long lag = latestOffset.get(partitionNumber) - entry.getValue();
-                System.out.println(String.format("parition %s, total: %d, consumed: %d, lag: %d",
-                        partition, latestOffset.get(partitionNumber), entry.getValue(), lag));
-            }
-            Thread.sleep(10000);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-devtools/src/test/resources/kafka-offset-test.application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/src/test/resources/kafka-offset-test.application.conf b/eagle-core/eagle-alert/alert/alert-devtools/src/test/resources/kafka-offset-test.application.conf
deleted file mode 100644
index 4d2d4ff..0000000
--- a/eagle-core/eagle-alert/alert/alert-devtools/src/test/resources/kafka-offset-test.application.conf
+++ /dev/null
@@ -1,25 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-"dataSourceConfig":{
-  "kafkaBrokerList" : "kafka-broker:9092",
-  "zkQuorum" : "zk-admin:2181",
-  "transactionZKRoot" : "/consumers/eagle_consumer/%s/%s",
-  "zkSessionTimeoutMs" : 10000,
-  "connectionTimeoutMs" : 10000,
-  "zkRetryTimes" : 3,
-  "zkRetryInterval" : 3000
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-devtools/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/src/test/resources/log4j.properties b/eagle-core/eagle-alert/alert/alert-devtools/src/test/resources/log4j.properties
deleted file mode 100644
index d59ded6..0000000
--- a/eagle-core/eagle-alert/alert/alert-devtools/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,21 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-log4j.rootLogger=INFO, stdout
-
-# standard output
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/.gitignore
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/.gitignore b/eagle-core/eagle-alert/alert/alert-engine/.gitignore
deleted file mode 100644
index b83d222..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/target/

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/pom.xml b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/pom.xml
deleted file mode 100644
index c128540..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/pom.xml
+++ /dev/null
@@ -1,76 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!-- ~ Licensed to the Apache Software Foundation (ASF) under one or more
-	~ contributor license agreements. See the NOTICE file distributed with ~
-	this work for additional information regarding copyright ownership. ~ The
-	ASF licenses this file to You under the Apache License, Version 2.0 ~ (the
-	"License"); you may not use this file except in compliance with ~ the License.
-	You may obtain a copy of the License at ~ ~ http://www.apache.org/licenses/LICENSE-2.0
-	~ ~ Unless required by applicable law or agreed to in writing, software ~
-	distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT
-	WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the
-	License for the specific language governing permissions and ~ limitations
-	under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>org.apache.eagle</groupId>
-        <artifactId>alert-engine-parent</artifactId>
-        <version>0.0.1-SNAPSHOT</version>
-        <relativePath>../pom.xml</relativePath>
-    </parent>
-
-    <artifactId>alert-engine-base</artifactId>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-all</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.dropwizard.metrics</groupId>
-            <artifactId>metrics-jvm</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.mapdb</groupId>
-            <artifactId>mapdb</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>joda-time</groupId>
-            <artifactId>joda-time</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.velocity</groupId>
-            <artifactId>velocity</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>javax.mail</groupId>
-            <artifactId>mail</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-jar-plugin</artifactId>
-                <version>2.6</version>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>test-jar</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/AlertStreamCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/AlertStreamCollector.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/AlertStreamCollector.java
deleted file mode 100755
index 7219271..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/AlertStreamCollector.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package org.apache.eagle.alert.engine;
-
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public interface AlertStreamCollector extends Collector<AlertStreamEvent> {
-    /**
-     * No need to be thread-safe, but should be called on in synchronous like in Storm bolt execute method
-     */
-    void flush();
-    void close();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/Collector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/Collector.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/Collector.java
deleted file mode 100755
index 2f136ab..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/Collector.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine;
-
-@FunctionalInterface
-public interface Collector<T> {
-    /**
-     * Must make sure thread-safe
-     *
-     * @param t
-     */
-    void emit(T t);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/PartitionedEventCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/PartitionedEventCollector.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/PartitionedEventCollector.java
deleted file mode 100755
index 3a99e98..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/PartitionedEventCollector.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine;
-
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-
-/**
- * Executed in thread-safe trace
- */
-public interface PartitionedEventCollector extends Collector<PartitionedEvent> {
-    /**
-     * @param event to be dropped
-     */
-    void drop(PartitionedEvent event);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/StreamContext.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
deleted file mode 100644
index 609378b..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package org.apache.eagle.alert.engine;
-
-import backtype.storm.metric.api.MultiCountMetric;
-
-import com.typesafe.config.Config;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public interface StreamContext {
-    MultiCountMetric counter();
-
-    Config config();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java
deleted file mode 100644
index 9ebcb60..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.eagle.alert.engine;
-
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.TopologyContext;
-
-import com.typesafe.config.Config;
-
-public class StreamContextImpl implements StreamContext {
-    private final Config config;
-    private final MultiCountMetric counter;
-
-    public StreamContextImpl(Config config, MultiCountMetric counter, TopologyContext context) {
-        this.counter=counter;
-        this.config = config;
-    }
-
-    @Override
-    public MultiCountMetric counter() {
-        return this.counter;
-    }
-
-    @Override
-    public Config config() {
-        return this.config;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java
deleted file mode 100644
index 2bca329..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine;
-
-import org.apache.eagle.alert.config.ZKConfig;
-import org.apache.eagle.alert.config.ZKConfigBuilder;
-import org.apache.eagle.alert.engine.coordinator.impl.ZKMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.runner.UnitTopologyRunner;
-
-import backtype.storm.generated.StormTopology;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-/**
- * Since 5/3/16. Make sure unit topology can be started either from command line
- * or from remote A few parameters for starting unit topology 1. number of spout
- * tasks 2. number of router bolts 3. number of alert bolts 4. number of publish
- * bolts
- *
- * Connections 1. spout and router bolt 2. router bolt and alert bolt 3. alert
- * bolt and publish bolt
- */
-public class UnitTopologyMain {
-
-    public static void main(String[] args) {
-        Config config = ConfigFactory.load();
-        ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
-        String topologyId = config.getString("topology.name");
-        ZKMetadataChangeNotifyService changeNotifyService = new ZKMetadataChangeNotifyService(zkConfig, topologyId);
-
-        new UnitTopologyRunner(changeNotifyService).run(topologyId, config);
-    }
-
-    public static StormTopology createTopology(Config config) {
-        ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
-        String topologyId = config.getString("topology.name");
-        ZKMetadataChangeNotifyService changeNotifyService = new ZKMetadataChangeNotifyService(zkConfig, topologyId);
-
-        return new UnitTopologyRunner(changeNotifyService).buildTopology(topologyId, config);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java
deleted file mode 100644
index 22fe56a..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator;
-
-import java.io.Closeable;
-import java.io.Serializable;
-
-import org.apache.eagle.alert.engine.publisher.AlertPublishSpecListener;
-import org.apache.eagle.alert.engine.router.AlertBoltSpecListener;
-import org.apache.eagle.alert.engine.router.SpoutSpecListener;
-import org.apache.eagle.alert.engine.router.StreamRouterBoltSpecListener;
-
-import com.typesafe.config.Config;
-
-/**
- * IMetadataChangeNotifyService defines the following features
- * 1) initialization
- * 2) register metadata change listener
- *
- * In distributed environment for example storm platform,
- * subclass implementing this interface should have the following lifecycle
- * 1. object is created in client machine
- * 2. object is serialized and transferred to other machine
- * 3. object is created through deserialization
- * 4. invoke init() method to do initialization
- * 5. invoke various registerListener to get notified of config change
- * 6. invoke close() to release system resource
- */
-public interface IMetadataChangeNotifyService extends Closeable,Serializable {
-    /**
-     *
-     * @param config
-     */
-    void init(Config config, MetadataType type);
-
-    void registerListener(SpoutSpecListener listener);
-
-    void registerListener(AlertBoltSpecListener listener);
-
-    void registerListener(StreamRouterBoltSpecListener listener);
-
-    void registerListener(AlertPublishSpecListener listener);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/MetadataType.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/MetadataType.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/MetadataType.java
deleted file mode 100644
index ac68e26..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/MetadataType.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.coordinator;
-
-/**
- * Since 5/4/16.
- */
-public enum MetadataType {
-    SPOUT,
-    STREAM_ROUTER_BOLT,
-    ALERT_BOLT,
-    ALERT_PUBLISH_BOLT
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java
deleted file mode 100644
index 927eb8c..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.apache.eagle.alert.engine.coordinator;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public class StreamDefinitionNotFoundException extends Exception {
-    private static final long serialVersionUID = 6027811718016485808L;
-
-    public StreamDefinitionNotFoundException(String streamId){
-        super("Stream definition not found: "+streamId);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java
deleted file mode 100755
index 52fcc04..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator.impl;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
-import org.apache.eagle.alert.coordination.model.PublishSpec;
-import org.apache.eagle.alert.coordination.model.RouterSpec;
-import org.apache.eagle.alert.coordination.model.SpoutSpec;
-import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.coordinator.MetadataType;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.publisher.AlertPublishSpecListener;
-import org.apache.eagle.alert.engine.router.AlertBoltSpecListener;
-import org.apache.eagle.alert.engine.router.SpoutSpecListener;
-import org.apache.eagle.alert.engine.router.StreamRouterBoltSpecListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-
-/**
- * notify 3 components of metadata change Spout, StreamRouterBolt and AlertBolt
- */
-@SuppressWarnings({ "serial" })
-public abstract class AbstractMetadataChangeNotifyService implements IMetadataChangeNotifyService, Closeable,
-        Serializable {
-    private final static Logger LOG = LoggerFactory.getLogger(AbstractMetadataChangeNotifyService.class);
-    private final List<StreamRouterBoltSpecListener> streamRouterBoltSpecListeners = new ArrayList<>();
-    private final List<SpoutSpecListener> spoutSpecListeners = new ArrayList<>();
-    private final List<AlertBoltSpecListener> alertBoltSpecListeners = new ArrayList<>();
-    private final List<AlertPublishSpecListener> alertPublishSpecListeners = new ArrayList<>();
-    protected MetadataType type;
-
-    /**
-     * @param config
-     */
-    @Override
-    public void init(Config config, MetadataType type) {
-        this.type = type;
-    }
-
-    @Override
-    public void registerListener(AlertPublishSpecListener listener) {
-        synchronized (alertBoltSpecListeners) {
-            Preconditions.checkNotNull(alertPublishSpecListeners, "Not initialized yet");
-            LOG.info("Register {}", listener);
-            alertPublishSpecListeners.add(listener);
-        }
-    }
-
-    @Override
-    public void registerListener(StreamRouterBoltSpecListener listener) {
-        synchronized (streamRouterBoltSpecListeners) {
-            streamRouterBoltSpecListeners.add(listener);
-        }
-    }
-
-    @Override
-    public void registerListener(AlertBoltSpecListener listener) {
-        synchronized (alertBoltSpecListeners) {
-            alertBoltSpecListeners.add(listener);
-        }
-    }
-
-    @Override
-    public void registerListener(SpoutSpecListener listener) {
-        synchronized (spoutSpecListeners) {
-            spoutSpecListeners.add(listener);
-        }
-    }
-
-    protected void notifySpout(SpoutSpec spoutSpec, Map<String, StreamDefinition> sds) {
-        spoutSpecListeners.forEach(s -> s.onSpoutSpecChange(spoutSpec, sds));
-    }
-
-    protected void notifyStreamRouterBolt(RouterSpec routerSpec, Map<String, StreamDefinition> sds) {
-        streamRouterBoltSpecListeners.forEach(s -> s.onStreamRouteBoltSpecChange(routerSpec, sds));
-    }
-
-    protected void notifyAlertBolt(AlertBoltSpec alertBoltSpec, Map<String, StreamDefinition> sds) {
-        alertBoltSpecListeners.forEach(s -> s.onAlertBoltSpecChange(alertBoltSpec, sds));
-    }
-
-    protected void notifyAlertPublishBolt(PublishSpec alertPublishSpec, Map<String, StreamDefinition> sds) {
-        alertPublishSpecListeners.forEach(s -> s.onAlertPublishSpecChange(alertPublishSpec, sds));
-    }
-
-    public void close() throws IOException {
-        LOG.info("Closed");
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
deleted file mode 100755
index 1b93072..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator.impl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.eagle.alert.config.ConfigBusConsumer;
-import org.apache.eagle.alert.config.ConfigChangeCallback;
-import org.apache.eagle.alert.config.ConfigValue;
-import org.apache.eagle.alert.config.ZKConfig;
-import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
-import org.apache.eagle.alert.coordination.model.PublishSpec;
-import org.apache.eagle.alert.coordination.model.RouterSpec;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.SpoutSpec;
-import org.apache.eagle.alert.coordination.model.VersionedPolicyDefinition;
-import org.apache.eagle.alert.coordination.model.VersionedStreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.MetadataType;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.service.IMetadataServiceClient;
-import org.apache.eagle.alert.service.MetadataServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-
-/**
- * <b>TODO</b>: performance tuning: this is not a JVM-level service, so it may
- * burden ZooKeeper when there are too many listeners. It does not support
- * dynamically adding topics; all topics must be available when the service
- * object is created.
- * <p>
- * ZK path format is as follows:
- * <ul>
- * <li>/alert/topology_1/spout</li>
- * <li>/alert/topology_1/router</li>
- * <li>/alert/topology_1/alert</li>
- * <li>/alert/topology_1/publisher</li>
- * </ul>
- */
-public class ZKMetadataChangeNotifyService extends AbstractMetadataChangeNotifyService implements ConfigChangeCallback {
-    private static final long serialVersionUID = -1509237694501235144L;
-    private static final Logger LOG = LoggerFactory.getLogger(ZKMetadataChangeNotifyService.class);
-    private ZKConfig zkConfig;
-    private String topologyId;
-    private ConfigBusConsumer consumer;
-
-    private transient IMetadataServiceClient client;
-
-    public ZKMetadataChangeNotifyService(ZKConfig config, String topologyId) {
-        this.zkConfig = config;
-        this.topologyId = topologyId;
-    }
-
-    @Override
-    public void init(Config config, MetadataType type) {
-        super.init(config, type);
-        client = new MetadataServiceClientImpl(config);
-        consumer = new ConfigBusConsumer(zkConfig, topologyId + "/" + getMetadataTopicSuffix(), this);
-        LOG.info("init called for client");
-    }
-
-    /**
-     * @see Coordinator
-     * @return the metadata topic suffix for the current metadata type
-     */
-    private String getMetadataTopicSuffix() {
-        switch (type) {
-        case ALERT_BOLT:
-            return "alert";
-        case ALERT_PUBLISH_BOLT:
-            return "publisher";
-        case SPOUT:
-            return "spout";
-        case STREAM_ROUTER_BOLT:
-            return "router";
-        default:
-            throw new RuntimeException(String.format("unexpected metadata type: %s !", type));
-        }
-    }
-
-    @Override
-    public void close() throws IOException {
-        consumer.close();
-        LOG.info("Config consumer closed");
-    }
-
-    @Override
-    public void onNewConfig(ConfigValue value) {
-        LOG.info("Metadata changed {}",value);
-
-        if (client == null) {
-            LOG.error("OnNewConfig trigger, but metadata service client is null. Metadata type {}", type);
-            return;
-        }
-
-        // analyze config value and notify corresponding listeners
-        String version = value.getValue().toString();
-        // brute-force load all: this might put extra load on the metadata service.
-        // FIXME: once ScheduleState is persisted with better normalization, load
-        // state based on type and version
-        ScheduleState state = client.getVersionedSpec(version);
-        if (state == null) {
-            LOG.error("Failed to load schedule state of version {}, this is possibly a bug, pls check coordinator log !", version);
-            return;
-        }
-        Map<String, StreamDefinition> sds = getStreams(state.getStreamSnapshots());
-        switch (type) {
-        case ALERT_BOLT:
-            // we might query the metadata service to get the metadata snapshot and StreamDefinition
-            AlertBoltSpec alertSpec = state.getAlertSpecs().get(topologyId);
-            if (alertSpec == null) {
-                LOG.error(" alert spec for version {} not found for topology {} !", version, topologyId);
-            } else {
-                prePopulate(alertSpec, state.getPolicySnapshots());
-                notifyAlertBolt(alertSpec, sds);
-            }
-            break;
-        case ALERT_PUBLISH_BOLT:
-            PublishSpec pubSpec = state.getPublishSpecs().get(topologyId);
-            if (pubSpec == null) {
-                LOG.error(" alert spec for version {} not found for topology {} !", version, topologyId);
-            } else {
-                notifyAlertPublishBolt(pubSpec, sds);
-            }
-            break;
-        case SPOUT:
-            SpoutSpec spoutSpec = state.getSpoutSpecs().get(topologyId);
-            if (spoutSpec == null) {
-                LOG.error(" alert spec for version {} not found for topology {} !", version, topologyId);
-            } else {
-                notifySpout(spoutSpec, sds);
-            }
-            break;
-        case STREAM_ROUTER_BOLT:
-            RouterSpec gSpec = state.getGroupSpecs().get(topologyId);
-            if (gSpec == null) {
-                LOG.error(" alert spec for version {} not found for topology {} !", version, topologyId);
-            } else {
-                notifyStreamRouterBolt(gSpec, sds);
-            }
-            break;
-        default:
-            LOG.error("unexpected metadata type: {} ", type);
-        }
-    }
-
-    private void prePopulate(AlertBoltSpec alertSpec, List<VersionedPolicyDefinition> list) {
-        Map<String, PolicyDefinition> policyMap = listToMap(list);
-        for (Entry<String, List<String>> policyEntry : alertSpec.getBoltPolicyIdsMap().entrySet()) {
-            List<PolicyDefinition> pds = alertSpec.getBoltPoliciesMap().get(policyEntry.getKey());
-            if (pds == null) {
-                pds = new ArrayList<PolicyDefinition>();
-                alertSpec.getBoltPoliciesMap().put(policyEntry.getKey(), pds);
-            }
-            for (String policyName : policyEntry.getValue()) {
-                if (policyMap.containsKey(policyName)) {
-                    pds.add(policyMap.get(policyName));
-                }
-            }
-        }
-    }
-
-    private Map<String, StreamDefinition> getStreams(List<VersionedStreamDefinition> streamSnapshots) {
-        Map<String, StreamDefinition> result = new HashMap<String, StreamDefinition>();
-        for (VersionedStreamDefinition vsd : streamSnapshots) {
-            result.put(vsd.getDefinition().getStreamId(), vsd.getDefinition());
-        }
-        return result;
-    }
-
-    private Map<String, PolicyDefinition> listToMap(List<VersionedPolicyDefinition> listStreams) {
-        Map<String, PolicyDefinition> result = new HashMap<String, PolicyDefinition>();
-        for (VersionedPolicyDefinition sd : listStreams) {
-            result.put(sd.getDefinition().getName(), sd.getDefinition());
-        }
-        return result;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java
deleted file mode 100644
index 22f1408..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.eagle.alert.engine.evaluator;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public interface PolicyChangeListener {
-    void onPolicyChange(List<PolicyDefinition> added,
-                        List<PolicyDefinition> removed,
-                        List<PolicyDefinition> modified, Map<String, StreamDefinition> sds);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyGroupEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyGroupEvaluator.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyGroupEvaluator.java
deleted file mode 100644
index dd3ca24..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyGroupEvaluator.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.eagle.alert.engine.evaluator;
-
-import java.io.Serializable;
-
-import org.apache.eagle.alert.engine.AlertStreamCollector;
-import org.apache.eagle.alert.engine.StreamContext;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * A policy group refers to the policies which belong to the same MonitoredStream.
- * 4 lifecycle steps are involved in PolicyGroupEvaluator:
- * Step 1: create object. Be aware that in distributed environment, this object may be serialized and transferred across network
- * Step 2: init. This normally is invoked only once before nextEvent is invoked
- * Step 3: nextEvent
- * Step 4: close
- */
-public interface PolicyGroupEvaluator extends PolicyChangeListener, Serializable{
-    void init(StreamContext context, AlertStreamCollector collector);
-
-    /**
-     * Evaluate event
-     */
-    void nextEvent(PartitionedEvent event);
-
-    String getName();
-
-    void close();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java
deleted file mode 100644
index 2898ebc..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.apache.eagle.alert.engine.evaluator;
-
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-
-import backtype.storm.metric.api.MultiCountMetric;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public class PolicyHandlerContext {
-    private PolicyDefinition policyDefinition;
-    private PolicyGroupEvaluator parentEvaluator;
-    private MultiCountMetric policyCounter;
-    private String policyEvaluatorId;
-
-    public PolicyDefinition getPolicyDefinition() {
-        return policyDefinition;
-    }
-
-    public void setPolicyDefinition(PolicyDefinition policyDefinition) {
-        this.policyDefinition = policyDefinition;
-    }
-
-    public PolicyGroupEvaluator getParentEvaluator() {
-        return parentEvaluator;
-    }
-
-    public void setParentEvaluator(PolicyGroupEvaluator parentEvaluator) {
-        this.parentEvaluator = parentEvaluator;
-    }
-
-    public void setPolicyCounter(MultiCountMetric metric) {
-        this.policyCounter = metric;
-    }
-
-    public MultiCountMetric getPolicyCounter() {
-        return policyCounter;
-    }
-
-    public String getPolicyEvaluatorId() {
-        return policyEvaluatorId;
-    }
-
-    public void setPolicyEvaluatorId(String policyEvaluatorId) {
-        this.policyEvaluatorId = policyEvaluatorId;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandler.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandler.java
deleted file mode 100755
index 069d321..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandler.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package org.apache.eagle.alert.engine.evaluator;
-
-import org.apache.eagle.alert.engine.Collector;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public interface PolicyStreamHandler {
-    void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception;
-    void send(StreamEvent event) throws Exception;
-    void close() throws Exception;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
deleted file mode 100644
index 2aa70e8..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator;
-
-import java.util.Map;
-
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyHandler;
-
-public class PolicyStreamHandlers {
-    public static final String SIDDHI_ENGINE ="siddhi";
-
-    public static PolicyStreamHandler createHandler(String type, Map<String, StreamDefinition> sds){
-        if(SIDDHI_ENGINE.equals(type)) {
-            return new SiddhiPolicyHandler(sds);
-        }
-        throw new IllegalArgumentException("Illegal policy stream handler type: "+type);
-    }
-}
\ No newline at end of file


Mime
View raw message