eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [2/8] incubator-eagle git commit: EAGLE-79 Provide aggregation and persistence DSL support
Date Tue, 12 Jan 2016 07:47:55 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java
new file mode 100644
index 0000000..fdbcda0
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.executor;
+
+import com.codahale.metrics.MetricRegistry;
+import com.sun.jersey.client.impl.CopyOnWriteHashMap;
+import com.typesafe.config.Config;
+import org.apache.commons.lang3.time.DateUtils;
+import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
+import org.apache.eagle.dataproc.core.ValuesArray;
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.datastream.JavaStormStreamExecutor2;
+import org.apache.eagle.datastream.Tuple2;
+import org.apache.eagle.metric.reportor.EagleCounterMetric;
+import org.apache.eagle.metric.reportor.EagleMetricListener;
+import org.apache.eagle.metric.reportor.EagleServiceReporterMetricListener;
+import org.apache.eagle.metric.reportor.MetricKeyCodeDecoder;
+import org.apache.eagle.policy.*;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.policy.config.AbstractPolicyDefinition;
+import org.apache.eagle.policy.dao.AlertStreamSchemaDAO;
+import org.apache.eagle.policy.dao.AlertStreamSchemaDAOImpl;
+import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
+import org.apache.eagle.policy.siddhi.StreamMetadataManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * The stream process executor, parameterized by two types: the policy definition entity and the stream entity
+ * @since Dec 17, 2015
+ *
+ * @param <T> - The policy definition entity type
+ * @param <K> - The stream entity type
+ */
+public abstract class PolicyProcessExecutor<T extends AbstractPolicyDefinitionEntity, K>
+		extends JavaStormStreamExecutor2<String, K> 
+		implements PolicyLifecycleMethods<T>, PolicyDistributionReportMethods, IPolicyExecutor<T, K>
+{
+	
+	private static final long serialVersionUID = 1L;
+	private static final Logger LOG = LoggerFactory.getLogger(PolicyProcessExecutor.class);
+	
+	public static final String EAGLE_EVENT_COUNT = "eagle.event.count";
+	public static final String EAGLE_POLICY_EVAL_COUNT = "eagle.policy.eval.count";
+	public static final String EAGLE_POLICY_EVAL_FAIL_COUNT = "eagle.policy.eval.fail.count";
+	public static final String EAGLE_ALERT_COUNT = "eagle.alert.count";
+	public static final String EAGLE_ALERT_FAIL_COUNT = "eagle.alert.fail.count";
+
+	private static final long MERITE_GRANULARITY = DateUtils.MILLIS_PER_MINUTE; // granularity passed to every EagleCounterMetric: one minute in millis
+
+	private final Class<T> policyDefinitionClz;
+	private String executorId;
+	private volatile CopyOnWriteHashMap<String, PolicyEvaluator<T>> policyEvaluators;
+	private PolicyPartitioner partitioner;
+	private int numPartitions;
+	private int partitionSeq;
+	private Config config;
+	private Map<String, Map<String, T>> initialAlertDefs;
+	private String[] sourceStreams;
+
+	/**
+	 * dimensionsMap's key = policyId, value = baseDimensions plus that policy id (see getDimensions)
+	 */
+	private Map<String, Map<String, String>> dimensionsMap; // cache it for performance
+	private Map<String, String> baseDimensions;
+
+	private MetricRegistry registry;
+	private EagleMetricListener listener;
+
+	private PolicyDefinitionDAO<T> policyDefinitionDao;
+
+	public PolicyProcessExecutor(String alertExecutorId, PolicyPartitioner partitioner, int numPartitions, int partitionSeq,
+                         PolicyDefinitionDAO<T> alertDefinitionDao, String[] sourceStreams, Class<T> clz){
+		this.executorId = alertExecutorId;
+		this.partitioner = partitioner;
+		this.numPartitions = numPartitions;
+		this.partitionSeq = partitionSeq;
+		this.policyDefinitionDao = alertDefinitionDao;
+		this.sourceStreams = sourceStreams;
+		this.policyDefinitionClz = clz;
+	}
+	
+	public String getExecutorId(){
+		return this.executorId;
+	}
+	
+	public int getNumPartitions() {
+		return this.numPartitions;
+	}
+	
+	public int getPartitionSeq(){
+		return this.partitionSeq;
+	}
+	
+	public PolicyPartitioner getPolicyPartitioner() {
+		return this.partitioner;
+	}
+	
+	public Map<String, Map<String, T>> getInitialAlertDefs() {
+		return this.initialAlertDefs;
+	}
+		
+	public PolicyDefinitionDAO<T> getPolicyDefinitionDao() {
+		return policyDefinitionDao;
+	}
+
+    public Map<String, PolicyEvaluator<T>> getPolicyEvaluators(){
+        return policyEvaluators;
+    }
+	
+	@Override
+	public void prepareConfig(Config config) {
+		this.config = config;
+	}
+	
+	/**
+	 * Builds the metric registry, the eagle service reporting listener and the base metric dimensions from config.
+	 * Service username/password are optional: each is null when its config path is absent.
+	 */
+	private void initMetricReportor() {
+		final String servicePrefix = EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + ".";
+		String host = config.getString(servicePrefix + EagleConfigConstants.HOST);
+		int port = config.getInt(servicePrefix + EagleConfigConstants.PORT);
+		String username = config.hasPath(servicePrefix + EagleConfigConstants.USERNAME) ? config.getString(servicePrefix + EagleConfigConstants.USERNAME) : null;
+		String password = config.hasPath(servicePrefix + EagleConfigConstants.PASSWORD) ? config.getString(servicePrefix + EagleConfigConstants.PASSWORD) : null;
+
+		registry = new MetricRegistry();
+		listener = new EagleServiceReporterMetricListener(host, port, username, password);
+		// dimensions copied into every per-policy dimension map built by getDimensions
+		baseDimensions = new HashMap<String, String>();
+		baseDimensions.put(Constants.ALERT_EXECUTOR_ID, executorId);
+		baseDimensions.put(Constants.PARTITIONSEQ, String.valueOf(partitionSeq));
+		baseDimensions.put(Constants.SOURCE, ManagementFactory.getRuntimeMXBean().getName());
+		baseDimensions.put(EagleConfigConstants.DATA_SOURCE, config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE));
+		baseDimensions.put(EagleConfigConstants.SITE, config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE));
+		dimensionsMap = new HashMap<String, Map<String, String>>();
+	}
+
+    public AlertStreamSchemaDAO getAlertStreamSchemaDAO(Config config){
+        return new AlertStreamSchemaDAOImpl(config);
+    }
+
+	@Override
+	public void init() {
+		// initialize StreamMetadataManager before it is used
+		StreamMetadataManager.getInstance().init(config, getAlertStreamSchemaDAO(config));
+		// for each AlertDefinition, to create a PolicyEvaluator
+		Map<String, PolicyEvaluator<T>> tmpPolicyEvaluators = new HashMap<String, PolicyEvaluator<T>>();
+		
+        String site = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE);
+		String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE);
+		try {
+			initialAlertDefs = policyDefinitionDao.findActivePoliciesGroupbyExecutorId(site, dataSource);
+		}
+		catch (Exception ex) {
+			LOG.error("fail to initialize initialAlertDefs: ", ex);
+            throw new IllegalStateException("fail to initialize initialAlertDefs: ", ex);
+		}
+        if(initialAlertDefs == null || initialAlertDefs.isEmpty()){
+            LOG.warn("No alert definitions was found for site: " + site + ", dataSource: " + dataSource);
+        }
+        else if (initialAlertDefs.get(executorId) != null) { 
+			for(T alertDef : initialAlertDefs.get(executorId).values()){
+				int part = partitioner.partition(numPartitions, alertDef.getTags().get(Constants.POLICY_TYPE), alertDef.getTags().get(Constants.POLICY_ID));
+				if (part == partitionSeq) {
+					tmpPolicyEvaluators.put(alertDef.getTags().get(Constants.POLICY_ID), createPolicyEvaluator(alertDef));
+				}
+			}
+		}
+		
+		policyEvaluators = new CopyOnWriteHashMap<>();
+		// for efficiency, we don't put single policy evaluator
+		policyEvaluators.putAll(tmpPolicyEvaluators);
+		DynamicPolicyLoader<T> policyLoader = DynamicPolicyLoader.getInstanceOf(policyDefinitionClz);
+		policyLoader.init(initialAlertDefs, policyDefinitionDao, config);
+        String fullQualifiedAlertExecutorId = executorId + "_" + partitionSeq;
+		policyLoader.addPolicyChangeListener(fullQualifiedAlertExecutorId, this);
+        policyLoader.addPolicyDistributionReporter(fullQualifiedAlertExecutorId, this);
+		LOG.info("Alert Executor created, partitionSeq: " + partitionSeq + " , numPartitions: " + numPartitions);
+        LOG.info("All policy evaluators: " + policyEvaluators);
+		
+		initMetricReportor();
+	}
+
+    /**
+     * Create PolicyEvaluator instance according to policyType-mapped policy evaluator class
+     *
+     * @param alertDef alert definition
+     * @return PolicyEvaluator instance
+     */
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	protected PolicyEvaluator<T> createPolicyEvaluator(T alertDef){
+		String policyType = alertDef.getTags().get(Constants.POLICY_TYPE);
+		Class<? extends PolicyEvaluator> evalCls = PolicyManager.getInstance().getPolicyEvaluator(policyType);
+		if(evalCls == null){
+			String msg = "No policy evaluator defined for policy type : " + policyType;
+			LOG.error(msg);
+			throw new IllegalStateException(msg);
+		}
+		
+		// check out whether strong incoming data validation is necessary
+        String needValidationConfigKey= Constants.ALERT_EXECUTOR_CONFIGS + "." + executorId + ".needValidation";
+
+        // Default: true
+        boolean needValidation = !config.hasPath(needValidationConfigKey) || config.getBoolean(needValidationConfigKey);
+
+		AbstractPolicyDefinition policyDef = null;
+		try {
+			policyDef = JsonSerDeserUtils.deserialize(alertDef.getPolicyDef(), AbstractPolicyDefinition.class, 
+					PolicyManager.getInstance().getPolicyModules(policyType));
+		} catch (Exception ex) {
+			LOG.error("Fail initial alert policy def: "+alertDef.getPolicyDef(), ex);
+		}
+		PolicyEvaluator<T> pe;
+		try {
+			// Create evaluator instances
+			pe = (PolicyEvaluator<T>) evalCls
+					.getConstructor(Config.class, String.class, AbstractPolicyDefinition.class, String[].class, boolean.class)
+					.newInstance(config, alertDef.getTags().get("policyId"), policyDef, sourceStreams, needValidation);
+		}catch(Exception ex){
+			LOG.error("Fail creating new policyEvaluator", ex);
+			LOG.warn("Broken policy definition and stop running : " + alertDef.getPolicyDef());
+			throw new IllegalStateException(ex);
+		}
+		return pe;
+	}
+
+    /**
+     * Decides whether a policy definition belongs to this executor instance: its executor id tag
+     * ("executorId", falling back to "alertExecutorId") must equal this executor's id, and the
+     * partitioner must map the policy to this partition sequence.
+     * @param alertDef alert definition
+     * @return whether accept the alert definition; false when it carries no executor id tag at all
+     */
+	private boolean accept(T alertDef){
+		String executorID = alertDef.getTags().containsKey("executorId") ? alertDef.getTags().get("executorId")
+				: alertDef.getTags().get("alertExecutorId");
+		// a definition without any executor id tag cannot belong to this executor: reject it instead of throwing NPE
+		if(executorID == null || !executorID.equals(executorId)) {
+            if(LOG.isDebugEnabled()){
+                LOG.debug("alertDef does not belong to this alertExecutorId : " + executorId + ", alertDef : " + alertDef);
+            }
+            return false;
+        }
+		int targetPartitionSeq = partitioner.partition(numPartitions, alertDef.getTags().get(Constants.POLICY_TYPE), alertDef.getTags().get(Constants.POLICY_ID));
+		return targetPartitionSeq == partitionSeq;
+	}
+	
+	private long trim(long value, long granularity) {
+		return value / granularity * granularity;
+	}
+
+	private void updateCounter(String name, Map<String, String> dimensions, double value) {
+		long current = System.currentTimeMillis();
+		String metricName = MetricKeyCodeDecoder.codeMetricKey(name, dimensions);
+		if (registry.getMetrics().get(metricName) == null) {
+			EagleCounterMetric metric = new EagleCounterMetric(current, metricName, value, MERITE_GRANULARITY);
+			metric.registerListener(listener);
+			registry.register(metricName, metric);
+		} else {
+			EagleCounterMetric metric = (EagleCounterMetric) registry.getMetrics().get(metricName);
+			metric.update(value, current);
+			// TODO: need remove unused metric from registry
+		}
+	}
+	
+	private void updateCounter(String name, Map<String, String> dimensions) {
+		updateCounter(name, dimensions, 1.0);
+	}
+	
+	protected Map<String, String> getDimensions(String policyId) {
+		if (dimensionsMap.get(policyId) == null) {
+			Map<String, String> newDimensions = new HashMap<String, String>(baseDimensions);
+			newDimensions.put(Constants.POLICY_ID, policyId);
+			dimensionsMap.put(policyId, newDimensions);
+		}
+		return dimensionsMap.get(policyId);
+	}
+	
+    /**
+     * Runs every registered PolicyEvaluator sequentially against one input tuple.
+     * Input contract: exactly 3 fields - [0] key, [1] stream name, [2] value which is java SortedMap.
+     * An exception from one evaluator is logged and counted as EAGLE_POLICY_EVAL_FAIL_COUNT; later evaluators still run.
+     * @param input the 3-field tuple described above
+     * @param outputCollector collector passed to each evaluator through its PolicyEvaluationContext
+     * @throws IllegalStateException if input does not have exactly 3 fields
+     */
+    @Override
+    public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, K>> outputCollector){
+        if(input.size() != 3)
+            throw new IllegalStateException("AlertExecutor always consumes exactly 3 fields: key, stream name and value(SortedMap)");
+        if(LOG.isDebugEnabled()) LOG.debug("Msg is coming " + input.get(2));
+        if(LOG.isDebugEnabled()) LOG.debug("Current policyEvaluators: " + policyEvaluators.keySet().toString());
+
+        updateCounter(EAGLE_EVENT_COUNT, baseDimensions);
+        try{
+            synchronized(this.policyEvaluators) {
+                for(Entry<String, PolicyEvaluator<T>> entry : policyEvaluators.entrySet()){
+                    String policyId = entry.getKey();
+                    PolicyEvaluator<T> evaluator = entry.getValue();
+                    updateCounter(EAGLE_POLICY_EVAL_COUNT, getDimensions(policyId));
+                    try {
+                        PolicyEvaluationContext<T, K> evaluationContext = new PolicyEvaluationContext<T, K>();
+                        evaluationContext.alertExecutor = this;
+                        evaluationContext.policyId = policyId;
+                        evaluationContext.evaluator = evaluator;
+                        evaluationContext.outputCollector = outputCollector;
+                        evaluationContext.resultRender = getResultRender();
+                        evaluator.evaluate(new ValuesArray(evaluationContext, input.get(1), input.get(2)));
+                    }
+                    catch (Exception ex) {
+                        LOG.error("Got an exception, but continue to run " + input.get(2), ex); // concatenation tolerates a null value
+                        updateCounter(EAGLE_POLICY_EVAL_FAIL_COUNT, getDimensions(policyId)); // failure counter, not a second eval count
+                    }
+                }
+            }
+        } catch(Exception ex){
+            LOG.error(executorId + ", partition " + partitionSeq + ", error fetching alerts, but continue to run", ex);
+            updateCounter(EAGLE_ALERT_FAIL_COUNT, baseDimensions);
+        }
+    }
+
+	@Override
+	public void onPolicyCreated(Map<String, T> added) {
+		if(LOG.isDebugEnabled()) LOG.debug(executorId + ", partition " + partitionSeq + " policy added : " + added + " policyEvaluators " + policyEvaluators);
+		for(T alertDef : added.values()){
+			if(!accept(alertDef))
+				continue;
+			LOG.info(executorId + ", partition " + partitionSeq + " policy really added " + alertDef);
+			PolicyEvaluator<T> newEvaluator = createPolicyEvaluator(alertDef);
+			if(newEvaluator != null){
+				synchronized(this.policyEvaluators) {
+					policyEvaluators.put(alertDef.getTags().get(Constants.POLICY_ID), newEvaluator);
+				}
+			}
+		}
+	}
+
+	@Override
+	public void onPolicyChanged(Map<String, T> changed) {
+		if(LOG.isDebugEnabled()) LOG.debug(executorId + ", partition " + partitionSeq + " policy changed : " + changed);
+		for(T alertDef : changed.values()){
+			if(!accept(alertDef)) continue;
+			LOG.info(executorId + ", partition " + partitionSeq + " policy really changed " + alertDef);
+			synchronized(this.policyEvaluators) {
+				PolicyEvaluator<T> pe = policyEvaluators.get(alertDef.getTags().get(Constants.POLICY_ID)); // null when no evaluator is registered for this policy id
+				if(pe != null) pe.onPolicyUpdate(alertDef);
+				else LOG.warn(executorId + ", partition " + partitionSeq + " no evaluator registered for changed policy " + alertDef);
+			}
+		}
+	}
+
+	@Override
+	public void onPolicyDeleted(Map<String, T> deleted) {
+		if(LOG.isDebugEnabled()) LOG.debug(executorId + ", partition " + partitionSeq + " policy deleted : " + deleted);
+		for(T alertDef : deleted.values()){
+			if(!accept(alertDef))
+				continue;
+			LOG.info(executorId + ", partition " + partitionSeq + " policy really deleted " + alertDef);
+			String policyId = alertDef.getTags().get(Constants.POLICY_ID);
+			synchronized(this.policyEvaluators) {			
+				if (policyEvaluators.containsKey(policyId)) {
+					PolicyEvaluator<T> pe = policyEvaluators.remove(alertDef.getTags().get(Constants.POLICY_ID));
+					pe.onPolicyDelete();
+				}
+			}
+		}
+	}
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Override
+	public void onEvalEvents(PolicyEvaluationContext<T, K> context, List<K> alerts) {
+		if(alerts != null && !alerts.isEmpty()){
+			String policyId = context.policyId;
+            LOG.info(String.format("Detected %d alerts for policy %s", alerts.size(), policyId));
+			Collector outputCollector = context.outputCollector;
+			PolicyEvaluator<T> evaluator = context.evaluator;
+			updateCounter(EAGLE_ALERT_COUNT, getDimensions(policyId), alerts.size());
+			for (K entity : alerts) {
+				synchronized(this) {
+					outputCollector.collect(new Tuple2(policyId, entity));
+				}
+				if(LOG.isDebugEnabled()) LOG.debug("A new alert is triggered: " + executorId + ", partition " + partitionSeq + ", Got an alert with output context: " + entity + ", for policy " + evaluator);
+			}
+		}
+	}
+
+	public abstract ResultRender<T, K> getResultRender();
+
+    @Override
+    public void report() {
+        PolicyDistroStatsLogReporter appender = new PolicyDistroStatsLogReporter();
+        appender.reportPolicyMembership(executorId + "_" + partitionSeq, policyEvaluators.keySet());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/AttributeType.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/AttributeType.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/AttributeType.java
new file mode 100644
index 0000000..a5d506f
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/AttributeType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.siddhi;
+
+/**
+ * @see org.wso2.siddhi.query.api.definition.Attribute.Type
+ */
+public enum AttributeType {
+	STRING,
+	LONG,
+	INTEGER,
+	BOOL,
+    FLOAT,
+    DOUBLE
+//    , OBJECT
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiEvaluationHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiEvaluationHandler.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiEvaluationHandler.java
new file mode 100644
index 0000000..2e8fc55
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiEvaluationHandler.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.siddhi;
+
+import org.apache.eagle.policy.PolicyEvaluationContext;
+import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
+
+import java.util.List;
+
+public interface SiddhiEvaluationHandler<T extends AbstractPolicyDefinitionEntity, K> {
+
+	void onEvalEvents(PolicyEvaluationContext<T, K> context, List<K> alerts);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyDefinition.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyDefinition.java
new file mode 100644
index 0000000..421a464
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyDefinition.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.siddhi;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.eagle.policy.config.AbstractPolicyDefinition;
+
+/**
+ * siddhi policy definition has the following format
+ * {
+        "type":"SiddhiCEPEngine",
+		"expression" : "from every b1=HeapUsage[metric == 'eagle.metric.gc'] -> a1=FullGCEvent[eventName == 'full gc'] -> b2=HeapUsage[metric == b1.metric and host == b1.host and value >= b1.value * 0.8] within 100 sec select a1.eventName, b1.metric, b2.timestamp, 60 as timerange insert into GCMonitor; "
+	}
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible=true)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class SiddhiPolicyDefinition extends AbstractPolicyDefinition {
+	private String expression;
+
+	private boolean containsDefintion;
+
+	public boolean isContainsDefintion() {
+		return containsDefintion;
+	}
+
+	public void setContainsDefintion(boolean containsDefintion) {
+		this.containsDefintion = containsDefintion;
+	}
+
+	public String getExpression() {
+		return expression;
+	}
+	public void setExpression(String expression) {
+		this.expression = expression;
+	}
+	
+	@Override
+	public String toString(){
+		return expression;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
new file mode 100644
index 0000000..c9ccc5e
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.siddhi;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
+import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
+import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
+import org.apache.eagle.dataproc.core.ValuesArray;
+import org.apache.eagle.policy.PolicyEvaluator;
+import org.apache.eagle.policy.PolicyManager;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.policy.config.AbstractPolicyDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.query.output.callback.QueryCallback;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.query.api.execution.query.Query;
+import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute;
+
+import java.lang.reflect.Field;
+import java.util.*;
+
+/**
+ * Evaluates one Siddhi CEP policy: builds a Siddhi execution plan from the policy definition,
+ * feeds incoming stream events into it and replaces / shuts down the plan when the policy is
+ * updated or deleted.
+ *
+ * When a policy is updated or deleted the execution plan runtime is shut down to release
+ * resources. Sending events and shutting down are serialized by synchronizing on the
+ * {@link SiddhiRuntime} instance they operate on.
+ *
+ * @param <T> policy definition entity type
+ * @param <K> second type parameter handed through to {@link SiddhiQueryCallbackImpl}
+ */
+public class SiddhiPolicyEvaluator<T extends AbstractPolicyDefinitionEntity, K> implements PolicyEvaluator<T>{
+
+	private final static Logger LOG = LoggerFactory.getLogger(SiddhiPolicyEvaluator.class);
+
+	/** Name given to the generated query via {@code @info(name = ...)} and used to attach the callback. */
+	private final static String EXECUTION_PLAN_NAME = "query";
+	/** Keyword (including the trailing space) that precedes the projection list of a query. */
+	private final static String SELECT_KEYWORD = "select ";
+	/**
+	 * Name of the {@link QueryCallback} field that is read reflectively to obtain the {@link Query}.
+	 * Kept separate from {@link #EXECUTION_PLAN_NAME}: the two only happen to share the same value.
+	 */
+	private final static String QUERY_CALLBACK_QUERY_FIELD = "query";
+
+	/** Current runtime; replaced as a whole by {@link #onPolicyUpdate}. */
+	private volatile SiddhiRuntime siddhiRuntime;
+	private final String[] sourceStreams;
+	private final boolean needValidation;
+	private final String policyId;
+	private final Config config;
+
+	/**
+	 * everything dependent on policyDef should be together and switched in runtime
+	 */
+	public static class SiddhiRuntime{
+		QueryCallback callback;
+		Map<String, InputHandler> siddhiInputHandlers;
+		SiddhiManager siddhiManager;
+		SiddhiPolicyDefinition policyDef;
+		List<String> outputFields;
+		String executionPlanName;
+	}
+
+	/**
+	 * Creates an evaluator with runtime event validation switched off.
+	 *
+	 * @param config        configuration handed to the query callback
+	 * @param policyName    id of the policy
+	 * @param policyDef     policy definition; must be a {@link SiddhiPolicyDefinition}
+	 * @param sourceStreams names of the streams the policy consumes
+	 */
+	public SiddhiPolicyEvaluator(Config config, String policyName, AbstractPolicyDefinition policyDef, String[] sourceStreams){
+		this(config, policyName, policyDef, sourceStreams, false);
+	}
+
+	/**
+	 * @param config         configuration handed to the query callback
+	 * @param policyId       id of the policy
+	 * @param policyDef      policy definition; must be a {@link SiddhiPolicyDefinition}
+	 * @param sourceStreams  names of the streams the policy consumes
+	 * @param needValidation whether every event is checked against the stream schema before it is sent
+	 */
+	public SiddhiPolicyEvaluator(Config config, String policyId, AbstractPolicyDefinition policyDef, String[] sourceStreams, boolean needValidation){
+		this.config = config;
+		this.policyId = policyId;
+		this.needValidation = needValidation;
+		this.sourceStreams = sourceStreams;
+		init(policyDef);
+	}
+
+	/**
+	 * Builds the Siddhi runtime for the given definition and makes it the current one.
+	 *
+	 * @param policyDef policy definition; must be a {@link SiddhiPolicyDefinition}
+	 */
+	public void init(AbstractPolicyDefinition policyDef){
+		siddhiRuntime = createSiddhiRuntime((SiddhiPolicyDefinition)policyDef);
+	}
+
+	/**
+	 * Injects the alert context field as first projected field of a query:
+	 * {@code select fieldA, fieldB --> select eagleAlertContext,fieldA, fieldB}.
+	 *
+	 * The expression is returned unchanged when it has no {@code "select "} clause, when the
+	 * projection is {@code *} (or empty), or when the context field already is the first
+	 * projected field.
+	 *
+	 * @param expression Siddhi query text
+	 * @return the query text with the context field added where necessary
+	 */
+	public static String addContextFieldIfNotExist(String expression) {
+		int selectAt = expression.indexOf(SELECT_KEYWORD);
+		if (selectAt < 0) {
+			// without a select clause there is no projection list to extend
+			return expression;
+		}
+		int pos = selectAt + SELECT_KEYWORD.length();
+		int index = pos;
+		while (index < expression.length() && expression.charAt(index) == ' ') {
+			index++;
+		}
+		if (index >= expression.length() || expression.charAt(index) == '*') {
+			// "select *" already carries every stream attribute, including the context
+			return expression;
+		}
+		if (startsWithContextField(expression, index)) {
+			return expression;
+		}
+		StringBuilder sb = new StringBuilder();
+		sb.append(expression.substring(0, pos));
+		sb.append(SiddhiStreamMetadataUtils.EAGLE_ALERT_CONTEXT_FIELD + ",");
+		sb.append(expression.substring(pos, expression.length()));
+		return sb.toString();
+	}
+
+	/**
+	 * Tells whether the context field name starts at {@code from} as a whole identifier.
+	 */
+	private static boolean startsWithContextField(String expression, int from) {
+		String field = SiddhiStreamMetadataUtils.EAGLE_ALERT_CONTEXT_FIELD;
+		if (!expression.startsWith(field, from)) {
+			return false;
+		}
+		int end = from + field.length();
+		return end >= expression.length() || !Character.isJavaIdentifierPart(expression.charAt(end));
+	}
+
+	/**
+	 * Creates and starts an execution plan for the policy, registers the query callback and
+	 * collects the names of the query's output attributes.
+	 */
+	private SiddhiRuntime createSiddhiRuntime(SiddhiPolicyDefinition policyDef){
+		SiddhiManager siddhiManager = new SiddhiManager();
+		Map<String, InputHandler> siddhiInputHandlers = new HashMap<String, InputHandler>();
+
+		// compose execution plan sql
+		String executionPlan = policyDef.getExpression();
+		if (!policyDef.isContainsDefintion()) {
+			// the expression is a bare query: prepend the stream definitions and name the query
+			StringBuilder sb = new StringBuilder();
+			for (String sourceStream : sourceStreams) {
+				String streamDef = SiddhiStreamMetadataUtils.convertToStreamDef(sourceStream);
+				LOG.info("Siddhi stream definition : " + streamDef);
+				sb.append(streamDef);
+			}
+
+			String expression = addContextFieldIfNotExist(policyDef.getExpression());
+			executionPlan = sb.toString() + " @info(name = '" + EXECUTION_PLAN_NAME + "') " + expression;
+		}
+
+		ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
+
+		for(String sourceStream : sourceStreams){
+			siddhiInputHandlers.put(sourceStream, executionPlanRuntime.getInputHandler(sourceStream));
+		}
+		executionPlanRuntime.start();
+
+		QueryCallback callback = new SiddhiQueryCallbackImpl<T, K>(config, this);
+
+		LOG.info("Siddhi query: " + executionPlan);
+		executionPlanRuntime.addCallback(EXECUTION_PLAN_NAME, callback);
+
+		// the Query is read reflectively from the callback to learn the output attribute names;
+		// on failure the list simply stays empty
+		List<String> outputFields = new ArrayList<String>();
+		try {
+			Field field = QueryCallback.class.getDeclaredField(QUERY_CALLBACK_QUERY_FIELD);
+			field.setAccessible(true);
+			Query query = (Query)field.get(callback);
+			List<OutputAttribute> list = query.getSelector().getSelectionList();
+			for (OutputAttribute output : list) {
+				outputFields.add(output.getRename());
+			}
+		}
+		catch (Exception ex) {
+			LOG.error("Got an Exception when initial outputFields ", ex);
+		}
+		SiddhiRuntime runtime = new SiddhiRuntime();
+		runtime.siddhiInputHandlers = siddhiInputHandlers;
+		runtime.siddhiManager = siddhiManager;
+		runtime.callback = callback;
+		runtime.policyDef = policyDef;
+		runtime.outputFields = outputFields;
+		runtime.executionPlanName = executionPlanRuntime.getName();
+		return runtime;
+	}
+
+	/**
+	 * 1. input has 3 fields, first is siddhi context, second is streamName, the last one is map of attribute name/value
+	 * 2. runtime check for input data (This is very expensive, so we ignore for now)
+	 *     the size of input map should be equal to size of attributes which stream metadata defines
+	 *     the attribute names should be equal to attribute names which stream metadata defines
+	 *     the input field cannot be null
+	 *
+	 * The event sent to Siddhi consists of the context followed by the attribute values, which is
+	 * the layout of the stream definition generated by {@link SiddhiStreamMetadataUtils#convertToStreamDef(String)}.
+	 * The stream name only selects the input handler; it is not part of the event.
+	 *
+	 * @param data the three input fields described above
+	 * @throws Exception if the event cannot be sent
+	 */
+	@SuppressWarnings({ "rawtypes" })
+	@Override
+	public void evaluate(ValuesArray data) throws Exception {
+		if(LOG.isDebugEnabled()) LOG.debug("Siddhi policy evaluator consumers data :" + data);
+		Object siddhiAlertContext = data.get(0);
+		String streamName = (String)data.get(1);
+		SortedMap map = (SortedMap)data.get(2);
+		validateEventInRuntime(streamName, map);
+
+		//insert siddhiAlertContext into the first field
+		List<Object> input = new ArrayList<>();
+		input.add(siddhiAlertContext);
+		putAttrsIntoInputStream(input, streamName, map);
+		Object[] event = input.toArray(new Object[0]);
+
+		for (;;) {
+			// read the volatile field once so that the lock and the input handler belong to the same runtime
+			SiddhiRuntime runtime = siddhiRuntime;
+			synchronized(runtime){
+				if (runtime != siddhiRuntime) {
+					// replaced by onPolicyUpdate while waiting for the lock: retry with the new runtime
+					continue;
+				}
+				runtime.siddhiInputHandlers.get(streamName).send(event);
+				return;
+			}
+		}
+	}
+
+	/**
+	 * This is a heavy operation, we should avoid to use.
+	 *
+	 * This validation method will skip invalid fields in event which are not declared in stream schema otherwise it will cause exception for siddhi engine.
+	 *
+	 * @see <a href="https://issues.apache.org/jira/browse/EAGLE-49">https://issues.apache.org/jira/browse/EAGLE-49</a>
+	 *
+	 * @param sourceStream source steam id
+	 * @param data input event; undeclared fields are removed from this map
+	 */
+	@SuppressWarnings({ "rawtypes" })
+	private void validateEventInRuntime(String sourceStream, SortedMap data){
+		if(!needValidation)
+			return;
+		SortedMap<String, AlertStreamSchemaEntity> map = StreamMetadataManager.getInstance().getMetadataEntityMapForStream(sourceStream);
+		if(!map.keySet().equals(data.keySet())){
+			Set<Object> badKeys = new TreeSet<>();
+			for(Object key:data.keySet()) if(!map.containsKey(key)) badKeys.add(key);
+			LOG.warn(String.format("Ignore invalid fields %s in event: %s from stream: %s, valid fields are: %s", badKeys.toString(),data.toString(), sourceStream,map.keySet().toString()));
+			for(Object key:badKeys) data.remove(key);
+		}
+	}
+
+	/**
+	 * Appends the attribute values of the event to {@code input} in the iteration order of the map.
+	 * With validation enabled a {@code null} value is replaced by the default value of the attribute.
+	 */
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	private void putAttrsIntoInputStream(List<Object> input, String streamName, SortedMap map) {
+		if(!needValidation) {
+			input.addAll(map.values());
+			return;
+		}
+		for (Object element : map.entrySet()) {
+			Map.Entry entry = (Map.Entry)element;
+			Object value = entry.getValue();
+			if (value == null) {
+				input.add(SiddhiStreamMetadataUtils.getAttrDefaultValue(streamName, (String)entry.getKey()));
+			}
+			else input.add(value);
+		}
+	}
+
+	/**
+	 * Replaces the running execution plan by one built from the updated definition and shuts the
+	 * previous plan down. If the updated definition cannot be deserialized the error is logged
+	 * and the previous plan keeps running.
+	 *
+	 * @param newAlertDef updated policy definition entity
+	 */
+	@Override
+	public void onPolicyUpdate(T newAlertDef) {
+		AbstractPolicyDefinition policyDef = null;
+		try {
+			policyDef = JsonSerDeserUtils.deserialize(newAlertDef.getPolicyDef(),
+					AbstractPolicyDefinition.class, PolicyManager.getInstance().getPolicyModules(newAlertDef.getTags().get(Constants.POLICY_TYPE)));
+		}
+		catch (Exception ex) {
+			LOG.error("Initial policy def error, ", ex);
+		}
+		if (policyDef == null) {
+			// nothing usable to switch to; do not tear down a working runtime
+			LOG.error("Policy " + policyId + " is not updated because the new definition could not be deserialized");
+			return;
+		}
+		SiddhiRuntime previous = siddhiRuntime;
+		siddhiRuntime = createSiddhiRuntime((SiddhiPolicyDefinition)policyDef);
+		synchronized(previous){
+			previous.siddhiManager.getExecutionPlanRuntime(previous.executionPlanName).shutdown();
+		}
+	}
+
+	/**
+	 * Shuts down the execution plan of the current runtime.
+	 */
+	@Override
+	public void onPolicyDelete(){
+		// single read of the volatile field: lock and shutdown refer to the same runtime
+		SiddhiRuntime runtime = siddhiRuntime;
+		synchronized(runtime){
+			LOG.info("Going to shutdown siddhi execution plan, planName: " + runtime.executionPlanName);
+			runtime.siddhiManager.getExecutionPlanRuntime(runtime.executionPlanName).shutdown();
+			LOG.info("Siddhi execution plan " + runtime.executionPlanName + " is successfully shutdown ");
+		}
+	}
+
+	/**
+	 * @return the string form of the current policy definition
+	 */
+	@Override
+	public String toString(){
+		return siddhiRuntime.policyDef.toString();
+	}
+
+	/**
+	 * @return the names of the source streams (the internal array, not a copy)
+	 */
+	public String[] getStreamNames() {
+		return sourceStreams;
+	}
+
+	/**
+	 * @return a new map holding the comma separated source stream names under
+	 *         {@code Constants.SOURCE_STREAMS} and the policy id under {@code Constants.POLICY_ID}
+	 */
+	public Map<String, String> getAdditionalContext() {
+		Map<String, String> context = new HashMap<String, String>();
+		StringBuilder joinedStreams = new StringBuilder();
+		for (String streamName : getStreamNames()) {
+			joinedStreams.append(streamName).append(",");
+		}
+		if (joinedStreams.length() > 0) {
+			// drop the trailing separator
+			joinedStreams.deleteCharAt(joinedStreams.length() - 1);
+		}
+		context.put(Constants.SOURCE_STREAMS, joinedStreams.toString());
+		context.put(Constants.POLICY_ID, policyId);
+		return context;
+	}
+
+	/**
+	 * @return the output attribute names of the current query; empty if they could not be determined
+	 */
+	public List<String> getOutputStreamAttrNameList() {
+		return siddhiRuntime.outputFields;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java
new file mode 100644
index 0000000..1bd5830
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.siddhi;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
+import org.apache.eagle.policy.PolicyEvaluatorServiceProvider;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+
+/**
+ * Service provider that exposes the Siddhi CEP engine: its policy type name, its evaluator
+ * class and the Jackson module binding {@link SiddhiPolicyDefinition} to that type name.
+ *
+ * @param <T> policy definition entity type
+ */
+public class SiddhiPolicyEvaluatorServiceProviderImpl<T extends AbstractPolicyDefinitionEntity> implements PolicyEvaluatorServiceProvider<T> {
+
+	/**
+	 * @return the name of the {@code siddhiCEPEngine} policy type constant
+	 */
+	@Override
+	public String getPolicyType() {
+		final String typeName = Constants.policyType.siddhiCEPEngine.name();
+		return typeName;
+	}
+
+	/**
+	 * @return {@link SiddhiPolicyEvaluator}
+	 */
+	@Override
+	public Class getPolicyEvaluator() {
+		return SiddhiPolicyEvaluator.class;
+	}
+
+	/**
+	 * @return a single-element list with a module named {@code Constants.POLICY_DEFINITION} that
+	 *         registers {@link SiddhiPolicyDefinition} as subtype under {@link #getPolicyType()}
+	 */
+	@Override
+	public List<Module> getBindingModules() {
+		SimpleModule policyModule = new SimpleModule(Constants.POLICY_DEFINITION);
+		NamedType siddhiSubtype = new NamedType(SiddhiPolicyDefinition.class, getPolicyType());
+		Module registered = policyModule.registerSubtypes(siddhiSubtype);
+		return Arrays.asList(registered);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiQueryCallbackImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiQueryCallbackImpl.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiQueryCallbackImpl.java
new file mode 100644
index 0000000..9060920
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiQueryCallbackImpl.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.siddhi;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import org.apache.eagle.policy.PolicyEvaluationContext;
+import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.query.output.callback.QueryCallback;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Siddhi call back implementation: renders every event emitted by the query with the renderer
+ * found in the event's evaluation context and hands the result to the context's handler.
+ *
+ * @param <T> - The policy definition type
+ * @param <K> - K the alert entity type
+ */
+public class SiddhiQueryCallbackImpl<T extends AbstractPolicyDefinitionEntity, K> extends QueryCallback{
+
+	private SiddhiPolicyEvaluator<T, K> evaluator;
+	public static final Logger LOG = LoggerFactory.getLogger(SiddhiQueryCallbackImpl.class);
+	public static final ObjectMapper mapper = new ObjectMapper();
+	public Config config;
+
+	/**
+	 * @param config    configuration passed on to the result renderer
+	 * @param evaluator the evaluator that owns this callback
+	 */
+	public SiddhiQueryCallbackImpl(Config config, SiddhiPolicyEvaluator<T, K> evaluator) {
+		this.config = config;
+		this.evaluator = evaluator;
+	}
+
+	/**
+	 * Converts values to their string form. {@code Double}, {@code Float}, {@code Integer},
+	 * {@code Long}, {@code Boolean} and {@code String} values are converted; any other value
+	 * (including {@code null}) yields a {@code null} entry, so the result always has the same
+	 * size as the input.
+	 *
+	 * @param data values to convert
+	 * @return a new list with one entry per input value
+	 */
+	public static List<String> convertToString(List<Object> data) {
+		List<String> rets = new ArrayList<String>();
+		for (Object object : data) {
+			String value = null;
+			if (object instanceof String) {
+				value = (String)object;
+			}
+			else if (object instanceof Double || object instanceof Float
+					|| object instanceof Integer || object instanceof Long
+					|| object instanceof Boolean) {
+				// Float is included because "float" is one of the attribute types a stream can declare
+				value = String.valueOf(object);
+			}
+			rets.add(value);
+		}
+		return rets;
+	}
+
+	/**
+	 * Copies the event data without its first field.
+	 *
+	 * @param data event data whose first field is the siddhiAlertContext
+	 * @return a new list with all fields but the first; empty if there is at most one field
+	 */
+	public static List<Object> getOutputObject(Object[] data) {
+		List<Object> rets = new ArrayList<Object>();
+		// The first field is siddhiAlertContext, skip it
+		for (int i = 1; i < data.length; i++) {
+			rets.add(data[i]);
+		}
+		return rets;
+	}
+
+	/**
+	 * Renders each incoming event and passes the result to the handler of the event's context.
+	 * Nothing happens when there are no incoming events, e.g. when the callback is invoked
+	 * for removed events only.
+	 *
+	 * @param timeStamp    time stamp passed on to the renderer
+	 * @param inEvents     incoming events; may be {@code null} or empty
+	 * @param removeEvents removed events; not used
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+		if (inEvents == null || inEvents.length == 0) {
+			return;
+		}
+		// a batch may carry several events; each of them is an output row of its own
+		for (Event event : inEvents) {
+			Object[] data = event.getData();
+			PolicyEvaluationContext<T, K> siddhiAlertContext = (PolicyEvaluationContext<T, K>)data[0];
+			List<Object> rets = getOutputObject(data);
+			K alert = siddhiAlertContext.resultRender.render(config, rets, siddhiAlertContext, timeStamp);
+			SiddhiEvaluationHandler<T, K> handler = siddhiAlertContext.alertExecutor;
+			handler.onEvalEvents(siddhiAlertContext, Arrays.asList(alert));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java
new file mode 100644
index 0000000..f42a021
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.siddhi;
+
+import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.SortedMap;
+
+/**
+ * convert metadata entities for a stream to stream definition for siddhi cep engine
+ * define stream HeapUsage (metric string, host string, value double, timestamp long)
+ */
+public class SiddhiStreamMetadataUtils {
+	private final static Logger LOG = LoggerFactory.getLogger(SiddhiStreamMetadataUtils.class);
+
+	public final static String EAGLE_ALERT_CONTEXT_FIELD = "eagleAlertContext";
+
+	/**
+	 * Looks up the schema of a stream.
+	 *
+	 * @param streamName name of the stream
+	 * @return attribute name to schema entity, never empty
+	 * @throws IllegalStateException if no schema is known for the stream
+	 */
+	public static SortedMap<String, AlertStreamSchemaEntity> getAttrMap(String streamName) {
+		SortedMap<String, AlertStreamSchemaEntity> map = StreamMetadataManager.getInstance().getMetadataEntityMapForStream(streamName);
+		if(map == null || map.size() == 0){
+			throw new IllegalStateException("alert stream schema should never be empty");
+		}
+		return map;
+	}
+
+	/**
+	 * Builds the Siddhi stream definition from the stream schema held by StreamMetadataManager.
+	 * The first attribute is the {@link #EAGLE_ALERT_CONTEXT_FIELD} object, followed by the
+	 * schema attributes in the order of the schema map.
+	 *
+	 * @see org.wso2.siddhi.query.api.definition.Attribute.Type
+	 * make sure StreamMetadataManager.init is invoked before this method
+	 * @param streamName name of the stream
+	 * @return the {@code define stream} statement
+	 */
+	public static String convertToStreamDef(String streamName){
+		SortedMap<String, AlertStreamSchemaEntity> map = getAttrMap(streamName);
+		StringBuilder sb = new StringBuilder();
+		sb.append(EAGLE_ALERT_CONTEXT_FIELD + " object, ");
+		for(Map.Entry<String, AlertStreamSchemaEntity> entry : map.entrySet()){
+			appendAttributeNameType(sb, entry.getKey(), entry.getValue().getAttrType());
+		}
+		return toStreamDef(streamName, sb);
+	}
+
+	/**
+	 * Builds the Siddhi stream definition from an explicit schema. The first attribute is a
+	 * {@code context} object, followed by the given attributes in the iteration order of the map.
+	 *
+	 * @param streamName  name of the stream
+	 * @param eventSchema attribute name to attribute type name
+	 * @return the {@code define stream} statement
+	 */
+	public static String convertToStreamDef(String streamName, Map<String, String> eventSchema){
+		StringBuilder sb = new StringBuilder();
+		sb.append("context" + " object,");
+		for(Map.Entry<String, String> entry : eventSchema.entrySet()){
+			appendAttributeNameType(sb, entry.getKey(), entry.getValue());
+		}
+		return toStreamDef(streamName, sb);
+	}
+
+	/**
+	 * Drops the trailing separator of the attribute list and wraps it into the statement.
+	 * Plain concatenation is used so that a '%' in the stream name cannot be taken for a
+	 * format specifier.
+	 */
+	private static String toStreamDef(String streamName, StringBuilder attributes){
+		if(attributes.length() > 0){
+			attributes.deleteCharAt(attributes.length()-1);
+		}
+		return "define stream " + streamName + "(" + attributes + ");";
+	}
+
+	/**
+	 * Appends {@code "<name> <siddhi type>,"} to the attribute list.
+	 * NOTE(review): for an unrecognized type only a warning is logged, yet name and separator are
+	 * still appended, which leaves an attribute without type in the definition -- confirm whether
+	 * such attributes should be skipped or rejected instead.
+	 */
+	private static void appendAttributeNameType(StringBuilder sb, String attrName, String attrType){
+		sb.append(attrName);
+		sb.append(" ");
+		if(attrType.equalsIgnoreCase(AttributeType.STRING.name())){
+			sb.append("string");
+		}else if(attrType.equalsIgnoreCase(AttributeType.INTEGER.name())){
+			sb.append("int");
+		}else if(attrType.equalsIgnoreCase(AttributeType.LONG.name())){
+			sb.append("long");
+		}else if(attrType.equalsIgnoreCase(AttributeType.BOOL.name())){
+			sb.append("bool");
+		}else if(attrType.equalsIgnoreCase(AttributeType.FLOAT.name())){
+			sb.append("float");
+		}else if(attrType.equalsIgnoreCase(AttributeType.DOUBLE.name())){
+			sb.append("double");
+		}else{
+			LOG.warn("AttrType is not recognized, ignore : " + attrType);
+		}
+		sb.append(",");
+	}
+
+	/**
+	 * Returns the value to use for an attribute that has no value in an event: the default
+	 * configured in the schema if there is one, otherwise a fixed value whose Java type matches
+	 * the declared attribute type ("NA", -1, -1L, true, -1f, -1d). An unrecognized type is
+	 * treated as string and yields "N/A".
+	 *
+	 * @param streamName name of the stream
+	 * @param attrName   name of the attribute; must be part of the stream schema
+	 * @return the default value
+	 */
+	public static Object getAttrDefaultValue(String streamName, String attrName){
+		SortedMap<String, AlertStreamSchemaEntity> map = getAttrMap(streamName);
+		AlertStreamSchemaEntity entity = map.get(attrName);
+		if (entity.getDefaultValue() != null) {
+			return entity.getDefaultValue();
+		}
+		String attrType = entity.getAttrType();
+		if (attrType.equalsIgnoreCase(AttributeType.STRING.name())) {
+			return "NA";
+		} else if (attrType.equalsIgnoreCase(AttributeType.INTEGER.name())) {
+			return -1;
+		} else if (attrType.equalsIgnoreCase(AttributeType.LONG.name())) {
+			// a long attribute needs a Long, an Integer -1 would have the wrong runtime type
+			return -1L;
+		} else if (attrType.equalsIgnoreCase(AttributeType.BOOL.name())) {
+			return true;
+		} else if (attrType.equalsIgnoreCase(AttributeType.FLOAT.name())) {
+			return -1f;
+		} else if (attrType.equalsIgnoreCase(AttributeType.DOUBLE.name())) {
+			return -1d;
+		} else {
+			LOG.warn("AttrType is not recognized: " + attrType + ", treat it as string");
+			return "N/A";
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/StreamMetadataManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/StreamMetadataManager.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/StreamMetadataManager.java
new file mode 100644
index 0000000..47b7ef7
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/StreamMetadataManager.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.siddhi;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
+import com.typesafe.config.Config;
+import org.apache.eagle.policy.dao.AlertStreamSchemaDAO;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.commons.collections.map.UnmodifiableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * centralized memory where all stream metadata sit on, it is not mutable data
+ */
+public class StreamMetadataManager {
+	private static final Logger LOG = LoggerFactory.getLogger(StreamMetadataManager.class);
+	
+	private static StreamMetadataManager instance = new StreamMetadataManager();
+	private Map<String, List<AlertStreamSchemaEntity>> map = new HashMap<String, List<AlertStreamSchemaEntity>>();
+	private Map<String, SortedMap<String, AlertStreamSchemaEntity>> map2 = new HashMap<String, SortedMap<String, AlertStreamSchemaEntity>>();
+	private volatile boolean initialized = false;
+	
+	private StreamMetadataManager(){
+	}
+	
+	public static StreamMetadataManager getInstance(){
+		return instance;
+	}
+	
+	private void internalInit(Config config, AlertStreamSchemaDAO dao){
+		try{
+			String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE);
+			List<AlertStreamSchemaEntity> list = dao.findAlertStreamSchemaByDataSource(dataSource);
+			if(list == null)
+				return;
+			for (AlertStreamSchemaEntity entity : list) {
+				String streamName = entity.getTags().get(Constants.STREAM_NAME);
+				if (map.get(streamName) == null) {
+					map.put(streamName, new ArrayList<AlertStreamSchemaEntity>());
+					map2.put(streamName, new TreeMap<String, AlertStreamSchemaEntity>());
+				}
+				map.get(streamName).add(entity);
+				map2.get(streamName).put(entity.getTags().get(Constants.ATTR_NAME), entity);
+			}
+		}catch(Exception ex){
+			LOG.error("Fail building metadata manger", ex);
+			throw new IllegalStateException(ex);
+		}
+	}
+	
+	/**
+	 * singleton with init would be good for unit test as well, and it ensures that
+	 * initialization happens only once before you use it.  
+	 * @param config
+	 * @param dao
+	 */
+	public void init(Config config, AlertStreamSchemaDAO dao){
+		if(!initialized){
+			synchronized(this){
+				if(!initialized){
+                    if(LOG.isDebugEnabled()) LOG.debug("Initializing ...");
+					internalInit(config, dao);
+					initialized = true;
+                    LOG.info("Successfully initialized");
+				}
+			}
+		}else{
+            LOG.info("Already initialized, skip");
+        }
+	}
+
+	// Only for unit test purpose
+	public void reset() {
+		synchronized (this) {
+			initialized = false;
+			map.clear();
+			map2.clear();
+		}
+	}
+
+	private void ensureInitialized(){
+		if(!initialized)
+			throw new IllegalStateException("StreamMetadataManager should be initialized before using it");
+	}
+	
+	public List<AlertStreamSchemaEntity> getMetadataEntitiesForStream(String streamName){
+		ensureInitialized();
+		return getMetadataEntitiesForAllStreams().get(streamName);
+	}
+	
+	public Map<String, List<AlertStreamSchemaEntity>> getMetadataEntitiesForAllStreams(){
+		ensureInitialized();
+		return UnmodifiableMap.decorate(map);
+	}
+	
+	public SortedMap<String, AlertStreamSchemaEntity> getMetadataEntityMapForStream(String streamName){
+		ensureInitialized();
+		return getMetadataEntityMapForAllStreams().get(streamName);
+	}
+	
+	public Map<String, SortedMap<String, AlertStreamSchemaEntity>> getMetadataEntityMapForAllStreams(){
+		ensureInitialized();
+		return UnmodifiableMap.decorate(map2);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/extension/ContainsIgnoreCaseExtension.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/extension/ContainsIgnoreCaseExtension.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/extension/ContainsIgnoreCaseExtension.java
new file mode 100644
index 0000000..6d3a0e4
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/extension/ContainsIgnoreCaseExtension.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.policy.siddhi.extension;
+
+import org.wso2.siddhi.core.config.ExecutionPlanContext;
+import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
+import org.wso2.siddhi.core.executor.ExpressionExecutor;
+import org.wso2.siddhi.core.executor.function.FunctionExecutor;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
+
+public class ContainsIgnoreCaseExtension extends FunctionExecutor {
+
+    Attribute.Type returnType = Attribute.Type.BOOL;
+
+    @Override
+    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
+        if (attributeExpressionExecutors.length != 2) {
+            throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:containsIgnoreCase() function, required 2, " +
+                    "but found " + attributeExpressionExecutors.length);
+        }
+        if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) {
+            throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:containsIgnoreCase() function, " +
+                    "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[0].getReturnType().toString());
+        }
+        if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) {
+            throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:containsIgnoreCase() function, " +
+                    "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[1].getReturnType().toString());
+        }
+    }
+
+    @Override
+    protected Object execute(Object[] data) {
+        if (data[0] == null) {
+            throw new ExecutionPlanRuntimeException("Invalid input given to str:containsIgnoreCase() function. First argument cannot be null");
+        }
+        if (data[1] == null) {
+            throw new ExecutionPlanRuntimeException("Invalid input given to str:containsIgnoreCase() function. Second argument cannot be null");
+        }
+        String str1 = (String)data[0];
+        String str2 = (String)data[1];
+        return str1.toUpperCase().contains(str2.toUpperCase());
+    }
+
+    @Override
+    protected Object execute(Object data) {
+        return null; //Since the containsIgnoreCase function takes in 2 parameters, this method does not get called. Hence, not implemented.
+    }
+
+    @Override
+    public void start() {
+        //Nothing to start
+    }
+
+    @Override
+    public void stop() {
+        //Nothing to stop
+    }
+
+    @Override
+    public Attribute.Type getReturnType() {
+        return returnType;
+    }
+
+    @Override
+    public Object[] currentState() {
+        return new Object[]{};
+    }
+
+    @Override
+    public void restoreState(Object[] state) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/extension/EqualsIgnoreCaseExtension.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/extension/EqualsIgnoreCaseExtension.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/extension/EqualsIgnoreCaseExtension.java
new file mode 100644
index 0000000..3a622e2
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/extension/EqualsIgnoreCaseExtension.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.policy.siddhi.extension;
+
+import org.wso2.siddhi.core.config.ExecutionPlanContext;
+import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
+import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
+import org.wso2.siddhi.core.executor.ExpressionExecutor;
+import org.wso2.siddhi.core.executor.function.FunctionExecutor;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Siddhi function extension for str:equalsIgnoreCase(string, string); accepts (STRING, STRING), returns BOOL.
+ */
+public class EqualsIgnoreCaseExtension extends FunctionExecutor {
+
+    Attribute.Type returnType = Attribute.Type.BOOL;
+
+    /** Rejects the execution plan unless exactly two STRING-typed arguments are supplied. */
+    @Override
+    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
+        if (attributeExpressionExecutors.length != 2) {
+            throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:equalsIgnoreCase() function, required 2, " +
+                    "but found " + attributeExpressionExecutors.length);
+        }
+        final Attribute.Type firstType = attributeExpressionExecutors[0].getReturnType();
+        if (firstType != Attribute.Type.STRING) {
+            throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:equalsIgnoreCase() function, " +
+                    "required "+ Attribute.Type.STRING+", but found "+firstType.toString());
+        }
+        final Attribute.Type secondType = attributeExpressionExecutors[1].getReturnType();
+        if (secondType != Attribute.Type.STRING) {
+            throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:equalsIgnoreCase() function, " +
+                    "required "+ Attribute.Type.STRING+", but found "+secondType.toString());
+        }
+    }
+
+    /** Compares the two arguments with String.equalsIgnoreCase; a null argument raises ExecutionPlanRuntimeException. */
+    @Override
+    protected Object execute(Object[] data) {
+        if (data[0] == null) {
+            throw new ExecutionPlanRuntimeException("Invalid input given to str:equalsIgnoreCase() function. First argument cannot be null");
+        }
+        if (data[1] == null) {
+            throw new ExecutionPlanRuntimeException("Invalid input given to str:equalsIgnoreCase() function. Second argument cannot be null");
+        }
+        return ((String) data[0]).equalsIgnoreCase((String) data[1]);
+    }
+
+    @Override
+    protected Object execute(Object data) {
+        return null; // Unused overload: equalsIgnoreCase takes two parameters, so this is not expected to be called.
+    }
+
+    @Override
+    public void start() { /* Nothing to start */ }
+
+    @Override
+    public void stop() { /* Nothing to stop */ }
+
+    @Override
+    public Attribute.Type getReturnType() {
+        return returnType; // BOOL unless the field has been reassigned
+    }
+
+    @Override
+    public Object[] currentState() {
+        return new Object[]{}; // nothing to snapshot
+    }
+
+    @Override
+    public void restoreState(Object[] state) { /* nothing to restore */ }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/extension/ExternalTimeBatchWindowProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/extension/ExternalTimeBatchWindowProcessor.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/extension/ExternalTimeBatchWindowProcessor.java
new file mode 100644
index 0000000..c70d1a1
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/extension/ExternalTimeBatchWindowProcessor.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.siddhi.extension;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.config.ExecutionPlanContext;
+import org.wso2.siddhi.core.event.ComplexEvent;
+import org.wso2.siddhi.core.event.ComplexEventChunk;
+import org.wso2.siddhi.core.event.MetaComplexEvent;
+import org.wso2.siddhi.core.event.stream.StreamEvent;
+import org.wso2.siddhi.core.event.stream.StreamEventCloner;
+import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
+import org.wso2.siddhi.core.executor.ExpressionExecutor;
+import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
+import org.wso2.siddhi.core.query.processor.Processor;
+import org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor;
+import org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor;
+import org.wso2.siddhi.core.table.EventTable;
+import org.wso2.siddhi.core.util.collection.operator.Finder;
+import org.wso2.siddhi.core.util.parser.CollectionOperatorParser;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
+import org.wso2.siddhi.query.api.expression.Expression;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Batch window whose clock is driven by a LONG timestamp attribute of the incoming events rather than by
+ * system time. Parameters: (<long> timeStamp stream attribute, <int|long|time> windowTime constant).
+ *
+ * @since Dec 23, 2015
+ */
+public class ExternalTimeBatchWindowProcessor extends WindowProcessor implements FindableProcessor {
+    static final Logger log = LoggerFactory.getLogger(ExternalTimeBatchWindowProcessor.class);
+    /** Window length; presumably in the same unit as the timestamp attribute. */
+    private long timeToKeep;
+    /** Events buffered for the batch that is currently open. */
+    private ComplexEventChunk<StreamEvent> currentEventChunk = new ComplexEventChunk<StreamEvent>();
+    /** EXPIRED-typed copies of the most recently flushed batch. */
+    private ComplexEventChunk<StreamEvent> expiredEventChunk = new ComplexEventChunk<StreamEvent>();
+    /** Executor for the stream attribute that carries the external timestamp. */
+    private VariableExpressionExecutor timeStampVariableExpressionExecutor;
+    /** External timestamp at which the open batch started; negative until the first event arrives. */
+    private long lastSendTime = -1;
+
+    /**
+     * Validates the two window parameters and captures the timestamp attribute and the window length.
+     * @throws ExecutionPlanValidationException if the parameters are not (LONG stream attribute, constant)
+     */
+    @Override
+    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
+        this.expiredEventChunk = new ComplexEventChunk<StreamEvent>();
+        if (attributeExpressionExecutors.length != 2) {
+            throw new ExecutionPlanValidationException("ExternalTime window should only have two parameter (<long> timeStamp, <int|long|time> windowTime), but found " + attributeExpressionExecutors.length + " input attributes");
+        }
+        if (!(attributeExpressionExecutors[0] instanceof VariableExpressionExecutor)) {
+            throw new ExecutionPlanValidationException("ExternalTime window's 1st parameter timeStamp should be a type long stream attribute but found " + attributeExpressionExecutors[0].getClass());
+        }
+        timeStampVariableExpressionExecutor = ((VariableExpressionExecutor) attributeExpressionExecutors[0]);
+        if (timeStampVariableExpressionExecutor.getReturnType() != Attribute.Type.LONG) {
+            throw new ExecutionPlanValidationException("ExternalTime window's 1st parameter timeStamp should be type long, but found " + timeStampVariableExpressionExecutor.getReturnType());
+        }
+        // Reject a non-constant window length here instead of failing the cast below with a ClassCastException.
+        if (!(attributeExpressionExecutors[1] instanceof ConstantExpressionExecutor)) {
+            throw new ExecutionPlanValidationException("ExternalTime window's 2nd parameter windowTime should be a constant but found " + attributeExpressionExecutors[1].getClass());
+        }
+        timeToKeep = Long.parseLong(String.valueOf(((ConstantExpressionExecutor) attributeExpressionExecutors[1]).getValue())); // INT and LONG constants both print as plain decimals
+    }
+
+    /**
+     * Buffers CURRENT events into the open batch and flushes it once an event's timestamp reaches
+     * lastSendTime + timeToKeep. Assumes timestamps increase monotonically, as documented at
+     * https://docs.wso2.com/display/CEP400/Inbuilt+Windows#InbuiltWindows-externalTime
+     */
+    @Override
+    protected synchronized void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner) {
+        // Incoming events drive the window; no events means no action.
+        if (!streamEventChunk.hasNext()) {
+            return;
+        }
+        // Very first batch: anchor the window on the first incoming event's timestamp.
+        if (currentEventChunk.getFirst() == null && lastSendTime < 0) {
+            lastSendTime = (Long) streamEventChunk.getFirst().getAttribute(timeStampVariableExpressionExecutor.getPosition());
+        }
+        while (streamEventChunk.hasNext()) {
+            StreamEvent currStreamEvent = streamEventChunk.next();
+            if (currStreamEvent.getType() != ComplexEvent.Type.CURRENT) {
+                continue;
+            }
+            long currentTime = (Long) currStreamEvent.getAttribute(timeStampVariableExpressionExecutor.getPosition());
+            // An event at or past the window end closes the open batch and becomes the first event of the next one.
+            if (currentTime >= lastSendTime + timeToKeep) {
+                flushCurrentChunk(nextProcessor, streamEventCloner, currentTime);
+            }
+            cloneAppend(streamEventCloner, currStreamEvent);
+        }
+    }
+
+    /** Appends a copy of the event to the open batch. */
+    private void cloneAppend(StreamEventCloner streamEventCloner, StreamEvent currStreamEvent) {
+        currentEventChunk.add(streamEventCloner.copyStreamEvent(currStreamEvent));
+    }
+
+    /**
+     * Sends the previous batch's EXPIRED events followed by the open batch to the next processor, keeps
+     * EXPIRED copies of the open batch for the next flush, and restarts the window at currentTime.
+     */
+    private void flushCurrentChunk(Processor nextProcessor, StreamEventCloner streamEventCloner, long currentTime) {
+        currentEventChunk.reset();
+        ComplexEventChunk<StreamEvent> newEventChunk = new ComplexEventChunk<StreamEvent>();
+
+        // Re-stamp the previous batch's expired events; rewind first in case an earlier traversal moved the cursor.
+        expiredEventChunk.reset();
+        while (expiredEventChunk.hasNext()) {
+            expiredEventChunk.next().setTimestamp(currentTime);
+        }
+        if (expiredEventChunk.getFirst() != null) {
+            newEventChunk.add(expiredEventChunk.getFirst());
+        }
+
+        // Replace the expired set with EXPIRED-typed copies of the batch being flushed.
+        expiredEventChunk.clear();
+        while (currentEventChunk.hasNext()) {
+            StreamEvent toExpireEvent = streamEventCloner.copyStreamEvent(currentEventChunk.next());
+            toExpireEvent.setType(StreamEvent.Type.EXPIRED);
+            expiredEventChunk.add(toExpireEvent);
+        }
+
+        // Queue the flushed batch behind the expired events, then start the next window.
+        if (currentEventChunk.getFirst() != null) {
+            newEventChunk.add(currentEventChunk.getFirst());
+        }
+        currentEventChunk.clear();
+        lastSendTime = currentTime;
+        if (newEventChunk.getFirst() != null) {
+            nextProcessor.process(newEventChunk);
+        }
+    }
+
+    public void start() { /* Do nothing */ }
+
+    public void stop() { /* Do nothing */ }
+
+    /** Snapshots both event chunks plus the window start, so a restored window keeps its batch boundary. */
+    public Object[] currentState() {
+        return new Object[]{currentEventChunk, expiredEventChunk, lastSendTime};
+    }
+
+    /** Restores a currentState() snapshot; older two-element snapshots leave lastSendTime unchanged. */
+    @SuppressWarnings("unchecked")
+    public void restoreState(Object[] state) {
+        currentEventChunk = (ComplexEventChunk<StreamEvent>) state[0];
+        expiredEventChunk = (ComplexEventChunk<StreamEvent>) state[1];
+        if (state.length > 2) {
+            lastSendTime = (Long) state[2];
+        }
+    }
+
+    /** Runs the finder against the EXPIRED copies of the last flushed batch. */
+    public synchronized StreamEvent find(ComplexEvent matchingEvent, Finder finder) {
+        return finder.find(matchingEvent, expiredEventChunk, streamEventCloner);
+    }
+
+    /** Builds a Finder for the given expression by delegating to CollectionOperatorParser.parse. */
+    public Finder constructFinder(Expression expression, MetaComplexEvent metaComplexEvent, ExecutionPlanContext executionPlanContext, List<VariableExpressionExecutor> variableExpressionExecutors, Map<String, EventTable> eventTableMap, int matchingStreamIndex, long withinTime) {
+        return CollectionOperatorParser.parse(expression, metaComplexEvent, executionPlanContext, variableExpressionExecutors, eventTableMap, matchingStreamIndex, inputDefinition, withinTime);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java
new file mode 100644
index 0000000..14e98cc
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.policy.siddhi.extension;
+
+import org.wso2.siddhi.core.config.ExecutionPlanContext;
+import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
+import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
+import org.wso2.siddhi.core.executor.ExpressionExecutor;
+import org.wso2.siddhi.extension.string.RegexpFunctionExtension;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * regexpIgnoreCase(string, regex)
+ * Tells whether the entire 'string' matches the regular expression 'regex', ignoring case.
+ * Accept Type(s): (STRING,STRING)
+ * Return Type(s): BOOLEAN
+ */
+public class RegexpIgnoreCaseFunctionExtension extends RegexpFunctionExtension {
+
+    // State: a constant regex argument is compiled once in init() and reused for every event.
+    boolean isRegexConstant = false;
+    String regexConstant;
+    Pattern patternConstant;
+
+    /**
+     * Checks that exactly two STRING arguments were supplied and pre-compiles the regex when it is a constant.
+     *
+     * @throws ExecutionPlanValidationException on a wrong argument count or a non-STRING argument
+     */
+    @Override
+    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
+        if (attributeExpressionExecutors.length != 2) {
+            throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:regexpIgnoreCase() function, required 2, " +
+                    "but found " + attributeExpressionExecutors.length);
+        }
+        final Attribute.Type sourceType = attributeExpressionExecutors[0].getReturnType();
+        if (sourceType != Attribute.Type.STRING) {
+            throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:regexpIgnoreCase() function, " +
+                    "required "+ Attribute.Type.STRING+", but found "+sourceType.toString());
+        }
+        final Attribute.Type regexType = attributeExpressionExecutors[1].getReturnType();
+        if (regexType != Attribute.Type.STRING) {
+            throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:regexpIgnoreCase() function, " +
+                    "required "+ Attribute.Type.STRING+", but found "+regexType.toString());
+        }
+        if (attributeExpressionExecutors[1] instanceof ConstantExpressionExecutor) {
+            isRegexConstant = true;
+            regexConstant = (String) ((ConstantExpressionExecutor) attributeExpressionExecutors[1]).getValue();
+            patternConstant = Pattern.compile(regexConstant, Pattern.CASE_INSENSITIVE);
+        }
+    }
+
+    /**
+     * Matches data[0] against the case-insensitive pattern taken from init() or, failing that, from data[1].
+     * @return Boolean result of Matcher.matches(), i.e. whether the whole input matches
+     * @throws ExecutionPlanRuntimeException if either argument is null
+     */
+    @Override
+    protected Object execute(Object[] data) {
+        if (data[0] == null) {
+            throw new ExecutionPlanRuntimeException("Invalid input given to str:regexpIgnoreCase() function. First argument cannot be null");
+        }
+        if (data[1] == null) {
+            throw new ExecutionPlanRuntimeException("Invalid input given to str:regexpIgnoreCase() function. Second argument cannot be null");
+        }
+        String source = (String) data[0];
+        // Reuse the pattern compiled in init() for a constant regex; otherwise compile this event's regex.
+        Pattern pattern = isRegexConstant ? patternConstant : Pattern.compile((String) data[1], Pattern.CASE_INSENSITIVE);
+        return pattern.matcher(source).matches();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider b/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
new file mode 100644
index 0000000..eac2bfd
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluatorServiceProviderImpl
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/eagle.siddhiext
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/eagle.siddhiext b/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/eagle.siddhiext
new file mode 100644
index 0000000..1b1183c
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/eagle.siddhiext
@@ -0,0 +1 @@
+externalTimeBatch=org.apache.eagle.policy.siddhi.extension.ExternalTimeBatchWindowProcessor
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/str.siddhiext
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/str.siddhiext b/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/str.siddhiext
new file mode 100644
index 0000000..963f311
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/str.siddhiext
@@ -0,0 +1,35 @@
+#
+# Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+charAt=org.wso2.siddhi.extension.string.CharAtFunctionExtension
+coalesce=org.wso2.siddhi.extension.string.CoalesceFunctionExtension
+concat=org.wso2.siddhi.extension.string.ConcatFunctionExtension
+length=org.wso2.siddhi.extension.string.LengthFunctionExtension
+lower=org.wso2.siddhi.extension.string.LowerFunctionExtension
+regexp=org.wso2.siddhi.extension.string.RegexpFunctionExtension
+repeat=org.wso2.siddhi.extension.string.RepeatFunctionExtension
+replaceAll=org.wso2.siddhi.extension.string.ReplaceAllFunctionExtension
+replaceFirst=org.wso2.siddhi.extension.string.ReplaceFirstFunctionExtension
+reverse=org.wso2.siddhi.extension.string.ReverseFunctionExtension
+strcmp=org.wso2.siddhi.extension.string.StrcmpFunctionExtension
+substr=org.wso2.siddhi.extension.string.SubstrFunctionExtension
+trim=org.wso2.siddhi.extension.string.TrimFunctionExtension
+upper=org.wso2.siddhi.extension.string.UpperFunctionExtension
+hex=org.wso2.siddhi.extension.string.HexFunctionExtension
+unhex=org.wso2.siddhi.extension.string.UnhexFunctionExtension
+equalsIgnoreCase=org.apache.eagle.policy.siddhi.extension.EqualsIgnoreCaseExtension
+containsIgnoreCase=org.apache.eagle.policy.siddhi.extension.ContainsIgnoreCaseExtension
+regexpIgnoreCase=org.apache.eagle.policy.siddhi.extension.RegexpIgnoreCaseFunctionExtension

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/test/java/org/apache/eagle/policy/dao/TestSchemaDao.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/test/java/org/apache/eagle/policy/dao/TestSchemaDao.java b/eagle-core/eagle-policy/eagle-policy-base/src/test/java/org/apache/eagle/policy/dao/TestSchemaDao.java
new file mode 100644
index 0000000..fd40f79
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/test/java/org/apache/eagle/policy/dao/TestSchemaDao.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.dao;
+
+import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Manual lookup of the "hdfsAuditLog" stream schemas through AlertStreamSchemaDAOImpl; ignored by default. Created on 12/31/15.
+ */
+public class TestSchemaDao {
+
+    @Ignore
+    @Test
+    public void test() throws Exception {
+        AlertStreamSchemaDAO schemaDao = new AlertStreamSchemaDAOImpl("localhost", 9099, "admin", "secret");
+        List<AlertStreamSchemaEntity> schemas = schemaDao.findAlertStreamSchemaByDataSource("hdfsAuditLog");
+        System.out.print(schemas);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/pom.xml b/eagle-core/eagle-policy/pom.xml
new file mode 100644
index 0000000..ab104a6
--- /dev/null
+++ b/eagle-core/eagle-policy/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>eagle</groupId>
+        <artifactId>eagle-core</artifactId>
+        <version>0.3.0</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>eagle-policy-parent</artifactId>
+    <packaging>pom</packaging>
+
+    <name>eagle-policy-parent</name>
+    <modules>
+        <module>eagle-policy-base</module>
+    </modules>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/repo/EntityRepositoryScanner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/repo/EntityRepositoryScanner.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/repo/EntityRepositoryScanner.java
index a6421a4..049964b 100644
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/repo/EntityRepositoryScanner.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/repo/EntityRepositoryScanner.java
@@ -36,7 +36,7 @@ public final class EntityRepositoryScanner {
 	public static void scan() throws InstantiationException, IllegalAccessException {
 		// TODO currently extcos 0.3b doesn't support to search packages like "com.*.eagle.*", "org.*.eagle.*". However 0.4b depends on asm-all version 4.0, which is 
 		// conflicted with jersey server 1.8. We should fix it later
-		LOG.info("Scanning all entity repositories with pattern \"eagle.*\"");
+		LOG.info("Scanning all entity repositories with pattern \"org.apache.eagle.*\"");
 		final ComponentScanner scanner = new ComponentScanner();
 		final Set<Class<?>> classes = scanner.getClasses(new EntityRepoScanQuery() );
 		for (Class<?> entityClass : classes) {



Mime
View raw message