eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ralp...@apache.org
Subject [06/18] incubator-eagle git commit: EAGLE-324 Init branch-v0.5
Date Wed, 01 Jun 2016 06:00:06 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/storm/kafka/KafkaSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/storm/kafka/KafkaSpoutWrapper.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/storm/kafka/KafkaSpoutWrapper.java
new file mode 100644
index 0000000..1cef187
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/storm/kafka/KafkaSpoutWrapper.java
@@ -0,0 +1,111 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package storm.kafka;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.SpoutSpec;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.spout.ISpoutSpecLCM;
+import org.apache.eagle.alert.engine.spout.SpoutOutputCollectorWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+
+/**
+ * NOTE!!!!! This class copies some code from storm.kafka.KafkaSpout so that a single
+ * process can host multiple KafkaSpout instances.
+ *
+ * This wrapper provides the following capabilities:
+ * 1. injects a customized output collector, so the framework can control traffic routing
+ * 2. listens for topic-to-stream metadata changes and passes them to the customized collector
+ * 3. returns the current streams for this topic
+ */
+public class KafkaSpoutWrapper extends KafkaSpout implements ISpoutSpecLCM {
+    private static final long serialVersionUID = 5507693757424351306L;
+    @SuppressWarnings("unused")
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutWrapper.class);
+    // Per-process metric registry shared by all spout instances; this wrapper
+    // registers its topic in open() and unregisters it in close().
+    private KafkaSpoutMetric kafkaSpoutMetric;
+
+    public KafkaSpoutWrapper(SpoutConfig spoutConf, KafkaSpoutMetric kafkaSpoutMetric) {
+        super(spoutConf);
+        this.kafkaSpoutMetric = kafkaSpoutMetric;
+    }
+
+    // Assigned in open(); holds the framework-injected collector after the
+    // cast at the end of open().
+    private SpoutOutputCollectorWrapper collectorWrapper;
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Override
+    public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
+        String topologyInstanceId = context.getStormId();
+        ////// !!!! begin copying code from storm.kafka.KafkaSpout to here
+        _collector = collector;
+
+        Map stateConf = new HashMap(conf);
+        // Fall back to the topology-level ZooKeeper settings when the spout
+        // config does not carry its own quorum/port.
+        List<String> zkServers = _spoutConfig.zkServers;
+        if (zkServers == null) {
+            zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
+        }
+        Integer zkPort = _spoutConfig.zkPort;
+        if (zkPort == null) {
+            zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
+        }
+        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
+        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
+        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
+        _state = new ZkState(stateConf);
+
+        _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));
+
+        // using TransactionalState like this is a hack
+        int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
+        // Static broker host lists get a static partition coordinator;
+        // otherwise partitions are discovered dynamically through ZooKeeper.
+        if (_spoutConfig.hosts instanceof StaticHosts) {
+            _coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, topologyInstanceId);
+        } else {
+            _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, topologyInstanceId);
+        }
+
+        ////// !!!! end copying code from storm.kafka.KafkaSpout to here
+
+        // add new topic to metric
+        KafkaSpoutMetric.KafkaSpoutMetricContext metricContext = new KafkaSpoutMetric.KafkaSpoutMetricContext();
+        metricContext._connections = _connections;
+        metricContext._coordinator = _coordinator;
+        metricContext._spoutConfig = _spoutConfig;
+        kafkaSpoutMetric.addTopic(_spoutConfig.topic, metricContext);
+
+        // NOTE(review): assumes the framework always passes a
+        // SpoutOutputCollectorWrapper here; a plain SpoutOutputCollector would
+        // cause a ClassCastException -- confirm with the topology builder.
+        this.collectorWrapper = (SpoutOutputCollectorWrapper)collector;
+    }
+
+    /**
+     * Receives topic-to-stream metadata changes and forwards them to the
+     * collector wrapper so routing can be adjusted at runtime.
+     */
+    @Override
+    public void update(SpoutSpec metadata, Map<String, StreamDefinition> sds){
+        collectorWrapper.update(metadata, sds);
+    }
+
+    /** Closes the underlying spout and unregisters this topic from the metric registry. */
+    @Override
+    public void close(){
+        super.close();
+        kafkaSpoutMetric.removeTopic(_spoutConfig.topic);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/resources/ALERT_DEFAULT.vm
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/resources/ALERT_DEFAULT.vm b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/resources/ALERT_DEFAULT.vm
new file mode 100644
index 0000000..2da6288
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/resources/ALERT_DEFAULT.vm
@@ -0,0 +1,267 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
+<html xmlns="http://www.w3.org/1999/xhtml">
+	<head>
+		<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
+		<meta name="viewport" content="width=device-width"/>
+		<style>
+			body {
+				width:100% !important;
+				min-width: 100%;
+				-webkit-text-size-adjust:100%;
+				-ms-text-size-adjust:100%;
+				margin:0;
+				padding:0;
+			}
+
+			table {
+				border-spacing: 0;
+				border-collapse: collapse;
+			}
+
+			table th,
+			table td {
+				padding: 3px 0 3px 0;
+			}
+
+			.body {
+				width: 100%;
+			}
+
+			p,a,h1,h2,h3,ul,ol,li {
+				font-family: Helvetica, Arial, sans-serif;
+				font-weight: normal;
+				margin: 0;
+				padding: 0;
+			}
+			p {
+				font-size: 14px;
+				line-height: 19px;
+			}
+			a {
+				color: #3294b1;
+			}
+			h1 {
+				font-size: 36px;
+				margin: 15px 0 5px 0;
+			}
+			h2 {
+				font-size: 32px;
+			}
+			h3 {
+				font-size: 28px;
+			}
+
+			ul,ol {
+				margin: 0 0 0 25px;
+				padding: 0;
+			}
+
+			.btn {
+				background: #2ba6cb !important;
+				border: 1px solid #2284a1;
+				padding: 10px 20px 10px 20px;
+				text-align: center;
+			}
+			.btn:hover {
+				background: #2795b6 !important;
+			}
+			.btn a {
+				color: #FFFFFF;
+				text-decoration: none;
+				font-weight: bold;
+				padding: 10px 20px 10px 20px;
+			}
+
+			.tableBordered {
+				border-top: 1px solid #b9e5ff;
+			}
+			.tableBordered th {
+				background: #ECF8FF;
+			}
+			.tableBordered th p {
+				font-weight: bold;
+				color: #3294b1;
+			}
+			.tableBordered th,
+			.tableBordered td {
+				color: #333333;
+				border-bottom: 1px solid #b9e5ff;
+				text-align: center;
+				padding-bottom: 5px;
+			}
+
+			.panel {
+				height: 100px;
+			}
+		</style>
+	</head>
+	<body>
+		#set ( $elem = $alertList[0] )
+		#set ( $alertUrl = $elem["alertDetailUrl"] )
+		#set ( $policyUrl = $elem["policyDetailUrl"] )
+		<table class="body">
+			<tr>
+				<td align="center" valign="top" style="background: #999999; padding: 0 0 0 0;">
+					<!-- Header -->
+					<table width="580">
+						<tr>
+							<td style="padding: 0 0 0 0;" align="left" >
+								<p style="color:#FFFFFF;font-weight: bold; font-size: 22px">UMP Alerts</p>
+							</td>
+						</tr>
+					</table>
+				</td>
+			</tr>
+
+			<tr>
+				<td align="center" valign="top">
+					<!-- Eagle Body -->
+					<table width="580">
+						<tr>
+							<!-- Title -->
+							<td align="center">
+								<h1>$elem["streamId"] Alert Detected</h1>
+							</td>
+						</tr>
+						<tr>
+							<!-- Time -->
+							<td>
+								<table width="580">
+									<tr>
+										<td>
+											<p><b>Detected Time: $elem["alertTime"]</b></p>
+										</td>
+										#set ( $severity = $elem["severity"] )
+										#if (!$severity || ("$severity" == ""))
+											#set ( $elem["severity"] = "WARNING")
+										#end
+										<td align="right">
+											<p><b>
+												Severity:
+									            #if ($elem["severity"] == "WARNING")
+													<span>$elem["severity"]</span>												
+    											#else
+													<span style="color: #FF0000;">$elem["severity"]</span>
+    											#end
+											</b></p>
+										</td>
+									</tr>
+								</table>
+							</td>
+						</tr>
+						<tr>
+							<!-- Description -->
+							<td valign="top" style="background: #ECF8FF; border: 1px solid #b9e5ff; padding: 10px 10px 12px 10px;">
+								<p>$elem["alertMessage"]</p>
+							</td>
+						</tr>
+						<tr>
+							<!-- View Detail -->
+							<td align="center" style="padding: 10px 0 0 0;">
+								<table width="580">
+									<tr>
+										<td class="btn">
+											<a href="$alertUrl">View Alert Details on Eagle Web</a>
+										</td>
+									</tr>
+								</table>
+							</td>
+						</tr>
+						<tr>
+							<!-- Basic Information -->
+							<td style="padding: 20px 0 0 0;">
+								<p><b>Basic Information:</b></p>
+							</td>
+						</tr>
+						<tr>
+							<!-- Basic Information Content -->
+							<td>
+								<table class="tableBordered" width="580">
+									<tr>
+										<th>
+											<p>Policy Name</p>
+										</th>
+										<th>
+											<p>Data Source</p>
+										</th>
+									</tr>
+									<tr>
+										<td>
+											<p>$elem["policyId"]</p>
+										</td>
+										<td>
+											<p>$elem["streamId"]</p>
+										</td>
+									</tr>
+									<tr>
+
+										<th>
+											<p>Creator</p>
+										</th>
+										<th>
+											<p>Severity</p>
+										</th>
+									</tr>
+									<tr>
+										<td>
+											<p>$elem["creator"]</p>
+										</td>
+										<td>
+											<p>$elem["severity"]</p>
+										</td>
+									</tr>
+								</table>
+							</td>
+						</tr>
+						<tr>
+							<!-- View Detail -->
+							<td align="center" style="padding: 10px 0 0 0;">
+								<table width="580">
+									<tr>
+										<td class="btn">
+											<a href="$policyUrl">View Policy Details on Eagle Web</a>
+										</td>
+									</tr>
+								</table>
+							</td>
+						</tr>						
+						<tr>
+							<!-- Actions Required -->
+							<td style="padding: 20px 0 0 0;">
+								<p><b>Actions Required:</b></p>
+							</td>
+						</tr>
+						<tr>
+							<!-- Possible Root Causes Content -->
+							<td class="panel" valign="top" style="background: #F4F4F4; border: 1px solid #AAAAAA; padding: 10px 10px 12px 10px;">
+								<p> $elem["streamId"] alert found, please check.</p>
+							</td>
+						</tr>
+						<tr>
+							<!-- Copyright -->
+							<td align="center">
+								<p><a href="<Eagle-Host>/alerts/alertlist.html">UMP Alert Engine</a></p>
+							</td>
+						</tr>
+					</table>
+				</td>
+			</tr>
+		</table>
+	</body>
+</html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/resources/application.conf b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/resources/application.conf
new file mode 100644
index 0000000..b9308ef
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/resources/application.conf
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+{
+  "topology" : {
+    "name" : "alertUnitTopology_1",
+    "numOfTotalWorkers" : 2,
+    "numOfSpoutTasks" : 1,
+    "numOfRouterBolts" : 4,
+    "numOfAlertBolts" : 10,
+    "numOfPublishTasks" : 1,
+    "messageTimeoutSecs": 3600,
+    "localMode" : "true"
+  },
+  "spout" : {
+    "kafkaBrokerZkQuorum": "localhost:2181",
+    "kafkaBrokerZkBasePath": "/brokers",
+    "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
+    "stormKafkaTransactionZkQuorum": "",
+    "stormKafkaTransactionZkPath": "/consumers",
+    "stormKafkaEagleConsumer": "eagle_consumer",
+    "stormKafkaStateUpdateIntervalMs": 2000,
+    "stormKafkaFetchSizeBytes": 1048586,
+  },
+  "zkConfig" : {
+    "zkQuorum" : "localhost:2181",
+    "zkRoot" : "/alert",
+    "zkSessionTimeoutMs" : 10000,
+    "connectionTimeoutMs" : 10000,
+    "zkRetryTimes" : 3,
+    "zkRetryInterval" : 3000
+  },
+  "dynamicConfigSource" : {
+    "initDelayMillis": 3000,
+    "delayMillis" : 10000
+  },
+  "metadataService": {
+    "context" : "/api",
+    "host" : "localhost",
+    "port" : 8080
+  },
+  "coordinatorService": {
+    "host": "localhost",
+    "port": 9090,
+    "context" : "/api"
+  }
+  "metric":{
+    "sink": {
+      "kafka": {
+        "topic": "alert_metric"
+        "bootstrap.servers": "localhost:6667"
+      }
+      "stdout": {}
+      //      "elasticsearch": {
+      //        "hosts": ["localhost:9200"]
+      //        "index": "alert_metric"
+      //        "timestampField": "timestamp"
+      //      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/resources/eagle.siddhiext
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/resources/eagle.siddhiext b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/resources/eagle.siddhiext
new file mode 100644
index 0000000..506bad9
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/resources/eagle.siddhiext
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# externalTimeBatch=org.apache.eagle.policy.siddhi.extension.ExternalTimeBatchWindowProcessor
+collect=org.apache.eagle.alert.engine.siddhi.extension.AttributeCollectAggregator
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/resources/log4j.properties b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/resources/log4j.properties
new file mode 100644
index 0000000..af99e2c
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/resources/log4j.properties
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
+
+##log4j.logger.org.apache.eagle.alert.engine.spout.CorrelationSpout=DEBUG
+log4j.logger.org.apache.eagle.alert.engine.spout.SpoutOutputCollectorWrapper=DEBUG
+log4j.logger.org.apache.eagle.alert.engine.router.impl.StreamRouterBoltOutputCollector=DEBUG
+log4j.logger.org.apache.eagle.alert.engine.sorter.impl.StreamSortWindowHandlerImpl=DEBUG
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/e2e/CoordinatorClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/e2e/CoordinatorClient.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/e2e/CoordinatorClient.java
new file mode 100644
index 0000000..aebf3b5
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/e2e/CoordinatorClient.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.e2e;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import javax.ws.rs.core.MediaType;
+
+import org.codehaus.jackson.jaxrs.JacksonJsonProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+import com.typesafe.config.Config;
+
+/**
+ * @since May 9, 2016
+ *
+ */
+public class CoordinatorClient implements Closeable {
+
+    @SuppressWarnings("unused")
+    private static final Logger LOG = LoggerFactory.getLogger(CoordinatorClient.class);
+
+    // Typesafe-config paths used to locate the coordinator service.
+    private static final String EAGLE_COORDINATOR_SERVICE_CONTEXT = "coordinatorService.context";
+    private static final String EAGLE_COORDINATOR_SERVICE_PORT = "coordinatorService.port";
+    private static final String EAGLE_COORDINATOR_SERVICE_HOST = "coordinatorService.host";
+    private static final String COORDINATOR_SCHEDULE_API = "/coordinator/build";
+
+    private String host;
+    private int port;
+    private String context;
+    private transient Client client;
+    // Fully qualified service root, e.g. "http://host:port/context".
+    private String basePath;
+
+    /**
+     * Builds a client from config; requires coordinatorService.host,
+     * coordinatorService.port and coordinatorService.context to be set.
+     */
+    public CoordinatorClient(Config config) {
+        this(config.getString(EAGLE_COORDINATOR_SERVICE_HOST), config.getInt(EAGLE_COORDINATOR_SERVICE_PORT), config
+                .getString(EAGLE_COORDINATOR_SERVICE_CONTEXT));
+        // NOTE(review): redundant -- the delegated constructor above already
+        // assigns basePath via buildBasePath().
+        basePath = buildBasePath();
+    }
+
+    /**
+     * Builds a client for an explicit endpoint, configured with 60s
+     * connect/read timeouts, Jackson JSON support, the HttpURLConnection
+     * set-method workaround, and gzip content encoding.
+     */
+    public CoordinatorClient(String host, int port, String context) {
+        this.host = host;
+        this.port = port;
+        this.context = context;
+        this.basePath = buildBasePath();
+        ClientConfig cc = new DefaultClientConfig();
+        cc.getProperties().put(DefaultClientConfig.PROPERTY_CONNECT_TIMEOUT, 60 * 1000);
+        cc.getProperties().put(DefaultClientConfig.PROPERTY_READ_TIMEOUT, 60 * 1000);
+        cc.getClasses().add(JacksonJsonProvider.class);
+        cc.getProperties().put(URLConnectionClientHandler.PROPERTY_HTTP_URL_CONNECTION_SET_METHOD_WORKAROUND, true);
+        this.client = Client.create(cc);
+        client.addFilter(new com.sun.jersey.api.client.filter.GZIPContentEncodingFilter());
+    }
+
+    /** Concatenates host/port/context into the service base URL (http only). */
+    private String buildBasePath() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("http://");
+        sb.append(host);
+        sb.append(":");
+        sb.append(port);
+        sb.append(context);
+        return sb.toString();
+    }
+
+    /**
+     * POSTs to the coordinator's /coordinator/build endpoint to trigger a
+     * schedule run and returns the raw JSON response body.
+     */
+    public String schedule() {
+        WebResource r = client.resource(basePath + COORDINATOR_SCHEDULE_API);
+        return r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(String.class);
+    }
+
+    /** Destroys the underlying Jersey client and releases its resources. */
+    @Override
+    public void close() throws IOException {
+        this.client.destroy();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
new file mode 100644
index 0000000..057aa73
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.e2e;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.engine.UnitTopologyMain;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.service.IMetadataServiceClient;
+import org.apache.eagle.alert.service.MetadataServiceClientImpl;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.utils.Utils;
+
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.CollectionType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+
+/**
+ * Simple end-to-end threshold-alert integration case.
+ * 
+ * @since May 8, 2016
+ *
+ */
+public class Integration1 {
+    private static final Logger LOG = LoggerFactory.getLogger(Integration1.class);
+    // Shared JSON mapper used by all loadEntities() calls in this class.
+    private static final ObjectMapper om = new ObjectMapper();
+
+    // Entry point so the scenario can be driven outside of JUnit.
+    public static void main(String[] args) throws Exception {
+        Integration1 inte = new Integration1();
+        inte.args = args;
+        inte.test_simple_threshhold();
+    }
+    
+    private String[] args;
+    private ExecutorService executors = Executors.newFixedThreadPool(5);
+
+    /**
+     * Assumption:
+     * <p>
+     * start metadata service 8080, better in docker
+     * <p>
+     * start coordinator service 9090, better in docker
+     * <p>
+     * datasources : perfmon_datasource
+     * <p>
+     * stream: perfmon_cpu
+     * <p>
+     * policy : perfmon_cpu_host_check / perfmon_cpu_pool_check
+     * <p>
+     * Create topic
+     * liasu@xxx:~$ $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic perfmon_metrics
+     * <p>
+     * 
+     * @throws InterruptedException
+     */
+    @Ignore
+    @Test
+    public void test_simple_threshhold() throws Exception {
+        System.setProperty("config.resource", "/application-integration.conf");
+        ConfigFactory.invalidateCaches();
+        Config config = ConfigFactory.load();
+
+        System.out.println("loading metadatas...");
+        loadMetadatas("/", config);
+        System.out.println("loading metadatas done!");
+
+        // Run the sample data producer and the alert topology concurrently.
+        executors.submit(() -> SampleClient1.main(args));
+
+        executors.submit(() -> UnitTopologyMain.main(args));
+
+        // Manual test: trigger a coordinator schedule every 5 minutes, forever.
+        Utils.sleep(1000 * 5l);
+        while (true) {
+            proactive_schedule(config);
+
+            Utils.sleep(1000 * 60l * 5);
+        }
+    }
+
+    /**
+     * Test only run expected when there is a missed config in the config file. mark as ignored
+     * @throws InterruptedException
+     * @throws ExecutionException
+     */
+    @Ignore
+    @Test(expected = ExecutionException.class)
+    public void test_typesafe_config() throws InterruptedException, ExecutionException {
+        System.setProperty("config.resource", "/application-integration.conf");
+        ConfigFactory.invalidateCaches();
+        Future<?> f = executors.submit(() -> {
+            UnitTopologyMain.main(null);
+        });
+
+        // Propagates any topology startup failure as an ExecutionException.
+        f.get();
+    }
+
+//    @Test
+//    private void makeSureTopic() {
+//        System.setProperty("config.resource", "/application-integration.conf");
+//        ConfigFactory.invalidateCaches();
+//        Config config = ConfigFactory.load();
+//        ZKConfig zkconfig = ZKConfigBuilder.getZKConfig(config);
+//        
+//        CuratorFramework curator = CuratorFrameworkFactory.newClient(
+//                zkconfig.zkQuorum,
+//                zkconfig.zkSessionTimeoutMs,
+//                zkconfig.connectionTimeoutMs,
+//                new RetryNTimes(zkconfig.zkRetryTimes, zkconfig.zkRetryInterval)
+//        );
+//    }
+
+    /**
+     * Asks the coordinator service to run a schedule build; failures are
+     * logged and swallowed so the caller's polling loop keeps running.
+     */
+    public static void proactive_schedule(Config config) throws Exception {
+
+        try (CoordinatorClient cc = new CoordinatorClient(config)) {
+            try {
+                String resp = cc.schedule();
+                LOG.info("schedule return : {} ", resp);
+            } catch (Exception e) {
+                LOG.error("failed to call schedule!", e);
+            }
+        }
+    }
+
+    /**
+     * Clears the metadata service, then loads datasources, policies,
+     * publishments, stream definitions and topologies from JSON resources
+     * under the given classpath base.
+     */
+    public static void loadMetadatas(String base, Config config) throws Exception {
+        IMetadataServiceClient client = new MetadataServiceClientImpl(config);
+        client.clear();
+
+        List<Kafka2TupleMetadata> metadata = loadEntities(base + "datasources.json", Kafka2TupleMetadata.class);
+        for (Kafka2TupleMetadata k : metadata) {
+            client.addDataSource(k);
+        }
+
+        List<PolicyDefinition> policies = loadEntities(base + "policies.json", PolicyDefinition.class);
+        for (PolicyDefinition p : policies) {
+            client.addPolicy(p);
+        }
+
+        List<Publishment> pubs = loadEntities(base + "publishments.json", Publishment.class);
+        for (Publishment pub : pubs) {
+            client.addPublishment(pub);
+        }
+
+        List<StreamDefinition> defs = loadEntities(base + "streamdefinitions.json", StreamDefinition.class);
+        for (StreamDefinition def : defs) {
+            client.addStreamDefinition(def);
+        }
+
+        List<Topology> topos = loadEntities(base + "topologies.json", Topology.class);
+        for (Topology t : topos) {
+            client.addTopology(t);
+        }
+
+        client.close();
+    }
+
+    /**
+     * Deserializes a classpath JSON resource into a List&lt;T&gt; using the
+     * shared ObjectMapper.
+     */
+    public static <T> List<T> loadEntities(String path, Class<T> tClz) throws Exception {
+        JavaType type = CollectionType.construct(List.class, SimpleType.construct(tClz));
+        List<T> l = om.readValue(Integration1.class.getResourceAsStream(path), type);
+        return l;
+    }
+
+    /**
+     * <p>
+     * {"name":"xxx","numOfSpout":1,"numOfAlertBolt":3,"numOfGroupBolt":2,
+     * "spoutId"
+     * :"xxx-spout","groupNodeIds":["xxx-grp"],"alertBoltIds":["xxx-bolt"
+     * ],"pubBoltId":"xxx-pubBolt","spoutParallelism":1,"groupParallelism":1,
+     * "alertParallelism":1}
+     * <p>
+     * 
+     * @throws Exception
+     */
+    @Ignore
+    @Test
+    public void testJson() throws Exception {
+        {
+            JavaType type = CollectionType.construct(List.class, SimpleType.construct(Topology.class));
+            List<Topology> l = om.readValue(Integration1.class.getResourceAsStream("/topologies.json"),
+                    type);
+            Topology t = (Topology) l.get(0);
+
+            Assert.assertEquals(4, t.getGroupNodeIds().size());
+            Assert.assertEquals(10, t.getAlertBoltIds().size());
+        }
+
+        {
+            JavaType type = CollectionType.construct(List.class, SimpleType.construct(Publishment.class));
+            // publishment
+            List<Publishment> l = om.readValue(Integration1.class.getResourceAsStream("/publishments.json"), type);
+            Publishment p = l.get(0);
+            Assert.assertEquals("KAFKA", p.getType());
+        }
+        
+        checkAll("/");
+        checkAll("/correlation/");
+    }
+
+    // Smoke-parses every metadata JSON resource under the given base path.
+    private void checkAll(String base) throws Exception {
+        loadEntities(base + "datasources.json", Kafka2TupleMetadata.class);
+        loadEntities(base + "policies.json", PolicyDefinition.class);
+        loadEntities(base + "publishments.json", Publishment.class);
+        loadEntities(base + "streamdefinitions.json", StreamDefinition.class);
+        loadEntities(base + "topologies.json", Topology.class);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java
new file mode 100644
index 0000000..7ea0e7e
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.e2e;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.eagle.alert.engine.UnitTopologyMain;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.core.stream.output.StreamCallback;
+import org.wso2.siddhi.core.util.EventPrinter;
+
+import backtype.storm.utils.Utils;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * @since May 10, 2016
+ *
+ */
+public class Integration2 {
+
+    // Pool for the two long-running tasks (topology + sample producer) with headroom.
+    private final ExecutorService executors = Executors.newFixedThreadPool(5);
+
+    /**
+     * <pre>
+     * Create topic
+     * liasu@xxxx:~$ $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic eslogs
+     * liasu@xxxx:~$ $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic bootfailures
+     * </pre>
+     *
+     * @param args passed through to the topology and sample client mains
+     */
+    public static void main(String[] args) throws Exception {
+        Integration2 inte = new Integration2();
+        inte.args = args;
+        inte.test_start();
+    }
+
+    private String[] args;
+
+    /**
+     * Boots the unit topology and the sample event producer, then keeps
+     * re-triggering coordinator scheduling every five minutes.
+     * Ignored in CI: requires a local Kafka/ZK setup and never terminates.
+     */
+    @Ignore
+    @Test
+    public void test_start() throws Exception {
+        System.setProperty("config.resource", "/correlation/application-integration-2.conf");
+        ConfigFactory.invalidateCaches();
+        Config config = ConfigFactory.load();
+        Integration1.loadMetadatas("/correlation/", config);
+
+        executors.submit(() -> UnitTopologyMain.main(args));
+
+        executors.submit(() -> SampleClient2.main(args));
+
+        Utils.sleep(1000 * 5L); // give the topology time to spin up before scheduling
+        while (true) {
+            Integration1.proactive_schedule(config);
+            Utils.sleep(1000 * 60L * 5);
+        }
+    }
+
+    /**
+     * Standalone Siddhi sanity check for the externalTime-window join used by
+     * the correlation policy: continuously feeds paired es-log / boot-failure
+     * events sharing an instanceUuid and prints every join hit.
+     * Ignored in CI: runs forever by design.
+     */
+    @Test @Ignore
+    public void test3() throws Exception {
+        SiddhiManager sm = new SiddhiManager();
+        String s1 = " define stream esStream(instanceUuid string, timestamp long, logLevel string, message string, reqId string, host string, component string); ";
+        s1 += " define stream ifStream(instanceUuid string, timestamp long, reqId string, message string, host string); ";
+        s1 += "from esStream#window.externalTime(timestamp, 20 min) as a join ifStream#window.externalTime(timestamp, 5 min) as b on a.instanceUuid == b.instanceUuid  within 10 min select logLevel, a.host as aHost, a.component, a.message as logMessage, b.message as failMessage, a.timestamp as t1, b.timestamp as t2, b.host as bHost, count(1) as errorCount group by component insert into log_stream_join_output; ";
+        ExecutionPlanRuntime epr = sm.createExecutionPlanRuntime(s1);
+
+        epr.addCallback("log_stream_join_output", new StreamCallback() {
+            @Override
+            public void receive(Event[] arg0) {
+                System.out.println("join result!");
+                EventPrinter.print(arg0);
+            }
+        });
+
+        InputHandler input1 = epr.getInputHandler("esStream");
+        InputHandler input2 = epr.getInputHandler("ifStream");
+
+        epr.start();
+
+        // Fixed epoch base so the externalTime windows behave deterministically
+        // across runs ("L" suffix: lowercase "l" reads as the digit 1).
+        long base = 1462880695837L;
+
+        while (true) {
+            sendEvent(input1, input2, base);
+
+            base = base + 3000;
+
+            Utils.sleep(3000);
+        }
+
+    }
+
+    /** Sends one es-log event and one boot-failure event with the same uuid/reqId at the given timestamp. */
+    private void sendEvent(InputHandler input1, InputHandler input2, long base) throws InterruptedException {
+        {
+            Event e = new Event();
+            e.setTimestamp(base);
+            e.setData(new Object[] {
+                    "instance-guid-c2a1c926-b590-418e-bf57-41469d7891fa",
+                    base,
+                    "ERROR",
+                    "NullPointException",
+                    "req-id-82dab92c-9e45-4ad8-8793-96e912831f00",
+                    "nova.host",
+                    "NOVA"
+            });
+            input1.send(e);
+        }
+
+        {
+            Event e = new Event();
+            e.setTimestamp(base);
+            e.setData(new Object[] {"instance-guid-c2a1c926-b590-418e-bf57-41469d7891fa",
+                    base,
+                    "req-id-82dab92c-9e45-4ad8-8793-96e912831f00",
+                    "boot failure for when try start the given vm!",
+                    "boot-vm-data-center.corp.com"});
+            input2.send(e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java
new file mode 100644
index 0000000..348bf78
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.e2e;
+
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.eagle.alert.utils.JsonUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.utils.Utils;
+
+/**
+ * @since May 9, 2016
+ *
+ */
+public class SampleClient1 {
+    @SuppressWarnings("unused")
+    private static final Logger LOG = LoggerFactory.getLogger(SampleClient1.class);
+
+    private static final String PERFMON_CPU_STREAM = "perfmon_cpu_stream";
+    private static final String PERFMON_MEM_STREAM = "perfmon_mem_stream";
+
+    // Format template for simulated host names; %d is the host index.
+    private static final String hostTemp = "host-000%d.datacenter.corp.com";
+
+    /**
+     * <pre>
+     * {"host": "", "timestamp" : "", "metric" : "", "pool": "", "value": 1.0, "colo": "phx"}
+     * </pre>
+     */
+    public static class Entity {
+        public String host;
+        public long timestamp;
+        public String metric;
+        public String pool;
+        public double value;
+        public String colo;
+    }
+
+    /**
+     * Emits one CPU and one MEM metric for each of six simulated hosts every
+     * three seconds, forever, into the "perfmon_metrics" topic.
+     */
+    public static void main(String[] args) {
+        long base = System.currentTimeMillis();
+        AtomicLong msgCount = new AtomicLong();
+
+        try (KafkaProducer<String, String> proceduer = createProceduer()) {
+            while (true) {
+                int hostIndex = 6;
+                for (int i = 0; i < hostIndex; i++) {
+                    base = send_metric(base, proceduer, PERFMON_CPU_STREAM, i);
+                    msgCount.incrementAndGet();
+                    base = send_metric(base, proceduer, PERFMON_MEM_STREAM, i);
+                    msgCount.incrementAndGet();
+                }
+
+                if ((msgCount.get() % 600) == 0) {
+                    System.out.println("send 600 CPU/MEM metric!");
+                }
+
+                Utils.sleep(3000);
+            }
+        }
+    }
+
+    /** Serializes one entity for the given stream, publishes it, and returns the advanced time base. */
+    private static long send_metric(long base, KafkaProducer<String, String> proceduer, String stream, int hostIndex) {
+        Pair<Long, String> pair = createEntity(base, stream, hostIndex);
+        base = pair.getLeft();
+        ProducerRecord<String, String> record = new ProducerRecord<String, String>("perfmon_metrics",
+                pair.getRight());
+        proceduer.send(record);
+        return base;
+    }
+
+    /**
+     * Builds one JSON metric entity; hosts 0-2 belong to pool "hadoop-eagle-prod",
+     * hosts 3+ to "raptor-pool1".
+     *
+     * @return (time base advanced by 1s, JSON payload)
+     */
+    private static Pair<Long, String> createEntity(long base, String stream, int hostIndex) {
+        // TODO : add randomization
+        Entity e = new Entity();
+        e.colo = "LVS";
+        e.host = String.format(hostTemp, hostIndex);
+        if (hostIndex < 3) {
+            e.pool = "hadoop-eagle-prod";
+        } else {
+            e.pool = "raptor-pool1";
+        }
+        e.timestamp = base;
+        e.metric = stream;
+        e.value = 92.0;
+
+        base = base + 1000;
+
+        return Pair.of(base, JsonUtils.writeValueAsString(e));
+    }
+
+    /**
+     * Creates a String/String Kafka producer against localhost:9092.
+     * Also used by {@link SampleClient2}.
+     */
+    public static KafkaProducer<String, String> createProceduer() {
+        Properties configMap = new Properties();
+        // TODO: replace bootstrap servers with a configurable broker list
+        configMap.put("bootstrap.servers", "localhost:9092");
+        configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        // The legacy "request.required.acks" key belongs to the old scala producer and is
+        // ignored by the new java producer; "acks" is the supported equivalent. The
+        // deserializer entries were dropped: they are consumer-only settings and merely
+        // produced unused-config warnings on a producer.
+        configMap.put("acks", "1");
+        return new KafkaProducer<String, String>(configMap);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java
new file mode 100644
index 0000000..06148cc
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.e2e;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.eagle.alert.utils.JsonUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import backtype.storm.utils.Utils;
+
+/**
+ * @since May 10, 2016
+ *
+ */
+public class SampleClient2 {
+
+    private static final String instanceUuidTemp = "instance-guid-%s";
+    private static final String reqIdTemp = "req-id-%s";
+    // Refreshed once per send round so each round's log and failure events correlate.
+    private static String nextUuid;
+    private static String nextReqId;
+
+    /** Shape of the simulated ES log event published to the "eslogs" topic. */
+    public static class LogEntity {
+        public String instanceUuid;
+        public long timestamp;
+        public String logLevel;
+        public String message;
+        public String reqId;
+        public String host;
+        public String component;
+    }
+
+    /** Shape of the simulated boot-failure event published to the "bootfailures" topic. */
+    public static class IfEntity {
+        public String instanceUuid;
+        public long timestamp;
+        public String reqId;
+        public String message;
+        public String host;
+    }
+
+    /**
+     * Emits correlated log/failure event pairs for six simulated hosts every
+     * three seconds, forever.
+     *
+     * @param args unused
+     */
+    public static void main(String[] args) {
+        AtomicLong logTimeBase = new AtomicLong(System.currentTimeMillis());
+        AtomicLong failureTimeBase = new AtomicLong(System.currentTimeMillis());
+        AtomicLong sent = new AtomicLong();
+
+        try (KafkaProducer<String, String> producer = SampleClient1.createProceduer()) {
+            while (true) {
+                nextUuid = String.format(instanceUuidTemp, UUID.randomUUID().toString());
+                nextReqId = String.format(reqIdTemp, UUID.randomUUID().toString());
+
+                for (int host = 0; host < 6; host++) {
+                    sendMetric(logTimeBase, failureTimeBase, sent, producer, host);
+                }
+
+                if (sent.get() % 600 == 0) {
+                    System.out.println("send 600 LOG/FAILURE metric!");
+                }
+
+                Utils.sleep(3000);
+
+            }
+        }
+    }
+
+    /** Publishes one log event to "eslogs" and one failure event to "bootfailures". */
+    private static void sendMetric(AtomicLong base1, AtomicLong base2, AtomicLong count,
+            KafkaProducer<String, String> proceduer, int i) {
+        String logPayload = createLogEntity(base1, i).getRight();
+        proceduer.send(new ProducerRecord<>("eslogs", logPayload));
+        count.incrementAndGet();
+
+        String failurePayload = createFailureEntity(base2, i).getRight();
+        proceduer.send(new ProducerRecord<>("bootfailures", failurePayload));
+        count.incrementAndGet();
+    }
+
+    /**
+     * Builds one serialized log event; hosts 0-2 pose as NOVA, hosts 3+ as NEUTRON.
+     *
+     * @return (time base advanced by 1s, JSON payload)
+     */
+    private static Pair<Long, String> createLogEntity(AtomicLong base1, int hostIndex) {
+        // TODO: add randomization
+        LogEntity le = new LogEntity();
+        boolean isNova = hostIndex < 3;
+        le.component = isNova ? "NOVA" : "NEUTRON";
+        le.host = isNova
+                ? "nova.000-" + hostIndex + ".datacenter.corp.com"
+                : "neturon.000-" + (hostIndex - 3) + ".datacenter.corp.com";
+        le.message = isNova
+                ? "RabbitMQ Exception - MQ not connectable!"
+                : "DNS Exception - Fail to connect to DNS!";
+        le.instanceUuid = nextUuid;
+        le.logLevel = "ERROR";
+        le.reqId = nextReqId;
+        le.timestamp = base1.get();
+
+        base1.addAndGet(1000); // advance the simulated clock
+        return Pair.of(base1.get(), JsonUtils.writeValueAsString(le));
+    }
+
+    /**
+     * Builds one serialized boot-failure event for the given host.
+     *
+     * @return (time base advanced by 2s, JSON payload)
+     */
+    private static Pair<Long, String> createFailureEntity(AtomicLong base, int hostIndex) {
+        // TODO: add randomization
+        IfEntity ie = new IfEntity();
+        ie.instanceUuid = nextUuid;
+        ie.reqId = nextReqId;
+        ie.host = "boot-vm-0-" + hostIndex + ".datacenter.corp.com";
+        ie.message = "boot failure for when try start the given vm!";
+        ie.timestamp = base.get();
+
+        base.addAndGet(2000); // failures tick at a different rate than logs
+        return Pair.of(base.get(), JsonUtils.writeValueAsString(ie));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java
new file mode 100755
index 0000000..518d496
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java
@@ -0,0 +1,74 @@
+package org.apache.eagle.alert.engine.evaluator;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.engine.evaluator.impl.AlertBoltOutputCollectorThreadSafeWrapper;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.junit.Assert;
+import org.junit.Test;
+
+import backtype.storm.task.IOutputCollector;
+import backtype.storm.task.OutputCollector;
+
+public class AlertBoltOutputCollectorThreadSafeWrapperTest {
+    @Test
+    public void testThreadSafeAlertBoltOutputCollector(){
+        MockedStormAlertOutputCollector stormOutputCollector = new MockedStormAlertOutputCollector(null);
+        AlertBoltOutputCollectorThreadSafeWrapper alertBoltOutputCollectorWrapper = new AlertBoltOutputCollectorThreadSafeWrapper(stormOutputCollector);
+        alertBoltOutputCollectorWrapper.emit(create("mockAlert_1"));
+        alertBoltOutputCollectorWrapper.emit(create("mockAlert_2"));
+        Assert.assertEquals(0,stormOutputCollector.getCollected().size());
+        Assert.assertEquals(0,stormOutputCollector.getTupleSize());
+        alertBoltOutputCollectorWrapper.flush();
+        Assert.assertEquals(2,stormOutputCollector.getCollected().size());
+        Assert.assertEquals(2,stormOutputCollector.getTupleSize());
+        alertBoltOutputCollectorWrapper.emit(create("mockAlert_3"));
+        Assert.assertEquals(2,stormOutputCollector.getCollected().size());
+        Assert.assertEquals(2,stormOutputCollector.getTupleSize());
+        alertBoltOutputCollectorWrapper.flush();
+        alertBoltOutputCollectorWrapper.flush();
+        alertBoltOutputCollectorWrapper.flush();
+        Assert.assertEquals(3,stormOutputCollector.getCollected().size());
+        Assert.assertEquals(3,stormOutputCollector.getTupleSize());
+    }
+
+    private AlertStreamEvent create(String streamId){
+        AlertStreamEvent alert = new AlertStreamEvent();
+        alert.setCreatedBy(this.toString());
+        alert.setCreatedTime(System.currentTimeMillis());
+        alert.setData(new Object[]{"field_1",2,"field_3"});
+        alert.setStreamId(streamId);
+        return alert;
+    }
+
+    private class MockedStormAlertOutputCollector extends OutputCollector {
+        private final Map<Object,List<Object>> collected;
+        MockedStormAlertOutputCollector(IOutputCollector delegate) {
+            super(delegate);
+            collected = new HashMap<>();
+        }
+
+        @Override
+        public List<Integer> emit(String streamId, List<Object> tuple) {
+            if(!collected.containsKey(tuple.get(0))){
+                collected.put(tuple.get(0),new LinkedList<>());
+            }
+            collected.get(tuple.get(0)).add(tuple);
+            return null;
+        }
+        Map<Object,List<Object>> getCollected(){
+            return collected;
+        }
+
+        int getTupleSize(){
+            int size = 0;
+            for(List<Object> alerts:collected.values()){
+                size += alerts.size();
+            }
+            return size;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java
new file mode 100755
index 0000000..ab19801
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.eagle.alert.engine.Collector;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyHandler;
+import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
+import org.apache.eagle.alert.engine.mock.MockStreamCollector;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.metric.api.MultiCountMetric;
+
+public class SiddhiCEPPolicyEventHandlerTest {
+    private final static Logger LOG = LoggerFactory.getLogger(SiddhiCEPPolicyEventHandlerTest.class);
+
+    /** Builds sample stream definitions keyed by stream id. */
+    private Map<String, StreamDefinition> createDefinition(String ... streamIds) {
+        Map<String, StreamDefinition> sds = new HashMap<>();
+        for (String streamId : streamIds) {
+            sds.put(streamId, MockSampleMetadataFactory.createSampleStreamDefinition(streamId));
+        }
+        return sds;
+    }
+
+    /**
+     * Builds an event attribute map from alternating key/value pairs.
+     * Replaces the double-brace-initialization anti-pattern: each such map was an
+     * anonymous serializable HashMap subclass retaining the test instance (hence
+     * the previous {@code @SuppressWarnings("serial")}).
+     */
+    private static Map<String, Object> attrs(Object... kv) {
+        Map<String, Object> m = new HashMap<>();
+        for (int i = 0; i < kv.length; i += 2) {
+            m.put((String) kv[i], kv[i + 1]);
+        }
+        return m;
+    }
+
+    /** Smoke test: a single event (with an extra unknown column) is accepted without error. */
+    @Test
+    public void testBySendSimpleEvent() throws Exception {
+        SiddhiPolicyHandler handler = new SiddhiPolicyHandler(createDefinition("sampleStream_1", "sampleStream_2"));
+        MockStreamCollector collector = new MockStreamCollector();
+        PolicyDefinition policyDefinition = MockSampleMetadataFactory.createSingleMetricSamplePolicy();
+        PolicyHandlerContext context = new PolicyHandlerContext();
+        context.setPolicyDefinition(policyDefinition);
+        context.setPolicyCounter(new MultiCountMetric());
+        handler.prepare(collector, context);
+        StreamEvent event = StreamEvent.Builder()
+                .schema(MockSampleMetadataFactory.createSampleStreamDefinition("sampleStream_1"))
+                .streamId("sampleStream_1")
+                .timestamep(System.currentTimeMillis())
+                .attributes(attrs("name", "cpu", "value", 60.0, "bad", "bad column value"))
+                .build();
+        handler.send(event);
+        handler.close();
+    }
+
+    /** A two-stream join policy must produce exactly one alert for the matching name+value pair. */
+    @Test
+    public void testWithTwoStreamJoinPolicy() throws Exception {
+        Map<String, StreamDefinition> ssd = createDefinition("sampleStream_1", "sampleStream_2");
+
+        PolicyDefinition policyDefinition = new PolicyDefinition();
+        policyDefinition.setName("SampleJoinPolicyForTest");
+        policyDefinition.setInputStreams(Arrays.asList("sampleStream_1", "sampleStream_2"));
+        policyDefinition.setOutputStreams(Collections.singletonList("joinedStream"));
+        policyDefinition.setDefinition(new PolicyDefinition.Definition(PolicyStreamHandlers.SIDDHI_ENGINE,
+                "from sampleStream_1#window.length(10) as left " +
+                "join sampleStream_2#window.length(10) as right " +
+                "on left.name == right.name and left.value == right.value " +
+                "select left.timestamp,left.name,left.value "+
+                "insert into joinedStream"));
+        policyDefinition.setPartitionSpec(Collections.singletonList(MockSampleMetadataFactory.createSampleStreamGroupbyPartition("sampleStream_1", Collections.singletonList("name"))));
+        Semaphore mutex = new Semaphore(0);
+        List<AlertStreamEvent> alerts = new ArrayList<>(0);
+        Collector<AlertStreamEvent> collector = (event) -> {
+            LOG.info("Collected {}", event);
+            Assert.assertNotNull(event);
+            alerts.add(event);
+            mutex.release();
+        };
+
+        SiddhiPolicyHandler handler = new SiddhiPolicyHandler(ssd);
+        PolicyHandlerContext context = new PolicyHandlerContext();
+        context.setPolicyDefinition(policyDefinition);
+        context.setPolicyCounter(new MultiCountMetric());
+        handler.prepare(collector, context);
+
+        long ts_1 = System.currentTimeMillis();
+        long ts_2 = System.currentTimeMillis() + 1;
+
+        // Left-stream event; the extra "bad" column must be tolerated.
+        handler.send(StreamEvent.Builder()
+                .schema(ssd.get("sampleStream_1"))
+                .streamId("sampleStream_1")
+                .timestamep(ts_1)
+                .attributes(attrs("name", "cpu", "value", 60.0, "bad", "bad column value"))
+                .build());
+
+        // Same name, different value: must NOT join.
+        handler.send(StreamEvent.Builder()
+                .schema(ssd.get("sampleStream_2"))
+                .streamId("sampleStream_2")
+                .timestamep(ts_2)
+                .attributes(attrs("name", "cpu", "value", 61.0))
+                .build());
+
+        // Different name: must NOT join.
+        handler.send(StreamEvent.Builder()
+                .schema(ssd.get("sampleStream_2"))
+                .streamId("sampleStream_2")
+                .timestamep(ts_2)
+                .attributes(attrs("name", "disk", "value", 60.0))
+                .build());
+
+        // Matching name and value: the single expected join result.
+        handler.send(StreamEvent.Builder()
+                .schema(ssd.get("sampleStream_2"))
+                .streamId("sampleStream_2")
+                .timestamep(ts_2)
+                .attributes(attrs("name", "cpu", "value", 60.0))
+                .build());
+
+        handler.close();
+
+        Assert.assertTrue("Should get result in 5 s", mutex.tryAcquire(5, TimeUnit.SECONDS));
+        Assert.assertEquals(1, alerts.size());
+        Assert.assertEquals("joinedStream", alerts.get(0).getStreamId());
+        Assert.assertEquals("cpu", alerts.get(0).getData()[1]);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java
new file mode 100644
index 0000000..d887451
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java
@@ -0,0 +1,124 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.eagle.alert.engine.integration;

import java.io.IOException;
import java.util.List;

import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
import org.apache.eagle.alert.coordination.model.ScheduleState;
import org.apache.eagle.alert.coordination.model.SpoutSpec;
import org.apache.eagle.alert.coordination.model.internal.Topology;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.engine.coordinator.Publishment;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
import org.apache.eagle.alert.service.IMetadataServiceClient;

/**
 * No-op stub of {@link IMetadataServiceClient} for integration tests that do not
 * exercise the metadata service.
 *
 * <p>Contract of this stub: every query method returns {@code null} (NOT an empty
 * collection) and every mutation method silently discards its argument. Callers in
 * tests are expected to tolerate {@code null} results; do not "fix" the nulls to
 * empty lists without auditing those callers.
 */
@SuppressWarnings("serial")
public class MockMetadataServiceClient implements IMetadataServiceClient {

    // --- query methods: all intentionally return null (no metadata available) ---

    @Override
    public List<SpoutSpec>  listSpoutMetadata() {
        return null;
    }

    @Override
    public void close() throws IOException {
        // nothing to release
    }

    @Override
    public ScheduleState getVersionedSpec(String version) {
        return null;
    }

    @Override
    public List<StreamingCluster> listClusters() {
        return null;
    }

    @Override
    public List<PolicyDefinition> listPolicies() {
        return null;
    }

    @Override
    public List<StreamDefinition> listStreams() {
        return null;
    }

    @Override
    public List<Kafka2TupleMetadata> listDataSources() {
        return null;
    }

    @Override
    public List<Publishment> listPublishment() {
        return null;
    }

    @Override
    public ScheduleState getVersionedSpec() {
        return null;
    }

    // --- mutation methods: all intentionally no-ops (state is discarded) ---

    @Override
    public void addScheduleState(ScheduleState state) {
        
    }

    @Override
    public List<Topology> listTopologies() {
        return null;
    }

    @Override
    public void addStreamingCluster(StreamingCluster cluster) {
        
    }

    @Override
    public void addTopology(Topology t) {
        
    }

    @Override
    public void addPolicy(PolicyDefinition policy) {
        
    }

    @Override
    public void addStreamDefinition(StreamDefinition streamDef) {
        
    }

    @Override
    public void addDataSource(Kafka2TupleMetadata k2t) {
        
    }

    @Override
    public void addPublishment(Publishment pub) {
        
    }

    @Override
    public void clear() {
        
    }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/metric/MemoryUsageGaugeSetTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/metric/MemoryUsageGaugeSetTest.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/metric/MemoryUsageGaugeSetTest.java
new file mode 100644
index 0000000..76aef7e
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/metric/MemoryUsageGaugeSetTest.java
@@ -0,0 +1,47 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.eagle.alert.engine.metric;

import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;

/**
 * Smoke test that JVM memory metrics register and report without throwing.
 */
public class MemoryUsageGaugeSetTest {
    private final Logger LOG = LoggerFactory.getLogger(MemoryUsageGaugeSetTest.class);

    /**
     * Registers {@link MemoryUsageGaugeSet} plus one sample gauge and emits a
     * single console report.
     *
     * <p>Previous version started the reporter on its background thread and slept
     * for 5 seconds; a synchronous {@link ConsoleReporter#report()} exercises the
     * same code path without slowing the build, and the reporter is stopped so its
     * scheduler thread does not leak.
     *
     * @throws InterruptedException kept for signature compatibility (no longer thrown)
     */
    @Test
    public void testJVMMetrics() throws InterruptedException {
        LOG.info("Starting testJVMMetrics");
        final MetricRegistry metrics = new MetricRegistry();
        ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics)
                .convertRatesTo(TimeUnit.SECONDS)
                .convertDurationsTo(TimeUnit.MILLISECONDS)
                .build();
        try {
            metrics.registerAll(new MemoryUsageGaugeSet());
            metrics.register("sample", (Gauge<Double>) () -> 0.1234);
            // One synchronous report replaces start(1, SECONDS) + Thread.sleep(5000).
            reporter.report();
        } finally {
            reporter.stop(); // release the reporter's executor thread
        }
    }
}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockPartitionedCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockPartitionedCollector.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockPartitionedCollector.java
new file mode 100644
index 0000000..74d11d2
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockPartitionedCollector.java
@@ -0,0 +1,55 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.eagle.alert.engine.mock;

import java.util.LinkedList;
import java.util.List;

import org.apache.eagle.alert.engine.PartitionedEventCollector;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test collector that buffers every emitted {@link PartitionedEvent} in memory so
 * assertions can inspect what a component produced.
 *
 * <p>Not thread-safe: intended for single-threaded test use only.
 */
public class MockPartitionedCollector implements PartitionedEventCollector {
    @SuppressWarnings("unused")
    private final static Logger LOG = LoggerFactory.getLogger(MockPartitionedCollector.class);

    /** Collected events in emission order; never null after construction. */
    private final List<PartitionedEvent> cache;

    public MockPartitionedCollector(){
        cache = new LinkedList<>();
    }

    /** Records the event for later inspection. */
    public void emit(PartitionedEvent event) {
        cache.add(event);
    }

    /** Discards all collected events (reuse the collector between test cases). */
    public void clear(){
        cache.clear();
    }

    /** @return the live backing list of collected events (not a copy). */
    public List<PartitionedEvent> get(){
        return cache;
    }

    /** @return number of events collected so far. */
    public int size(){
        return cache.size();
    }

    /** Intentional no-op: dropped events are not tracked by this mock. */
    @Override
    public void drop(PartitionedEvent event) {

    }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockSampleMetadataFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockSampleMetadataFactory.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockSampleMetadataFactory.java
new file mode 100644
index 0000000..97e6310
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockSampleMetadataFactory.java
@@ -0,0 +1,266 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.eagle.alert.engine.mock;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
import org.apache.eagle.alert.coordination.model.WorkSlot;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.engine.coordinator.StreamColumn;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.coordinator.StreamDefinitionNotFoundException;
import org.apache.eagle.alert.engine.coordinator.StreamPartition;
import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandlers;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
import org.apache.eagle.alert.engine.model.StreamEvent;

/**
 * Factory of sample stream metadata, policies, and random events for alert-engine tests.
 *
 * <p>Fix over the original: {@code ThreadLocalRandom.current()} was cached in a
 * {@code static final Random} field. Per its Javadoc a ThreadLocalRandom instance is
 * owned by the thread that obtained it and must never be shared across threads; the
 * field is gone and {@code ThreadLocalRandom.current()} is now called at each use site.
 */
@SuppressWarnings("serial")
public class MockSampleMetadataFactory {
    private static MockStreamMetadataService mockStreamMetadataServiceInstance = null;

    /**
     * Lazily builds (once) a metadata service pre-registered with sampleStream and
     * sampleStream_1..4. Synchronized so concurrent test threads see one instance.
     */
    public static synchronized MockStreamMetadataService createSingletonMetadataServiceWithSample(){
        if(mockStreamMetadataServiceInstance!=null) return mockStreamMetadataServiceInstance;
        mockStreamMetadataServiceInstance = new MockStreamMetadataService();
        mockStreamMetadataServiceInstance.registerStream("sampleStream",createSampleStreamDefinition("sampleStream"));
        mockStreamMetadataServiceInstance.registerStream("sampleStream_1",createSampleStreamDefinition("sampleStream_1"));
        mockStreamMetadataServiceInstance.registerStream("sampleStream_2",createSampleStreamDefinition("sampleStream_2"));
        mockStreamMetadataServiceInstance.registerStream("sampleStream_3",createSampleStreamDefinition("sampleStream_3"));
        mockStreamMetadataServiceInstance.registerStream("sampleStream_4",createSampleStreamDefinition("sampleStream_4"));
        return mockStreamMetadataServiceInstance;
    }

    /**
     * Builds a timeseries stream schema with columns: name (STRING), host (STRING),
     * flag (BOOL), timestamp (LONG), value (DOUBLE).
     */
    public static StreamDefinition createSampleStreamDefinition(String streamId){
        StreamDefinition sampleStreamDefinition = new StreamDefinition();
        sampleStreamDefinition.setStreamId(streamId);
        sampleStreamDefinition.setTimeseries(true);
        sampleStreamDefinition.setValidate(true);
        sampleStreamDefinition.setDescription("Schema for "+streamId);
        List<StreamColumn> streamColumns = new ArrayList<>();
        streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
        streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
        streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
        streamColumns.add(new StreamColumn.Builder().name("timestamp").type(StreamColumn.Type.LONG).build());
        streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
        sampleStreamDefinition.setColumns(streamColumns);
        return sampleStreamDefinition;
    }

    /**
     * By default window period is: PT1m (margin 1000 ms).
     *
     * @param streamId unused; kept for signature compatibility
     * @return sort spec with period PT1m, margin 1000
     */
    public static StreamSortSpec createSampleStreamSortSpec(String streamId){
        return createSampleStreamSortSpec(streamId, "PT1m", 1000);
    }

    /**
     * @param streamId unused; kept for signature compatibility
     * @param period   ISO-8601-style window period, e.g. "PT1m"
     * @param margin   window margin in milliseconds
     */
    public static StreamSortSpec createSampleStreamSortSpec(String streamId,String period,int margin){
        StreamSortSpec streamSortSpec = new StreamSortSpec();
        streamSortSpec.setWindowMargin(margin);
        streamSortSpec.setWindowPeriod(period);
        return streamSortSpec;
    }

    /**
     * Policy: from sampleStream_1[name == "cpu" and value > 50.0] select name, host, flag, value insert into outputStream;
     *
     * @return PolicyDefinition named "SamplePolicyForTest", SIDDHI engine, grouped by "name"
     */
    public static PolicyDefinition createSingleMetricSamplePolicy(){
        String definePolicy = "from sampleStream_1[name == \"cpu\" and value > 50.0] select name, host, flag, value insert into outputStream;";
        PolicyDefinition policyDefinition = new PolicyDefinition();
        policyDefinition.setName("SamplePolicyForTest");
        policyDefinition.setInputStreams(Arrays.asList("sampleStream_1"));
        policyDefinition.setOutputStreams(Arrays.asList("outputStream"));
        policyDefinition.setDefinition(new PolicyDefinition.Definition(
                PolicyStreamHandlers.SIDDHI_ENGINE,
                definePolicy
        ));
        policyDefinition.setPartitionSpec(Arrays.asList(createSampleStreamGroupbyPartition("sampleStream_1",Arrays.asList("name"))));
        return policyDefinition;
    }

    /**
     * GROUPBY partition on the given fields with a PT30m / 10000 ms sort window.
     */
    public static StreamPartition createSampleStreamGroupbyPartition(String streamId, List<String> groupByField){
        StreamPartition streamPartition = new StreamPartition();
        streamPartition.setStreamId(streamId);
        streamPartition.setColumns(new ArrayList<>(groupByField));
        streamPartition.setType(StreamPartition.Type.GROUPBY);
        StreamSortSpec streamSortSpec = new StreamSortSpec();
        streamSortSpec.setWindowPeriod("PT30m");
        streamSortSpec.setWindowMargin(10000);
        streamPartition.setSortSpec(streamSortSpec);
        return streamPartition;
    }

    /**
     * Router spec sending streamId (grouped by groupByField) to one "sampleTopology"
     * work slot per target evaluator id.
     */
    public static StreamRouterSpec createSampleStreamRouteSpec(String streamId, String groupByField, List<String> targetEvaluatorIds){
        List<WorkSlot> slots = Arrays.asList(targetEvaluatorIds.stream().map((t) -> {
            return new WorkSlot("sampleTopology", t);
        }).toArray(WorkSlot[]::new));
        StreamRouterSpec streamRouteSpec = new StreamRouterSpec();
        streamRouteSpec.setStreamId(streamId);
        streamRouteSpec.setPartition(createSampleStreamGroupbyPartition(streamId,Arrays.asList(groupByField)));
        streamRouteSpec.setTargetQueue(Arrays.asList(new PolicyWorkerQueue(slots)));
        return streamRouteSpec;
    }

    public static StreamRouterSpec createSampleStreamRouteSpec(List<String> targetEvaluatorIds){
        return createSampleStreamRouteSpec("sampleStream_1","name",targetEvaluatorIds);
    }

    /**
     * GROUPBY_sampleStream_1_ON_name
     *
     * @param targetEvaluatorIds evaluator bolt ids to route to
     * @return router spec for sampleStream_1 grouped by "name"
     */
    public static StreamRouterSpec createRouteSpec_GROUP_sampleStream_1_BY_name(List<String> targetEvaluatorIds){
        return createSampleStreamRouteSpec("sampleStream_1","name",targetEvaluatorIds);
    }

    public static StreamRouterSpec createRouteSpec_GROUP_sampleStream_2_BY_name(List<String> targetEvaluatorIds){
        return createSampleStreamRouteSpec("sampleStream_2","name",targetEvaluatorIds);
    }

    /**
     * Fixed (non-random) sampleStream_1 event: name=cpu, value=60.0, plus an
     * "unknown" attribute to exercise schema validation paths.
     */
    public static PartitionedEvent createSimpleStreamEvent()  {
        StreamEvent event = null;
        try {
            event = StreamEvent.Builder()
                .schema(MockSampleMetadataFactory.createSingletonMetadataServiceWithSample().getStreamDefinition("sampleStream_1"))
                .streamId("sampleStream_1")
                .timestamep(System.currentTimeMillis())
                .attributes(new HashMap<String,Object>(){{
                    put("name","cpu");
                    put("value",60.0);
                    put("unknown","unknown column value");
                }}).build();
        } catch (StreamDefinitionNotFoundException e) {
            e.printStackTrace();
        }
        PartitionedEvent pEvent = new PartitionedEvent();
        pEvent.setEvent(event);
        return pEvent;
    }

    private final static String[] SAMPLE_STREAM_NAME_OPTIONS=new String[]{
            "cpu","memory","disk","network"
    };

    private final static String[] SAMPLE_STREAM_HOST_OPTIONS =new String[]{
            "localhost_1","localhost_2","localhost_3","localhost_4"
    };

    private final static Boolean[] SAMPLE_STREAM_FLAG_OPTIONS=new Boolean[]{
            true,false
    };

    private final static Double[] SAMPLE_STREAM_VALUE_OPTIONS=new Double[]{
            -0.20, 40.4,50.5,60.6,10000.1
    };
    private final static String[] SAMPLE_STREAM_ID_OPTIONS=new String[]{
            "sampleStream_1","sampleStream_2","sampleStream_3","sampleStream_4",
    };
    // NOTE: do NOT cache ThreadLocalRandom.current() in a static field — the returned
    // instance belongs to the calling thread and must not be shared across threads.

    public static StreamEvent createRandomStreamEvent()  {
        return createRandomStreamEvent(SAMPLE_STREAM_ID_OPTIONS[ThreadLocalRandom.current().nextInt(SAMPLE_STREAM_ID_OPTIONS.length)]);
    }

    public static StreamEvent createRandomStreamEvent(String streamId)  {
        return createRandomStreamEvent(streamId,System.currentTimeMillis());
    }

    private final static Long[] TIME_DELTA_OPTIONS = new Long[]{
            -30000L, -10000L, -5000L, -1000L, 0L, 1000L, 5000L, 10000L, 30000L
    };

    /** Random event whose timestamp is now +/- a random delta (up to 30 s), to simulate disorder. */
    public static StreamEvent createRandomOutOfTimeOrderStreamEvent(String streamId)  {
        StreamEvent event = createRandomStreamEvent(streamId);
        event.setTimestamp(System.currentTimeMillis()+TIME_DELTA_OPTIONS[ThreadLocalRandom.current().nextInt(TIME_DELTA_OPTIONS.length)]);
        return event;
    }

    /** Out-of-order random event wrapped in a PartitionedEvent grouped by "name". */
    public static PartitionedEvent createRandomOutOfTimeOrderEventGroupedByName(String streamId)  {
        StreamEvent event = createRandomStreamEvent(streamId);
        event.setTimestamp(System.currentTimeMillis()+TIME_DELTA_OPTIONS[ThreadLocalRandom.current().nextInt(TIME_DELTA_OPTIONS.length)]);
        return new PartitionedEvent(event,createSampleStreamGroupbyPartition(streamId,Arrays.asList("name")),event.getData()[0].hashCode());
    }

    /** Random event pinned to the given timestamp, grouped by "name". */
    public static PartitionedEvent createPartitionedEventGroupedByName(String streamId,long timestamp)  {
        StreamEvent event = createRandomStreamEvent(streamId);
        event.setTimestamp(timestamp);
        return new PartitionedEvent(event,createSampleStreamGroupbyPartition(streamId,Arrays.asList("name")),event.getData()[0].hashCode());
    }

    /** Random event stamped with the current time (in-order), grouped by "name". */
    public static PartitionedEvent createRandomSortedEventGroupedByName(String streamId)  {
        StreamEvent event = createRandomStreamEvent(streamId);
        event.setTimestamp(System.currentTimeMillis());
        return new PartitionedEvent(event,createSampleStreamGroupbyPartition(streamId,Arrays.asList("name")),event.getData()[0].hashCode());
    }

    /**
     * Random event on streamId at the given timestamp; attribute values are drawn
     * from the SAMPLE_STREAM_*_OPTIONS pools, plus an "unknown" attribute.
     *
     * @throws IllegalStateException if streamId has no registered schema
     */
    public static StreamEvent createRandomStreamEvent(String streamId, long timestamp)  {
        StreamEvent event;
        try {
            event = StreamEvent.Builder()
                    .schema(MockSampleMetadataFactory.createSingletonMetadataServiceWithSample().getStreamDefinition(streamId))
                    .streamId(streamId)
                    .timestamep(timestamp)
                    .attributes(new HashMap<String,Object>(){{
                        put("name",SAMPLE_STREAM_NAME_OPTIONS[ThreadLocalRandom.current().nextInt(SAMPLE_STREAM_NAME_OPTIONS.length)]);
                        put("value", SAMPLE_STREAM_VALUE_OPTIONS[ThreadLocalRandom.current().nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
                        put("host", SAMPLE_STREAM_HOST_OPTIONS[ThreadLocalRandom.current().nextInt(SAMPLE_STREAM_HOST_OPTIONS.length)]);
                        put("flag",SAMPLE_STREAM_FLAG_OPTIONS[ThreadLocalRandom.current().nextInt(SAMPLE_STREAM_FLAG_OPTIONS.length)]);
                        put("unknown","unknown column value");
                    }}).build();
        } catch (StreamDefinitionNotFoundException e) {
            throw new IllegalStateException(e.getMessage(),e);
        }
        return event;
    }

    /** Random event at the given timestamp wrapped in a PartitionedEvent grouped by "name". */
    public static PartitionedEvent createRandomPartitionedEvent(String streamId, long timestamp)  {
        StreamEvent event = createRandomStreamEvent(streamId,timestamp);
        PartitionedEvent partitionedEvent = new PartitionedEvent(event,createSampleStreamGroupbyPartition(streamId,Arrays.asList("name")),event.getData()[0].hashCode());
        return partitionedEvent;
    }
}
\ No newline at end of file


Mime
View raw message