Return-Path: X-Original-To: apmail-eagle-commits-archive@minotaur.apache.org Delivered-To: apmail-eagle-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C6781185E3 for ; Thu, 19 Nov 2015 10:48:11 +0000 (UTC) Received: (qmail 37444 invoked by uid 500); 19 Nov 2015 10:48:11 -0000 Delivered-To: apmail-eagle-commits-archive@eagle.apache.org Received: (qmail 37420 invoked by uid 500); 19 Nov 2015 10:48:11 -0000 Mailing-List: contact commits-help@eagle.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.incubator.apache.org Delivered-To: mailing list commits@eagle.incubator.apache.org Received: (qmail 37293 invoked by uid 99); 19 Nov 2015 10:48:11 -0000 Received: from Unknown (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 19 Nov 2015 10:48:11 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 0860BC0EC7 for ; Thu, 19 Nov 2015 10:48:11 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.771 X-Spam-Level: * X-Spam-Status: No, score=1.771 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-eu-west.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id ARdpYKpif1Rg for ; Thu, 19 Nov 2015 10:47:51 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-eu-west.apache.org (ASF Mail Server at mx1-eu-west.apache.org) with SMTP id 6092525DAC for ; Thu, 19 Nov 2015 10:47:15 +0000 (UTC) Received: (qmail 32014 invoked by uid 99); 19 Nov 2015 10:47:09 -0000 
Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 19 Nov 2015 10:47:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7204CE17D1; Thu, 19 Nov 2015 10:47:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hao@apache.org To: commits@eagle.incubator.apache.org Date: Thu, 19 Nov 2015 10:47:52 -0000 Message-Id: In-Reply-To: <52adcae0110e43338593830e27f8f2fb@git.apache.org> References: <52adcae0110e43338593830e27f8f2fb@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [45/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46] Rename package name as "org.apache.eagle" http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/EagleAlertContext.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/EagleAlertContext.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/EagleAlertContext.java new file mode 100644 index 0000000..c245a24 --- /dev/null +++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/EagleAlertContext.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.siddhi; + +import org.apache.eagle.executor.AlertExecutor; +import org.apache.eagle.alert.policy.PolicyEvaluator; +import org.apache.eagle.datastream.Collector; + +public class EagleAlertContext { + + public AlertExecutor alertExecutor; + + public String policyId; + + public PolicyEvaluator evaluator; + + public Collector outputCollector; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRendner.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRendner.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRendner.java new file mode 100644 index 0000000..4f38f11 --- /dev/null +++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRendner.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.siddhi; + +import java.lang.management.ManagementFactory; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.eagle.alert.common.AlertConstants; +import org.apache.eagle.alert.entity.AlertAPIEntity; +import org.apache.eagle.alert.entity.AlertStreamSchemaEntity; +import org.apache.eagle.alert.notification.UrlBuilder; +import org.apache.eagle.common.config.EagleConfigConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.eagle.common.DateTimeUtil; +import org.apache.eagle.common.metric.AlertContext; +import com.typesafe.config.Config; + +public class SiddhiAlertAPIEntityRendner { + + public static final Logger LOG = LoggerFactory.getLogger(SiddhiAlertAPIEntityRendner.class); + public static final String source = ManagementFactory.getRuntimeMXBean().getName(); + + public static AlertAPIEntity render(Config config, List rets, EagleAlertContext siddhiAlertContext, long timestamp) { + SiddhiPolicyEvaluator evaluator = (SiddhiPolicyEvaluator)siddhiAlertContext.evaluator; + String alertExecutorId = siddhiAlertContext.alertExecutor.getAlertExecutorId(); + AlertAPIEntity entity = new AlertAPIEntity(); + AlertContext context = new AlertContext(); + String sourceStreams = evaluator.getAdditionalContext().get(AlertConstants.SOURCE_STREAMS); + String[] sourceStreamsArr = sourceStreams.split(","); + List attrRenameList = evaluator.getOutputStreamAttrNameList(); + Map tags = new HashMap(); + for (String sourceStream : 
sourceStreamsArr) { + List list = StreamMetadataManager.getInstance().getMetadataEntitiesForStream(sourceStream.trim()); + for (AlertStreamSchemaEntity alertStream : list) { + if (alertStream.getUsedAsTag() != null && alertStream.getUsedAsTag() == true) { + String attrName = alertStream.getTags().get(AlertConstants.ATTR_NAME); + tags.put(attrName, rets.get(attrRenameList.indexOf(attrName))); + } + } + } + + for (int index = 0; index < rets.size(); index++) { + //attrRenameList.get(0) -> "eagleAlertContext". We need to skip "eagleAlertContext", index is from 1 for attRenameList. + context.addProperty(attrRenameList.get(index + 1), rets.get(index)); + } + + StringBuilder sb = new StringBuilder(); + for (Entry entry : context.getProperties().entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + sb.append(key + "=\"" + value + "\" "); + } + context.addAll(evaluator.getAdditionalContext()); + String policyId = context.getProperty(AlertConstants.POLICY_ID); + String alertMessage = "The Policy \"" + policyId + "\" has been detected with the below information: " + sb.toString() ; + String site = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE); + String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE); + String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST); + Integer port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." 
+ EagleConfigConstants.PORT); + + context.addProperty(AlertConstants.ALERT_EVENT, sb.toString()); + context.addProperty(AlertConstants.ALERT_MESSAGE, alertMessage); + context.addProperty(AlertConstants.ALERT_TIMESTAMP_PROPERTY, DateTimeUtil.millisecondsToHumanDateWithSeconds(System.currentTimeMillis())); + context.addProperty(EagleConfigConstants.DATA_SOURCE, dataSource); + context.addProperty(EagleConfigConstants.SITE, site); + entity.setTimestamp(timestamp); + /** If we need to add severity tag, we should add severity filed in AbstractpolicyDefinition, and pass it down **/ + tags.put(EagleConfigConstants.SITE, site); + tags.put(EagleConfigConstants.DATA_SOURCE, dataSource); + tags.put(AlertConstants.SOURCE_STREAMS, context.getProperty(AlertConstants.SOURCE_STREAMS)); + tags.put(AlertConstants.POLICY_ID, context.getProperty(AlertConstants.POLICY_ID)); + tags.put(AlertConstants.ALERT_SOURCE, source); + tags.put(AlertConstants.ALERT_EXECUTOR_ID, alertExecutorId); + entity.setTags(tags); + + context.addProperty(AlertConstants.POLICY_DETAIL_URL, UrlBuilder.buiildPolicyDetailUrl(host, port, tags)); + context.addProperty(AlertConstants.ALERT_DETAIL_URL, UrlBuilder.buildAlertDetailUrl(host, port, entity)); + entity.setAlertContext(context); + return entity; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertHandler.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertHandler.java new file mode 100644 index 0000000..f89e506 --- /dev/null +++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertHandler.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.siddhi; + +import org.apache.eagle.alert.entity.AlertAPIEntity; + +import java.util.List; + +public interface SiddhiAlertHandler { + + void onAlerts(EagleAlertContext context, List alerts); +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyDefinition.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyDefinition.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyDefinition.java new file mode 100644 index 0000000..39eb0c9 --- /dev/null +++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyDefinition.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.siddhi; + +import org.apache.eagle.alert.config.AbstractPolicyDefinition; + +/** + * siddhi policy definition has the following format + * { + "type":"SiddhiCEPEngine", + "expression" : "from every b1=HeapUsage[metric == 'eagle.metric.gc'] -> a1=FullGCEvent[eventName == 'full gc'] -> b2=HeapUsage[metric == b1.metric and host == b1.host and value >= b1.value * 0.8] within 100 sec select a1.eventName, b1.metric, b2.timestamp, 60 as timerange insert into GCMonitor; " + } + */ +public class SiddhiPolicyDefinition extends AbstractPolicyDefinition { + private String expression; + + public String getExpression() { + return expression; + } + public void setExpression(String expression) { + this.expression = expression; + } + + @Override + public String toString(){ + return expression; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluator.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluator.java new file mode 100644 index 0000000..6ba2826 --- /dev/null +++ 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluator.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.siddhi; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +import org.apache.eagle.alert.config.AbstractPolicyDefinition; +import org.apache.eagle.alert.entity.AlertStreamSchemaEntity; +import org.apache.eagle.alert.common.AlertConstants; +import org.apache.eagle.alert.policy.PolicyManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.wso2.siddhi.core.ExecutionPlanRuntime; +import org.wso2.siddhi.core.SiddhiManager; +import org.wso2.siddhi.core.query.output.callback.QueryCallback; +import org.wso2.siddhi.core.stream.input.InputHandler; +import org.wso2.siddhi.query.api.execution.query.Query; +import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute; + +import org.apache.eagle.alert.entity.AlertAPIEntity; +import 
org.apache.eagle.alert.entity.AlertDefinitionAPIEntity; +import org.apache.eagle.alert.policy.PolicyEvaluator; +import org.apache.eagle.dataproc.core.JsonSerDeserUtils; +import org.apache.eagle.dataproc.core.ValuesArray; +import com.typesafe.config.Config; + +/** + * when policy is updated or deleted, SiddhiManager.shutdown should be invoked to release resources. + * during this time, synchronization is important + */ +public class SiddhiPolicyEvaluator implements PolicyEvaluator{ + private final static Logger LOG = LoggerFactory.getLogger(SiddhiPolicyEvaluator.class); + public static final int DEFAULT_QUEUE_SIZE = 1000; + private final BlockingQueue queue = new ArrayBlockingQueue(DEFAULT_QUEUE_SIZE); + private volatile SiddhiRuntime siddhiRuntime; + private String[] sourceStreams; + private boolean needValidation; + private String policyId; + private Config config; + private final static String EXECUTION_PLAN_NAME = "query"; + + /** + * everything dependent on policyDef should be together and switched in runtime + */ + public static class SiddhiRuntime{ + QueryCallback callback; + Map siddhiInputHandlers; + SiddhiManager siddhiManager; + SiddhiPolicyDefinition policyDef; + List outputFields; + String executionPlanName; + } + + public SiddhiPolicyEvaluator(Config config, String policyName, AbstractPolicyDefinition policyDef, String[] sourceStreams){ + this(config, policyName, policyDef, sourceStreams, false); + } + + public SiddhiPolicyEvaluator(Config config, String policyId, AbstractPolicyDefinition policyDef, String[] sourceStreams, boolean needValidation){ + this.config = config; + this.policyId = policyId; + this.needValidation = needValidation; + this.sourceStreams = sourceStreams; + init(policyDef); + } + + public void init(AbstractPolicyDefinition policyDef){ + siddhiRuntime = createSiddhiRuntime((SiddhiPolicyDefinition)policyDef); + } + + public static String addContextFieldIfNotExist(String expression) { + // select fieldA, fieldB --> select 
eagleAlertContext, fieldA, fieldB + int pos = expression.indexOf("select ") + 7; + int index = pos; + boolean isSelectStarPattern = true; + while(index < expression.length()) { + if (expression.charAt(index) == ' ') index++; + else if (expression.charAt(index) == '*') break; + else { + isSelectStarPattern = false; + break; + } + } + if (isSelectStarPattern) return expression; + StringBuilder sb = new StringBuilder(); + sb.append(expression.substring(0, pos)); + sb.append(SiddhiStreamMetadataUtils.EAGLE_ALERT_CONTEXT_FIELD + ","); + sb.append(expression.substring(pos, expression.length())); + return sb.toString(); + } + + private SiddhiRuntime createSiddhiRuntime(SiddhiPolicyDefinition policyDef){ + SiddhiManager siddhiManager = new SiddhiManager(); + Map siddhiInputHandlers = new HashMap(); + + StringBuilder sb = new StringBuilder(); + for(String sourceStream : sourceStreams){ + String streamDef = SiddhiStreamMetadataUtils.convertToStreamDef(sourceStream); + LOG.info("Siddhi stream definition : " + streamDef); + sb.append(streamDef); + } + + String expression = addContextFieldIfNotExist(policyDef.getExpression()); + String executionPlan = sb.toString() + " @info(name = '" + EXECUTION_PLAN_NAME + "') " + expression; + ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan); + + for(String sourceStream : sourceStreams){ + siddhiInputHandlers.put(sourceStream, executionPlanRuntime.getInputHandler(sourceStream)); + } + executionPlanRuntime.start(); + + QueryCallback callback = new SiddhiQueryCallbackImpl(config, this); + + LOG.info("Siddhi query: " + expression); + executionPlanRuntime.addCallback(EXECUTION_PLAN_NAME, callback); + + List outputFields = new ArrayList(); + try { + Field field = QueryCallback.class.getDeclaredField(EXECUTION_PLAN_NAME); + field.setAccessible(true); + Query query = (Query)field.get(callback); + List list = query.getSelector().getSelectionList(); + for (OutputAttribute output : list) { + 
outputFields.add(output.getRename()); + } + } + catch (Exception ex) { + LOG.error("Got an Exception when initial outputFields ", ex); + } + SiddhiRuntime runtime = new SiddhiRuntime(); + runtime.siddhiInputHandlers = siddhiInputHandlers; + runtime.siddhiManager = siddhiManager; + runtime.callback = callback; + runtime.policyDef = policyDef; + runtime.outputFields = outputFields; + runtime.executionPlanName = executionPlanRuntime.getName(); + return runtime; + } + + /** + * 1. input has 3 fields, first is siddhi context, second is streamName, the last one is map of attribute name/value + * 2. runtime check for input data (This is very expensive, so we ignore for now) + * the size of input map should be equal to size of attributes which stream metadata defines + * the attribute names should be equal to attribute names which stream metadata defines + * the input field cannot be null + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public void evaluate(ValuesArray data) throws Exception { + if(LOG.isDebugEnabled()) LOG.debug("Siddhi policy evaluator consumers data :" + data); + Object siddhiAlertContext = data.get(0); + String streamName = (String)data.get(1); + SortedMap map = (SortedMap)data.get(2); + validateEventInRuntime(streamName, map); + synchronized(siddhiRuntime){ + //insert siddhiAlertContext into the first field + List input = new ArrayList<>(); + input.add(siddhiAlertContext); + putAttrsIntoInputStream(input, streamName, map); + siddhiRuntime.siddhiInputHandlers.get(streamName).send(input.toArray(new Object[0])); + } + } + + /** + * this is a heavy operation, we should avoid to use + * @param sourceStream + * @param data + */ + private void validateEventInRuntime(String sourceStream, SortedMap data){ + if(!needValidation) + return; + SortedMap map = StreamMetadataManager.getInstance().getMetadataEntityMapForStream(sourceStream); + if(!map.keySet().equals(data.keySet())) + throw new IllegalStateException("incoming data schema is 
different from supported data schema, incoming data: " + data.keySet() + ", schema: " + map.keySet()); + } + + private void putAttrsIntoInputStream(List input, String streamName, SortedMap map) { + if(!needValidation) { + input.addAll(map.values()); + return; + } + for (Object key : map.keySet()) { + Object value = map.get(key); + if (value == null) { + input.add(SiddhiStreamMetadataUtils.getAttrDefaultValue(streamName, (String)key)); + } + else input.add(value); + } + } + + @Override + public void onPolicyUpdate(AlertDefinitionAPIEntity newAlertDef) { + AbstractPolicyDefinition policyDef = null; + try { + policyDef = JsonSerDeserUtils.deserialize(newAlertDef.getPolicyDef(), + AbstractPolicyDefinition.class, PolicyManager.getInstance().getPolicyModules(newAlertDef.getTags().get(AlertConstants.POLICY_TYPE))); + } + catch (Exception ex) { + LOG.error("Initial policy def error, ", ex); + } + SiddhiRuntime previous = siddhiRuntime; + siddhiRuntime = createSiddhiRuntime((SiddhiPolicyDefinition)policyDef); + synchronized(previous){ + previous.siddhiManager.getExecutionPlanRuntime(previous.executionPlanName).shutdown(); + } + } + + @Override + public void onPolicyDelete(){ + synchronized(siddhiRuntime){ + LOG.info("Going to shutdown siddhi execution plan, planName: " + siddhiRuntime.executionPlanName); + siddhiRuntime.siddhiManager.getExecutionPlanRuntime(siddhiRuntime.executionPlanName).shutdown(); + LOG.info("Siddhi execution plan " + siddhiRuntime.executionPlanName + " is successfully shutdown "); + } + } + + @Override + public String toString(){ + // show the policyDef + return siddhiRuntime.policyDef.toString(); + } + + public String[] getStreamNames() { + return sourceStreams; + } + + public Map getAdditionalContext() { + Map context = new HashMap(); + StringBuilder sourceStreams = new StringBuilder(); + for (String streamName : getStreamNames()) { + sourceStreams.append(streamName + ","); + } + if (sourceStreams.length() > 0) { + 
sourceStreams.deleteCharAt(sourceStreams.length() - 1); + } + context.put(AlertConstants.SOURCE_STREAMS, sourceStreams.toString()); + context.put(AlertConstants.POLICY_ID, policyId); + return context; + } + + public List getOutputStreamAttrNameList() { + return siddhiRuntime.outputFields; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java new file mode 100644 index 0000000..168b04f --- /dev/null +++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.eagle.alert.siddhi; + +import java.util.Arrays; +import java.util.List; + +import org.apache.eagle.alert.common.AlertConstants; +import org.apache.eagle.alert.policy.PolicyEvaluator; +import org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider; +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; + +public class SiddhiPolicyEvaluatorServiceProviderImpl implements PolicyEvaluatorServiceProvider { + @Override + public String getPolicyType() { + return AlertConstants.policyType.siddhiCEPEngine.name(); + } + + @Override + public Class getPolicyEvaluator() { + return SiddhiPolicyEvaluator.class; + } + + @Override + public List getBindingModules() { + Module module1 = new SimpleModule(AlertConstants.POLICY_DEFINITION).registerSubtypes(new NamedType(SiddhiPolicyDefinition.class, getPolicyType())); + return Arrays.asList(module1); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiQueryCallbackImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiQueryCallbackImpl.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiQueryCallbackImpl.java new file mode 100644 index 0000000..44b9c77 --- /dev/null +++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiQueryCallbackImpl.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.siddhi; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.eagle.alert.entity.AlertAPIEntity; +import org.apache.eagle.executor.AlertExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.wso2.siddhi.core.event.Event; +import org.wso2.siddhi.core.query.output.callback.QueryCallback; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.typesafe.config.Config; + +public class SiddhiQueryCallbackImpl extends QueryCallback{ + + private SiddhiPolicyEvaluator evaluator; + public static final Logger LOG = LoggerFactory.getLogger(SiddhiQueryCallbackImpl.class); + public static final ObjectMapper mapper = new ObjectMapper(); + public Config config; + + public SiddhiQueryCallbackImpl(Config config, SiddhiPolicyEvaluator evaluator) { + this.config = config; + this.evaluator = evaluator; + } + + public List getOutputMessage(Event event) { + Object[] data = event.getData(); + List rets = new ArrayList(); + boolean isFirst = true; + for (Object object : data) { + // The first field is siddhiAlertContext, skip it + if (isFirst) { + isFirst = false; + continue; + } + String value = null; + if (object instanceof Double) { + value = String.valueOf((Double)object); + } + else if (object instanceof Integer) { + value = String.valueOf((Integer)object); + } + else if (object instanceof 
Long) { + value = String.valueOf((Long)object); + } + else if (object instanceof String) { + value = (String)object; + } + else if (object instanceof Boolean) { + value = String.valueOf((Boolean)object); + } + rets.add(value); + } + return rets; + } + + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + Object[] data = inEvents[0].getData(); + EagleAlertContext siddhiAlertContext = (EagleAlertContext)data[0]; + List rets = getOutputMessage(inEvents[0]); + AlertAPIEntity alert = SiddhiAlertAPIEntityRendner.render(config, rets, siddhiAlertContext, timeStamp); + AlertExecutor alertExecutor = siddhiAlertContext.alertExecutor; + alertExecutor.onAlerts(siddhiAlertContext, Arrays.asList(alert)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiStreamMetadataUtils.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiStreamMetadataUtils.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiStreamMetadataUtils.java new file mode 100644 index 0000000..ba495dd --- /dev/null +++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiStreamMetadataUtils.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.siddhi; + +import java.util.Map; +import java.util.SortedMap; + +import org.apache.eagle.alert.entity.AlertStreamSchemaEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * convert metadata entities for a stream to stream definition for siddhi cep engine + * define stream HeapUsage (metric string, host string, value double, timestamp long) + */ +public class SiddhiStreamMetadataUtils { + private final static Logger LOG = LoggerFactory.getLogger(SiddhiStreamMetadataUtils.class); + + public final static String EAGLE_ALERT_CONTEXT_FIELD = "eagleAlertContext"; + + public static SortedMap getAttrMap(String streamName) { + SortedMap map = StreamMetadataManager.getInstance().getMetadataEntityMapForStream(streamName); + if(map == null || map.size() == 0){ + throw new IllegalStateException("alert stream schema should never be empty"); + } + return map; + } + + /** + * @see org.wso2.siddhi.query.api.definition.Attribute.Type + * make sure StreamMetadataManager.init is invoked before this method + * @param streamName + * @return + */ + public static String convertToStreamDef(String streamName){ + SortedMap map = getAttrMap(streamName); + StringBuilder sb = new StringBuilder(); + sb.append(EAGLE_ALERT_CONTEXT_FIELD + " object,"); + for(Map.Entry entry : map.entrySet()){ + String attrName = entry.getKey(); + sb.append(attrName); + sb.append(" "); + String attrType = entry.getValue().getAttrType(); + if(attrType.equalsIgnoreCase(AttributeType.STRING.name())){ + sb.append("string"); + }else 
if(attrType.equalsIgnoreCase(AttributeType.INTEGER.name())){ + sb.append("int"); + }else if(attrType.equalsIgnoreCase(AttributeType.LONG.name())){ + sb.append("long"); + }else if(attrType.equalsIgnoreCase(AttributeType.BOOL.name())){ + sb.append("bool"); + }else if(attrType.equalsIgnoreCase(AttributeType.FLOAT.name())){ + sb.append("float"); + }else if(attrType.equalsIgnoreCase(AttributeType.DOUBLE.name())){ + sb.append("double"); + }else{ + LOG.warn("AttrType is not recognized, ignore : " + attrType); + } + sb.append(","); + } + if(sb.length() > 0){ + sb.deleteCharAt(sb.length()-1); + } + + String siddhiStreamDefFormat = "define stream " + streamName + "(" + "%s" + ");"; + return String.format(siddhiStreamDefFormat, sb.toString()); + } + + public static Object getAttrDefaultValue(String streamName, String attrName){ + SortedMap map = getAttrMap(streamName); + AlertStreamSchemaEntity entity = map.get(attrName); + if (entity.getDefaultValue() != null) { + return entity.getDefaultValue(); + } + else { + String attrType = entity.getAttrType(); + if (attrType.equalsIgnoreCase(AttributeType.STRING.name())) { + return "NA"; + } else if (attrType.equalsIgnoreCase(AttributeType.INTEGER.name()) || attrType.equalsIgnoreCase(AttributeType.LONG.name())) { + return -1; + } else if (attrType.equalsIgnoreCase(AttributeType.BOOL.name())) { + return true; + } else { + LOG.warn("AttrType is not recognized: " + attrType + ", treat it as string"); + return "N/A"; + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/StreamMetadataManager.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/StreamMetadataManager.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/StreamMetadataManager.java new file mode 
100644 index 0000000..618d245 --- /dev/null +++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/StreamMetadataManager.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.siddhi; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + +import org.apache.eagle.alert.common.AlertConstants; +import org.apache.eagle.alert.entity.AlertStreamSchemaEntity; +import com.typesafe.config.Config; +import org.apache.eagle.alert.dao.AlertStreamSchemaDAO; +import org.apache.eagle.common.config.EagleConfigConstants; +import org.apache.commons.collections.map.UnmodifiableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * centralized memory where all stream metadata sit on, it is not mutable data + */ +public class StreamMetadataManager { + private static final Logger LOG = LoggerFactory.getLogger(StreamMetadataManager.class); + + private static StreamMetadataManager instance = new StreamMetadataManager(); + private Map> map = new HashMap>(); + private Map> map2 = new HashMap>(); + private volatile boolean 
initialized = false; + + private StreamMetadataManager(){ + } + + public static StreamMetadataManager getInstance(){ + return instance; + } + + private void internalInit(Config config, AlertStreamSchemaDAO dao){ + try{ + String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE); + List list = dao.findAlertStreamSchemaByDataSource(dataSource); + if(list == null) + return; + for (AlertStreamSchemaEntity entity : list) { + String streamName = entity.getTags().get(AlertConstants.STREAM_NAME); + if (map.get(streamName) == null) { + map.put(streamName, new ArrayList()); + map2.put(streamName, new TreeMap()); + } + map.get(streamName).add(entity); + map2.get(streamName).put(entity.getTags().get(AlertConstants.ATTR_NAME), entity); + } + }catch(Exception ex){ + LOG.error("Fail building metadata manger", ex); + throw new IllegalStateException(ex); + } + } + + /** + * singleton with init would be good for unit test as well, and it ensures that + * initialization happens only once before you use it. 
+ * @param config + * @param dao + */ + public void init(Config config, AlertStreamSchemaDAO dao){ + if(!initialized){ + synchronized(this){ + if(!initialized){ + if(LOG.isDebugEnabled()) LOG.debug("Initializing ..."); + internalInit(config, dao); + initialized = true; + LOG.info("Successfully initialized"); + } + } + }else{ + LOG.info("Already initialized, skip"); + } + } + + // Only for unit test purpose + public void reset() { + synchronized (this) { + initialized = false; + map.clear(); + map2.clear(); + } + } + + private void ensureInitialized(){ + if(!initialized) + throw new IllegalStateException("StreamMetadataManager should be initialized before using it"); + } + + public List getMetadataEntitiesForStream(String streamName){ + ensureInitialized(); + return getMetadataEntitiesForAllStreams().get(streamName); + } + + public Map> getMetadataEntitiesForAllStreams(){ + ensureInitialized(); + return UnmodifiableMap.decorate(map); + } + + public SortedMap getMetadataEntityMapForStream(String streamName){ + ensureInitialized(); + return getMetadataEntityMapForAllStreams().get(streamName); + } + + public Map> getMetadataEntityMapForAllStreams(){ + ensureInitialized(); + return UnmodifiableMap.decorate(map2); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/ContainsIgnoreCaseExtension.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/ContainsIgnoreCaseExtension.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/ContainsIgnoreCaseExtension.java new file mode 100644 index 0000000..cf76134 --- /dev/null +++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/ContainsIgnoreCaseExtension.java @@ -0,0 +1,87 @@ +/* + * 
Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.alert.siddhi.extension; + +import org.wso2.siddhi.core.config.ExecutionPlanContext; +import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException; +import org.wso2.siddhi.core.executor.ExpressionExecutor; +import org.wso2.siddhi.core.executor.function.FunctionExecutor; +import org.wso2.siddhi.query.api.definition.Attribute; +import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException; + +public class ContainsIgnoreCaseExtension extends FunctionExecutor { + + Attribute.Type returnType = Attribute.Type.BOOL; + + @Override + protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) { + if (attributeExpressionExecutors.length != 2) { + throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:containsIgnoreCase() function, required 2, " + + "but found " + attributeExpressionExecutors.length); + } + if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) { + throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:containsIgnoreCase() function, " + + "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[0].getReturnType().toString()); + } + if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) { 
+ throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:containsIgnoreCase() function, " + + "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[1].getReturnType().toString()); + } + } + + @Override + protected Object execute(Object[] data) { + if (data[0] == null) { + throw new ExecutionPlanRuntimeException("Invalid input given to str:containsIgnoreCase() function. First argument cannot be null"); + } + if (data[1] == null) { + throw new ExecutionPlanRuntimeException("Invalid input given to str:containsIgnoreCase() function. Second argument cannot be null"); + } + String str1 = (String)data[0]; + String str2 = (String)data[1]; + return str1.toUpperCase().contains(str2.toUpperCase()); + } + + @Override + protected Object execute(Object data) { + return null; //Since the containsIgnoreCase function takes in 2 parameters, this method does not get called. Hence, not implemented. + } + + @Override + public void start() { + //Nothing to start + } + + @Override + public void stop() { + //Nothing to stop + } + + @Override + public Attribute.Type getReturnType() { + return returnType; + } + + @Override + public Object[] currentState() { + return new Object[]{}; + } + + @Override + public void restoreState(Object[] state) { + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/EqualsIgnoreCaseExtension.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/EqualsIgnoreCaseExtension.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/EqualsIgnoreCaseExtension.java new file mode 100644 index 0000000..0b6e7ec --- /dev/null +++ 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/EqualsIgnoreCaseExtension.java @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.alert.siddhi.extension; + +import org.wso2.siddhi.core.config.ExecutionPlanContext; +import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException; +import org.wso2.siddhi.core.executor.ConstantExpressionExecutor; +import org.wso2.siddhi.core.executor.ExpressionExecutor; +import org.wso2.siddhi.core.executor.function.FunctionExecutor; +import org.wso2.siddhi.query.api.definition.Attribute; +import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class EqualsIgnoreCaseExtension extends FunctionExecutor { + + Attribute.Type returnType = Attribute.Type.BOOL; + + @Override + protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) { + if (attributeExpressionExecutors.length != 2) { + throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:equalsIgnoreCase() function, required 2, " + + "but found " + attributeExpressionExecutors.length); + } + if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) { + throw new ExecutionPlanValidationException("Invalid 
parameter type found for the first argument of str:equalsIgnoreCase() function, " + + "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[0].getReturnType().toString()); + } + if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) { + throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:equalsIgnoreCase() function, " + + "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[1].getReturnType().toString()); + } + } + + @Override + protected Object execute(Object[] data) { + if (data[0] == null) { + throw new ExecutionPlanRuntimeException("Invalid input given to str:equalsIgnoreCase() function. First argument cannot be null"); + } + if (data[1] == null) { + throw new ExecutionPlanRuntimeException("Invalid input given to str:equalsIgnoreCase() function. Second argument cannot be null"); + } + String str1 = (String)data[0]; + String str2 = (String)data[1]; + return str1.equalsIgnoreCase(str2); + } + + @Override + protected Object execute(Object data) { + return null; //Since the equalsIgnoreCase function takes in 2 parameters, this method does not get called. Hence, not implemented. 
+ } + + @Override + public void start() { + //Nothing to start + } + + @Override + public void stop() { + //Nothing to stop + } + + @Override + public Attribute.Type getReturnType() { + return returnType; + } + + @Override + public Object[] currentState() { + return new Object[]{}; + } + + @Override + public void restoreState(Object[] state) { + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java new file mode 100644 index 0000000..0bf80de --- /dev/null +++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eagle.alert.siddhi.extension; + +import org.wso2.siddhi.core.config.ExecutionPlanContext; +import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException; +import org.wso2.siddhi.core.executor.ConstantExpressionExecutor; +import org.wso2.siddhi.core.executor.ExpressionExecutor; +import org.wso2.siddhi.extension.string.RegexpFunctionExtension; +import org.wso2.siddhi.query.api.definition.Attribute; +import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * regexpIgnoreCase(string, regex) + * Tells whether or not this 'string' matches the given regular expression 'regex'. + * Accept Type(s): (STRING,STRING) + * Return Type(s): BOOLEAN + */ +public class RegexpIgnoreCaseFunctionExtension extends RegexpFunctionExtension { + + //state-variables + boolean isRegexConstant = false; + String regexConstant; + Pattern patternConstant; + + @Override + protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) { + if (attributeExpressionExecutors.length != 2) { + throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:regexpIgnoreCase() function, required 2, " + + "but found " + attributeExpressionExecutors.length); + } + if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) { + throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:regexpIgnoreCase() function, " + + "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[0].getReturnType().toString()); + } + if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) { + throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:regexpIgnoreCase() function, " + + "required "+ Attribute.Type.STRING+", but found 
"+attributeExpressionExecutors[1].getReturnType().toString()); + } + if(attributeExpressionExecutors[1] instanceof ConstantExpressionExecutor){ + isRegexConstant = true; + regexConstant = (String) ((ConstantExpressionExecutor) attributeExpressionExecutors[1]).getValue(); + patternConstant = Pattern.compile(regexConstant, Pattern.CASE_INSENSITIVE); + } + } + + @Override + protected Object execute(Object[] data) { + String regex; + Pattern pattern; + Matcher matcher; + + if (data[0] == null) { + throw new ExecutionPlanRuntimeException("Invalid input given to str:regexpIgnoreCase() function. First argument cannot be null"); + } + if (data[1] == null) { + throw new ExecutionPlanRuntimeException("Invalid input given to str:regexpIgnoreCase() function. Second argument cannot be null"); + } + String source = (String) data[0]; + + if(!isRegexConstant){ + regex = (String) data[1]; + pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE); + matcher = pattern.matcher(source); + return matcher.matches(); + + } else { + matcher = patternConstant.matcher(source); + return matcher.matches(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java new file mode 100644 index 0000000..4f0f4b3 --- /dev/null +++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.executor; + +import org.apache.eagle.alert.common.AlertConstants; +import org.apache.eagle.alert.config.AbstractPolicyDefinition; +import org.apache.eagle.alert.dao.AlertDefinitionDAO; +import org.apache.eagle.alert.dao.AlertStreamSchemaDAOImpl; +import org.apache.eagle.alert.entity.AlertAPIEntity; +import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity; +import org.apache.eagle.common.config.EagleConfigConstants; +import org.apache.eagle.datastream.Collector; +import org.apache.eagle.datastream.JavaStormStreamExecutor2; +import org.apache.eagle.datastream.Tuple2; +import org.apache.eagle.metric.CountingMetric; +import org.apache.eagle.metric.Metric; +import org.apache.eagle.metric.report.EagleSerivceMetricReport; +import com.sun.jersey.client.impl.CopyOnWriteHashMap; +import com.typesafe.config.Config; +import org.apache.eagle.alert.policy.*; +import org.apache.eagle.alert.siddhi.EagleAlertContext; +import org.apache.eagle.alert.siddhi.SiddhiAlertHandler; +import org.apache.eagle.alert.siddhi.StreamMetadataManager; +import org.apache.eagle.dataproc.core.JsonSerDeserUtils; +import org.apache.eagle.dataproc.core.ValuesArray; +import org.apache.commons.lang3.time.DateUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.management.ManagementFactory; +import java.util.HashMap; +import java.util.List; +import 
java.util.ArrayList; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; + +public class AlertExecutor extends JavaStormStreamExecutor2 implements PolicyLifecycleMethods, SiddhiAlertHandler { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(AlertExecutor.class); + + private String alertExecutorId; + private volatile CopyOnWriteHashMap policyEvaluators; + private PolicyPartitioner partitioner; + private int numPartitions; + private int partitionSeq; + private Config config; + private Map> initialAlertDefs; + private AlertDefinitionDAO alertDefinitionDao; + private String[] sourceStreams; + private static String EAGLE_EVENT_COUNT = "eagle.event.count"; + private static String EAGLE_POLICY_EVAL_COUNT = "eagle.policy.eval.count"; + private static String EAGLE_POLICY_EVAL_FAIL_COUNT = "eagle.policy.eval.fail.count"; + private static String EAGLE_ALERT_COUNT = "eagle.alert.count"; + private static String EAGLE_ALERT_FAIL_COUNT = "eagle.alert.fail.count"; + private static long MERITE_GRANULARITY = DateUtils.MILLIS_PER_MINUTE; + private Map metricMap; // metricMap's key = metricName[#policyId] + private Map> dimensionsMap; // cache it for performance + private Map baseDimensions; + private Thread metricReportThread; + private EagleSerivceMetricReport metricReport; + + public AlertExecutor(String alertExecutorId, PolicyPartitioner partitioner, int numPartitions, int partitionSeq, + AlertDefinitionDAO alertDefinitionDao, String[] sourceStreams){ + this.alertExecutorId = alertExecutorId; + this.partitioner = partitioner; + this.numPartitions = numPartitions; + this.partitionSeq = partitionSeq; + this.alertDefinitionDao = alertDefinitionDao; + this.sourceStreams = sourceStreams; + } + + public String getAlertExecutorId(){ + return this.alertExecutorId; + } + + public int getNumPartitions() { + return this.numPartitions; + } + + public int getPartitionSeq(){ + return 
this.partitionSeq; + } + + public PolicyPartitioner getPolicyPartitioner() { + return this.partitioner; + } + + public Map> getInitialAlertDefs() { + return this.initialAlertDefs; + } + + public AlertDefinitionDAO getAlertDefinitionDao() { + return alertDefinitionDao; + } + + @Override + public void prepareConfig(Config config) { + this.config = config; + } + + public void initMetricReportor() { + String eagleServiceHost = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST); + int eagleServicePort = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT); + + String username = config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) ? + config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) : null; + String password = config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) ? + config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) : null; + this.metricReport = new EagleSerivceMetricReport(eagleServiceHost, eagleServicePort, username, password); + + metricMap = new ConcurrentHashMap(); + baseDimensions = new HashMap(); + baseDimensions.put(AlertConstants.ALERT_EXECUTOR_ID, alertExecutorId); + baseDimensions.put(AlertConstants.PARTITIONSEQ, String.valueOf(partitionSeq)); + baseDimensions.put(AlertConstants.SOURCE, ManagementFactory.getRuntimeMXBean().getName()); + baseDimensions.put(EagleConfigConstants.DATA_SOURCE, config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE)); + baseDimensions.put(EagleConfigConstants.SITE, config.getString(EagleConfigConstants.EAGLE_PROPS + "." 
+ EagleConfigConstants.SITE)); + + dimensionsMap = new HashMap>(); + this.metricReportThread = new Thread() { + @Override + public void run() { + runMetricReporter(); + } + }; + this.metricReportThread.setDaemon(true); + metricReportThread.start(); + } + + @Override + public void init() { + // initialize StreamMetadataManager before it is used + StreamMetadataManager.getInstance().init(config, new AlertStreamSchemaDAOImpl(config)); + // for each AlertDefinition, to create a PolicyEvaluator + Map tmpPolicyEvaluators = new HashMap(); + + String site = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE); + String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE); + try { + initialAlertDefs = alertDefinitionDao.findActiveAlertDefsGroupbyAlertExecutorId(site, dataSource); + } + catch (Exception ex) { + LOG.error("fail to initialize initialAlertDefs: ", ex); + throw new IllegalStateException("fail to initialize initialAlertDefs: ", ex); + } + if(initialAlertDefs == null || initialAlertDefs.isEmpty()){ + LOG.warn("No alert definitions was found for site: " + site + ", dataSource: " + dataSource); + } + else if (initialAlertDefs.get(alertExecutorId) != null) { + for(AlertDefinitionAPIEntity alertDef : initialAlertDefs.get(alertExecutorId).values()){ + int part = partitioner.partition(numPartitions, alertDef.getTags().get(AlertConstants.POLICY_TYPE), alertDef.getTags().get(AlertConstants.POLICY_ID)); + if (part == partitionSeq) { + tmpPolicyEvaluators.put(alertDef.getTags().get(AlertConstants.POLICY_ID), createPolicyEvaluator(alertDef)); + } + } + } + + policyEvaluators = new CopyOnWriteHashMap<>(); + // for efficency, we don't put single policy evaluator + policyEvaluators.putAll(tmpPolicyEvaluators); + DynamicPolicyLoader policyLoader = DynamicPolicyLoader.getInstance(); + + policyLoader.init(initialAlertDefs, alertDefinitionDao, config); + 
policyLoader.addPolicyChangeListener(alertExecutorId + "_" + partitionSeq, this); + LOG.info("Alert Executor created, partitionSeq: " + partitionSeq + " , numPartitions: " + numPartitions); + LOG.info("All policy evaluators: " + policyEvaluators); + + initMetricReportor(); + } + + /** + * Create PolicyEvaluator instance according to policyType-mapped policy evaluator class + * + * @param alertDef alert definition + * @return PolicyEvaluator instance + */ + private PolicyEvaluator createPolicyEvaluator(AlertDefinitionAPIEntity alertDef){ + String policyType = alertDef.getTags().get(AlertConstants.POLICY_TYPE); + Class evalCls = PolicyManager.getInstance().getPolicyEvaluator(policyType); + if(evalCls == null){ + String msg = "No policy evaluator defined for policy type : " + policyType; + LOG.error(msg); + throw new IllegalStateException(msg); + } + + // check out whether strong incoming data validation is necessary + String needValidationConfigKey= AlertConstants.ALERT_EXECUTOR_CONFIGS + "." 
+ alertExecutorId + ".needValidation";
+
+        // Default: true
+        boolean needValidation = !config.hasPath(needValidationConfigKey) || config.getBoolean(needValidationConfigKey);
+
+        AbstractPolicyDefinition policyDef = null;
+        try {
+            policyDef = JsonSerDeserUtils.deserialize(alertDef.getPolicyDef(), AbstractPolicyDefinition.class, PolicyManager.getInstance().getPolicyModules(policyType));
+        } catch (Exception ex) {
+            LOG.error("Fail initial alert policy def: "+alertDef.getPolicyDef(), ex);
+        }
+        PolicyEvaluator pe;
+        try{
+            // Create evaluator instances
+            pe = evalCls.getConstructor(Config.class, String.class, AbstractPolicyDefinition.class, String[].class, boolean.class).newInstance(config, alertDef.getTags().get("policyId"), policyDef, sourceStreams, needValidation);
+        }catch(Exception ex){
+            LOG.error("Fail creating new policyEvaluator", ex);
+            LOG.warn("Broken policy definition and stop running : " + alertDef.getPolicyDef());
+            throw new IllegalStateException(ex);
+        }
+        return pe;
+    }
+
+    /**
+     * Verify both the alertExecutor logic name and the partition id.
+     *
+     * @param alertDef alert definition
+     * @return {@code true} when the definition is tagged with this executor's id and the
+     *         partitioner maps its policy type/id to this executor's partition sequence
+     */
+    private boolean accept(AlertDefinitionAPIEntity alertDef){
+        if(!alertDef.getTags().get("alertExecutorId").equals(alertExecutorId)) {
+            if(LOG.isDebugEnabled()){
+                LOG.debug("alertDef does not belong to this alertExecutorId : " + alertExecutorId + ", alertDef : " + alertDef);
+            }
+            return false;
+        }
+        int targetPartitionSeq = partitioner.partition(numPartitions, alertDef.getTags().get(AlertConstants.POLICY_TYPE), alertDef.getTags().get(AlertConstants.POLICY_ID));
+        return targetPartitionSeq == partitionSeq;
+    }
+
+    /**
+     * Round {@code value} down to the nearest multiple of {@code granularity}.
+     *
+     * @param value       value to truncate
+     * @param granularity bucket size; must be non-zero
+     * @return the largest multiple of {@code granularity} not exceeding {@code value} (for non-negative values)
+     */
+    public long trim(long value, long granularity) {
+        return value / granularity * granularity;
+    }
+
+    /**
+     * Reporter loop: collects every metric older than one granularity window, replaces it with a
+     * fresh counter for the current window and emits the collected metrics.
+     *
+     * The loop ends when the running thread is interrupted. The pause between rounds is taken
+     * even after a failed round, so a persistent failure cannot turn into a busy loop.
+     */
+    public void runMetricReporter() {
+        while(!Thread.currentThread().isInterrupted()) {
+            try {
+                long current = System.currentTimeMillis();
+                List<Metric> metricList = new ArrayList<Metric>();
+                synchronized (this.metricMap) {
+                    for (Entry<String, Metric> entry : metricMap.entrySet()) {
+                        String name = entry.getKey();
+                        Metric metric = entry.getValue();
+                        long previous = metric.getTimestamp();
+                        if (current > previous + MERITE_GRANULARITY) {
+                            metricList.add(metric);
+                            // Same key, new value: starts a fresh counter for the current window
+                            metricMap.put(name, new CountingMetric(trim(current, MERITE_GRANULARITY), metric.getDemensions(), metric.getMetricName()));
+                        }
+                    }
+                }
+                if (!metricList.isEmpty()) {
+                    LOG.info("Going to persist alert metrics, size: " + metricList.size());
+                    metricReport.emit(metricList);
+                }
+            }
+            catch (Throwable t) {
+                LOG.error("Got a throwable in metricReporter " , t);
+            }
+            try {
+                Thread.sleep(MERITE_GRANULARITY / 2);
+            } catch (InterruptedException ex) {
+                // Keep the interrupt visible so the loop condition stops the reporter
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Add {@code value} to the counter registered under {@code name}, creating the counter on first use.
+     *
+     * @param name       metric key; the part before the first '#' is used as the metric name
+     * @param dimensions dimensions attached to a newly created counter
+     * @param value      amount to add
+     */
+    public void updateCounter(String name, Map<String, String> dimensions, double value) {
+        long current = System.currentTimeMillis();
+        synchronized (metricMap) {
+            Metric metric = metricMap.get(name);
+            if (metric == null) {
+                String metricName = name.split("#")[0];
+                metric = new CountingMetric(trim(current, MERITE_GRANULARITY), dimensions, metricName);
+                metricMap.put(name, metric);
+            }
+            metric.update(value);
+        }
+    }
+
+    /**
+     * Increment the counter registered under {@code name} by one.
+     */
+    public void updateCounter(String name, Map<String, String> dimensions) {
+        updateCounter(name, dimensions, 1.0);
+    }
+
+    /**
+     * Return the cached dimensions of a policy: the base dimensions plus the policy id.
+     * The map is built and cached on the first request for each policy id.
+     */
+    public Map<String, String> getDimensions(String policyId) {
+        Map<String, String> dimensions = dimensionsMap.get(policyId);
+        if (dimensions == null) {
+            dimensions = new HashMap<String, String>(baseDimensions);
+            dimensions.put(AlertConstants.POLICY_ID, policyId);
+            dimensionsMap.put(policyId, dimensions);
+        }
+        return dimensions;
+    }
+
+    /**
+     * Build the per-policy metric key in the form {@code metricName#policy}.
+     */
+    public String getMetricKey(String metricName, String policy) {
+        return metricName + "#" + policy;
+    }
+
+    /**
+     * within this single executor, execute all PolicyEvaluator sequentially
+     * the contract for input:
+     * 1. total # of fields for input is 3, which is fixed
+     * 2. the first field is key
+     * 3. the second field is stream name
+     * 4. the third field is value which is java SortedMap
+     *
+     * A failure of one evaluator is logged and counted; the remaining evaluators still run.
+     */
+    @Override
+    public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, AlertAPIEntity>> outputCollector){
+        if(input.size() != 3)
+            throw new IllegalStateException("AlertExecutor always consumes exactly 3 fields: key, stream name and value(SortedMap)");
+        if(LOG.isDebugEnabled()) LOG.debug("Msg is coming " + input.get(2));
+        if(LOG.isDebugEnabled()) LOG.debug("Current policyEvaluators: " + policyEvaluators.keySet().toString());
+
+        updateCounter(EAGLE_EVENT_COUNT, baseDimensions);
+        try{
+            synchronized(this.policyEvaluators) {
+                for(Entry<String, PolicyEvaluator> entry : policyEvaluators.entrySet()){
+                    String policyId = entry.getKey();
+                    PolicyEvaluator evaluator = entry.getValue();
+                    updateCounter(getMetricKey(EAGLE_POLICY_EVAL_COUNT, policyId), getDimensions(policyId));
+                    try {
+                        EagleAlertContext siddhiAlertContext = new EagleAlertContext();
+                        siddhiAlertContext.alertExecutor = this;
+                        siddhiAlertContext.policyId = policyId;
+                        siddhiAlertContext.evaluator = evaluator;
+                        siddhiAlertContext.outputCollector = outputCollector;
+                        evaluator.evaluate(new ValuesArray(siddhiAlertContext, input.get(1), input.get(2)));
+                    }
+                    catch (Exception ex) {
+                        // Plain concatenation: a null payload must not raise a second exception while logging
+                        LOG.error("Got an exception, but continue to run " + input.get(2), ex);
+                        updateCounter(getMetricKey(EAGLE_POLICY_EVAL_FAIL_COUNT, policyId), getDimensions(policyId));
+                    }
+                }
+            }
+        } catch(Exception ex){
+            LOG.error(alertExecutorId + ", partition " + partitionSeq + ", error fetching alerts, but continue to run", ex);
+            updateCounter(EAGLE_ALERT_FAIL_COUNT, baseDimensions);
+        }
+    }
+
+    /**
+     * Register an evaluator for every added policy that this executor partition accepts.
+     */
+    @Override
+    public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) {
+        if(LOG.isDebugEnabled()) LOG.debug(alertExecutorId + ", partition " + partitionSeq + " policy added : " + added + " policyEvaluators " + policyEvaluators);
+        for(AlertDefinitionAPIEntity alertDef : added.values()){
+            if(!accept(alertDef))
+                continue;
+            LOG.info(alertExecutorId + ", partition " + partitionSeq + " policy really added " + alertDef);
+            PolicyEvaluator newEvaluator = createPolicyEvaluator(alertDef);
+            if(newEvaluator != null){
+                synchronized(this.policyEvaluators) {
+                    policyEvaluators.put(alertDef.getTags().get(AlertConstants.POLICY_ID), newEvaluator);
+                }
+            }
+        }
+    }
+
+    /**
+     * Forward the new definition to the evaluator of every changed policy that this executor
+     * partition accepts. A change for a policy without a registered evaluator is logged and skipped.
+     */
+    @Override
+    public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> changed) {
+        if(LOG.isDebugEnabled()) LOG.debug(alertExecutorId + ", partition " + partitionSeq + " policy changed : " + changed);
+        for(AlertDefinitionAPIEntity alertDef : changed.values()){
+            if(!accept(alertDef))
+                continue;
+            LOG.info(alertExecutorId + ", partition " + partitionSeq + " policy really changed " + alertDef);
+            String policyId = alertDef.getTags().get(AlertConstants.POLICY_ID);
+            synchronized(this.policyEvaluators) {
+                PolicyEvaluator pe = policyEvaluators.get(policyId);
+                if(pe == null){
+                    // No evaluator is registered under this id, so there is nothing to update
+                    LOG.warn(alertExecutorId + ", partition " + partitionSeq + " no evaluator found for changed policy " + policyId);
+                    continue;
+                }
+                pe.onPolicyUpdate(alertDef);
+            }
+        }
+    }
+
+    /**
+     * Remove and notify the evaluator of every deleted policy that this executor partition accepts.
+     */
+    @Override
+    public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> deleted) {
+        if(LOG.isDebugEnabled()) LOG.debug(alertExecutorId + ", partition " + partitionSeq + " policy deleted : " + deleted);
+        for(AlertDefinitionAPIEntity alertDef : deleted.values()){
+            if(!accept(alertDef))
+                continue;
+            LOG.info(alertExecutorId + ", partition " + partitionSeq + " policy really deleted " + alertDef);
+            String policyId = alertDef.getTags().get(AlertConstants.POLICY_ID);
+            synchronized(this.policyEvaluators) {
+                PolicyEvaluator pe = policyEvaluators.remove(policyId);
+                if (pe != null) {
+                    pe.onPolicyDelete();
+                }
+            }
+        }
+    }
+
+    /**
+     * Emit every detected alert, keyed by policy id, to the collector carried in the context,
+     * and count the alerts for that policy.
+     */
+    @Override
+    public void onAlerts(EagleAlertContext context, List<AlertAPIEntity> alerts) {
+        if(alerts != null && !alerts.isEmpty()){
+            String policyId = context.policyId;
+            LOG.info(String.format("Detected %s alerts for policy %s",alerts.size(),policyId));
+            Collector outputCollector = context.outputCollector;
+            PolicyEvaluator evaluator = context.evaluator;
+            updateCounter(getMetricKey(EAGLE_ALERT_COUNT, policyId), getDimensions(policyId), alerts.size());
+            for (AlertAPIEntity entity : alerts) {
+                synchronized(this) {
+                    outputCollector.collect(new Tuple2(policyId, entity));
+                }
+                if(LOG.isDebugEnabled()) LOG.debug("A new alert is triggered: "+alertExecutorId + ", partition " + partitionSeq + ", Got an alert with output context: " + entity.getAlertContext() + ", for policy " + evaluator);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutorCreationUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutorCreationUtils.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutorCreationUtils.java
new file mode 100644
index 0000000..c073939
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutorCreationUtils.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.eagle.executor; + +import org.apache.eagle.alert.common.AlertConstants; +import org.apache.eagle.alert.dao.AlertDefinitionDAO; +import org.apache.eagle.alert.dao.AlertDefinitionDAOImpl; +import org.apache.eagle.alert.dao.AlertExecutorDAOImpl; +import org.apache.eagle.alert.entity.AlertExecutorEntity; +import org.apache.eagle.alert.policy.DefaultPolicyPartitioner; +import org.apache.eagle.alert.policy.PolicyPartitioner; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigValue; +import org.apache.eagle.common.config.EagleConfigConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Create alert executors and provide callback for programmer to link alert executor to immediate parent executors + * + *

+ * Explanations for programId, alertExecutorId and policy:<br/><br/>
+ * - programId - a distributed or single-process program, for example one Storm topology<br/>
+ * - alertExecutorId - one process/thread which executes multiple policies<br/>
+ * - policy - some rules to be evaluated<br/>
+ *
+ * <br/>
+ *
+ * Normally the mapping is as follows:
+ * <pre>
+ * programId (1:N) alertExecutorId
+ * alertExecutorId (1:N) policy
+ * </pre>
+ */
+public class AlertExecutorCreationUtils {
+    private final static Logger LOG = LoggerFactory.getLogger(AlertExecutorCreationUtils.class);
+
+    /**
+     * Create the alert executors for one alertExecutorId, reading its source stream names
+     * from the alert executor entities stored for the configured data source.
+     *
+     * @param config          program configuration; the data source is read from
+     *                        {@code EAGLE_PROPS + "." + DATA_SOURCE}
+     * @param alertExecutorId id of the alert executor to build
+     * @return one executor per partition
+     * @throws IllegalStateException when no stream name is registered for the executor
+     * @throws Exception             when the lookup or the executor creation fails
+     */
+    public static AlertExecutor[] createAlertExecutors(Config config, String alertExecutorId) throws Exception{
+        // Read site and dataSource from configuration.
+        String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE);
+        LOG.info("Loading alerting definitions for dataSource: " + dataSource);
+
+        // Get map from alertExecutorId to alert stream
+        // (dataSource) => Map[alertExecutorId:String,streamName:List[String]]
+        List<String> streamNames = new ArrayList<String>();
+        AlertExecutorDAOImpl alertExecutorDAO = new AlertExecutorDAOImpl(config);
+        List<AlertExecutorEntity> alertExecutorEntities = alertExecutorDAO.findAlertExecutor(dataSource, alertExecutorId);
+        for(AlertExecutorEntity entity : alertExecutorEntities){
+            streamNames.add(entity.getTags().get(AlertConstants.STREAM_NAME));
+        }
+
+        if(streamNames.isEmpty()){
+            throw new IllegalStateException("upstream names should not be empty for alert " + alertExecutorId);
+        }
+        return createAlertExecutors(config, new AlertDefinitionDAOImpl(config),
+                streamNames, alertExecutorId);
+    }
+
+    /**
+     * Build DAG Tasks based on persisted alert definition and schemas from eagle store.
+     *
+     * <h3>Require configuration:</h3>
+     *
+     * <ul>
+     * <li>eagleProps.site: program site id.</li>
+     * <li>eagleProps.dataSource: program data source.</li>
+     * <li>alertExecutorConfigs: only configured executor will be built into execution tasks.</li>
+     * </ul>
+     *
+     * <h3>Steps:</h3>
+     *
+     * <ol>
+     * <li>(upstreamTasks) => Map[streamName:String,upstreamTask:Task]</li>
+     * <li>(dataSource) => Map[alertExecutorId:String,streamName:List[String]]</li>
+     * <li>(site,dataSource) => Map[alertExecutorId,Map[policyId,alertDefinition]]</li>
+     * <li>(config["alertExecutorConfigs"]) => AlertExecutor(alertExecutorID, partitioner, numPartitions, partitionSeq, alertDefs, alertDefDAO, sourceStreams)[]</li>
+     * </ol>
+     *
+     * When {@code alertExecutorConfigs} has no entry for {@code alertExecutorId}, one partition and
+     * {@link DefaultPolicyPartitioner} are used. A missing, non-numeric or non-positive
+     * {@code parallelism} also falls back to one partition.
+     */
+    public static AlertExecutor[] createAlertExecutors(Config config, AlertDefinitionDAO alertDefDAO,
+            List<String> streamNames, String alertExecutorId) throws Exception{
+        // Read `alertExecutorConfigs` from configuration and get config for this alertExecutorId
+        int numPartitions = 1;
+        String partitionerCls = DefaultPolicyPartitioner.class.getCanonicalName();
+        String alertExecutorConfigsKey = "alertExecutorConfigs";
+        if(config.hasPath(alertExecutorConfigsKey)) {
+            Map<String, ConfigValue> alertExecutorConfigs = config.getObject(alertExecutorConfigsKey);
+            // The lookup key is the executor id; using the map itself as the key never matches
+            if(alertExecutorConfigs != null && alertExecutorConfigs.containsKey(alertExecutorId)) {
+                Object unwrapped = alertExecutorConfigs.get(alertExecutorId).unwrapped();
+                if(unwrapped instanceof Map) {
+                    @SuppressWarnings("unchecked") // an unwrapped config object is a Map keyed by String
+                    Map<String, Object> alertExecutorConfig = (Map<String, Object>) unwrapped;
+
+                    // Config numbers may unwrap to Integer, Long or Double, so go through Number
+                    Object parallelism = alertExecutorConfig.get("parallelism");
+                    if(parallelism instanceof Number) {
+                        int parts = ((Number) parallelism).intValue();
+                        numPartitions = parts > 0 ? parts : 1;
+                    } else if(parallelism != null) {
+                        LOG.warn("Ignoring non-numeric parallelism for alert executor " + alertExecutorId + ": " + parallelism);
+                    }
+
+                    Object partitioner = alertExecutorConfig.get("partitioner");
+                    if(partitioner != null) partitionerCls = (String) partitioner;
+                } else {
+                    LOG.warn("Ignoring alertExecutorConfigs entry for " + alertExecutorId + " because it is not an object: " + unwrapped);
+                }
+            }
+        }
+
+        return createAlertExecutors(alertDefDAO, streamNames, alertExecutorId, numPartitions, partitionerCls);
+    }
+
+    /**
+     * Build alert executors and assign alert definitions between these executors by partitioner (alertExecutorConfigs["${alertExecutorId}"]["partitioner"])
+     *
+     * @param alertDefDAO     DAO handed to every executor
+     * @param sourceStreams   names of the streams the executors consume
+     * @param alertExecutorID id shared by all created executors
+     * @param numPartitions   number of executors to create; executor {@code i} gets partition sequence {@code i}
+     * @param partitionerCls  class name of the {@link PolicyPartitioner}, instantiated through its no-arg constructor
+     * @return the created executors, indexed by partition sequence
+     */
+    public static AlertExecutor[] createAlertExecutors(AlertDefinitionDAO alertDefDAO, List<String> sourceStreams,
+            String alertExecutorID, int numPartitions, String partitionerCls) throws Exception{
+        LOG.info("Creating alert executors with alertExecutorID: " + alertExecutorID + ", numPartitions: " + numPartitions + ", Partition class is: "+ partitionerCls);
+
+        PolicyPartitioner partitioner = (PolicyPartitioner)Class.forName(partitionerCls).newInstance();
+        AlertExecutor[] alertExecutors = new AlertExecutor[numPartitions];
+        String[] _sourceStreams = sourceStreams.toArray(new String[0]);
+
+        for(int i = 0; i < numPartitions; i++){
+            alertExecutors[i] = new AlertExecutor(alertExecutorID, partitioner, numPartitions, i, alertDefDAO,_sourceStreams);
+        }
+        return alertExecutors;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/eagle.alert.policy.PolicyEvaluatorServiceProvider
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/eagle.alert.policy.PolicyEvaluatorServiceProvider b/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/eagle.alert.policy.PolicyEvaluatorServiceProvider
deleted file mode 100644
index 887047a..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/eagle.alert.policy.PolicyEvaluatorServiceProvider
+++ /dev/null
@@ -1 +0,0 @@
-eagle.alert.siddhi.SiddhiPolicyEvaluatorServiceProviderImpl
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider b/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider
new file mode 100644
index 0000000..c683983
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider
@@ -0,0 +1 @@
+org.apache.eagle.alert.siddhi.SiddhiPolicyEvaluatorServiceProviderImpl
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/str.siddhiext
---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/str.siddhiext b/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/str.siddhiext index 1c19f50..4d5e237 100644 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/str.siddhiext +++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/str.siddhiext @@ -30,6 +30,6 @@ trim=org.wso2.siddhi.extension.string.TrimFunctionExtension upper=org.wso2.siddhi.extension.string.UpperFunctionExtension hex=org.wso2.siddhi.extension.string.HexFunctionExtension unhex=org.wso2.siddhi.extension.string.UnhexFunctionExtension -equalsIgnoreCase=eagle.alert.siddhi.extension.EqualsIgnoreCaseExtension -containsIgnoreCase=eagle.alert.siddhi.extension.ContainsIgnoreCaseExtension -regexpIgnoreCase=eagle.alert.siddhi.extension.RegexpIgnoreCaseFunctionExtension +equalsIgnoreCase=org.apache.eagle.alert.siddhi.extension.EqualsIgnoreCaseExtension +containsIgnoreCase=org.apache.eagle.alert.siddhi.extension.ContainsIgnoreCaseExtension +regexpIgnoreCase=org.apache.eagle.alert.siddhi.extension.RegexpIgnoreCaseFunctionExtension http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/cep/TestSiddhiEvaluator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/cep/TestSiddhiEvaluator.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/cep/TestSiddhiEvaluator.java deleted file mode 100644 index e6cfd9f..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/cep/TestSiddhiEvaluator.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package eagle.alert.cep; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import eagle.alert.dao.AlertDefinitionDAO; -import eagle.alert.dao.AlertDefinitionDAOImpl; -import eagle.alert.dao.AlertStreamSchemaDAO; -import eagle.alert.dao.AlertStreamSchemaDAOImpl; -import eagle.alert.entity.AlertAPIEntity; -import eagle.alert.entity.AlertDefinitionAPIEntity; -import eagle.alert.entity.AlertStreamSchemaEntity; -import eagle.alert.siddhi.EagleAlertContext; -import eagle.alert.siddhi.SiddhiPolicyDefinition; -import eagle.alert.siddhi.SiddhiPolicyEvaluator; -import eagle.alert.siddhi.StreamMetadataManager; -import eagle.dataproc.core.ValuesArray; -import eagle.datastream.Collector; -import eagle.datastream.Tuple2; -import eagle.executor.AlertExecutor; -import junit.framework.Assert; -import org.junit.Test; - -import java.util.*; - -public class TestSiddhiEvaluator { - - int alertCount = 0; - - public AlertStreamSchemaEntity createStreamMetaEntity(String attrName, String type) { - AlertStreamSchemaEntity entity = new AlertStreamSchemaEntity(); - Map tags = new HashMap(); - tags.put("dataSource", "hdfsAuditLog"); - tags.put("streamName", "hdfsAuditLogEventStream"); - tags.put("attrName", attrName); - entity.setTags(tags); - entity.setAttrType(type); - 
return entity; - } - - @Test - public void test() throws Exception{ - Config config = ConfigFactory.load("unittest.conf"); - AlertStreamSchemaDAO streamDao = new AlertStreamSchemaDAOImpl(null, null) { - @Override - public List findAlertStreamSchemaByDataSource(String dataSource) throws Exception { - List list = new ArrayList(); - list.add(createStreamMetaEntity("cmd", "string")); - list.add(createStreamMetaEntity("dst", "string")); - list.add(createStreamMetaEntity("src", "string")); - list.add(createStreamMetaEntity("host", "string")); - list.add(createStreamMetaEntity("user", "string")); - list.add(createStreamMetaEntity("timestamp", "long")); - list.add(createStreamMetaEntity("securityZone", "string")); - list.add(createStreamMetaEntity("sensitivityType", "string")); - list.add(createStreamMetaEntity("allowed", "string")); - return list; - } - }; - StreamMetadataManager.getInstance().init(config, streamDao); - - Map data1 = new TreeMap(){{ - put("cmd", "open"); - put("dst", ""); - put("src", ""); - put("host", ""); - put("user", ""); - put("timestamp", String.valueOf(System.currentTimeMillis())); - put("securityZone", ""); - put("sensitivityType", ""); - put("allowed", "true"); - }}; - final SiddhiPolicyDefinition policyDef = new SiddhiPolicyDefinition(); - policyDef.setType("SiddhiCEPEngine"); - String expression = "from hdfsAuditLogEventStream[cmd=='open'] " + - "select * " + - "insert into outputStream ;"; - policyDef.setExpression(expression); - SiddhiPolicyEvaluator evaluator = new SiddhiPolicyEvaluator(config, "testPolicy", policyDef, new String[]{"hdfsAuditLogEventStream"}); - EagleAlertContext context = new EagleAlertContext(); - - AlertDefinitionDAO alertDao = new AlertDefinitionDAOImpl(null, null) { - @Override - public Map> findActiveAlertDefsGroupbyAlertExecutorId(String site, String dataSource) throws Exception { - return null; - } - }; - - context.alertExecutor = new AlertExecutor("alertExecutorId", null, 3, 1, alertDao, new 
String[]{"hdfsAuditLogEventStream"}) { - @Override - public Map getDimensions(String policyId) { - return new HashMap(); - } - - @Override - public void runMetricReporter() {} - }; - context.alertExecutor.prepareConfig(config); - context.alertExecutor.init(); - context.evaluator = evaluator; - context.policyId = "testPolicy"; - context.outputCollector = new Collector> () { - @Override - public void collect(Tuple2 stringAlertAPIEntityTuple2) { - alertCount++; - } - }; - evaluator.evaluate(new ValuesArray(context, "hdfsAuditLogEventStream", data1)); - Thread.sleep(2 * 1000); - Assert.assertEquals(alertCount, 1); - StreamMetadataManager.getInstance().reset(); - } -}