eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ralp...@apache.org
Subject [13/19] incubator-eagle git commit: EAGLE-324: Init branch-v0.5
Date Wed, 01 Jun 2016 05:56:28 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-devtools/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/pom.xml b/eagle-core/eagle-alert/alert/alert-devtools/pom.xml
new file mode 100644
index 0000000..365445c
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-devtools/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.eagle</groupId>
+        <artifactId>alert-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>alert-devtools</artifactId>
+
+    <properties>
+        <maven-scala.version>2.15.0</maven-scala.version> <!-- referenced only by the commented-out maven-scala-plugin below -->
+    </properties>
+
+    <dependencies> <!-- entries without a version, and the kafka.artifact.id property, are not defined in this POM; presumably inherited from alert-parent (confirm) -->
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>alert-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>${kafka.artifact.id}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/java</sourceDirectory> <!-- NOTE(review): src/main/scala is only named by the disabled plugin below, so it looks unbuilt unless the parent POM adds a Scala compiler (confirm) -->
+
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>${maven-compiler.version}</version>
+            </plugin>
+            <!-- Disabled Scala build step, kept for reference: <plugin>
+                <groupId>org.scala-tools</groupId>
+                <artifactId>maven-scala-plugin</artifactId>
+                <version>${maven-scala.version}</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>testCompile</goal>
+                        </goals>
+                        <configuration>
+                            <args>
+                                <arg>-make:transitive</arg>
+                                <arg>-dependencyfile</arg>
+                                <arg>${project.build.directory}/.scala_dependencies</arg>
+                            </args>
+                            <sourceDir>src/main/scala</sourceDir>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin> -->
+
+            <plugin> <!-- in the install phase, copies this module's dependencies into the build directory's lib folder -->
+                <artifactId>maven-dependency-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>install</phase>
+                        <goals>
+                            <goal>copy-dependencies</goal>
+                        </goals>
+                        <configuration>
+                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffset.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffset.java b/eagle-core/eagle-alert/alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffset.java
new file mode 100644
index 0000000..8f98d71
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffset.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.tools;
+
+import java.util.Map;
+
+public class KafkaConsumerOffset { // plain holder that KafkaConsumerOffsetFetcher fills from the JSON stored in a ZooKeeper node
+    public Map<String, String> topology; // not read by the code visible here; presumably describes the consuming topology - confirm
+    public Long offset; // the only field KafkaConsumerOffsetFetcher.fetch() reads; boxed, so it may be null
+    public Long partition; // NOTE(review): presumably the Kafka partition number - confirm against the writer of this JSON
+    public Map<String, String> broker; // NOTE(review): presumably broker connection details (e.g. host/port) - confirm
+    public String topic; // NOTE(review): presumably the Kafka topic name - confirm
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffsetFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffsetFetcher.java b/eagle-core/eagle-alert/alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffsetFetcher.java
new file mode 100644
index 0000000..4fe471c
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffsetFetcher.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.tools;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.eagle.alert.config.ZKConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class KafkaConsumerOffsetFetcher {
+    public CuratorFramework curator;
+    public String zkRoot;
+    public ObjectMapper mapper;
+    public String zkPathToPartition;
+
+    public KafkaConsumerOffsetFetcher(ZKConfig config, String ... parameters) {
+        try {
+            this.curator = CuratorFrameworkFactory.newClient(config.zkQuorum, config.zkSessionTimeoutMs, 15000,
+                    new RetryNTimes(config.zkRetryTimes, config.zkRetryInterval));
+            curator.start();
+            this.zkRoot = config.zkRoot;
+            mapper = new ObjectMapper();
+            Module module = new SimpleModule("offset").registerSubtypes(new NamedType(KafkaConsumerOffset.class));
+            mapper.registerModule(module);
+            zkPathToPartition = String.format(config.zkRoot, parameters);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public Map<String, Long> fetch() throws Exception {
+        Map<String, Long> map = new HashMap<String, Long>();
+        if (curator.checkExists().forPath(zkPathToPartition) != null) {
+            List<String> partitions = curator.getChildren().forPath(zkPathToPartition);
+            for (String partition : partitions) {
+                String partitionPath = zkPathToPartition + "/" + partition;
+                String data = new String(curator.getData().forPath(partitionPath));
+                KafkaConsumerOffset offset = mapper.readValue(data, KafkaConsumerOffset.class);
+                map.put(partition, offset.offset);
+            }
+        }
+        return map;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaLatestOffsetFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaLatestOffsetFetcher.java b/eagle-core/eagle-alert/alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaLatestOffsetFetcher.java
new file mode 100644
index 0000000..b685e5c
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaLatestOffsetFetcher.java
@@ -0,0 +1,97 @@
+package org.apache.eagle.alert.tools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.*;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.*;
+import kafka.javaapi.consumer.SimpleConsumer;
+
+public class KafkaLatestOffsetFetcher {
+
+    private List<String> brokerList;
+    private int port;
+
+    public KafkaLatestOffsetFetcher(String brokerList) {
+        this.brokerList = new ArrayList<>();
+        String[] brokers = brokerList.split(",");
+        for (String broker : brokers) {
+            this.brokerList.add(broker.split(":")[0]);
+        }
+        this.port = Integer.valueOf(brokers[0].split(":")[1]);
+    }
+
+    public Map<Integer, Long> fetch(String topic, int partitionCount) {
+        Map<Integer, PartitionMetadata> metadatas = fetchPartitionMetadata(brokerList, port, topic, partitionCount);
+        Map<Integer, Long> ret = new HashMap<>();
+        for (int partition = 0; partition < partitionCount; partition++) {
+            PartitionMetadata metadata = metadatas.get(partition);
+            if (metadata == null || metadata.leader() == null) {
+                ret.put(partition, -1L);
+                //throw new RuntimeException("Can't find Leader for Topic and Partition. Exiting");
+            }
+            String leadBroker = metadata.leader().host();
+            String clientName = "Client_" + topic + "_" + partition;
+            SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
+            long latestOffset = getLatestOffset(consumer, topic, partition, clientName);
+            if (consumer != null) consumer.close();
+            ret.put(partition, latestOffset);
+        }
+        return ret;
+    }
+
+    public long getLatestOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
+        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
+        Map<TopicAndPartition, kafka.api.PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
+        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
+        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
+        OffsetResponse response = consumer.getOffsetsBefore(request);
+        if (response.hasError()) {
+            throw new RuntimeException("Error fetching data offset from the broker. Reason: " + response.errorCode(topic, partition) );
+        }
+        long[] offsets = response.offsets(topic, partition);
+        return offsets[0];
+    }
+
+    private Map<Integer, PartitionMetadata> fetchPartitionMetadata(List<String> brokerList, int port, String topic, int partitionCount) {
+        Map<Integer, PartitionMetadata> partitionMetadata = new HashMap<>();
+        for (String broker : brokerList) {
+            SimpleConsumer consumer = null;
+            try {
+                consumer = new SimpleConsumer(broker, port, 100000, 64 * 1024, "leaderLookup");
+                List<String> topics = Collections.singletonList(topic);
+                TopicMetadataRequest req = new TopicMetadataRequest(topics);
+                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
+
+                List<TopicMetadata> metaData = resp.topicsMetadata();
+                for (TopicMetadata item : metaData) {
+                    for (PartitionMetadata part : item.partitionsMetadata()) {
+                        partitionMetadata.put(part.partitionId(), part);
+                    }
+                }
+                if (partitionMetadata.size() == partitionCount) break;
+            } catch (Exception e) {
+                throw new RuntimeException("Error communicating with Broker [" + broker + "] " + "to find Leader for [" + topic + "] Reason: ", e);
+            } finally {
+                if (consumer != null) consumer.close();
+            }
+        }
+        return partitionMetadata;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-devtools/src/main/scala/org/apache/eagle/contrib/kafka/ProducerTool.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/src/main/scala/org/apache/eagle/contrib/kafka/ProducerTool.scala b/eagle-core/eagle-alert/alert/alert-devtools/src/main/scala/org/apache/eagle/contrib/kafka/ProducerTool.scala
new file mode 100644
index 0000000..830e9ac
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-devtools/src/main/scala/org/apache/eagle/contrib/kafka/ProducerTool.scala
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.contrib.kafka
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.util.Properties
+import java.util.concurrent.Executors
+
+import joptsimple._
+import kafka.message._
+import kafka.producer.ConsoleProducer.{LineMessageReader, MessageReader}
+import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
+import kafka.serializer._
+import org.apache.commons.io.FileUtils
+
+import scala.collection.JavaConversions._
+
+object ProducerTool { // command-line tool that publishes messages taken from --data, --file or standard input to a Kafka topic
+  def main(args: Array[String]) {
+    val parser = new OptionParser
+    val topicOpt = parser.accepts("topic", "REQUIRED: The topic id to produce messages to.")
+      .withRequiredArg
+      .describedAs("topic")
+      .ofType(classOf[String])
+    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
+      .withRequiredArg
+      .describedAs("broker-list")
+      .ofType(classOf[String])
+    val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
+    val compressOpt = parser.accepts("compress", "If set, messages batches are sent compressed")
+    val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously.")
+      .withRequiredArg
+      .describedAs("size")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(200)
+    val messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retries before the producer give up and drop this message.")
+      .withRequiredArg
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(3)
+    val retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.")
+      .withRequiredArg
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(100l)
+    val sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" +
+      " a message will queue awaiting sufficient batch size. The value is given in ms.")
+      .withRequiredArg
+      .describedAs("timeout_ms")
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(1000l)
+    val queueSizeOpt = parser.accepts("queue-size", "If set and the producer is running in asynchronous mode, this gives the maximum amount of " +
+      " messages will queue awaiting sufficient batch size.")
+      .withRequiredArg
+      .describedAs("queue_size")
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(10000l)
+    val queueEnqueueTimeoutMsOpt = parser.accepts("queue-enqueuetimeout-ms", "Timeout for event enqueue")
+      .withRequiredArg
+      .describedAs("queue enqueuetimeout ms")
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(Int.MaxValue.toLong)
+    val requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required acks of the producer requests")
+      .withRequiredArg
+      .describedAs("request required acks")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(0)
+    val requestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. Value must be non-negative and non-zero")
+      .withRequiredArg
+      .describedAs("request timeout ms")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1500)
+    val valueEncoderOpt = parser.accepts("value-serializer", "The class name of the message encoder implementation to use for serializing values.")
+      .withRequiredArg
+      .describedAs("encoder_class")
+      .ofType(classOf[java.lang.String])
+      .defaultsTo(classOf[StringEncoder].getName)
+    val keyEncoderOpt = parser.accepts("key-serializer", "The class name of the message encoder implementation to use for serializing keys.")
+      .withRequiredArg
+      .describedAs("encoder_class")
+      .ofType(classOf[java.lang.String])
+      .defaultsTo(classOf[StringEncoder].getName)
+    val messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " +
+      "By default each line is read as a separate message.")
+      .withRequiredArg
+      .describedAs("reader_class")
+      .ofType(classOf[java.lang.String])
+      .defaultsTo(classOf[LineMessageReader].getName)
+    val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.")
+      .withRequiredArg
+      .describedAs("size")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1024 * 100)
+    val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. " +
+      "This allows custom configuration for a user-defined message reader.")
+      .withRequiredArg
+      .describedAs("prop")
+      .ofType(classOf[String])
+    // Message source, checked in this order further down: --data, then --file, otherwise standard input.
+    val dataOpt = parser.accepts("data", "Input message data content")
+      .withRequiredArg()
+      .describedAs("input message data")
+      .ofType(classOf[String])
+
+    val fileOpt = parser.accepts("file", "Input message file name")
+      .withRequiredArg()
+      .describedAs("input message file")
+      .ofType(classOf[String])
+
+    val replicationOpt = parser.accepts("replication", "Message data replication count")
+      .withRequiredArg()
+      .describedAs("message replication count, default: 1")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1)
+    // --topic and --broker-list are mandatory: print the usage text and exit(1) when either is missing.
+    val options = parser.parse(args: _*)
+    for (arg <- List(topicOpt, brokerListOpt)) {
+      if (!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+
+    val topic = options.valueOf(topicOpt)
+    val brokerList = options.valueOf(brokerListOpt)
+    val sync = options.has(syncOpt)
+    val compress = options.has(compressOpt)
+    val batchSize = options.valueOf(batchSizeOpt)
+    val sendTimeout = options.valueOf(sendTimeoutOpt)
+    val queueSize = options.valueOf(queueSizeOpt)
+    val queueEnqueueTimeoutMs = options.valueOf(queueEnqueueTimeoutMsOpt)
+    val requestRequiredAcks = options.valueOf(requestRequiredAcksOpt)
+    val requestTimeoutMs = options.valueOf(requestTimeoutMsOpt)
+    val keyEncoderClass = options.valueOf(keyEncoderOpt)
+    val valueEncoderClass = options.valueOf(valueEncoderOpt)
+    val readerClass = options.valueOf(messageReaderOpt)
+    val socketBuffer = options.valueOf(socketBufferSizeOpt)
+    val cmdLineProps = parseLineReaderArgs(options.valuesOf(propertyOpt))
+    val messageData = options.valuesOf(dataOpt)
+    val messageFile = options.valuesOf(fileOpt)
+    val messageReplication = options.valueOf(replicationOpt)
+    // The topic goes to the message reader via cmdLineProps; everything else becomes producer configuration.
+    cmdLineProps.put("topic", topic)
+    val props = new Properties()
+    props.put("metadata.broker.list", brokerList)
+    val codec = if (compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec
+    props.put("compression.codec", codec.toString)
+    props.put("producer.type", if (sync) "sync" else "async")
+    if (options.has(batchSizeOpt))
+      props.put("batch.num.messages", batchSize.toString)
+
+    props.put("message.send.max.retries", options.valueOf(messageSendMaxRetriesOpt).toString)
+    props.put("retry.backoff.ms", options.valueOf(retryBackoffMsOpt).toString)
+    props.put("queue.buffering.max.ms", sendTimeout.toString)
+    props.put("queue.buffering.max.messages", queueSize.toString)
+    props.put("queue.enqueue.timeout.ms", queueEnqueueTimeoutMs.toString)
+    props.put("request.required.acks", requestRequiredAcks.toString)
+    props.put("request.timeout.ms", requestTimeoutMs.toString)
+    props.put("key.serializer.class", keyEncoderClass)
+    props.put("serializer.class", valueEncoderClass)
+    props.put("send.buffer.bytes", socketBuffer.toString)
+    // The reader class is loaded reflectively, so it needs a no-argument constructor.
+    val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef, AnyRef]]
+
+    if (messageData.size()>0) {
+      reader.init(new ByteArrayInputStream(messageData.get(0).getBytes(StandardCharsets.UTF_8)), cmdLineProps)
+    } else if (messageFile.size()>0) {
+      reader.init(FileUtils.openInputStream(new File(messageFile.get(0))), cmdLineProps)
+    } else {
+      reader.init(System.in, cmdLineProps)
+    }
+    // Send each message read messageReplication times; every send is awaited before the next one is submitted.
+    try {
+      val executor = Executors.newCachedThreadPool()
+      val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
+      Runtime.getRuntime.addShutdownHook(new Thread() {
+        override def run(): Unit = {
+          producer.close()
+          executor.shutdown()
+        }
+      })
+
+      var message: KeyedMessage[AnyRef, AnyRef] = null
+      do {
+        message = reader.readMessage()
+        if (message != null) {
+          var i = 0
+          while (i < messageReplication) {
+            executor.submit(new Runnable {
+              override def run(): Unit = producer.send(message)
+            }).get()
+            i += 1
+          }
+          System.out.println("Produced %d messages".format(messageReplication))
+        }
+      } while (message != null)
+    } catch {
+      case e: Exception =>
+        e.printStackTrace()
+        System.exit(1)
+    }
+    System.exit(0)
+  }
+  /** Turns "key=value" strings into Properties, splitting on the first '=' only so values may contain '=' or be empty; an entry without '=' prints an error and exits with status 1. */
+  def parseLineReaderArgs(args: Iterable[String]): Properties = {
+    val splits = args.map(_.split("=", 2)).filterNot(_.length == 0)
+    if (!splits.forall(_.length == 2)) {
+      System.err.println("Invalid line reader properties: " + args.mkString(" "))
+      System.exit(1)
+    }
+    val props = new Properties
+    for (a <- splits)
+      props.put(a(0), a(1))
+    props
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-devtools/src/test/java/org/apache/eagle/alert/tools/TestKafkaOffset.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/src/test/java/org/apache/eagle/alert/tools/TestKafkaOffset.java b/eagle-core/eagle-alert/alert/alert-devtools/src/test/java/org/apache/eagle/alert/tools/TestKafkaOffset.java
new file mode 100644
index 0000000..e4d9f8f
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-devtools/src/test/java/org/apache/eagle/alert/tools/TestKafkaOffset.java
@@ -0,0 +1,69 @@
+package org.apache.eagle.alert.tools;
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.config.ZKConfig;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class TestKafkaOffset {
+    @Ignore // manual check: needs the ZooKeeper/Kafka endpoints from kafka-offset-test.application.conf and loops until killed
+    @Test
+    public void test() throws Exception {
+        System.setProperty("config.resource", "/kafka-offset-test.application.conf");
+        Config config = ConfigFactory.load();
+        ZKConfig zkConfig = new ZKConfig();
+        zkConfig.zkQuorum = config.getString("dataSourceConfig.zkQuorum");
+        zkConfig.zkRoot = config.getString("dataSourceConfig.transactionZKRoot");
+        zkConfig.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs");
+        zkConfig.connectionTimeoutMs = config.getInt("dataSourceConfig.connectionTimeoutMs");
+        zkConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes");
+        zkConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval");
+
+        String topic = "testTopic1";
+        String topology = "alertUnitTopology_1";
+        // Build both fetchers once: each KafkaConsumerOffsetFetcher starts its own ZooKeeper client, which is never closed.
+        KafkaConsumerOffsetFetcher consumerOffsetFetcher = new KafkaConsumerOffsetFetcher(zkConfig, topic, topology);
+        String kafkaBrokerList = config.getString("dataSourceConfig.kafkaBrokerList");
+        KafkaLatestOffsetFetcher latestOffsetFetcher = new KafkaLatestOffsetFetcher(kafkaBrokerList);
+
+        while(true) {
+            Map<String, Long> consumedOffset = consumerOffsetFetcher.fetch();
+            if(consumedOffset.size() == 0){
+                System.out.println("no any consumer offset found for this topic " + topic);
+            }
+            Map<Integer, Long> latestOffset = latestOffsetFetcher.fetch(topic, consumedOffset.size());
+            if(latestOffset.size() == 0){
+                System.out.println("no any latest offset found for this topic " + topic);
+            }
+            for (Map.Entry<String, Long> entry : consumedOffset.entrySet()) {
+                String partition = entry.getKey();
+                Integer partitionNumber = Integer.valueOf(partition.split("_")[1]);
+                Long lag = latestOffset.get(partitionNumber) - entry.getValue();
+                System.out.println(String.format("partition %s, total: %d, consumed: %d, lag: %d",
+                        partition, latestOffset.get(partitionNumber), entry.getValue(), lag));
+            }
+            Thread.sleep(10000);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-devtools/src/test/resources/kafka-offset-test.application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/src/test/resources/kafka-offset-test.application.conf b/eagle-core/eagle-alert/alert/alert-devtools/src/test/resources/kafka-offset-test.application.conf
new file mode 100644
index 0000000..ca4d68a
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-devtools/src/test/resources/kafka-offset-test.application.conf
@@ -0,0 +1,9 @@
+"dataSourceConfig":{
+  "kafkaBrokerList" : "kafka-broker:9092",
+  "zkQuorum" : "zk-admin:2181",
+  "transactionZKRoot" : "/consumers/eagle_consumer/%s/%s",
+  "zkSessionTimeoutMs" : 10000,
+  "connectionTimeoutMs" : 10000,
+  "zkRetryTimes" : 3,
+  "zkRetryInterval" : 3000
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-devtools/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/src/test/resources/log4j.properties b/eagle-core/eagle-alert/alert/alert-devtools/src/test/resources/log4j.properties
new file mode 100644
index 0000000..d59ded6
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-devtools/src/test/resources/log4j.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/.gitignore
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/.gitignore b/eagle-core/eagle-alert/alert/alert-engine/.gitignore
new file mode 100644
index 0000000..b83d222
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/.gitignore
@@ -0,0 +1 @@
+/target/

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/pom.xml b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/pom.xml
new file mode 100644
index 0000000..9f4d39f
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/pom.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.eagle</groupId>
+        <artifactId>alert-engine-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>alert-engine-base</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-jvm</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.mapdb</groupId>
+            <artifactId>mapdb</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>joda-time</groupId>
+            <artifactId>joda-time</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.velocity</groupId>
+            <artifactId>velocity</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>javax.mail</groupId>
+            <artifactId>mail</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.6</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/AlertStreamCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/AlertStreamCollector.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/AlertStreamCollector.java
new file mode 100755
index 0000000..7219271
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/AlertStreamCollector.java
@@ -0,0 +1,27 @@
+package org.apache.eagle.alert.engine;
+
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * {@link Collector} of {@link AlertStreamEvent}s that can additionally be flushed and closed.
+ */
+public interface AlertStreamCollector extends Collector<AlertStreamEvent> {
+    /**
+     * Need not be thread-safe, but should be called synchronously, e.g. from within a Storm bolt's execute method.
+     */
+    void flush();
+    /** Closes this collector. */
+    void close();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/Collector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/Collector.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/Collector.java
new file mode 100755
index 0000000..9a6489c
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/Collector.java
@@ -0,0 +1,27 @@
+package org.apache.eagle.alert.engine;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@FunctionalInterface
+public interface Collector<T> {
+    /**
+     * Emits one item. Implementations must be thread-safe.
+     *
+     * @param t the item to emit
+     */
+    void emit(T t);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/PartitionedEventCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/PartitionedEventCollector.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/PartitionedEventCollector.java
new file mode 100755
index 0000000..3a99e98
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/PartitionedEventCollector.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine;
+
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+
+/**
+ * {@link Collector} of {@link PartitionedEvent}s that can also drop events.
+ * Original note: "executed in thread-safe trace" (presumably: invoked from a thread-safe call path; confirm).
+ */
+public interface PartitionedEventCollector extends Collector<PartitionedEvent> {
+    /**
+     * @param event the event to be dropped
+     */
+    void drop(PartitionedEvent event);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/StreamContext.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
new file mode 100644
index 0000000..609378b
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
@@ -0,0 +1,27 @@
+package org.apache.eagle.alert.engine;
+
+import backtype.storm.metric.api.MultiCountMetric;
+
+import com.typesafe.config.Config;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Context exposing a Storm {@link MultiCountMetric} counter and a {@link Config}.
+ */
+public interface StreamContext {
+    /** @return the multi-count metric of this context */
+    MultiCountMetric counter();
+
+    /** @return the configuration of this context */
+    Config config();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java
new file mode 100644
index 0000000..f339cd8
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java
@@ -0,0 +1,26 @@
+package org.apache.eagle.alert.engine;
+
+import backtype.storm.metric.api.MultiCountMetric;
+import backtype.storm.task.TopologyContext;
+
+import com.typesafe.config.Config;
+
+/**
+ * Plain {@link StreamContext} implementation that hands back the configuration and counter it was built with.
+ */
+public class StreamContextImpl implements StreamContext {
+    private final Config config;
+    private final MultiCountMetric counter;
+
+    /**
+     * @param config  configuration returned by {@link #config()}
+     * @param counter metric returned by {@link #counter()}
+     * @param context Storm topology context; accepted but neither stored nor read by this constructor
+     */
+    public StreamContextImpl(Config config, MultiCountMetric counter, TopologyContext context) {
+        this.config = config;
+        this.counter = counter;
+    }
+
+    /** @return the configuration passed to the constructor */
+    @Override
+    public Config config() {
+        return config;
+    }
+
+    /** @return the counter passed to the constructor */
+    @Override
+    public MultiCountMetric counter() {
+        return counter;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java
new file mode 100644
index 0000000..2bca329
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java
@@ -0,0 +1,59 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine;
+
+import org.apache.eagle.alert.config.ZKConfig;
+import org.apache.eagle.alert.config.ZKConfigBuilder;
+import org.apache.eagle.alert.engine.coordinator.impl.ZKMetadataChangeNotifyService;
+import org.apache.eagle.alert.engine.runner.UnitTopologyRunner;
+
+import backtype.storm.generated.StormTopology;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Since 5/3/16. Makes sure the unit topology can be started either from the command line or remotely.
+ *
+ * <p>A few parameters for starting a unit topology:
+ * <ol>
+ *   <li>number of spout tasks</li>
+ *   <li>number of router bolts</li>
+ *   <li>number of alert bolts</li>
+ *   <li>number of publish bolts</li>
+ * </ol>
+ *
+ * <p>Connections:
+ * <ol>
+ *   <li>spout and router bolt</li>
+ *   <li>router bolt and alert bolt</li>
+ *   <li>alert bolt and publish bolt</li>
+ * </ol>
+ */
+public class UnitTopologyMain {
+
+    /**
+     * Loads the configuration via {@link ConfigFactory#load()} and runs the topology named by
+     * the {@code topology.name} setting.
+     *
+     * @param args command-line arguments; not read
+     */
+    public static void main(String[] args) {
+        final Config config = ConfigFactory.load();
+        final ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
+        final String topologyId = config.getString("topology.name");
+
+        newRunner(zkConfig, topologyId).run(topologyId, config);
+    }
+
+    /**
+     * Builds, without running it, the topology named by the {@code topology.name} setting.
+     *
+     * @param config configuration providing the ZooKeeper settings and {@code topology.name}
+     * @return the topology built by the runner
+     */
+    public static StormTopology createTopology(Config config) {
+        final ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
+        final String topologyId = config.getString("topology.name");
+
+        return newRunner(zkConfig, topologyId).buildTopology(topologyId, config);
+    }
+
+    /** Creates a runner backed by a ZooKeeper metadata change notify service for the given topology. */
+    private static UnitTopologyRunner newRunner(ZKConfig zkConfig, String topologyId) {
+        ZKMetadataChangeNotifyService changeNotifyService = new ZKMetadataChangeNotifyService(zkConfig, topologyId);
+        return new UnitTopologyRunner(changeNotifyService);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java
new file mode 100644
index 0000000..22fe56a
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.coordinator;
+
+import java.io.Closeable;
+import java.io.Serializable;
+
+import org.apache.eagle.alert.engine.publisher.AlertPublishSpecListener;
+import org.apache.eagle.alert.engine.router.AlertBoltSpecListener;
+import org.apache.eagle.alert.engine.router.SpoutSpecListener;
+import org.apache.eagle.alert.engine.router.StreamRouterBoltSpecListener;
+
+import com.typesafe.config.Config;
+
+/**
+ * IMetadataChangeNotifyService defines the following features:
+ * 1) initialization
+ * 2) registration of metadata change listeners
+ *
+ * In a distributed environment, for example the Storm platform,
+ * a subclass implementing this interface should have the following lifecycle:
+ * 1. the object is created on the client machine
+ * 2. the object is serialized and transferred to another machine
+ * 3. the object is re-created through deserialization
+ * 4. init() is invoked to do initialization
+ * 5. the various registerListener methods are invoked to get notified of config changes
+ * 6. close() is invoked to release system resources
+ */
+public interface IMetadataChangeNotifyService extends Closeable,Serializable {
+    /**
+     * Initializes the service (step 4 of the lifecycle described on this interface).
+     *
+     * @param config configuration used for initialization
+     * @param type metadata type supplied by the caller; presumably the kind of component using this service (confirm)
+     */
+    void init(Config config, MetadataType type);
+
+    /** Registers a listener for spout spec changes. */
+    void registerListener(SpoutSpecListener listener);
+
+    /** Registers a listener for alert bolt spec changes. */
+    void registerListener(AlertBoltSpecListener listener);
+
+    /** Registers a listener for stream router bolt spec changes. */
+    void registerListener(StreamRouterBoltSpecListener listener);
+
+    /** Registers a listener for alert publish spec changes. */
+    void registerListener(AlertPublishSpecListener listener);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/MetadataType.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/MetadataType.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/MetadataType.java
new file mode 100644
index 0000000..ac68e26
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/MetadataType.java
@@ -0,0 +1,30 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.coordinator;
+
+/**
+ * Metadata types accepted by {@code IMetadataChangeNotifyService#init}: one constant each for the spout,
+ * the stream router bolt, the alert bolt and the alert publish bolt. Since 5/4/16.
+ */
+public enum MetadataType {
+    SPOUT,
+    STREAM_ROUTER_BOLT,
+    ALERT_BOLT,
+    ALERT_PUBLISH_BOLT
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java
new file mode 100644
index 0000000..927eb8c
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java
@@ -0,0 +1,25 @@
+package org.apache.eagle.alert.engine.coordinator;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Checked exception whose message reports that a stream definition was not found for a stream id.
+ */
+public class StreamDefinitionNotFoundException extends Exception {
+    private static final long serialVersionUID = 6027811718016485808L;
+
+    /**
+     * @param streamId id of the stream; appended to the message prefix "Stream definition not found: "
+     */
+    public StreamDefinitionNotFoundException(String streamId){
+        super("Stream definition not found: "+streamId);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java
new file mode 100755
index 0000000..52fcc04
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.coordinator.impl;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
+import org.apache.eagle.alert.coordination.model.PublishSpec;
+import org.apache.eagle.alert.coordination.model.RouterSpec;
+import org.apache.eagle.alert.coordination.model.SpoutSpec;
+import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
+import org.apache.eagle.alert.engine.coordinator.MetadataType;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.publisher.AlertPublishSpecListener;
+import org.apache.eagle.alert.engine.router.AlertBoltSpecListener;
+import org.apache.eagle.alert.engine.router.SpoutSpecListener;
+import org.apache.eagle.alert.engine.router.StreamRouterBoltSpecListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+/**
+ * Base {@link IMetadataChangeNotifyService} that keeps a separate listener list for each of the four
+ * notified components (Spout, StreamRouterBolt, AlertBolt and AlertPublishBolt) and forwards
+ * metadata changes to the listeners registered for that component.
+ *
+ * <p>Every listener list is guarded by its own monitor. Notification works on a snapshot taken under
+ * that monitor, so listener callbacks run without holding the lock.
+ */
+@SuppressWarnings({ "serial" })
+public abstract class AbstractMetadataChangeNotifyService implements IMetadataChangeNotifyService, Closeable,
+        Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractMetadataChangeNotifyService.class);
+    private final List<StreamRouterBoltSpecListener> streamRouterBoltSpecListeners = new ArrayList<>();
+    private final List<SpoutSpecListener> spoutSpecListeners = new ArrayList<>();
+    private final List<AlertBoltSpecListener> alertBoltSpecListeners = new ArrayList<>();
+    private final List<AlertPublishSpecListener> alertPublishSpecListeners = new ArrayList<>();
+    protected MetadataType type;
+
+    /**
+     * Records the metadata type this service instance is initialized with.
+     *
+     * @param config configuration; not read by this base implementation
+     * @param type   metadata type stored in {@link #type}
+     */
+    @Override
+    public void init(Config config, MetadataType type) {
+        this.type = type;
+    }
+
+    /**
+     * Registers a listener for alert publish spec changes.
+     *
+     * @param listener listener to add
+     */
+    @Override
+    public void registerListener(AlertPublishSpecListener listener) {
+        Preconditions.checkNotNull(alertPublishSpecListeners, "Not initialized yet");
+        // Guard the publish-listener list with its own monitor, the same one used when a
+        // notification snapshot of this list is taken.
+        synchronized (alertPublishSpecListeners) {
+            LOG.info("Register {}", listener);
+            alertPublishSpecListeners.add(listener);
+        }
+    }
+
+    /**
+     * Registers a listener for stream router bolt spec changes.
+     *
+     * @param listener listener to add
+     */
+    @Override
+    public void registerListener(StreamRouterBoltSpecListener listener) {
+        synchronized (streamRouterBoltSpecListeners) {
+            streamRouterBoltSpecListeners.add(listener);
+        }
+    }
+
+    /**
+     * Registers a listener for alert bolt spec changes.
+     *
+     * @param listener listener to add
+     */
+    @Override
+    public void registerListener(AlertBoltSpecListener listener) {
+        synchronized (alertBoltSpecListeners) {
+            alertBoltSpecListeners.add(listener);
+        }
+    }
+
+    /**
+     * Registers a listener for spout spec changes.
+     *
+     * @param listener listener to add
+     */
+    @Override
+    public void registerListener(SpoutSpecListener listener) {
+        synchronized (spoutSpecListeners) {
+            spoutSpecListeners.add(listener);
+        }
+    }
+
+    /** Passes the spout spec and stream definitions to every registered spout listener. */
+    protected void notifySpout(SpoutSpec spoutSpec, Map<String, StreamDefinition> sds) {
+        snapshot(spoutSpecListeners).forEach(s -> s.onSpoutSpecChange(spoutSpec, sds));
+    }
+
+    /** Passes the router spec and stream definitions to every registered stream router bolt listener. */
+    protected void notifyStreamRouterBolt(RouterSpec routerSpec, Map<String, StreamDefinition> sds) {
+        snapshot(streamRouterBoltSpecListeners).forEach(s -> s.onStreamRouteBoltSpecChange(routerSpec, sds));
+    }
+
+    /** Passes the alert bolt spec and stream definitions to every registered alert bolt listener. */
+    protected void notifyAlertBolt(AlertBoltSpec alertBoltSpec, Map<String, StreamDefinition> sds) {
+        snapshot(alertBoltSpecListeners).forEach(s -> s.onAlertBoltSpecChange(alertBoltSpec, sds));
+    }
+
+    /** Passes the publish spec and stream definitions to every registered alert publish listener. */
+    protected void notifyAlertPublishBolt(PublishSpec alertPublishSpec, Map<String, StreamDefinition> sds) {
+        snapshot(alertPublishSpecListeners).forEach(s -> s.onAlertPublishSpecChange(alertPublishSpec, sds));
+    }
+
+    /**
+     * Logs that the service was closed; this base implementation holds no resources of its own.
+     *
+     * @throws IOException never thrown here; declared for subclasses
+     */
+    @Override
+    public void close() throws IOException {
+        LOG.info("Closed");
+    }
+
+    /** Copies the listeners under the list's monitor so that callbacks can run without holding it. */
+    private static <L> List<L> snapshot(List<L> listeners) {
+        synchronized (listeners) {
+            return new ArrayList<>(listeners);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
new file mode 100755
index 0000000..1b93072
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.coordinator.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.eagle.alert.config.ConfigBusConsumer;
+import org.apache.eagle.alert.config.ConfigChangeCallback;
+import org.apache.eagle.alert.config.ConfigValue;
+import org.apache.eagle.alert.config.ZKConfig;
+import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
+import org.apache.eagle.alert.coordination.model.PublishSpec;
+import org.apache.eagle.alert.coordination.model.RouterSpec;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordination.model.SpoutSpec;
+import org.apache.eagle.alert.coordination.model.VersionedPolicyDefinition;
+import org.apache.eagle.alert.coordination.model.VersionedStreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.MetadataType;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.service.IMetadataServiceClient;
+import org.apache.eagle.alert.service.MetadataServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+/**
+ * <b>TODO</b>: performance tuning: It is not JVM level service, so it may cause
+ * zookeeper burden in case of too many listeners This does not support
+ * dynamically adding topic, all topics should be available when service object
+ * is created.
+ * <p>
+ * ZK path format is as following:
+ * <ul>
+ * <li>/alert/topology_1/spout</li>
+ * <li>/alert/topology_1/router</li>
+ * <li>/alert/topology_1/alert</li>
+ * <li>/alert/topology_1/publisher</li>
+ * </ul>
+ */
+public class ZKMetadataChangeNotifyService extends AbstractMetadataChangeNotifyService implements ConfigChangeCallback {
+    private static final long serialVersionUID = -1509237694501235144L;
+    private static final Logger LOG = LoggerFactory.getLogger(ZKMetadataChangeNotifyService.class);
+    private ZKConfig zkConfig;
+    private String topologyId;
+    private ConfigBusConsumer consumer;
+
+    private transient IMetadataServiceClient client;
+
+    public ZKMetadataChangeNotifyService(ZKConfig config, String topologyId) {
+        this.zkConfig = config;
+        this.topologyId = topologyId;
+    }
+
+    @Override
+    public void init(Config config, MetadataType type) {
+        super.init(config, type);
+        client = new MetadataServiceClientImpl(config);
+        consumer = new ConfigBusConsumer(zkConfig, topologyId + "/" + getMetadataTopicSuffix(), this);
+        LOG.info("init called for client");
+    }
+
+    /**
+     * @seeAlso Coordinator
+     * @return
+     */
+    private String getMetadataTopicSuffix() {
+        switch (type) {
+        case ALERT_BOLT:
+            return "alert";
+        case ALERT_PUBLISH_BOLT:
+            return "publisher";
+        case SPOUT:
+            return "spout";
+        case STREAM_ROUTER_BOLT:
+            return "router";
+        default:
+            throw new RuntimeException(String.format("unexpected metadata type: %s !", type));
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        consumer.close();
+        LOG.info("Config consumer closed");
+    }
+
+    @Override
+    public void onNewConfig(ConfigValue value) {
+        LOG.info("Metadata changed {}",value);
+
+        if (client == null) {
+            LOG.error("OnNewConfig trigger, but metadata service client is null. Metadata type {}", type);
+            return;
+        }
+
+        // analyze config value and notify corresponding listeners
+        String version = value.getValue().toString();
+        // brute-force load all: this might introduce load's on metadata service.
+        // FIXME : after ScheduleState persisted with better normalization, load
+        // state based on type and version
+        ScheduleState state = client.getVersionedSpec(version);
+        if (state == null) {
+            LOG.error("Failed to load schedule state of version {}, this is possibly a bug, pls check coordinator log !", version);
+            return;
+        }
+        Map<String, StreamDefinition> sds = getStreams(state.getStreamSnapshots());
+        switch (type) {
+        case ALERT_BOLT:
+            // we might query metadata service query get metadata snapshot and StreamDefinition
+            AlertBoltSpec alertSpec = state.getAlertSpecs().get(topologyId);
+            if (alertSpec == null) {
+                LOG.error(" alert spec for version {} not found for topology {} !", version, topologyId);
+            } else {
+                prePopulate(alertSpec, state.getPolicySnapshots());
+                notifyAlertBolt(alertSpec, sds);
+            }
+            break;
+        case ALERT_PUBLISH_BOLT:
+            PublishSpec pubSpec = state.getPublishSpecs().get(topologyId);
+            if (pubSpec == null) {
+                LOG.error(" alert spec for version {} not found for topology {} !", version, topologyId);
+            } else {
+                notifyAlertPublishBolt(pubSpec, sds);
+            }
+            break;
+        case SPOUT:
+            SpoutSpec spoutSpec = state.getSpoutSpecs().get(topologyId);
+            if (spoutSpec == null) {
+                LOG.error(" alert spec for version {} not found for topology {} !", version, topologyId);
+            } else {
+                notifySpout(spoutSpec, sds);
+            }
+            break;
+        case STREAM_ROUTER_BOLT:
+            RouterSpec gSpec = state.getGroupSpecs().get(topologyId);
+            if (gSpec == null) {
+                LOG.error(" alert spec for version {} not found for topology {} !", version, topologyId);
+            } else {
+                notifyStreamRouterBolt(gSpec, sds);
+            }
+            break;
+        default:
+            LOG.error("unexpected metadata type: {} ", type);
+        }
+    }
+
+    private void prePopulate(AlertBoltSpec alertSpec, List<VersionedPolicyDefinition> list) {
+        Map<String, PolicyDefinition> policyMap = listToMap(list);
+        for (Entry<String, List<String>> policyEntry : alertSpec.getBoltPolicyIdsMap().entrySet()) {
+            List<PolicyDefinition> pds = alertSpec.getBoltPoliciesMap().get(policyEntry.getKey());
+            if (pds == null) {
+                pds = new ArrayList<PolicyDefinition>();
+                alertSpec.getBoltPoliciesMap().put(policyEntry.getKey(), pds);
+            }
+            for (String policyName : policyEntry.getValue()) {
+                if (policyMap.containsKey(policyName)) {
+                    pds.add(policyMap.get(policyName));
+                }
+            }
+        }
+    }
+
+    private Map<String, StreamDefinition> getStreams(List<VersionedStreamDefinition> streamSnapshots) {
+        Map<String, StreamDefinition> result = new HashMap<String, StreamDefinition>();
+        for (VersionedStreamDefinition vsd : streamSnapshots) {
+            result.put(vsd.getDefinition().getStreamId(), vsd.getDefinition());
+        }
+        return result;
+    }
+
+    private Map<String, PolicyDefinition> listToMap(List<VersionedPolicyDefinition> listStreams) {
+        Map<String, PolicyDefinition> result = new HashMap<String, PolicyDefinition>();
+        for (VersionedPolicyDefinition sd : listStreams) {
+            result.put(sd.getDefinition().getName(), sd.getDefinition());
+        }
+        return result;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java
new file mode 100644
index 0000000..22f1408
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java
@@ -0,0 +1,29 @@
+package org.apache.eagle.alert.engine.evaluator;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Callback for components that need to react when the set of policies changes.
+ */
+public interface PolicyChangeListener {
+    /**
+     * Reports a policy change as three lists plus the stream definitions.
+     *
+     * @param added    policies that were added
+     * @param removed  policies that were removed
+     * @param modified policies that were modified
+     * @param sds      stream definitions by name; presumably keyed by stream id - verify against callers
+     */
+    void onPolicyChange(List<PolicyDefinition> added,
+                        List<PolicyDefinition> removed,
+                        List<PolicyDefinition> modified, Map<String, StreamDefinition> sds);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyGroupEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyGroupEvaluator.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyGroupEvaluator.java
new file mode 100644
index 0000000..dd3ca24
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyGroupEvaluator.java
@@ -0,0 +1,45 @@
+package org.apache.eagle.alert.engine.evaluator;
+
+import java.io.Serializable;
+
+import org.apache.eagle.alert.engine.AlertStreamCollector;
+import org.apache.eagle.alert.engine.StreamContext;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * A policy group refers to the policies which belong to the same MonitoredStream.
+ * 4 lifecycle steps are involved in a PolicyGroupEvaluator:
+ * <ul>
+ * <li>Step 1: create object. Be aware that in a distributed environment this object may be
+ * serialized and transferred across the network</li>
+ * <li>Step 2: init. This is normally invoked only once, before nextEvent is invoked</li>
+ * <li>Step 3: nextEvent</li>
+ * <li>Step 4: close</li>
+ * </ul>
+ */
+public interface PolicyGroupEvaluator extends PolicyChangeListener, Serializable{
+    /**
+     * Lifecycle step 2: hands the evaluator its stream context and alert collector.
+     *
+     * @param context   the stream context of the evaluator
+     * @param collector the collector for alert stream events
+     */
+    void init(StreamContext context, AlertStreamCollector collector);
+
+    /**
+     * Lifecycle step 3: evaluates one event.
+     *
+     * @param event the partitioned event to evaluate
+     */
+    void nextEvent(PartitionedEvent event);
+
+    /**
+     * @return the name of this evaluator
+     */
+    String getName();
+
+    /**
+     * Lifecycle step 4: closes the evaluator.
+     */
+    void close();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java
new file mode 100644
index 0000000..2898ebc
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java
@@ -0,0 +1,60 @@
+package org.apache.eagle.alert.engine.evaluator;
+
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+
+import backtype.storm.metric.api.MultiCountMetric;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Mutable holder for the objects a policy handler works with: the policy definition, the
+ * evaluator that owns the handler, a counter metric and the id of the policy evaluator.
+ * All properties start out as {@code null} and are plain get/set pairs without validation.
+ */
+public class PolicyHandlerContext {
+    private PolicyDefinition policyDefinition;
+    private PolicyGroupEvaluator parentEvaluator;
+    private MultiCountMetric policyCounter;
+    private String policyEvaluatorId;
+
+    /** @return the policy definition, or {@code null} if none was set */
+    public PolicyDefinition getPolicyDefinition() {
+        return this.policyDefinition;
+    }
+
+    /** @param policyDefinition the policy definition to store */
+    public void setPolicyDefinition(PolicyDefinition policyDefinition) {
+        this.policyDefinition = policyDefinition;
+    }
+
+    /** @return the parent evaluator, or {@code null} if none was set */
+    public PolicyGroupEvaluator getParentEvaluator() {
+        return this.parentEvaluator;
+    }
+
+    /** @param parentEvaluator the parent evaluator to store */
+    public void setParentEvaluator(PolicyGroupEvaluator parentEvaluator) {
+        this.parentEvaluator = parentEvaluator;
+    }
+
+    /** @return the policy counter metric, or {@code null} if none was set */
+    public MultiCountMetric getPolicyCounter() {
+        return this.policyCounter;
+    }
+
+    /** @param metric the counter metric to store */
+    public void setPolicyCounter(MultiCountMetric metric) {
+        this.policyCounter = metric;
+    }
+
+    /** @return the policy evaluator id, or {@code null} if none was set */
+    public String getPolicyEvaluatorId() {
+        return this.policyEvaluatorId;
+    }
+
+    /** @param policyEvaluatorId the policy evaluator id to store */
+    public void setPolicyEvaluatorId(String policyEvaluatorId) {
+        this.policyEvaluatorId = policyEvaluatorId;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandler.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandler.java
new file mode 100755
index 0000000..069d321
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandler.java
@@ -0,0 +1,27 @@
+package org.apache.eagle.alert.engine.evaluator;
+
+import org.apache.eagle.alert.engine.Collector;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Handler that is prepared with a collector and a context, is then fed stream events and is
+ * finally closed. Instances are created by type through {@code PolicyStreamHandlers.createHandler}.
+ */
+public interface PolicyStreamHandler {
+    /**
+     * Prepares the handler before events are sent to it.
+     *
+     * @param collector collector of alert stream events; presumably where resulting alerts go - verify in implementations
+     * @param context   the policy handler context
+     * @throws Exception if the handler cannot be prepared
+     */
+    void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception;
+
+    /**
+     * Sends one stream event to the handler.
+     *
+     * @param event the event to handle
+     * @throws Exception if the event cannot be handled
+     */
+    void send(StreamEvent event) throws Exception;
+
+    /**
+     * Closes the handler.
+     *
+     * @throws Exception if closing fails
+     */
+    void close() throws Exception;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
new file mode 100644
index 0000000..2aa70e8
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator;
+
+import java.util.Map;
+
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyHandler;
+
+/**
+ * Factory for {@link PolicyStreamHandler} instances, selected by engine type name.
+ */
+public class PolicyStreamHandlers {
+    /** Type name that selects the Siddhi based handler. */
+    public static final String SIDDHI_ENGINE = "siddhi";
+
+    /**
+     * Creates the handler registered for the given type.
+     *
+     * @param type engine type name; only {@link #SIDDHI_ENGINE} is supported
+     * @param sds  stream definitions passed to the created handler
+     * @return a new {@link SiddhiPolicyHandler} when {@code type} equals {@link #SIDDHI_ENGINE}
+     * @throws IllegalArgumentException for any other type, including {@code null}
+     */
+    public static PolicyStreamHandler createHandler(String type, Map<String, StreamDefinition> sds) {
+        // reject everything but the single supported engine up front
+        if (!SIDDHI_ENGINE.equals(type)) {
+            throw new IllegalArgumentException("Illegal policy stream handler type: " + type);
+        }
+        return new SiddhiPolicyHandler(sds);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java
new file mode 100755
index 0000000..3aa079e
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.impl;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.eagle.alert.engine.AlertStreamCollector;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.task.OutputCollector;
+
+/**
+ * <h2>Thread Safe Mechanism</h2>
+ * <ul>
+ * <li>
+ *     emit() method is thread-safe enough to be called anywhere asynchronously in multi-thread
+ * </li>
+ * <li>
+ *     flush() method must be called synchronously, because Storm OutputCollector is not thread-safe
+ * </li>
+ * </ul>
+ */
+public class AlertBoltOutputCollectorThreadSafeWrapper implements AlertStreamCollector {
+    private final OutputCollector delegate;
+    private final LinkedBlockingQueue<AlertStreamEvent> queue;
+    private final static Logger LOG = LoggerFactory.getLogger(AlertBoltOutputCollectorThreadSafeWrapper.class);
+    private final AtomicLong lastFlushTime = new AtomicLong(System.currentTimeMillis());
+    private final AutoAlertFlusher flusher;
+    private final static int MAX_ALERT_DELAY_SECS = 10;
+
+    public AlertBoltOutputCollectorThreadSafeWrapper(OutputCollector outputCollector){
+        this.delegate = outputCollector;
+        this.queue = new LinkedBlockingQueue<>();
+        this.flusher = new AutoAlertFlusher(this);
+        this.flusher.setName(Thread.currentThread().getName()+"-alertFlusher");
+        this.flusher.start();
+    }
+
+    private static class AutoAlertFlusher extends Thread{
+        private final AlertBoltOutputCollectorThreadSafeWrapper collector;
+        private boolean stopped = false;
+        private final static Logger LOG = LoggerFactory.getLogger(AutoAlertFlusher.class);
+
+        private AutoAlertFlusher(AlertBoltOutputCollectorThreadSafeWrapper collector){
+            this.collector = collector;
+        }
+
+        @Override
+        public void run() {
+            LOG.info("Starting");
+            while(!this.stopped){
+                if(System.currentTimeMillis() - collector.lastFlushTime.get() >= MAX_ALERT_DELAY_SECS * 1000L){
+                    this.collector.flush();
+                }
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException ignored) {}
+            }
+            LOG.info("Stopped");
+        }
+        public void shutdown(){
+            LOG.info("Stopping");
+            this.stopped = true;
+        }
+    }
+
+    /**
+     * Emit method can be called in multi-thread
+     * @param event
+     */
+    @Override
+    public void emit(AlertStreamEvent event) {
+        try {
+            queue.put(event);
+        } catch (InterruptedException e) {
+            LOG.error(e.getMessage(),e);
+        }
+    }
+
+    /**
+     * Flush will be called in synchronous way like StormBolt.execute() as Storm OutputCollector is not thread-safe
+     */
+    @Override
+    public void flush() {
+        if(!queue.isEmpty()) {
+            List<AlertStreamEvent> events = new ArrayList<>();
+            queue.drainTo(events);
+            events.forEach((event) -> delegate.emit(Arrays.asList(event.getStreamId(), event)));
+            LOG.info("Flushed {} alerts", events.size());
+        }
+        lastFlushTime.set(System.currentTimeMillis());
+    }
+
+    @Override
+    public void close() {
+        this.flusher.shutdown();
+    }
+}
\ No newline at end of file


Mime
View raw message