eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [24/84] [partial] eagle git commit: Clean repo for eagle site
Date Mon, 03 Apr 2017 11:54:32 GMT
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java
deleted file mode 100644
index b410cda..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License.  You may obtain a copy of the License at
- *  <p/>
- *  http://www.apache.org/licenses/LICENSE-2.0
- *  <p/>
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.publisher.impl;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.PublishmentType;
-import org.apache.eagle.alert.engine.model.AlertPublishEvent;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.AlertPublishPluginProvider;
-import org.apache.eagle.alert.engine.publisher.PublishConstants;
-import org.apache.eagle.alert.service.IMetadataServiceClient;
-import org.apache.eagle.alert.service.MetadataServiceClientImpl;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class AlertEagleStorePlugin extends AbstractPublishPlugin implements AlertPublishPluginProvider {
-
-    private static Logger LOG = LoggerFactory.getLogger(AlertEagleStorePlugin.class);
-    private transient IMetadataServiceClient client;
-
-    @Override
-    public void init(Config config, Publishment publishment, Map conf) throws Exception {
-        super.init(config, publishment, conf);
-        client = new MetadataServiceClientImpl(config);
-    }
-
-    @Override
-    public void close() {
-        try {
-            client.close();
-        } catch (IOException e) {
-            LOG.error(e.getMessage(), e);
-        }
-    }
-
-    @Override
-    public void onAlert(AlertStreamEvent event) throws Exception {
-        List<AlertStreamEvent> eventList = this.dedup(event);
-        if (eventList == null || eventList.isEmpty()) {
-            return;
-        }
-        List<AlertPublishEvent> alertEvents = new ArrayList<>();
-        for (AlertStreamEvent e : eventList) {
-            alertEvents.add(AlertPublishEvent.createAlertPublishEvent(e));
-        }
-        client.addAlertPublishEvents(alertEvents);
-    }
-
-    @Override
-    protected Logger getLogger() {
-        return LOG;
-    }
-
-    @Override
-    public PublishmentType getPluginType() {
-        return new PublishmentType.Builder()
-                .name("JDBCStorage")
-                .type(getClass())
-                .description("Publish alerts into eagle metadata store")
-                .build();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
deleted file mode 100644
index 152a9f1..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.publisher.impl;
-
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.PublishmentType;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.AlertPublishPluginProvider;
-import org.apache.eagle.alert.engine.publisher.PublishConstants;
-import org.apache.eagle.alert.engine.publisher.email.AlertEmailConstants;
-import org.apache.eagle.alert.engine.publisher.email.AlertEmailGenerator;
-import org.apache.eagle.alert.engine.publisher.email.AlertEmailGeneratorBuilder;
-import com.typesafe.config.Config;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.eagle.alert.service.MetadataServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.eagle.alert.service.MetadataServiceClientImpl.*;
-import static org.apache.eagle.common.mail.AlertEmailConstants.*;
-
-public class AlertEmailPublisher extends AbstractPublishPlugin implements AlertPublishPluginProvider {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AlertEmailPublisher.class);
-    private static final int DEFAULT_THREAD_POOL_CORE_SIZE = 4;
-    private static final int DEFAULT_THREAD_POOL_MAX_SIZE = 8;
-    private static final long DEFAULT_THREAD_POOL_SHRINK_TIME = 60000L; // 1 minute
-
-    private AlertEmailGenerator emailGenerator;
-    private Map<String, Object> emailConfig;
-
-    private transient ThreadPoolExecutor executorPool;
-    private String serverHost;
-    private int serverPort;
-    private Properties mailClientProperties;
-
-    @Override
-    @SuppressWarnings("rawtypes")
-    public void init(Config config, Publishment publishment, Map conf) throws Exception {
-        super.init(config, publishment, conf);
-        this.serverHost = config.hasPath(MetadataServiceClientImpl.EAGLE_CORRELATION_SERVICE_HOST)
-            ? config.getString(MetadataServiceClientImpl.EAGLE_CORRELATION_SERVICE_HOST) : "localhost";
-        this.serverPort = config.hasPath(MetadataServiceClientImpl.EAGLE_CORRELATION_SERVICE_PORT)
-            ? config.getInt(MetadataServiceClientImpl.EAGLE_CORRELATION_SERVICE_PORT) : 80;
-        this.mailClientProperties = parseMailClientConfig(config);
-
-        executorPool = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_CORE_SIZE, DEFAULT_THREAD_POOL_MAX_SIZE, DEFAULT_THREAD_POOL_SHRINK_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
-        LOG.info(" Creating Email Generator... ");
-        if (publishment.getProperties() != null) {
-            emailConfig = new HashMap<>(publishment.getProperties());
-            emailGenerator = createEmailGenerator(emailConfig);
-        }
-    }
-
-    private Properties parseMailClientConfig(Config config) {
-        Properties props = new Properties();
-        Config mailConfig = null;
-        if (config.hasPath(EAGLE_COORDINATOR_EMAIL_SERVICE)) {
-            mailConfig = config.getConfig(EAGLE_COORDINATOR_EMAIL_SERVICE);
-        } else if (config.hasPath(EAGLE_APPLICATION_EMAIL_SERVICE)) {
-            mailConfig = config.getConfig(EAGLE_APPLICATION_EMAIL_SERVICE);
-        }
-        String mailSmtpServer = mailConfig.getString(EAGLE_EMAIL_SMTP_SERVER);
-        String mailSmtpPort = mailConfig.getString(EAGLE_EMAIL_SMTP_PORT);
-        String mailSmtpAuth =  mailConfig.getString(EAGLE_EMAIL_SMTP_AUTH);
-
-        props.put(AlertEmailConstants.CONF_MAIL_HOST, mailSmtpServer);
-        props.put(AlertEmailConstants.CONF_MAIL_PORT, mailSmtpPort);
-        props.put(AlertEmailConstants.CONF_MAIL_AUTH, mailSmtpAuth);
-
-        if (Boolean.parseBoolean(mailSmtpAuth)) {
-            String mailSmtpUsername = mailConfig.getString(EAGLE_EMAIL_SMTP_USERNAME);
-            String mailSmtpPassword = mailConfig.getString(EAGLE_EMAIL_SMTP_PASSWORD);
-            props.put(AlertEmailConstants.CONF_AUTH_USER, mailSmtpUsername);
-            props.put(AlertEmailConstants.CONF_AUTH_PASSWORD, mailSmtpPassword);
-        }
-
-        String mailSmtpConn = mailConfig.hasPath(EAGLE_EMAIL_SMTP_CONN) ? mailConfig.getString(EAGLE_EMAIL_SMTP_CONN) : AlertEmailConstants.CONN_PLAINTEXT;
-        if (mailSmtpConn.equalsIgnoreCase(AlertEmailConstants.CONN_TLS)) {
-            props.put("mail.smtp.starttls.enable", "true");
-        }
-        if (mailSmtpConn.equalsIgnoreCase(AlertEmailConstants.CONN_SSL)) {
-            props.put("mail.smtp.socketFactory.port", "465");
-            props.put("mail.smtp.socketFactory.class",
-                    "javax.net.ssl.SSLSocketFactory");
-        }
-
-        String mailSmtpDebug = mailConfig.hasPath(EAGLE_EMAIL_SMTP_DEBUG) ? mailConfig.getString(EAGLE_EMAIL_SMTP_DEBUG) : "false";
-        props.put(AlertEmailConstants.CONF_MAIL_DEBUG, mailSmtpDebug);
-        return props;
-    }
-
-    @Override
-    public void onAlert(AlertStreamEvent event) throws Exception {
-        if (emailGenerator == null) {
-            LOG.warn("emailGenerator is null due to the incorrect configurations");
-            return;
-        }
-        List<AlertStreamEvent> outputEvents = dedup(event);
-        if (outputEvents == null) {
-            return;
-        }
-
-        boolean isSuccess = true;
-        for (AlertStreamEvent outputEvent : outputEvents) {
-            if (!emailGenerator.sendAlertEmail(outputEvent)) {
-                isSuccess = false;
-            }
-        }
-        PublishStatus status = new PublishStatus();
-        if (!isSuccess) {
-            status.errorMessage = "Failed to send email";
-            status.successful = false;
-        } else {
-            status.errorMessage = "";
-            status.successful = true;
-        }
-        this.status = status;
-    }
-
-    @Override
-    public void update(String dedupIntervalMin, Map<String, Object> pluginProperties) {
-        super.update(dedupIntervalMin, pluginProperties);
-
-        if (pluginProperties != null && !emailConfig.equals(pluginProperties)) {
-            emailConfig = new HashMap<>(pluginProperties);
-            emailGenerator = createEmailGenerator(pluginProperties);
-        }
-    }
-
-    @Override
-    public void close() {
-        this.executorPool.shutdown();
-    }
-
-    private AlertEmailGenerator createEmailGenerator(Map<String, Object> notificationConfig) {
-        String tplFileName = (String) notificationConfig.get(PublishConstants.TEMPLATE);
-        if (tplFileName == null || tplFileName.equals("")) {
-            // tplFileName = "ALERT_DEFAULT_TEMPLATE.vm";
-            // tplFileName = "ALERT_LIGHT_TEMPLATE.vm";
-            tplFileName = "ALERT_INLINED_TEMPLATE.vm";
-        }
-        String subject = (String) notificationConfig.get(PublishConstants.SUBJECT);
-        if (subject == null) {
-            subject = "No subject";
-        }
-        String sender = (String) notificationConfig.get(PublishConstants.SENDER);
-        String recipients = (String) notificationConfig.get(PublishConstants.RECIPIENTS);
-        if (sender == null || recipients == null) {
-            LOG.warn("email sender or recipients is null");
-            return null;
-        }
-
-        AlertEmailGenerator gen = AlertEmailGeneratorBuilder.newBuilder()
-            .withMailProps(this.mailClientProperties)
-            .withSubject(subject)
-            .withSender(sender)
-            .withRecipients(recipients)
-            .withTplFile(tplFileName)
-            .withExecutorPool(this.executorPool)
-            .withServerHost(this.serverHost)
-            .withServerPort(this.serverPort)
-            .build();
-        return gen;
-    }
-
-    @Override
-    public int hashCode() {
-        return new HashCodeBuilder().append(getClass().getCanonicalName()).toHashCode();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (o == this) {
-            return true;
-        }
-        if (!(o instanceof AlertEmailPublisher)) {
-            return false;
-        }
-        return true;
-    }
-
-    @Override
-    protected Logger getLogger() {
-        return LOG;
-    }
-
-    @Override
-    public PublishmentType getPluginType() {
-        return new PublishmentType.Builder()
-                .name("Email")
-                .type(AlertEmailPublisher.class)
-                .description("Email alert publisher")
-                .field("subject")
-                .field("sender")
-                .field("recipients")
-                .build();
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertFilePublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertFilePublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertFilePublisher.java
deleted file mode 100644
index 375a0da..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertFilePublisher.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License.  You may obtain a copy of the License at
- *  <p/>
- *  http://www.apache.org/licenses/LICENSE-2.0
- *  <p/>
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.publisher.impl;
-
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.PublishmentType;
-import org.apache.eagle.alert.engine.model.AlertPublishEvent;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.AlertPublishPluginProvider;
-import org.apache.eagle.alert.engine.publisher.PublishConstants;
-import org.apache.eagle.common.DateTimeUtil;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-import java.util.logging.*;
-
-public class AlertFilePublisher extends AbstractPublishPlugin implements AlertPublishPluginProvider {
-
-    private Logger filelogger = Logger.getLogger(AlertFilePublisher.class.getName());
-    private FileHandler handler;
-    private static ObjectMapper objectMapper = new ObjectMapper();
-
-    private static final String DEFAULT_FILE_NAME = "eagle-alert.log";
-    private static final int DEFAULT_ROTATE_SIZE_KB = 1024;
-    private static final int DEFAULT_FILE_NUMBER = 5;
-
-    @Override
-    public void init(Config config, Publishment publishment, Map conf) throws Exception {
-        super.init(config, publishment, conf);
-
-        String fileName = DEFAULT_FILE_NAME;
-        int rotateSize = DEFAULT_ROTATE_SIZE_KB;
-        int numOfFiles = DEFAULT_FILE_NUMBER;
-        if (publishment.getProperties() != null) {
-            if (publishment.getProperties().containsKey(PublishConstants.FILE_NAME)) {
-                fileName = (String) publishment.getProperties().get(PublishConstants.FILE_NAME);
-            }
-            if (publishment.getProperties().containsKey(PublishConstants.ROTATE_EVERY_KB)) {
-                rotateSize = Integer.valueOf(publishment.getProperties().get(PublishConstants.ROTATE_EVERY_KB).toString());
-            }
-            if (publishment.getProperties().containsKey(PublishConstants.NUMBER_OF_FILES)) {
-                numOfFiles = Integer.valueOf(publishment.getProperties().get(PublishConstants.NUMBER_OF_FILES).toString());
-            }
-        }
-        handler = new FileHandler(fileName, rotateSize * 1024, numOfFiles, true);
-        handler.setFormatter(new AlertFileFormatter());
-        filelogger.addHandler(handler);
-        filelogger.setUseParentHandlers(false);
-    }
-
-    @Override
-    public PublishmentType getPluginType() {
-        return new PublishmentType.Builder()
-                .name("File")
-                .type(AlertFilePublisher.class)
-                .description("Local log file publisher")
-                .build();
-    }
-
-    class AlertFileFormatter extends Formatter {
-
-        @Override
-        public String format(LogRecord record) {
-            return String.format("%s %s\n", DateTimeUtil.millisecondsToHumanDateWithSeconds(record.getMillis()),
-                    record.getMessage());
-        }
-    }
-
-    @Override
-    public void onAlert(AlertStreamEvent event) throws Exception {
-        List<AlertStreamEvent> eventList = this.dedup(event);
-        if (eventList == null || eventList.isEmpty()) {
-            return;
-        }
-        for (AlertStreamEvent e : eventList) {
-            //filelogger.info(e.toString());
-            AlertPublishEvent alert = AlertPublishEvent.createAlertPublishEvent(e);
-            filelogger.info(objectMapper.writeValueAsString(alert));
-        }
-    }
-
-    @Override
-    public void close() {
-        if (handler != null) {
-            handler.close();
-        }
-    }
-
-    @Override
-    protected org.slf4j.Logger getLogger() {
-        return LoggerFactory.getLogger(AlertFilePublisher.class);
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
deleted file mode 100644
index adac1aa..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.publisher.impl;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.PublishmentType;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.AlertPublishPluginProvider;
-import org.apache.eagle.alert.engine.publisher.PublishConstants;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-
-public class AlertKafkaPublisher extends AbstractPublishPlugin implements AlertPublishPluginProvider {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AlertKafkaPublisher.class);
-    private static final long MAX_TIMEOUT_MS = 60000;
-
-    @SuppressWarnings("rawtypes")
-    private KafkaProducer producer;
-    private String brokerList;
-    private String topic;
-    private KafkaWriteMode mode = KafkaWriteMode.async;
-
-    @Override
-    @SuppressWarnings("rawtypes")
-    public void init(Config config, Publishment publishment, Map conf) throws Exception {
-        super.init(config, publishment, conf);
-
-        if (publishment.getProperties() != null) {
-            Map<String, Object> publishConfig = new HashMap<>(publishment.getProperties());
-            brokerList = ((String) publishConfig.get(PublishConstants.BROKER_LIST)).trim();
-            producer = KafkaProducerManager.INSTANCE.getProducer(brokerList, publishConfig);
-            topic = ((String) publishConfig.get(PublishConstants.TOPIC)).trim();
-            mode = KafkaProducerManager.INSTANCE.getKafkaWriteMode(publishConfig);
-        }
-    }
-
-    @Override
-    public void onAlert(AlertStreamEvent event) throws Exception {
-        if (producer == null) {
-            LOG.warn("KafkaProducer is null due to the incorrect configurations");
-            return;
-        }
-
-        this.emit(this.topic, this.dedup(event));
-    }
-
-    @SuppressWarnings("rawtypes")
-    @Override
-    public void update(String dedupIntervalMin, Map<String, Object> pluginProperties) {
-        deduplicator.setDedupIntervalMin(dedupIntervalMin);
-        String newBrokerList = ((String) pluginProperties.get(PublishConstants.BROKER_LIST)).trim();
-        String newTopic = ((String) pluginProperties.get(PublishConstants.TOPIC)).trim();
-        if (!newBrokerList.equals(this.brokerList)) {
-            if (producer != null) {
-                producer.close();
-            }
-            brokerList = newBrokerList;
-            KafkaProducer newProducer = null;
-            try {
-                newProducer = KafkaProducerManager.INSTANCE.getProducer(brokerList, pluginProperties);
-            } catch (Exception e) {
-                LOG.error("Create KafkaProducer failed with configurations: {}", pluginProperties);
-            }
-            producer = newProducer;
-        }
-        topic = newTopic;
-    }
-
-    @Override
-    public void close() {
-        producer.close();
-    }
-
-    @SuppressWarnings( {"rawtypes", "unchecked"})
-    protected void emit(String topic, List<AlertStreamEvent> outputEvents) {
-        // we need to check producer here since the producer is invisable to extended kafka publisher
-        if (producer == null) {
-            LOG.warn("KafkaProducer is null due to the incorrect configurations");
-            return;
-        }
-        if (outputEvents == null) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Alert stream events list in publishment is empty");
-            }
-            return;
-        }
-        this.status = new PublishStatus();
-        try {
-            for (AlertStreamEvent outputEvent : outputEvents) {
-                ProducerRecord record = createRecord(outputEvent, topic);
-                if (record == null) {
-                    LOG.error("Alert serialize return null, ignored message! ");
-                    return;
-                }
-                if (mode == KafkaWriteMode.sync) {
-                    Future<?> future = producer.send(record);
-                    future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS);
-                    succeed(mode, "");
-                } else {
-                    producer.send(record, new Callback() {
-                        @Override
-                        public void onCompletion(RecordMetadata metadata, Exception exception) {
-                            if (exception != null) {
-                                failOnException(String.format("Failed to send message to %s, due to:%s",
-                                    brokerList, exception), exception);
-                                return;
-                            }
-                            succeed(mode, "");
-                        }
-                    });
-                }
-            }
-        } catch (InterruptedException | ExecutionException e) {
-            failOnException(String.format("Failed to send message to %s, due to:%s", brokerList, e), e);
-        } catch (Exception ex) {
-            failOnException(String.format("Failed to send message to %s, due to:%s", brokerList, ex), ex);
-        }
-    }
-
-    private void failOnException(String message, Exception e) {
-        status.successful = false;
-        status.errorMessage = message;
-        LOG.error(status.errorMessage, e);
-    }
-
-    private void succeed(KafkaWriteMode mode, String message) {
-        status.successful = true;
-        status.errorMessage = "";
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Successfully send message to Kafka: {} in mode {}", brokerList, mode);
-        }
-    }
-
-    protected String getTopic() {
-        return this.topic;
-    }
-
-    private ProducerRecord<String, Object> createRecord(AlertStreamEvent event, String topic) throws Exception {
-        Object o = serialzeEvent(event);
-        if (o != null) {
-            ProducerRecord<String, Object> record = new ProducerRecord<>(topic, o);
-            return record;
-        } else {
-            return null;
-        }
-    }
-
-    private Object serialzeEvent(AlertStreamEvent event) {
-        return serializer.serialize(event);
-    }
-
-    @Override
-    protected Logger getLogger() {
-        return LOG;
-    }
-
-    @Override
-    public PublishmentType getPluginType() {
-        return new PublishmentType.Builder()
-                .name("Kafka")
-                .type(getClass())
-                .description("Kafka alert publisher")
-                .field("kafka_broker","localhost:9092")
-                .field("topic")
-                .build();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java
deleted file mode 100644
index 957d356..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.publisher.impl;
-
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
-import com.typesafe.config.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-/**
- * @since on 5/11/16.
- */
-public class AlertPublishPluginsFactory {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AlertPublishPluginsFactory.class);
-
-    @SuppressWarnings("rawtypes")
-    public static AlertPublishPlugin createNotificationPlugin(Publishment publishment, Config config, Map conf) {
-        AlertPublishPlugin plugin = null;
-        String publisherType = publishment.getType();
-        try {
-            plugin = (AlertPublishPlugin) Class.forName(publisherType).newInstance();
-            plugin.init(config, publishment, conf);
-        } catch (Exception ex) {
-            LOG.error("Error in loading AlertPublisherPlugin class: ", ex);
-            //throw new IllegalStateException(ex);
-        }
-        return plugin;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
deleted file mode 100644
index 5b902f9..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.engine.publisher.impl;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.eagle.alert.engine.coordinator.PublishPartition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
-import org.apache.eagle.alert.engine.publisher.AlertPublisher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-
-@SuppressWarnings("rawtypes")
-public class AlertPublisherImpl implements AlertPublisher {
-
-    private static final long serialVersionUID = 4809983246198138865L;
-    private static final Logger LOG = LoggerFactory.getLogger(AlertPublisherImpl.class);
-
-    private final String name;
-
-    private volatile Map<PublishPartition, AlertPublishPlugin> publishPluginMapping = new ConcurrentHashMap<>(1);
-    private Config config;
-    private Map conf;
-
-    public AlertPublisherImpl(String name) {
-        this.name = name;
-    }
-
-    @Override
-    public void init(Config config, Map conf) {
-        this.config = config;
-        this.conf = conf;
-    }
-
-    @Override
-    public String getName() {
-        return name;
-    }
-
-    @Override
-    public void nextEvent(PublishPartition partition, AlertStreamEvent event) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug(event.toString());
-        }
-        notifyAlert(partition, event);
-    }
-
-    private void notifyAlert(PublishPartition partition, AlertStreamEvent event) {
-        // remove the column values for publish plugin match
-        partition.getColumnValues().clear();
-        if (!publishPluginMapping.containsKey(partition)) {
-            LOG.warn("PublishPartition {} is not found in publish plugin map", partition);
-            return;
-        }
-        AlertPublishPlugin plugin = publishPluginMapping.get(partition);
-        if (plugin == null) {
-            LOG.warn("PublishPartition {} has problems while initializing publish plugin", partition);
-            return;
-        }
-        event.ensureAlertId();
-        try {
-            LOG.debug("Execute alert publisher {}", plugin.getClass().getCanonicalName());
-            plugin.onAlert(event);
-        } catch (Exception ex) {
-            LOG.error("Fail invoking publisher's onAlert, continue ", ex);
-        }
-    }
-
-    @Override
-    public void close() {
-        publishPluginMapping.values().forEach(plugin -> plugin.close());
-    }
-
-    @Override
-    public synchronized void onPublishChange(List<Publishment> added,
-                                             List<Publishment> removed,
-                                             List<Publishment> afterModified,
-                                             List<Publishment> beforeModified) {
-        if (added == null) {
-            added = new ArrayList<>();
-        }
-        if (removed == null) {
-            removed = new ArrayList<>();
-        }
-        if (afterModified == null) {
-            afterModified = new ArrayList<>();
-        }
-        if (beforeModified == null) {
-            beforeModified = new ArrayList<>();
-        }
-
-        if (afterModified.size() != beforeModified.size()) {
-            LOG.warn("beforeModified size != afterModified size");
-            return;
-        }
-
-        // copy and swap to avoid concurrency issue
-        Map<PublishPartition, AlertPublishPlugin> newPublishMap = new HashMap<>(publishPluginMapping);
-
-        // added
-        for (Publishment publishment : added) {
-            LOG.debug("OnPublishmentChange : add publishment : {} ", publishment);
-
-            AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config, conf);
-            if (plugin != null) {
-                for (PublishPartition p : getPublishPartitions(publishment)) {
-                    newPublishMap.put(p, plugin);
-                }
-            } else {
-                LOG.error("OnPublishChange alertPublisher {} failed due to invalid format", publishment);
-            }
-        }
-        //removed
-        List<AlertPublishPlugin> toBeClosed = new ArrayList<>();
-        for (Publishment publishment : removed) {
-            AlertPublishPlugin plugin = null;
-            for (PublishPartition p : getPublishPartitions(publishment)) {
-                if (plugin == null) {
-                    plugin = newPublishMap.remove(p);
-                } else {
-                    newPublishMap.remove(p);
-                }
-            }
-            if (plugin != null) {
-                toBeClosed.add(plugin);
-            }
-        }
-        // updated
-        for (Publishment publishment : afterModified) {
-            // for updated publishment, need to init them too
-            AlertPublishPlugin newPlugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config, conf);
-            if (newPlugin != null) {
-                AlertPublishPlugin plugin = null;
-                for (PublishPartition p : getPublishPartitions(publishment)) {
-                    if (plugin == null) {
-                        plugin = newPublishMap.get(p);
-                    }
-                    newPublishMap.put(p, newPlugin);
-                }
-                if (plugin != null) {
-                    toBeClosed.add(plugin);
-                }
-            } else {
-                LOG.error("OnPublishChange alertPublisher {} failed due to invalid format", publishment);
-            }
-        }
-
-        // now do the swap
-        publishPluginMapping = newPublishMap;
-
-        // safely close : it depend on plugin to check if want to wait all data to be flushed.
-        closePlugins(toBeClosed);
-    }
-
-    private Set<PublishPartition> getPublishPartitions(Publishment publish) {
-        List<String> streamIds = new ArrayList<>();
-        // add the publish to the bolt
-        if (publish.getStreamIds() == null || publish.getStreamIds().size() <= 0) {
-            streamIds.add(Publishment.STREAM_NAME_DEFAULT);
-        } else {
-            streamIds.addAll(publish.getStreamIds());
-        }
-        Set<PublishPartition> publishPartitions = new HashSet<>();
-        for (String streamId : streamIds) {
-            for (String policyId : publish.getPolicyIds()) {
-                publishPartitions.add(new PublishPartition(streamId, policyId, publish.getName(), publish.getPartitionColumns()));
-            }
-        }
-        return publishPartitions;
-    }
-
-    private void closePlugins(List<AlertPublishPlugin> toBeClosed) {
-        for (AlertPublishPlugin p : toBeClosed) {
-            try {
-                p.close();
-            } catch (Exception e) {
-                LOG.error(String.format("Error when close publish plugin {}!", p.getClass().getCanonicalName()), e);
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java
deleted file mode 100644
index 0d60246..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.publisher.impl;
-
-import com.typesafe.config.Config;
-import com.ullink.slack.simpleslackapi.SlackAttachment;
-import com.ullink.slack.simpleslackapi.SlackChannel;
-import com.ullink.slack.simpleslackapi.SlackSession;
-import com.ullink.slack.simpleslackapi.impl.SlackSessionFactory;
-import org.apache.commons.lang.StringUtils;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.PublishmentType;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.AlertPublishPluginProvider;
-import org.apache.eagle.alert.engine.publisher.PublishConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * @since Sep 14, 2016.
- */
-public class AlertSlackPublisher extends AbstractPublishPlugin implements AlertPublishPluginProvider {
-    private static final Logger LOG = LoggerFactory.getLogger(AlertSlackPublisher.class);
-
-    private SlackSession session;
-    private String slackChannels;
-    private String severitys;
-
-    @Override
-    public void init(Config config, Publishment publishment, Map conf) throws Exception {
-        super.init(config, publishment, conf);
-
-        if (publishment.getProperties() != null) {
-            Map<String, Object> slackConfig = new HashMap<>(publishment.getProperties());
-            final String token = ((String) slackConfig.get(PublishConstants.TOKEN)).trim();
-            slackChannels = ((String) slackConfig.get(PublishConstants.CHANNELS)).trim();
-            severitys = ((String) slackConfig.get(PublishConstants.SEVERITYS)).trim();
-
-            if (StringUtils.isNotEmpty(token)) {
-                LOG.debug(" Creating Slack Session... ");
-                session = SlackSessionFactory.createWebSocketSlackSession(token);
-                session.connect();
-            }
-        }
-
-    }
-
-    @Override
-    public void onAlert(AlertStreamEvent event) throws Exception {
-        if (session == null) {
-            LOG.warn("Slack session is null due to incorrect configurations!");
-            return;
-        }
-        List<AlertStreamEvent> outputEvents = dedup(event);
-        if (outputEvents == null) {
-            return;
-        }
-
-        PublishStatus status = new PublishStatus();
-        for (AlertStreamEvent outputEvent: outputEvents) {
-            String message = "";
-            String severity = "";
-            String color = "";
-            // only user defined severity level alert will send to Slack;
-            boolean publishToSlack = false;
-
-            StreamDefinition streamDefinition = outputEvent.getSchema();
-            for (int i = 0; i < outputEvent.getData().length; i++) {
-                if (i > streamDefinition.getColumns().size()) {
-                    if (LOG.isWarnEnabled()) {
-                        LOG.warn("output column does not found for event data, this indicate code error!");
-                    }
-                    continue;
-                }
-                String colName = streamDefinition.getColumns().get(i).getName();
-                if (colName.equalsIgnoreCase("severity")) {
-                    severity = outputEvent.getData()[i].toString();
-                    publishToSlack = severitys.contains(severity);
-                }
-                if (colName.equalsIgnoreCase("message")) {
-                    message = outputEvent.getData()[i].toString();
-                }
-            }
-
-            if (publishToSlack) {
-                try {
-                    // get hex color code from severity
-                    switch (severity) {
-                        case "CRITICAL":
-                            color = "#dd3333"; //red
-                            break;
-                        case "WARNING":
-                            color = "#ffc04c"; //yellow
-                            break;
-                        default:
-                            color = "#439FE0"; //blue
-                            break;
-                    }
-
-                    // here to be generic, only publish message like "CRITICAL  port-1 is down" to Slack
-                    String messageToSlack = String.format("%s %s", severity, message);
-                    SlackAttachment attachment = new SlackAttachment();
-                    attachment.setColor(color);
-                    attachment.setText(messageToSlack);
-
-                    for (String slackChannel: slackChannels.split(",")) {
-                        sendMessageToAChannel(session, slackChannel, null, attachment);
-                    }
-                } catch (Exception e) {
-                    status.successful = false;
-                    status.errorMessage = String.format("Failed to send message to slack channel %s, due to:%s", slackChannels, e);
-                    LOG.error(status.errorMessage, e);
-                }
-            }
-        }
-
-    }
-
-    @Override
-    public void close() {
-        try {
-            session.disconnect();
-        } catch (IOException e) {
-            LOG.error(e.getMessage());
-        }
-    }
-
-    @Override
-    protected Logger getLogger() {
-        return LOG;
-    }
-
-    private void sendMessageToAChannel(SlackSession session, String channelName, String message, SlackAttachment attachment) {
-        //get a channel
-        SlackChannel channel = session.findChannelByName(channelName);
-        session.sendMessage(channel, message, attachment);
-    }
-
-    @Override
-    public PublishmentType getPluginType() {
-        return new PublishmentType.Builder()
-                .name("Slack")
-                .type(getClass())
-                .description("Slack alert publisher")
-                .field("token")
-                .field("channels")
-                .field("severitys")
-                .field("urltemplate")
-                .build();
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
deleted file mode 100644
index ac99db3..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.publisher.impl;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
-import org.apache.eagle.alert.engine.publisher.dedup.DedupCache;
-import org.joda.time.Period;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-public class DefaultDeduplicator implements AlertDeduplicator {
-
-    private static Logger LOG = LoggerFactory.getLogger(DefaultDeduplicator.class);
-
-    private long dedupIntervalSec;
-    private List<String> customDedupFields = new ArrayList<>();
-    private String dedupStateField;
-    private String dedupStateCloseValue;
-
-    private DedupCache dedupCache;
-
-    private Cache<EventUniq, String> withoutStatesCache;
-
-    public DefaultDeduplicator() {
-        this.dedupIntervalSec = 0;
-    }
-
-    public DefaultDeduplicator(String intervalMin) {
-        setDedupIntervalMin(intervalMin);
-    }
-
-    public DefaultDeduplicator(long intervalMin) {
-        this.dedupIntervalSec = intervalMin;
-    }
-
-    public DefaultDeduplicator(String intervalMin, List<String> customDedupFields,
-                               String dedupStateField, String dedupStateCloseValue, DedupCache dedupCache) {
-        setDedupIntervalMin(intervalMin);
-        if (customDedupFields != null) {
-            this.customDedupFields = customDedupFields;
-        }
-        if (StringUtils.isNotBlank(dedupStateField)) {
-            this.dedupStateField = dedupStateField;
-        }
-        if (StringUtils.isNotBlank(dedupStateCloseValue)) {
-            this.dedupStateCloseValue = dedupStateCloseValue;
-        }
-        this.dedupCache = dedupCache;
-
-        withoutStatesCache = CacheBuilder.newBuilder().expireAfterWrite(
-            this.dedupIntervalSec, TimeUnit.SECONDS).build();
-    }
-
-    /*
-     * @param key
-     * @return
-     */
-    public List<AlertStreamEvent> checkDedup(AlertStreamEvent event, EventUniq key, String stateFiledValue) {
-        if (StringUtils.isBlank(stateFiledValue)) {
-            // without state field, we cannot determine whether it is duplicated
-            // without custom filed values, we cannot determine whether it is duplicated
-            synchronized (withoutStatesCache) {
-                if (withoutStatesCache != null && withoutStatesCache.getIfPresent(key) != null) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Alert event {} with key {} is skipped since it is duplicated", event, key);
-                    }
-                    return null;
-                } else if (withoutStatesCache != null) {
-                    withoutStatesCache.put(key, "");
-                }
-            }
-            return Arrays.asList(event);
-        }
-        return dedupCache.dedup(event, key, dedupStateField, stateFiledValue, dedupStateCloseValue);
-    }
-
-    public List<AlertStreamEvent> dedup(AlertStreamEvent event) {
-        if (event == null) {
-            return null;
-        }
-        // check custom field, and get the field values
-        StreamDefinition streamDefinition = event.getSchema();
-        HashMap<String, String> customFieldValues = new HashMap<>();
-        String stateFiledValue = null;
-        for (int i = 0; i < event.getData().length; i++) {
-            if (i > streamDefinition.getColumns().size()) {
-                if (LOG.isWarnEnabled()) {
-                    LOG.warn("output column does not found for event data, this indicate code error!");
-                }
-                continue;
-            }
-            String colName = streamDefinition.getColumns().get(i).getName();
-            Object colValue = event.getData()[i];
-
-            if (colName.equals(dedupStateField) && colValue != null) {
-                stateFiledValue = colValue.toString();
-            }
-
-            // make all of the field as unique key if no custom dedup field provided
-            if (colValue != null) {
-                if (customDedupFields == null || customDedupFields.size() <= 0) {
-                    customFieldValues.put(colName, colValue.toString());
-                } else {
-                    for (String field : customDedupFields) {
-                        if (colName.equals(field)) {
-                            customFieldValues.put(field, colValue.toString());
-                            break;
-                        }
-                    }
-                }
-            }
-        }
-
-        List<AlertStreamEvent> outputEvents = checkDedup(event, new EventUniq(event.getStreamId(),
-            event.getPolicyId(), event.getCreatedTime(), customFieldValues), stateFiledValue);
-        if (outputEvents != null && outputEvents.size() > 0) {
-            return outputEvents;
-        } else if (LOG.isInfoEnabled()) {
-            LOG.info("Alert event is skipped because it's duplicated: {}", event.toString());
-        }
-        return null;
-    }
-
-    @Override
-    public void setDedupIntervalMin(String newDedupIntervalMin) {
-        if (newDedupIntervalMin == null || newDedupIntervalMin.isEmpty()) {
-            dedupIntervalSec = 0;
-            return;
-        }
-        try {
-            Period period = Period.parse(newDedupIntervalMin);
-            this.dedupIntervalSec = period.toStandardSeconds().getSeconds();
-        } catch (Exception e) {
-            LOG.warn("Fail to pares deDupIntervalMin, will disable deduplication instead", e);
-            this.dedupIntervalSec = 0;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
deleted file mode 100644
index 511abcd..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-/**
- *
- */
-package org.apache.eagle.alert.engine.publisher.impl;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-import com.google.common.base.Joiner;
-
-import java.util.HashMap;
-
-/**
- * @since Mar 19, 2015.
- */
-public class EventUniq {
-    public String streamId;
-    public String policyId;
-    public Long timestamp;     // event's createTimestamp
-    public long createdTime; // created time, for cache removal;
-    public HashMap<String, String> customFieldValues;
-    public boolean removable = false;
-
-    public EventUniq(String streamId, String policyId, long timestamp) {
-        this.streamId = streamId;
-        this.timestamp = timestamp;
-        this.policyId = policyId;
-        this.createdTime = System.currentTimeMillis();
-    }
-
-    public EventUniq(String streamId, String policyId, long timestamp, HashMap<String, String> customFieldValues) {
-        this.streamId = streamId;
-        this.timestamp = timestamp;
-        this.policyId = policyId;
-        this.createdTime = System.currentTimeMillis();
-        this.customFieldValues = customFieldValues;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (obj instanceof EventUniq) {
-            EventUniq au = (EventUniq) obj;
-            boolean result = this.streamId.equalsIgnoreCase(au.streamId) & this.policyId.equalsIgnoreCase(au.policyId);
-            if (this.customFieldValues != null && au.customFieldValues != null) {
-                result = result & this.customFieldValues.equals(au.customFieldValues);
-            }
-            return result;
-        }
-        return false;
-    }
-
-    @Override
-    public int hashCode() {
-        HashCodeBuilder builder = new HashCodeBuilder().append(streamId).append(policyId);
-
-        if (customFieldValues != null) {
-            builder.append(customFieldValues);
-        }
-        return builder.build();
-    }
-
-    @Override
-    public String toString() {
-        return String.format("EventUniq[streamId: %s, policyId: %s, timestamp: %s, removable: %s, customFieldValues: %s]",
-            streamId, policyId, timestamp, removable, Joiner.on(",").withKeyValueSeparator(">").join(customFieldValues));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java
deleted file mode 100644
index aa22884..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher.impl;
-
-import org.apache.eagle.alert.engine.codec.IEventSerializer;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.utils.JsonUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * @since Jul 9, 2016.
- */
-public class JsonEventSerializer implements IEventSerializer {
-
-    private static final Logger LOG = LoggerFactory.getLogger(JsonEventSerializer.class);
-
-    @SuppressWarnings("rawtypes")
-    public JsonEventSerializer(Map stormConf) throws Exception {
-    }
-
-    @Override
-    public Object serialize(AlertStreamEvent event) {
-        String result = streamEventToJson(event);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("serialized alert event : {}", result);
-        }
-        return result;
-    }
-
-    public String streamEventToJson(AlertStreamEvent event) {
-        Map<String, Object> jsonMap = new HashMap<String, Object>();
-        jsonMap.put("policyId", event.getPolicyId());
-        jsonMap.put("streamId", event.getStreamId());
-        jsonMap.put("createBy", event.getCreatedBy());
-        jsonMap.put("createTime", event.getCreatedTime());
-        // data
-        int size = event.getData().length;
-        List<StreamColumn> columns = event.getSchema().getColumns();
-        for (int i = 0; i < size; i++) {
-            if (columns.size() < i) {
-                // redundant check to log inconsistency
-                LOG.error(" stream event data have different lenght compare to column definition! ");
-            } else {
-                jsonMap.put(columns.get(i).getName(), event.getData()[i]);
-            }
-        }
-        return JsonUtils.writeValueAsString(jsonMap);
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java
deleted file mode 100644
index 0c7f04c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.publisher.impl;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * The producer is thread safe and sharing a single producer instance across threads will generally be faster than
- * having multiple instances.
- */
-public class KafkaProducerManager {
-
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerManager.class);
-
-    private static final String STRING_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
-    private static final String STRING_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
-
-    private static final String VALUE_DESERIALIZER = "value.deserializer";
-    private static final String VALUE_DESERIALIZER_UNDERSCORE = "value_deserializer";
-
-    private static final String VALUE_SERIALIZER = "value.serializer";
-    private static final String VALUE_SERIALIZER_UNDERSCORE = "value_serializer";
-
-    private static final String KEY_DESERIALIZER = "key.deserializer";
-    private static final String KEY_DESERIALIZER_UNDERSCORE = "key_deserializer";
-
-    private static final String KEY_SERIALIZER = "key.serializer";
-    private static final String KEY_SERIALIZER_UNDERSCORE = "key_serializer";
-
-    private static final String REQUEST_REQUIRED_ACKS = "request.required.acks";
-    private static final String REQUEST_REQUIRED_ACKS_UNDERSCORE = "request_required_acks";
-    // the producer gets an acknowledgement after the leader replica has received the data
-    private static final String REQUEST_REQUIRED_ACKS_DEFAULT = "1";
-
-    private static final String PRODUCER_TYPE = "producer.type";
-
-    private static final String KEY_KAFKA_PROPERTIES = "kafka_client_config";
-
-    private static final String KEY_KAFKA_PROPERTY_NAME = "name";
-    private static final String KEY_KAFKA_PROPERTY_VALUE = "value";
-
-    public static final KafkaProducerManager INSTANCE = new KafkaProducerManager();
-
-    public KafkaProducer<String, Object> getProducer(String brokerList, Map<String, Object> publishConfig) {
-        Properties configMap = new Properties();
-        configMap.put("bootstrap.servers", brokerList);
-        configMap.put("metadata.broker.list", brokerList);
-
-        // key serializer
-        if (publishConfig.containsKey(KEY_SERIALIZER_UNDERSCORE)) {
-            configMap.put(KEY_SERIALIZER, publishConfig.get(KEY_SERIALIZER_UNDERSCORE));
-        } else {
-            configMap.put(KEY_SERIALIZER, STRING_SERIALIZER);
-        }
-
-        if (publishConfig.containsKey(KEY_DESERIALIZER_UNDERSCORE)) {
-            configMap.put(KEY_DESERIALIZER, publishConfig.get(KEY_DESERIALIZER_UNDERSCORE));
-        } else {
-            configMap.put(KEY_DESERIALIZER, STRING_DESERIALIZER);
-        }
-
-        // value serializer
-        if (publishConfig.containsKey(VALUE_SERIALIZER_UNDERSCORE)) {
-            configMap.put(VALUE_SERIALIZER, publishConfig.get(VALUE_SERIALIZER_UNDERSCORE));
-        } else {
-            configMap.put(VALUE_SERIALIZER, STRING_SERIALIZER);
-        }
-        String requestRequiredAcks = REQUEST_REQUIRED_ACKS_DEFAULT;
-        if (publishConfig.containsKey(REQUEST_REQUIRED_ACKS_UNDERSCORE)) {
-            requestRequiredAcks = (String) publishConfig.get(REQUEST_REQUIRED_ACKS_UNDERSCORE);
-        }
-        configMap.put(REQUEST_REQUIRED_ACKS, requestRequiredAcks);
-
-        // value deserializer
-        if (publishConfig.containsKey(VALUE_DESERIALIZER_UNDERSCORE)) {
-            configMap.put(VALUE_DESERIALIZER, publishConfig.get(VALUE_DESERIALIZER_UNDERSCORE));
-        } else {
-            configMap.put(VALUE_DESERIALIZER, STRING_DESERIALIZER);
-        }
-        // kafka config will overwrite the config defined in publishment properties
-        if (publishConfig.containsKey(KEY_KAFKA_PROPERTIES)) {
-            Map<String, Object> kafkaProperties = getKafkaProperties(publishConfig.get(KEY_KAFKA_PROPERTIES));
-            kafkaProperties.forEach((k, v) -> configMap.put(k, v));
-        }
-
-        if (LOG.isInfoEnabled()) {
-            LOG.info(" given kafka config {}, create producer config map {}", publishConfig, configMap);
-        }
-
-        KafkaProducer<String, Object> producer = new KafkaProducer<>(configMap);
-        return producer;
-    }
-
-    public KafkaWriteMode getKafkaWriteMode(Map<String, Object> publishConfig) {
-        if (publishConfig.containsKey(KEY_KAFKA_PROPERTIES)) {
-            return KafkaWriteMode.fromString((String) getKafkaProperty(publishConfig.get(KEY_KAFKA_PROPERTIES), PRODUCER_TYPE));
-        }
-        return KafkaWriteMode.async;
-    }
-
-    private Map<String, Object> getKafkaProperties(Object kafkaProperties) {
-        Map<String, Object> result = new HashMap<String, Object>();
-        try {
-            @SuppressWarnings("unchecked")
-            List<Map<String, Object>> tempKafkaProperties = (List<Map<String, Object>>) kafkaProperties;
-            if (tempKafkaProperties != null) {
-                tempKafkaProperties.forEach(one -> {
-                    if (one.containsKey(KEY_KAFKA_PROPERTY_NAME) && one.containsKey(KEY_KAFKA_PROPERTY_VALUE)) {
-                        result.put((String) one.get(KEY_KAFKA_PROPERTY_NAME), one.get(KEY_KAFKA_PROPERTY_VALUE));
-                    }
-                });
-            }
-        } catch (ClassCastException e) {
-            LOG.warn("fail to cast kafka properties", e);
-        }
-        return result;
-    }
-
-    private Object getKafkaProperty(Object kafkaProperties, String propertyName) {
-        return getKafkaProperties(kafkaProperties).get(propertyName);
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaWriteMode.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaWriteMode.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaWriteMode.java
deleted file mode 100644
index 6e59474..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaWriteMode.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher.impl;
-
-public enum KafkaWriteMode {
-
-    sync, async;
-
-    public static KafkaWriteMode fromString(String mode) {
-        for (KafkaWriteMode one : KafkaWriteMode.values()) {
-            if (one.name().equalsIgnoreCase(mode)) {
-                return one;
-            }
-        }
-        // default mode is async
-        return async;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/PublishStatus.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/PublishStatus.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/PublishStatus.java
deleted file mode 100644
index 44f902c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/PublishStatus.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.publisher.impl;
-
-/**
- * Object that holds the status of Notification Posted to Notification Plugin.
- */
-public class PublishStatus {
-    public boolean successful;
-    public String errorMessage;
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/StringEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/StringEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/StringEventSerializer.java
deleted file mode 100644
index 5eb444d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/StringEventSerializer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher.impl;
-
-import org.apache.eagle.alert.engine.codec.IEventSerializer;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-
-import java.util.Map;
-
-/**
- * @since Jun 3, 2016.
- */
-public class StringEventSerializer implements IEventSerializer {
-
-    @SuppressWarnings("rawtypes")
-    public StringEventSerializer(Map stormConf) {
-    }
-
-    @Override
-    public Object serialize(AlertStreamEvent event) {
-        return event.toString();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertContextFields.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertContextFields.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertContextFields.java
deleted file mode 100644
index 1fe318e..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertContextFields.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher.template;
-
-import java.util.Arrays;
-import java.util.List;
-
-public class AlertContextFields {
-    public static final String STREAM_ID = "STREAM_ID";
-    public static final String SITE_ID = "SITE_ID";
-    public static final String ALERT_ID = "ALERT_ID";
-    public static final String CREATED_BY = "CREATED_BY";
-    public static final String POLICY_ID = "POLICY_ID";
-    public static final String CREATED_TIMESTAMP = "CREATED_TIMESTAMP";
-    public static final String CREATED_TIME = "CREATED_TIME";
-    public static final String ALERT_TIMESTAMP = "ALERT_TIMESTAMP";
-    public static final String ALERT_TIME = "ALERT_TIME";
-    public static final String ALERT_SCHEMA = "ALERT_SCHEMA";
-    public static final String ALERT_EVENT = "ALERT_EVENT";
-    public static final String POLICY_DESC = "POLICY_DESC";
-    public static final String POLICY_TYPE = "POLICY_TYPE";
-    public static final String POLICY_DEFINITION = "POLICY_DEFINITION";
-    public static final String POLICY_HANDLER = "POLICY_HANDLER";
-
-    public static List<String> getAllContextFields() {
-        return Arrays.asList(
-            STREAM_ID, ALERT_ID, CREATED_BY, POLICY_ID, CREATED_TIMESTAMP, CREATED_TIME, ALERT_TIMESTAMP, ALERT_TIME, ALERT_SCHEMA, POLICY_DESC, POLICY_TYPE, POLICY_DEFINITION, POLICY_HANDLER
-        );
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateEngine.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateEngine.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateEngine.java
deleted file mode 100644
index 760ec7c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateEngine.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher.template;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.alert.engine.publisher.AlertStreamFilter;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-
-import java.util.Collection;
-
-/**
- * Alert Template Engine.
- */
-public interface AlertTemplateEngine extends AlertStreamFilter {
-    /**
-     * Initialize AlertTemplateEngine with Config.
-     */
-    void init(Config config);
-
-    /**
-     * Register policy with definition.
-     */
-    void register(PolicyDefinition policyDefinition);
-
-    /**
-     * Register policy by policyId.
-     */
-    void unregister(String policyId);
-
-    /**
-     * @return registered policy definitions.
-     */
-    Collection<PolicyDefinition> getPolicies();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateProvider.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateProvider.java
deleted file mode 100644
index 875facb..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateProvider.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher.template;
-
-public class AlertTemplateProvider {
-    public static AlertTemplateEngine createAlertTemplateEngine() {
-        return new VelocityAlertTemplateEngine();
-    }
-}
\ No newline at end of file


Mime
View raw message