eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [27/84] [partial] eagle git commit: Clean repo for eagle site
Date Mon, 03 Apr 2017 11:54:35 GMT
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
deleted file mode 100755
index 2ffccaa..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator.impl;
-
-import org.apache.eagle.alert.config.ConfigBusConsumer;
-import org.apache.eagle.alert.config.ConfigChangeCallback;
-import org.apache.eagle.alert.config.ConfigValue;
-import org.apache.eagle.alert.config.ZKConfig;
-import org.apache.eagle.alert.coordination.model.*;
-import org.apache.eagle.alert.engine.coordinator.MetadataType;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.service.IMetadataServiceClient;
-import org.apache.eagle.alert.service.MetadataServiceClientImpl;
-import com.typesafe.config.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * <b>TODO</b>: performance tuning: It is not JVM level service, so it may cause
- * zookeeper burden in case of too many listeners This does not support
- * dynamically adding topic, all topics should be available when service object
- * is created.
- * ZK path format is as following:
- * <ul>
- * <li>/alert/topology_1/spout</li>
- * <li>/alert/topology_1/router</li>
- * <li>/alert/topology_1/alert</li>
- * <li>/alert/topology_1/publisher</li>
- * </ul>
- */
-public class ZKMetadataChangeNotifyService extends AbstractMetadataChangeNotifyService implements ConfigChangeCallback {
-    private static final long serialVersionUID = -1509237694501235144L;
-    private static final Logger LOG = LoggerFactory.getLogger(ZKMetadataChangeNotifyService.class);
-    private ZKConfig zkConfig;
-    private String topologyId;
-    private ConfigBusConsumer consumer;
-
-    private transient IMetadataServiceClient client;
-
-    public ZKMetadataChangeNotifyService(ZKConfig config, String topologyId) {
-        this.zkConfig = config;
-        this.topologyId = topologyId;
-    }
-
-    @Override
-    public void init(Config config, MetadataType type) {
-        super.init(config, type);
-        client = new MetadataServiceClientImpl(config);
-        consumer = new ConfigBusConsumer(zkConfig, topologyId + "/" + getMetadataTopicSuffix(), this);
-        LOG.info("init called for client");
-    }
-
-    @Override
-    public void activateFetchMetaData() throws Exception {
-        this.onNewConfig(consumer.getConfigValue());
-    }
-
-    private String getMetadataTopicSuffix() {
-        switch (type) {
-            case ALERT_BOLT:
-                return "alert";
-            case ALERT_PUBLISH_BOLT:
-                return "publisher";
-            case SPOUT:
-                return "spout";
-            case STREAM_ROUTER_BOLT:
-                return "router";
-            default:
-                throw new RuntimeException(String.format("unexpected metadata type: %s !", type));
-        }
-    }
-
-    @Override
-    public void close() throws IOException {
-        consumer.close();
-        LOG.info("Config consumer closed");
-    }
-
-    @Override
-    public void onNewConfig(ConfigValue value) {
-        LOG.info("Metadata changed {}", value);
-
-        if (client == null) {
-            LOG.error("OnNewConfig trigger, but metadata service client is null. Metadata type {}", type);
-            return;
-        }
-
-        // analyze config value and notify corresponding listeners
-        String version = value.getValue().toString();
-        // brute-force load all: this might introduce load's on metadata service.
-        // FIXME : after ScheduleState persisted with better normalization, load
-        // state based on type and version
-        ScheduleState state = client.getVersionedSpec(version);
-        if (state == null) {
-            LOG.error("Failed to load schedule state of version {}, this is possibly a bug, pls check coordinator log !", version);
-            return;
-        }
-        Map<String, StreamDefinition> sds = getStreams(state.getStreamSnapshots());
-        switch (type) {
-            case ALERT_BOLT:
-                // we might query metadata service query get metadata snapshot and StreamDefinition
-                AlertBoltSpec alertSpec = state.getAlertSpecs().get(topologyId);
-                if (alertSpec == null) {
-                    LOG.error(" alert spec for version {} not found for topology {} !", version, topologyId);
-                } else {
-                    prePopulate(alertSpec, state.getPolicySnapshots());
-                    notifyAlertBolt(alertSpec, sds);
-                }
-                break;
-            case ALERT_PUBLISH_BOLT:
-                PublishSpec pubSpec = state.getPublishSpecs().get(topologyId);
-                if (pubSpec == null) {
-                    LOG.error(" alert spec for version {} not found for topology {} !", version, topologyId);
-                } else {
-                    notifyAlertPublishBolt(pubSpec, sds);
-                    if (state.getAlertSpecs().get(topologyId) != null) {
-                        notifyAlertPublishBolt(listToMap(state.getPolicySnapshots()), sds);
-                    }
-                }
-                break;
-            case SPOUT:
-                SpoutSpec spoutSpec = state.getSpoutSpecs().get(topologyId);
-                if (spoutSpec == null) {
-                    LOG.error(" alert spec for version {} not found for topology {} !", version, topologyId);
-                } else {
-                    notifySpout(spoutSpec, sds);
-                }
-                break;
-            case STREAM_ROUTER_BOLT:
-                RouterSpec gSpec = state.getGroupSpecs().get(topologyId);
-                if (gSpec == null) {
-                    LOG.error(" alert spec for version {} not found for topology {} !", version, topologyId);
-                } else {
-                    notifyStreamRouterBolt(gSpec, sds);
-                }
-                break;
-            default:
-                LOG.error("unexpected metadata type: {} ", type);
-        }
-    }
-
-    private void prePopulate(AlertBoltSpec alertSpec, List<VersionedPolicyDefinition> list) {
-        Map<String, PolicyDefinition> policyMap = listToMap(list);
-        for (Entry<String, List<String>> policyEntry : alertSpec.getBoltPolicyIdsMap().entrySet()) {
-            List<PolicyDefinition> pds = alertSpec.getBoltPoliciesMap().get(policyEntry.getKey());
-            if (pds == null) {
-                pds = new ArrayList<PolicyDefinition>();
-                alertSpec.getBoltPoliciesMap().put(policyEntry.getKey(), pds);
-            }
-            for (String policyName : policyEntry.getValue()) {
-                if (policyMap.containsKey(policyName)) {
-                    pds.add(policyMap.get(policyName));
-                }
-            }
-        }
-    }
-
-    private Map<String, StreamDefinition> getStreams(List<VersionedStreamDefinition> streamSnapshots) {
-        Map<String, StreamDefinition> result = new HashMap<String, StreamDefinition>();
-        for (VersionedStreamDefinition vsd : streamSnapshots) {
-            result.put(vsd.getDefinition().getStreamId(), vsd.getDefinition());
-        }
-        return result;
-    }
-
-    private Map<String, PolicyDefinition> listToMap(List<VersionedPolicyDefinition> listStreams) {
-        Map<String, PolicyDefinition> result = new HashMap<String, PolicyDefinition>();
-        for (VersionedPolicyDefinition sd : listStreams) {
-            result.put(sd.getDefinition().getName(), sd.getDefinition());
-        }
-        return result;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java
deleted file mode 100644
index d90fd9c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/CompositePolicyHandler.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator;
-
-import org.apache.eagle.alert.engine.Collector;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Created on 7/27/16.
- */
-public class CompositePolicyHandler implements PolicyStreamHandler {
-    private static final Logger LOG = LoggerFactory.getLogger(CompositePolicyHandler.class);
-
-    private PolicyStreamHandler policyHandler;
-    private PolicyStreamHandler stateHandler;
-    private List<PolicyStreamHandler> handlers = new ArrayList<>();
-
-    private Collector<AlertStreamEvent> collector;
-
-    private Map<String, StreamDefinition> sds;
-
-    public CompositePolicyHandler(Map<String, StreamDefinition> sds) {
-        this.sds = sds;
-    }
-
-    @Override
-    public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception {
-        this.collector = collector;
-        // TODO: create two handlers
-        policyHandler = PolicyStreamHandlers.createHandler(context.getPolicyDefinition().getDefinition(), sds);
-        policyHandler.prepare(collector, context);
-        handlers.add(policyHandler);
-
-        if (context.getPolicyDefinition().getStateDefinition() != null) {
-            stateHandler = PolicyStreamHandlers.createStateHandler(context.getPolicyDefinition().getStateDefinition().type, sds);
-            stateHandler.prepare(collector, context);
-            handlers.add(stateHandler);
-        }
-    }
-
-    @Override
-    public void send(StreamEvent event) throws Exception {
-        // policyHandler.send(event);
-        send(event, 0);
-    }
-
-    // send event to index of stream handler
-    public void send(StreamEvent event, int idx) throws Exception {
-        if (handlers.size() > idx) {
-            handlers.get(idx).send(event);
-        } else if (event instanceof AlertStreamEvent) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Emit new alert event: {}", event);
-            }
-            collector.emit((AlertStreamEvent) event); // for alert stream events, emit if no handler found.
-        } else {
-            // nothing found. LOG, and throw exception
-            LOG.error("non-alert-stream-event {} send with index {}, but the handler is not found!", event, idx);
-            throw new Exception(String.format("event %s send with idx %d can not found expecting handler!", event, idx));
-        }
-    }
-
-    @Override
-    public void close() throws Exception {
-        for (PolicyStreamHandler handler : handlers) {
-            try {
-                handler.close();
-            } catch (Exception e) {
-                LOG.error("close handler {} failed, continue to run.", handler);
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java
deleted file mode 100644
index 20c2e1d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyChangeListener.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator;
-
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-
-import java.util.List;
-import java.util.Map;
-
-public interface PolicyChangeListener {
-    void onPolicyChange(String version,
-                        List<PolicyDefinition> added,
-                        List<PolicyDefinition> removed,
-                        List<PolicyDefinition> modified, Map<String, StreamDefinition> sds);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyGroupEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyGroupEvaluator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyGroupEvaluator.java
deleted file mode 100644
index e970ddd..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyGroupEvaluator.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.eagle.alert.engine.evaluator;
-
-import org.apache.eagle.alert.engine.AlertStreamCollector;
-import org.apache.eagle.alert.engine.StreamContext;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-
-import java.io.Serializable;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * policy group refers to the policies which belong to the same MonitoredStream
- * 3 lifecycle steps are involved in PolicyGroupEvaluator
- * Step 1: create object. Be aware that in distributed environment, this object may be serialized and transferred across network
- * Step 2: init. This normally is invoked only once before nextEvent is invoked
- * Step 3: nextEvent
- * Step 4: close
- */
-public interface PolicyGroupEvaluator extends PolicyChangeListener, Serializable {
-    void init(StreamContext context, AlertStreamCollector collector);
-
-    /**
-     * Evaluate event.
-     */
-    void nextEvent(PartitionedEvent event);
-
-    String getName();
-
-    void close();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java
deleted file mode 100644
index 59d9e1f..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.engine.evaluator;
-
-import org.apache.eagle.alert.engine.StreamCounter;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import com.typesafe.config.Config;
-
-public class PolicyHandlerContext {
-    private PolicyDefinition policyDefinition;
-    private PolicyGroupEvaluator policyEvaluator;
-    private StreamCounter policyCounter;
-    private String policyEvaluatorId;
-    private Config config;
-
-    public PolicyDefinition getPolicyDefinition() {
-        return policyDefinition;
-    }
-
-    public void setPolicyDefinition(PolicyDefinition policyDefinition) {
-        this.policyDefinition = policyDefinition;
-    }
-
-    public PolicyGroupEvaluator getPolicyEvaluator() {
-        return policyEvaluator;
-    }
-
-    public void setPolicyEvaluator(PolicyGroupEvaluator policyEvaluator) {
-        this.policyEvaluator = policyEvaluator;
-    }
-
-    public void setPolicyCounter(StreamCounter metric) {
-        this.policyCounter = metric;
-    }
-
-    public StreamCounter getPolicyCounter() {
-        return policyCounter;
-    }
-
-    public String getPolicyEvaluatorId() {
-        return policyEvaluatorId;
-    }
-
-    public void setPolicyEvaluatorId(String policyEvaluatorId) {
-        this.policyEvaluatorId = policyEvaluatorId;
-    }
-
-    public Config getConfig() {
-        return config;
-    }
-
-    public void setConfig(Config config) {
-        this.config = config;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandler.java
deleted file mode 100755
index 7b457b0..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandler.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator;
-
-import org.apache.eagle.alert.engine.Collector;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-
-public interface PolicyStreamHandler {
-    void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception;
-
-    void send(StreamEvent event) throws Exception;
-
-    void close() throws Exception;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
deleted file mode 100644
index 116f633..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator;
-
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.evaluator.absence.AbsencePolicyHandler;
-import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyHandler;
-import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyStateHandler;
-import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyTimeBatchHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-/**
- * TODO/FIXME: to support multiple stage definition in single policy. The methods in this class is not good to understand now.(Hard code of 0/1).
- */
-public class PolicyStreamHandlers {
-    private static final Logger LOG = LoggerFactory.getLogger(PolicyStreamHandlers.class);
-
-    public static final String SIDDHI_ENGINE = "siddhi";
-    public static final String NO_DATA_ALERT_ENGINE = "nodataalert";
-    public static final String ABSENCE_ALERT_ENGINE = "absencealert";
-    public static final String CUSTOMIZED_ENGINE = "Custom";
-
-    public static PolicyStreamHandler createHandler(PolicyDefinition.Definition definition, Map<String, StreamDefinition> sds) {
-        if (SIDDHI_ENGINE.equals(definition.getType())) {
-            return new SiddhiPolicyHandler(sds, 0);// // FIXME: 8/2/16 
-        } else if (NO_DATA_ALERT_ENGINE.equals(definition.getType())) {
-            // no data for an entire stream won't trigger gap alert  (use local time & batch window instead)
-            return new NoDataPolicyTimeBatchHandler(sds);
-        } else if (ABSENCE_ALERT_ENGINE.equals(definition.getType())) {
-            return new AbsencePolicyHandler(sds);
-        } else if (CUSTOMIZED_ENGINE.equals(definition.getType())) {
-            try {
-                Class<?> handlerClz = Class.forName(definition.getHandlerClass());
-                PolicyStreamHandler handler = (PolicyStreamHandler) handlerClz.getConstructor(Map.class).newInstance(sds);
-                return handler;
-            } catch (Exception e) {
-                LOG.error("Not able to create policy handler for handler class " + definition.getHandlerClass(), e);
-                throw new IllegalArgumentException("Illegal extended policy handler class!" + definition.getHandlerClass());
-            }
-        }
-        throw new IllegalArgumentException("Illegal policy stream handler type " + definition.getType());
-    }
-
-    public static PolicyStreamHandler createStateHandler(String type, Map<String, StreamDefinition> sds) {
-        if (SIDDHI_ENGINE.equals(type)) {
-            return new SiddhiPolicyStateHandler(sds, 1); //// FIXME: 8/2/16
-        }
-        throw new IllegalArgumentException("Illegal policy state handler type " + type);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java
deleted file mode 100644
index f53139f..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.absence;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- * this assumes that event comes in time order.
- * Since 7/7/16.
- */
-public class AbsenceAlertDriver {
-    private static final Logger LOG = LoggerFactory.getLogger(AbsenceAlertDriver.class);
-    private List<Object> expectedAttrs;
-    private AbsenceWindowProcessor processor;
-    private AbsenceWindowGenerator windowGenerator;
-
-    public AbsenceAlertDriver(List<Object> expectedAttrs, AbsenceWindowGenerator windowGenerator) {
-        this.expectedAttrs = expectedAttrs;
-        this.windowGenerator = windowGenerator;
-    }
-
-    public boolean process(List<Object> appearAttrs, long occurTime) {
-        // initialize window
-        if (processor == null) {
-            processor = nextProcessor(occurTime);
-            LOG.info("initialized a new window {}", processor);
-        }
-        processor.process(appearAttrs, occurTime);
-        AbsenceWindowProcessor.OccurStatus status = processor.checkStatus();
-        boolean expired = processor.checkExpired();
-        boolean isAbsenceAlert = false;
-        if (expired) {
-            if (status == AbsenceWindowProcessor.OccurStatus.absent) {
-                // send alert
-                LOG.info("===================");
-                LOG.info("|| Absence Alert ||");
-                LOG.info("===================");
-                isAbsenceAlert = true;
-                // figure out next window and set the new window
-            }
-            processor = nextProcessor(occurTime);
-            LOG.info("created a new window {}", processor);
-        }
-
-        return isAbsenceAlert;
-    }
-
-    /**
-     * calculate absolute time range based on current timestamp.
-     *
-     * @param currTime milliseconds
-     * @return
-     */
-    private AbsenceWindowProcessor nextProcessor(long currTime) {
-        AbsenceWindow window = windowGenerator.nextWindow(currTime);
-        return new AbsenceWindowProcessor(expectedAttrs, window);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java
deleted file mode 100644
index db4be7c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.absence;
-
-/**
- * Since 7/7/16.
- */
-public class AbsenceDailyRule implements AbsenceRule {
-    public static final long DAY_MILLI_SECONDS = 86400 * 1000L;
-    public long startOffset;
-    public long endOffset;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java
deleted file mode 100644
index c372411..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.absence;
-
-import org.apache.eagle.alert.engine.Collector;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
-import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.utils.AlertStreamUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.text.SimpleDateFormat;
-import java.util.*;
-
-/**
- * Since 7/6/16.
- * * policy would be like:
- * {
- * "name": "absenceAlertPolicy",
- * "description": "absenceAlertPolicy",
- * "inputStreams": [
- * "absenceAlertStream"
- * ],
- * "outputStreams": [
- * "absenceAlertStream_out"
- * ],
- * "definition": {
- * "type": "absencealert",
- * "value": "1,jobID,job1,daily_rule,14:00:00,15:00:00"
- * },
- * "partitionSpec": [
- * {
- * "streamId": "absenceAlertStream",
- * "type": "GROUPBY",
- * "columns" : ["jobID"]
- * }
- * ],
- * "parallelismHint": 2
- * }
- */
-public class AbsencePolicyHandler implements PolicyStreamHandler {
-    private static final Logger LOG = LoggerFactory.getLogger(AbsencePolicyHandler.class);
-    private Map<String, StreamDefinition> sds;
-    private volatile PolicyDefinition policyDef;
-    private volatile Collector<AlertStreamEvent> collector;
-    private volatile PolicyHandlerContext context;
-    private volatile List<Integer> expectFieldIndices = new ArrayList<>();
-    private volatile List<Object> expectValues = new ArrayList<>();
-    private AbsenceAlertDriver driver;
-
-    public AbsencePolicyHandler(Map<String, StreamDefinition> sds) {
-        this.sds = sds;
-    }
-
-    @Override
-    public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception {
-        this.collector = collector;
-        this.context = context;
-        this.policyDef = context.getPolicyDefinition();
-        List<String> inputStreams = policyDef.getInputStreams();
-        // validate inputStreams has to contain only one stream
-        if (inputStreams.size() != 1) {
-            throw new IllegalArgumentException("policy inputStream size has to be 1 for absence alert");
-        }
-        // validate outputStream has to contain only one stream
-        if (policyDef.getOutputStreams().size() != 1) {
-            throw new IllegalArgumentException("policy outputStream size has to be 1 for absence alert");
-        }
-
-        String is = inputStreams.get(0);
-        StreamDefinition sd = sds.get(is);
-
-        String policyValue = policyDef.getDefinition().getValue();
-
-        // Assume that absence alert policy value consists of
-        // "numOfFields, f1_name, f2_name, f1_value, f2_value, absence_window_rule_type, startTimeOffset, endTimeOffset"
-        String[] segments = policyValue.split(",\\s*");
-        int offset = 0;
-        // populate wisb field names
-        int numOfFields = Integer.parseInt(segments[offset++]);
-        for (int i = offset; i < offset + numOfFields; i++) {
-            String fn = segments[i];
-            expectFieldIndices.add(sd.getColumnIndex(fn));
-        }
-        offset += numOfFields;
-        for (int i = offset; i < offset + numOfFields; i++) {
-            String fn = segments[i];
-            expectValues.add(fn);
-        }
-        offset += numOfFields;
-        String absenceWindowRuleType = segments[offset++];
-        AbsenceDailyRule rule = new AbsenceDailyRule();
-        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
-        sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
-        Date t1 = sdf.parse(segments[offset++]);
-        rule.startOffset = t1.getTime();
-        Date t2 = sdf.parse(segments[offset++]);
-        rule.endOffset = t2.getTime();
-        AbsenceWindowGenerator generator = new AbsenceWindowGenerator(rule);
-        driver = new AbsenceAlertDriver(expectValues, generator);
-    }
-
-    @Override
-    public void send(StreamEvent event) throws Exception {
-        Object[] data = event.getData();
-        List<Object> columnValues = new ArrayList<>();
-        for (int i = 0; i < expectFieldIndices.size(); i++) {
-            Object o = data[expectFieldIndices.get(i)];
-            // convert value to string
-            columnValues.add(o.toString());
-        }
-
-        boolean isAbsenceAlert = driver.process(columnValues, event.getTimestamp());
-
-        // Publishing alerts.
-        if (isAbsenceAlert) {
-            AlertStreamEvent alertEvent = AlertStreamUtils.createAlertEvent(event, context, sds);
-            collector.emit(alertEvent);
-        }
-    }
-
-    @Override
-    public void close() throws Exception {
-
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceRule.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceRule.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceRule.java
deleted file mode 100644
index 272d5cf..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceRule.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.absence;
-
-/**
- * Since 7/7/16.
- */
-public interface AbsenceRule {
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java
deleted file mode 100644
index 9958dc7..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.absence;
-
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.TimeZone;
-
-/**
- * Since 7/7/16.
- */
-public class AbsenceWindow {
-    public long startTime;
-    public long endTime;
-
-    public String toString() {
-        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
-        String t1 = sdf.format(new Date(startTime));
-        String t2 = sdf.format(new Date(endTime));
-        String format = "startTime=%d (%s), endTime=%d (%s)";
-        return String.format(format, startTime, t1, endTime, t2);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java
deleted file mode 100644
index dfde09a..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.absence;
-
-/**
- * Since 7/7/16.
- */
-public class AbsenceWindowGenerator {
-    private AbsenceRule rule;
-
-    public AbsenceWindowGenerator(AbsenceRule rule) {
-        this.rule = rule;
-    }
-
-    /**
-     * nextWindow.
-     *
-     * @param currTime current timestamp
-     */
-    public AbsenceWindow nextWindow(long currTime) {
-        AbsenceWindow window = new AbsenceWindow();
-        if (rule instanceof AbsenceDailyRule) {
-            AbsenceDailyRule r = (AbsenceDailyRule) rule;
-            long adjustment = 0; // if today's window already expires, then adjust to tomorrow's window
-            if (currTime % AbsenceDailyRule.DAY_MILLI_SECONDS > r.startOffset) {
-                adjustment = AbsenceDailyRule.DAY_MILLI_SECONDS;
-            }
-            // use current timestamp to round down to day
-            long day = currTime - currTime % AbsenceDailyRule.DAY_MILLI_SECONDS;
-            day += adjustment;
-            window.startTime = day + r.startOffset;
-            window.endTime = day + r.endOffset;
-            return window;
-        } else {
-            throw new UnsupportedOperationException("not supported rule " + rule);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java
deleted file mode 100644
index 8f00b31..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.absence;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- * Since 7/6/16.
- * To process each incoming event
- * internally maintain state machine to trigger alert when some attribute does not occur within this window
- */
-public class AbsenceWindowProcessor {
-    private static final Logger LOG = LoggerFactory.getLogger(AbsenceWindowProcessor.class);
-    private List<Object> expectAttrs;
-    private AbsenceWindow window;
-    private boolean expired; // to mark if the time range has been went through
-    private OccurStatus status = OccurStatus.not_sure;
-
-    public enum OccurStatus {
-        not_sure,
-        occured,
-        absent
-    }
-
-    public AbsenceWindowProcessor(List<Object> expectAttrs, AbsenceWindow window) {
-        this.expectAttrs = expectAttrs;
-        this.window = window;
-        expired = false;
-    }
-
-    /**
-     * return true if it is certain that expected attributes don't occur during startTime and endTime, else return false.
-     */
-    public void process(List<Object> appearAttrs, long occurTime) {
-        if (expired) {
-            throw new IllegalStateException("Expired window can't recieve events");
-        }
-        switch (status) {
-            case not_sure:
-                if (occurTime < window.startTime) {
-                    break;
-                } else if (occurTime >= window.startTime
-                    && occurTime <= window.endTime) {
-                    if (expectAttrs.equals(appearAttrs)) {
-                        status = OccurStatus.occured;
-                    }
-                    break;
-                } else {
-                    status = OccurStatus.absent;
-                    break;
-                }
-            case occured:
-                if (occurTime > window.endTime) {
-                    expired = true;
-                }
-                break;
-            default:
-                break;
-        }
-        // reset status
-        if (status == OccurStatus.absent) {
-            expired = true;
-        }
-    }
-
-    public OccurStatus checkStatus() {
-        return status;
-    }
-
-    public boolean checkExpired() {
-        return expired;
-    }
-
-    public AbsenceWindow currWindow() {
-        return window;
-    }
-
-    public String toString() {
-        return "expectAttrs=" + expectAttrs + ", status=" + status + ", expired=" + expired + ", window=[" + window + "]";
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java
deleted file mode 100755
index 185853d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.impl;
-
-import org.apache.eagle.alert.engine.AlertStreamCollector;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.router.StreamOutputCollector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * <h2>Thread Safe Mechanism.</h2>
- * <ul>
- * <li>
- * emit() method is thread-safe enough to be called anywhere asynchronously in multi-thread
- * </li>
- * <li>
- * flush() method must be called synchronously, because Storm OutputCollector is not thread-safe
- * </li>
- * </ul>
- */
-public class AlertBoltOutputCollectorThreadSafeWrapper implements AlertStreamCollector {
-    private final StreamOutputCollector delegate;
-    private final LinkedBlockingQueue<AlertStreamEvent> queue;
-    private static final Logger LOG = LoggerFactory.getLogger(AlertBoltOutputCollectorThreadSafeWrapper.class);
-    private final AtomicLong lastFlushTime = new AtomicLong(System.currentTimeMillis());
-    private final AutoAlertFlusher flusher;
-    private static final int MAX_ALERT_DELAY_SECS = 10;
-
-    public AlertBoltOutputCollectorThreadSafeWrapper(StreamOutputCollector outputCollector) {
-        this.delegate = outputCollector;
-        this.queue = new LinkedBlockingQueue<>();
-        this.flusher = new AutoAlertFlusher(this);
-        this.flusher.setName(Thread.currentThread().getName() + "-alertFlusher");
-        this.flusher.start();
-    }
-
-    private static class AutoAlertFlusher extends Thread {
-        private final AlertBoltOutputCollectorThreadSafeWrapper collector;
-        private boolean stopped = false;
-        private static final Logger LOG = LoggerFactory.getLogger(AutoAlertFlusher.class);
-
-        private AutoAlertFlusher(AlertBoltOutputCollectorThreadSafeWrapper collector) {
-            this.collector = collector;
-        }
-
-        @Override
-        public void run() {
-            LOG.info("Starting");
-            while (!this.stopped) {
-                if (System.currentTimeMillis() - collector.lastFlushTime.get() >= MAX_ALERT_DELAY_SECS * 1000L) {
-                    this.collector.flush();
-                }
-                try {
-                    Thread.sleep(5000);
-                } catch (InterruptedException ignored) {
-                    // ignored
-                }
-            }
-            LOG.info("Stopped");
-        }
-
-        public void shutdown() {
-            LOG.info("Stopping");
-            this.stopped = true;
-        }
-    }
-
-    /**
-     * Emit method can be called in multi-thread.
-     *
-     * @param event
-     */
-    @Override
-    public void emit(AlertStreamEvent event) {
-        try {
-            queue.put(event);
-        } catch (InterruptedException e) {
-            LOG.error(e.getMessage(), e);
-        }
-    }
-
-    /**
-     * Flush will be called in synchronous way like StormBolt.execute() as Storm OutputCollector is not thread-safe
-     */
-    @Override
-    public void flush() {
-        if (!queue.isEmpty()) {
-            List<AlertStreamEvent> events = new ArrayList<>();
-            queue.drainTo(events);
-            events.forEach((event) -> delegate.emit(Arrays.asList(event.getStreamId(), event)));
-            LOG.info("Flushed {} alerts", events.size());
-        }
-        lastFlushTime.set(System.currentTimeMillis());
-    }
-
-    @Override
-    public void close() {
-        this.flusher.shutdown();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
deleted file mode 100755
index 606ddce..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.impl;
-
-import org.apache.eagle.alert.engine.AlertStreamCollector;
-import org.apache.eagle.alert.engine.StreamContext;
-import org.apache.eagle.alert.engine.coordinator.PublishPartition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.router.StreamOutputCollector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-
-public class AlertBoltOutputCollectorWrapper implements AlertStreamCollector {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AlertBoltOutputCollectorWrapper.class);
-
-    private final StreamOutputCollector delegate;
-    private final Object outputLock;
-    private final StreamContext streamContext;
-
-    private volatile Set<PublishPartition> publishPartitions;
-
-    public AlertBoltOutputCollectorWrapper(StreamOutputCollector outputCollector, Object outputLock,
-                                           StreamContext streamContext) {
-        this.delegate = outputCollector;
-        this.outputLock = outputLock;
-        this.streamContext = streamContext;
-
-        this.publishPartitions = new HashSet<>();
-    }
-
-    @Override
-    public void emit(AlertStreamEvent event) {
-        Set<PublishPartition> clonedPublishPartitions = new HashSet<>(publishPartitions);
-        for (PublishPartition publishPartition : clonedPublishPartitions) {
-            // skip the publish partition which is not belong to this policy and also check streamId
-            PublishPartition cloned = publishPartition.clone();
-            Optional.ofNullable(event)
-                .filter(x -> x != null
-                    && x.getSchema() != null
-                    && cloned.getPolicyId().equalsIgnoreCase(x.getPolicyId())
-                    && (cloned.getStreamId().equalsIgnoreCase(x.getSchema().getStreamId())
-                    || cloned.getStreamId().equalsIgnoreCase(Publishment.STREAM_NAME_DEFAULT)))
-                .ifPresent(x -> {
-                    cloned.getColumns().stream()
-                        .filter(y -> event.getSchema().getColumnIndex(y) >= 0
-                            && event.getSchema().getColumnIndex(y) < event.getSchema().getColumns().size())
-                        .map(y -> event.getData()[event.getSchema().getColumnIndex(y)])
-                        .filter(y -> y != null)
-                        .forEach(y -> cloned.getColumnValues().add(y));
-                    synchronized (outputLock) {
-                        streamContext.counter().incr("alert_count");
-                        delegate.emit(Arrays.asList(cloned, event));
-                    }
-                });
-        }
-    }
-
-    @Override
-    public void flush() {
-        // do nothing
-    }
-
-    @Override
-    public void close() {
-    }
-
-    public synchronized void onAlertBoltSpecChange(Collection<PublishPartition> addedPublishPartitions,
-                                                   Collection<PublishPartition> removedPublishPartitions,
-                                                   Collection<PublishPartition> modifiedPublishPartitions) {
-        Set<PublishPartition> clonedPublishPartitions = new HashSet<>(publishPartitions);
-        clonedPublishPartitions.addAll(addedPublishPartitions);
-        clonedPublishPartitions.removeAll(removedPublishPartitions);
-        clonedPublishPartitions.addAll(modifiedPublishPartitions);
-        publishPartitions = clonedPublishPartitions;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java
deleted file mode 100644
index 25ebfca..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.impl;
-
-import org.apache.eagle.alert.engine.Collector;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.evaluator.CompositePolicyHandler;
-import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
-
-/**
- * Created on 8/2/16.
- */
-public class AlertStreamCallback extends StreamCallback {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AlertStreamCallback.class);
-    private final String outputStream;
-    private final Collector<AlertStreamEvent> collector;
-    private final PolicyHandlerContext context;
-    private final StreamDefinition definition;
-
-    private int currentIndex;
-
-    public AlertStreamCallback(String outputStream,
-                               StreamDefinition streamDefinition,
-                               Collector<AlertStreamEvent> collector,
-                               PolicyHandlerContext context,
-                               int currentIndex) {
-        this.outputStream = outputStream;
-        this.collector = collector;
-        this.context = context;
-        this.definition = streamDefinition;
-        this.currentIndex = currentIndex;
-    }
-
-    /**
-     * Possibly more than one event will be triggered for alerting.
-     */
-    @Override
-    public void receive(Event[] events) {
-        String policyName = context.getPolicyDefinition().getName();
-        String siteId = context.getPolicyDefinition().getSiteId();
-        CompositePolicyHandler handler = ((PolicyGroupEvaluatorImpl) context.getPolicyEvaluator()).getPolicyHandler(policyName);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Generated {} alerts from policy '{}' in {}, index of definiton {} ", events.length, policyName, context.getPolicyEvaluatorId(), currentIndex);
-        }
-        for (Event e : events) {
-            AlertStreamEvent event = new AlertStreamEvent();
-            event.setSiteId(siteId);
-            event.setTimestamp(e.getTimestamp());
-            event.setData(e.getData());
-            event.setStreamId(outputStream);
-            event.setPolicyId(context.getPolicyDefinition().getName());
-            if (this.context.getPolicyEvaluator() != null) {
-                event.setCreatedBy(context.getPolicyEvaluator().getName());
-            }
-            event.setCreatedTime(System.currentTimeMillis());
-            event.setSchema(definition);
-
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Generate new alert event: {}", event);
-            }
-            try {
-                if (handler == null) {
-                    // extreme case: the handler is removed from the evaluator. Just emit.
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug(" handler not found when callback received event, directly emit. policy removed? ");
-                    }
-                    collector.emit(event);
-                } else {
-                    handler.send(event, currentIndex + 1);
-                }
-            } catch (Exception ex) {
-                LOG.error(String.format("send event %s to index %d failed with exception. ", event, currentIndex), ex);
-            }
-        }
-        context.getPolicyCounter().incrBy(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "alert_count"), events.length);
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
deleted file mode 100644
index af35551..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.impl;
-
-import org.apache.eagle.alert.engine.AlertStreamCollector;
-import org.apache.eagle.alert.engine.StreamContext;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.evaluator.CompositePolicyHandler;
-import org.apache.eagle.alert.engine.evaluator.PolicyGroupEvaluator;
-import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
-import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator {
-    private static final long serialVersionUID = -5499413193675985288L;
-
-    private static final Logger LOG = LoggerFactory.getLogger(PolicyGroupEvaluatorImpl.class);
-
-    private AlertStreamCollector collector;
-    // mapping from policy name to PolicyDefinition
-    private volatile Map<String, PolicyDefinition> policyDefinitionMap = new HashMap<>();
-    // mapping from policy name to PolicyStreamHandler
-    private volatile Map<String, CompositePolicyHandler> policyStreamHandlerMap = new HashMap<>();
-    private String policyEvaluatorId;
-    private StreamContext context;
-
-    public PolicyGroupEvaluatorImpl(String policyEvaluatorId) {
-        this.policyEvaluatorId = policyEvaluatorId;
-    }
-
-    public void init(StreamContext context, AlertStreamCollector collector) {
-        this.collector = collector;
-        this.policyStreamHandlerMap = new HashMap<>();
-        this.context = context;
-        Thread.currentThread().setName(policyEvaluatorId);
-    }
-
-    public void nextEvent(PartitionedEvent event) {
-        this.context.counter().incr("receive_count");
-        dispatch(event);
-    }
-
-    @Override
-    public String getName() {
-        return this.policyEvaluatorId;
-    }
-
-    public void close() {
-        for (PolicyStreamHandler handler : policyStreamHandlerMap.values()) {
-            try {
-                handler.close();
-            } catch (Exception e) {
-                LOG.error("Failed to close handler {}", handler.toString(), e);
-            }
-        }
-    }
-
-    /**
-     * fixme make selection of PolicyStreamHandler to be more efficient.
-     *
-     * @param partitionedEvent PartitionedEvent
-     */
-    private void dispatch(PartitionedEvent partitionedEvent) {
-        boolean handled = false;
-        for (Map.Entry<String, CompositePolicyHandler> policyStreamHandler : policyStreamHandlerMap.entrySet()) {
-            if (isAcceptedByPolicy(partitionedEvent, policyDefinitionMap.get(policyStreamHandler.getKey()))) {
-                try {
-                    handled = true;
-                    this.context.counter().incr("eval_count");
-                    policyStreamHandler.getValue().send(partitionedEvent.getEvent());
-                } catch (Exception e) {
-                    this.context.counter().incr("fail_count");
-                    LOG.error("{} failed to handle {}", policyStreamHandler.getValue(), partitionedEvent.getEvent(), e);
-                }
-            }
-        }
-        if (!handled) {
-            this.context.counter().incr("drop_count");
-            LOG.warn("Drop stream non-matched event {}, which should not be sent to evaluator", partitionedEvent);
-        } else {
-            this.context.counter().incr("accept_count");
-        }
-    }
-
-    private static boolean isAcceptedByPolicy(PartitionedEvent event, PolicyDefinition policy) {
-        return policy.getPartitionSpec().contains(event.getPartition())
-            && (policy.getInputStreams().contains(event.getEvent().getStreamId())
-            || policy.getDefinition().getInputStreams().contains(event.getEvent().getStreamId()));
-    }
-
-    @Override
-    public void onPolicyChange(String version, List<PolicyDefinition> added, List<PolicyDefinition> removed, List<PolicyDefinition> modified, Map<String, StreamDefinition> sds) {
-        Map<String, PolicyDefinition> copyPolicies = new HashMap<>(policyDefinitionMap);
-        Map<String, CompositePolicyHandler> copyHandlers = new HashMap<>(policyStreamHandlerMap);
-        for (PolicyDefinition pd : added) {
-            inplaceAdd(copyPolicies, copyHandlers, pd, sds);
-        }
-        for (PolicyDefinition pd : removed) {
-            inplaceRemove(copyPolicies, copyHandlers, pd);
-        }
-        for (PolicyDefinition pd : modified) {
-            inplaceRemove(copyPolicies, copyHandlers, pd);
-            inplaceAdd(copyPolicies, copyHandlers, pd, sds);
-        }
-
-        // logging
-        LOG.info("{} with {} Policy metadata updated with added={}, removed={}, modified={}", policyEvaluatorId, version, added, removed, modified);
-
-        // switch reference
-        this.policyDefinitionMap = copyPolicies;
-        this.policyStreamHandlerMap = copyHandlers;
-    }
-
-    private void inplaceAdd(Map<String, PolicyDefinition> policies, Map<String, CompositePolicyHandler> handlers, PolicyDefinition policy, Map<String, StreamDefinition> sds) {
-        if (handlers.containsKey(policy.getName())) {
-            LOG.error("metadata calculation error, try to add existing PolicyDefinition " + policy);
-        } else {
-            policies.put(policy.getName(), policy);
-            CompositePolicyHandler handler = new CompositePolicyHandler(sds);
-            try {
-                PolicyHandlerContext handlerContext = new PolicyHandlerContext();
-                handlerContext.setPolicyCounter(this.context.counter());
-                handlerContext.setPolicyDefinition(policy);
-                handlerContext.setPolicyEvaluator(this);
-                handlerContext.setPolicyEvaluatorId(policyEvaluatorId);
-                handlerContext.setConfig(this.context.config());
-                handler.prepare(collector, handlerContext);
-                handlers.put(policy.getName(), handler);
-            } catch (Exception e) {
-                LOG.error(e.getMessage(), e);
-                policies.remove(policy.getName());
-                handlers.remove(policy.getName());
-            }
-        }
-    }
-
-    private void inplaceRemove(Map<String, PolicyDefinition> policies, Map<String, CompositePolicyHandler> handlers, PolicyDefinition policy) {
-        if (handlers.containsKey(policy.getName())) {
-            PolicyStreamHandler handler = handlers.get(policy.getName());
-            try {
-                handler.close();
-            } catch (Exception e) {
-                LOG.error("Failed to close handler {}", handler, e);
-            } finally {
-                policies.remove(policy.getName());
-                handlers.remove(policy.getName());
-                LOG.info("Removed policy: {}", policy);
-            }
-        } else {
-            LOG.error("metadata calculation error, try to remove nonexisting PolicyDefinition: " + policy);
-        }
-    }
-
-
-    public CompositePolicyHandler getPolicyHandler(String policy) {
-        return policyStreamHandlerMap.get(policy);
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java
deleted file mode 100644
index a732e66..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.impl;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.query.api.definition.AbstractDefinition;
-import org.wso2.siddhi.query.api.definition.Attribute;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class SiddhiDefinitionAdapter {
-    private static final Logger LOG = LoggerFactory.getLogger(SiddhiDefinitionAdapter.class);
-    public static final String DEFINE_STREAM_TEMPLATE = "define stream %s ( %s );";
-
-    public static String buildStreamDefinition(StreamDefinition streamDefinition) {
-        List<String> columns = new ArrayList<>();
-        Preconditions.checkNotNull(streamDefinition, "StreamDefinition is null");
-        if (streamDefinition.getColumns() != null) {
-            for (StreamColumn column : streamDefinition.getColumns()) {
-                columns.add(String.format("%s %s", column.getName(), convertToSiddhiAttributeType(column.getType()).toString().toLowerCase()));
-            }
-        } else {
-            LOG.warn("No columns found for stream {}" + streamDefinition.getStreamId());
-        }
-        return String.format(DEFINE_STREAM_TEMPLATE, streamDefinition.getStreamId(), StringUtils.join(columns, ","));
-    }
-
-    public static Attribute.Type convertToSiddhiAttributeType(StreamColumn.Type type) {
-        if (_EAGLE_SIDDHI_TYPE_MAPPING.containsKey(type)) {
-            return _EAGLE_SIDDHI_TYPE_MAPPING.get(type);
-        }
-
-        throw new IllegalArgumentException("Unknown stream type: " + type);
-    }
-
-    public static Class<?> convertToJavaAttributeType(StreamColumn.Type type) {
-        if (_EAGLE_JAVA_TYPE_MAPPING.containsKey(type)) {
-            return _EAGLE_JAVA_TYPE_MAPPING.get(type);
-        }
-
-        throw new IllegalArgumentException("Unknown stream type: " + type);
-    }
-
-    public static StreamColumn.Type convertFromJavaAttributeType(Class<?> type) {
-        if (_JAVA_EAGLE_TYPE_MAPPING.containsKey(type)) {
-            return _JAVA_EAGLE_TYPE_MAPPING.get(type);
-        }
-
-        throw new IllegalArgumentException("Unknown stream type: " + type);
-    }
-
-    public static StreamColumn.Type convertFromSiddhiAttributeType(Attribute.Type type) {
-        if (_SIDDHI_EAGLE_TYPE_MAPPING.containsKey(type)) {
-            return _SIDDHI_EAGLE_TYPE_MAPPING.get(type);
-        }
-
-        throw new IllegalArgumentException("Unknown siddhi type: " + type);
-    }
-
-    public static String buildSiddhiExecutionPlan(PolicyDefinition policyDefinition, Map<String, StreamDefinition> sds) {
-        StringBuilder builder = new StringBuilder();
-        PolicyDefinition.Definition coreDefinition = policyDefinition.getDefinition();
-        // init if not present
-        List<String> inputStreams = coreDefinition.getInputStreams();
-        if (inputStreams == null || inputStreams.isEmpty()) {
-            inputStreams = policyDefinition.getInputStreams();
-        }
-
-        for (String inputStream : inputStreams) {
-            builder.append(SiddhiDefinitionAdapter.buildStreamDefinition(sds.get(inputStream)));
-            builder.append("\n");
-        }
-        builder.append(coreDefinition.value);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Generated siddhi execution plan: {} from definition: {}", builder.toString(), coreDefinition);
-        }
-        return builder.toString();
-    }
-
-    public static String buildSiddhiExecutionPlan(String policyDefinition, Map<String, StreamDefinition> inputStreamDefinitions) {
-        StringBuilder builder = new StringBuilder();
-        for (Map.Entry<String,StreamDefinition> entry: inputStreamDefinitions.entrySet()) {
-            builder.append(SiddhiDefinitionAdapter.buildStreamDefinition(entry.getValue()));
-            builder.append("\n");
-        }
-        builder.append(policyDefinition);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Generated siddhi execution plan: {}", builder.toString());
-        }
-        return builder.toString();
-    }
-
-    /**
-     * public enum Type {
-     * STRING, INT, LONG, FLOAT, DOUBLE, BOOL, OBJECT
-     * }.
-     */
-    private static final Map<StreamColumn.Type, Attribute.Type> _EAGLE_SIDDHI_TYPE_MAPPING = new HashMap<>();
-    private static final Map<StreamColumn.Type, Class<?>> _EAGLE_JAVA_TYPE_MAPPING = new HashMap<>();
-    private static final Map<Class<?>, StreamColumn.Type> _JAVA_EAGLE_TYPE_MAPPING = new HashMap<>();
-    private static final Map<Attribute.Type, StreamColumn.Type> _SIDDHI_EAGLE_TYPE_MAPPING = new HashMap<>();
-
-    static {
-        _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.STRING, Attribute.Type.STRING);
-        _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.INT, Attribute.Type.INT);
-        _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.LONG, Attribute.Type.LONG);
-        _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.FLOAT, Attribute.Type.FLOAT);
-        _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.DOUBLE, Attribute.Type.DOUBLE);
-        _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.BOOL, Attribute.Type.BOOL);
-        _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.OBJECT, Attribute.Type.OBJECT);
-
-        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.STRING, String.class);
-        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.INT, Integer.class);
-        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.LONG, Long.class);
-        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.FLOAT, Float.class);
-        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.DOUBLE, Double.class);
-        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.BOOL, Boolean.class);
-        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.OBJECT, Object.class);
-
-        _JAVA_EAGLE_TYPE_MAPPING.put(String.class, StreamColumn.Type.STRING);
-        _JAVA_EAGLE_TYPE_MAPPING.put(Integer.class, StreamColumn.Type.INT);
-        _JAVA_EAGLE_TYPE_MAPPING.put(Long.class, StreamColumn.Type.LONG);
-        _JAVA_EAGLE_TYPE_MAPPING.put(Float.class, StreamColumn.Type.FLOAT);
-        _JAVA_EAGLE_TYPE_MAPPING.put(Double.class, StreamColumn.Type.DOUBLE);
-        _JAVA_EAGLE_TYPE_MAPPING.put(Boolean.class, StreamColumn.Type.BOOL);
-        _JAVA_EAGLE_TYPE_MAPPING.put(Object.class, StreamColumn.Type.OBJECT);
-
-        _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.STRING, StreamColumn.Type.STRING);
-        _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.INT, StreamColumn.Type.INT);
-        _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.LONG, StreamColumn.Type.LONG);
-        _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.FLOAT, StreamColumn.Type.FLOAT);
-        _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.DOUBLE, StreamColumn.Type.DOUBLE);
-        _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.BOOL, StreamColumn.Type.BOOL);
-        _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.OBJECT, StreamColumn.Type.OBJECT);
-    }
-
-    public static StreamDefinition convertFromSiddiDefinition(AbstractDefinition siddhiDefinition) {
-        StreamDefinition streamDefinition = new StreamDefinition();
-        streamDefinition.setStreamId(siddhiDefinition.getId());
-        List<StreamColumn> columns = new ArrayList<>(siddhiDefinition.getAttributeNameArray().length);
-        for (Attribute attribute : siddhiDefinition.getAttributeList()) {
-            StreamColumn column = new StreamColumn();
-            column.setType(convertFromSiddhiAttributeType(attribute.getType()));
-            column.setName(attribute.getName());
-            columns.add(column);
-        }
-        streamDefinition.setColumns(columns);
-        streamDefinition.setTimeseries(true);
-        streamDefinition.setDescription("Auto-generated stream schema from siddhi for " + siddhiDefinition.getId());
-        return streamDefinition;
-    }
-}
\ No newline at end of file


Mime
View raw message