eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [2/2] incubator-eagle git commit: [EAGLE-647] Support Policy Execution Interpreter and Planner to compile siddhi query to distributed execution plan
Date Thu, 20 Oct 2016 08:08:50 GMT
[EAGLE-647] Support Policy Execution Interpreter and Planner to compile siddhi query to distributed execution plan

Support Policy Execution Interpreter and Planner to compile siddhi query to distributed execution plan

* Support parse siddhi pattern and join query as distributed execution
* Support alias in inner join condition
* Refactor PolicyInterpreter to eagle-alert-engine and decouple PolicyExecutionPlanner
* Fix factory method for PolicyExecutionPlanner

Author: Hao Chen <hao@apache.org>

Closes #536 from haoch/SupportPatternAndJoin.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/64fce8f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/64fce8f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/64fce8f8

Branch: refs/heads/master
Commit: 64fce8f80745d6180ff04b6950364a802d73723e
Parents: 8991b61
Author: Hao Chen <hao@apache.org>
Authored: Thu Oct 20 16:08:33 2016 +0800
Committer: Hao Chen <hao@apache.org>
Committed: Thu Oct 20 16:08:33 2016 +0800

----------------------------------------------------------------------
 .../engine/interpreter/PolicyExecutionPlan.java | 100 +++++
 .../interpreter/PolicyExecutionPlanner.java     |  28 ++
 .../interpreter/PolicyExecutionPlannerImpl.java | 339 +++++++++++++++++
 .../engine/interpreter/PolicyInterpreter.java   | 112 ++++++
 .../engine/interpreter/PolicyParseResult.java   |  65 ++++
 .../interpreter/PolicyValidationResult.java     |  76 ++++
 .../interpreter/PolicyInterpreterTest.java      | 381 +++++++++++++++++++
 .../metadata/resource/MetadataResource.java     |   9 +-
 .../metadata/resource/PolicyExecutionPlan.java  | 100 -----
 .../metadata/resource/PolicyInterpreter.java    | 244 ------------
 .../metadata/resource/PolicyParseResult.java    |  65 ----
 .../resource/PolicyValidationResult.java        |  76 ----
 .../resource/PolicyInterpreterTest.java         | 226 -----------
 13 files changed, 1109 insertions(+), 712 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlan.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlan.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlan.java
