eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ralp...@apache.org
Subject [11/18] incubator-eagle git commit: EAGLE-324 Init branch-v0.5
Date Wed, 01 Jun 2016 06:00:11 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
new file mode 100755
index 0000000..8d8a4d2
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.impl;
+
+import java.util.Arrays;
+
+import org.apache.eagle.alert.engine.AlertStreamCollector;
+import org.apache.eagle.alert.engine.StreamContext;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+
+import backtype.storm.task.OutputCollector;
+
+public class AlertBoltOutputCollectorWrapper implements AlertStreamCollector {
+    private final OutputCollector delegate;
+    private final Object outputLock;
+    private final StreamContext streamContext;
+
+    public AlertBoltOutputCollectorWrapper(OutputCollector outputCollector, Object outputLock, StreamContext streamContext){
+        this.delegate = outputCollector;
+        this.outputLock = outputLock;
+        this.streamContext = streamContext;
+    }
+
+    @Override
+    public void emit(AlertStreamEvent event) {
+        synchronized (outputLock) {
+            streamContext.counter().scope("alert_count").incr();
+            delegate.emit(Arrays.asList(event.getStreamId(), event));
+        }
+    }
+
+    @Override
+    public void flush() {
+        // do nothing
+    }
+
+    @Override
+    public void close() {
+
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
new file mode 100644
index 0000000..f063618
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.impl;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.engine.AlertStreamCollector;
+import org.apache.eagle.alert.engine.StreamContext;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.PolicyGroupEvaluator;
+import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
+import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
+import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandlers;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator {
+    private static final long serialVersionUID = -5499413193675985288L;
+
+    private final static Logger LOG = LoggerFactory.getLogger(PolicyGroupEvaluatorImpl.class);
+
+    private AlertStreamCollector collector;
+    // mapping from policy name to PolicyDefinition
+    private volatile Map<String,PolicyDefinition> policyDefinitionMap = new HashMap<>();
+    // mapping from policy name to PolicyStreamHandler
+    private volatile Map<String,PolicyStreamHandler> policyStreamHandlerMap = new HashMap<>();
+    private String policyEvaluatorId;
+    private StreamContext context;
+
+    public PolicyGroupEvaluatorImpl(String policyEvaluatorId){
+        this.policyEvaluatorId = policyEvaluatorId;
+    }
+
+    public void init(StreamContext context, AlertStreamCollector collector) {
+        this.collector = collector;
+        this.policyStreamHandlerMap = new HashMap<>();
+        this.context = context;
+        Thread.currentThread().setName(policyEvaluatorId);
+    }
+
+    public void nextEvent(PartitionedEvent event) {
+        this.context.counter().scope("receive_count").incr();
+        dispatch(event);
+    }
+
+    @Override
+    public String getName() {
+        return this.policyEvaluatorId;
+    }
+
+    public void close() {
+        for(PolicyStreamHandler handler: policyStreamHandlerMap.values()){
+            try {
+                handler.close();
+            } catch (Exception e) {
+                LOG.error("Failed to close handler {}",handler.toString(),e);
+            }
+        }
+    }
+
+    /**
+     * fixme make selection of PolicyStreamHandler to be more efficient
+     * @param partitionedEvent PartitionedEvent
+     */
+    private void dispatch(PartitionedEvent partitionedEvent){
+        boolean handled = false;
+        for(Map.Entry<String,PolicyStreamHandler> policyStreamHandler: policyStreamHandlerMap.entrySet()){
+            if(isAcceptedByPolicy(partitionedEvent,policyDefinitionMap.get(policyStreamHandler.getKey()))){
+                try {
+                    handled = true;
+                    this.context.counter().scope("eval_count").incr();
+                    policyStreamHandler.getValue().send(partitionedEvent.getEvent());
+                } catch (Exception e) {
+                    this.context.counter().scope("fail_count").incr();
+                    LOG.error("{} failed to handle {}",policyStreamHandler.getValue(), partitionedEvent.getEvent());
+                }
+            }
+        }
+        if(!handled){
+            this.context.counter().scope("drop_count").incr();
+            LOG.warn("Drop stream non-matched event {}, which should not be sent to evaluator", partitionedEvent);
+        } else {
+            this.context.counter().scope("accept_count").incr();
+        }
+    }
+
+    private static boolean isAcceptedByPolicy(PartitionedEvent event, PolicyDefinition policy){
+        return policy.getInputStreams() != null
+                && policy.getInputStreams().contains(event.getEvent().getStreamId())
+                && policy.getPartitionSpec().contains(event.getPartition());
+    }
+
+    @Override
+    public void onPolicyChange(List<PolicyDefinition> added, List<PolicyDefinition> removed, List<PolicyDefinition> modified, Map<String, StreamDefinition> sds) {
+        Map<String,PolicyDefinition> copyPolicies = new HashMap<>(policyDefinitionMap);
+        Map<String,PolicyStreamHandler> copyHandlers = new HashMap<>(policyStreamHandlerMap);
+        for(PolicyDefinition pd : added){
+            inplaceAdd(copyPolicies, copyHandlers, pd, sds);
+        }
+        for(PolicyDefinition pd : removed){
+            inplaceRemove(copyPolicies, copyHandlers, pd);
+        }
+        for(PolicyDefinition pd : modified){
+            inplaceRemove(copyPolicies, copyHandlers, pd);
+            inplaceAdd(copyPolicies, copyHandlers, pd, sds);
+        }
+
+        // logging
+        LOG.info("Policy metadata updated with added={}, removed={}, modified={}", added, removed, modified);
+
+        // switch reference
+        this.policyDefinitionMap = copyPolicies;
+        this.policyStreamHandlerMap = copyHandlers;
+    }
+
+    private void inplaceAdd(Map<String, PolicyDefinition> policies, Map<String, PolicyStreamHandler> handlers, PolicyDefinition policy, Map<String, StreamDefinition> sds) {
+        if(handlers.containsKey(policy.getName())){
+            LOG.error("metadata calculation error, try to add existing PolicyDefinition " + policy);
+        }else {
+            policies.put(policy.getName(), policy);
+            PolicyStreamHandler handler = PolicyStreamHandlers.createHandler(policy.getDefinition().type, sds);
+            try {
+                PolicyHandlerContext context = new PolicyHandlerContext();
+                context.setPolicyCounter(this.context.counter());
+                context.setPolicyDefinition(policy);
+                context.setParentEvaluator(this);
+                context.setPolicyEvaluatorId(policyEvaluatorId);
+                handler.prepare(collector, context);
+                handlers.put(policy.getName(), handler);
+            } catch (Exception e) {
+                LOG.error(e.getMessage(), e);
+                policies.remove(policy.getName());
+                handlers.remove(policy.getName());
+            }
+        }
+    }
+
+    private void inplaceRemove(Map<String, PolicyDefinition> policies, Map<String, PolicyStreamHandler> handlers, PolicyDefinition policy)  {
+        if(handlers.containsKey(policy.getName())) {
+            PolicyStreamHandler handler = handlers.get(policy.getName());
+            try {
+                handler.close();
+            } catch (Exception e) {
+                LOG.error("Failed to close handler {}",handler,e);
+            }finally {
+                policies.remove(policy.getName());
+                handlers.remove(policy.getName());
+                LOG.info("Removed policy: {}",policy);
+            }
+        } else {
+            LOG.error("metadata calculation error, try to remove nonexisting PolicyDefinition: "+policy);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java
new file mode 100644
index 0000000..5b83abb
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+import org.wso2.siddhi.query.api.definition.Attribute;
+
+import com.google.common.base.Preconditions;
+
+public class SiddhiDefinitionAdapter {
+    private final static Logger LOG = LoggerFactory.getLogger(SiddhiDefinitionAdapter.class);
+    public final static String DEFINE_STREAM_TEMPLATE =  "define stream %s ( %s );";
+
+    public static String buildStreamDefinition(StreamDefinition streamDefinition){
+        List<String> columns = new ArrayList<>();
+        Preconditions.checkNotNull(streamDefinition,"StreamDefinition is null");
+        if(streamDefinition.getColumns()!=null) {
+            for (StreamColumn column : streamDefinition.getColumns()) {
+                columns.add(String.format("%s %s", column.getName(), convertToSiddhiAttributeType(column.getType()).toString().toLowerCase()));
+            }
+        }else{
+            LOG.warn("No columns found for stream {}"+streamDefinition.getStreamId());
+        }
+        return String.format(DEFINE_STREAM_TEMPLATE, streamDefinition.getStreamId(),StringUtils.join(columns,","));
+    }
+
+    public static Attribute.Type convertToSiddhiAttributeType(StreamColumn.Type type){
+        if(_EAGLE_SIDDHI_TYPE_MAPPING.containsKey(type)){
+            return _EAGLE_SIDDHI_TYPE_MAPPING.get(type);
+        }
+
+        throw new IllegalArgumentException("Unknown stream type: "+type);
+    }
+
+    public static Class<?> convertToJavaAttributeType(StreamColumn.Type type){
+        if(_EAGLE_JAVA_TYPE_MAPPING.containsKey(type)){
+            return _EAGLE_JAVA_TYPE_MAPPING.get(type);
+        }
+
+        throw new IllegalArgumentException("Unknown stream type: "+type);
+    }
+
+    public static StreamColumn.Type convertFromJavaAttributeType(Class<?> type){
+        if(_JAVA_EAGLE_TYPE_MAPPING.containsKey(type)){
+            return _JAVA_EAGLE_TYPE_MAPPING.get(type);
+        }
+
+        throw new IllegalArgumentException("Unknown stream type: "+type);
+    }
+
+    public static StreamColumn.Type convertFromSiddhiAttributeType(Attribute.Type type){
+        if(_SIDDHI_EAGLE_TYPE_MAPPING.containsKey(type)){
+            return _SIDDHI_EAGLE_TYPE_MAPPING.get(type);
+        }
+
+        throw new IllegalArgumentException("Unknown siddhi type: "+type);
+    }
+
+    /**
+     * public enum Type {
+     *   STRING, INT, LONG, FLOAT, DOUBLE, BOOL, OBJECT
+     * }
+     */
+    private final static Map<StreamColumn.Type,Attribute.Type> _EAGLE_SIDDHI_TYPE_MAPPING = new HashMap<>();
+    private final static Map<StreamColumn.Type,Class<?>> _EAGLE_JAVA_TYPE_MAPPING = new HashMap<>();
+    private final static Map<Class<?>,StreamColumn.Type> _JAVA_EAGLE_TYPE_MAPPING = new HashMap<>();
+    private final static Map<Attribute.Type,StreamColumn.Type> _SIDDHI_EAGLE_TYPE_MAPPING = new HashMap<>();
+
+    static {
+        _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.STRING,Attribute.Type.STRING);
+        _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.INT,Attribute.Type.INT);
+        _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.LONG,Attribute.Type.LONG);
+        _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.FLOAT,Attribute.Type.FLOAT);
+        _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.DOUBLE,Attribute.Type.DOUBLE);
+        _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.BOOL,Attribute.Type.BOOL);
+        _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.OBJECT,Attribute.Type.OBJECT);
+
+        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.STRING,String.class);
+        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.INT,Integer.class);
+        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.LONG,Long.class);
+        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.FLOAT,Float.class);
+        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.DOUBLE,Double.class);
+        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.BOOL,Boolean.class);
+        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.OBJECT,Object.class);
+
+        _JAVA_EAGLE_TYPE_MAPPING.put(String.class,StreamColumn.Type.STRING);
+        _JAVA_EAGLE_TYPE_MAPPING.put(Integer.class,StreamColumn.Type.INT);
+        _JAVA_EAGLE_TYPE_MAPPING.put(Long.class,StreamColumn.Type.LONG);
+        _JAVA_EAGLE_TYPE_MAPPING.put(Float.class,StreamColumn.Type.FLOAT);
+        _JAVA_EAGLE_TYPE_MAPPING.put(Double.class,StreamColumn.Type.DOUBLE);
+        _JAVA_EAGLE_TYPE_MAPPING.put(Boolean.class,StreamColumn.Type.BOOL);
+        _JAVA_EAGLE_TYPE_MAPPING.put(Object.class,StreamColumn.Type.OBJECT);
+
+        _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.STRING,StreamColumn.Type.STRING);
+        _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.INT,StreamColumn.Type.INT);
+        _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.LONG,StreamColumn.Type.LONG);
+        _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.FLOAT,StreamColumn.Type.FLOAT);
+        _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.DOUBLE,StreamColumn.Type.DOUBLE);
+        _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.BOOL,StreamColumn.Type.BOOL);
+        _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.OBJECT,StreamColumn.Type.OBJECT);
+    }
+
+    public static StreamDefinition convertFromSiddiDefinition(AbstractDefinition siddhiDefinition){
+        StreamDefinition streamDefinition = new StreamDefinition();
+        streamDefinition.setStreamId(siddhiDefinition.getId());
+        List<StreamColumn> columns = new ArrayList<>(siddhiDefinition.getAttributeNameArray().length);
+        for(Attribute attribute:siddhiDefinition.getAttributeList()){
+            StreamColumn column = new StreamColumn();
+            column.setType(convertFromSiddhiAttributeType(attribute.getType()));
+            column.setName(attribute.getName());
+            columns.add(column);
+        }
+        streamDefinition.setColumns(columns);
+        streamDefinition.setTimeseries(true);
+        streamDefinition.setDescription("Auto-generated stream schema from siddhi for "+siddhiDefinition.getId());
+        return streamDefinition;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
new file mode 100755
index 0000000..dfa5612
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.impl;
+
+import java.util.Map;
+
+import org.apache.eagle.alert.engine.Collector;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinitionNotFoundException;
+import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
+import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.core.stream.output.StreamCallback;
+
+public class SiddhiPolicyHandler implements PolicyStreamHandler {
+    private final static Logger LOG = LoggerFactory.getLogger(SiddhiPolicyHandler.class);
+    private ExecutionPlanRuntime executionRuntime;
+    private SiddhiManager siddhiManager;
+    private Map<String, StreamDefinition> sds;
+    private PolicyDefinition policy;
+    private PolicyHandlerContext context;
+
+    public SiddhiPolicyHandler(Map<String, StreamDefinition> sds){
+        this.sds = sds;
+    }
+
+    private static String generateExecutionPlan(PolicyDefinition policyDefinition, Map<String, StreamDefinition> sds) throws StreamDefinitionNotFoundException {
+        StringBuilder builder = new StringBuilder();
+        for(String inputStream:policyDefinition.getInputStreams()) {
+            builder.append(SiddhiDefinitionAdapter.buildStreamDefinition(sds.get(inputStream)));
+            builder.append("\n");
+        }
+        builder.append(policyDefinition.getDefinition().value);
+        if(LOG.isDebugEnabled()) LOG.debug("Generated siddhi execution plan: {} from policy: {}", builder.toString(),policyDefinition);
+        return builder.toString();
+    }
+
+    private static class AlertStreamCallback extends StreamCallback{
+        private final String outputStream;
+        private final Collector<AlertStreamEvent> collector;
+        private final PolicyHandlerContext context;
+        private final StreamDefinition definition;
+
+        public AlertStreamCallback(String outputStream, StreamDefinition streamDefinition, Collector<AlertStreamEvent> collector, PolicyHandlerContext context){
+            this.outputStream = outputStream;
+            this.collector = collector;
+            this.context = context;
+            this.definition = streamDefinition;
+        }
+
+        /**
+         * Possibly more than one event will be triggered for alerting
+         * @param events
+         */
+        @Override
+        public void receive(Event[] events) {
+            LOG.info("Generated {} alerts from policy '{}' in {}", events.length,context.getPolicyDefinition().getName(), context.getPolicyEvaluatorId());
+            for(Event e : events) {
+                AlertStreamEvent event = new AlertStreamEvent();
+                event.setTimestamp(e.getTimestamp());
+                event.setData(e.getData());
+                event.setStreamId(outputStream);
+                event.setPolicy(context.getPolicyDefinition());
+                if (this.context.getParentEvaluator() != null) {
+                    event.setCreatedBy(context.getParentEvaluator().getName());
+                }
+                event.setCreatedTime(System.currentTimeMillis());
+                event.setSchema(definition);
+                if(LOG.isDebugEnabled())
+                    LOG.debug("Generate new alert event: {}", event);
+                collector.emit(event);
+            }
+            context.getPolicyCounter().scope(String.format("%s.%s",this.context.getPolicyDefinition().getName(),"alert_count")).incrBy(events.length);
+        }
+    }
+
+    @Override
+    public void prepare(final Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception {
+        LOG.info("Initializing handler for policy {}: {}",context.getPolicyDefinition(),this);
+        this.policy = context.getPolicyDefinition();
+        this.siddhiManager = new SiddhiManager();
+        String plan = generateExecutionPlan(policy, sds);
+        try {
+            this.executionRuntime = siddhiManager.createExecutionPlanRuntime(plan);
+            LOG.info("Created siddhi runtime {}",executionRuntime.getName());
+        }catch (Exception parserException){
+            LOG.error("Failed to create siddhi runtime for policy: {}, siddhi plan: \n\n{}\n",context.getPolicyDefinition().getName(),plan,parserException);
+            throw parserException;
+        }
+        for(final String outputStream:policy.getOutputStreams()){
+            if(executionRuntime.getStreamDefinitionMap().containsKey(outputStream)) {
+                this.executionRuntime.addCallback(outputStream,
+                        new AlertStreamCallback(
+                        outputStream, SiddhiDefinitionAdapter.convertFromSiddiDefinition(executionRuntime.getStreamDefinitionMap().get(outputStream))
+                        ,collector, context));
+            } else {
+                throw new IllegalStateException("Undefined output stream "+outputStream);
+            }
+        }
+        this.executionRuntime.start();
+        this.context = context;
+        LOG.info("Initialized policy handler for policy: {}",policy.getName());
+    }
+
+    public void send(StreamEvent event) throws Exception {
+        context.getPolicyCounter().scope(String.format("%s.%s",this.context.getPolicyDefinition().getName(),"receive_count")).incr();
+        String streamId = event.getStreamId();
+        InputHandler inputHandler = executionRuntime.getInputHandler(streamId);
+        if(inputHandler != null){
+            context.getPolicyCounter().scope(String.format("%s.%s",this.context.getPolicyDefinition().getName(),"eval_count")).incr();
+            inputHandler.send(event.getTimestamp(),event.getData());
+        }else{
+            context.getPolicyCounter().scope(String.format("%s.%s",this.context.getPolicyDefinition().getName(),"drop_count")).incr();
+            LOG.warn("No input handler found for stream {}",streamId);
+        }
+    }
+
+    public void close() {
+        LOG.info("Closing handler for policy {}",this.policy.getName());
+        this.executionRuntime.shutdown();
+        LOG.info("Shutdown siddhi runtime {}",this.executionRuntime.getName());
+        this.siddhiManager.shutdown();
+        LOG.info("Shutdown siddhi manager {}",this.siddhiManager);
+        LOG.info("Closed handler for policy {}",this.policy.getName());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java
new file mode 100644
index 0000000..b894cc0
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.eagle.alert.engine.publisher;
+
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+
+/**
+ * Dedup Eagle entities.
+ */
+public interface AlertDeduplicator {
+
+	AlertStreamEvent dedup(AlertStreamEvent event);
+
+	void setDedupIntervalMin(String intervalMin);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishListener.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishListener.java
new file mode 100644
index 0000000..8f1c248
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishListener.java
@@ -0,0 +1,28 @@
+package org.apache.eagle.alert.engine.publisher;
+
+import java.util.List;
+
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public interface AlertPublishListener {
+    void onPublishChange(List<Publishment> added,
+                         List<Publishment> removed,
+                         List<Publishment> afterModified,
+                         List<Publishment> beforeModified);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java
new file mode 100644
index 0000000..644fe2b
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.eagle.alert.engine.publisher;
+
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.impl.PublishStatus;
+
+import com.typesafe.config.Config;
+
+import java.io.Closeable;
+import java.util.Map;
+
+/**
+ * Created on 2/10/16.
+ * Notification Plug-in interface which provide abstraction layer to notify to different system
+ */
+public interface AlertPublishPlugin extends Closeable {
+    /**
+     * for initialization
+     * @throws Exception
+     */
+    void init(Config config, Publishment publishment) throws Exception;
+
+    void update(String dedupIntervalMin, Map<String, String> pluginProperties);
+
+    void close();
+
+    void onAlert(AlertStreamEvent event) throws Exception;
+
+    AlertStreamEvent dedup(AlertStreamEvent event);
+
+    PublishStatus getStatus();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java
new file mode 100644
index 0000000..fc1fc28
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java
@@ -0,0 +1,26 @@
+package org.apache.eagle.alert.engine.publisher;
+
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.PublishSpec;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public interface AlertPublishSpecListener {
+    void onAlertPublishSpecChange(PublishSpec spec, Map<String, StreamDefinition> sds);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java
new file mode 100644
index 0000000..7a44009
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java
@@ -0,0 +1,31 @@
+package org.apache.eagle.alert.engine.publisher;
+
+
+import java.io.Serializable;
+
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+
+import com.typesafe.config.Config;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public interface AlertPublisher extends AlertPublishListener, Serializable {
+    void init(Config config);
+    String getName();
+    void nextEvent(AlertStreamEvent event);
+    void close();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertSink.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertSink.java
new file mode 100644
index 0000000..91c9296
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertSink.java
@@ -0,0 +1,33 @@
+package org.apache.eagle.alert.engine.publisher;
+
+import java.io.Serializable;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public interface AlertSink extends Serializable {
+    /**
+     *
+     * @throws Exception
+     */
+    void open() throws Exception;
+
+    /**
+     *
+     * @throws Exception
+     */
+    void close() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
new file mode 100644
index 0000000..54a9afa
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.publisher;
+
+public class PublishConstants {
+    public static final String NOTIFICATION_TYPE = "type";
+    public static final String EMAIL_NOTIFICATION = "email";
+    public static final String KAFKA_STORE = "kafka";
+    public static final String EAGLE_STORE = "eagleStore";
+
+    // email specific constants
+    public static final String SUBJECT = "subject";
+    public static final String SENDER = "sender";
+    public static final String RECIPIENTS = "recipients";
+    public static final String TEMPLATE = "template";
+
+    // kafka specific constants
+    public static final String TOPIC = "topic";
+    public static final String BROKER_LIST = "kafka_broker";
+
+    public static final String ALERT_EMAIL_TIME_PROPERTY = "timestamp";
+    public static final String ALERT_EMAIL_COUNT_PROPERTY = "count";
+    public static final String ALERT_EMAIL_ALERTLIST_PROPERTY = "alertList";
+    public static final String ALERT_EMAIL_ORIGIN_PROPERTY = "alertEmailOrigin";
+
+    public static final String ALERT_EMAIL_MESSAGE = "alertMessage";
+    public static final String ALERT_EMAIL_STREAM = "streamId";
+    public static final String ALERT_EMAIL_TIMESTAMP = "alertTime";
+    public static final String ALERT_EMAIL_POLICY = "policyId";
+    public static final String ALERT_EMAIL_CREATOR = "creator";
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailConstants.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailConstants.java
new file mode 100644
index 0000000..7d31bb5
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailConstants.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.publisher.email;
+
+public class AlertEmailConstants {
+
+    public static final String MAIL_AUTH = "mail.smtp.auth";
+    public static final String MAIL_HOST = "mail.smtp.host";
+    public static final String MAIL_PORT = "mail.smtp.port";
+
+    public static final String CONN_PLAINTEXT = "plaintext";
+    public static final String CONN_TLS = "tls";
+    public static final String CONN_SSL = "ssl";
+
+    public static final String CONF_MAIL_SERVER = "smtp.server";
+    public static final String CONF_MAIL_PORT = "smtp.port";
+    public static final String CONF_MAIL_CONN = "connection";
+    public static final String CONF_MAIL_DEBUG = "mailDebug";
+    public static final String CONF_MAIL_AUTH = "smtp.auth.enable";
+    public static final String CONF_AUTH_USER = "auth.username";
+    public static final String CONF_AUTH_PASSWORD = "auth.password";
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailContext.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailContext.java
new file mode 100644
index 0000000..8723283
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailContext.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.eagle.alert.engine.publisher.email;
+
+import java.util.Map;
+
+/**
+ * alert email bean
+ * one email consists of a list of email component
+ */
+public class AlertEmailContext {
+    private Map<String, String> alertContext;
+    private String sender;
+    private String subject;
+    private String recipients;
+    private String velocityTplFile;
+    private String cc;
+
+    public Map<String, String> getAlertContext() {
+        return alertContext;
+    }
+
+    public void setAlertContext(Map<String, String> alertContext) {
+        this.alertContext = alertContext;
+    }
+    public String getVelocityTplFile() {
+        return velocityTplFile;
+    }
+    public void setVelocityTplFile(String velocityTplFile) {
+        this.velocityTplFile = velocityTplFile;
+    }
+    public String getRecipients() {
+        return recipients;
+    }
+    public void setRecipients(String recipients) {
+        this.recipients = recipients;
+    }
+    public String getSender() {
+        return sender;
+    }
+    public void setSender(String sender) {
+        this.sender = sender;
+    }
+    public String getSubject() {
+        return subject;
+    }
+    public void setSubject(String subject) {
+        this.subject = subject;
+    }
+    public String getCc() {
+        return cc;
+    }
+    public void setCc(String cc) {
+        this.cc = cc;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
new file mode 100644
index 0000000..0c68629
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+/**
+ *
+ */
+package org.apache.eagle.alert.engine.publisher.email;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.PublishConstants;
+import org.apache.eagle.alert.utils.DateTimeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AlertEmailGenerator {
+    private String tplFile;
+    private String sender;
+    private String recipients;
+    private String subject;
+    private Map<String, String> properties;
+
+    private ThreadPoolExecutor executorPool;
+
+    private final static Logger LOG = LoggerFactory.getLogger(AlertEmailGenerator.class);
+
+    private final static long MAX_TIMEOUT_MS =60000;
+
+    public boolean sendAlertEmail(AlertStreamEvent entity) {
+        return sendAlertEmail(entity, recipients, null);
+    }
+
+    public boolean sendAlertEmail(AlertStreamEvent entity, String recipients) {
+        return sendAlertEmail(entity, recipients, null);
+    }
+
+    public boolean sendAlertEmail(AlertStreamEvent event, String recipients, String cc) {
+        AlertEmailContext email = new AlertEmailContext();
+        Map<String, String> alertContext = buildAlertContext(event);
+        email.setAlertContext(alertContext);
+        email.setVelocityTplFile(tplFile);
+        email.setSubject(subject);
+        email.setSender(sender);
+        email.setRecipients(recipients);
+        email.setCc(cc);
+        
+        /** asynchronized email sending */
+        AlertEmailSender thread = new AlertEmailSender(email, properties);
+
+        if(this.executorPool == null) throw new IllegalStateException("Invoking thread executor pool but it's is not set yet");
+
+        LOG.info("Sending email  in asynchronous to: " + recipients + ", cc: " + cc);
+        Future<?> future = this.executorPool.submit(thread);
+        Boolean status;
+        try {
+            future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            status = true;
+            //LOG.info(String.format("Successfully send email to %s", recipients));
+        } catch (InterruptedException | ExecutionException e) {
+            status = false;
+            LOG.error(String.format("Failed to send email to %s, due to:%s", recipients, e),e);
+        } catch (TimeoutException e) {
+            status = false;
+            LOG.error(String.format("Failed to send email to %s due to timeout exception, max timeout: %s ms ", recipients, MAX_TIMEOUT_MS),e);
+        }
+        return status;
+    }
+
+    private Map<String, String> buildAlertContext(AlertStreamEvent event) {
+        Map<String, String> alertContext = new HashMap<>();
+        alertContext.put(PublishConstants.ALERT_EMAIL_MESSAGE, event.toString());
+        alertContext.put(PublishConstants.ALERT_EMAIL_POLICY, event.getPolicyId());
+        alertContext.put(PublishConstants.ALERT_EMAIL_TIMESTAMP, DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getCreatedTime()));
+        alertContext.put(PublishConstants.ALERT_EMAIL_STREAM, event.getStreamId());
+        alertContext.put(PublishConstants.ALERT_EMAIL_CREATOR, event.getCreatedBy());
+        return alertContext;
+    }
+
+    public String getTplFile() {
+        return tplFile;
+    }
+
+    public void setTplFile(String tplFile) {
+        this.tplFile = tplFile;
+    }
+
+    public String getSender() {
+        return sender;
+    }
+
+    public void setSender(String sender) {
+        this.sender = sender;
+    }
+
+    public String getRecipients() {
+        return recipients;
+    }
+
+    public void setRecipients(String recipients) {
+        this.recipients = recipients;
+    }
+
+    public String getSubject() {
+        return subject;
+    }
+
+    public void setSubject(String subject) {
+        this.subject = subject;
+    }
+
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    public void setProperties(Map<String, String> properties) {
+        this.properties = properties;
+    }
+
+    public void setExecutorPool(ThreadPoolExecutor executorPool) {
+        this.executorPool = executorPool;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java
new file mode 100644
index 0000000..e9635dd
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.eagle.alert.engine.publisher.email;
+
+import java.util.Map;
+import java.util.concurrent.ThreadPoolExecutor;
+
+public class AlertEmailGeneratorBuilder {
+    private AlertEmailGenerator generator;
+    private AlertEmailGeneratorBuilder(){
+        generator = new AlertEmailGenerator();
+    }
+    public static AlertEmailGeneratorBuilder newBuilder(){
+        return new AlertEmailGeneratorBuilder();
+    }
+    public AlertEmailGeneratorBuilder withSubject(String subject){
+        generator.setSubject(subject);
+        return this;
+    }
+    public AlertEmailGeneratorBuilder withSender(String sender){
+        generator.setSender(sender);
+        return this;
+    }
+    public AlertEmailGeneratorBuilder withRecipients(String recipients){
+        generator.setRecipients(recipients);
+        return this;
+    }
+    public AlertEmailGeneratorBuilder withTplFile(String tplFile){
+        generator.setTplFile(tplFile);
+        return this;
+    }
+    public AlertEmailGeneratorBuilder withMailProps(Map<String, String> mailProps) {
+        generator.setProperties(mailProps);
+        return this;
+    }
+    public AlertEmailGeneratorBuilder withExecutorPool(ThreadPoolExecutor threadPoolExecutor) {
+        generator.setExecutorPool(threadPoolExecutor);
+        return this;
+    }
+
+    public AlertEmailGenerator build(){
+        return this.generator;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java
new file mode 100644
index 0000000..d0e5cf6
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.eagle.alert.engine.publisher.email;
+
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.eagle.alert.engine.publisher.PublishConstants;
+import org.apache.eagle.alert.utils.DateTimeUtil;
+import org.apache.velocity.VelocityContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AlertEmailSender implements Runnable {
+
+    protected final List<Map<String, String>> alertContexts = new ArrayList<Map<String, String>>();
+    protected final String configFileName;
+    protected final String subject;
+    protected final String sender;
+    protected final String recipients;
+    protected final String cc;
+    protected final String origin;
+    protected boolean sentSuccessfully = false;
+
+    private final static Logger LOG = LoggerFactory.getLogger(AlertEmailSender.class);
+    private final static int MAX_RETRY_COUNT = 3;
+
+
+
+    private Map<String, String> mailProps;
+
+
+    private String threadName;
+    /**
+     * Derived class may have some additional context properties to add
+     * @param context velocity context
+     * @param env environment
+     */
+    protected void additionalContext(VelocityContext context, String env) {
+        // By default there's no additional context added
+    }
+
+    public AlertEmailSender(AlertEmailContext alertEmail){
+        this.recipients = alertEmail.getRecipients();
+        this.configFileName = alertEmail.getVelocityTplFile();
+        this.subject = alertEmail.getSubject();
+        this.sender = alertEmail.getSender();
+        this.cc = alertEmail.getCc();
+
+        this.alertContexts.add(alertEmail.getAlertContext());
+        String tmp = ManagementFactory.getRuntimeMXBean().getName();
+        this.origin = tmp.split("@")[1] + "(pid:" + tmp.split("@")[0] + ")";
+        threadName = Thread.currentThread().getName();
+        LOG.info("Initialized "+threadName+": origin is : " + this.origin+", recipient of the email: " + this.recipients +", velocity TPL file: " + this.configFileName);
+    }
+
+    public AlertEmailSender(AlertEmailContext alertEmail, Map<String, String> mailProps){
+        this(alertEmail);
+        this.mailProps = mailProps;
+    }
+
+    private Properties parseMailClientConfig(Map<String, String> mailProps) {
+        if (mailProps == null) return null;
+        Properties props = new Properties();
+        String mailHost = mailProps.get(AlertEmailConstants.CONF_MAIL_SERVER);
+        String mailPort = mailProps.get(AlertEmailConstants.CONF_MAIL_PORT);
+        if (mailHost == null || mailPort == null || mailHost.isEmpty()) {
+            LOG.warn("SMTP server is unset, will exit");
+            return null;
+        }
+        props.put(AlertEmailConstants.MAIL_HOST, mailHost);
+        props.put(AlertEmailConstants.MAIL_PORT, mailPort);
+
+        String smtpAuth = mailProps.getOrDefault(AlertEmailConstants.CONF_MAIL_AUTH, "false");
+        props.put(AlertEmailConstants.MAIL_AUTH, smtpAuth);
+        if (Boolean.parseBoolean(smtpAuth)) {
+            props.put(AlertEmailConstants.CONF_AUTH_USER, mailProps.get(AlertEmailConstants.CONF_AUTH_USER));
+            props.put(AlertEmailConstants.CONF_AUTH_PASSWORD, mailProps.get(AlertEmailConstants.CONF_AUTH_PASSWORD));
+        }
+
+        String smtpConn = mailProps.getOrDefault(AlertEmailConstants.CONF_MAIL_CONN, AlertEmailConstants.CONN_PLAINTEXT);
+        if (smtpConn.equalsIgnoreCase(AlertEmailConstants.CONN_TLS)) {
+            props.put("mail.smtp.starttls.enable", "true");
+        }
+        if (smtpConn.equalsIgnoreCase(AlertEmailConstants.CONN_SSL)) {
+            props.put("mail.smtp.socketFactory.port", "465");
+            props.put("mail.smtp.socketFactory.class",
+                    "javax.net.ssl.SSLSocketFactory");
+        }
+        props.put(AlertEmailConstants.CONF_MAIL_DEBUG, mailProps.getOrDefault(AlertEmailConstants.CONF_MAIL_DEBUG, "false"));
+        return props;
+    }
+
+    @Override
+    public void run() {
+        int count = 0;
+        boolean success = false;
+        while(count++ < MAX_RETRY_COUNT && !success){
+            LOG.info("Sending email, tried: " + count+", max: " + MAX_RETRY_COUNT);
+            try {
+                final EagleMailClient client;
+                if (mailProps != null) {
+                    Properties props = parseMailClientConfig(mailProps);
+                    client = new EagleMailClient(props);
+                }
+                else {
+                    client = new EagleMailClient();
+                }
+
+                final VelocityContext context = new VelocityContext();
+                generateCommonContext(context);
+                LOG.info("After calling generateCommonContext...");
+
+                if (recipients == null || recipients.equals("")) {
+                    LOG.error("Recipients is null, skip sending emails ");
+                    return;
+                }
+                String title = subject;
+
+                success = client.send(sender, recipients, cc, title, configFileName, context, null);
+                LOG.info("Success of sending email: " + success);
+                if(!success && count < MAX_RETRY_COUNT) {
+                    LOG.info("Sleep for a while before retrying");
+                    Thread.sleep(10 * 1000);
+                }
+            }
+            catch (Exception e){
+                LOG.warn("Sending mail exception", e);
+            }
+        }
+        if (success) {
+            sentSuccessfully = true;
+            LOG.info(String.format("Successfully send email, thread: %s", threadName));
+        } else{
+            LOG.warn(String.format("Fail sending email after tries %s times, thread: %s", MAX_RETRY_COUNT, threadName));
+        }
+    }
+
+    private void generateCommonContext(VelocityContext context) {
+        context.put(PublishConstants.ALERT_EMAIL_TIME_PROPERTY, DateTimeUtil.millisecondsToHumanDateWithSeconds(System.currentTimeMillis()));
+        context.put(PublishConstants.ALERT_EMAIL_COUNT_PROPERTY, alertContexts.size());
+        context.put(PublishConstants.ALERT_EMAIL_ALERTLIST_PROPERTY, alertContexts);
+        context.put(PublishConstants.ALERT_EMAIL_ORIGIN_PROPERTY, origin);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java
new file mode 100755
index 0000000..61194a2
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.eagle.alert.engine.publisher.email;
+
+import java.io.File;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.activation.DataHandler;
+import javax.activation.DataSource;
+import javax.activation.FileDataSource;
+import javax.mail.Authenticator;
+import javax.mail.Message;
+import javax.mail.MessagingException;
+import javax.mail.Multipart;
+import javax.mail.PasswordAuthentication;
+import javax.mail.Session;
+import javax.mail.Transport;
+import javax.mail.internet.AddressException;
+import javax.mail.internet.InternetAddress;
+import javax.mail.internet.MimeBodyPart;
+import javax.mail.internet.MimeMessage;
+import javax.mail.internet.MimeMultipart;
+
+import org.apache.velocity.Template;
+import org.apache.velocity.VelocityContext;
+import org.apache.velocity.app.VelocityEngine;
+import org.apache.velocity.exception.ResourceNotFoundException;
+import org.apache.velocity.runtime.RuntimeConstants;
+import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EagleMailClient {
+//	private static final String CONFIG_FILE = "config.properties";
+	private static final String BASE_PATH = "templates/";
+
+	private VelocityEngine velocityEngine;
+	private Session session;
+	private static final Logger LOG = LoggerFactory.getLogger(EagleMailClient.class);
+
+	public EagleMailClient() {
+		this(new Properties());
+	}
+	
+	public EagleMailClient(final Properties config) {
+		try {
+			velocityEngine = new VelocityEngine();
+			velocityEngine.setProperty(RuntimeConstants.RESOURCE_LOADER, "classpath");
+			velocityEngine.setProperty("classpath.resource.loader.class", ClasspathResourceLoader.class.getName());
+			velocityEngine.init();
+
+			config.put("mail.transport.protocol", "smtp");
+			if(Boolean.parseBoolean(config.getProperty(AlertEmailConstants.CONF_MAIL_AUTH))){
+				session = Session.getDefaultInstance(config, new Authenticator() {
+					protected PasswordAuthentication getPasswordAuthentication() {
+						return new PasswordAuthentication(config.getProperty(AlertEmailConstants.CONF_AUTH_USER), config.getProperty(AlertEmailConstants.CONF_AUTH_PASSWORD));
+					}
+				});
+			}
+			else session = Session.getDefaultInstance(config, new Authenticator() {});
+			final String debugMode =  config.getProperty(AlertEmailConstants.CONF_MAIL_DEBUG, "false");
+			final boolean debug =  Boolean.parseBoolean(debugMode);
+			session.setDebug(debug);
+		} catch (Exception e) {
+            LOG.error("Failed connect to smtp server",e);
+		}
+	}
+
+	private boolean _send(String from, String to, String cc, String title,
+			String content) {
+		Message msg = new MimeMessage(session);
+		try {
+			msg.setFrom(new InternetAddress(from));
+			msg.setSubject(title);
+			if (to != null) {
+				msg.setRecipients(Message.RecipientType.TO,
+						InternetAddress.parse(to));
+			}
+			if (cc != null) {
+				msg.setRecipients(Message.RecipientType.CC,
+						InternetAddress.parse(cc));
+			}
+			//msg.setRecipients(Message.RecipientType.BCC, InternetAddress.parse(DEFAULT_BCC_ADDRESS));
+			msg.setContent(content, "text/html;charset=utf-8");
+			LOG.info(String.format("Going to send mail: from[%s], to[%s], cc[%s], title[%s]", from, to, cc, title));
+			Transport.send(msg);
+			return true;
+		} catch (AddressException e) {
+			LOG.info("Send mail failed, got an AddressException: " + e.getMessage(), e);
+			return false;
+		} catch (MessagingException e) {
+			LOG.info("Send mail failed, got an AddressException: " + e.getMessage(), e);
+			return false;
+		}
+	}
+
+	private boolean _send(String from,String to,String cc,String title,String content,List<MimeBodyPart> attachments){
+		MimeMessage  mail = new MimeMessage(session);
+		try {
+			mail.setFrom(new InternetAddress(from));
+			mail.setSubject(title);
+			if (to != null) {
+				mail.setRecipients(Message.RecipientType.TO,
+						InternetAddress.parse(to));
+			}
+			if (cc != null) {
+				mail.setRecipients(Message.RecipientType.CC,
+						InternetAddress.parse(cc));
+			}
+			
+			//mail.setRecipients(Message.RecipientType.BCC, InternetAddress.parse(DEFAULT_BCC_ADDRESS));
+
+			MimeBodyPart mimeBodyPart = new MimeBodyPart();
+			mimeBodyPart.setContent(content,"text/html;charset=utf-8");
+
+			Multipart  multipart = new MimeMultipart();
+			multipart.addBodyPart(mimeBodyPart);
+
+			for(MimeBodyPart attachment:attachments){
+				multipart.addBodyPart(attachment);
+			}
+
+			mail.setContent(multipart);
+//			mail.setContent(content, "text/html;charset=utf-8");
+			LOG.info(String.format("Going to send mail: from[%s], to[%s], cc[%s], title[%s]", from, to, cc, title));
+			Transport.send(mail);
+			return true;
+		} catch (AddressException e) {
+			LOG.info("Send mail failed, got an AddressException: " + e.getMessage(), e);
+			return false;
+		} catch (MessagingException e) {
+			LOG.info("Send mail failed, got an AddressException: " + e.getMessage(), e);
+			return false;
+		}
+	}
+
+	public boolean send(String from, String to, String cc, String title,
+			String content) {
+		return this._send(from, to, cc, title, content);
+	}
+
+	public boolean send(String from, String to, String cc, String title,
+			String templatePath, VelocityContext context) {
+		Template t = null;
+		try {
+			t = velocityEngine.getTemplate(BASE_PATH + templatePath);
+		} catch (ResourceNotFoundException ex) {
+		}
+		if (t == null) {
+			try {
+				t = velocityEngine.getTemplate(templatePath);
+			} catch (ResourceNotFoundException e) {
+				t = velocityEngine.getTemplate("/" + templatePath);
+			}
+		}
+		final StringWriter writer = new StringWriter();
+		t.merge(context, writer);
+		if(LOG.isDebugEnabled()) LOG.debug(writer.toString());
+		return this.send(from, to, cc, title, writer.toString());
+	}
+
+	public boolean send(String from, String to, String cc, String title,
+	                    String templatePath, VelocityContext context, Map<String,File> attachments) {
+		if (attachments == null || attachments.isEmpty()) {
+			return send(from, to, cc, title, templatePath, context);
+		}
+		Template t = null;
+
+		List<MimeBodyPart> mimeBodyParts = new ArrayList<MimeBodyPart>();
+		Map<String,String> cid = new HashMap<String,String>();
+
+		for (Map.Entry<String,File> entry : attachments.entrySet()) {
+			final String attachment = entry.getKey();
+			final File attachmentFile  = entry.getValue();
+			final MimeBodyPart mimeBodyPart = new MimeBodyPart();
+			if(attachmentFile !=null && attachmentFile.exists()){
+				DataSource source = new FileDataSource(attachmentFile);
+				try {
+					mimeBodyPart.setDataHandler(new DataHandler(source));
+					mimeBodyPart.setFileName(attachment);
+					mimeBodyPart.setDisposition(MimeBodyPart.ATTACHMENT);
+					mimeBodyPart.setContentID(attachment);
+					cid.put(attachment,mimeBodyPart.getContentID());
+					mimeBodyParts.add(mimeBodyPart);
+				} catch (MessagingException e) {
+					LOG.error("Generate mail failed, got exception while attaching files: " + e.getMessage(), e);
+				}
+			}else{
+				LOG.error("Attachment: " + attachment + " is null or not exists");
+			}
+		}
+		//TODO remove cid, because not used at all
+		if(LOG.isDebugEnabled()) LOG.debug("Cid maps: "+cid);
+		context.put("cid", cid);
+
+		try {
+			t = velocityEngine.getTemplate(BASE_PATH + templatePath);
+		} catch (ResourceNotFoundException ex) {
+//			LOGGER.error("Template not found:"+BASE_PATH + templatePath, ex);
+		}
+
+		if (t == null) {
+			try {
+				t = velocityEngine.getTemplate(templatePath);
+			} catch (ResourceNotFoundException e) {
+				try {
+					t = velocityEngine.getTemplate("/" + templatePath);
+				}
+				catch (Exception ex) {
+					LOG.error("Template not found:"+ "/" + templatePath, ex);
+				}
+			}
+		}
+
+		final StringWriter writer = new StringWriter();
+		t.merge(context, writer);
+		if(LOG.isDebugEnabled()) LOG.debug(writer.toString());
+		return this._send(from, to, cc, title, writer.toString(), mimeBodyParts);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePersister.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePersister.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePersister.java
new file mode 100644
index 0000000..2a4e332
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePersister.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.publisher.impl;
+
+import java.util.List;
+
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.service.IMetadataServiceClient;
+import org.apache.eagle.alert.service.MetadataServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+/**
+ * Alert API entity Persistor
+ */
+public class AlertEagleStorePersister {
+	private static Logger LOG = LoggerFactory.getLogger(AlertEagleStorePersister.class);
+	private Config config;
+
+	public AlertEagleStorePersister(Config config) {
+		this.config = config;
+	}
+
+	/**
+	 * Persist passes list of Entities
+	 * @param list
+	 * @return
+     */
+	public boolean doPersist(List<? extends StreamEvent> list) {
+		if (list.isEmpty()) return false;
+		LOG.info("Going to persist entities, type: " + " " + list.get(0).getClass().getSimpleName() + ", list size: " + list.size());
+		try {
+			IMetadataServiceClient client = new MetadataServiceClientImpl(config);
+			// TODO: metadata service support
+		}
+		catch (Exception ex) {
+			LOG.error("Got an exception in persisting entities", ex);
+			return false;
+		}
+		return false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePublisher.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePublisher.java
new file mode 100644
index 0000000..807aacc
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePublisher.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.publisher.impl;
+
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
+import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+/**
+ * Plugin to persist alerts to Eagle Storage
+ */
+public class AlertEagleStorePublisher implements AlertPublishPlugin {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AlertEagleStorePublisher.class);
+    private PublishStatus status;
+    private AlertEagleStorePersister persist;
+    private AlertDeduplicator deduplicator;
+
+    @Override
+    public void init(Config config, Publishment publishment) throws Exception {
+        this.persist = new AlertEagleStorePersister(config);
+        deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin());
+        LOG.info("initialized plugin for EagleStorePlugin");
+    }
+
+    @Override
+    public void update(String dedupIntervalMin, Map<String, String> pluginProperties) {
+        deduplicator.setDedupIntervalMin(dedupIntervalMin);
+    }
+
+    @Override
+    public PublishStatus getStatus() {
+        return this.status;
+    }
+
+    @Override
+    public AlertStreamEvent dedup(AlertStreamEvent event) {
+        return deduplicator.dedup(event);
+    }
+
+    /**
+     * Persist AlertEntity to alert_details table
+     * @param event
+     */
+    @Override
+    public void onAlert(AlertStreamEvent event) {
+        LOG.info("write alert to eagle storage " + event);
+        event = dedup(event);
+        if(event == null) {
+            return;
+        }
+        PublishStatus status = new PublishStatus();
+        try{
+            boolean result = persist.doPersist(Arrays.asList(event));
+            if(result) {
+                status.successful = true;
+                status.errorMessage = "";
+            }else{
+                status.successful = false;
+                status.errorMessage = "";
+            }
+        }catch (Exception ex ){
+            status.successful = false;
+            status.errorMessage = ex.getMessage();
+            LOG.error("Fail writing alert entity to Eagle Store", ex);
+        }
+        this.status = status;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public int hashCode(){
+        return new HashCodeBuilder().append(getClass().getCanonicalName()).toHashCode();
+    }
+
+    @Override
+    public boolean equals(Object o){
+        if(o == this)
+            return true;
+        if(!(o instanceof AlertEagleStorePublisher))
+            return false;
+        return true;
+    }
+}
\ No newline at end of file


Mime
View raw message