From: hao@apache.org
To: commits@eagle.incubator.apache.org
Reply-To: dev@eagle.incubator.apache.org
Date: Thu, 20 Oct 2016 08:08:50 -0000
Message-Id: <518dba700f0249b6a65f41d14fe1572b@git.apache.org>
Subject: [2/2] incubator-eagle git commit: [EAGLE-647] Support Policy Execution Interpreter and Planner to compile siddhi query to distributed execution plan
archived-at: Thu, 20 Oct 2016 08:18:27 -0000

[EAGLE-647] Support Policy Execution Interpreter and Planner to compile siddhi query to distributed execution plan

* Support parsing siddhi pattern and join queries for distributed execution
* Support alias in inner join condition
* Refactor PolicyInterpreter to eagle-alert-engine and decouple PolicyExecutionPlanner
* Fix factory method for PolicyExecutionPlanner

Author: Hao Chen

Closes #536 from haoch/SupportPatternAndJoin.
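The join handling described in the commit message can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code from this commit: JoinPartitionSketch, Partition, resolve and plan are hypothetical names, and the real planner works on Siddhi's query API objects and Eagle's StreamPartition instead of plain strings. It only shows the idea that each side of an equality join condition becomes a GROUPBY partition on its (alias-resolved) stream, while any remaining input stream defaults to SHUFFLE.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical stand-ins for the planner types in this commit;
// none of these names are part of the Eagle or Siddhi API.
final class JoinPartitionSketch {

    enum Type { GROUPBY, SHUFFLE }

    static final class Partition {
        final String streamId;
        final Type type;
        final List<String> columns;

        Partition(String streamId, Type type, List<String> columns) {
            this.streamId = streamId;
            this.type = type;
            this.columns = columns;
        }

        @Override
        public String toString() {
            return streamId + ":" + type + columns;
        }
    }

    // Resolve a reference used in the join condition. Simplification: an alias wins,
    // otherwise the reference must itself be a known input stream id.
    static String resolve(String ref, Map<String, String> aliasToStream, List<String> inputStreams) {
        if (aliasToStream.containsKey(ref)) {
            return aliasToStream.get(ref);
        }
        if (inputStreams.contains(ref)) {
            return ref;
        }
        throw new IllegalArgumentException("Unknown stream or alias: " + ref);
    }

    // Each side of "left.col == right.col" becomes a GROUPBY partition on its stream;
    // every other input stream falls back to SHUFFLE.
    static Map<String, Partition> plan(String leftRef, String leftColumn,
                                       String rightRef, String rightColumn,
                                       Map<String, String> aliasToStream,
                                       List<String> inputStreams) {
        Map<String, Partition> partitions = new LinkedHashMap<>();
        String left = resolve(leftRef, aliasToStream, inputStreams);
        String right = resolve(rightRef, aliasToStream, inputStreams);
        partitions.put(left, new Partition(left, Type.GROUPBY, Collections.singletonList(leftColumn)));
        partitions.put(right, new Partition(right, Type.GROUPBY, Collections.singletonList(rightColumn)));
        for (String streamId : inputStreams) {
            partitions.putIfAbsent(streamId,
                new Partition(streamId, Type.SHUFFLE, Collections.<String>emptyList()));
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Models: from s1 as a join s2 as b on a.host == b.hostname; s3 belongs to another query.
        Map<String, String> aliases = new LinkedHashMap<>();
        aliases.put("a", "s1");
        aliases.put("b", "s2");
        Map<String, Partition> result =
            plan("a", "host", "b", "hostname", aliases, Arrays.asList("s1", "s2", "s3"));
        System.out.println(result.values());
    }
}
```

Partitioning both join inputs by their join key is what lets a distributed runtime route matching events to the same worker. The sketch leaves out window and sort-spec handling as well as the conflict checks that the planner in this commit performs.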
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/64fce8f8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/64fce8f8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/64fce8f8 Branch: refs/heads/master Commit: 64fce8f80745d6180ff04b6950364a802d73723e Parents: 8991b61 Author: Hao Chen Authored: Thu Oct 20 16:08:33 2016 +0800 Committer: Hao Chen Committed: Thu Oct 20 16:08:33 2016 +0800 ---------------------------------------------------------------------- .../engine/interpreter/PolicyExecutionPlan.java | 100 +++++ .../interpreter/PolicyExecutionPlanner.java | 28 ++ .../interpreter/PolicyExecutionPlannerImpl.java | 339 +++++++++++++++++ .../engine/interpreter/PolicyInterpreter.java | 112 ++++++ .../engine/interpreter/PolicyParseResult.java | 65 ++++ .../interpreter/PolicyValidationResult.java | 76 ++++ .../interpreter/PolicyInterpreterTest.java | 381 +++++++++++++++++++ .../metadata/resource/MetadataResource.java | 9 +- .../metadata/resource/PolicyExecutionPlan.java | 100 ----- .../metadata/resource/PolicyInterpreter.java | 244 ------------ .../metadata/resource/PolicyParseResult.java | 65 ---- .../resource/PolicyValidationResult.java | 76 ---- .../resource/PolicyInterpreterTest.java | 226 ----------- 13 files changed, 1109 insertions(+), 712 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlan.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlan.java 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlan.java new file mode 100644 index 0000000..7ecc36f --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlan.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.interpreter; + +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.eagle.alert.engine.coordinator.StreamColumn; +import org.apache.eagle.alert.engine.coordinator.StreamPartition; +import org.wso2.siddhi.query.api.ExecutionPlan; + +import java.util.List; +import java.util.Map; + +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +public class PolicyExecutionPlan { + /** + * Actual input streams. + */ + private Map<String, List<StreamColumn>> inputStreams; + + /** + * Actual output streams. + */ + private Map<String, List<StreamColumn>> outputStreams; + + /** + * Execution plan source. + */ + private String executionPlanSource; + + /** + * Execution plan. 
+ */ + private ExecutionPlan internalExecutionPlan; + + private String executionPlanDesc; + + private List<StreamPartition> streamPartitions; + + public String getExecutionPlanSource() { + return executionPlanSource; + } + + public void setExecutionPlanSource(String executionPlanSource) { + this.executionPlanSource = executionPlanSource; + } + + public ExecutionPlan getInternalExecutionPlan() { + return internalExecutionPlan; + } + + public void setInternalExecutionPlan(ExecutionPlan internalExecutionPlan) { + this.internalExecutionPlan = internalExecutionPlan; + } + + public String getExecutionPlanDesc() { + return executionPlanDesc; + } + + public void setExecutionPlanDesc(String executionPlanDesc) { + this.executionPlanDesc = executionPlanDesc; + } + + public List<StreamPartition> getStreamPartitions() { + return streamPartitions; + } + + public void setStreamPartitions(List<StreamPartition> streamPartitions) { + this.streamPartitions = streamPartitions; + } + + public Map<String, List<StreamColumn>> getInputStreams() { + return inputStreams; + } + + public void setInputStreams(Map<String, List<StreamColumn>> inputStreams) { + this.inputStreams = inputStreams; + } + + public Map<String, List<StreamColumn>> getOutputStreams() { + return outputStreams; + } + + public void setOutputStreams(Map<String, List<StreamColumn>> outputStreams) { + this.outputStreams = outputStreams; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java new file mode 100644 index 0000000..9e8f9f1 --- /dev/null +++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.interpreter; + +interface PolicyExecutionPlanner { + /** + * @return PolicyExecutionPlan. + */ + PolicyExecutionPlan getExecutionPlan(); + + static PolicyExecutionPlan parseExecutionPlan(String executionPlan) throws Exception { + return new PolicyExecutionPlannerImpl(executionPlan).getExecutionPlan(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java new file mode 100644 index 0000000..82bb64f --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.interpreter; + +import com.google.common.base.Preconditions; +import org.apache.commons.collections.ListUtils; +import org.apache.eagle.alert.engine.coordinator.StreamColumn; +import org.apache.eagle.alert.engine.coordinator.StreamPartition; +import org.apache.eagle.alert.engine.coordinator.StreamSortSpec; +import org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.wso2.siddhi.core.exception.DefinitionNotExistException; +import org.wso2.siddhi.query.api.ExecutionPlan; +import org.wso2.siddhi.query.api.exception.DuplicateDefinitionException; +import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException; +import org.wso2.siddhi.query.api.execution.ExecutionElement; +import org.wso2.siddhi.query.api.execution.query.Query; +import org.wso2.siddhi.query.api.execution.query.input.handler.StreamHandler; +import org.wso2.siddhi.query.api.execution.query.input.handler.Window; +import org.wso2.siddhi.query.api.execution.query.input.stream.InputStream; +import org.wso2.siddhi.query.api.execution.query.input.stream.JoinInputStream; +import org.wso2.siddhi.query.api.execution.query.input.stream.SingleInputStream; +import org.wso2.siddhi.query.api.execution.query.input.stream.StateInputStream; +import org.wso2.siddhi.query.api.execution.query.output.stream.OutputStream; +import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute; +import org.wso2.siddhi.query.api.execution.query.selection.Selector; +import org.wso2.siddhi.query.api.expression.Expression; +import 
org.wso2.siddhi.query.api.expression.Variable; +import org.wso2.siddhi.query.api.expression.condition.Compare; +import org.wso2.siddhi.query.api.expression.constant.IntConstant; +import org.wso2.siddhi.query.api.expression.constant.LongConstant; +import org.wso2.siddhi.query.api.expression.constant.TimeConstant; +import org.wso2.siddhi.query.compiler.SiddhiCompiler; + +import java.util.*; +import java.util.stream.Collectors; + +class PolicyExecutionPlannerImpl implements PolicyExecutionPlanner { + + private static final Logger LOG = LoggerFactory.getLogger(PolicyExecutionPlannerImpl.class); + + /** + * See https://docs.wso2.com/display/CEP300/Windows#Windows-ExternalTimeWindow. + */ + private static final String WINDOW_EXTERNAL_TIME = "externalTime"; + + private final String executionPlan; + private final Map> effectiveInputStreams; + private final Map> effectiveOutputStreams; + private final Map effectivePartitions; + private final PolicyExecutionPlan policyExecutionPlan; + + public PolicyExecutionPlannerImpl(String executionPlan) throws Exception { + this.executionPlan = executionPlan; + this.effectiveInputStreams = new HashMap<>(); + this.effectiveOutputStreams = new HashMap<>(); + this.effectivePartitions = new HashMap<>(); + this.policyExecutionPlan = doParse(); + } + + @Override + public PolicyExecutionPlan getExecutionPlan() { + return policyExecutionPlan; + } + + private PolicyExecutionPlan doParse() throws Exception { + PolicyExecutionPlan policyExecutionPlan = new PolicyExecutionPlan(); + try { + ExecutionPlan executionPlan = SiddhiCompiler.parse(this.executionPlan); + + policyExecutionPlan.setExecutionPlanDesc(executionPlan.toString()); + + // Set current execution plan as valid + policyExecutionPlan.setExecutionPlanSource(this.executionPlan); + policyExecutionPlan.setInternalExecutionPlan(executionPlan); + + + // Go through execution element + for (ExecutionElement executionElement : executionPlan.getExecutionElementList()) { + // ------------- + // 
Explain Query + // ------------- + if (executionElement instanceof Query) { + // ----------------------- + // Query Level Variables + // ----------------------- + InputStream inputStream = ((Query) executionElement).getInputStream(); + Selector selector = ((Query) executionElement).getSelector(); + Map queryLevelAliasToStreamMapping = new HashMap<>(); + + // Inputs stream definitions + for (String streamId : inputStream.getUniqueStreamIds()) { + if (!effectiveInputStreams.containsKey(streamId)) { + org.wso2.siddhi.query.api.definition.StreamDefinition streamDefinition = executionPlan.getStreamDefinitionMap().get(streamId); + if (streamDefinition != null) { + effectiveInputStreams.put(streamId, SiddhiDefinitionAdapter.convertFromSiddiDefinition(streamDefinition).getColumns()); + } else { + effectiveInputStreams.put(streamId, null); + } + } + } + + // Window Spec and Partition + if (inputStream instanceof SingleInputStream) { + retrieveAliasForQuery((SingleInputStream) inputStream, queryLevelAliasToStreamMapping); + retrievePartition(findStreamPartition((SingleInputStream) inputStream, selector)); + } else { + if (inputStream instanceof JoinInputStream) { + // Only Support JOIN/INNER_JOIN Now + if (((JoinInputStream) inputStream).getType().equals(JoinInputStream.Type.INNER_JOIN) || ((JoinInputStream) inputStream).getType().equals(JoinInputStream.Type.JOIN)) { + SingleInputStream leftInputStream = (SingleInputStream) ((JoinInputStream) inputStream).getLeftInputStream(); + SingleInputStream rightInputStream = (SingleInputStream) ((JoinInputStream) inputStream).getRightInputStream(); + + retrievePartition(findStreamPartition(leftInputStream, selector)); + retrievePartition(findStreamPartition(rightInputStream, selector)); + retrieveAliasForQuery(leftInputStream, queryLevelAliasToStreamMapping); + retrieveAliasForQuery(rightInputStream, queryLevelAliasToStreamMapping); + + } else { + throw new ExecutionPlanValidationException("Not support " + ((JoinInputStream) 
inputStream).getType() + " yet, currently support: INNER JOIN"); + } + + Expression joinCondition = ((JoinInputStream) inputStream).getOnCompare(); + + if (joinCondition != null) { + if (joinCondition instanceof Compare) { + if (((Compare) joinCondition).getOperator().equals(Compare.Operator.EQUAL)) { + Variable leftExpression = (Variable) ((Compare) joinCondition).getLeftExpression(); + Preconditions.checkNotNull(leftExpression.getStreamId()); + Preconditions.checkNotNull(leftExpression.getAttributeName()); + + StreamPartition leftPartition = new StreamPartition(); + leftPartition.setType(StreamPartition.Type.GROUPBY); + leftPartition.setColumns(Collections.singletonList(leftExpression.getAttributeName())); + leftPartition.setStreamId(retrieveStreamId(leftExpression, effectiveInputStreams, queryLevelAliasToStreamMapping)); + retrievePartition(leftPartition); + + Variable rightExpression = (Variable) ((Compare) joinCondition).getRightExpression(); + Preconditions.checkNotNull(rightExpression.getStreamId()); + Preconditions.checkNotNull(rightExpression.getAttributeName()); + StreamPartition rightPartition = new StreamPartition(); + rightPartition.setType(StreamPartition.Type.GROUPBY); + rightPartition.setColumns(Collections.singletonList(rightExpression.getAttributeName())); + rightPartition.setStreamId(retrieveStreamId(rightExpression, effectiveInputStreams, queryLevelAliasToStreamMapping)); + retrievePartition(rightPartition); + } else { + throw new ExecutionPlanValidationException("Only support \"EQUAL\" condition in INNER JOIN: " + joinCondition); + } + } else { + throw new ExecutionPlanValidationException("Only support \"Compare\" on INNER JOIN condition in INNER JOIN: " + joinCondition); + } + } + } else if (inputStream instanceof StateInputStream) { + // Group By Spec + List<Variable> groupBy = selector.getGroupByList(); + if (groupBy.size() > 0) { + Map<String, List<Variable>> streamGroupBy = new HashMap<>(); + for (String streamId : inputStream.getUniqueStreamIds()) { + 
streamGroupBy.put(streamId, new ArrayList<>()); + } + for (Variable variable : groupBy) { + // Not stream not set, then should be all streams' same field + if (variable.getStreamId() == null) { + for (String streamId : inputStream.getUniqueStreamIds()) { + streamGroupBy.get(streamId).add(variable); + } + } else { + String streamId = retrieveStreamId(variable, effectiveInputStreams,queryLevelAliasToStreamMapping); + if (streamGroupBy.containsKey(streamId)) { + streamGroupBy.get(streamId).add(variable); + } else { + throw new DefinitionNotExistException(streamId); + } + } + } + for (Map.Entry> entry : streamGroupBy.entrySet()) { + if (entry.getValue().size() > 0) { + retrievePartition(generatePartition(entry.getKey(), null, Arrays.asList(entry.getValue().toArray(new Variable[entry.getValue().size()])))); + } + } + } + } + } + + // Output streams + OutputStream outputStream = ((Query) executionElement).getOutputStream(); + effectiveOutputStreams.put(outputStream.getId(), convertOutputStreamColumns(selector.getSelectionList())); + } else { + LOG.warn("Unhandled execution element: {}", executionElement.toString()); + } + } + // Set effective input streams + policyExecutionPlan.setInputStreams(effectiveInputStreams); + + // Set effective output streams + policyExecutionPlan.setOutputStreams(effectiveOutputStreams); + + // Set Partitions + for (String streamId : effectiveInputStreams.keySet()) { + // Use shuffle partition by default + if (!effectivePartitions.containsKey(streamId)) { + StreamPartition shufflePartition = new StreamPartition(); + shufflePartition.setStreamId(streamId); + shufflePartition.setType(StreamPartition.Type.SHUFFLE); + effectivePartitions.put(streamId, shufflePartition); + } + } + policyExecutionPlan.setStreamPartitions(new ArrayList<>(effectivePartitions.values())); + } catch (Exception ex) { + LOG.error("Got error to parse policy execution plan: \n{}", this.executionPlan, ex); + throw ex; + } + return policyExecutionPlan; + } + + private String 
retrieveStreamId(Variable variable, Map<String, List<StreamColumn>> streamMap, Map<String, SingleInputStream> aliasMap) { + Preconditions.checkNotNull(variable.getStreamId(), "streamId"); + if (streamMap.containsKey(variable.getStreamId()) && aliasMap.containsKey(variable.getStreamId())) { + throw new DuplicateDefinitionException("Duplicated streamId and alias: " + variable.getStreamId()); + } else if (streamMap.containsKey(variable.getStreamId())) { + return variable.getStreamId(); + } else if (aliasMap.containsKey(variable.getStreamId())) { + return aliasMap.get(variable.getStreamId()).getStreamId(); + } else { + throw new DefinitionNotExistException(variable.getStreamId()); + } + } + + private StreamPartition findStreamPartition(SingleInputStream inputStream, Selector selector) { + // Window Spec + List<Window> windows = new ArrayList<>(); + for (StreamHandler streamHandler : inputStream.getStreamHandlers()) { + if (streamHandler instanceof Window) { + windows.add((Window) streamHandler); + } + } + + // Group By Spec + List<Variable> groupBy = selector.getGroupByList(); + if (windows.size() > 0 || groupBy.size() >= 0) { + return generatePartition(inputStream.getStreamId(), windows, groupBy); + } else { + return null; + } + } + + private void retrievePartition(StreamPartition partition) { + if (partition == null) { + return; + } + + if (!effectivePartitions.containsKey(partition.getStreamId())) { + effectivePartitions.put(partition.getStreamId(), partition); + } else if (!effectivePartitions.get(partition.getStreamId()).equals(partition)) { + StreamPartition existingPartition = effectivePartitions.get(partition.getStreamId()); + // If same Type & Columns but different sort spec, then use larger + if ((existingPartition.getType().equals(partition.getType()) + && ListUtils.isEqualList(existingPartition.getColumns(), partition.getColumns()) + && partition.getSortSpec().getWindowPeriodMillis() > existingPartition.getSortSpec().getWindowPeriodMillis()) + || existingPartition.getType().equals(StreamPartition.Type.SHUFFLE)) { + 
effectivePartitions.put(partition.getStreamId(), partition); + } else { + // Throw an exception: conflicting effectivePartitions on the same stream cannot run in distributed mode + throw new ExecutionPlanValidationException("You have incompatible partitions on stream " + partition.getStreamId() + + ": [1] " + effectivePartitions.get(partition.getStreamId()).toString() + " [2] " + partition.toString()); + } + } + } + + private void retrieveAliasForQuery(SingleInputStream inputStream, Map<String, SingleInputStream> aliasStreamMapping) { + if (inputStream.getStreamReferenceId() != null) { + if (aliasStreamMapping.containsKey(inputStream.getStreamReferenceId())) { + throw new ExecutionPlanValidationException("Duplicated stream alias " + inputStream.getStreamId() + " -> " + inputStream); + } else { + aliasStreamMapping.put(inputStream.getStreamReferenceId(), inputStream); + } + } + } + + private StreamPartition generatePartition(String streamId, List<Window> windows, List<Variable> groupBy) { + StreamPartition partition = new StreamPartition(); + partition.setStreamId(streamId); + StreamSortSpec sortSpec = null; + if (windows != null && windows.size() > 0) { + for (Window window : windows) { + if (window.getFunction().equals(WINDOW_EXTERNAL_TIME)) { + sortSpec = new StreamSortSpec(); + sortSpec.setWindowPeriodMillis(getExternalTimeWindowSize(window)); + sortSpec.setWindowMargin(sortSpec.getWindowPeriodMillis() / 5); + } + } + } + partition.setSortSpec(sortSpec); + if (groupBy != null && groupBy.size() > 0) { + partition.setColumns(groupBy.stream().map(Variable::getAttributeName).collect(Collectors.toList())); + partition.setType(StreamPartition.Type.GROUPBY); + } else { + partition.setType(StreamPartition.Type.SHUFFLE); + } + return partition; + } + + private static int getExternalTimeWindowSize(Window window) { + Expression windowSize = window.getParameters()[1]; + if (windowSize instanceof TimeConstant) { + return ((TimeConstant) windowSize).getValue().intValue(); + } else if (windowSize 
instanceof IntConstant) { + return ((IntConstant) windowSize).getValue(); + } else if (windowSize instanceof LongConstant) { + return ((LongConstant) windowSize).getValue().intValue(); + } else { + throw new UnsupportedOperationException("Illegal type of window size expression:" + windowSize.toString()); + } + } + + private static List convertOutputStreamColumns(List outputAttributeList) { + return outputAttributeList.stream().map(outputAttribute -> { + StreamColumn streamColumn = new StreamColumn(); + streamColumn.setName(outputAttribute.getRename()); + streamColumn.setDescription(outputAttribute.getExpression().toString()); + return streamColumn; + }).collect(Collectors.toList()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreter.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreter.java new file mode 100644 index 0000000..8aec294 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreter.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.interpreter; + +import com.google.common.base.Preconditions; +import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; +import org.apache.eagle.alert.engine.coordinator.StreamDefinition; +import org.apache.eagle.alert.engine.coordinator.StreamNotDefinedException; +import org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * PolicyInterpreter Helper Methods: + *

<ul> + * <li>Parse: parse siddhi query and generate static execution plan</li> + * <li>Validate: validate policy definition with execution plan and metadata</li> + * </ul>
+ * + * @see PolicyExecutionPlanner + * @see WSO2 Complex Event Processor Documentation + */ +public class PolicyInterpreter { + private static final Logger LOG = LoggerFactory.getLogger(PolicyInterpreter.class); + + /** + * See https://docs.wso2.com/display/CEP300/Windows#Windows-ExternalTimeWindow. + */ + private static final String WINDOW_EXTERNAL_TIME = "externalTime"; + + public static PolicyParseResult parse(String executionPlanQuery) { + PolicyParseResult policyParseResult = new PolicyParseResult(); + try { + policyParseResult.setPolicyExecutionPlan(parseExecutionPlan(executionPlanQuery)); + policyParseResult.setSuccess(true); + policyParseResult.setMessage("Parsed successfully"); + } catch (Exception exception) { + LOG.error("Got error to parse policy: {}", executionPlanQuery, exception); + policyParseResult.setSuccess(false); + policyParseResult.setMessage(exception.getMessage()); + policyParseResult.setStackTrace(exception); + } + return policyParseResult; + } + + /** + * Quick parseExecutionPlan policy. 
+ */ + public static PolicyExecutionPlan parseExecutionPlan(String policyDefinition, Map inputStreamDefinitions) throws Exception { + // Validate inputStreams are valid + Preconditions.checkNotNull(inputStreamDefinitions, "No inputStreams to connect from"); + return parseExecutionPlan(SiddhiDefinitionAdapter.buildSiddhiExecutionPlan(policyDefinition, inputStreamDefinitions)); + } + + public static PolicyExecutionPlan parseExecutionPlan(String executionPlanQuery) throws Exception { + return PolicyExecutionPlanner.parseExecutionPlan(executionPlanQuery); + } + + public static PolicyValidationResult validate(PolicyDefinition policy, Map allDefinitions) { + Map inputDefinitions = new HashMap<>(); + PolicyValidationResult policyValidationResult = new PolicyValidationResult(); + policyValidationResult.setPolicyDefinition(policy); + try { + if (policy.getInputStreams() != null) { + for (String streamId : policy.getInputStreams()) { + if (allDefinitions.containsKey(streamId)) { + inputDefinitions.put(streamId, allDefinitions.get(streamId)); + } else { + throw new StreamNotDefinedException(streamId); + } + } + } + + PolicyExecutionPlan policyExecutionPlan = parseExecutionPlan(policy.getDefinition().getValue(), inputDefinitions); + // Validate output + if (policy.getOutputStreams() != null) { + for (String outputStream : policy.getOutputStreams()) { + if (!policyExecutionPlan.getOutputStreams().containsKey(outputStream)) { + throw new StreamNotDefinedException("Output stream " + outputStream + " not defined"); + } + } + } + policyValidationResult.setPolicyExecutionPlan(policyExecutionPlan); + policyValidationResult.setSuccess(true); + policyValidationResult.setMessage("Validated successfully"); + } catch (Exception exception) { + LOG.error("Got error to validate policy definition: {}", policy, exception); + policyValidationResult.setSuccess(false); + policyValidationResult.setMessage(exception.getMessage()); + policyValidationResult.setStackTrace(exception); + } + + return 
policyValidationResult; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyParseResult.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyParseResult.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyParseResult.java new file mode 100644 index 0000000..a0f3ad2 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyParseResult.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.interpreter; + +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.commons.lang3.exception.ExceptionUtils; + +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +public class PolicyParseResult { + private boolean success; + private String message; + private String exception; + + private PolicyExecutionPlan policyExecutionPlan; + + public String getException() { + return exception; + } + + public void setException(String exception) { + this.exception = exception; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public void setStackTrace(Throwable throwable) { + this.setException(ExceptionUtils.getStackTrace(throwable)); + } + + public boolean isSuccess() { + return success; + } + + public void setSuccess(boolean success) { + this.success = success; + } + + public PolicyExecutionPlan getPolicyExecutionPlan() { + return policyExecutionPlan; + } + + public void setPolicyExecutionPlan(PolicyExecutionPlan policyExecutionPlan) { + this.policyExecutionPlan = policyExecutionPlan; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyValidationResult.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyValidationResult.java 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyValidationResult.java new file mode 100644 index 0000000..17f6091 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyValidationResult.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.interpreter; + + +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; + +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +public class PolicyValidationResult { + private boolean success; + private String message; + private String exception; + + private PolicyExecutionPlan policyExecutionPlan; + private PolicyDefinition policyDefinition; + + public String getException() { + return exception; + } + + public void setException(String exception) { + this.exception = exception; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public void setStackTrace(Throwable throwable) { + this.setException(ExceptionUtils.getStackTrace(throwable)); + } + + public boolean isSuccess() { + return success; + } + + public void setSuccess(boolean success) { + this.success = success; + } + + public PolicyExecutionPlan getPolicyExecutionPlan() { + return policyExecutionPlan; + } + + public void setPolicyExecutionPlan(PolicyExecutionPlan policyExecutionPlan) { + this.policyExecutionPlan = policyExecutionPlan; + } + + public PolicyDefinition getPolicyDefinition() { + return policyDefinition; + } + + public void setPolicyDefinition(PolicyDefinition policyDefinition) { + this.policyDefinition = policyDefinition; + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java new file mode 100644 index 0000000..cf1a7e9 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.interpreter; + +import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; +import org.apache.eagle.alert.engine.coordinator.StreamColumn; +import org.apache.eagle.alert.engine.coordinator.StreamDefinition; +import org.apache.eagle.alert.engine.coordinator.StreamPartition; +import org.apache.eagle.alert.engine.interpreter.PolicyExecutionPlan; +import org.apache.eagle.alert.engine.interpreter.PolicyInterpreter; +import org.apache.eagle.alert.engine.interpreter.PolicyValidationResult; +import org.junit.Assert; +import org.junit.Test; +import org.wso2.siddhi.core.exception.DefinitionNotExistException; +import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException; + +import java.util.*; + +public class PolicyInterpreterTest { + // ------------------------- + // Single Stream Test Cases + // ------------------------- + @Test + public void testParseSingleStreamPolicyQuery() throws Exception { + PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan("from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 2 min) " + + "select cmd, user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT"); + Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX", executionPlan.getInputStreams().keySet().toArray()[0]); + Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT", executionPlan.getOutputStreams().keySet().toArray()[0]); + Assert.assertEquals(1, executionPlan.getStreamPartitions().size()); + 
Assert.assertEquals(2*60*1000,executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis()); + } + + @Test + public void testParseSingleStreamPolicyWithPattern() throws Exception { + PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan( + "from e1=Stream1[price >= 20] -> e2=Stream2[price >= e1.price] \n" + + "select e1.symbol as symbol, e2.price as price, e1.price+e2.price as total_price \n" + + "group by symbol, company insert into OutStream"); + Assert.assertTrue(executionPlan.getInputStreams().containsKey("Stream1")); + Assert.assertTrue(executionPlan.getInputStreams().containsKey("Stream2")); + Assert.assertTrue(executionPlan.getOutputStreams().containsKey("OutStream")); + Assert.assertEquals(StreamPartition.Type.GROUPBY,executionPlan.getStreamPartitions().get(0).getType()); + Assert.assertArrayEquals(new String[]{"symbol","company"},executionPlan.getStreamPartitions().get(0).getColumns().toArray()); + Assert.assertEquals(StreamPartition.Type.GROUPBY,executionPlan.getStreamPartitions().get(1).getType()); + Assert.assertArrayEquals(new String[]{"symbol","company"},executionPlan.getStreamPartitions().get(1).getColumns().toArray()); + } + + @Test + public void testParseSingleStreamPolicyQueryWithMultiplePartitionUsingLargerWindow() throws Exception { + PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan( + "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 1 min) " + + "select cmd,user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1;" + + "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 1 hour) " + + "select cmd,user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2;" + ); + Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX", executionPlan.getInputStreams().keySet().toArray()[0]); + 
Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1")); + Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2")); + Assert.assertEquals(1, executionPlan.getStreamPartitions().size()); + Assert.assertEquals(60*60*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis()); + } + + @Test(expected = ExecutionPlanValidationException.class) + public void testParseSingleStreamPolicyQueryWithConflictPartition() throws Exception { + PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan( + "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 5 min) " + + "select cmd, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1;" + + "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 2 min) " + + "select user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2;" + ); + Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX", executionPlan.getInputStreams().keySet().toArray()[0]); + Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1")); + Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2")); + Assert.assertEquals(2, executionPlan.getStreamPartitions().size()); + Assert.assertEquals(5*60*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis()); + } + + @Test + public void testValidPolicyWithExternalTimeWindow() { + PolicyDefinition policyDefinition = new PolicyDefinition(); + policyDefinition.setName("test_policy"); + policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1")); + policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1")); + + PolicyDefinition.Definition definition = new 
PolicyDefinition.Definition(); + definition.setType("siddhi"); + definition.setValue("from INPUT_STREAM_1#window.externalTime(timestamp, 2 min) select name, sum(value) as total group by name insert into OUTPUT_STREAM_1 ;"); + definition.setInputStreams(policyDefinition.getInputStreams()); + definition.setOutputStreams(policyDefinition.getOutputStreams()); + policyDefinition.setDefinition(definition); + + PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap() { + { + put("INPUT_STREAM_1", mockStreamDefinition("INPUT_STREAM_1")); + put("INPUT_STREAM_2", mockStreamDefinition("INPUT_STREAM_2")); + put("INPUT_STREAM_3", mockStreamDefinition("INPUT_STREAM_3")); + put("INPUT_STREAM_4", mockStreamDefinition("INPUT_STREAM_4")); + } + }); + Assert.assertTrue(validation.isSuccess()); + Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size()); + Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size()); + Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size()); + Assert.assertNotNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec()); + } + + @Test + public void testValidPolicyWithTimeWindow() { + PolicyDefinition policyDefinition = new PolicyDefinition(); + policyDefinition.setName("test_policy"); + policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1")); + policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1")); + + PolicyDefinition.Definition definition = new PolicyDefinition.Definition(); + definition.setType("siddhi"); + definition.setValue("from INPUT_STREAM_1#window.time(2 min) select name, sum(value) as total group by name insert into OUTPUT_STREAM_1 ;"); + definition.setInputStreams(policyDefinition.getInputStreams()); + definition.setOutputStreams(policyDefinition.getOutputStreams()); + policyDefinition.setDefinition(definition); + + PolicyValidationResult validation 
= PolicyInterpreter.validate(policyDefinition, new HashMap() { + { + put("INPUT_STREAM_1", mockStreamDefinition("INPUT_STREAM_1")); + } + }); + Assert.assertTrue(validation.isSuccess()); + Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size()); + Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size()); + Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size()); + Assert.assertNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec()); + } + + @Test + public void testValidPolicyWithTooManyInputStreams() { + PolicyDefinition policyDefinition = new PolicyDefinition(); + policyDefinition.setName("test_policy"); + policyDefinition.setInputStreams(Arrays.asList("INPUT_STREAM_1", "INPUT_STREAM_2")); + policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1")); + + PolicyDefinition.Definition definition = new PolicyDefinition.Definition(); + definition.setType("siddhi"); + definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;"); + definition.setInputStreams(policyDefinition.getInputStreams()); + definition.setOutputStreams(policyDefinition.getOutputStreams()); + policyDefinition.setDefinition(definition); + + PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap() { + { + put("INPUT_STREAM_1", mockStreamDefinition("INPUT_STREAM_1")); + put("INPUT_STREAM_2", mockStreamDefinition("INPUT_STREAM_2")); + } + }); + Assert.assertTrue(validation.isSuccess()); + Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size()); + Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size()); + } + + @Test + public void testValidPolicyWithTooFewOutputStreams() { + PolicyDefinition policyDefinition = new PolicyDefinition(); + policyDefinition.setName("test_policy"); + 
policyDefinition.setInputStreams(Arrays.asList("INPUT_STREAM_1", "INPUT_STREAM_2")); + policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1")); + + PolicyDefinition.Definition definition = new PolicyDefinition.Definition(); + definition.setType("siddhi"); + definition.setValue( + "from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;" + + "from INPUT_STREAM_1[value < 90.0] select * group by name insert into OUTPUT_STREAM_2;" + ); + definition.setInputStreams(policyDefinition.getInputStreams()); + definition.setOutputStreams(policyDefinition.getOutputStreams()); + policyDefinition.setDefinition(definition); + + PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap() { + { + put("INPUT_STREAM_1", mockStreamDefinition("INPUT_STREAM_1")); + put("INPUT_STREAM_2", mockStreamDefinition("INPUT_STREAM_2")); + } + }); + Assert.assertTrue(validation.isSuccess()); + Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size()); + Assert.assertEquals(2, validation.getPolicyExecutionPlan().getOutputStreams().size()); + } + + @Test + public void testInvalidPolicyForSyntaxError() { + PolicyDefinition policyDefinition = new PolicyDefinition(); + policyDefinition.setName("test_policy"); + policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM")); + policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM")); + + PolicyDefinition.Definition definition = new PolicyDefinition.Definition(); + definition.setType("siddhi"); + definition.setValue("from INPUT_STREAM (value > 90.0) select * group by name insert into OUTPUT_STREAM;"); + definition.setInputStreams(policyDefinition.getInputStreams()); + definition.setOutputStreams(policyDefinition.getOutputStreams()); + policyDefinition.setDefinition(definition); + + PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap() { + { + put("INPUT_STREAM", 
mockStreamDefinition("INPUT_STREAM")); + } + }); + Assert.assertFalse(validation.isSuccess()); + } + + @Test + public void testInvalidPolicyForNotDefinedInputStream() { + PolicyDefinition policyDefinition = new PolicyDefinition(); + policyDefinition.setName("test_policy"); + policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1")); + policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1")); + + PolicyDefinition.Definition definition = new PolicyDefinition.Definition(); + definition.setType("siddhi"); + definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;"); + definition.setInputStreams(policyDefinition.getInputStreams()); + definition.setOutputStreams(policyDefinition.getOutputStreams()); + policyDefinition.setDefinition(definition); + + PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap() { + { + put("INPUT_STREAM_2", mockStreamDefinition("INPUT_STREAM_2")); + } + }); + Assert.assertFalse(validation.isSuccess()); + } + + @Test + public void testInvalidPolicyForNotDefinedOutputStream() { + PolicyDefinition policyDefinition = new PolicyDefinition(); + policyDefinition.setName("test_policy"); + policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1")); + policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_2")); + + PolicyDefinition.Definition definition = new PolicyDefinition.Definition(); + definition.setType("siddhi"); + definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;"); + definition.setInputStreams(policyDefinition.getInputStreams()); + definition.setOutputStreams(policyDefinition.getOutputStreams()); + policyDefinition.setDefinition(definition); + + PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap() { + { + put("INPUT_STREAM_1", mockStreamDefinition("INPUT_STREAM_1")); + } + }); + 
Assert.assertFalse(validation.isSuccess()); + } + + // --------------------- + // Two Stream Test Cases + // --------------------- + + @Test + public void testParseTwoStreamPolicyQueryWithMultiplePartition() throws Exception { + PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan( + "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_1#window.externalTime(timestamp, 1 min) " + + "select cmd,user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1;" + + "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_2#window.externalTime(timestamp, 1 hour) " + + "select cmd,user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2;" + ); + Assert.assertTrue(executionPlan.getInputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_1")); + Assert.assertTrue(executionPlan.getInputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_2")); + Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1")); + Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2")); + Assert.assertEquals(2, executionPlan.getStreamPartitions().size()); + Assert.assertEquals(60*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis()); + Assert.assertEquals(60*60*1000, executionPlan.getStreamPartitions().get(1).getSortSpec().getWindowPeriodMillis()); + } + + @Test + public void testParseTwoStreamPolicyQueryWithSinglePartition() throws Exception { + PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan( + "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_1#window.externalTime(timestamp, 1 min) " + + "select cmd,user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1;" + + "from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_2 select * insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2;" + ); + 
Assert.assertTrue(executionPlan.getInputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_1")); + Assert.assertTrue(executionPlan.getInputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_2")); + Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_1")); + Assert.assertTrue(executionPlan.getOutputStreams().containsKey("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT_2")); + Assert.assertEquals(2, executionPlan.getStreamPartitions().size()); + Assert.assertEquals(60*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis()); + Assert.assertEquals(StreamPartition.Type.SHUFFLE, executionPlan.getStreamPartitions().get(1).getType()); + } + + + @Test + public void testParseTwoStreamPolicyQueryInnerJoin() throws Exception { + PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan( + "from TickEvent[symbol=='EBAY']#window.length(2000) " + + "join NewsEvent#window.externalTime(timestamp, 1000 sec) \n" + + "select * insert into JoinStream" + ); + Assert.assertTrue(executionPlan.getInputStreams().containsKey("TickEvent")); + Assert.assertTrue(executionPlan.getInputStreams().containsKey("NewsEvent")); + Assert.assertTrue(executionPlan.getOutputStreams().containsKey("JoinStream")); + Assert.assertEquals(StreamPartition.Type.SHUFFLE, executionPlan.getStreamPartitions().get(0).getType()); + Assert.assertNotNull(executionPlan.getStreamPartitions().get(0).getSortSpec()); + Assert.assertEquals(1000*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis()); + Assert.assertEquals(StreamPartition.Type.SHUFFLE, executionPlan.getStreamPartitions().get(1).getType()); + Assert.assertNull(executionPlan.getStreamPartitions().get(1).getSortSpec()); + } + + @Test + public void testParseTwoStreamPolicyQueryInnerJoinWithCondition() throws Exception { + PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan( + "from 
TickEvent[symbol=='EBAY']#window.length(2000) as t unidirectional \n" + + "join NewsEvent#window.externalTime(timestamp, 1000 sec) as n \n" + + "on TickEvent.symbol == NewsEvent.company \n" + + "insert into JoinStream " + ); + Assert.assertTrue(executionPlan.getInputStreams().containsKey("TickEvent")); + Assert.assertTrue(executionPlan.getInputStreams().containsKey("NewsEvent")); + Assert.assertTrue(executionPlan.getOutputStreams().containsKey("JoinStream")); + Assert.assertEquals(StreamPartition.Type.SHUFFLE, executionPlan.getStreamPartitions().get(0).getType()); + Assert.assertNotNull(executionPlan.getStreamPartitions().get(0).getSortSpec()); + Assert.assertEquals(1000*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis()); + Assert.assertEquals(StreamPartition.Type.GROUPBY, executionPlan.getStreamPartitions().get(1).getType()); + Assert.assertNull(executionPlan.getStreamPartitions().get(1).getSortSpec()); + } + + @Test + public void testParseTwoStreamPolicyQueryInnerJoinWithConditionHavingAlias() throws Exception { + PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan( + "from TickEvent[symbol=='EBAY']#window.length(2000) as t unidirectional \n" + + "join NewsEvent#window.externalTime(timestamp, 1000 sec) as n \n" + + "on t.symbol == n.company \n" + + "insert into JoinStream " + ); + Assert.assertTrue(executionPlan.getInputStreams().containsKey("TickEvent")); + Assert.assertTrue(executionPlan.getInputStreams().containsKey("NewsEvent")); + Assert.assertTrue(executionPlan.getOutputStreams().containsKey("JoinStream")); + Assert.assertEquals(StreamPartition.Type.SHUFFLE, executionPlan.getStreamPartitions().get(0).getType()); + Assert.assertNotNull(executionPlan.getStreamPartitions().get(0).getSortSpec()); + Assert.assertEquals(1000*1000, executionPlan.getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis()); + Assert.assertEquals(StreamPartition.Type.GROUPBY, 
executionPlan.getStreamPartitions().get(1).getType()); + Assert.assertNull(executionPlan.getStreamPartitions().get(1).getSortSpec()); + } + + @Test(expected = DefinitionNotExistException.class) + public void testParseTwoStreamPolicyQueryInnerJoinWithConditionHavingNotFoundAlias() throws Exception { + PolicyInterpreter.parseExecutionPlan( + "from TickEvent[symbol=='EBAY']#window.length(2000) as t unidirectional \n" + + "join NewsEvent#window.externalTime(timestamp, 1000 sec) as n \n" + + "on t.symbol == NOT_EXIST_ALIAS.company \n" + + "insert into JoinStream " + ); + } + + // -------------- + // Helper Methods + // -------------- + + private static StreamDefinition mockStreamDefinition(String streamId) { + StreamDefinition streamDefinition = new StreamDefinition(); + streamDefinition.setStreamId(streamId); + List columns = new ArrayList<>(); + columns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build()); + columns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build()); + columns.add(new StreamColumn.Builder().name("timestamp").type(StreamColumn.Type.LONG).build()); + streamDefinition.setColumns(columns); + return streamDefinition; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java index 190da2a..3368517 100644 --- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java @@ -23,6 +23,9 @@ import org.apache.eagle.alert.coordination.model.ScheduleState; import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; import org.apache.eagle.alert.coordination.model.internal.Topology; import org.apache.eagle.alert.engine.coordinator.*; +import org.apache.eagle.alert.engine.interpreter.PolicyInterpreter; +import org.apache.eagle.alert.engine.interpreter.PolicyParseResult; +import org.apache.eagle.alert.engine.interpreter.PolicyValidationResult; import org.apache.eagle.alert.metadata.IMetadataDao; import org.apache.eagle.alert.metadata.impl.MetadataDaoFactory; import org.apache.eagle.alert.metadata.resource.Models; @@ -207,7 +210,11 @@ public class MetadataResource { @Path("/policies/validate") @POST public PolicyValidationResult validatePolicy(PolicyDefinition policy) { - return PolicyInterpreter.validate(policy,dao); + Map allDefinitions = new HashMap<>(); + for (StreamDefinition definition : dao.listStreams()) { + allDefinitions.put(definition.getStreamId(), definition); + } + return PolicyInterpreter.validate(policy, allDefinitions); } @Path("/policies/parse") http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyExecutionPlan.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyExecutionPlan.java 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyExecutionPlan.java deleted file mode 100644 index f925e3d..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyExecutionPlan.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.service.metadata.resource; - -import com.fasterxml.jackson.databind.annotation.JsonSerialize; -import org.apache.eagle.alert.engine.coordinator.StreamColumn; -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.wso2.siddhi.query.api.ExecutionPlan; - -import java.util.List; -import java.util.Map; - -@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) -public class PolicyExecutionPlan { - /** - * Actual input streams. - */ - private Map> inputStreams; - - /** - * Actual output streams. - */ - private Map> outputStreams; - - /** - * Execution plan source. - */ - private String executionPlanSource; - - /** - * Execution plan. 
- */ - private ExecutionPlan internalExecutionPlan; - - private String executionPlanDesc; - - private List streamPartitions; - - public String getExecutionPlanSource() { - return executionPlanSource; - } - - public void setExecutionPlanSource(String executionPlanSource) { - this.executionPlanSource = executionPlanSource; - } - - public ExecutionPlan getInternalExecutionPlan() { - return internalExecutionPlan; - } - - public void setInternalExecutionPlan(ExecutionPlan internalExecutionPlan) { - this.internalExecutionPlan = internalExecutionPlan; - } - - public String getExecutionPlanDesc() { - return executionPlanDesc; - } - - public void setExecutionPlanDesc(String executionPlanDesc) { - this.executionPlanDesc = executionPlanDesc; - } - - public List getStreamPartitions() { - return streamPartitions; - } - - public void setStreamPartitions(List streamPartitions) { - this.streamPartitions = streamPartitions; - } - - public Map> getInputStreams() { - return inputStreams; - } - - public void setInputStreams(Map> inputStreams) { - this.inputStreams = inputStreams; - } - - public Map> getOutputStreams() { - return outputStreams; - } - - public void setOutputStreams(Map> outputStreams) { - this.outputStreams = outputStreams; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyInterpreter.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyInterpreter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyInterpreter.java deleted file mode 100644 index b97583c..0000000 --- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyInterpreter.java +++ /dev/null @@ -1,244 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *
- * http://www.apache.org/licenses/LICENSE-2.0 - *
- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.service.metadata.resource; - -import com.google.common.base.Preconditions; -import org.apache.eagle.alert.engine.coordinator.*; -import org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter; -import org.apache.eagle.alert.metadata.IMetadataDao; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.wso2.siddhi.query.api.ExecutionPlan; -import org.wso2.siddhi.query.api.execution.ExecutionElement; -import org.wso2.siddhi.query.api.execution.query.Query; -import org.wso2.siddhi.query.api.execution.query.input.handler.StreamHandler; -import org.wso2.siddhi.query.api.execution.query.input.handler.Window; -import org.wso2.siddhi.query.api.execution.query.input.stream.InputStream; -import org.wso2.siddhi.query.api.execution.query.input.stream.SingleInputStream; -import org.wso2.siddhi.query.api.execution.query.output.stream.OutputStream; -import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute; -import org.wso2.siddhi.query.api.execution.query.selection.Selector; -import org.wso2.siddhi.query.api.expression.Variable; -import org.wso2.siddhi.query.api.expression.constant.TimeConstant; -import org.wso2.siddhi.query.compiler.SiddhiCompiler; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -/** - * PolicyInterpreter: - *

- * <ul> - * <li>Parse: parse siddhi query and generate static execution plan</li> - * <li>Validate: validate policy definition with execution plan and metadata</li> - * </ul>
- * @see WSO2 Complex Event Processor Documentation - */ -public class PolicyInterpreter { - private static final Logger LOG = LoggerFactory.getLogger(PolicyInterpreter.class); - - /** - * See https://docs.wso2.com/display/CEP300/Windows#Windows-ExternalTimeWindow. - */ - private static final String WINDOW_EXTERNAL_TIME = "externalTime"; - - public static PolicyParseResult parse(String executionPlanQuery) { - PolicyParseResult policyParseResult = new PolicyParseResult(); - try { - policyParseResult.setPolicyExecutionPlan(parseExecutionPlan(executionPlanQuery)); - policyParseResult.setSuccess(true); - policyParseResult.setMessage("Parsed successfully"); - } catch (Exception exception) { - LOG.error("Got error to parse policy: {}", executionPlanQuery, exception); - policyParseResult.setSuccess(false); - policyParseResult.setMessage(exception.getMessage()); - policyParseResult.setStackTrace(exception); - } - return policyParseResult; - } - - /** - * Quick parseExecutionPlan policy. - */ - public static PolicyExecutionPlan parseExecutionPlan(String policyDefinition, Map<String, StreamDefinition> inputStreamDefinitions) throws Exception { - // Validate inputStreams are valid - Preconditions.checkNotNull(inputStreamDefinitions, "No inputStreams to connect from"); - return parseExecutionPlan(SiddhiDefinitionAdapter.buildSiddhiExecutionPlan(policyDefinition, inputStreamDefinitions)); - } - - public static PolicyExecutionPlan parseExecutionPlan(String executionPlanQuery) throws Exception { - PolicyExecutionPlan policyExecutionPlan = new PolicyExecutionPlan(); - try { - ExecutionPlan executionPlan = SiddhiCompiler.parse(executionPlanQuery); - - policyExecutionPlan.setExecutionPlanDesc(executionPlan.toString()); - - // Set current execution plan as valid - policyExecutionPlan.setExecutionPlanSource(executionPlanQuery); - policyExecutionPlan.setInternalExecutionPlan(executionPlan); - - Map<String, List<StreamColumn>> actualInputStreams = new HashMap<>(); - Map<String, List<StreamColumn>> actualOutputStreams = new HashMap<>(); - List<StreamPartition> partitions = new 
ArrayList<>(); - - // Go through execution element - for (ExecutionElement executionElement : executionPlan.getExecutionElementList()) { - if (executionElement instanceof Query) { - // ------------- - // Explain Query - // ------------- - - // Input streams - InputStream inputStream = ((Query) executionElement).getInputStream(); - Selector selector = ((Query) executionElement).getSelector(); - - for (String streamId : inputStream.getUniqueStreamIds()) { - if (!actualInputStreams.containsKey(streamId)) { - org.wso2.siddhi.query.api.definition.StreamDefinition streamDefinition = executionPlan.getStreamDefinitionMap().get(streamId); - if (streamDefinition != null) { - actualInputStreams.put(streamId, SiddhiDefinitionAdapter.convertFromSiddiDefinition(streamDefinition).getColumns()); - } else { - actualInputStreams.put(streamId, null); - } - } - } - - // Window Spec and Partition - if (inputStream instanceof SingleInputStream) { - // Window Spec - List<Window> windows = new ArrayList<>(); - for (StreamHandler streamHandler : ((SingleInputStream) inputStream).getStreamHandlers()) { - if (streamHandler instanceof Window) { - windows.add((Window) streamHandler); - } - } - - // Group By Spec - List<Variable> groupBy = selector.getGroupByList(); - - if (windows.size() > 0 || groupBy.size() >= 0) { - partitions.add(convertSingleStreamWindowAndGroupByToPartition(((SingleInputStream) inputStream).getStreamId(), windows, groupBy)); - } - } - // else if(inputStream instanceof JoinInputStream) { - // // TODO: Parse multiple stream join - // } else if(inputStream instanceof StateInputStream) { - // // TODO: Parse StateInputStream - // } - - // Output streams - OutputStream outputStream = ((Query) executionElement).getOutputStream(); - actualOutputStreams.put(outputStream.getId(), convertOutputStreamColumns(selector.getSelectionList())); - } else { - LOG.warn("Unhandled execution element: {}", executionElement.toString()); - } - } - // Set used input streams - 
policyExecutionPlan.setInputStreams(actualInputStreams); - - // Set Partitions - policyExecutionPlan.setStreamPartitions(partitions); - - // Validate outputStreams - policyExecutionPlan.setOutputStreams(actualOutputStreams); - } catch (Exception ex) { - LOG.error("Got error to parseExecutionPlan policy execution plan: \n{}", executionPlanQuery, ex); - throw ex; - } - return policyExecutionPlan; - } - - private static StreamPartition convertSingleStreamWindowAndGroupByToPartition(String streamId, List<Window> windows, List<Variable> groupBy) { - StreamPartition partition = new StreamPartition(); - partition.setStreamId(streamId); - StreamSortSpec sortSpec = null; - - if (windows.size() > 0) { - for (Window window : windows) { - if (window.getFunction().equals(WINDOW_EXTERNAL_TIME)) { - sortSpec = new StreamSortSpec(); - sortSpec.setWindowPeriodMillis(((TimeConstant) window.getParameters()[1]).getValue().intValue()); - sortSpec.setWindowMargin(sortSpec.getWindowPeriodMillis() / 3); - } - } - } - partition.setSortSpec(sortSpec); - if (groupBy.size() > 0) { - partition.setColumns(groupBy.stream().map(Variable::getAttributeName).collect(Collectors.toList())); - partition.setType(StreamPartition.Type.GROUPBY); - } else { - partition.setType(StreamPartition.Type.SHUFFLE); - } - return partition; - } - - public static PolicyValidationResult validate(PolicyDefinition policy, IMetadataDao metadataDao) { - Map<String, StreamDefinition> allDefinitions = new HashMap<>(); - for (StreamDefinition definition : metadataDao.listStreams()) { - allDefinitions.put(definition.getStreamId(), definition); - } - return validate(policy, allDefinitions); - } - - public static PolicyValidationResult validate(PolicyDefinition policy, Map<String, StreamDefinition> allDefinitions) { - Map<String, StreamDefinition> inputDefinitions = new HashMap<>(); - PolicyValidationResult policyValidationResult = new PolicyValidationResult(); - policyValidationResult.setPolicyDefinition(policy); - try { - if (policy.getInputStreams() != null) { - for (String streamId : policy.getInputStreams()) { - if 
(allDefinitions.containsKey(streamId)) { - inputDefinitions.put(streamId, allDefinitions.get(streamId)); - } else { - throw new StreamNotDefinedException(streamId); - } - } - } - - PolicyExecutionPlan policyExecutionPlan = parseExecutionPlan(policy.getDefinition().getValue(), inputDefinitions); - // Validate output - if (policy.getOutputStreams() != null) { - for (String outputStream : policy.getOutputStreams()) { - if (!policyExecutionPlan.getOutputStreams().containsKey(outputStream)) { - throw new StreamNotDefinedException("Output stream " + outputStream + " not defined"); - } - } - } - policyValidationResult.setPolicyExecutionPlan(policyExecutionPlan); - policyValidationResult.setSuccess(true); - policyValidationResult.setMessage("Validated successfully"); - } catch (Exception exception) { - LOG.error("Got error to validate policy definition: {}", policy, exception); - policyValidationResult.setSuccess(false); - policyValidationResult.setMessage(exception.getMessage()); - policyValidationResult.setStackTrace(exception); - } - - return policyValidationResult; - } - - private static List<StreamColumn> convertOutputStreamColumns(List<OutputAttribute> outputAttributeList) { - return outputAttributeList.stream().map(outputAttribute -> { - StreamColumn streamColumn = new StreamColumn(); - streamColumn.setName(outputAttribute.getRename()); - streamColumn.setDescription(outputAttribute.getExpression().toString()); - return streamColumn; - }).collect(Collectors.toList()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyParseResult.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyParseResult.java 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyParseResult.java deleted file mode 100644 index 2522270..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyParseResult.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *
- * http://www.apache.org/licenses/LICENSE-2.0 - *
- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.service.metadata.resource; - -import com.fasterxml.jackson.databind.annotation.JsonSerialize; -import org.apache.commons.lang3.exception.ExceptionUtils; - -@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) -public class PolicyParseResult { - private boolean success; - private String message; - private String exception; - - private PolicyExecutionPlan policyExecutionPlan; - - public String getException() { - return exception; - } - - public void setException(String exception) { - this.exception = exception; - } - - public String getMessage() { - return message; - } - - public void setMessage(String message) { - this.message = message; - } - - public void setStackTrace(Throwable throwable) { - this.setException(ExceptionUtils.getStackTrace(throwable)); - } - - public boolean isSuccess() { - return success; - } - - public void setSuccess(boolean success) { - this.success = success; - } - - public PolicyExecutionPlan getPolicyExecutionPlan() { - return policyExecutionPlan; - } - - public void setPolicyExecutionPlan(PolicyExecutionPlan policyExecutionPlan) { - this.policyExecutionPlan = policyExecutionPlan; - } -}