new file mode 100644
index 0000000..7ecc36f
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlan.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.interpreter;
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.wso2.siddhi.query.api.ExecutionPlan;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Plain data holder describing a compiled policy: the input/output streams it uses,
+ * the source and parsed form of its execution plan, and the stream partitions it needs.
+ * Properties that are {@code null} are left out when the object is serialized to JSON.
+ */
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public class PolicyExecutionPlan {
+    /**
+     * Actual input streams.
+     */
+    private Map<String, List<StreamColumn>> inputStreams;
+
+    /**
+     * Actual output streams.
+     */
+    private Map<String, List<StreamColumn>> outputStreams;
+
+    /**
+     * Execution plan source.
+     */
+    private String executionPlanSource;
+
+    /**
+     * Execution plan.
+     */
+    private ExecutionPlan internalExecutionPlan;
+
+    /**
+     * Textual description of the execution plan.
+     */
+    private String executionPlanDesc;
+
+    /**
+     * Partitions of the streams used by the execution plan.
+     */
+    private List<StreamPartition> streamPartitions;
+
+    /**
+     * @return the execution plan source.
+     */
+    public String getExecutionPlanSource() {
+        return this.executionPlanSource;
+    }
+
+    /**
+     * @param source the execution plan source.
+     */
+    public void setExecutionPlanSource(String source) {
+        this.executionPlanSource = source;
+    }
+
+    /**
+     * @return the parsed execution plan.
+     */
+    public ExecutionPlan getInternalExecutionPlan() {
+        return this.internalExecutionPlan;
+    }
+
+    /**
+     * @param plan the parsed execution plan.
+     */
+    public void setInternalExecutionPlan(ExecutionPlan plan) {
+        this.internalExecutionPlan = plan;
+    }
+
+    /**
+     * @return the textual description of the execution plan.
+     */
+    public String getExecutionPlanDesc() {
+        return this.executionPlanDesc;
+    }
+
+    /**
+     * @param description the textual description of the execution plan.
+     */
+    public void setExecutionPlanDesc(String description) {
+        this.executionPlanDesc = description;
+    }
+
+    /**
+     * @return the stream partitions.
+     */
+    public List<StreamPartition> getStreamPartitions() {
+        return this.streamPartitions;
+    }
+
+    /**
+     * @param partitions the stream partitions.
+     */
+    public void setStreamPartitions(List<StreamPartition> partitions) {
+        this.streamPartitions = partitions;
+    }
+
+    /**
+     * @return the actual input streams, keyed by stream id.
+     */
+    public Map<String, List<StreamColumn>> getInputStreams() {
+        return this.inputStreams;
+    }
+
+    /**
+     * @param streams the actual input streams, keyed by stream id.
+     */
+    public void setInputStreams(Map<String, List<StreamColumn>> streams) {
+        this.inputStreams = streams;
+    }
+
+    /**
+     * @return the actual output streams, keyed by stream id.
+     */
+    public Map<String, List<StreamColumn>> getOutputStreams() {
+        return this.outputStreams;
+    }
+
+    /**
+     * @param streams the actual output streams, keyed by stream id.
+     */
+    public void setOutputStreams(Map<String, List<StreamColumn>> streams) {
+        this.outputStreams = streams;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java
new file mode 100644
index 0000000..9e8f9f1
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.interpreter;
+
+interface PolicyExecutionPlanner {
+    /**
+     * @return PolicyExecutionPlan.
+     */
+    PolicyExecutionPlan getExecutionPlan();
+
+    static PolicyExecutionPlan parseExecutionPlan(String executionPlan) throws Exception {
+        return new PolicyExecutionPlannerImpl(executionPlan).getExecutionPlan();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java
new file mode 100644
index 0000000..82bb64f
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.interpreter;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.collections.ListUtils;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.exception.DefinitionNotExistException;
+import org.wso2.siddhi.query.api.ExecutionPlan;
+import org.wso2.siddhi.query.api.exception.DuplicateDefinitionException;
+import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
+import org.wso2.siddhi.query.api.execution.ExecutionElement;
+import org.wso2.siddhi.query.api.execution.query.Query;
+import org.wso2.siddhi.query.api.execution.query.input.handler.StreamHandler;
+import org.wso2.siddhi.query.api.execution.query.input.handler.Window;
+import org.wso2.siddhi.query.api.execution.query.input.stream.InputStream;
+import org.wso2.siddhi.query.api.execution.query.input.stream.JoinInputStream;
+import org.wso2.siddhi.query.api.execution.query.input.stream.SingleInputStream;
+import org.wso2.siddhi.query.api.execution.query.input.stream.StateInputStream;
+import org.wso2.siddhi.query.api.execution.query.output.stream.OutputStream;
+import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute;
+import org.wso2.siddhi.query.api.execution.query.selection.Selector;
+import org.wso2.siddhi.query.api.expression.Expression;
+import org.wso2.siddhi.query.api.expression.Variable;
+import org.wso2.siddhi.query.api.expression.condition.Compare;
+import org.wso2.siddhi.query.api.expression.constant.IntConstant;
+import org.wso2.siddhi.query.api.expression.constant.LongConstant;
+import org.wso2.siddhi.query.api.expression.constant.TimeConstant;
+import org.wso2.siddhi.query.compiler.SiddhiCompiler;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+class PolicyExecutionPlannerImpl implements PolicyExecutionPlanner {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PolicyExecutionPlannerImpl.class);
+
+    /**
+     * Name of the siddhi window function from which a stream sort spec is derived.
+     * See https://docs.wso2.com/display/CEP300/Windows#Windows-ExternalTimeWindow.
+     */
+    private static final String WINDOW_EXTERNAL_TIME = "externalTime";
+
+    /** Execution plan source handed to the constructor. */
+    private final String executionPlan;
+    /** Input streams read by the queries, keyed by stream id; the value is null when the plan does not define the stream. */
+    private final Map<String,List<StreamColumn>> effectiveInputStreams;
+    /** Output streams written by the queries, keyed by output stream id. */
+    private final Map<String,List<StreamColumn>> effectiveOutputStreams;
+    /** Partition selected for each stream, keyed by stream id. */
+    private final Map<String,StreamPartition> effectivePartitions;
+    /** Result of parsing, computed once in the constructor. */
+    private final PolicyExecutionPlan policyExecutionPlan;
+
+    /**
+     * Creates the planner and parses the given execution plan right away.
+     *
+     * @param executionPlan execution plan source.
+     * @throws Exception if parsing the execution plan fails.
+     */
+    public PolicyExecutionPlannerImpl(String executionPlan) throws Exception {
+        this.effectivePartitions = new HashMap<>();
+        this.effectiveOutputStreams = new HashMap<>();
+        this.effectiveInputStreams = new HashMap<>();
+        this.executionPlan = executionPlan;
+        // Must run last: parsing reads the source and fills the maps initialised above.
+        this.policyExecutionPlan = doParse();
+    }
+
+    /**
+     * Returns the plan that was computed when this planner was constructed.
+     *
+     * @return PolicyExecutionPlan.
+     */
+    @Override
+    public PolicyExecutionPlan getExecutionPlan() {
+        return this.policyExecutionPlan;
+    }
+
+    /**
+     * Compiles the siddhi source of this planner and derives, for every query in it, the
+     * effective input streams, output streams and stream partitions.
+     *
+     * @return the populated policy execution plan.
+     * @throws Exception any failure raised while parsing or planning; it is logged together
+     *                   with the plan source and then rethrown unchanged.
+     */
+    private PolicyExecutionPlan doParse()  throws Exception {
+        PolicyExecutionPlan policyExecutionPlan = new PolicyExecutionPlan();
+        try {
+            ExecutionPlan executionPlan = SiddhiCompiler.parse(this.executionPlan);
+
+            policyExecutionPlan.setExecutionPlanDesc(executionPlan.toString());
+
+            // Set current execution plan as valid
+            policyExecutionPlan.setExecutionPlanSource(this.executionPlan);
+            policyExecutionPlan.setInternalExecutionPlan(executionPlan);
+
+            // Go through execution element
+            for (ExecutionElement executionElement : executionPlan.getExecutionElementList()) {
+                if (executionElement instanceof Query) {
+                    parseQuery(executionPlan, (Query) executionElement);
+                } else {
+                    LOG.warn("Unhandled execution element: {}", executionElement.toString());
+                }
+            }
+            // Set effective input streams
+            policyExecutionPlan.setInputStreams(effectiveInputStreams);
+
+            // Set effective output streams
+            policyExecutionPlan.setOutputStreams(effectiveOutputStreams);
+
+            // Set Partitions
+            for (String streamId : effectiveInputStreams.keySet()) {
+                // Use shuffle partition by default
+                if (!effectivePartitions.containsKey(streamId)) {
+                    StreamPartition shufflePartition = new StreamPartition();
+                    shufflePartition.setStreamId(streamId);
+                    shufflePartition.setType(StreamPartition.Type.SHUFFLE);
+                    effectivePartitions.put(streamId, shufflePartition);
+                }
+            }
+            policyExecutionPlan.setStreamPartitions(new ArrayList<>(effectivePartitions.values()));
+        } catch (Exception ex) {
+            LOG.error("Got error to parse policy execution plan: \n{}", this.executionPlan, ex);
+            throw ex;
+        }
+        return policyExecutionPlan;
+    }
+
+    /**
+     * Explains a single query: records its input stream definitions, its partitions
+     * (depending on the kind of input stream) and its output stream.
+     *
+     * @param siddhiPlan the parsed plan the query belongs to, used to look up stream definitions.
+     * @param query      the query to explain.
+     * @throws Exception if a stream definition cannot be converted or the query is not supported.
+     */
+    private void parseQuery(ExecutionPlan siddhiPlan, Query query) throws Exception {
+        InputStream inputStream = query.getInputStream();
+        Selector selector = query.getSelector();
+        // Aliases are scoped to one query, hence a fresh mapping per query
+        Map<String, SingleInputStream> queryLevelAliasToStreamMapping = new HashMap<>();
+
+        // Inputs stream definitions
+        for (String streamId : inputStream.getUniqueStreamIds()) {
+            if (!effectiveInputStreams.containsKey(streamId)) {
+                org.wso2.siddhi.query.api.definition.StreamDefinition streamDefinition = siddhiPlan.getStreamDefinitionMap().get(streamId);
+                if (streamDefinition != null) {
+                    effectiveInputStreams.put(streamId, SiddhiDefinitionAdapter.convertFromSiddiDefinition(streamDefinition).getColumns());
+                } else {
+                    // Stream is referenced but not defined inside the plan: remember it without columns
+                    effectiveInputStreams.put(streamId, null);
+                }
+            }
+        }
+
+        // Window Spec and Partition
+        if (inputStream instanceof SingleInputStream) {
+            retrieveAliasForQuery((SingleInputStream) inputStream, queryLevelAliasToStreamMapping);
+            retrievePartition(findStreamPartition((SingleInputStream) inputStream, selector));
+        } else if (inputStream instanceof JoinInputStream) {
+            parseJoinInputStream((JoinInputStream) inputStream, selector, queryLevelAliasToStreamMapping);
+        } else if (inputStream instanceof StateInputStream) {
+            parseStateInputStream((StateInputStream) inputStream, selector, queryLevelAliasToStreamMapping);
+        }
+
+        // Output streams
+        OutputStream outputStream = query.getOutputStream();
+        effectiveOutputStreams.put(outputStream.getId(), convertOutputStreamColumns(selector.getSelectionList()));
+    }
+
+    /**
+     * Registers the partitions of a join query: first the window/group-by partition of each
+     * side, then a GROUPBY partition on each side's join key taken from the ON condition.
+     *
+     * @param joinInputStream the join input of the query.
+     * @param selector        the selector of the query.
+     * @param aliasMapping    query level alias mapping, filled with the aliases of both sides.
+     * @throws ExecutionPlanValidationException if the join type or the join condition is not supported.
+     */
+    private void parseJoinInputStream(JoinInputStream joinInputStream, Selector selector, Map<String, SingleInputStream> aliasMapping) {
+        // Only Support JOIN/INNER_JOIN Now
+        JoinInputStream.Type joinType = joinInputStream.getType();
+        if (joinType != JoinInputStream.Type.INNER_JOIN && joinType != JoinInputStream.Type.JOIN) {
+            throw new ExecutionPlanValidationException("Not support " + joinType + " yet, currently support: INNER JOIN");
+        }
+
+        SingleInputStream leftInputStream = (SingleInputStream) joinInputStream.getLeftInputStream();
+        SingleInputStream rightInputStream = (SingleInputStream) joinInputStream.getRightInputStream();
+
+        retrievePartition(findStreamPartition(leftInputStream, selector));
+        retrievePartition(findStreamPartition(rightInputStream, selector));
+        retrieveAliasForQuery(leftInputStream, aliasMapping);
+        retrieveAliasForQuery(rightInputStream, aliasMapping);
+
+        Expression joinCondition = joinInputStream.getOnCompare();
+        if (joinCondition == null) {
+            return;
+        }
+        if (!(joinCondition instanceof Compare)) {
+            throw new ExecutionPlanValidationException("Only support \"Compare\" on INNER JOIN condition in INNER JOIN: " + joinCondition);
+        }
+        Compare compare = (Compare) joinCondition;
+        if (!compare.getOperator().equals(Compare.Operator.EQUAL)) {
+            throw new ExecutionPlanValidationException("Only support \"EQUAL\" condition in INNER JOIN: " + joinCondition);
+        }
+
+        // Each side of the join must be partitioned by its own join key. Building both
+        // partitions through the same helper guarantees the right side is registered as well.
+        retrievePartition(generateJoinKeyPartition(compare.getLeftExpression(), aliasMapping));
+        retrievePartition(generateJoinKeyPartition(compare.getRightExpression(), aliasMapping));
+    }
+
+    /**
+     * Builds the GROUPBY partition for one operand of an equality join condition.
+     *
+     * @param joinKey      operand of the join condition; must be a {@link Variable} carrying
+     *                     both a stream id (or alias) and an attribute name.
+     * @param aliasMapping query level alias mapping used to resolve the operand's stream.
+     * @return partition of the operand's stream on the operand's attribute.
+     */
+    private StreamPartition generateJoinKeyPartition(Expression joinKey, Map<String, SingleInputStream> aliasMapping) {
+        Variable variable = (Variable) joinKey;
+        Preconditions.checkNotNull(variable.getStreamId());
+        Preconditions.checkNotNull(variable.getAttributeName());
+
+        StreamPartition partition = new StreamPartition();
+        partition.setType(StreamPartition.Type.GROUPBY);
+        partition.setColumns(Collections.singletonList(variable.getAttributeName()));
+        partition.setStreamId(retrieveStreamId(variable, effectiveInputStreams, aliasMapping));
+        return partition;
+    }
+
+    /**
+     * Registers the partitions of a pattern/sequence query from its group-by list. A group-by
+     * variable without stream id applies to every stream of the query; otherwise it applies
+     * to the stream it names. Streams without any group-by variable get no partition here.
+     *
+     * @param stateInputStream the state input of the query.
+     * @param selector         the selector of the query.
+     * @param aliasMapping     query level alias mapping used to resolve stream references.
+     * @throws DefinitionNotExistException if a group-by variable names a stream the query does not read.
+     */
+    private void parseStateInputStream(StateInputStream stateInputStream, Selector selector, Map<String, SingleInputStream> aliasMapping) {
+        // Group By Spec
+        Map<String, List<Variable>> streamGroupBy = new HashMap<>();
+        for (String streamId : stateInputStream.getUniqueStreamIds()) {
+            streamGroupBy.put(streamId, new ArrayList<>());
+        }
+        for (Variable variable : selector.getGroupByList()) {
+            // Not stream not set, then should be all streams' same field
+            if (variable.getStreamId() == null) {
+                for (String streamId : stateInputStream.getUniqueStreamIds()) {
+                    streamGroupBy.get(streamId).add(variable);
+                }
+            } else {
+                String streamId = retrieveStreamId(variable, effectiveInputStreams, aliasMapping);
+                if (streamGroupBy.containsKey(streamId)) {
+                    streamGroupBy.get(streamId).add(variable);
+                } else {
+                    throw new DefinitionNotExistException(streamId);
+                }
+            }
+        }
+        for (Map.Entry<String, List<Variable>> entry : streamGroupBy.entrySet()) {
+            if (!entry.getValue().isEmpty()) {
+                retrievePartition(generatePartition(entry.getKey(), null, entry.getValue()));
+            }
+        }
+    }
+
+    private String retrieveStreamId(Variable variable, Map<String, List<StreamColumn>> streamMap, Map<String, SingleInputStream> aliasMap) {
+        Preconditions.checkNotNull(variable.getStreamId(), "streamId");
+        if (streamMap.containsKey(variable.getStreamId()) && aliasMap.containsKey(variable.getStreamId())) {
+            throw new DuplicateDefinitionException("Duplicated streamId and alias: " + variable.getStreamId());
+        } else if (streamMap.containsKey(variable.getStreamId())) {
+            return variable.getStreamId();
+        } else if (aliasMap.containsKey(variable.getStreamId())) {
+            return aliasMap.get(variable.getStreamId()).getStreamId();
+        } else {
+            throw new DefinitionNotExistException(variable.getStreamId());
+        }
+    }
+
+    /**
+     * Builds the partition of a single input stream from its window handlers and the
+     * query's group-by list.
+     *
+     * <p>A partition is always returned. The former guard compared {@code groupBy.size() >= 0},
+     * which is true for every list, so its {@code null} branch could never be taken; the guard
+     * is dropped here without changing the result.
+     *
+     * @param inputStream the stream to partition.
+     * @param selector    selector of the query reading the stream.
+     * @return the partition generated for the stream, never null.
+     */
+    private StreamPartition findStreamPartition(SingleInputStream inputStream, Selector selector) {
+        // Window Spec
+        List<Window> windows = new ArrayList<>();
+        for (StreamHandler streamHandler : inputStream.getStreamHandlers()) {
+            if (streamHandler instanceof Window) {
+                windows.add((Window) streamHandler);
+            }
+        }
+
+        // Group By Spec
+        List<Variable> groupBy = selector.getGroupByList();
+        return generatePartition(inputStream.getStreamId(), windows, groupBy);
+    }
+
+    /**
+     * Merges a partition into the effective partitions, keyed by stream id.
+     * <ul>
+     * <li>No partition yet for the stream, or an equal one: the given partition is stored / nothing changes.</li>
+     * <li>Same type and columns but different sort spec: the partition with the larger window
+     * period is kept, whichever was seen first. A missing sort spec counts as window period 0.</li>
+     * <li>Existing partition is SHUFFLE: it is replaced by the given partition.</li>
+     * <li>Anything else is a conflict that cannot run in distributed mode.</li>
+     * </ul>
+     *
+     * @param partition partition to merge; null is ignored.
+     * @throws ExecutionPlanValidationException if the partition is incompatible with the one already registered.
+     */
+    private void retrievePartition(StreamPartition partition) {
+        if (partition == null) {
+            return;
+        }
+
+        StreamPartition existingPartition = effectivePartitions.get(partition.getStreamId());
+        if (existingPartition == null) {
+            effectivePartitions.put(partition.getStreamId(), partition);
+            return;
+        }
+        if (existingPartition.equals(partition)) {
+            return;
+        }
+
+        boolean sameTypeAndColumns = existingPartition.getType().equals(partition.getType())
+            && ListUtils.isEqualList(existingPartition.getColumns(), partition.getColumns());
+        if (sameTypeAndColumns) {
+            // If same Type & Columns but different sort spec, then use larger. Keeping the
+            // existing one when it is larger makes the outcome independent of query order.
+            if (windowPeriodMillisOf(partition) > windowPeriodMillisOf(existingPartition)) {
+                effectivePartitions.put(partition.getStreamId(), partition);
+            }
+        } else if (existingPartition.getType().equals(StreamPartition.Type.SHUFFLE)) {
+            // A shuffled stream has no key constraint yet, so the more specific partition wins
+            effectivePartitions.put(partition.getStreamId(), partition);
+        } else {
+            // Conflicting partitions on the same stream will not be able to run in distributed mode
+            throw new ExecutionPlanValidationException("You have incompatible partitions on stream " + partition.getStreamId()
+                + ": [1] " + existingPartition.toString() + " [2] " + partition.toString());
+        }
+    }
+
+    /**
+     * Returns the window period of a partition's sort spec, or 0 when it has no sort spec.
+     */
+    private static long windowPeriodMillisOf(StreamPartition partition) {
+        StreamSortSpec sortSpec = partition.getSortSpec();
+        return sortSpec == null ? 0L : sortSpec.getWindowPeriodMillis();
+    }
+
+    /**
+     * Records the alias (stream reference id) of an input stream in the query level alias
+     * mapping. Streams without an alias are ignored.
+     *
+     * @param inputStream        the input stream that may carry an alias.
+     * @param aliasStreamMapping query level mapping from alias to input stream, updated in place.
+     * @throws ExecutionPlanValidationException if the alias is already used within the query.
+     */
+    private void retrieveAliasForQuery(SingleInputStream inputStream, Map<String, SingleInputStream> aliasStreamMapping) {
+        String alias = inputStream.getStreamReferenceId();
+        if (alias == null) {
+            return;
+        }
+        if (aliasStreamMapping.containsKey(alias)) {
+            // Report the alias itself: it is the duplicated name, not the stream id
+            throw new ExecutionPlanValidationException("Duplicated stream alias " + alias + " -> " + inputStream);
+        }
+        aliasStreamMapping.put(alias, inputStream);
+    }
+
+    /**
+     * Creates the partition of a stream. An {@code externalTime} window yields a sort spec
+     * whose margin is one fifth of the window period (the last such window wins); a non-empty
+     * group-by list yields a GROUPBY partition on the attribute names, otherwise SHUFFLE.
+     *
+     * @param streamId id of the stream to partition.
+     * @param windows  windows applied to the stream, may be null or empty.
+     * @param groupBy  group-by variables, may be null or empty.
+     * @return the generated partition.
+     */
+    private StreamPartition generatePartition(String streamId, List<Window> windows, List<Variable> groupBy) {
+        StreamSortSpec externalTimeSpec = null;
+        if (windows != null) {
+            for (Window candidate : windows) {
+                if (!candidate.getFunction().equals(WINDOW_EXTERNAL_TIME)) {
+                    continue;
+                }
+                externalTimeSpec = new StreamSortSpec();
+                externalTimeSpec.setWindowPeriodMillis(getExternalTimeWindowSize(candidate));
+                externalTimeSpec.setWindowMargin(externalTimeSpec.getWindowPeriodMillis() / 5);
+            }
+        }
+
+        StreamPartition result = new StreamPartition();
+        result.setStreamId(streamId);
+        result.setSortSpec(externalTimeSpec);
+
+        boolean grouped = groupBy != null && !groupBy.isEmpty();
+        if (!grouped) {
+            result.setType(StreamPartition.Type.SHUFFLE);
+            return result;
+        }
+
+        List<String> columnNames = new ArrayList<>();
+        for (Variable key : groupBy) {
+            columnNames.add(key.getAttributeName());
+        }
+        result.setColumns(columnNames);
+        result.setType(StreamPartition.Type.GROUPBY);
+        return result;
+    }
+
+    /**
+     * Reads the window size, i.e. the second parameter, of an {@code externalTime} window.
+     *
+     * @param window the externalTime window.
+     * @return the window size as an int.
+     * @throws ExecutionPlanValidationException if the window has fewer than two parameters or
+     *                                          the size does not fit into an int.
+     * @throws UnsupportedOperationException    if the size is not a time, int or long constant.
+     */
+    private static int getExternalTimeWindowSize(Window window) {
+        Expression[] parameters = window.getParameters();
+        // Guard the index access: a malformed window would otherwise fail with an
+        // ArrayIndexOutOfBoundsException that says nothing about the query
+        if (parameters == null || parameters.length < 2) {
+            throw new ExecutionPlanValidationException("Missing window size parameter of externalTime window: " + window);
+        }
+
+        Expression windowSize = parameters[1];
+        long size;
+        if (windowSize instanceof TimeConstant) {
+            size = ((TimeConstant) windowSize).getValue().longValue();
+        } else if (windowSize instanceof IntConstant) {
+            return ((IntConstant) windowSize).getValue();
+        } else if (windowSize instanceof LongConstant) {
+            size = ((LongConstant) windowSize).getValue().longValue();
+        } else {
+            throw new UnsupportedOperationException("Illegal type of window size expression:" + windowSize.toString());
+        }
+
+        // Fail loudly instead of silently truncating a long that does not fit into an int
+        if (size < Integer.MIN_VALUE || size > Integer.MAX_VALUE) {
+            throw new ExecutionPlanValidationException("Window size of externalTime window is out of range: " + size);
+        }
+        return (int) size;
+    }
+
+    /**
+     * Turns the selected output attributes of a query into stream columns: the column name
+     * is the attribute's rename and the description is the attribute's expression as text.
+     *
+     * @param outputAttributeList selection list of a query.
+     * @return one column per output attribute, in the same order.
+     */
+    private static List<StreamColumn> convertOutputStreamColumns(List<OutputAttribute> outputAttributeList) {
+        List<StreamColumn> columns = new ArrayList<>();
+        for (OutputAttribute attribute : outputAttributeList) {
+            StreamColumn column = new StreamColumn();
+            column.setName(attribute.getRename());
+            column.setDescription(attribute.getExpression().toString());
+            columns.add(column);
+        }
+        return columns;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreter.java
new file mode 100644
index 0000000..8aec294
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreter.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.interpreter;
+
+import com.google.common.base.Preconditions;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamNotDefinedException;
+import org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * PolicyInterpreter Helper Methods:
+ * <ul>
+ * <li>Parse: parse siddhi query and generate static execution plan</li>
+ * <li>Validate: validate policy definition with execution plan and metadata</li>
+ * </ul>
+ *
+ * <p>{@link #parse(String)} and {@link #validate(PolicyDefinition, Map)} catch every {@link Exception} and
+ * report it through the returned result object; the {@code parseExecutionPlan} overloads propagate it.
+ *
+ * @see PolicyExecutionPlanner
+ * @see <a href="https://docs.wso2.com/display/CEP300/WSO2+Complex+Event+Processor+Documentation">WSO2 Complex Event Processor Documentation</a>
+ */
+public class PolicyInterpreter {
+    private static final Logger LOG = LoggerFactory.getLogger(PolicyInterpreter.class);
+
+    /**
+     * Parses a siddhi query and wraps the outcome in a {@link PolicyParseResult}.
+     *
+     * @param executionPlanQuery siddhi query text, passed to {@link #parseExecutionPlan(String)}
+     * @return a result flagged successful and carrying the execution plan, or flagged failed and carrying
+     *         the error message and stack trace
+     */
+    public static PolicyParseResult parse(String executionPlanQuery) {
+        PolicyParseResult policyParseResult = new PolicyParseResult();
+        try {
+            policyParseResult.setPolicyExecutionPlan(parseExecutionPlan(executionPlanQuery));
+            policyParseResult.setSuccess(true);
+            policyParseResult.setMessage("Parsed successfully");
+        } catch (Exception exception) {
+            LOG.error("Got error to parse policy: {}", executionPlanQuery, exception);
+            policyParseResult.setSuccess(false);
+            policyParseResult.setMessage(describe(exception));
+            policyParseResult.setStackTrace(exception);
+        }
+        return policyParseResult;
+    }
+
+    /**
+     * Builds the siddhi execution plan for a policy from its input stream definitions and parses it.
+     *
+     * @param policyDefinition       siddhi policy query
+     * @param inputStreamDefinitions definitions of the streams the policy reads, keyed by stream id
+     * @return the parsed execution plan
+     * @throws NullPointerException if {@code inputStreamDefinitions} is null
+     * @throws Exception            if the execution plan cannot be built or parsed
+     */
+    public static PolicyExecutionPlan parseExecutionPlan(String policyDefinition, Map<String, StreamDefinition> inputStreamDefinitions) throws Exception {
+        // Validate inputStreams are valid
+        Preconditions.checkNotNull(inputStreamDefinitions, "No inputStreams to connect from");
+        return parseExecutionPlan(SiddhiDefinitionAdapter.buildSiddhiExecutionPlan(policyDefinition, inputStreamDefinitions));
+    }
+
+    /**
+     * Parses a siddhi query by delegating to {@link PolicyExecutionPlanner#parseExecutionPlan(String)}.
+     *
+     * @param executionPlanQuery siddhi query text
+     * @return the parsed execution plan
+     * @throws Exception if the planner fails to parse the query
+     */
+    public static PolicyExecutionPlan parseExecutionPlan(String executionPlanQuery) throws Exception {
+        return PolicyExecutionPlanner.parseExecutionPlan(executionPlanQuery);
+    }
+
+    /**
+     * Validates a policy against the known stream definitions.
+     *
+     * <p>Every input stream declared by the policy must exist in {@code allDefinitions}, the policy query
+     * must parse, and every output stream declared by the policy must appear in the parsed plan.
+     *
+     * @param policy         policy to validate
+     * @param allDefinitions all known stream definitions, keyed by stream id
+     * @return a result flagged successful and carrying the execution plan, or flagged failed and carrying
+     *         the error message and stack trace
+     */
+    public static PolicyValidationResult validate(PolicyDefinition policy, Map<String, StreamDefinition> allDefinitions) {
+        Map<String, StreamDefinition> inputDefinitions = new HashMap<>();
+        PolicyValidationResult policyValidationResult = new PolicyValidationResult();
+        policyValidationResult.setPolicyDefinition(policy);
+        try {
+            // Fail with an explicit message rather than a message-less NullPointerException further down.
+            Preconditions.checkNotNull(policy, "Policy definition is null");
+            Preconditions.checkNotNull(policy.getDefinition(), "Policy has no definition");
+
+            if (policy.getInputStreams() != null) {
+                for (String streamId : policy.getInputStreams()) {
+                    if (allDefinitions.containsKey(streamId)) {
+                        inputDefinitions.put(streamId, allDefinitions.get(streamId));
+                    } else {
+                        throw new StreamNotDefinedException(streamId);
+                    }
+                }
+            }
+
+            PolicyExecutionPlan policyExecutionPlan = parseExecutionPlan(policy.getDefinition().getValue(), inputDefinitions);
+            // Validate output
+            if (policy.getOutputStreams() != null) {
+                for (String outputStream : policy.getOutputStreams()) {
+                    if (!policyExecutionPlan.getOutputStreams().containsKey(outputStream)) {
+                        throw new StreamNotDefinedException("Output stream " + outputStream + " not defined");
+                    }
+                }
+            }
+            policyValidationResult.setPolicyExecutionPlan(policyExecutionPlan);
+            policyValidationResult.setSuccess(true);
+            policyValidationResult.setMessage("Validated successfully");
+        } catch (Exception exception) {
+            LOG.error("Got error to validate policy definition: {}", policy, exception);
+            policyValidationResult.setSuccess(false);
+            policyValidationResult.setMessage(describe(exception));
+            policyValidationResult.setStackTrace(exception);
+        }
+
+        return policyValidationResult;
+    }
+
+    /**
+     * Returns the exception message, or the exception's string form when it has no message, so that a
+     * failed result always carries a non-null message.
+     */
+    private static String describe(Throwable throwable) {
+        String message = throwable.getMessage();
+        return message != null ? message : throwable.toString();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyParseResult.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyParseResult.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyParseResult.java
new file mode 100644
index 0000000..a0f3ad2
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyParseResult.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.interpreter;
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+
+/**
+ * Outcome of parsing a policy query: a success flag, a message, the exception text of a failure and
+ * the parsed execution plan. Properties that are null are omitted when serialized by Jackson.
+ */
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public class PolicyParseResult {
+    private boolean success;
+    private String message;
+    private String exception;
+
+    private PolicyExecutionPlan policyExecutionPlan;
+
+    public boolean isSuccess() {
+        return success;
+    }
+
+    public void setSuccess(boolean success) {
+        this.success = success;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    /**
+     * Returns the exception text, i.e. whatever was last stored through
+     * {@link #setException(String)} or {@link #setStackTrace(Throwable)}.
+     */
+    public String getException() {
+        return exception;
+    }
+
+    public void setException(String exception) {
+        this.exception = exception;
+    }
+
+    /**
+     * Renders the stack trace of {@code throwable} as text and stores it as the exception text.
+     */
+    public void setStackTrace(Throwable throwable) {
+        String stackTraceText = ExceptionUtils.getStackTrace(throwable);
+        this.setException(stackTraceText);
+    }
+
+    public PolicyExecutionPlan getPolicyExecutionPlan() {
+        return policyExecutionPlan;
+    }
+
+    public void setPolicyExecutionPlan(PolicyExecutionPlan policyExecutionPlan) {
+        this.policyExecutionPlan = policyExecutionPlan;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyValidationResult.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyValidationResult.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyValidationResult.java
new file mode 100644
index 0000000..17f6091
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyValidationResult.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.interpreter;
+
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+
+/**
+ * Outcome of validating a policy: a success flag, a message, the exception text of a failure, the parsed
+ * execution plan and the validated policy. Properties that are null are omitted when serialized by Jackson.
+ */
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public class PolicyValidationResult {
+    private boolean success;
+    private String message;
+    private String exception;
+
+    private PolicyExecutionPlan policyExecutionPlan;
+    private PolicyDefinition policyDefinition;
+
+    public boolean isSuccess() {
+        return success;
+    }
+
+    public void setSuccess(boolean success) {
+        this.success = success;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    /**
+     * Returns the exception text, i.e. whatever was last stored through
+     * {@link #setException(String)} or {@link #setStackTrace(Throwable)}.
+     */
+    public String getException() {
+        return exception;
+    }
+
+    public void setException(String exception) {
+        this.exception = exception;
+    }
+
+    /**
+     * Renders the stack trace of {@code throwable} as text and stores it as the exception text.
+     */
+    public void setStackTrace(Throwable throwable) {
+        String stackTraceText = ExceptionUtils.getStackTrace(throwable);
+        this.setException(stackTraceText);
+    }
+
+    public PolicyExecutionPlan getPolicyExecutionPlan() {
+        return policyExecutionPlan;
+    }
+
+    public void setPolicyExecutionPlan(PolicyExecutionPlan policyExecutionPlan) {
+        this.policyExecutionPlan = policyExecutionPlan;
+    }
+
+    public PolicyDefinition getPolicyDefinition() {
+        return policyDefinition;
+    }
+
+    public void setPolicyDefinition(PolicyDefinition policyDefinition) {
+        this.policyDefinition = policyDefinition;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java
new file mode 100644
index 0000000..cf1a7e9
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.interpreter;
+
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.interpreter.PolicyExecutionPlan;
+import org.apache.eagle.alert.engine.interpreter.PolicyInterpreter;
+import org.apache.eagle.alert.engine.interpreter.PolicyValidationResult;
+import org.junit.Assert;
+import org.junit.Test;
+import org.wso2.siddhi.core.exception.DefinitionNotExistException;
+import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
+
+import java.util.*;
+
+public class PolicyInterpreterTest {
+    // -------------------------
+    // Single Stream Test Cases
+    // -------------------------
+    @Test
+    public void testParseSingleStreamPolicyQuery() throws Exception {
+        PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan("from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 2 min) "
+            + "select cmd, user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT");
+        Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX", executionPlan.getInputStreams().keySet().toArray()[0]);
+        Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT", executionPlan.getOutputStreams().keySet().toArray()[0]);
+        Assert.assertEquals(1, executionPlan.getStreamPartitions().size());
+        Assert.assertEquals(2*60*1000,executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis());
+    }
+
+    @Test
+    public void testParseSingleStreamPolicyWithPattern() throws Exception {
+        PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan(
+            "from e1=Stream1[price >= 20] -> e2=Stream2[price >= e1.price] \n"
+                + "select e1.symbol as symbol, e2.price as price, e1.price+e2.price as total_price \n"
+                + "group by symbol, company insert into OutStream");
+        Assert.assertTrue(executionPlan.getInputStreams().containsKey("Stream1"));
+        Assert.assertTrue(executionPlan.getInputStreams().containsKey("Stream2"));
+        Assert.assertTrue(executionPlan.getOutputStreams().containsKey("OutStream"));
+        Assert.assertEquals(StreamPartition.Type.GROUPBY,executionPlan.getStreamPartitions().get(0).getType());
+        Assert.assertArrayEquals(new String[]{"symbol","company"},executionPlan.getStreamPartitions().get(0).getColumns().toArray());
+        Assert.assertEquals(StreamPartition.Type.GROUPBY,executionPlan.getStreamPartitions().get(1).getType());
+        Assert.assertArrayEquals(new String[]{"symbol","company"},executionPlan.getStreamPartitions().get(1).getColumns().toArray());
+    }
+
+    @Test
+    public void testParseSingleStreamPolicyQueryWithMultiplePartitionUsingLargerWindow() throws Exception {
+        PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan(
+            "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 1 min) "
+            + "select cmd,user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1;"
+            + "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 1 hour) "
+            + "select cmd,user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2;"
+        );
+        Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX", executionPlan.getInputStreams().keySet().toArray()[0]);
+        Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1"));
+        Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2"));
+        Assert.assertEquals(1, executionPlan.getStreamPartitions().size());
+        Assert.assertEquals(60*60*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis());
+    }
+
+    @Test(expected = ExecutionPlanValidationException.class)
+    public void testParseSingleStreamPolicyQueryWithConflictPartition() throws Exception {
+        PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan(
+            "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 5 min) "
+            + "select cmd, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1;"
+            + "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 2 min) "
+            + "select user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2;"
+        );
+        Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX", executionPlan.getInputStreams().keySet().toArray()[0]);
+        Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1"));
+        Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2"));
+        Assert.assertEquals(2, executionPlan.getStreamPartitions().size());
+        Assert.assertEquals(5*60*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis());
+    }
+
+    @Test
+    public void testValidPolicyWithExternalTimeWindow() {
+        PolicyDefinition policyDefinition = new PolicyDefinition();
+        policyDefinition.setName("test_policy");
+        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
+        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
+
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        definition.setType("siddhi");
+        definition.setValue("from INPUT_STREAM_1#window.externalTime(timestamp, 2 min) select name, sum(value) as total group by name insert into OUTPUT_STREAM_1 ;");
+        definition.setInputStreams(policyDefinition.getInputStreams());
+        definition.setOutputStreams(policyDefinition.getOutputStreams());
+        policyDefinition.setDefinition(definition);
+
+        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+            {
+                put("INPUT_STREAM_1", mockStreamDefinition("INPUT_STREAM_1"));
+                put("INPUT_STREAM_2", mockStreamDefinition("INPUT_STREAM_2"));
+                put("INPUT_STREAM_3", mockStreamDefinition("INPUT_STREAM_3"));
+                put("INPUT_STREAM_4", mockStreamDefinition("INPUT_STREAM_4"));
+            }
+        });
+        Assert.assertTrue(validation.isSuccess());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size());
+        Assert.assertNotNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec());
+    }
+
+    @Test
+    public void testValidPolicyWithTimeWindow() {
+        PolicyDefinition policyDefinition = new PolicyDefinition();
+        policyDefinition.setName("test_policy");
+        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
+        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
+
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        definition.setType("siddhi");
+        definition.setValue("from INPUT_STREAM_1#window.time(2 min) select name, sum(value) as total group by name insert into OUTPUT_STREAM_1 ;");
+        definition.setInputStreams(policyDefinition.getInputStreams());
+        definition.setOutputStreams(policyDefinition.getOutputStreams());
+        policyDefinition.setDefinition(definition);
+
+        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+            {
+                put("INPUT_STREAM_1", mockStreamDefinition("INPUT_STREAM_1"));
+            }
+        });
+        Assert.assertTrue(validation.isSuccess());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size());
+        Assert.assertNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec());
+    }
+
+    @Test
+    public void testValidPolicyWithTooManyInputStreams() {
+        PolicyDefinition policyDefinition = new PolicyDefinition();
+        policyDefinition.setName("test_policy");
+        policyDefinition.setInputStreams(Arrays.asList("INPUT_STREAM_1", "INPUT_STREAM_2"));
+        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
+
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        definition.setType("siddhi");
+        definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
+        definition.setInputStreams(policyDefinition.getInputStreams());
+        definition.setOutputStreams(policyDefinition.getOutputStreams());
+        policyDefinition.setDefinition(definition);
+
+        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+            {
+                put("INPUT_STREAM_1", mockStreamDefinition("INPUT_STREAM_1"));
+                put("INPUT_STREAM_2", mockStreamDefinition("INPUT_STREAM_2"));
+            }
+        });
+        Assert.assertTrue(validation.isSuccess());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
+    }
+
+    @Test
+    public void testValidPolicyWithTooFewOutputStreams() {
+        PolicyDefinition policyDefinition = new PolicyDefinition();
+        policyDefinition.setName("test_policy");
+        policyDefinition.setInputStreams(Arrays.asList("INPUT_STREAM_1", "INPUT_STREAM_2"));
+        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
+
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        definition.setType("siddhi");
+        definition.setValue(
+            "from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;"
+                + "from INPUT_STREAM_1[value < 90.0] select * group by name insert into OUTPUT_STREAM_2;"
+        );
+        definition.setInputStreams(policyDefinition.getInputStreams());
+        definition.setOutputStreams(policyDefinition.getOutputStreams());
+        policyDefinition.setDefinition(definition);
+
+        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+            {
+                put("INPUT_STREAM_1", mockStreamDefinition("INPUT_STREAM_1"));
+                put("INPUT_STREAM_2", mockStreamDefinition("INPUT_STREAM_2"));
+            }
+        });
+        Assert.assertTrue(validation.isSuccess());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
+        Assert.assertEquals(2, validation.getPolicyExecutionPlan().getOutputStreams().size());
+    }
+
+    @Test
+    public void testInvalidPolicyForSyntaxError() {
+        PolicyDefinition policyDefinition = new PolicyDefinition();
+        policyDefinition.setName("test_policy");
+        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM"));
+        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM"));
+
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        definition.setType("siddhi");
+        definition.setValue("from INPUT_STREAM (value > 90.0) select * group by name insert into OUTPUT_STREAM;");
+        definition.setInputStreams(policyDefinition.getInputStreams());
+        definition.setOutputStreams(policyDefinition.getOutputStreams());
+        policyDefinition.setDefinition(definition);
+
+        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+            {
+                put("INPUT_STREAM", mockStreamDefinition("INPUT_STREAM"));
+            }
+        });
+        Assert.assertFalse(validation.isSuccess());
+    }
+
+    @Test
+    public void testInvalidPolicyForNotDefinedInputStream() {
+        PolicyDefinition policyDefinition = new PolicyDefinition();
+        policyDefinition.setName("test_policy");
+        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
+        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
+
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        definition.setType("siddhi");
+        definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
+        definition.setInputStreams(policyDefinition.getInputStreams());
+        definition.setOutputStreams(policyDefinition.getOutputStreams());
+        policyDefinition.setDefinition(definition);
+
+        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+            {
+                put("INPUT_STREAM_2", mockStreamDefinition("INPUT_STREAM_2"));
+            }
+        });
+        Assert.assertFalse(validation.isSuccess());
+    }
+
+    @Test
+    public void testInvalidPolicyForNotDefinedOutputStream() {
+        PolicyDefinition policyDefinition = new PolicyDefinition();
+        policyDefinition.setName("test_policy");
+        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
+        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_2"));
+
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        definition.setType("siddhi");
+        definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
+        definition.setInputStreams(policyDefinition.getInputStreams());
+        definition.setOutputStreams(policyDefinition.getOutputStreams());
+        policyDefinition.setDefinition(definition);
+
+        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+            {
+                put("INPUT_STREAM_1", mockStreamDefinition("INPUT_STREAM_1"));
+            }
+        });
+        Assert.assertFalse(validation.isSuccess());
+    }
+
+    // ---------------------
+    // Two Stream Test Cases
+    // ---------------------
+
+    @Test
+    public void testParseTwoStreamPolicyQueryWithMultiplePartition() throws Exception {
+        PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan(
+            "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_1#window.externalTime(timestamp, 1 min) "
+                + "select cmd,user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1;"
+                + "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_2#window.externalTime(timestamp, 1 hour) "
+                + "select cmd,user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2;"
+        );
+        Assert.assertTrue(executionPlan.getInputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_1"));
+        Assert.assertTrue(executionPlan.getInputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_2"));
+        Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1"));
+        Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2"));
+        Assert.assertEquals(2, executionPlan.getStreamPartitions().size());
+        Assert.assertEquals(60*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis());
+        Assert.assertEquals(60*60*1000, executionPlan.getStreamPartitions().get(1).getSortSpec().getWindowPeriodMillis());
+    }
+
+    @Test
+    public void testParseTwoStreamPolicyQueryWithSinglePartition() throws Exception {
+        PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan(
+            "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_1#window.externalTime(timestamp, 1 min) "
+                + "select cmd,user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1;"
+                + "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_2 select * insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2;"
+        );
+        Assert.assertTrue(executionPlan.getInputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_1"));
+        Assert.assertTrue(executionPlan.getInputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_2"));
+        Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1"));
+        Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2"));
+        Assert.assertEquals(2, executionPlan.getStreamPartitions().size());
+        Assert.assertEquals(60*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis());
+        Assert.assertEquals(StreamPartition.Type.SHUFFLE, executionPlan.getStreamPartitions().get(1).getType());
+    }
+
+
+    @Test
+    public void testParseTwoStreamPolicyQueryInnerJoin() throws Exception {
+        PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan(
+            "from TickEvent[symbol=='EBAY']#window.length(2000) " +
+                "join NewsEvent#window.externalTime(timestamp, 1000 sec) \n" +
+                "select * insert into JoinStream"
+        );
+        Assert.assertTrue(executionPlan.getInputStreams().containsKey("TickEvent"));
+        Assert.assertTrue(executionPlan.getInputStreams().containsKey("NewsEvent"));
+        Assert.assertTrue(executionPlan.getOutputStreams().containsKey("JoinStream"));
+        Assert.assertEquals(StreamPartition.Type.SHUFFLE, executionPlan.getStreamPartitions().get(0).getType());
+        Assert.assertNotNull(executionPlan.getStreamPartitions().get(0).getSortSpec());
+        Assert.assertEquals(1000*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis());
+        Assert.assertEquals(StreamPartition.Type.SHUFFLE, executionPlan.getStreamPartitions().get(1).getType());
+        Assert.assertNull(executionPlan.getStreamPartitions().get(1).getSortSpec());
+    }
+
+    @Test
+    public void testParseTwoStreamPolicyQueryInnerJoinWithCondition() throws Exception {
+        PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan(
+            "from TickEvent[symbol=='EBAY']#window.length(2000) as t unidirectional \n" +
+                "join NewsEvent#window.externalTime(timestamp, 1000 sec) as n \n" +
+                "on TickEvent.symbol == NewsEvent.company \n" +
+                "insert into JoinStream "
+        );
+        Assert.assertTrue(executionPlan.getInputStreams().containsKey("TickEvent"));
+        Assert.assertTrue(executionPlan.getInputStreams().containsKey("NewsEvent"));
+        Assert.assertTrue(executionPlan.getOutputStreams().containsKey("JoinStream"));
+        Assert.assertEquals(StreamPartition.Type.SHUFFLE, executionPlan.getStreamPartitions().get(0).getType());
+        Assert.assertNotNull(executionPlan.getStreamPartitions().get(0).getSortSpec());
+        Assert.assertEquals(1000*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis());
+        Assert.assertEquals(StreamPartition.Type.GROUPBY, executionPlan.getStreamPartitions().get(1).getType());
+        Assert.assertNull(executionPlan.getStreamPartitions().get(1).getSortSpec());
+    }
+
+    @Test
+    public void testParseTwoStreamPolicyQueryInnerJoinWithConditionHavingAlias() throws Exception {
+        PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan(
+            "from TickEvent[symbol=='EBAY']#window.length(2000) as t unidirectional \n" +
+                "join NewsEvent#window.externalTime(timestamp, 1000 sec) as n \n" +
+                "on t.symbol == n.company \n" +
+                "insert into JoinStream "
+        );
+        Assert.assertTrue(executionPlan.getInputStreams().containsKey("TickEvent"));
+        Assert.assertTrue(executionPlan.getInputStreams().containsKey("NewsEvent"));
+        Assert.assertTrue(executionPlan.getOutputStreams().containsKey("JoinStream"));
+        Assert.assertEquals(StreamPartition.Type.SHUFFLE, executionPlan.getStreamPartitions().get(0).getType());
+        Assert.assertNotNull(executionPlan.getStreamPartitions().get(0).getSortSpec());
+        Assert.assertEquals(1000*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis());
+        Assert.assertEquals(StreamPartition.Type.GROUPBY, executionPlan.getStreamPartitions().get(1).getType());
+        Assert.assertNull(executionPlan.getStreamPartitions().get(1).getSortSpec());
+    }
+
+    @Test(expected = DefinitionNotExistException.class)
+    public void testParseTwoStreamPolicyQueryInnerJoinWithConditionHavingNotFoundAlias() throws Exception {
+        PolicyInterpreter.parseExecutionPlan(
+            "from TickEvent[symbol=='EBAY']#window.length(2000) as t unidirectional \n" +
+            "join NewsEvent#window.externalTime(timestamp, 1000 sec) as n \n" +
+            "on t.symbol == NOT_EXIST_ALIAS.company \n" +
+            "insert into JoinStream "
+        );
+    }
+
+    // --------------
+    // Helper Methods
+    // --------------
+
+    private static StreamDefinition mockStreamDefinition(String streamId) {
+        StreamDefinition streamDefinition = new StreamDefinition();
+        streamDefinition.setStreamId(streamId);
+        List<StreamColumn> columns = new ArrayList<>();
+        columns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+        columns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
+        columns.add(new StreamColumn.Builder().name("timestamp").type(StreamColumn.Type.LONG).build());
+        streamDefinition.setColumns(columns);
+        return streamDefinition;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
index 190da2a..3368517 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
@@ -23,6 +23,9 @@ import org.apache.eagle.alert.coordination.model.ScheduleState;
 import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
 import org.apache.eagle.alert.coordination.model.internal.Topology;
 import org.apache.eagle.alert.engine.coordinator.*;
+import org.apache.eagle.alert.engine.interpreter.PolicyInterpreter;
+import org.apache.eagle.alert.engine.interpreter.PolicyParseResult;
+import org.apache.eagle.alert.engine.interpreter.PolicyValidationResult;
 import org.apache.eagle.alert.metadata.IMetadataDao;
 import org.apache.eagle.alert.metadata.impl.MetadataDaoFactory;
 import org.apache.eagle.alert.metadata.resource.Models;
@@ -207,7 +210,11 @@ public class MetadataResource {
     @Path("/policies/validate")
     @POST
     public PolicyValidationResult validatePolicy(PolicyDefinition policy) {
-        return PolicyInterpreter.validate(policy,dao);
+        Map<String, StreamDefinition> allDefinitions = new HashMap<>();
+        for (StreamDefinition definition : dao.listStreams()) {
+            allDefinitions.put(definition.getStreamId(), definition);
+        }
+        return PolicyInterpreter.validate(policy, allDefinitions);
     }
 
     @Path("/policies/parse")

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyExecutionPlan.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyExecutionPlan.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyExecutionPlan.java
deleted file mode 100644
index f925e3d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyExecutionPlan.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.service.metadata.resource;
-
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.wso2.siddhi.query.api.ExecutionPlan;
-
-import java.util.List;
-import java.util.Map;
-
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-public class PolicyExecutionPlan {
-    /**
-     * Actual input streams.
-     */
-    private Map<String, List<StreamColumn>> inputStreams;
-
-    /**
-     * Actual output streams.
-     */
-    private Map<String, List<StreamColumn>> outputStreams;
-
-    /**
-     * Execution plan source.
-     */
-    private String executionPlanSource;
-
-    /**
-     * Execution plan.
-     */
-    private ExecutionPlan internalExecutionPlan;
-
-    private String executionPlanDesc;
-
-    private List<StreamPartition> streamPartitions;
-
-    public String getExecutionPlanSource() {
-        return executionPlanSource;
-    }
-
-    public void setExecutionPlanSource(String executionPlanSource) {
-        this.executionPlanSource = executionPlanSource;
-    }
-
-    public ExecutionPlan getInternalExecutionPlan() {
-        return internalExecutionPlan;
-    }
-
-    public void setInternalExecutionPlan(ExecutionPlan internalExecutionPlan) {
-        this.internalExecutionPlan = internalExecutionPlan;
-    }
-
-    public String getExecutionPlanDesc() {
-        return executionPlanDesc;
-    }
-
-    public void setExecutionPlanDesc(String executionPlanDesc) {
-        this.executionPlanDesc = executionPlanDesc;
-    }
-
-    public List<StreamPartition> getStreamPartitions() {
-        return streamPartitions;
-    }
-
-    public void setStreamPartitions(List<StreamPartition> streamPartitions) {
-        this.streamPartitions = streamPartitions;
-    }
-
-    public Map<String, List<StreamColumn>> getInputStreams() {
-        return inputStreams;
-    }
-
-    public void setInputStreams(Map<String, List<StreamColumn>> inputStreams) {
-        this.inputStreams = inputStreams;
-    }
-
-    public Map<String, List<StreamColumn>> getOutputStreams() {
-        return outputStreams;
-    }
-
-    public void setOutputStreams(Map<String, List<StreamColumn>> outputStreams) {
-        this.outputStreams = outputStreams;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyInterpreter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyInterpreter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyInterpreter.java
deleted file mode 100644
index b97583c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyInterpreter.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.service.metadata.resource;
-
-import com.google.common.base.Preconditions;
-import org.apache.eagle.alert.engine.coordinator.*;
-import org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter;
-import org.apache.eagle.alert.metadata.IMetadataDao;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.query.api.ExecutionPlan;
-import org.wso2.siddhi.query.api.execution.ExecutionElement;
-import org.wso2.siddhi.query.api.execution.query.Query;
-import org.wso2.siddhi.query.api.execution.query.input.handler.StreamHandler;
-import org.wso2.siddhi.query.api.execution.query.input.handler.Window;
-import org.wso2.siddhi.query.api.execution.query.input.stream.InputStream;
-import org.wso2.siddhi.query.api.execution.query.input.stream.SingleInputStream;
-import org.wso2.siddhi.query.api.execution.query.output.stream.OutputStream;
-import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute;
-import org.wso2.siddhi.query.api.execution.query.selection.Selector;
-import org.wso2.siddhi.query.api.expression.Variable;
-import org.wso2.siddhi.query.api.expression.constant.TimeConstant;
-import org.wso2.siddhi.query.compiler.SiddhiCompiler;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-/**
- * PolicyInterpreter:
- * <ul>
- * <li>Parse: parse siddhi query and generate static execution plan</li>
- * <li>Validate: validate policy definition with execution plan and metadata</li>
- * </ul>
- * @see <a href="https://docs.wso2.com/display/CEP300/WSO2+Complex+Event+Processor+Documentation">WSO2 Complex Event Processor Documentation</a>
- */
-public class PolicyInterpreter {
-    private static final Logger LOG = LoggerFactory.getLogger(PolicyInterpreter.class);
-
-    /**
-     * See https://docs.wso2.com/display/CEP300/Windows#Windows-ExternalTimeWindow.
-     */
-    private static final String WINDOW_EXTERNAL_TIME = "externalTime";
-
-    public static PolicyParseResult parse(String executionPlanQuery) {
-        PolicyParseResult policyParseResult = new PolicyParseResult();
-        try {
-            policyParseResult.setPolicyExecutionPlan(parseExecutionPlan(executionPlanQuery));
-            policyParseResult.setSuccess(true);
-            policyParseResult.setMessage("Parsed successfully");
-        } catch (Exception exception) {
-            LOG.error("Got error to parse policy: {}", executionPlanQuery, exception);
-            policyParseResult.setSuccess(false);
-            policyParseResult.setMessage(exception.getMessage());
-            policyParseResult.setStackTrace(exception);
-        }
-        return policyParseResult;
-    }
-
-    /**
-     * Quick parseExecutionPlan policy.
-     */
-    public static PolicyExecutionPlan parseExecutionPlan(String policyDefinition, Map<String, StreamDefinition> inputStreamDefinitions) throws Exception {
-        // Validate inputStreams are valid
-        Preconditions.checkNotNull(inputStreamDefinitions, "No inputStreams to connect from");
-        return parseExecutionPlan(SiddhiDefinitionAdapter.buildSiddhiExecutionPlan(policyDefinition, inputStreamDefinitions));
-    }
-
-    public static PolicyExecutionPlan parseExecutionPlan(String executionPlanQuery) throws Exception {
-        PolicyExecutionPlan policyExecutionPlan = new PolicyExecutionPlan();
-        try {
-            ExecutionPlan executionPlan = SiddhiCompiler.parse(executionPlanQuery);
-
-            policyExecutionPlan.setExecutionPlanDesc(executionPlan.toString());
-
-            // Set current execution plan as valid
-            policyExecutionPlan.setExecutionPlanSource(executionPlanQuery);
-            policyExecutionPlan.setInternalExecutionPlan(executionPlan);
-
-            Map<String, List<StreamColumn>> actualInputStreams = new HashMap<>();
-            Map<String, List<StreamColumn>> actualOutputStreams = new HashMap<>();
-            List<StreamPartition> partitions = new ArrayList<>();
-
-            // Go through execution element
-            for (ExecutionElement executionElement : executionPlan.getExecutionElementList()) {
-                if (executionElement instanceof Query) {
-                    // -------------
-                    // Explain Query
-                    // -------------
-
-                    // Input streams
-                    InputStream inputStream = ((Query) executionElement).getInputStream();
-                    Selector selector = ((Query) executionElement).getSelector();
-
-                    for (String streamId : inputStream.getUniqueStreamIds()) {
-                        if (!actualInputStreams.containsKey(streamId)) {
-                            org.wso2.siddhi.query.api.definition.StreamDefinition streamDefinition = executionPlan.getStreamDefinitionMap().get(streamId);
-                            if (streamDefinition != null) {
-                                actualInputStreams.put(streamId, SiddhiDefinitionAdapter.convertFromSiddiDefinition(streamDefinition).getColumns());
-                            } else {
-                                actualInputStreams.put(streamId, null);
-                            }
-                        }
-                    }
-
-                    // Window Spec and Partition
-                    if (inputStream instanceof SingleInputStream) {
-                        // Window Spec
-                        List<Window> windows = new ArrayList<>();
-                        for (StreamHandler streamHandler : ((SingleInputStream) inputStream).getStreamHandlers()) {
-                            if (streamHandler instanceof Window) {
-                                windows.add((Window) streamHandler);
-                            }
-                        }
-
-                        // Group By Spec
-                        List<Variable> groupBy = selector.getGroupByList();
-
-                        if (windows.size() > 0 || groupBy.size() >= 0) {
-                            partitions.add(convertSingleStreamWindowAndGroupByToPartition(((SingleInputStream) inputStream).getStreamId(), windows, groupBy));
-                        }
-                    }
-                    //    else if(inputStream instanceof JoinInputStream) {
-                    //        // TODO: Parse multiple stream join
-                    //    } else if(inputStream instanceof StateInputStream) {
-                    //        // TODO: Parse StateInputStream
-                    //    }
-
-                    // Output streams
-                    OutputStream outputStream = ((Query) executionElement).getOutputStream();
-                    actualOutputStreams.put(outputStream.getId(), convertOutputStreamColumns(selector.getSelectionList()));
-                } else {
-                    LOG.warn("Unhandled execution element: {}", executionElement.toString());
-                }
-            }
-            // Set used input streams
-            policyExecutionPlan.setInputStreams(actualInputStreams);
-
-            // Set Partitions
-            policyExecutionPlan.setStreamPartitions(partitions);
-
-            // Validate outputStreams
-            policyExecutionPlan.setOutputStreams(actualOutputStreams);
-        } catch (Exception ex) {
-            LOG.error("Got error to parseExecutionPlan policy execution plan: \n{}", executionPlanQuery, ex);
-            throw ex;
-        }
-        return policyExecutionPlan;
-    }
-
-    private static StreamPartition convertSingleStreamWindowAndGroupByToPartition(String streamId, List<Window> windows, List<Variable> groupBy) {
-        StreamPartition partition = new StreamPartition();
-        partition.setStreamId(streamId);
-        StreamSortSpec sortSpec = null;
-
-        if (windows.size() > 0) {
-            for (Window window : windows) {
-                if (window.getFunction().equals(WINDOW_EXTERNAL_TIME)) {
-                    sortSpec = new StreamSortSpec();
-                    sortSpec.setWindowPeriodMillis(((TimeConstant) window.getParameters()[1]).getValue().intValue());
-                    sortSpec.setWindowMargin(sortSpec.getWindowPeriodMillis() / 3);
-                }
-            }
-        }
-        partition.setSortSpec(sortSpec);
-        if (groupBy.size() > 0) {
-            partition.setColumns(groupBy.stream().map(Variable::getAttributeName).collect(Collectors.toList()));
-            partition.setType(StreamPartition.Type.GROUPBY);
-        } else {
-            partition.setType(StreamPartition.Type.SHUFFLE);
-        }
-        return partition;
-    }
-
-    public static PolicyValidationResult validate(PolicyDefinition policy, IMetadataDao metadataDao) {
-        Map<String, StreamDefinition> allDefinitions = new HashMap<>();
-        for (StreamDefinition definition : metadataDao.listStreams()) {
-            allDefinitions.put(definition.getStreamId(), definition);
-        }
-        return validate(policy, allDefinitions);
-    }
-
-    public static PolicyValidationResult validate(PolicyDefinition policy, Map<String, StreamDefinition> allDefinitions) {
-        Map<String, StreamDefinition> inputDefinitions = new HashMap<>();
-        PolicyValidationResult policyValidationResult = new PolicyValidationResult();
-        policyValidationResult.setPolicyDefinition(policy);
-        try {
-            if (policy.getInputStreams() != null) {
-                for (String streamId : policy.getInputStreams()) {
-                    if (allDefinitions.containsKey(streamId)) {
-                        inputDefinitions.put(streamId, allDefinitions.get(streamId));
-                    } else {
-                        throw new StreamNotDefinedException(streamId);
-                    }
-                }
-            }
-
-            PolicyExecutionPlan policyExecutionPlan = parseExecutionPlan(policy.getDefinition().getValue(), inputDefinitions);
-            // Validate output
-            if (policy.getOutputStreams() != null) {
-                for (String outputStream : policy.getOutputStreams()) {
-                    if (!policyExecutionPlan.getOutputStreams().containsKey(outputStream)) {
-                        throw new StreamNotDefinedException("Output stream " + outputStream + " not defined");
-                    }
-                }
-            }
-            policyValidationResult.setPolicyExecutionPlan(policyExecutionPlan);
-            policyValidationResult.setSuccess(true);
-            policyValidationResult.setMessage("Validated successfully");
-        } catch (Exception exception) {
-            LOG.error("Got error to validate policy definition: {}", policy, exception);
-            policyValidationResult.setSuccess(false);
-            policyValidationResult.setMessage(exception.getMessage());
-            policyValidationResult.setStackTrace(exception);
-        }
-
-        return policyValidationResult;
-    }
-
-    private static List<StreamColumn> convertOutputStreamColumns(List<OutputAttribute> outputAttributeList) {
-        return outputAttributeList.stream().map(outputAttribute -> {
-            StreamColumn streamColumn = new StreamColumn();
-            streamColumn.setName(outputAttribute.getRename());
-            streamColumn.setDescription(outputAttribute.getExpression().toString());
-            return streamColumn;
-        }).collect(Collectors.toList());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyParseResult.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyParseResult.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyParseResult.java
deleted file mode 100644
index 2522270..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyParseResult.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.service.metadata.resource;
-
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-public class PolicyParseResult {
-    private boolean success;
-    private String message;
-    private String exception;
-
-    private PolicyExecutionPlan policyExecutionPlan;
-
-    public String getException() {
-        return exception;
-    }
-
-    public void setException(String exception) {
-        this.exception = exception;
-    }
-
-    public String getMessage() {
-        return message;
-    }
-
-    public void setMessage(String message) {
-        this.message = message;
-    }
-
-    public void setStackTrace(Throwable throwable) {
-        this.setException(ExceptionUtils.getStackTrace(throwable));
-    }
-
-    public boolean isSuccess() {
-        return success;
-    }
-
-    public void setSuccess(boolean success) {
-        this.success = success;
-    }
-
-    public PolicyExecutionPlan getPolicyExecutionPlan() {
-        return policyExecutionPlan;
-    }
-
-    public void setPolicyExecutionPlan(PolicyExecutionPlan policyExecutionPlan) {
-        this.policyExecutionPlan = policyExecutionPlan;
-    }
-}



Mime
View raw message