Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 179C9200C6F for ; Mon, 3 Apr 2017 13:54:15 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 16075160BDE; Mon, 3 Apr 2017 11:54:15 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E57E0160BB4 for ; Mon, 3 Apr 2017 13:54:11 +0200 (CEST) Received: (qmail 77367 invoked by uid 500); 3 Apr 2017 11:54:11 -0000 Mailing-List: contact commits-help@eagle.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.apache.org Delivered-To: mailing list commits@eagle.apache.org Received: (qmail 76690 invoked by uid 99); 3 Apr 2017 11:54:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 03 Apr 2017 11:54:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8FA0BDFBAB; Mon, 3 Apr 2017 11:54:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hao@apache.org To: commits@eagle.apache.org Date: Mon, 03 Apr 2017 11:54:32 -0000 Message-Id: In-Reply-To: <04280246f21e4dcd9fbfe899c4344da2@git.apache.org> References: <04280246f21e4dcd9fbfe899c4344da2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [24/84] [partial] eagle git commit: Clean repo for eagle site archived-at: Mon, 03 Apr 2017 11:54:15 -0000 http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java deleted file mode 100644 index b410cda..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.publisher.impl; - -import com.typesafe.config.Config; -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.coordinator.PublishmentType; -import org.apache.eagle.alert.engine.model.AlertPublishEvent; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.AlertPublishPluginProvider; -import org.apache.eagle.alert.engine.publisher.PublishConstants; -import org.apache.eagle.alert.service.IMetadataServiceClient; -import org.apache.eagle.alert.service.MetadataServiceClientImpl; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -public class AlertEagleStorePlugin extends AbstractPublishPlugin implements AlertPublishPluginProvider { - - private static Logger LOG = LoggerFactory.getLogger(AlertEagleStorePlugin.class); - private transient IMetadataServiceClient client; - - @Override - public void init(Config config, Publishment publishment, Map conf) throws Exception { - super.init(config, publishment, conf); - client = new MetadataServiceClientImpl(config); - } - - @Override - public void close() { - try { - client.close(); - } catch (IOException e) { - LOG.error(e.getMessage(), e); - } - } - - @Override - public void onAlert(AlertStreamEvent event) throws Exception { - List eventList = this.dedup(event); - if (eventList == null || eventList.isEmpty()) { - return; - } - List alertEvents = new ArrayList<>(); - for (AlertStreamEvent e : eventList) { - alertEvents.add(AlertPublishEvent.createAlertPublishEvent(e)); - } - client.addAlertPublishEvents(alertEvents); - } - - @Override - protected Logger getLogger() { - return LOG; - } - - @Override - public PublishmentType getPluginType() { - return new PublishmentType.Builder() - .name("JDBCStorage") - .type(getClass()) - .description("Publish alerts into eagle metadata store") - .build(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java deleted file mode 100644 index 152a9f1..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.publisher.impl; - -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.coordinator.PublishmentType; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.AlertPublishPluginProvider; -import org.apache.eagle.alert.engine.publisher.PublishConstants; -import org.apache.eagle.alert.engine.publisher.email.AlertEmailConstants; -import org.apache.eagle.alert.engine.publisher.email.AlertEmailGenerator; -import org.apache.eagle.alert.engine.publisher.email.AlertEmailGeneratorBuilder; -import com.typesafe.config.Config; -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.eagle.alert.service.MetadataServiceClientImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import static org.apache.eagle.alert.service.MetadataServiceClientImpl.*; -import static org.apache.eagle.common.mail.AlertEmailConstants.*; - -public class AlertEmailPublisher extends AbstractPublishPlugin implements AlertPublishPluginProvider { - - private static final Logger LOG = LoggerFactory.getLogger(AlertEmailPublisher.class); - private static final int DEFAULT_THREAD_POOL_CORE_SIZE = 4; - private static final int DEFAULT_THREAD_POOL_MAX_SIZE = 8; - private static final long DEFAULT_THREAD_POOL_SHRINK_TIME = 60000L; // 1 minute - - private AlertEmailGenerator emailGenerator; - private Map emailConfig; - - private transient ThreadPoolExecutor executorPool; - private String serverHost; - private int serverPort; - private Properties mailClientProperties; - - @Override - @SuppressWarnings("rawtypes") - public void init(Config config, Publishment publishment, Map conf) throws Exception { - super.init(config, publishment, conf); - this.serverHost = config.hasPath(MetadataServiceClientImpl.EAGLE_CORRELATION_SERVICE_HOST) - ? config.getString(MetadataServiceClientImpl.EAGLE_CORRELATION_SERVICE_HOST) : "localhost"; - this.serverPort = config.hasPath(MetadataServiceClientImpl.EAGLE_CORRELATION_SERVICE_PORT) - ? config.getInt(MetadataServiceClientImpl.EAGLE_CORRELATION_SERVICE_PORT) : 80; - this.mailClientProperties = parseMailClientConfig(config); - - executorPool = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_CORE_SIZE, DEFAULT_THREAD_POOL_MAX_SIZE, DEFAULT_THREAD_POOL_SHRINK_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); - LOG.info(" Creating Email Generator... "); - if (publishment.getProperties() != null) { - emailConfig = new HashMap<>(publishment.getProperties()); - emailGenerator = createEmailGenerator(emailConfig); - } - } - - private Properties parseMailClientConfig(Config config) { - Properties props = new Properties(); - Config mailConfig = null; - if (config.hasPath(EAGLE_COORDINATOR_EMAIL_SERVICE)) { - mailConfig = config.getConfig(EAGLE_COORDINATOR_EMAIL_SERVICE); - } else if (config.hasPath(EAGLE_APPLICATION_EMAIL_SERVICE)) { - mailConfig = config.getConfig(EAGLE_APPLICATION_EMAIL_SERVICE); - } - String mailSmtpServer = mailConfig.getString(EAGLE_EMAIL_SMTP_SERVER); - String mailSmtpPort = mailConfig.getString(EAGLE_EMAIL_SMTP_PORT); - String mailSmtpAuth = mailConfig.getString(EAGLE_EMAIL_SMTP_AUTH); - - props.put(AlertEmailConstants.CONF_MAIL_HOST, mailSmtpServer); - props.put(AlertEmailConstants.CONF_MAIL_PORT, mailSmtpPort); - props.put(AlertEmailConstants.CONF_MAIL_AUTH, mailSmtpAuth); - - if (Boolean.parseBoolean(mailSmtpAuth)) { - String mailSmtpUsername = mailConfig.getString(EAGLE_EMAIL_SMTP_USERNAME); - String mailSmtpPassword = mailConfig.getString(EAGLE_EMAIL_SMTP_PASSWORD); - props.put(AlertEmailConstants.CONF_AUTH_USER, mailSmtpUsername); - props.put(AlertEmailConstants.CONF_AUTH_PASSWORD, mailSmtpPassword); - } - - String mailSmtpConn = mailConfig.hasPath(EAGLE_EMAIL_SMTP_CONN) ? mailConfig.getString(EAGLE_EMAIL_SMTP_CONN) : AlertEmailConstants.CONN_PLAINTEXT; - if (mailSmtpConn.equalsIgnoreCase(AlertEmailConstants.CONN_TLS)) { - props.put("mail.smtp.starttls.enable", "true"); - } - if (mailSmtpConn.equalsIgnoreCase(AlertEmailConstants.CONN_SSL)) { - props.put("mail.smtp.socketFactory.port", "465"); - props.put("mail.smtp.socketFactory.class", - "javax.net.ssl.SSLSocketFactory"); - } - - String mailSmtpDebug = mailConfig.hasPath(EAGLE_EMAIL_SMTP_DEBUG) ? mailConfig.getString(EAGLE_EMAIL_SMTP_DEBUG) : "false"; - props.put(AlertEmailConstants.CONF_MAIL_DEBUG, mailSmtpDebug); - return props; - } - - @Override - public void onAlert(AlertStreamEvent event) throws Exception { - if (emailGenerator == null) { - LOG.warn("emailGenerator is null due to the incorrect configurations"); - return; - } - List outputEvents = dedup(event); - if (outputEvents == null) { - return; - } - - boolean isSuccess = true; - for (AlertStreamEvent outputEvent : outputEvents) { - if (!emailGenerator.sendAlertEmail(outputEvent)) { - isSuccess = false; - } - } - PublishStatus status = new PublishStatus(); - if (!isSuccess) { - status.errorMessage = "Failed to send email"; - status.successful = false; - } else { - status.errorMessage = ""; - status.successful = true; - } - this.status = status; - } - - @Override - public void update(String dedupIntervalMin, Map pluginProperties) { - super.update(dedupIntervalMin, pluginProperties); - - if (pluginProperties != null && !emailConfig.equals(pluginProperties)) { - emailConfig = new HashMap<>(pluginProperties); - emailGenerator = createEmailGenerator(pluginProperties); - } - } - - @Override - public void close() { - this.executorPool.shutdown(); - } - - private AlertEmailGenerator createEmailGenerator(Map notificationConfig) { - String tplFileName = (String) notificationConfig.get(PublishConstants.TEMPLATE); - if (tplFileName == null || tplFileName.equals("")) { - // tplFileName = "ALERT_DEFAULT_TEMPLATE.vm"; - // tplFileName = "ALERT_LIGHT_TEMPLATE.vm"; - tplFileName = "ALERT_INLINED_TEMPLATE.vm"; - } - String subject = (String) notificationConfig.get(PublishConstants.SUBJECT); - if (subject == null) { - subject = "No subject"; - } - String sender = (String) notificationConfig.get(PublishConstants.SENDER); - String recipients = (String) notificationConfig.get(PublishConstants.RECIPIENTS); - if (sender == null || recipients == null) { - LOG.warn("email sender or recipients is null"); - return null; - } - - AlertEmailGenerator gen = AlertEmailGeneratorBuilder.newBuilder() - .withMailProps(this.mailClientProperties) - .withSubject(subject) - .withSender(sender) - .withRecipients(recipients) - .withTplFile(tplFileName) - .withExecutorPool(this.executorPool) - .withServerHost(this.serverHost) - .withServerPort(this.serverPort) - .build(); - return gen; - } - - @Override - public int hashCode() { - return new HashCodeBuilder().append(getClass().getCanonicalName()).toHashCode(); - } - - @Override - public boolean equals(Object o) { - if (o == this) { - return true; - } - if (!(o instanceof AlertEmailPublisher)) { - return false; - } - return true; - } - - @Override - protected Logger getLogger() { - return LOG; - } - - @Override - public PublishmentType getPluginType() { - return new PublishmentType.Builder() - .name("Email") - .type(AlertEmailPublisher.class) - .description("Email alert publisher") - .field("subject") - .field("sender") - .field("recipients") - .build(); - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertFilePublisher.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertFilePublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertFilePublisher.java deleted file mode 100644 index 375a0da..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertFilePublisher.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.publisher.impl; - -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.coordinator.PublishmentType; -import org.apache.eagle.alert.engine.model.AlertPublishEvent; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.AlertPublishPluginProvider; -import org.apache.eagle.alert.engine.publisher.PublishConstants; -import org.apache.eagle.common.DateTimeUtil; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.typesafe.config.Config; - -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.Map; -import java.util.logging.*; - -public class AlertFilePublisher extends AbstractPublishPlugin implements AlertPublishPluginProvider { - - private Logger filelogger = Logger.getLogger(AlertFilePublisher.class.getName()); - private FileHandler handler; - private static ObjectMapper objectMapper = new ObjectMapper(); - - private static final String DEFAULT_FILE_NAME = "eagle-alert.log"; - private static final int DEFAULT_ROTATE_SIZE_KB = 1024; - private static final int DEFAULT_FILE_NUMBER = 5; - - @Override - public void init(Config config, Publishment publishment, Map conf) throws Exception { - super.init(config, publishment, conf); - - String fileName = DEFAULT_FILE_NAME; - int rotateSize = DEFAULT_ROTATE_SIZE_KB; - int numOfFiles = DEFAULT_FILE_NUMBER; - if (publishment.getProperties() != null) { - if (publishment.getProperties().containsKey(PublishConstants.FILE_NAME)) { - fileName = (String) publishment.getProperties().get(PublishConstants.FILE_NAME); - } - if (publishment.getProperties().containsKey(PublishConstants.ROTATE_EVERY_KB)) { - rotateSize = Integer.valueOf(publishment.getProperties().get(PublishConstants.ROTATE_EVERY_KB).toString()); - } - if (publishment.getProperties().containsKey(PublishConstants.NUMBER_OF_FILES)) { - numOfFiles = Integer.valueOf(publishment.getProperties().get(PublishConstants.NUMBER_OF_FILES).toString()); - } - } - handler = new FileHandler(fileName, rotateSize * 1024, numOfFiles, true); - handler.setFormatter(new AlertFileFormatter()); - filelogger.addHandler(handler); - filelogger.setUseParentHandlers(false); - } - - @Override - public PublishmentType getPluginType() { - return new PublishmentType.Builder() - .name("File") - .type(AlertFilePublisher.class) - .description("Local log file publisher") - .build(); - } - - class AlertFileFormatter extends Formatter { - - @Override - public String format(LogRecord record) { - return String.format("%s %s\n", DateTimeUtil.millisecondsToHumanDateWithSeconds(record.getMillis()), - record.getMessage()); - } - } - - @Override - public void onAlert(AlertStreamEvent event) throws Exception { - List eventList = this.dedup(event); - if (eventList == null || eventList.isEmpty()) { - return; - } - for (AlertStreamEvent e : eventList) { - //filelogger.info(e.toString()); - AlertPublishEvent alert = AlertPublishEvent.createAlertPublishEvent(e); - filelogger.info(objectMapper.writeValueAsString(alert)); - } - } - - @Override - public void close() { - if (handler != null) { - handler.close(); - } - } - - @Override - protected org.slf4j.Logger getLogger() { - return LoggerFactory.getLogger(AlertFilePublisher.class); - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java deleted file mode 100644 index adac1aa..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java +++ /dev/null @@ -1,197 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.publisher.impl; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.coordinator.PublishmentType; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.AlertPublishPluginProvider; -import org.apache.eagle.alert.engine.publisher.PublishConstants; -import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.typesafe.config.Config; - -public class AlertKafkaPublisher extends AbstractPublishPlugin implements AlertPublishPluginProvider { - - private static final Logger LOG = LoggerFactory.getLogger(AlertKafkaPublisher.class); - private static final long MAX_TIMEOUT_MS = 60000; - - @SuppressWarnings("rawtypes") - private KafkaProducer producer; - private String brokerList; - private String topic; - private KafkaWriteMode mode = KafkaWriteMode.async; - - @Override - @SuppressWarnings("rawtypes") - public void init(Config config, Publishment publishment, Map conf) throws Exception { - super.init(config, publishment, conf); - - if (publishment.getProperties() != null) { - Map publishConfig = new HashMap<>(publishment.getProperties()); - brokerList = ((String) publishConfig.get(PublishConstants.BROKER_LIST)).trim(); - producer = KafkaProducerManager.INSTANCE.getProducer(brokerList, publishConfig); - topic = ((String) publishConfig.get(PublishConstants.TOPIC)).trim(); - mode = KafkaProducerManager.INSTANCE.getKafkaWriteMode(publishConfig); - } - } - - @Override - public void onAlert(AlertStreamEvent event) throws Exception { - if (producer == null) { - LOG.warn("KafkaProducer is null due to the incorrect configurations"); - return; - } - - this.emit(this.topic, this.dedup(event)); - } - - @SuppressWarnings("rawtypes") - @Override - public void update(String dedupIntervalMin, Map pluginProperties) { - deduplicator.setDedupIntervalMin(dedupIntervalMin); - String newBrokerList = ((String) pluginProperties.get(PublishConstants.BROKER_LIST)).trim(); - String newTopic = ((String) pluginProperties.get(PublishConstants.TOPIC)).trim(); - if (!newBrokerList.equals(this.brokerList)) { - if (producer != null) { - producer.close(); - } - brokerList = newBrokerList; - KafkaProducer newProducer = null; - try { - newProducer = KafkaProducerManager.INSTANCE.getProducer(brokerList, pluginProperties); - } catch (Exception e) { - LOG.error("Create KafkaProducer failed with configurations: {}", pluginProperties); - } - producer = newProducer; - } - topic = newTopic; - } - - @Override - public void close() { - producer.close(); - } - - @SuppressWarnings( {"rawtypes", "unchecked"}) - protected void emit(String topic, List outputEvents) { - // we need to check producer here since the producer is invisable to extended kafka publisher - if (producer == null) { - LOG.warn("KafkaProducer is null due to the incorrect configurations"); - return; - } - if (outputEvents == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Alert stream events list in publishment is empty"); - } - return; - } - this.status = new PublishStatus(); - try { - for (AlertStreamEvent outputEvent : outputEvents) { - ProducerRecord record = createRecord(outputEvent, topic); - if (record == null) { - LOG.error("Alert serialize return null, ignored message! "); - return; - } - if (mode == KafkaWriteMode.sync) { - Future future = producer.send(record); - future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS); - succeed(mode, ""); - } else { - producer.send(record, new Callback() { - @Override - public void onCompletion(RecordMetadata metadata, Exception exception) { - if (exception != null) { - failOnException(String.format("Failed to send message to %s, due to:%s", - brokerList, exception), exception); - return; - } - succeed(mode, ""); - } - }); - } - } - } catch (InterruptedException | ExecutionException e) { - failOnException(String.format("Failed to send message to %s, due to:%s", brokerList, e), e); - } catch (Exception ex) { - failOnException(String.format("Failed to send message to %s, due to:%s", brokerList, ex), ex); - } - } - - private void failOnException(String message, Exception e) { - status.successful = false; - status.errorMessage = message; - LOG.error(status.errorMessage, e); - } - - private void succeed(KafkaWriteMode mode, String message) { - status.successful = true; - status.errorMessage = ""; - if (LOG.isDebugEnabled()) { - LOG.debug("Successfully send message to Kafka: {} in mode {}", brokerList, mode); - } - } - - protected String getTopic() { - return this.topic; - } - - private ProducerRecord createRecord(AlertStreamEvent event, String topic) throws Exception { - Object o = serialzeEvent(event); - if (o != null) { - ProducerRecord record = new ProducerRecord<>(topic, o); - return record; - } else { - return null; - } - } - - private Object serialzeEvent(AlertStreamEvent event) { - return serializer.serialize(event); - } - - @Override - protected Logger getLogger() { - return LOG; - } - - @Override - public PublishmentType getPluginType() { - return new PublishmentType.Builder() - .name("Kafka") - .type(getClass()) - .description("Kafka alert publisher") - .field("kafka_broker","localhost:9092") - .field("topic") - .build(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java deleted file mode 100644 index 957d356..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.publisher.impl; - -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin; -import com.typesafe.config.Config; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; - -/** - * @since on 5/11/16. - */ -public class AlertPublishPluginsFactory { - - private static final Logger LOG = LoggerFactory.getLogger(AlertPublishPluginsFactory.class); - - @SuppressWarnings("rawtypes") - public static AlertPublishPlugin createNotificationPlugin(Publishment publishment, Config config, Map conf) { - AlertPublishPlugin plugin = null; - String publisherType = publishment.getType(); - try { - plugin = (AlertPublishPlugin) Class.forName(publisherType).newInstance(); - plugin.init(config, publishment, conf); - } catch (Exception ex) { - LOG.error("Error in loading AlertPublisherPlugin class: ", ex); - //throw new IllegalStateException(ex); - } - return plugin; - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java deleted file mode 100644 index 5b902f9..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java +++ /dev/null @@ -1,207 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.alert.engine.publisher.impl; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.eagle.alert.engine.coordinator.PublishPartition; -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin; -import org.apache.eagle.alert.engine.publisher.AlertPublisher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.typesafe.config.Config; - -@SuppressWarnings("rawtypes") -public class AlertPublisherImpl implements AlertPublisher { - - private static final long serialVersionUID = 4809983246198138865L; - private static final Logger LOG = LoggerFactory.getLogger(AlertPublisherImpl.class); - - private final String name; - - private volatile Map publishPluginMapping = new ConcurrentHashMap<>(1); - private Config config; - private Map conf; - - public AlertPublisherImpl(String name) { - this.name = name; - } - - @Override - public void init(Config config, Map conf) { - this.config = config; - this.conf = conf; - } - - @Override - public String getName() { - return name; - } - - @Override - public void nextEvent(PublishPartition partition, AlertStreamEvent event) { - if (LOG.isDebugEnabled()) { - LOG.debug(event.toString()); - } - notifyAlert(partition, event); - } - - private void notifyAlert(PublishPartition partition, AlertStreamEvent event) { - // remove the column values for publish plugin match - partition.getColumnValues().clear(); - if (!publishPluginMapping.containsKey(partition)) { - LOG.warn("PublishPartition {} is not found in publish plugin map", partition); - return; - } - AlertPublishPlugin plugin = publishPluginMapping.get(partition); - if (plugin == null) { - LOG.warn("PublishPartition {} has problems while initializing publish plugin", partition); - return; - } - event.ensureAlertId(); - try { - LOG.debug("Execute alert publisher {}", plugin.getClass().getCanonicalName()); - plugin.onAlert(event); - } catch (Exception ex) { - LOG.error("Fail invoking publisher's onAlert, continue ", ex); - } - } - - @Override - public void close() { - publishPluginMapping.values().forEach(plugin -> plugin.close()); - } - - @Override - public synchronized void onPublishChange(List added, - List removed, - List afterModified, - List beforeModified) { - if (added == null) { - added = new ArrayList<>(); - } - if (removed == null) { - removed = new ArrayList<>(); - } - if (afterModified == null) { - afterModified = new ArrayList<>(); - } - if (beforeModified == null) { - beforeModified = new ArrayList<>(); - } - - if (afterModified.size() != beforeModified.size()) { - LOG.warn("beforeModified size != afterModified size"); - return; - } - - // copy and swap to avoid concurrency issue - Map newPublishMap = new HashMap<>(publishPluginMapping); - - // added - for (Publishment publishment : added) { - LOG.debug("OnPublishmentChange : add publishment : {} ", publishment); - - AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config, conf); - if (plugin != null) { - for (PublishPartition p : getPublishPartitions(publishment)) { - newPublishMap.put(p, plugin); - } - } else { - LOG.error("OnPublishChange alertPublisher {} failed due to invalid format", publishment); - } - } - //removed - List toBeClosed = new ArrayList<>(); - for (Publishment publishment : removed) { - AlertPublishPlugin plugin = null; - for (PublishPartition p : getPublishPartitions(publishment)) { - if (plugin == null) { - plugin = newPublishMap.remove(p); - } else { - newPublishMap.remove(p); - } - } - if (plugin != null) { - toBeClosed.add(plugin); - } - } - // updated - for (Publishment publishment : afterModified) { - // for updated publishment, need to init them too - AlertPublishPlugin newPlugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config, conf); - if (newPlugin != null) { - AlertPublishPlugin plugin = null; - for (PublishPartition p : getPublishPartitions(publishment)) { - if (plugin == null) { - plugin = newPublishMap.get(p); - } - newPublishMap.put(p, newPlugin); - } - if (plugin != null) { - toBeClosed.add(plugin); - } - } else { - LOG.error("OnPublishChange alertPublisher {} failed due to invalid format", publishment); - } - } - - // now do the swap - publishPluginMapping = newPublishMap; - - // safely close : it depend on plugin to check if want to wait all data to be flushed. - closePlugins(toBeClosed); - } - - private Set getPublishPartitions(Publishment publish) { - List streamIds = new ArrayList<>(); - // add the publish to the bolt - if (publish.getStreamIds() == null || publish.getStreamIds().size() <= 0) { - streamIds.add(Publishment.STREAM_NAME_DEFAULT); - } else { - streamIds.addAll(publish.getStreamIds()); - } - Set publishPartitions = new HashSet<>(); - for (String streamId : streamIds) { - for (String policyId : publish.getPolicyIds()) { - publishPartitions.add(new PublishPartition(streamId, policyId, publish.getName(), publish.getPartitionColumns())); - } - } - return publishPartitions; - } - - private void closePlugins(List toBeClosed) { - for (AlertPublishPlugin p : toBeClosed) { - try { - p.close(); - } catch (Exception e) { - LOG.error(String.format("Error when close publish plugin {}!", p.getClass().getCanonicalName()), e); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java deleted file mode 100644 index 0d60246..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.publisher.impl; - -import com.typesafe.config.Config; -import com.ullink.slack.simpleslackapi.SlackAttachment; -import com.ullink.slack.simpleslackapi.SlackChannel; -import com.ullink.slack.simpleslackapi.SlackSession; -import com.ullink.slack.simpleslackapi.impl.SlackSessionFactory; -import org.apache.commons.lang.StringUtils; -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.coordinator.PublishmentType; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.AlertPublishPluginProvider; -import org.apache.eagle.alert.engine.publisher.PublishConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * @since Sep 14, 2016. - */ -public class AlertSlackPublisher extends AbstractPublishPlugin implements AlertPublishPluginProvider { - private static final Logger LOG = LoggerFactory.getLogger(AlertSlackPublisher.class); - - private SlackSession session; - private String slackChannels; - private String severitys; - - @Override - public void init(Config config, Publishment publishment, Map conf) throws Exception { - super.init(config, publishment, conf); - - if (publishment.getProperties() != null) { - Map slackConfig = new HashMap<>(publishment.getProperties()); - final String token = ((String) slackConfig.get(PublishConstants.TOKEN)).trim(); - slackChannels = ((String) slackConfig.get(PublishConstants.CHANNELS)).trim(); - severitys = ((String) slackConfig.get(PublishConstants.SEVERITYS)).trim(); - - if (StringUtils.isNotEmpty(token)) { - LOG.debug(" Creating Slack Session... "); - session = SlackSessionFactory.createWebSocketSlackSession(token); - session.connect(); - } - } - - } - - @Override - public void onAlert(AlertStreamEvent event) throws Exception { - if (session == null) { - LOG.warn("Slack session is null due to incorrect configurations!"); - return; - } - List outputEvents = dedup(event); - if (outputEvents == null) { - return; - } - - PublishStatus status = new PublishStatus(); - for (AlertStreamEvent outputEvent: outputEvents) { - String message = ""; - String severity = ""; - String color = ""; - // only user defined severity level alert will send to Slack; - boolean publishToSlack = false; - - StreamDefinition streamDefinition = outputEvent.getSchema(); - for (int i = 0; i < outputEvent.getData().length; i++) { - if (i > streamDefinition.getColumns().size()) { - if (LOG.isWarnEnabled()) { - LOG.warn("output column does not found for event data, this indicate code error!"); - } - continue; - } - String colName = streamDefinition.getColumns().get(i).getName(); - if (colName.equalsIgnoreCase("severity")) { - severity = outputEvent.getData()[i].toString(); - publishToSlack = severitys.contains(severity); - } - if (colName.equalsIgnoreCase("message")) { - message = outputEvent.getData()[i].toString(); - } - } - - if (publishToSlack) { - try { - // get hex color code from severity - switch (severity) { - case "CRITICAL": - color = "#dd3333"; //red - break; - case "WARNING": - color = "#ffc04c"; //yellow - break; - default: - color = "#439FE0"; //blue - break; - } - - // here to be generic, only publish message like "CRITICAL port-1 is down" to Slack - String messageToSlack = String.format("%s %s", severity, message); - SlackAttachment attachment = new SlackAttachment(); - attachment.setColor(color); - attachment.setText(messageToSlack); - - for (String slackChannel: slackChannels.split(",")) { - sendMessageToAChannel(session, slackChannel, null, attachment); - } - } catch (Exception e) { - status.successful = false; - status.errorMessage = String.format("Failed to send message to slack channel %s, due to:%s", slackChannels, e); - LOG.error(status.errorMessage, e); - } - } - } - - } - - @Override - public void close() { - try { - session.disconnect(); - } catch (IOException e) { - LOG.error(e.getMessage()); - } - } - - @Override - protected Logger getLogger() { - return LOG; - } - - private void sendMessageToAChannel(SlackSession session, String channelName, String message, SlackAttachment attachment) { - //get a channel - SlackChannel channel = session.findChannelByName(channelName); - session.sendMessage(channel, message, attachment); - } - - @Override - public PublishmentType getPluginType() { - return new PublishmentType.Builder() - .name("Slack") - .type(getClass()) - .description("Slack alert publisher") - .field("token") - .field("channels") - .field("severitys") - .field("urltemplate") - .build(); - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java deleted file mode 100644 index ac99db3..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.eagle.alert.engine.publisher.impl; - -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import org.apache.commons.lang3.StringUtils; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.AlertDeduplicator; -import org.apache.eagle.alert.engine.publisher.dedup.DedupCache; -import org.joda.time.Period; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.TimeUnit; - -public class DefaultDeduplicator implements AlertDeduplicator { - - private static Logger LOG = LoggerFactory.getLogger(DefaultDeduplicator.class); - - private long dedupIntervalSec; - private List customDedupFields = new ArrayList<>(); - private String dedupStateField; - private String dedupStateCloseValue; - - private DedupCache dedupCache; - - private Cache withoutStatesCache; - - public DefaultDeduplicator() { - this.dedupIntervalSec = 0; - } - - public DefaultDeduplicator(String intervalMin) { - setDedupIntervalMin(intervalMin); - } - - public DefaultDeduplicator(long intervalMin) { - this.dedupIntervalSec = intervalMin; - } - - public DefaultDeduplicator(String intervalMin, List customDedupFields, - String dedupStateField, String dedupStateCloseValue, DedupCache dedupCache) { - setDedupIntervalMin(intervalMin); - if (customDedupFields != null) { - this.customDedupFields = customDedupFields; - } - if (StringUtils.isNotBlank(dedupStateField)) { - this.dedupStateField = dedupStateField; - } - if (StringUtils.isNotBlank(dedupStateCloseValue)) { - this.dedupStateCloseValue = dedupStateCloseValue; - } - this.dedupCache = dedupCache; - - withoutStatesCache = CacheBuilder.newBuilder().expireAfterWrite( - this.dedupIntervalSec, TimeUnit.SECONDS).build(); - } - - /* - * @param key - * @return - */ - public List checkDedup(AlertStreamEvent event, EventUniq key, String stateFiledValue) { - if (StringUtils.isBlank(stateFiledValue)) { - // without state field, we cannot determine whether it is duplicated - // without custom filed values, we cannot determine whether it is duplicated - synchronized (withoutStatesCache) { - if (withoutStatesCache != null && withoutStatesCache.getIfPresent(key) != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Alert event {} with key {} is skipped since it is duplicated", event, key); - } - return null; - } else if (withoutStatesCache != null) { - withoutStatesCache.put(key, ""); - } - } - return Arrays.asList(event); - } - return dedupCache.dedup(event, key, dedupStateField, stateFiledValue, dedupStateCloseValue); - } - - public List dedup(AlertStreamEvent event) { - if (event == null) { - return null; - } - // check custom field, and get the field values - StreamDefinition streamDefinition = event.getSchema(); - HashMap customFieldValues = new HashMap<>(); - String stateFiledValue = null; - for (int i = 0; i < event.getData().length; i++) { - if (i > streamDefinition.getColumns().size()) { - if (LOG.isWarnEnabled()) { - LOG.warn("output column does not found for event data, this indicate code error!"); - } - continue; - } - String colName = streamDefinition.getColumns().get(i).getName(); - Object colValue = event.getData()[i]; - - if (colName.equals(dedupStateField) && colValue != null) { - stateFiledValue = colValue.toString(); - } - - // make all of the field as unique key if no custom dedup field provided - if (colValue != null) { - if (customDedupFields == null || customDedupFields.size() <= 0) { - customFieldValues.put(colName, colValue.toString()); - } else { - for (String field : customDedupFields) { - if (colName.equals(field)) { - customFieldValues.put(field, colValue.toString()); - break; - } - } - } - } - } - - List outputEvents = checkDedup(event, new EventUniq(event.getStreamId(), - event.getPolicyId(), event.getCreatedTime(), customFieldValues), stateFiledValue); - if (outputEvents != null && outputEvents.size() > 0) { - return outputEvents; - } else if (LOG.isInfoEnabled()) { - LOG.info("Alert event is skipped because it's duplicated: {}", event.toString()); - } - return null; - } - - @Override - public void setDedupIntervalMin(String newDedupIntervalMin) { - if (newDedupIntervalMin == null || newDedupIntervalMin.isEmpty()) { - dedupIntervalSec = 0; - return; - } - try { - Period period = Period.parse(newDedupIntervalMin); - this.dedupIntervalSec = period.toStandardSeconds().getSeconds(); - } catch (Exception e) { - LOG.warn("Fail to pares deDupIntervalMin, will disable deduplication instead", e); - this.dedupIntervalSec = 0; - } - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java deleted file mode 100644 index 511abcd..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -/** - * - */ -package org.apache.eagle.alert.engine.publisher.impl; - -import org.apache.commons.lang3.builder.HashCodeBuilder; - -import com.google.common.base.Joiner; - -import java.util.HashMap; - -/** - * @since Mar 19, 2015. - */ -public class EventUniq { - public String streamId; - public String policyId; - public Long timestamp; // event's createTimestamp - public long createdTime; // created time, for cache removal; - public HashMap customFieldValues; - public boolean removable = false; - - public EventUniq(String streamId, String policyId, long timestamp) { - this.streamId = streamId; - this.timestamp = timestamp; - this.policyId = policyId; - this.createdTime = System.currentTimeMillis(); - } - - public EventUniq(String streamId, String policyId, long timestamp, HashMap customFieldValues) { - this.streamId = streamId; - this.timestamp = timestamp; - this.policyId = policyId; - this.createdTime = System.currentTimeMillis(); - this.customFieldValues = customFieldValues; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof EventUniq) { - EventUniq au = (EventUniq) obj; - boolean result = this.streamId.equalsIgnoreCase(au.streamId) & this.policyId.equalsIgnoreCase(au.policyId); - if (this.customFieldValues != null && au.customFieldValues != null) { - result = result & this.customFieldValues.equals(au.customFieldValues); - } - return result; - } - return false; - } - - @Override - public int hashCode() { - HashCodeBuilder builder = new HashCodeBuilder().append(streamId).append(policyId); - - if (customFieldValues != null) { - builder.append(customFieldValues); - } - return builder.build(); - } - - @Override - public String toString() { - return String.format("EventUniq[streamId: %s, policyId: %s, timestamp: %s, removable: %s, customFieldValues: %s]", - streamId, policyId, timestamp, removable, Joiner.on(",").withKeyValueSeparator(">").join(customFieldValues)); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java deleted file mode 100644 index aa22884..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.publisher.impl; - -import org.apache.eagle.alert.engine.codec.IEventSerializer; -import org.apache.eagle.alert.engine.coordinator.StreamColumn; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.utils.JsonUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * @since Jul 9, 2016. - */ -public class JsonEventSerializer implements IEventSerializer { - - private static final Logger LOG = LoggerFactory.getLogger(JsonEventSerializer.class); - - @SuppressWarnings("rawtypes") - public JsonEventSerializer(Map stormConf) throws Exception { - } - - @Override - public Object serialize(AlertStreamEvent event) { - String result = streamEventToJson(event); - if (LOG.isDebugEnabled()) { - LOG.debug("serialized alert event : {}", result); - } - return result; - } - - public String streamEventToJson(AlertStreamEvent event) { - Map jsonMap = new HashMap(); - jsonMap.put("policyId", event.getPolicyId()); - jsonMap.put("streamId", event.getStreamId()); - jsonMap.put("createBy", event.getCreatedBy()); - jsonMap.put("createTime", event.getCreatedTime()); - // data - int size = event.getData().length; - List columns = event.getSchema().getColumns(); - for (int i = 0; i < size; i++) { - if (columns.size() < i) { - // redundant check to log inconsistency - LOG.error(" stream event data have different lenght compare to column definition! "); - } else { - jsonMap.put(columns.get(i).getName(), event.getData()[i]); - } - } - return JsonUtils.writeValueAsString(jsonMap); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java deleted file mode 100644 index 0c7f04c..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.eagle.alert.engine.publisher.impl; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -/** - * The producer is thread safe and sharing a single producer instance across threads will generally be faster than - * having multiple instances. - */ -public class KafkaProducerManager { - - private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerManager.class); - - private static final String STRING_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; - private static final String STRING_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; - - private static final String VALUE_DESERIALIZER = "value.deserializer"; - private static final String VALUE_DESERIALIZER_UNDERSCORE = "value_deserializer"; - - private static final String VALUE_SERIALIZER = "value.serializer"; - private static final String VALUE_SERIALIZER_UNDERSCORE = "value_serializer"; - - private static final String KEY_DESERIALIZER = "key.deserializer"; - private static final String KEY_DESERIALIZER_UNDERSCORE = "key_deserializer"; - - private static final String KEY_SERIALIZER = "key.serializer"; - private static final String KEY_SERIALIZER_UNDERSCORE = "key_serializer"; - - private static final String REQUEST_REQUIRED_ACKS = "request.required.acks"; - private static final String REQUEST_REQUIRED_ACKS_UNDERSCORE = "request_required_acks"; - // the producer gets an acknowledgement after the leader replica has received the data - private static final String REQUEST_REQUIRED_ACKS_DEFAULT = "1"; - - private static final String PRODUCER_TYPE = "producer.type"; - - private static final String KEY_KAFKA_PROPERTIES = "kafka_client_config"; - - private static final String KEY_KAFKA_PROPERTY_NAME = "name"; - private static final String KEY_KAFKA_PROPERTY_VALUE = "value"; - - public static final KafkaProducerManager INSTANCE = new KafkaProducerManager(); - - public KafkaProducer getProducer(String brokerList, Map publishConfig) { - Properties configMap = new Properties(); - configMap.put("bootstrap.servers", brokerList); - configMap.put("metadata.broker.list", brokerList); - - // key serializer - if (publishConfig.containsKey(KEY_SERIALIZER_UNDERSCORE)) { - configMap.put(KEY_SERIALIZER, publishConfig.get(KEY_SERIALIZER_UNDERSCORE)); - } else { - configMap.put(KEY_SERIALIZER, STRING_SERIALIZER); - } - - if (publishConfig.containsKey(KEY_DESERIALIZER_UNDERSCORE)) { - configMap.put(KEY_DESERIALIZER, publishConfig.get(KEY_DESERIALIZER_UNDERSCORE)); - } else { - configMap.put(KEY_DESERIALIZER, STRING_DESERIALIZER); - } - - // value serializer - if (publishConfig.containsKey(VALUE_SERIALIZER_UNDERSCORE)) { - configMap.put(VALUE_SERIALIZER, publishConfig.get(VALUE_SERIALIZER_UNDERSCORE)); - } else { - configMap.put(VALUE_SERIALIZER, STRING_SERIALIZER); - } - String requestRequiredAcks = REQUEST_REQUIRED_ACKS_DEFAULT; - if (publishConfig.containsKey(REQUEST_REQUIRED_ACKS_UNDERSCORE)) { - requestRequiredAcks = (String) publishConfig.get(REQUEST_REQUIRED_ACKS_UNDERSCORE); - } - configMap.put(REQUEST_REQUIRED_ACKS, requestRequiredAcks); - - // value deserializer - if (publishConfig.containsKey(VALUE_DESERIALIZER_UNDERSCORE)) { - configMap.put(VALUE_DESERIALIZER, publishConfig.get(VALUE_DESERIALIZER_UNDERSCORE)); - } else { - configMap.put(VALUE_DESERIALIZER, STRING_DESERIALIZER); - } - // kafka config will overwrite the config defined in publishment properties - if (publishConfig.containsKey(KEY_KAFKA_PROPERTIES)) { - Map kafkaProperties = getKafkaProperties(publishConfig.get(KEY_KAFKA_PROPERTIES)); - kafkaProperties.forEach((k, v) -> configMap.put(k, v)); - } - - if (LOG.isInfoEnabled()) { - LOG.info(" given kafka config {}, create producer config map {}", publishConfig, configMap); - } - - KafkaProducer producer = new KafkaProducer<>(configMap); - return producer; - } - - public KafkaWriteMode getKafkaWriteMode(Map publishConfig) { - if (publishConfig.containsKey(KEY_KAFKA_PROPERTIES)) { - return KafkaWriteMode.fromString((String) getKafkaProperty(publishConfig.get(KEY_KAFKA_PROPERTIES), PRODUCER_TYPE)); - } - return KafkaWriteMode.async; - } - - private Map getKafkaProperties(Object kafkaProperties) { - Map result = new HashMap(); - try { - @SuppressWarnings("unchecked") - List> tempKafkaProperties = (List>) kafkaProperties; - if (tempKafkaProperties != null) { - tempKafkaProperties.forEach(one -> { - if (one.containsKey(KEY_KAFKA_PROPERTY_NAME) && one.containsKey(KEY_KAFKA_PROPERTY_VALUE)) { - result.put((String) one.get(KEY_KAFKA_PROPERTY_NAME), one.get(KEY_KAFKA_PROPERTY_VALUE)); - } - }); - } - } catch (ClassCastException e) { - LOG.warn("fail to cast kafka properties", e); - } - return result; - } - - private Object getKafkaProperty(Object kafkaProperties, String propertyName) { - return getKafkaProperties(kafkaProperties).get(propertyName); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaWriteMode.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaWriteMode.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaWriteMode.java deleted file mode 100644 index 6e59474..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaWriteMode.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.publisher.impl; - -public enum KafkaWriteMode { - - sync, async; - - public static KafkaWriteMode fromString(String mode) { - for (KafkaWriteMode one : KafkaWriteMode.values()) { - if (one.name().equalsIgnoreCase(mode)) { - return one; - } - } - // default mode is async - return async; - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/PublishStatus.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/PublishStatus.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/PublishStatus.java deleted file mode 100644 index 44f902c..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/PublishStatus.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.publisher.impl; - -/** - * Object that holds the status of Notification Posted to Notification Plugin. - */ -public class PublishStatus { - public boolean successful; - public String errorMessage; -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/StringEventSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/StringEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/StringEventSerializer.java deleted file mode 100644 index 5eb444d..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/StringEventSerializer.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.publisher.impl; - -import org.apache.eagle.alert.engine.codec.IEventSerializer; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; - -import java.util.Map; - -/** - * @since Jun 3, 2016. - */ -public class StringEventSerializer implements IEventSerializer { - - @SuppressWarnings("rawtypes") - public StringEventSerializer(Map stormConf) { - } - - @Override - public Object serialize(AlertStreamEvent event) { - return event.toString(); - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertContextFields.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertContextFields.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertContextFields.java deleted file mode 100644 index 1fe318e..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertContextFields.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.publisher.template; - -import java.util.Arrays; -import java.util.List; - -public class AlertContextFields { - public static final String STREAM_ID = "STREAM_ID"; - public static final String SITE_ID = "SITE_ID"; - public static final String ALERT_ID = "ALERT_ID"; - public static final String CREATED_BY = "CREATED_BY"; - public static final String POLICY_ID = "POLICY_ID"; - public static final String CREATED_TIMESTAMP = "CREATED_TIMESTAMP"; - public static final String CREATED_TIME = "CREATED_TIME"; - public static final String ALERT_TIMESTAMP = "ALERT_TIMESTAMP"; - public static final String ALERT_TIME = "ALERT_TIME"; - public static final String ALERT_SCHEMA = "ALERT_SCHEMA"; - public static final String ALERT_EVENT = "ALERT_EVENT"; - public static final String POLICY_DESC = "POLICY_DESC"; - public static final String POLICY_TYPE = "POLICY_TYPE"; - public static final String POLICY_DEFINITION = "POLICY_DEFINITION"; - public static final String POLICY_HANDLER = "POLICY_HANDLER"; - - public static List getAllContextFields() { - return Arrays.asList( - STREAM_ID, ALERT_ID, CREATED_BY, POLICY_ID, CREATED_TIMESTAMP, CREATED_TIME, ALERT_TIMESTAMP, ALERT_TIME, ALERT_SCHEMA, POLICY_DESC, POLICY_TYPE, POLICY_DEFINITION, POLICY_HANDLER - ); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateEngine.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateEngine.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateEngine.java deleted file mode 100644 index 760ec7c..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateEngine.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.publisher.template; - -import com.typesafe.config.Config; -import org.apache.eagle.alert.engine.publisher.AlertStreamFilter; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; - -import java.util.Collection; - -/** - * Alert Template Engine. - */ -public interface AlertTemplateEngine extends AlertStreamFilter { - /** - * Initialize AlertTemplateEngine with Config. - */ - void init(Config config); - - /** - * Register policy with definition. - */ - void register(PolicyDefinition policyDefinition); - - /** - * Register policy by policyId. - */ - void unregister(String policyId); - - /** - * @return registered policy definitions. - */ - Collection getPolicies(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateProvider.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateProvider.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateProvider.java deleted file mode 100644 index 875facb..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateProvider.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.publisher.template; - -public class AlertTemplateProvider { - public static AlertTemplateEngine createAlertTemplateEngine() { - return new VelocityAlertTemplateEngine(); - } -} \ No newline at end of file