eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [23/46] incubator-eagle git commit: [EAGLE-325] Initialize next-gen alert engine code on branch-0.5
Date Thu, 02 Jun 2016 07:08:02 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusBase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusBase.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusBase.java
deleted file mode 100644
index e9c4a7c..0000000
--- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusBase.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.config;
-
-import java.io.Closeable;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryNTimes;
-
-
-/**
- * Abstraction of asynchronized configuration management
- * This is used for config change notification between processes, without this one process has to pull changes triggered by another process
- *
- * Config bus is similar to message bus, config change producer can publish config change(message) to config bus,
- *  while config change consumer can subscribe config change and do business logic in callback
- * 1. use zookeeper as media to notify config consumer of config changes
- * 2. each type of config is represented by topic
- * 3. each config change can contain actual value or contain reference Id which consumer uses to retrieve actual value. This mechanism will reduce zookeeper overhed
- *
- */
-public class ConfigBusBase implements Closeable{
-    protected String zkRoot;
-    protected CuratorFramework curator;
-
-    public ConfigBusBase(ZKConfig config) {
-        this.zkRoot = config.zkRoot;
-        curator = CuratorFrameworkFactory.newClient(
-                config.zkQuorum,
-                config.zkSessionTimeoutMs,
-                config.connectionTimeoutMs,
-                new RetryNTimes(config.zkRetryTimes, config.zkRetryInterval)
-        );
-        curator.start();
-    }
-
-    @Override
-    public void close(){
-        curator.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusConsumer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusConsumer.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusConsumer.java
deleted file mode 100644
index 9abfff5..0000000
--- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusConsumer.java
+++ /dev/null
@@ -1,57 +0,0 @@
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.config;
-
-import org.apache.curator.framework.recipes.cache.NodeCache;
-import org.slf4j.Logger;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-/**
- * 1. When consumer is started, it always get notified of config
- * 2. When config is changed, consumer always get notified of config change
- *
- * Reliability issue:
- * TODO How to ensure config change message is always delivered to consumer
- */
-public class ConfigBusConsumer extends ConfigBusBase {
-    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(ConfigBusConsumer.class);
-
-    private NodeCache cache;
-    public ConfigBusConsumer(ZKConfig config, String topic, ConfigChangeCallback callback){
-        super(config);
-        String zkPath = zkRoot + "/" + topic;
-        LOG.info("monitor change for zkPath " + zkPath);
-        cache = new NodeCache(curator, zkPath);
-        cache.getListenable().addListener( () ->
-            {
-                // get node value and notify callback
-                byte[] value = curator.getData().forPath(zkPath);
-                ObjectMapper mapper = new ObjectMapper();
-                ConfigValue v = mapper.readValue(value, ConfigValue.class);
-                callback.onNewConfig(v);
-            }
-        );
-        try {
-            cache.start();
-        }catch(Exception ex){
-            LOG.error("error start NodeCache listener", ex);
-            throw new RuntimeException(ex);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusProducer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusProducer.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusProducer.java
deleted file mode 100644
index 8dcbee7..0000000
--- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusProducer.java
+++ /dev/null
@@ -1,54 +0,0 @@
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.config;
-
-import org.apache.zookeeper.CreateMode;
-import org.slf4j.Logger;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-public class ConfigBusProducer extends ConfigBusBase {
-    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(ConfigBusProducer.class);
-
-    public ConfigBusProducer(ZKConfig config){
-        super(config);
-    }
-
-    /**
-     * @param topic
-     * @param config
-     */
-    public void send(String topic, ConfigValue config){
-        // check if topic exists, create this topic if not existing
-        String zkPath = zkRoot + "/" + topic;
-        try {
-            if (curator.checkExists().forPath(zkPath) == null) {
-                curator.create()
-                        .creatingParentsIfNeeded()
-                        .withMode(CreateMode.PERSISTENT)
-                        .forPath(zkPath);
-            }
-            ObjectMapper mapper = new ObjectMapper();
-            byte[] content = mapper.writeValueAsBytes(config);
-            curator.setData().forPath(zkPath, content);
-        }catch(Exception ex){
-            LOG.error("error creating zkPath " + zkPath, ex);
-            throw new RuntimeException(ex);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigChangeCallback.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigChangeCallback.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigChangeCallback.java
deleted file mode 100644
index f3e09d3..0000000
--- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigChangeCallback.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.config;
-
-public interface ConfigChangeCallback {
-    void onNewConfig(ConfigValue value);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigValue.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigValue.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigValue.java
deleted file mode 100644
index 9774296..0000000
--- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigValue.java
+++ /dev/null
@@ -1,50 +0,0 @@
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.config;
-
-
-/**
- * Config body contains actual data for one topic
- * this is serialized with json format into zookeeper
- * value can be versionId which is used for referencing outside data
- * or value can be actual config value
- */
-public class ConfigValue {
-    private boolean isValueVersionId;
-    private Object value;
-
-    public boolean isValueVersionId() {
-        return isValueVersionId;
-    }
-
-    public void setValueVersionId(boolean valueVersionId) {
-        isValueVersionId = valueVersionId;
-    }
-
-    public Object getValue() {
-        return value;
-    }
-
-    public void setValue(Object value) {
-        this.value = value;
-    }
-
-    public String toString(){
-        return "isValueVersionId: " + isValueVersionId + ", value: " + value;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfig.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfig.java
deleted file mode 100644
index d215d11..0000000
--- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfig.java
+++ /dev/null
@@ -1,34 +0,0 @@
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.config;
-
-import java.io.Serializable;
-
-/**
- * Memory representation of key zookeeper configurations
- */
-public class ZKConfig implements Serializable{
-    private static final long serialVersionUID = -1287231022807492775L;
-
-    public String zkQuorum;
-    public String zkRoot;
-    public int zkSessionTimeoutMs;
-    public int connectionTimeoutMs;
-    public int zkRetryTimes;
-    public int zkRetryInterval;
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfigBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfigBuilder.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfigBuilder.java
deleted file mode 100644
index ba009e6..0000000
--- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfigBuilder.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.config;
-
-import com.typesafe.config.Config;
-
-/**
- * Since 4/28/16.
- */
-public class ZKConfigBuilder {
-    public static ZKConfig getZKConfig(Config config){
-        ZKConfig zkConfig = new ZKConfig();
-        zkConfig.zkQuorum = config.getString("zkConfig.zkQuorum");
-        zkConfig.zkRoot = config.getString("zkConfig.zkRoot");
-        zkConfig.zkSessionTimeoutMs = config.getInt("zkConfig.zkSessionTimeoutMs");
-        zkConfig.connectionTimeoutMs = config.getInt("zkConfig.connectionTimeoutMs");
-        zkConfig.zkRetryTimes = config.getInt("zkConfig.zkRetryTimes");
-        zkConfig.zkRetryInterval = config.getInt("zkConfig.zkRetryInterval");
-        return zkConfig;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/AlertBoltSpec.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/AlertBoltSpec.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/AlertBoltSpec.java
deleted file mode 100644
index 83d307c..0000000
--- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/AlertBoltSpec.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordination.model;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-/**
- * The alert specification for topology bolts.
- * 
- * @since Apr 29, 2016
- */
-public class AlertBoltSpec {
-    private String version;
-    private String topologyName;
-
-    // mapping from boltId to list of PolicyDefinitions
-    @JsonIgnore
-    private Map<String, List<PolicyDefinition>> boltPoliciesMap = new HashMap<String, List<PolicyDefinition>>();
-
-    // mapping from boltId to list of PolicyDefinition's Ids
-    private Map<String, List<String>> boltPolicyIdsMap = new HashMap<String, List<String>>();
-
-    public AlertBoltSpec() {
-    }
-
-    public AlertBoltSpec(String topo) {
-        this.topologyName = topo;
-    }
-
-    public String getVersion() {
-        return version;
-    }
-
-    public void setVersion(String version) {
-        this.version = version;
-    }
-
-    public String getTopologyName() {
-        return topologyName;
-    }
-
-    public void setTopologyName(String topologyName) {
-        this.topologyName = topologyName;
-    }
-
-//    public List<PolicyDefinition> getBoltPolicy(String boltId) {
-//        return boltPoliciesMap.get(boltId);
-//    }
-//
-//    public void addBoltPolicy(String boltId, PolicyDefinition pd) {
-//        if (boltPoliciesMap.containsKey(boltId)) {
-//            boltPoliciesMap.get(boltId).add(pd);
-//        } else {
-//            List<PolicyDefinition> list = new ArrayList<PolicyDefinition>();
-//            boltPoliciesMap.put(boltId, list);
-//            list.add(pd);
-//        }
-//    }
-
-    public void addBoltPolicy(String boltId, String policyName) {
-        if (boltPolicyIdsMap.containsKey(boltId)) {
-            boltPolicyIdsMap.get(boltId).add(policyName);
-        } else {
-            List<String> list = new ArrayList<String>();
-            boltPolicyIdsMap.put(boltId, list);
-            list.add(policyName);
-        }
-    }
-
-    @JsonIgnore
-    public Map<String, List<PolicyDefinition>> getBoltPoliciesMap() {
-        return boltPoliciesMap;
-    }
-
-    @JsonIgnore
-    public void setBoltPoliciesMap(Map<String, List<PolicyDefinition>> boltPoliciesMap) {
-        this.boltPoliciesMap = boltPoliciesMap;
-    }
-
-    public Map<String, List<String>> getBoltPolicyIdsMap() {
-        return boltPolicyIdsMap;
-    }
-
-    public void setBoltPolicyIdsMap(Map<String, List<String>> boltPolicyIdsMap) {
-        this.boltPolicyIdsMap = boltPolicyIdsMap;
-    }
-
-    @Override
-    public String toString() {
-        return String.format("version:%s-topo:%s, boltPolicyIdsMap %s", version, topologyName, boltPolicyIdsMap);
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadata.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadata.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadata.java
deleted file mode 100644
index 6c4f576..0000000
--- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadata.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordination.model;
-
-import java.util.Map;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-import com.google.common.base.Objects;
-
-/**
- * @since Apr 5, 2016
- * this metadata model controls how to convert kafka topic into tuple stream
- */
-public class Kafka2TupleMetadata {
-    private String type;
-    private String name; // data source name
-    private Map<String, String> properties;
-    private String topic;
-    private String schemeCls;
-
-    private Tuple2StreamMetadata codec;
-
-    public String getType() {
-        return type;
-    }
-
-    public void setType(String type) {
-        this.type = type;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
-
-    public void setSchemeCls(String schemeCls) {
-        this.schemeCls = schemeCls;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public Map<String, String> getProperties() {
-        return properties;
-    }
-
-    public void setProperties(Map<String, String> properties) {
-        this.properties = properties;
-    }
-
-    public Tuple2StreamMetadata getCodec() {
-        return codec;
-    }
-
-    public void setCodec(Tuple2StreamMetadata codec) {
-        this.codec = codec;
-    }
-
-    public String getTopic() {
-        return this.topic;
-    }
-    public String getSchemeCls() {
-        return this.schemeCls;
-    }
-
-    public int hashCode() {
-        return new HashCodeBuilder().append(name).append(type).build();
-    }
-
-    public boolean equals(Object obj) {
-        if (!(obj instanceof Kafka2TupleMetadata)) {
-            return false;
-        }
-        Kafka2TupleMetadata o = (Kafka2TupleMetadata) obj;
-        return Objects.equal(name, o.name) && Objects.equal(type, o.type);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueue.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueue.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueue.java
deleted file mode 100644
index 43780f3..0000000
--- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueue.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordination.model;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-public class PolicyWorkerQueue {
-
-    private StreamPartition partition;
-    private List<WorkSlot> workers;
-
-    public PolicyWorkerQueue() {
-        workers = new ArrayList<>();
-    }
-
-    public PolicyWorkerQueue(List<WorkSlot> workers) {
-        this.workers = workers;
-    }
-
-    public PolicyWorkerQueue(StreamPartition partition, List<WorkSlot> workers) {
-        this.workers = workers;
-        this.partition = partition;
-    }
-
-    public StreamPartition getPartition() {
-        return partition;
-    }
-
-    public void setPartition(StreamPartition partition) {
-        this.partition = partition;
-    }
-
-    public List<WorkSlot> getWorkers() {
-        return workers;
-    }
-
-    public void setWorkers(List<WorkSlot> workers) {
-        this.workers = workers;
-    }
-
-    public String toString() {
-        return "[" + StringUtils.join(workers, ",") + "]";
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PublishSpec.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PublishSpec.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PublishSpec.java
deleted file mode 100644
index 06e819a..0000000
--- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PublishSpec.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordination.model;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-
-/**
- * 
- * @since May 1, 2016
- *
- */
-public class PublishSpec {
-
-    private String topologyName;
-    // actually only publish spec for one topology
-    private String boltId;
-    private String version;
-
-    private List<Publishment> publishments = new ArrayList<Publishment>();
-
-    public PublishSpec() {
-    }
-
-    public PublishSpec(String topoName, String boltId) {
-        this.topologyName = topoName;
-        this.boltId = boltId;
-    }
-
-    public void addPublishment(Publishment p) {
-        this.publishments.add(p);
-    }
-
-    public String getTopologyName() {
-        return topologyName;
-    }
-
-    public String getBoltId() {
-        return boltId;
-    }
-
-    public List<Publishment> getPublishments() {
-        return publishments;
-    }
-
-    public String getVersion() {
-        return version;
-    }
-
-    public void setVersion(String version) {
-        this.version = version;
-    }
-
-    public void setTopologyName(String topologyName) {
-        this.topologyName = topologyName;
-    }
-
-    public void setBoltId(String boltId) {
-        this.boltId = boltId;
-    }
-
-    public void setPublishments(List<Publishment> publishments) {
-        this.publishments = publishments;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java
deleted file mode 100644
index 5241920..0000000
--- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordination.model;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.codehaus.jackson.annotate.JsonIgnore;
-
-/**
- * @since Apr 29, 2016
- *
- */
-public class RouterSpec {
-    private String version;
-    private String topologyName;
-
-    private List<StreamRouterSpec> routerSpecs;
-
-    public RouterSpec() {
-        routerSpecs = new ArrayList<StreamRouterSpec>();
-    }
-
-    public RouterSpec(String topoName) {
-        this();
-        this.topologyName = topoName;
-    }
-
-    public String getVersion() {
-        return version;
-    }
-
-    public void setVersion(String version) {
-        this.version = version;
-    }
-
-    public String getTopologyName() {
-        return topologyName;
-    }
-
-    public void setTopologyName(String topologyName) {
-        this.topologyName = topologyName;
-    }
-
-    @JsonIgnore
-    public void addRouterSpec(StreamRouterSpec routerSpec) {
-        routerSpecs.add(routerSpec);
-    }
-
-    public List<StreamRouterSpec> getRouterSpecs() {
-        return routerSpecs;
-    }
-
-    public void setRouterSpecs(List<StreamRouterSpec> routerSpecs) {
-        this.routerSpecs = routerSpecs;
-    }
-
-    @Override
-    public String toString() {
-        return String.format("version:%s-topo:%s, boltSpec:%s", version, topologyName, routerSpecs);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/ScheduleState.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/ScheduleState.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/ScheduleState.java
deleted file mode 100644
index 6036f29..0000000
--- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/ScheduleState.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordination.model;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-
-/**
- * A global wise of schedule status <br/>
- * <br/>
- * TODO/FIXME: The persistence simply deserial ScheduleState to Json. One
- * concern is that this string might become too big for store. <br/>
- * <br/>
- * The solution is in metadata resource, have specs/monitoredStreams/policy
- * assignments stored in different table/collections with tage version.
- * 
- * 
- * @since Apr 26, 2016
- *
- */
-public class ScheduleState {
-
-    // ScheduleSpec
-    private Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>();
-    private Map<String, AlertBoltSpec> alertSpecs = new HashMap<String, AlertBoltSpec>();
-    private Map<String, RouterSpec> groupSpecs = new HashMap<String, RouterSpec>();
-    private Map<String, PublishSpec> publishSpecs = new HashMap<String, PublishSpec>();
-
-    // ScheduleSnapshot
-    private List<VersionedPolicyDefinition> policySnapshots = new ArrayList<VersionedPolicyDefinition>();
-    private List<VersionedStreamDefinition> streamSnapshots = new ArrayList<VersionedStreamDefinition>();
-
-    // ScheduleResult
-    private List<MonitoredStream> monitoredStreams = new ArrayList<MonitoredStream>();
-    private List<PolicyAssignment> assignments = new ArrayList<PolicyAssignment>();
-
-    private String version;
-    // FIXME : should be date, can not make it simple in mongo..
-    private String generateTime;
-    private int code = 200;
-    private String message = "OK";
-
-    public ScheduleState() {
-    }
-
-    public ScheduleState(String version, 
-            Map<String, SpoutSpec> topoSpoutSpecsMap,
-            Map<String, RouterSpec> groupSpecsMap, 
-            Map<String, AlertBoltSpec> alertSpecsMap,
-            Map<String, PublishSpec> pubMap, 
-            Collection<PolicyAssignment> assignments,
-            Collection<MonitoredStream> monitoredStreams, 
-            Collection<PolicyDefinition> definitions,
-            Collection<StreamDefinition> streams) {
-        this.spoutSpecs = topoSpoutSpecsMap;
-        this.groupSpecs = groupSpecsMap;
-        this.alertSpecs = alertSpecsMap;
-        this.publishSpecs = pubMap;
-        this.version = version;
-        this.generateTime = String.valueOf(new Date().getTime());
-        this.assignments = new ArrayList<PolicyAssignment>(assignments);
-        this.monitoredStreams = new ArrayList<MonitoredStream>(monitoredStreams);
-        this.policySnapshots = new ArrayList<VersionedPolicyDefinition>();
-        this.streamSnapshots = new ArrayList<VersionedStreamDefinition>();
-
-        for (SpoutSpec ss : this.spoutSpecs.values()) {
-            ss.setVersion(version);
-        }
-
-        for (RouterSpec ss : this.groupSpecs.values()) {
-            ss.setVersion(version);
-        }
-
-        for (AlertBoltSpec ss : this.alertSpecs.values()) {
-            ss.setVersion(version);
-        }
-
-        for (PublishSpec ps : this.publishSpecs.values()) {
-            ps.setVersion(version);
-        }
-
-        for (MonitoredStream ms : this.monitoredStreams) {
-            ms.setVersion(version);
-        }
-        for (PolicyAssignment ps : this.assignments) {
-            ps.setVersion(version);
-        }
-        for (PolicyDefinition def : definitions) {
-            this.policySnapshots.add(new VersionedPolicyDefinition(version, def));
-        }
-        for (StreamDefinition sd :streams) {
-            this.streamSnapshots.add(new VersionedStreamDefinition(version, sd));
-        }
-    }
-
-    public Map<String, SpoutSpec> getSpoutSpecs() {
-        return spoutSpecs;
-    }
-
-    public void setSpoutSpecs(Map<String, SpoutSpec> spoutSpecs) {
-        this.spoutSpecs = spoutSpecs;
-    }
-
-    public Map<String, AlertBoltSpec> getAlertSpecs() {
-        return alertSpecs;
-    }
-
-    public void setAlertSpecs(Map<String, AlertBoltSpec> alertSpecs) {
-        this.alertSpecs = alertSpecs;
-    }
-
-    public Map<String, RouterSpec> getGroupSpecs() {
-        return groupSpecs;
-    }
-
-    public void setGroupSpecs(Map<String, RouterSpec> groupSpecs) {
-        this.groupSpecs = groupSpecs;
-    }
-
-    public Map<String, PublishSpec> getPublishSpecs() {
-        return publishSpecs;
-    }
-
-    public void setPublishSpecs(Map<String, PublishSpec> publishSpecs) {
-        this.publishSpecs = publishSpecs;
-    }
-
-    public String getVersion() {
-        return version;
-    }
-
-    public void setVersion(String version) {
-        this.version = version;
-    }
-
-    public int getCode() {
-        return code;
-    }
-
-    public void setCode(int code) {
-        this.code = code;
-    }
-
-    public String getMessage() {
-        return message;
-    }
-
-    public void setMessage(String message) {
-        this.message = message;
-    }
-
-    public String getGenerateTime() {
-        return generateTime;
-    }
-
-    public void setGenerateTime(String generateTime) {
-        this.generateTime = generateTime;
-    }
-
-    public List<MonitoredStream> getMonitoredStreams() {
-        return monitoredStreams;
-    }
-
-    public List<PolicyAssignment> getAssignments() {
-        return assignments;
-    }
-
-    public List<VersionedPolicyDefinition> getPolicySnapshots() {
-        return policySnapshots;
-    }
-
-    public void setPolicySnapshots(List<VersionedPolicyDefinition> policySnapshots) {
-        this.policySnapshots = policySnapshots;
-    }
-
-    public void setMonitoredStreams(List<MonitoredStream> monitoredStreams) {
-        this.monitoredStreams = monitoredStreams;
-    }
-
-    public void setAssignments(List<PolicyAssignment> assignments) {
-        this.assignments = assignments;
-    }
-
-    public List<VersionedStreamDefinition> getStreamSnapshots() {
-        return streamSnapshots;
-    }
-
-    public void setStreamSnapshots(List<VersionedStreamDefinition> streamSnapshots) {
-        this.streamSnapshots = streamSnapshots;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/SpoutSpec.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/SpoutSpec.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/SpoutSpec.java
deleted file mode 100644
index a197858..0000000
--- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/SpoutSpec.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordination.model;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-
-/**
- * SpoutSpec metadata control 3 phases for data transformation for one specific topic
- * phase 1: kafka topic to tuple, controlled by Kafka2TupleMetadata, i.e. Scheme
- * phase 2: tuple to stream, controlled by Tuple2StreamMetadata, i.e. stream name selector etc.
- * phase 3: stream repartition, controlled by StreamRepartitionMetadata, i.e. groupby spec
- * @since Apr 18, 2016
- *
- */
-public class SpoutSpec {
-    private String version;
-
-//    private String spoutId;
-    private String topologyId;
-
-    // topicName -> kafka2TupleMetadata
-    private Map<String, Kafka2TupleMetadata> kafka2TupleMetadataMap = new HashMap<String, Kafka2TupleMetadata>();
-    // topicName -> Tuple2StreamMetadata
-    private Map<String, Tuple2StreamMetadata> tuple2StreamMetadataMap = new HashMap<String, Tuple2StreamMetadata>();
-    // topicName -> list of StreamRepartitionMetadata, here it is list because one topic(data source) may spawn multiple streams.
-    private Map<String, List<StreamRepartitionMetadata>> streamRepartitionMetadataMap = new HashMap<String, List<StreamRepartitionMetadata>>();
-
-    public SpoutSpec(){}
-
-    public SpoutSpec(
-            String topologyId,
-//            String spoutId,
-            Map<String, List<StreamRepartitionMetadata>>  streamRepartitionMetadataMap,
-            Map<String, Tuple2StreamMetadata> tuple2StreamMetadataMap,
-            Map<String, Kafka2TupleMetadata>  kafka2TupleMetadataMap) {
-        this.topologyId = topologyId;
-//        this.spoutId = spoutId;
-        this.streamRepartitionMetadataMap = streamRepartitionMetadataMap;
-        this.tuple2StreamMetadataMap = tuple2StreamMetadataMap;
-        this.kafka2TupleMetadataMap = kafka2TupleMetadataMap;
-    }
-
-//    public String getSpoutId() {
-//        return spoutId;
-//    }
-//    public void setSpoutId(String spoutId) {
-//        this.spoutId = spoutId;
-//    }
-
-    public String getTopologyId() {
-        return topologyId;
-    }
-
-    public Map<String, List<StreamRepartitionMetadata>> getStreamRepartitionMetadataMap() {
-        return streamRepartitionMetadataMap;
-    }
-
-    public Map<String, Tuple2StreamMetadata> getTuple2StreamMetadataMap(){
-        return this.tuple2StreamMetadataMap;
-    }
-
-    public Map<String, Kafka2TupleMetadata> getKafka2TupleMetadataMap() {
-        return kafka2TupleMetadataMap;
-    }
-
-    @org.codehaus.jackson.annotate.JsonIgnore
-    public StreamRepartitionMetadata getStream(String streamName) {
-        for (List<StreamRepartitionMetadata> meta : this.streamRepartitionMetadataMap.values()) {
-            Optional<StreamRepartitionMetadata> m = meta.stream().filter((t) -> t.getStreamId().equalsIgnoreCase(streamName)).findFirst();
-            if (m.isPresent()) {
-                return m.get();
-            }
-        }
-        return null;
-    }
-
-    public void setTopologyId(String topologyId) {
-        this.topologyId = topologyId;
-    }
-
-    public void setKafka2TupleMetadataMap(Map<String, Kafka2TupleMetadata> kafka2TupleMetadataMap) {
-        this.kafka2TupleMetadataMap = kafka2TupleMetadataMap;
-    }
-
-    public void setTuple2StreamMetadataMap(Map<String, Tuple2StreamMetadata> tuple2StreamMetadataMap) {
-        this.tuple2StreamMetadataMap = tuple2StreamMetadataMap;
-    }
-
-    public void setStreamRepartitionMetadataMap(Map<String, List<StreamRepartitionMetadata>> streamRepartitionMetadataMap) {
-        this.streamRepartitionMetadataMap = streamRepartitionMetadataMap;
-    }
-
-    public String getVersion() {
-        return version;
-    }
-
-    public void setVersion(String version) {
-        this.version = version;
-    }
-
-    @Override
-    public String toString() {
-        return String.format("version:%s-topo:%s ", version, this.topologyId);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamNameSelector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamNameSelector.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamNameSelector.java
deleted file mode 100644
index bc7952c..0000000
--- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamNameSelector.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordination.model;
-
-import java.util.Map;
-
-/**
- * This metadata controls how to figure out stream name from incoming tuple
- */
-public interface StreamNameSelector {
-    /**
-     * field name to value mapping
-     * @param tuple
-     * @return
-     */
-    String getStreamName(Map<String, Object> tuple);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRepartitionMetadata.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRepartitionMetadata.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRepartitionMetadata.java
deleted file mode 100644
index dac04e8..0000000
--- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRepartitionMetadata.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordination.model;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.codehaus.jackson.annotate.JsonIgnore;
-
-/**
- * @since Apr 25, 2016
- * This meta-data controls how tuple streamId is repartitioned
- */
-public class StreamRepartitionMetadata {
-    private String topicName;
-    private String streamId;
-    /**
-     * each stream may have multiple different grouping strategies,for example groupby some fields or even shuffling
-     */
-    public List<StreamRepartitionStrategy> groupingStrategies = new ArrayList<StreamRepartitionStrategy>();
-
-    public StreamRepartitionMetadata(){}
-
-    public StreamRepartitionMetadata(String topicName, String stream) {
-        this.topicName = topicName;
-        this.streamId = stream;
-    }
-
-    public String getStreamId() {
-        return streamId;
-    }
-    public void setStreamId(String streamId) {
-        this.streamId = streamId;
-    }
-
-    public String getTopicName() {
-        return topicName;
-    }
-    public void setTopicName(String topicName) {
-        this.topicName = topicName;
-    }
-
-    public List<StreamRepartitionStrategy> getGroupingStrategies() {
-        return groupingStrategies;
-    }
-
-    @JsonIgnore
-    public void addGroupStrategy(StreamRepartitionStrategy gs) {
-        this.groupingStrategies.add(gs);
-    }
-
-    public void setGroupingStrategies(List<StreamRepartitionStrategy> groupingStrategies) {
-        this.groupingStrategies = groupingStrategies;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategy.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategy.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategy.java
deleted file mode 100644
index 203114e..0000000
--- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategy.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordination.model;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-
-public class StreamRepartitionStrategy {
-    public StreamPartition partition ;
-
-    public int numTotalParticipatingRouterBolts = 0;      // how many group-by bolts participate policy evaluation
-    public int startSequence = 0;            // what is the sequence for the first bolt in this topology among all bolts
-    public List<String> totalTargetBoltIds = new ArrayList<String>();
-    
-    public int hashCode() {
-        int hashcode = 1 * 31;
-        hashcode += partition.hashCode();
-        for (String str : totalTargetBoltIds) {
-            hashcode += str.hashCode();
-        }
-        return hashcode;
-    }
-    
-    public boolean equals(Object obj) {
-        if (!(obj instanceof StreamRepartitionStrategy)) {
-            return false;
-        }
-        StreamRepartitionStrategy o = (StreamRepartitionStrategy) obj;
-        return partition.equals(o.partition)
-                && CollectionUtils.isEqualCollection(totalTargetBoltIds, o.totalTargetBoltIds);
-    }
-
-    public StreamPartition getPartition() {
-        return partition;
-    }
-
-    public void setPartition(StreamPartition partition) {
-        this.partition = partition;
-    }
-
-    public int getNumTotalParticipatingRouterBolts() {
-        return numTotalParticipatingRouterBolts;
-    }
-
-    public void setNumTotalParticipatingRouterBolts(int numTotalParticipatingRouterBolts) {
-        this.numTotalParticipatingRouterBolts = numTotalParticipatingRouterBolts;
-    }
-
-    public int getStartSequence() {
-        return startSequence;
-    }
-
-    public void setStartSequence(int startSequence) {
-        this.startSequence = startSequence;
-    }
-
-    public List<String> getTotalTargetBoltIds() {
-        return totalTargetBoltIds;
-    }
-
-    public void setTotalTargetBoltIds(List<String> totalTargetBoltIds) {
-        this.totalTargetBoltIds = totalTargetBoltIds;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRouterSpec.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRouterSpec.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRouterSpec.java
deleted file mode 100644
index 773ae56..0000000
--- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRouterSpec.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordination.model;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-
-/**
- * One RouteSpec means one rule mapping [streamId -> StreamPartition ->
- * PolicyExecutionQueue]
- *
- * Key is StreamPartition
- */
-public class StreamRouterSpec {
-    private String streamId;
-    private StreamPartition partition; // The meta-data to build
-                                       // StreamPartitioner
-    private List<PolicyWorkerQueue> targetQueue = new ArrayList<PolicyWorkerQueue>();
-
-    public StreamPartition getPartition() {
-        return partition;
-    }
-
-    public void setPartition(StreamPartition partition) {
-        this.partition = partition;
-    }
-
-    @Override
-    public int hashCode() {
-        return new HashCodeBuilder().append(this.streamId).append(this.partition).append(targetQueue).build();
-    }
-
-    public List<PolicyWorkerQueue> getTargetQueue() {
-        return targetQueue;
-    }
-
-    public void addQueue(PolicyWorkerQueue queue) {
-        this.targetQueue.add(queue);
-    }
-
-    public String getStreamId() {
-        return streamId;
-    }
-
-    public void setStreamId(String streamId) {
-        this.streamId = streamId;
-    }
-
-    public void setTargetQueue(List<PolicyWorkerQueue> targetQueue) {
-        this.targetQueue = targetQueue;
-    }
-
-    @Override
-    public String toString() {
-        return String.format("StreamRouterSpec[streamId=%s,partition=%s, queue=[%s]]", this.getStreamId(),
-                this.getPartition(), this.getTargetQueue());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java
deleted file mode 100644
index dc9d1ba..0000000
--- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordination.model;
-
-import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Convert incoming tuple to stream
- * incoming tuple consists of 2 fields, topic and map of key/value
- * output stream consists of 3 fields, stream name, timestamp, and map of key/value
- */
-public class Tuple2StreamConverter {
-    private static final Logger LOG = LoggerFactory.getLogger(Tuple2StreamConverter.class);
-    private Tuple2StreamMetadata metadata;
-    private StreamNameSelector cachedSelector;
-    public Tuple2StreamConverter(Tuple2StreamMetadata metadata){
-        this.metadata = metadata;
-        try {
-            cachedSelector = (StreamNameSelector)Class.forName(metadata.getStreamNameSelectorCls()).
-                    getConstructor(Properties.class).
-                    newInstance(metadata.getStreamNameSelectorProp());
-        }catch(Exception ex){
-            LOG.error("error initializing StreamNameSelector object", ex);
-            throw new IllegalStateException(ex);
-        }
-    }
-
-    /**
-     * Assume tuple is composed of topic + map of key/value
-     * @param tuple
-     * @return
-     */
-    @SuppressWarnings({ "unchecked" })
-    public List<Object> convert(List<Object> tuple){
-        Map<String, Object> m = (Map<String, Object>)tuple.get(1);
-        String streamName = cachedSelector.getStreamName(m);
-        if(!metadata.getActiveStreamNames().contains(streamName)) {
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("streamName {} is not within activeStreamNames {}", streamName, metadata.getActiveStreamNames());
-            }
-            return null;
-        }
-
-        Object timeObject = m.get(metadata.getTimestampColumn());
-        long timestamp = 0L;
-        if(timeObject instanceof Number){
-            timestamp = ((Number) timeObject).longValue();
-        }else{
-            String timestampFieldValue = (String) m.get(metadata.getTimestampColumn());
-            try {
-                SimpleDateFormat sdf = new SimpleDateFormat(metadata.getTimestampFormat());
-                timestamp = sdf.parse(timestampFieldValue).getTime();
-            } catch (Exception ex) {
-                LOG.error("continue with current timestamp because error happens while parsing timestamp column " + metadata.getTimestampColumn() + " with format " + metadata.getTimestampFormat());
-                timestamp = System.currentTimeMillis();
-            }
-        }
-        return Arrays.asList(tuple.get(0), streamName, timestamp, tuple.get(1));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadata.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadata.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadata.java
deleted file mode 100644
index bde4fe3..0000000
--- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadata.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordination.model;
-
-import java.util.HashSet;
-import java.util.Properties;
-import java.util.Set;
-
-/**
- * @Since 4/25/16. This metadata controls how tuple is transformed to stream for
- *        example raw data consists of {"metric" : "cpuUsage", "host" :
- *        "xyz.com", "timestamp" : 1346846400, "value" : "0.9"} field "metric"
- *        is used for creating stream name, here "cpuUsage" is stream name
- *
- *        metric could be "cpuUsage", "diskUsage", "memUsage" etc, so
- *        activeStreamNames are subset of all metric names
- *
- *        All other messages which are not one of activeStreamNames will be
- *        filtered out
- */
-public class Tuple2StreamMetadata {
-    /**
-     * only messages belonging to activeStreamNames will be kept while
-     * transforming tuple into stream
-     */
-    private Set<String> activeStreamNames = new HashSet<String>();
-    // the specific stream name selector
-    private Properties streamNameSelectorProp;
-    private String streamNameSelectorCls;
-    private String timestampColumn;
-    private String timestampFormat;
-
-    public Set<String> getActiveStreamNames() {
-        return activeStreamNames;
-    }
-
-    public void setActiveStreamNames(Set<String> activeStreamNames) {
-        this.activeStreamNames = activeStreamNames;
-    }
-
-    public Properties getStreamNameSelectorProp() {
-        return streamNameSelectorProp;
-    }
-
-    public void setStreamNameSelectorProp(Properties streamNameSelectorProp) {
-        this.streamNameSelectorProp = streamNameSelectorProp;
-    }
-
-    public String getStreamNameSelectorCls() {
-        return streamNameSelectorCls;
-    }
-
-    public void setStreamNameSelectorCls(String streamNameSelectorCls) {
-        this.streamNameSelectorCls = streamNameSelectorCls;
-    }
-
-    public String getTimestampColumn() {
-        return timestampColumn;
-    }
-
-    public void setTimestampColumn(String timestampColumn) {
-        this.timestampColumn = timestampColumn;
-    }
-
-    public String getTimestampFormat() {
-        return timestampFormat;
-    }
-
-    public void setTimestampFormat(String timestampFormat) {
-        this.timestampFormat = timestampFormat;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedPolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedPolicyDefinition.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedPolicyDefinition.java
deleted file mode 100644
index f4b8ccb..0000000
--- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedPolicyDefinition.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordination.model;
-
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-
-/**
- * @since May 25, 2016
- *
- */
-public class VersionedPolicyDefinition {
-    private String version;
-    private PolicyDefinition definition;
-
-    public VersionedPolicyDefinition() {
-    }
-
-    public VersionedPolicyDefinition(String version, PolicyDefinition def) {
-        this.version = version;
-        this.definition = def;
-    }
-
-    public String getVersion() {
-        return version;
-    }
-
-    public void setVersion(String version) {
-        this.version = version;
-    }
-
-    public PolicyDefinition getDefinition() {
-        return definition;
-    }
-
-    public void setDefinition(PolicyDefinition definition) {
-        this.definition = definition;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedStreamDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedStreamDefinition.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedStreamDefinition.java
deleted file mode 100644
index 2770aa1..0000000
--- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedStreamDefinition.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordination.model;
-
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-
-/**
- * @since May 25, 2016
- *
- */
-public class VersionedStreamDefinition {
-    private String version;
-    private StreamDefinition definition;
-
-    public VersionedStreamDefinition() {
-    }
-
-    public VersionedStreamDefinition(String version, StreamDefinition def) {
-        this.version = version;
-        this.definition = def;
-    }
-
-    public String getVersion() {
-        return version;
-    }
-
-    public void setVersion(String version) {
-        this.version = version;
-    }
-
-    public StreamDefinition getDefinition() {
-        return definition;
-    }
-
-    public void setDefinition(StreamDefinition definition) {
-        this.definition = definition;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/WorkSlot.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/WorkSlot.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/WorkSlot.java
deleted file mode 100644
index 96016f4..0000000
--- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/WorkSlot.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordination.model;
-
-
-/**
- * A slot is simply a bolt.
- */
-public class WorkSlot {
-    public String topologyName;
-    public String boltId;
-
-    public String getTopologyName() {
-        return topologyName;
-    }
-
-    public void setTopologyName(String topologyName) {
-        this.topologyName = topologyName;
-    }
-
-    public String getBoltId() {
-        return boltId;
-    }
-
-    public void setBoltId(String boltId) {
-        this.boltId = boltId;
-    }
-
-    public WorkSlot() {
-
-    }
-
-    public WorkSlot(String topo, String boltId) {
-        this.topologyName = topo;
-        this.boltId = boltId;
-    }
-
-    public String toString() {
-        return "(" + topologyName + ":" + boltId + ")";
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStream.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStream.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStream.java
deleted file mode 100644
index beda896..0000000
--- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStream.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordination.model.internal;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-/**
- * A monitored stream is the unique data set in the system.
- * 
- * It's a combination of stream and the specific grp-by on it.
- * 
- * For correlation stream, it means multiple stream for a given monitored stream.
- * 
- * 
- * @since Apr 27, 2016
- *
- */
-public class MonitoredStream {
-
-    private String version;
-
-    // the stream group that this monitored stream stands for
-    private StreamGroup streamGroup = new StreamGroup();
-    private List<StreamWorkSlotQueue> queues = new ArrayList<StreamWorkSlotQueue>();
-    
-    public MonitoredStream() {
-    }
-
-    public MonitoredStream(StreamGroup par) {
-        this.streamGroup = par;
-    }
-
-    public StreamGroup getStreamGroup() {
-        return streamGroup;
-    }
-
-    public List<StreamWorkSlotQueue> getQueues() {
-        return queues;
-    }
-
-    public synchronized void addQueues(StreamWorkSlotQueue queue) {
-        queues.add(queue);
-    }
-
-    public synchronized boolean removeQueue(StreamWorkSlotQueue queue) {
-        return this.queues.remove(queue);
-    }
-
-    public int hashCode() {
-        return new HashCodeBuilder().append(streamGroup).build();
-    }
-
-    public boolean equals(Object other) {
-        if (!(other instanceof MonitoredStream)) {
-            return false;
-        }
-        MonitoredStream o = (MonitoredStream) other;
-        return Objects.equals(streamGroup, o.streamGroup);
-    }
-
-    public String getVersion() {
-        return version;
-    }
-
-    public void setVersion(String version) {
-        this.version = version;
-    }
-
-    public void setQueues(List<StreamWorkSlotQueue> queues) {
-        this.queues = queues;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignment.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignment.java
deleted file mode 100644
index 3e956ca..0000000
--- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignment.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordination.model.internal;
-
-/**
- * monitor metadata
- * 
- * @since Apr 27, 2016
- *
- */
-public class PolicyAssignment {
-
-    private String version;
-
-    private String policyName;
-    private String queueId;
-
-    public PolicyAssignment() {
-    }
-
-    public PolicyAssignment(String policyName, String queueId) {
-        this.policyName = policyName;
-        this.queueId = queueId;
-    }
-
-    public String getPolicyName() {
-        return policyName;
-    }
-
-    public String getQueueId() {
-        return queueId;
-    }
-
-    public void setPolicyName(String policyName) {
-        this.policyName = policyName;
-    }
-
-    public void setQueueId(String queueId) {
-        this.queueId = queueId;
-    }
-
-    public String getVersion() {
-        return version;
-    }
-
-    public void setVersion(String version) {
-        this.version = version;
-    }
-
-    @Override
-    public String toString() {
-        return String.format("PolicyAssignment of policy %s, queueId %s, version %s !", policyName, queueId, version);
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java
deleted file mode 100644
index 9cbd841..0000000
--- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordination.model.internal;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.google.common.base.Objects;
-
-/**
- * @since May 6, 2016
- *
- */
-public class StreamGroup {
-
-    private List<StreamPartition> streamPartitions = new ArrayList<StreamPartition>();
-    
-    public StreamGroup() {
-    }
-    
-    public List<StreamPartition> getStreamPartitions() {
-        return streamPartitions;
-    }
-
-    public void addStreamPartition(StreamPartition sp) {
-        this.streamPartitions.add(sp);
-    }
-
-    public void addStreamPartitions(List<StreamPartition> sps) {
-        this.streamPartitions.addAll(sps);
-    }
-
-    @org.codehaus.jackson.annotate.JsonIgnore
-    @JsonIgnore
-    public String getStreamId() {
-        StringBuilder sb = new StringBuilder("SG[");
-        for (StreamPartition sp : streamPartitions) {
-            sb.append(sp.getStreamId()).append("-");
-        }
-        sb.append("]");
-        return sb.toString();
-    }
-
-    @Override
-    public int hashCode() {
-        // implicitly all groups in stream groups will be built for hash code
-        return new HashCodeBuilder().append(streamPartitions).build();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (!(obj instanceof StreamGroup)) {
-            return false;
-        }
-        StreamGroup o = (StreamGroup) obj;
-        return Objects.equal(this.streamPartitions, o.streamPartitions);
-    }
-
-    @Override
-    public String toString() {
-        return String.format("StreamGroup partitions=: %s ", streamPartitions);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueue.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueue.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueue.java
deleted file mode 100644
index fab6217..0000000
--- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueue.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordination.model.internal;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-/**
- * A work queue for given one monitored stream.
- * 
- * Analog to storm's "tasks for given bolt".
- * 
- * @since Apr 27, 2016
- *
- */
-public class StreamWorkSlotQueue {
-    private String queueId;
-
-    private final List<WorkSlot> workingSlots = new LinkedList<WorkSlot>();
-    private boolean dedicated;
-    // some dedicated option, like dedicated userId/tenantId/policyId.
-    private Map<String, Object> dedicateOption;
-    
-    private int numberOfGroupBolts;
-    private Map<String, Integer> topoGroupStartIndex = new HashMap<String, Integer>(); 
-
-    public StreamWorkSlotQueue() {
-    }
-    
-    public StreamWorkSlotQueue(StreamGroup par, boolean isDedicated, Map<String, Object> options,
-            List<WorkSlot> slots) {
-        this.queueId = par.getStreamId() + System.currentTimeMillis();// simply generate a queue
-        this.dedicated = isDedicated;
-        dedicateOption = new HashMap<String, Object>();
-        dedicateOption.putAll(options);
-        this.workingSlots.addAll(slots);
-    }
-
-    public Map<String, Object> getDedicateOption() {
-        return dedicateOption;
-    }
-
-    public void setDedicateOption(Map<String, Object> dedicateOption) {
-        this.dedicateOption = dedicateOption;
-    }
-
-    public List<WorkSlot> getWorkingSlots() {
-        return workingSlots;
-    }
-
-    public boolean isDedicated() {
-        return dedicated;
-    }
-
-    public void setDedicated(boolean dedicated) {
-        this.dedicated = dedicated;
-    }
-
-    @org.codehaus.jackson.annotate.JsonIgnore
-    @JsonIgnore
-    public int getQueueSize() {
-        return workingSlots.size();
-    }
-
-//    @org.codehaus.jackson.annotate.JsonIgnore
-//    @JsonIgnore
-//    public void placePolicy(PolicyDefinition pd) {
-//        policies.add(pd.getName());
-//    }
-
-    public int getNumberOfGroupBolts() {
-        return numberOfGroupBolts;
-    }
-
-    public void setNumberOfGroupBolts(int numberOfGroupBolts) {
-        this.numberOfGroupBolts = numberOfGroupBolts;
-    }
-
-    public Map<String, Integer> getTopoGroupStartIndex() {
-        return topoGroupStartIndex;
-    }
-
-    public void setTopoGroupStartIndex(Map<String, Integer> topoGroupStartIndex) {
-        this.topoGroupStartIndex = topoGroupStartIndex;
-    }
-
-    @org.codehaus.jackson.annotate.JsonIgnore
-    @JsonIgnore
-    public int getTopologyGroupStartIndex(String topo) {
-        if (topoGroupStartIndex.containsKey(topo)) {
-            return this.topoGroupStartIndex.get(topo);
-        }
-        return -1;
-    }
-
-    public String getQueueId() {
-        return queueId;
-    }
-
-    public void setQueueId(String queueId) {
-        this.queueId = queueId;
-    }
-
-}



Mime
View raw message