eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [29/46] incubator-eagle git commit: [EAGLE-325] Initialize next-gen alert engine code on branch-0.5
Date Thu, 02 Jun 2016 07:08:08 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
new file mode 100644
index 0000000..b37f7b3
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
@@ -0,0 +1,220 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.eagle.alert.engine.spout;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.coordination.model.SpoutSpec;
+import org.apache.eagle.alert.coordination.model.StreamRepartitionMetadata;
+import org.apache.eagle.alert.coordination.model.StreamRepartitionStrategy;
+import org.apache.eagle.alert.coordination.model.Tuple2StreamConverter;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
+import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
+import org.apache.eagle.alert.utils.StreamIdConversion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.spout.ISpoutOutputCollector;
+import backtype.storm.spout.SpoutOutputCollector;
+
+/**
+ * intercept the message sent from within KafkaSpout and select downstream bolts based on meta-data
+ * This is topic based. each topic will have one SpoutOutputCollectorWrapper
+ */
+public class SpoutOutputCollectorWrapper extends SpoutOutputCollector implements ISpoutSpecLCM,SerializationMetadataProvider {
+    private static final Logger LOG = LoggerFactory.getLogger(SpoutOutputCollectorWrapper.class);
+
+    private final ISpoutOutputCollector delegate;
+    private final String topic;
+    private final PartitionedEventSerializer serializer;
+    private int numOfRouterBolts;
+
+    private volatile List<StreamRepartitionMetadata> streamRepartitionMetadataList;
+    private volatile Tuple2StreamConverter converter;
+    private CorrelationSpout spout;
+    private volatile Map<String, StreamDefinition> sds;
+
+    /**
+     * @param delegate   actual SpoutOutputCollector to send data to following bolts
+     * @param topic      topic for this KafkaSpout to handle
+     * @param numGroupbyBolts bolts following this spout
+     * @param serializer
+     */
+    public SpoutOutputCollectorWrapper(CorrelationSpout spout,
+                                       ISpoutOutputCollector delegate,
+                                       String topic,
+                                       SpoutSpec spoutSpec,
+                                       int numGroupbyBolts,
+                                       Map<String, StreamDefinition> sds, PartitionedEventSerializer serializer) {
+        super(delegate);
+        this.spout = spout;
+        this.delegate = delegate;
+        this.topic = topic;
+        this.streamRepartitionMetadataList = spoutSpec.getStreamRepartitionMetadataMap().get(topic);
+        this.converter = new Tuple2StreamConverter(spoutSpec.getTuple2StreamMetadataMap().get(topic));
+        this.numOfRouterBolts = numGroupbyBolts;
+        this.sds = sds;
+        this.serializer = serializer;
+    }
+
+    /**
+     * How to assert that numTotalGroupbyBolts >= numOfRouterBolts, otherwise
+     * there is runtime issue by default, tuple includes 2 fields field 1: topic
+     * name field 2: map of key/value
+     */
+    @SuppressWarnings("rawtypes")
+    @Override
+    public List<Integer> emit(List<Object> tuple, Object messageId) {
+        if (!sanityCheck()) {
+            LOG.error(
+                    "spout collector for topic {} see monitored metadata invalid, is this data source removed! Trigger message id {} ",
+                    topic, messageId);
+            return null;
+        }
+
+        KafkaMessageIdWrapper newMessageId = new KafkaMessageIdWrapper(messageId);
+        newMessageId.topic = topic;
+        /**
+            phase 1: tuple to stream converter
+            if this topic multiplexes multiple streams, then retrieve the individual streams
+        */
+        List<Object> convertedTuple = converter.convert(tuple);
+        if(convertedTuple == null) {
+            LOG.warn("source data {} can't be converted to a stream, ignore this message", tuple);
+            spout.ack(newMessageId);
+            return null;
+        }
+        Map m = (Map)convertedTuple.get(3);
+        Object streamId = convertedTuple.get(1);
+
+        StreamDefinition sd = sds.get(streamId);
+        if(sd == null){
+            LOG.warn("StreamDefinition {} is not found within {}, ignore this message", streamId, sds);
+            spout.ack(newMessageId);
+            return null;
+        }
+
+        StreamEvent event = convertToStreamEventByStreamDefinition((Long)convertedTuple.get(2), m, sds.get(streamId));
+        /*
+            phase 2: stream repartition
+        */
+        for(StreamRepartitionMetadata md : streamRepartitionMetadataList) {
+            // one stream may have multiple group-by strategies, each strategy is for a specific group-by
+            for(StreamRepartitionStrategy groupingStrategy : md.groupingStrategies){
+                int hash = 0;
+                if(groupingStrategy.getPartition().getType().equals(StreamPartition.Type.GROUPBY)) {
+                    hash = getRoutingHashByGroupingStrategy(m, groupingStrategy);
+                }else if(groupingStrategy.getPartition().getType().equals(StreamPartition.Type.SHUFFLE)){
+                    hash = Math.abs((int)System.currentTimeMillis());
+                }
+                int mod = hash % groupingStrategy.numTotalParticipatingRouterBolts;
+                // filter out message
+                if (mod >= groupingStrategy.startSequence && mod < groupingStrategy.startSequence + numOfRouterBolts) {
+                    // framework takes care of field grouping instead of using storm internal field grouping
+                    String sid = StreamIdConversion.generateStreamIdBetween(spout.getSpoutName(), spout.getRouteBoltName()+ (hash % numOfRouterBolts));
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Emitted tuple: {} with message Id: {}, with topic {}, to streamId {}", convertedTuple, messageId, topic, sid);
+                    }
+                    // send message to StreamRouterBolt
+                    PartitionedEvent pEvent = new PartitionedEvent(event, groupingStrategy.partition, hash);
+                    if(this.serializer == null){
+                         delegate.emit(sid, Collections.singletonList(pEvent), newMessageId);
+                    }else {
+                        try {
+                            delegate.emit(sid, Collections.singletonList(serializer.serialize(pEvent)), newMessageId);
+                        } catch (IOException e) {
+                            LOG.error("Failed to serialize {}", pEvent, e);
+                            throw new RuntimeException(e);
+                        }
+                    }
+                }else{
+                    // ******* short-cut ack ********
+                    // we should simply ack those messages which are not processed in this topology because KafkaSpout implementation requires _pending is empty
+                    // before moving to next offsets.
+                    if(LOG.isDebugEnabled()){
+                        LOG.debug("Message filtered with mod {} not within range {} and {} for message {}", mod, groupingStrategy.startSequence,
+                                groupingStrategy.startSequence+ numOfRouterBolts, tuple);
+                    }
+                    spout.ack(newMessageId);
+                }
+            }
+        }
+
+        return null;
+    }
+
+    @SuppressWarnings("rawtypes")
+    private int getRoutingHashByGroupingStrategy(Map data, StreamRepartitionStrategy gs){
+        // calculate hash value for values from group-by fields
+        HashCodeBuilder hashCodeBuilder = new HashCodeBuilder();
+        for(String groupingField : gs.partition.getColumns()) {
+            if(data.get(groupingField) != null){
+                hashCodeBuilder.append(data.get(groupingField));
+            } else {
+                LOG.warn("Required GroupBy fields {} not found: {}", gs.partition.getColumns(), data);
+            }
+        }
+        int hash = hashCodeBuilder.toHashCode();
+        hash = Math.abs(hash);
+        return hash;
+    }
+
+    private boolean sanityCheck() {
+        boolean isOk = true;
+        if (streamRepartitionMetadataList == null) {
+            LOG.error("streamRepartitionMetadataList is null!");
+            isOk = false;
+        }
+        if (converter == null) {
+            LOG.error("tuple2StreamMetadata is null!");
+            isOk = false;
+        }
+        return isOk;
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private StreamEvent convertToStreamEventByStreamDefinition(long timestamp, Map m, StreamDefinition sd){
+        return StreamEvent.Builder().timestamep(timestamp).attributes(m,sd).build();
+    }
+
+    /**
+     * SpoutSpec may be changed, this class will respond to changes on tuple2StreamMetadataMap and streamRepartitionMetadataMap
+     * @param spoutSpec
+     * @param sds
+     */
+    @Override
+    public void update(SpoutSpec spoutSpec, Map<String, StreamDefinition> sds) {
+        this.streamRepartitionMetadataList = spoutSpec.getStreamRepartitionMetadataMap().get(topic);
+        this.converter = new Tuple2StreamConverter(spoutSpec.getTuple2StreamMetadataMap().get(topic));
+        this.sds = sds;
+    }
+
+    @Override
+    public StreamDefinition getStreamDefinition(String streamId) {
+        return this.sds.get(streamId);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java
new file mode 100644
index 0000000..f526cad
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.utils;
+
+import com.google.common.io.ByteStreams;
+
+import java.io.*;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+
/**
 * GZIP helpers for byte arrays. Null or empty input is passed through unchanged,
 * so {@code decompress(compress(x))} round-trips for every input including null.
 */
public class CompressionUtils {

    private CompressionUtils() {
        // static utility class, not instantiable
    }

    /**
     * GZIP-compresses the given bytes.
     *
     * @param source bytes to compress; may be null or empty
     * @return compressed bytes, or the input unchanged when null/empty
     * @throws IOException if compression fails
     */
    public static byte[] compress(byte[] source) throws IOException {
        if (source == null || source.length == 0) {
            return source;
        }
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream(source.length / 2);
        // try-with-resources closes (and thereby finishes) the gzip stream; no explicit close needed
        try (OutputStream compressor = new GZIPOutputStream(outputStream)) {
            compressor.write(source);
        }
        // ByteArrayOutputStream.close() is a no-op, so no extra cleanup is required
        return outputStream.toByteArray();
    }

    /**
     * Decompresses GZIP-compressed bytes.
     *
     * @param compressed gzip bytes to decompress; may be null or empty
     * @return original bytes, or the input unchanged when null/empty
     * @throws IOException if the input is not valid gzip data
     */
    public static byte[] decompress(byte[] compressed) throws IOException {
        if (compressed == null || compressed.length == 0) {
            return compressed;
        }
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream(compressed.length * 2);
        try (GZIPInputStream decompressor = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            // plain stdlib copy loop; avoids the Guava ByteStreams dependency of the original
            byte[] buffer = new byte[4096];
            int n;
            while ((n = decompressor.read(buffer)) >= 0) {
                outputStream.write(buffer, 0, n);
            }
        }
        return outputStream.toByteArray();
    }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java
new file mode 100644
index 0000000..a576404
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java
@@ -0,0 +1,93 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.utils;
+
+import java.io.InputStream;
+
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Since 5/6/16.
+ */
+public class MetadataSerDeser {
+    private static final Logger LOG = LoggerFactory.getLogger(MetadataSerDeser.class);
+
+    @SuppressWarnings("rawtypes")
+    public static <K> K deserialize(InputStream is, TypeReference typeRef){
+        ObjectMapper mapper = new ObjectMapper();
+        try {
+            K spec = mapper.readValue(is, typeRef);
+            return spec;
+        }catch(Exception ex){
+            LOG.error("error in deserializing metadata of type {} from input stream", new TypeReference<K>(){}.getType().getTypeName(), ex);
+        }
+        return null;
+    }
+
+    public static <K> K deserialize(InputStream is, Class<K> cls){
+        ObjectMapper mapper = new ObjectMapper();
+        mapper.configure(JsonParser.Feature.ALLOW_COMMENTS,true);
+        try {
+            K spec = mapper.readValue(is, cls);
+            return spec;
+        }catch(Exception ex){
+            LOG.error("Got error to deserialize metadata of type {} from input stream", new TypeReference<K>(){}.getType().getTypeName(), ex);
+        }
+        return null;
+    }
+
+    @SuppressWarnings("rawtypes")
+    public static <K> K deserialize(String json, TypeReference typeRef){
+        ObjectMapper mapper = new ObjectMapper();
+        try {
+            K spec = mapper.readValue(json, typeRef);
+            return spec;
+        }catch(Exception ex){
+            LOG.error("error in deserializing metadata of type {} from {}", new TypeReference<K>(){}.getType().getTypeName(), json, ex);
+        }
+        return null;
+    }
+
+    public static <K> K deserialize(String json, Class<K> cls){
+        ObjectMapper mapper = new ObjectMapper();
+        try {
+            K spec = mapper.readValue(json, cls);
+            return spec;
+        }catch(Exception ex){
+            LOG.error("error in deserializing metadata of type {} from {}", new TypeReference<K>(){}.getType().getTypeName(), json, ex);
+        }
+        return null;
+    }
+
+    public static <K> String serialize(K spec){
+        ObjectMapper mapper = new ObjectMapper();
+        try{
+            String json = mapper.writeValueAsString(spec);
+            return json;
+        }catch(Exception ex){
+            LOG.error("error in serializing object {} with type {}", spec, new TypeReference<K>(){}.getType().getTypeName(), ex);
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/SerializableUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/SerializableUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/SerializableUtils.java
new file mode 100644
index 0000000..f4652a3
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/SerializableUtils.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.xerial.snappy.SnappyInputStream;
+import org.xerial.snappy.SnappyOutputStream;
+
+/**
+ * Utilities for working with Serializables.
+ *
+ * Derived from "com.google.cloud.dataflow.sdk.util.SerializableUtils":
+ * https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/SerializableUtils.java
+ */
+public class SerializableUtils {
+  /**
+   * Serializes the argument into an array of bytes, and returns it.
+   *
+   * @throws IllegalArgumentException if there are errors when serializing
+   */
+  public static byte[] serializeToCompressedByteArray(Object value) {
+    try {
+      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+      try (ObjectOutputStream oos = new ObjectOutputStream(new SnappyOutputStream(buffer))) {
+        oos.writeObject(value);
+      }
+      return buffer.toByteArray();
+    } catch (IOException exn) {
+      throw new IllegalArgumentException(
+          "unable to serialize " + value,
+          exn);
+    }
+  }
+
+  /**
+   * Serializes the argument into an array of bytes, and returns it.
+   *
+   * @throws IllegalArgumentException if there are errors when serializing
+   */
+  public static byte[] serializeToByteArray(Object value) {
+    try {
+      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+      try (ObjectOutputStream oos = new ObjectOutputStream(buffer)) {
+        oos.writeObject(value);
+      }
+      return buffer.toByteArray();
+    } catch (IOException exn) {
+      throw new IllegalArgumentException("unable to serialize " + value, exn);
+    }
+  }
+
+  /**
+   * Deserializes an object from the given array of bytes, e.g., as
+   * serialized using {@link #serializeToCompressedByteArray}, and returns it.
+   *
+   * @throws IllegalArgumentException if there are errors when
+   * deserializing, using the provided description to identify what
+   * was being deserialized
+   */
+  public static Object deserializeFromByteArray(byte[] encodedValue,
+                                                          String description) {
+    try {
+      try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(encodedValue))) {
+        return ois.readObject();
+      }
+    } catch (IOException | ClassNotFoundException exn) {
+      throw new IllegalArgumentException(
+          "unable to deserialize " + description,
+          exn);
+    }
+  }
+
+  /**
+   * Deserializes an object from the given array of bytes, e.g., as
+   * serialized using {@link #serializeToCompressedByteArray}, and returns it.
+   *
+   * @throws IllegalArgumentException if there are errors when
+   * deserializing, using the provided description to identify what
+   * was being deserialized
+   */
+  public static Object deserializeFromCompressedByteArray(byte[] encodedValue,
+                                                          String description) {
+    try {
+      try (ObjectInputStream ois = new ObjectInputStream(
+          new SnappyInputStream(new ByteArrayInputStream(encodedValue)))) {
+        return ois.readObject();
+      }
+    } catch (IOException | ClassNotFoundException exn) {
+      throw new IllegalArgumentException(
+          "unable to deserialize " + description,
+          exn);
+    }
+  }
+
+  public static <T extends Serializable> T ensureSerializable(T value) {
+    @SuppressWarnings("unchecked")
+    T copy = (T) deserializeFromCompressedByteArray(serializeToCompressedByteArray(value),
+        value.toString());
+    return copy;
+  }
+
+  public static <T extends Serializable> T clone(T value) {
+    @SuppressWarnings("unchecked")
+    T copy = (T) deserializeFromCompressedByteArray(serializeToCompressedByteArray(value),
+        value.toString());
+    return copy;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/storm/kafka/KafkaSpoutMetric.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/storm/kafka/KafkaSpoutMetric.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/storm/kafka/KafkaSpoutMetric.java
new file mode 100644
index 0000000..dd58172
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/storm/kafka/KafkaSpoutMetric.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.kafka;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.metric.api.IMetric;
+
+/**
+ * Since 5/18/16.
+ * The original storm.kafka.KafkaSpout has some issues like the following
+ * 1) can only support one single topic
+ * 2) can only be initialized at open(), can't dynamically support another topic
+ */
+public class KafkaSpoutMetric implements IMetric {
+    @SuppressWarnings("unused")
+    private final static Logger LOG = LoggerFactory.getLogger(KafkaSpoutMetric.class);
+    private Map<String, KafkaSpoutMetricContext> metricContextMap = new ConcurrentHashMap<>();
+    private Map<String, KafkaUtils.KafkaOffsetMetric> offsetMetricMap = new ConcurrentHashMap<>();
+
+    public static class KafkaSpoutMetricContext {
+        SpoutConfig _spoutConfig;
+        DynamicPartitionConnections _connections;
+        PartitionCoordinator _coordinator;
+    }
+
+    public void addTopic(String topic, KafkaSpoutMetricContext context) {
+        // construct KafkaOffsetMetric
+        KafkaUtils.KafkaOffsetMetric kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(context._spoutConfig.topic, context._connections);
+        metricContextMap.put(topic, context);
+        offsetMetricMap.put(topic, kafkaOffsetMetric);
+    }
+
+    public void removeTopic(String topic) {
+        metricContextMap.remove(topic);
+        offsetMetricMap.remove(topic);
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Override
+    public Object getValueAndReset() {
+        HashMap spoutMetric = new HashMap();
+        for (Map.Entry<String, KafkaSpoutMetricContext> entry : metricContextMap.entrySet()) {
+            // construct offset metric
+            List<PartitionManager> pms = entry.getValue()._coordinator.getMyManagedPartitions();
+            Set<Partition> latestPartitions = new HashSet();
+            for (PartitionManager pm : pms) {
+                latestPartitions.add(pm.getPartition());
+            }
+
+            KafkaUtils.KafkaOffsetMetric offsetMetric = offsetMetricMap.get(entry.getKey());
+            offsetMetric.refreshPartitions(latestPartitions);
+            for (PartitionManager pm : pms) {
+                offsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset());
+            }
+            Object o = offsetMetric.getValueAndReset();
+            if(o != null) {
+                ((HashMap) o).forEach(
+                        (k, v) -> spoutMetric.put(k + "_" + entry.getKey(), v)
+                );
+            }
+
+            // construct partition metric
+            for (PartitionManager pm : pms) {
+                pm.getMetricsDataMap().forEach(
+                        (k, v) -> spoutMetric.put(k + "_" + entry.getKey(), v)
+                );
+            }
+        }
+        return spoutMetric;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/storm/kafka/KafkaSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/storm/kafka/KafkaSpoutWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/storm/kafka/KafkaSpoutWrapper.java
new file mode 100644
index 0000000..1cef187
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/storm/kafka/KafkaSpoutWrapper.java
@@ -0,0 +1,111 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package storm.kafka;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.SpoutSpec;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.spout.ISpoutSpecLCM;
+import org.apache.eagle.alert.engine.spout.SpoutOutputCollectorWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+
+/**
+ * NOTE: this class copies some code from storm.kafka.KafkaSpout so that one process can host
+ * multiple KafkaSpout instances.
+ *
+ * This wrapper provides the following capabilities:
+ * 1. injects a customized collector wrapper, so the framework can control traffic routing
+ * 2. listens for topic-to-stream metadata changes and passes them to the customized collector wrapper
+ * 3. returns the current streams for this topic
+ */
+public class KafkaSpoutWrapper extends KafkaSpout implements ISpoutSpecLCM {
+    private static final long serialVersionUID = 5507693757424351306L;
+    @SuppressWarnings("unused")
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutWrapper.class);
+    // shared metric registry; this wrapper registers its topic in open() and removes it in close()
+    private KafkaSpoutMetric kafkaSpoutMetric;
+
+    public KafkaSpoutWrapper(SpoutConfig spoutConf, KafkaSpoutMetric kafkaSpoutMetric) {
+        super(spoutConf);
+        this.kafkaSpoutMetric = kafkaSpoutMetric;
+    }
+
+    // the framework-injected collector, kept so update() can forward metadata changes to it
+    private SpoutOutputCollectorWrapper collectorWrapper;
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Override
+    public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
+        String topologyInstanceId = context.getStormId();
+        ////// !!!! begin copying code from storm.kafka.KafkaSpout to here
+        // statement order below mirrors KafkaSpout.open() and must be preserved
+        _collector = collector;
+
+        // zk connection settings fall back to the topology-level storm zookeeper config
+        Map stateConf = new HashMap(conf);
+        List<String> zkServers = _spoutConfig.zkServers;
+        if (zkServers == null) {
+            zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
+        }
+        Integer zkPort = _spoutConfig.zkPort;
+        if (zkPort == null) {
+            zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
+        }
+        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
+        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
+        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
+        _state = new ZkState(stateConf);
+
+        _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));
+
+        // using TransactionalState like this is a hack
+        int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
+        if (_spoutConfig.hosts instanceof StaticHosts) {
+            _coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, topologyInstanceId);
+        } else {
+            _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, topologyInstanceId);
+        }
+
+        ////// !!!! end copying code from storm.kafka.KafkaSpout to here
+
+        // add new topic to metric
+        KafkaSpoutMetric.KafkaSpoutMetricContext metricContext = new KafkaSpoutMetric.KafkaSpoutMetricContext();
+        metricContext._connections = _connections;
+        metricContext._coordinator = _coordinator;
+        metricContext._spoutConfig = _spoutConfig;
+        kafkaSpoutMetric.addTopic(_spoutConfig.topic, metricContext);
+
+        // NOTE(review): unconditional downcast — the framework is expected to always inject a
+        // SpoutOutputCollectorWrapper here; a plain SpoutOutputCollector would throw CCE. Verify.
+        this.collectorWrapper = (SpoutOutputCollectorWrapper)collector;
+    }
+
+    // ISpoutSpecLCM: forward topic-to-stream metadata changes to the collector wrapper
+    @Override
+    public void update(SpoutSpec metadata, Map<String, StreamDefinition> sds){
+        collectorWrapper.update(metadata, sds);
+    }
+
+    @Override
+    public void close(){
+        super.close();
+        // deregister from the shared metric so the topic stops being reported after shutdown
+        kafkaSpoutMetric.removeTopic(_spoutConfig.topic);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_DEFAULT.vm
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_DEFAULT.vm b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_DEFAULT.vm
new file mode 100644
index 0000000..2da6288
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_DEFAULT.vm
@@ -0,0 +1,267 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
+<html xmlns="http://www.w3.org/1999/xhtml">
+	<head>
+		<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
+		<meta name="viewport" content="width=device-width"/>
+		<style>
+			body {
+				width:100% !important;
+				min-width: 100%;
+				-webkit-text-size-adjust:100%;
+				-ms-text-size-adjust:100%;
+				margin:0;
+				padding:0;
+			}
+
+			table {
+				border-spacing: 0;
+				border-collapse: collapse;
+			}
+
+			table th,
+			table td {
+				padding: 3px 0 3px 0;
+			}
+
+			.body {
+				width: 100%;
+			}
+
+			p,a,h1,h2,h3,ul,ol,li {
+				font-family: Helvetica, Arial, sans-serif;
+				font-weight: normal;
+				margin: 0;
+				padding: 0;
+			}
+			p {
+				font-size: 14px;
+				line-height: 19px;
+			}
+			a {
+				color: #3294b1;
+			}
+			h1 {
+				font-size: 36px;
+				margin: 15px 0 5px 0;
+			}
+			h2 {
+				font-size: 32px;
+			}
+			h3 {
+				font-size: 28px;
+			}
+
+			ul,ol {
+				margin: 0 0 0 25px;
+				padding: 0;
+			}
+
+			.btn {
+				background: #2ba6cb !important;
+				border: 1px solid #2284a1;
+				padding: 10px 20px 10px 20px;
+				text-align: center;
+			}
+			.btn:hover {
+				background: #2795b6 !important;
+			}
+			.btn a {
+				color: #FFFFFF;
+				text-decoration: none;
+				font-weight: bold;
+				padding: 10px 20px 10px 20px;
+			}
+
+			.tableBordered {
+				border-top: 1px solid #b9e5ff;
+			}
+			.tableBordered th {
+				background: #ECF8FF;
+			}
+			.tableBordered th p {
+				font-weight: bold;
+				color: #3294b1;
+			}
+			.tableBordered th,
+			.tableBordered td {
+				color: #333333;
+				border-bottom: 1px solid #b9e5ff;
+				text-align: center;
+				padding-bottom: 5px;
+			}
+
+			.panel {
+				height: 100px;
+			}
+		</style>
+	</head>
+	<body>
+		#set ( $elem = $alertList[0] )
+		#set ( $alertUrl = $elem["alertDetailUrl"] )
+		#set ( $policyUrl = $elem["policyDetailUrl"] )
+		<table class="body">
+			<tr>
+				<td align="center" valign="top" style="background: #999999; padding: 0 0 0 0;">
+					<!-- Header -->
+					<table width="580">
+						<tr>
+							<td style="padding: 0 0 0 0;" align="left" >
+								<p style="color:#FFFFFF;font-weight: bold; font-size: 22px">UMP Alerts</p>
+							</td>
+						</tr>
+					</table>
+				</td>
+			</tr>
+
+			<tr>
+				<td align="center" valign="top">
+					<!-- Eagle Body -->
+					<table width="580">
+						<tr>
+							<!-- Title -->
+							<td align="center">
+								<h1>$elem["streamId"] Alert Detected</h1>
+							</td>
+						</tr>
+						<tr>
+							<!-- Time -->
+							<td>
+								<table width="580">
+									<tr>
+										<td>
+											<p><b>Detected Time: $elem["alertTime"]</b></p>
+										</td>
+										#set ( $severity = $elem["severity"] )
+										#if (!$severity || ("$severity" == ""))
+											#set ( $elem["severity"] = "WARNING")
+										#end
+										<td align="right">
+											<p><b>
+												Severity:
+									            #if ($elem["severity"] == "WARNING")
+													<span>$elem["severity"]</span>												
+    											#else
+													<span style="color: #FF0000;">$elem["severity"]</span>
+    											#end
+											</b></p>
+										</td>
+									</tr>
+								</table>
+							</td>
+						</tr>
+						<tr>
+							<!-- Description -->
+							<td valign="top" style="background: #ECF8FF; border: 1px solid #b9e5ff; padding: 10px 10px 12px 10px;">
+								<p>$elem["alertMessage"]</p>
+							</td>
+						</tr>
+						<tr>
+							<!-- View Detail -->
+							<td align="center" style="padding: 10px 0 0 0;">
+								<table width="580">
+									<tr>
+										<td class="btn">
+											<a href="$alertUrl">View Alert Details on Eagle Web</a>
+										</td>
+									</tr>
+								</table>
+							</td>
+						</tr>
+						<tr>
+							<!-- Basic Information -->
+							<td style="padding: 20px 0 0 0;">
+								<p><b>Basic Information:</b></p>
+							</td>
+						</tr>
+						<tr>
+							<!-- Basic Information Content -->
+							<td>
+								<table class="tableBordered" width="580">
+									<tr>
+										<th>
+											<p>Policy Name</p>
+										</th>
+										<th>
+											<p>Data Source</p>
+										</th>
+									</tr>
+									<tr>
+										<td>
+											<p>$elem["policyId"]</p>
+										</td>
+										<td>
+											<p>$elem["streamId"]</p>
+										</td>
+									</tr>
+									<tr>
+
+										<th>
+											<p>Creator</p>
+										</th>
+										<th>
+											<p>Severity</p>
+										</th>
+									</tr>
+									<tr>
+										<td>
+											<p>$elem["creator"]</p>
+										</td>
+										<td>
+											<p>$elem["severity"]</p>
+										</td>
+									</tr>
+								</table>
+							</td>
+						</tr>
+						<tr>
+							<!-- View Detail -->
+							<td align="center" style="padding: 10px 0 0 0;">
+								<table width="580">
+									<tr>
+										<td class="btn">
+											<a href="$policyUrl">View Policy Details on Eagle Web</a>
+										</td>
+									</tr>
+								</table>
+							</td>
+						</tr>						
+						<tr>
+							<!-- Actions Required -->
+							<td style="padding: 20px 0 0 0;">
+								<p><b>Actions Required:</b></p>
+							</td>
+						</tr>
+						<tr>
+							<!-- Possible Root Causes Content -->
+							<td class="panel" valign="top" style="background: #F4F4F4; border: 1px solid #AAAAAA; padding: 10px 10px 12px 10px;">
+								<p> $elem["streamId"] alert found, please check.</p>
+							</td>
+						</tr>
+						<tr>
+							<!-- Copyright -->
+							<td align="center">
+								<p><a href="<Eagle-Host>/alerts/alertlist.html">UMP Alert Engine</a></p>
+							</td>
+						</tr>
+					</table>
+				</td>
+			</tr>
+		</table>
+	</body>
+</html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
new file mode 100644
index 0000000..b9308ef
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+{
+  "topology" : {
+    "name" : "alertUnitTopology_1",
+    "numOfTotalWorkers" : 2,
+    "numOfSpoutTasks" : 1,
+    "numOfRouterBolts" : 4,
+    "numOfAlertBolts" : 10,
+    "numOfPublishTasks" : 1,
+    "messageTimeoutSecs": 3600,
+    "localMode" : "true"
+  },
+  "spout" : {
+    "kafkaBrokerZkQuorum": "localhost:2181",
+    "kafkaBrokerZkBasePath": "/brokers",
+    "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
+    "stormKafkaTransactionZkQuorum": "",
+    "stormKafkaTransactionZkPath": "/consumers",
+    "stormKafkaEagleConsumer": "eagle_consumer",
+    "stormKafkaStateUpdateIntervalMs": 2000,
+    "stormKafkaFetchSizeBytes": 1048586, # NOTE(review): likely intended 1048576 (1 MiB) — verify
+  },
+  "zkConfig" : {
+    "zkQuorum" : "localhost:2181",
+    "zkRoot" : "/alert",
+    "zkSessionTimeoutMs" : 10000,
+    "connectionTimeoutMs" : 10000,
+    "zkRetryTimes" : 3,
+    "zkRetryInterval" : 3000
+  },
+  "dynamicConfigSource" : {
+    "initDelayMillis": 3000,
+    "delayMillis" : 10000
+  },
+  "metadataService": {
+    "context" : "/api",
+    "host" : "localhost",
+    "port" : 8080
+  },
+  "coordinatorService": {
+    "host": "localhost",
+    "port": 9090,
+    "context" : "/api"
+  }
+  "metric":{
+    "sink": {
+      "kafka": {
+        "topic": "alert_metric"
+        "bootstrap.servers": "localhost:6667"
+      }
+      "stdout": {}
+      //      "elasticsearch": {
+      //        "hosts": ["localhost:9200"]
+      //        "index": "alert_metric"
+      //        "timestampField": "timestamp"
+      //      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext
new file mode 100644
index 0000000..506bad9
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# externalTimeBatch=org.apache.eagle.policy.siddhi.extension.ExternalTimeBatchWindowProcessor
+collect=org.apache.eagle.alert.engine.siddhi.extension.AttributeCollectAggregator
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/log4j.properties b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/log4j.properties
new file mode 100644
index 0000000..af99e2c
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/log4j.properties
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
+
+##log4j.logger.org.apache.eagle.alert.engine.spout.CorrelationSpout=DEBUG
+log4j.logger.org.apache.eagle.alert.engine.spout.SpoutOutputCollectorWrapper=DEBUG
+log4j.logger.org.apache.eagle.alert.engine.router.impl.StreamRouterBoltOutputCollector=DEBUG
+log4j.logger.org.apache.eagle.alert.engine.sorter.impl.StreamSortWindowHandlerImpl=DEBUG
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/CoordinatorClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/CoordinatorClient.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/CoordinatorClient.java
new file mode 100644
index 0000000..aebf3b5
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/CoordinatorClient.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.e2e;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import javax.ws.rs.core.MediaType;
+
+import org.codehaus.jackson.jaxrs.JacksonJsonProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+import com.typesafe.config.Config;
+
+/**
+ * @since May 9, 2016
+ *
+ */
+/**
+ * Thin REST client for the coordinator service; currently only triggers a schedule build.
+ * Not thread-safe beyond what the underlying Jersey {@code Client} guarantees.
+ *
+ * @since May 9, 2016
+ */
+public class CoordinatorClient implements Closeable {
+
+    @SuppressWarnings("unused")
+    private static final Logger LOG = LoggerFactory.getLogger(CoordinatorClient.class);
+
+    private static final String EAGLE_COORDINATOR_SERVICE_CONTEXT = "coordinatorService.context";
+    private static final String EAGLE_COORDINATOR_SERVICE_PORT = "coordinatorService.port";
+    private static final String EAGLE_COORDINATOR_SERVICE_HOST = "coordinatorService.host";
+    private static final String COORDINATOR_SCHEDULE_API = "/coordinator/build";
+
+    private final String host;
+    private final int port;
+    private final String context;
+    private transient Client client;
+    private final String basePath;
+
+    /**
+     * Builds a client from the {@code coordinatorService.*} entries of the given config.
+     *
+     * @param config typesafe config holding coordinatorService.host/port/context
+     */
+    public CoordinatorClient(Config config) {
+        // the delegated constructor already computes basePath; no need to rebuild it here
+        this(config.getString(EAGLE_COORDINATOR_SERVICE_HOST),
+             config.getInt(EAGLE_COORDINATOR_SERVICE_PORT),
+             config.getString(EAGLE_COORDINATOR_SERVICE_CONTEXT));
+    }
+
+    /**
+     * Builds a client against {@code http://host:port + context} with 60s connect/read timeouts
+     * and JSON (Jackson) marshalling.
+     */
+    public CoordinatorClient(String host, int port, String context) {
+        this.host = host;
+        this.port = port;
+        this.context = context;
+        this.basePath = buildBasePath();
+        ClientConfig cc = new DefaultClientConfig();
+        cc.getProperties().put(DefaultClientConfig.PROPERTY_CONNECT_TIMEOUT, 60 * 1000);
+        cc.getProperties().put(DefaultClientConfig.PROPERTY_READ_TIMEOUT, 60 * 1000);
+        cc.getClasses().add(JacksonJsonProvider.class);
+        // workaround so Jersey can issue HTTP methods not supported by HttpURLConnection directly
+        cc.getProperties().put(URLConnectionClientHandler.PROPERTY_HTTP_URL_CONNECTION_SET_METHOD_WORKAROUND, true);
+        this.client = Client.create(cc);
+        client.addFilter(new com.sun.jersey.api.client.filter.GZIPContentEncodingFilter());
+    }
+
+    private String buildBasePath() {
+        return "http://" + host + ":" + port + context;
+    }
+
+    /**
+     * POSTs to the coordinator build endpoint and returns the raw response body.
+     *
+     * @return the coordinator's response as a JSON string
+     */
+    public String schedule() {
+        WebResource r = client.resource(basePath + COORDINATOR_SCHEDULE_API);
+        return r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(String.class);
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.client.destroy();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
new file mode 100644
index 0000000..057aa73
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.e2e;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.engine.UnitTopologyMain;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.service.IMetadataServiceClient;
+import org.apache.eagle.alert.service.MetadataServiceClientImpl;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.utils.Utils;
+
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.CollectionType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+
+/**
+ * Simple end-to-end threshold alerting case.
+ * 
+ * @since May 8, 2016
+ *
+ */
+public class Integration1 {
+    private static final Logger LOG = LoggerFactory.getLogger(Integration1.class);
+    // ObjectMapper is thread-safe once configured; cache a single instance
+    private static final ObjectMapper om = new ObjectMapper();
+
+    public static void main(String[] args) throws Exception {
+        Integration1 inte = new Integration1();
+        inte.args = args;
+        inte.test_simple_threshhold();
+    }
+
+    private String[] args;
+    private ExecutorService executors = Executors.newFixedThreadPool(5);
+
+    /**
+     * Assumption:
+     * <p>
+     * start metadata service 8080, better in docker
+     * <p>
+     * start coordinator service 9090, better in docker
+     * <p>
+     * datasources : perfmon_datasource
+     * <p>
+     * stream: perfmon_cpu
+     * <p>
+     * policy : perfmon_cpu_host_check / perfmon_cpu_pool_check
+     * <p>
+     * Create topic
+     * liasu@xxx:~$ $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic perfmon_metrics
+     * <p>
+     * 
+     * @throws InterruptedException
+     */
+    @Ignore
+    @Test
+    public void test_simple_threshhold() throws Exception {
+        System.setProperty("config.resource", "/application-integration.conf");
+        ConfigFactory.invalidateCaches();
+        Config config = ConfigFactory.load();
+
+        System.out.println("loading metadatas...");
+        loadMetadatas("/", config);
+        System.out.println("loading metadatas done!");
+
+        // start the sample data producer and the alert topology concurrently
+        executors.submit(() -> SampleClient1.main(args));
+
+        executors.submit(() -> UnitTopologyMain.main(args));
+
+        // give the producer and topology a moment to start, then trigger coordinator
+        // scheduling every 5 minutes; intentionally runs forever (manual integration test)
+        Utils.sleep(1000 * 5L);
+        while (true) {
+            proactive_schedule(config);
+
+            Utils.sleep(1000 * 60L * 5);
+        }
+    }
+
+    /**
+     * This test is expected to fail (with ExecutionException) only when a required entry is
+     * missing from the config file; marked as ignored.
+     * @throws InterruptedException
+     * @throws ExecutionException
+     */
+    @Ignore
+    @Test(expected = ExecutionException.class)
+    public void test_typesafe_config() throws InterruptedException, ExecutionException {
+        System.setProperty("config.resource", "/application-integration.conf");
+        ConfigFactory.invalidateCaches();
+        Future<?> f = executors.submit(() -> {
+            UnitTopologyMain.main(null);
+        });
+
+        f.get();
+    }
+
+    /**
+     * Triggers one coordinator schedule build; failures are logged, not rethrown, so a
+     * temporarily unavailable coordinator does not abort the surrounding loop.
+     */
+    public static void proactive_schedule(Config config) throws Exception {
+
+        try (CoordinatorClient cc = new CoordinatorClient(config)) {
+            try {
+                String resp = cc.schedule();
+                LOG.info("schedule return : {} ", resp);
+            } catch (Exception e) {
+                LOG.error("failed to call schedule!", e);
+            }
+        }
+    }
+
+    /**
+     * Clears the metadata service, then loads datasources, policies, publishments, stream
+     * definitions and topologies from JSON resources under the given classpath base.
+     */
+    public static void loadMetadatas(String base, Config config) throws Exception {
+        IMetadataServiceClient client = new MetadataServiceClientImpl(config);
+        client.clear();
+
+        List<Kafka2TupleMetadata> metadata = loadEntities(base + "datasources.json", Kafka2TupleMetadata.class);
+        for (Kafka2TupleMetadata k : metadata) {
+            client.addDataSource(k);
+        }
+
+        List<PolicyDefinition> policies = loadEntities(base + "policies.json", PolicyDefinition.class);
+        for (PolicyDefinition p : policies) {
+            client.addPolicy(p);
+        }
+
+        List<Publishment> pubs = loadEntities(base + "publishments.json", Publishment.class);
+        for (Publishment pub : pubs) {
+            client.addPublishment(pub);
+        }
+
+        List<StreamDefinition> defs = loadEntities(base + "streamdefinitions.json", StreamDefinition.class);
+        for (StreamDefinition def : defs) {
+            client.addStreamDefinition(def);
+        }
+
+        List<Topology> topos = loadEntities(base + "topologies.json", Topology.class);
+        for (Topology t : topos) {
+            client.addTopology(t);
+        }
+
+        client.close();
+    }
+
+    /**
+     * Deserializes a classpath JSON resource into a List&lt;T&gt;.
+     *
+     * @param path classpath resource path (e.g. "/policies.json")
+     * @param tClz element type
+     */
+    public static <T> List<T> loadEntities(String path, Class<T> tClz) throws Exception {
+        // TypeFactory is the supported way to build parameterized types (the static
+        // CollectionType/SimpleType construct() factories are deprecated)
+        JavaType type = om.getTypeFactory().constructCollectionType(List.class, tClz);
+        List<T> l = om.readValue(Integration1.class.getResourceAsStream(path), type);
+        return l;
+    }
+
+    /**
+     * <p>
+     * {"name":"xxx","numOfSpout":1,"numOfAlertBolt":3,"numOfGroupBolt":2,
+     * "spoutId"
+     * :"xxx-spout","groupNodeIds":["xxx-grp"],"alertBoltIds":["xxx-bolt"
+     * ],"pubBoltId":"xxx-pubBolt","spoutParallelism":1,"groupParallelism":1,
+     * "alertParallelism":1}
+     * <p>
+     * 
+     * @throws Exception
+     */
+    @Ignore
+    @Test
+    public void testJson() throws Exception {
+        {
+            List<Topology> l = loadEntities("/topologies.json", Topology.class);
+            Topology t = l.get(0);
+
+            Assert.assertEquals(4, t.getGroupNodeIds().size());
+            Assert.assertEquals(10, t.getAlertBoltIds().size());
+        }
+
+        {
+            // publishment
+            List<Publishment> l = loadEntities("/publishments.json", Publishment.class);
+            Publishment p = l.get(0);
+            Assert.assertEquals("KAFKA", p.getType());
+        }
+
+        checkAll("/");
+        checkAll("/correlation/");
+    }
+
+    // smoke-check that every metadata resource under the base deserializes cleanly
+    private void checkAll(String base) throws Exception {
+        loadEntities(base + "datasources.json", Kafka2TupleMetadata.class);
+        loadEntities(base + "policies.json", PolicyDefinition.class);
+        loadEntities(base + "publishments.json", Publishment.class);
+        loadEntities(base + "streamdefinitions.json", StreamDefinition.class);
+        loadEntities(base + "topologies.json", Topology.class);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java
new file mode 100644
index 0000000..7ea0e7e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.e2e;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.eagle.alert.engine.UnitTopologyMain;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.core.stream.output.StreamCallback;
+import org.wso2.siddhi.core.util.EventPrinter;
+
+import backtype.storm.utils.Utils;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * @since May 10, 2016
+ *
+ */
public class Integration2 {

    // Pool used to run the alert topology and the sample Kafka producer
    // concurrently while the main thread drives the scheduler loop below.
    private ExecutorService executors = Executors.newFixedThreadPool(5);
    
    /**
     * Entry point so the integration can be run outside of JUnit.
     * Required Kafka topics must exist before starting:
     * <pre>
     * Create topic
     * liasu@xxxx:~$ $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic eslogs
     * liasu@xxxx:~$ $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic bootfailures
     * </pre>
     * 
     * @param args forwarded to both the topology main and the sample client
     */
    public static void main(String[] args) throws Exception {
        Integration2 inte = new Integration2();
        inte.args = args;
        inte.test_start();
    }

    // Held so test_start() can forward CLI args when launched via main().
    private String[] args;

    /**
     * End-to-end run: loads correlation metadata, starts the unit topology and
     * the sample event producer on background threads, then periodically
     * triggers a proactive schedule. Never returns — manual integration only
     * (hence @Ignore).
     */
    @Ignore
    @Test
    public void test_start() throws Exception {
        // Point typesafe-config at the correlation integration config before
        // anything reads ConfigFactory.load().
        System.setProperty("config.resource", "/correlation/application-integration-2.conf");
        ConfigFactory.invalidateCaches();
        Config config = ConfigFactory.load();
        Integration1.loadMetadatas("/correlation/", config);

        executors.submit(() -> UnitTopologyMain.main(args));

        executors.submit(() -> SampleClient2.main(args));

        // Give the topology and producer a head start before scheduling.
        Utils.sleep(1000 * 5l);
        while (true) {
            Integration1.proactive_schedule(config);
            Utils.sleep(1000 * 60l * 5);
        }
    }

    /**
     * Standalone Siddhi experiment: joins an ES log stream with a boot-failure
     * stream on instanceUuid within a 10 minute window and prints every join
     * result. Never returns — manual experiment only (hence @Ignore).
     */
    @Test @Ignore
    public void test3() throws Exception {
        SiddhiManager sm = new SiddhiManager();
        String s1 = " define stream esStream(instanceUuid string, timestamp long, logLevel string, message string, reqId string, host string, component string); ";
        s1 += " define stream ifStream(instanceUuid string, timestamp long, reqId string, message string, host string); ";
        s1 += "from esStream#window.externalTime(timestamp, 20 min) as a join ifStream#window.externalTime(timestamp, 5 min) as b on a.instanceUuid == b.instanceUuid  within 10 min select logLevel, a.host as aHost, a.component, a.message as logMessage, b.message as failMessage, a.timestamp as t1, b.timestamp as t2, b.host as bHost, count(1) as errorCount group by component insert into log_stream_join_output; ";
        ExecutionPlanRuntime epr = sm.createExecutionPlanRuntime(s1);

        // Print each joined event so a human can eyeball the correlation.
        epr.addCallback("log_stream_join_output", new StreamCallback() {
            @Override
            public void receive(Event[] arg0) {
                System.out.println("join result!");
                EventPrinter.print(arg0);
            }
        });

        InputHandler input1 = epr.getInputHandler("esStream");
        InputHandler input2 = epr.getInputHandler("ifStream");

        epr.start();
        
        // Fixed historical epoch base; externalTime windows use the event
        // timestamp, so wall-clock time is irrelevant here.
        long base = 1462880695837l;
        
        while (true) {
            sendEvent(input1, input2, base);
            
            base = base + 3000;
            
            Utils.sleep(3000);
        }

    }

    /**
     * Pushes one matching pair of events (same instanceUuid and reqId, same
     * timestamp) into both streams so the join above always has a hit.
     */
    private void sendEvent(InputHandler input1, InputHandler input2, long base) throws InterruptedException {
        {
            // esStream: instanceUuid, timestamp, logLevel, message, reqId, host, component
            Event e = new Event();
            e.setTimestamp(base);
            e.setData(new Object[] {
                    "instance-guid-c2a1c926-b590-418e-bf57-41469d7891fa",
                    base,
                    "ERROR",
                    "NullPointException",
                    "req-id-82dab92c-9e45-4ad8-8793-96e912831f00",
                    "nova.host",
                    "NOVA"
            });
            input1.send(e);
        }
        
        {
            // ifStream: instanceUuid, timestamp, reqId, message, host
            Event e = new Event();
            e.setTimestamp(base);
            e.setData(new Object[] {"instance-guid-c2a1c926-b590-418e-bf57-41469d7891fa",
                    base,
                    "req-id-82dab92c-9e45-4ad8-8793-96e912831f00",
                    "boot failure for when try start the given vm!",
                    "boot-vm-data-center.corp.com"});
            input2.send(e);
        }
    }

}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java
new file mode 100644
index 0000000..348bf78
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.e2e;
+
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.eagle.alert.utils.JsonUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.utils.Utils;
+
+/**
+ * @since May 9, 2016
+ *
+ */
+public class SampleClient1 {
+    @SuppressWarnings("unused")
+    private static final Logger LOG = LoggerFactory.getLogger(SampleClient1.class);
+
+    private static final String PERFMON_CPU_STREAM = "perfmon_cpu_stream";
+    private static final String PERFMON_MEM_STREAM = "perfmon_mem_stream";
+
+//    private static int hostIndx = 1;
+    private static String hostTemp = "host-000%d.datacenter.corp.com";
+
+    /**
+     * <pre>
+     * {"host": "", "timestamp" : "", "metric" : "", "pool": "", "value": 1.0, "colo": "phx"}
+     * </pre>
+     */
+    public static class Entity {
+        public String host;
+        public long timestamp;
+        public String metric;
+        public String pool;
+        public double value;
+        public String colo;
+    }
+
+    public static void main(String[] args) {
+        long base = System.currentTimeMillis();
+        AtomicLong msgCount = new AtomicLong();
+
+        try (KafkaProducer<String, String> proceduer = createProceduer()) {
+            while (true) {
+                int hostIndex = 6;
+                for (int i = 0; i < hostIndex; i++) {
+                    base = send_metric(base, proceduer, PERFMON_CPU_STREAM, i);
+                    msgCount.incrementAndGet();
+                    base = send_metric(base, proceduer, PERFMON_MEM_STREAM, i);
+                    msgCount.incrementAndGet();
+                }
+
+                if ((msgCount.get() % 600) == 0) {
+                    System.out.println("send 600 CPU/MEM metric!");
+                }
+
+                Utils.sleep(3000);
+            }
+        }
+    }
+
+    private static long send_metric(long base, KafkaProducer<String, String> proceduer, String stream, int hostIndex) {
+
+        Pair<Long, String> pair = createEntity(base, stream, hostIndex);
+        base = pair.getKey();
+        ProducerRecord<String, String> record = new ProducerRecord<String, String>("perfmon_metrics",
+                pair.getRight());
+        proceduer.send(record);
+        return base;
+    }
+
+    private static Pair<Long, String> createEntity(long base, String stream, int hostIndex) {
+        // TODO : add randomization
+        Entity e = new Entity();
+        e.colo = "LVS";
+        e.host = String.format(hostTemp, hostIndex);
+        if (hostIndex < 3) {
+            e.pool = "hadoop-eagle-prod";
+        } else {
+            e.pool = "raptor-pool1";
+        }
+        e.timestamp = base;
+        e.metric = stream;
+        e.value = 92.0;
+
+        base = base + 1000;
+
+        return Pair.of(base, JsonUtils.writeValueAsString(e));
+    }
+
+    public static KafkaProducer<String, String> createProceduer() {
+
+        Properties configMap = new Properties();
+        // String broker_list = zkconfig.zkQuorum;
+        // TODO: replace boot strap servers with new workable server
+        configMap.put("bootstrap.servers", "localhost:9092");
+        // configMap.put("metadata.broker.list", broker_list);
+        configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        configMap.put("request.required.acks", "1");
+        configMap.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        configMap.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        KafkaProducer<String, String> proceduer = new KafkaProducer<String, String>(configMap);
+        return proceduer;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java
new file mode 100644
index 0000000..06148cc
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.e2e;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.eagle.alert.utils.JsonUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import backtype.storm.utils.Utils;
+
+/**
+ * @since May 10, 2016
+ *
+ */
+public class SampleClient2 {
+    
+//    private static final Logger LOG = LoggerFactory.getLogger(SampleClient2.class);
+
+    public static class LogEntity {
+        public String instanceUuid;
+        public long timestamp;
+        public String logLevel;
+        public String message;
+        public String reqId;
+        public String host;
+        public String component;
+    }
+
+    public static class IfEntity {
+        public String instanceUuid;
+        public long timestamp;
+        public String reqId;
+        public String message;
+        public String host;
+    }
+    
+
+    /**
+     * @param args
+     */
+    public static void main(String[] args) {
+        AtomicLong base1 = new AtomicLong(System.currentTimeMillis());
+        AtomicLong base2 = new AtomicLong(System.currentTimeMillis());
+        AtomicLong count = new AtomicLong();
+
+        try (KafkaProducer<String, String> proceduer = SampleClient1.createProceduer()) {
+            while (true) {
+                nextUuid = String.format(instanceUuidTemp, UUID.randomUUID().toString());
+                nextReqId = String.format(reqIdTemp, UUID.randomUUID().toString());
+
+                int hostIndex = 6;
+                for (int i = 0; i < hostIndex; i++) {
+                    sendMetric(base1, base2, count, proceduer, i);
+                }
+
+                if (count.get() % 600 == 0) {
+                    System.out.println("send 600 LOG/FAILURE metric!");
+                }
+
+                Utils.sleep(3000);
+
+            }
+        }
+    }
+
+    private static void sendMetric(AtomicLong base1, AtomicLong base2, AtomicLong count,
+            KafkaProducer<String, String> proceduer, int i) {
+        {
+            Pair<Long, String> pair = createLogEntity(base1, i);
+            ProducerRecord<String, String> logRecord = new ProducerRecord<>("eslogs", pair.getRight());
+            proceduer.send(logRecord);
+            count.incrementAndGet();
+        }
+        {
+            Pair<Long, String> pair2 = createFailureEntity(base2, i);
+            ProducerRecord<String, String> failureRecord = new ProducerRecord<>("bootfailures", pair2.getRight());
+            proceduer.send(failureRecord);
+            count.incrementAndGet();
+        }
+    }
+
+    private static String instanceUuidTemp = "instance-guid-%s";
+    private static String reqIdTemp = "req-id-%s";
+    private static String nextUuid;
+    private static String nextReqId;
+
+    private static Pair<Long, String> createLogEntity(AtomicLong base1, int hostIndex) {
+        // TODO: add randomization
+        LogEntity le = new LogEntity();
+        if (hostIndex < 3) {
+            le.component = "NOVA";
+            le.host = "nova.000-" + hostIndex + ".datacenter.corp.com";
+            le.message = "RabbitMQ Exception - MQ not connectable!";
+        } else {
+            le.component = "NEUTRON";
+            le.host = "neturon.000-" + (hostIndex - 3) + ".datacenter.corp.com";
+            le.message = "DNS Exception - Fail to connect to DNS!";
+        }
+        le.instanceUuid = nextUuid;
+        le.logLevel = "ERROR";
+        le.reqId = nextReqId;
+        le.timestamp = base1.get();
+
+        base1.addAndGet(1000);// simply some interval.
+        return Pair.of(base1.get(), JsonUtils.writeValueAsString(le));
+    }
+
+    private static Pair<Long, String> createFailureEntity(AtomicLong base, int hostIndex) {
+        // TODO: add randomization
+        IfEntity ie = new IfEntity();
+        ie.host = "boot-vm-0-" + hostIndex + ".datacenter.corp.com";
+        ie.instanceUuid = nextUuid;
+        ie.message = "boot failure for when try start the given vm!";
+        ie.reqId = nextReqId;
+        ie.timestamp = base.get();
+
+        base.addAndGet(2000);// simply some interval.
+        return Pair.of(base.get(), JsonUtils.writeValueAsString(ie));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java
new file mode 100755
index 0000000..7bf4931
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.engine.evaluator.impl.AlertBoltOutputCollectorThreadSafeWrapper;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.junit.Assert;
+import org.junit.Test;
+
+import backtype.storm.task.IOutputCollector;
+import backtype.storm.task.OutputCollector;
+
+public class AlertBoltOutputCollectorThreadSafeWrapperTest {
+    @Test
+    public void testThreadSafeAlertBoltOutputCollector(){
+        MockedStormAlertOutputCollector stormOutputCollector = new MockedStormAlertOutputCollector(null);
+        AlertBoltOutputCollectorThreadSafeWrapper alertBoltOutputCollectorWrapper = new AlertBoltOutputCollectorThreadSafeWrapper(stormOutputCollector);
+        alertBoltOutputCollectorWrapper.emit(create("mockAlert_1"));
+        alertBoltOutputCollectorWrapper.emit(create("mockAlert_2"));
+        Assert.assertEquals(0,stormOutputCollector.getCollected().size());
+        Assert.assertEquals(0,stormOutputCollector.getTupleSize());
+        alertBoltOutputCollectorWrapper.flush();
+        Assert.assertEquals(2,stormOutputCollector.getCollected().size());
+        Assert.assertEquals(2,stormOutputCollector.getTupleSize());
+        alertBoltOutputCollectorWrapper.emit(create("mockAlert_3"));
+        Assert.assertEquals(2,stormOutputCollector.getCollected().size());
+        Assert.assertEquals(2,stormOutputCollector.getTupleSize());
+        alertBoltOutputCollectorWrapper.flush();
+        alertBoltOutputCollectorWrapper.flush();
+        alertBoltOutputCollectorWrapper.flush();
+        Assert.assertEquals(3,stormOutputCollector.getCollected().size());
+        Assert.assertEquals(3,stormOutputCollector.getTupleSize());
+    }
+
+    private AlertStreamEvent create(String streamId){
+        AlertStreamEvent alert = new AlertStreamEvent();
+        alert.setCreatedBy(this.toString());
+        alert.setCreatedTime(System.currentTimeMillis());
+        alert.setData(new Object[]{"field_1",2,"field_3"});
+        alert.setStreamId(streamId);
+        return alert;
+    }
+
+    private class MockedStormAlertOutputCollector extends OutputCollector {
+        private final Map<Object,List<Object>> collected;
+        MockedStormAlertOutputCollector(IOutputCollector delegate) {
+            super(delegate);
+            collected = new HashMap<>();
+        }
+
+        @Override
+        public List<Integer> emit(String streamId, List<Object> tuple) {
+            if(!collected.containsKey(tuple.get(0))){
+                collected.put(tuple.get(0),new LinkedList<>());
+            }
+            collected.get(tuple.get(0)).add(tuple);
+            return null;
+        }
+        Map<Object,List<Object>> getCollected(){
+            return collected;
+        }
+
+        int getTupleSize(){
+            int size = 0;
+            for(List<Object> alerts:collected.values()){
+                size += alerts.size();
+            }
+            return size;
+        }
+    }
+}
\ No newline at end of file


Mime
View raw message