eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [1/2] incubator-eagle git commit: [EAGLE-647] Support Policy Execution Interpreter and Planner to compile siddhi query to distributed execution plan
Date Thu, 20 Oct 2016 08:08:49 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/master 8991b61e2 -> 64fce8f80


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidationResult.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidationResult.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidationResult.java
deleted file mode 100644
index b1912af..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/PolicyValidationResult.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.service.metadata.resource;
-
-
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-public class PolicyValidationResult {
-    private boolean success;
-    private String message;
-    private String exception;
-
-    private PolicyExecutionPlan policyExecutionPlan;
-    private PolicyDefinition policyDefinition;
-
-    public String getException() {
-        return exception;
-    }
-
-    public void setException(String exception) {
-        this.exception = exception;
-    }
-
-    public String getMessage() {
-        return message;
-    }
-
-    public void setMessage(String message) {
-        this.message = message;
-    }
-
-    public void setStackTrace(Throwable throwable) {
-        this.setException(ExceptionUtils.getStackTrace(throwable));
-    }
-
-    public boolean isSuccess() {
-        return success;
-    }
-
-    public void setSuccess(boolean success) {
-        this.success = success;
-    }
-
-    public PolicyExecutionPlan getPolicyExecutionPlan() {
-        return policyExecutionPlan;
-    }
-
-    public void setPolicyExecutionPlan(PolicyExecutionPlan policyExecutionPlan) {
-        this.policyExecutionPlan = policyExecutionPlan;
-    }
-
-    public PolicyDefinition getPolicyDefinition() {
-        return policyDefinition;
-    }
-
-    public void setPolicyDefinition(PolicyDefinition policyDefinition) {
-        this.policyDefinition = policyDefinition;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/64fce8f8/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyInterpreterTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyInterpreterTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyInterpreterTest.java
deleted file mode 100644
index 48ac9eb..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/metadata/resource/PolicyInterpreterTest.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.service.metadata.resource;
-
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.*;
-
-public class PolicyInterpreterTest {
-    @Test
-    public void parseFullPolicyQuery() throws Exception {
-        PolicyExecutionPlan executionPlan = PolicyInterpreter.parseExecutionPlan("from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX#window.externalTime(timestamp, 2 min) "
-            + "select cmd, user, count() as total_count group by cmd,user insert into HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT");
-        Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX", executionPlan.getInputStreams().keySet().toArray()[0]);
-        Assert.assertEquals("HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX_OUT", executionPlan.getOutputStreams().keySet().toArray()[0]);
-        Assert.assertEquals(1, executionPlan.getStreamPartitions().size());
-        Assert.assertNotNull(executionPlan.getStreamPartitions().get(0).getSortSpec());
-    }
-
-    @Test
-    public void testValidPolicyWithExternalTimeWindow() {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test_policy");
-        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
-        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType("siddhi");
-        definition.setValue("from INPUT_STREAM_1#window.externalTime(timestamp, 2 min) select name, sum(value) as total group by name insert into OUTPUT_STREAM_1 ;");
-        definition.setInputStreams(policyDefinition.getInputStreams());
-        definition.setOutputStreams(policyDefinition.getOutputStreams());
-        policyDefinition.setDefinition(definition);
-
-        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
-            {
-                put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
-                put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
-                put("INPUT_STREAM_3", createStreamDefinition("INPUT_STREAM_3"));
-                put("INPUT_STREAM_4", createStreamDefinition("INPUT_STREAM_4"));
-            }
-        });
-        Assert.assertTrue(validation.isSuccess());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size());
-        Assert.assertNotNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec());
-    }
-
-    @Test
-    public void testValidPolicyWithTimeWindow() {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test_policy");
-        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
-        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType("siddhi");
-        definition.setValue("from INPUT_STREAM_1#window.time(2 min) select name, sum(value) as total group by name insert into OUTPUT_STREAM_1 ;");
-        definition.setInputStreams(policyDefinition.getInputStreams());
-        definition.setOutputStreams(policyDefinition.getOutputStreams());
-        policyDefinition.setDefinition(definition);
-
-        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
-            {
-                put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
-            }
-        });
-        Assert.assertTrue(validation.isSuccess());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size());
-        Assert.assertNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec());
-    }
-
-    @Test
-    public void testValidPolicyWithTooManyInputStreams() {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test_policy");
-        policyDefinition.setInputStreams(Arrays.asList("INPUT_STREAM_1", "INPUT_STREAM_2"));
-        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType("siddhi");
-        definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
-        definition.setInputStreams(policyDefinition.getInputStreams());
-        definition.setOutputStreams(policyDefinition.getOutputStreams());
-        policyDefinition.setDefinition(definition);
-
-        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
-            {
-                put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
-                put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
-            }
-        });
-        Assert.assertTrue(validation.isSuccess());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
-    }
-
-    @Test
-    public void testValidPolicyWithTooFewOutputStreams() {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test_policy");
-        policyDefinition.setInputStreams(Arrays.asList("INPUT_STREAM_1", "INPUT_STREAM_2"));
-        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType("siddhi");
-        definition.setValue(
-            "from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;"
-                + "from INPUT_STREAM_1[value < 90.0] select * group by name insert into OUTPUT_STREAM_2;"
-        );
-        definition.setInputStreams(policyDefinition.getInputStreams());
-        definition.setOutputStreams(policyDefinition.getOutputStreams());
-        policyDefinition.setDefinition(definition);
-
-        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
-            {
-                put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
-                put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
-            }
-        });
-        Assert.assertTrue(validation.isSuccess());
-        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
-        Assert.assertEquals(2, validation.getPolicyExecutionPlan().getOutputStreams().size());
-    }
-
-    @Test
-    public void testInvalidPolicyForSyntaxError() {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test_policy");
-        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM"));
-        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM"));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType("siddhi");
-        definition.setValue("from INPUT_STREAM (value > 90.0) select * group by name insert into OUTPUT_STREAM;");
-        definition.setInputStreams(policyDefinition.getInputStreams());
-        definition.setOutputStreams(policyDefinition.getOutputStreams());
-        policyDefinition.setDefinition(definition);
-
-        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
-            {
-                put("INPUT_STREAM", createStreamDefinition("INPUT_STREAM"));
-            }
-        });
-        Assert.assertFalse(validation.isSuccess());
-    }
-
-    @Test
-    public void testInvalidPolicyForNotDefinedInputStream() {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test_policy");
-        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
-        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType("siddhi");
-        definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
-        definition.setInputStreams(policyDefinition.getInputStreams());
-        definition.setOutputStreams(policyDefinition.getOutputStreams());
-        policyDefinition.setDefinition(definition);
-
-        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
-            {
-                put("INPUT_STREAM_2", createStreamDefinition("INPUT_STREAM_2"));
-            }
-        });
-        Assert.assertFalse(validation.isSuccess());
-    }
-
-    @Test
-    public void testInvalidPolicyForNotDefinedOutputStream() {
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("test_policy");
-        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
-        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_2"));
-
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        definition.setType("siddhi");
-        definition.setValue("from INPUT_STREAM_1[value > 90.0] select * group by name insert into OUTPUT_STREAM_1;");
-        definition.setInputStreams(policyDefinition.getInputStreams());
-        definition.setOutputStreams(policyDefinition.getOutputStreams());
-        policyDefinition.setDefinition(definition);
-
-        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
-            {
-                put("INPUT_STREAM_1", createStreamDefinition("INPUT_STREAM_1"));
-            }
-        });
-        Assert.assertFalse(validation.isSuccess());
-    }
-
-    // --------------
-    // Helper Methods
-    // --------------
-
-    private static StreamDefinition createStreamDefinition(String streamId) {
-        StreamDefinition streamDefinition = new StreamDefinition();
-        streamDefinition.setStreamId(streamId);
-        List<StreamColumn> columns = new ArrayList<>();
-        columns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
-        columns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
-        columns.add(new StreamColumn.Builder().name("timestamp").type(StreamColumn.Type.LONG).build());
-        streamDefinition.setColumns(columns);
-        return streamDefinition;
-    }
-}
\ No newline at end of file


Mime
View raw message