eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [33/84] [partial] eagle git commit: Clean repo for eagle site
Date Mon, 03 Apr 2017 11:54:41 GMT
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IPolicyScheduler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IPolicyScheduler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IPolicyScheduler.java
deleted file mode 100644
index 0cf4a5d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IPolicyScheduler.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator;
-
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-
-/**
- * @since Mar 24, 2016.
- */
-public interface IPolicyScheduler {
-
-    void init(IScheduleContext context, TopologyMgmtService mgmtService);
-
-    /**
-     * Build the assignments for all.
-     */
-    ScheduleState schedule(ScheduleOption option);
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IScheduleContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IScheduleContext.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IScheduleContext.java
deleted file mode 100644
index b21948b..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IScheduleContext.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator;
-
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.coordinator.model.TopologyUsage;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-
-import java.util.Map;
-
-/**
- * @since Mar 28, 2016.
- */
-public interface IScheduleContext {
-
-    Map<String, Topology> getTopologies();
-
-    Map<String, PolicyDefinition> getPolicies();
-
-    // data source
-    Map<String, Kafka2TupleMetadata> getDataSourceMetadata();
-
-    Map<String, StreamDefinition> getStreamSchemas();
-
-    Map<String, TopologyUsage> getTopologyUsages();
-
-    Map<String, PolicyAssignment> getPolicyAssignments();
-
-    Map<StreamGroup, MonitoredStream> getMonitoredStreams();
-
-    Map<String, Publishment> getPublishments();
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/LockWebApplicationException.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/LockWebApplicationException.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/LockWebApplicationException.java
deleted file mode 100644
index 29e8cac..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/LockWebApplicationException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator;
-
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response;
-
-public class LockWebApplicationException extends WebApplicationException {
-
-    private static final long serialVersionUID = 3441072187262776401L;
-
-    public LockWebApplicationException() {
-        super(Response.Status.INTERNAL_SERVER_ERROR);
-    }
-
-    public LockWebApplicationException(String message) {
-        super(Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(message).type("text/plain").build());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/PolicySchedulerFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/PolicySchedulerFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/PolicySchedulerFactory.java
deleted file mode 100644
index c1bc726..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/PolicySchedulerFactory.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator;
-
-import org.apache.eagle.alert.coordinator.impl.GreedyPolicyScheduler;
-
-/**
- * @since Mar 24, 2016.
- */
-public class PolicySchedulerFactory {
-
-    public static IPolicyScheduler createScheduler() {
-        return new GreedyPolicyScheduler();
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ScheduleOption.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ScheduleOption.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ScheduleOption.java
deleted file mode 100644
index 6fe27c5..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ScheduleOption.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator;
-
-/**
- * A runtime option for one schedule processing.
- * <p>Could be used for configuration override.</p>
- *
- * @since Apr 19, 2016
- */
-public class ScheduleOption {
-    private int policiesPerBolt;
-    private int boltParallelism;
-    private int policyDefaultParallelism;
-    private double boltLoadUpbound;
-    private double topoLoadUpbound;
-
-    public int getPoliciesPerBolt() {
-        return policiesPerBolt;
-    }
-
-    public void setPoliciesPerBolt(int policiesPerBolt) {
-        this.policiesPerBolt = policiesPerBolt;
-    }
-
-    public int getBoltParallelism() {
-        return boltParallelism;
-    }
-
-    public void setBoltParallelism(int boltParallelism) {
-        this.boltParallelism = boltParallelism;
-    }
-
-    public int getPolicyDefaultParallelism() {
-        return policyDefaultParallelism;
-    }
-
-    public void setPolicyDefaultParallelism(int policyDefaultParallelism) {
-        this.policyDefaultParallelism = policyDefaultParallelism;
-    }
-
-    public double getBoltLoadUpbound() {
-        return boltLoadUpbound;
-    }
-
-    public void setBoltLoadUpbound(double boltLoadUpbound) {
-        this.boltLoadUpbound = boltLoadUpbound;
-    }
-
-    public double getTopoLoadUpbound() {
-        return topoLoadUpbound;
-    }
-
-    public void setTopoLoadUpbound(double topoLoadUpbound) {
-        this.topoLoadUpbound = topoLoadUpbound;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java
deleted file mode 100644
index 4ede29d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator;
-
-import static org.apache.eagle.alert.coordinator.CoordinatorConstants.CONFIG_ITEM_COORDINATOR;
-import static org.apache.eagle.alert.coordinator.CoordinatorConstants.NUM_OF_ALERT_BOLTS_PER_TOPOLOGY;
-
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.coordinator.model.TopologyUsage;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * @since Mar 29, 2016.
- */
-public class TopologyMgmtService {
-
-    public static class TopologyMeta {
-        public String topologyId;
-        public Topology topology;
-        public TopologyUsage usage;
-
-        public String clusterId;
-        public String nimbusHost;
-        public String nimbusPort;
-
-    }
-
-    public static class StormClusterMeta {
-        public String clusterId;
-        public String nimbusHost;
-        public String nimbusPort;
-        public String stormVersion;
-    }
-
-    @SuppressWarnings("unused")
-    private int boltParallelism = 0;
-    private int numberOfBoltsPerTopology = 0;
-
-    public TopologyMgmtService() {
-        Config config = ConfigFactory.load().getConfig(CONFIG_ITEM_COORDINATOR);
-        //boltParallelism = config.getInt(CoordinatorConstants.BOLT_PARALLELISM);
-        numberOfBoltsPerTopology = config.getInt(NUM_OF_ALERT_BOLTS_PER_TOPOLOGY);
-    }
-
-    public int getNumberOfAlertBoltsInTopology() {
-        return numberOfBoltsPerTopology;
-    }
-
-    /**
-     * TODO: call topology mgmt API to create a topology.
-     */
-    public TopologyMeta creatTopology() {
-        // TODO
-        throw new UnsupportedOperationException("not supported yet!");
-    }
-
-    public List<TopologyMeta> listTopologies() {
-        // TODO
-        return Collections.emptyList();
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ValidateState.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ValidateState.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ValidateState.java
deleted file mode 100644
index 9dc177b..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ValidateState.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator;
-
-import java.util.*;
-
-/**
- * Created on 10/1/16.
- */
-public class ValidateState {
-
-    private boolean isOk = false;
-
-    private List<String> unusedDataSources = new ArrayList<>();
-    private List<String> unusedStreams = new ArrayList<>();
-    private List<String> unPublishedPolicies = new ArrayList<>();
-
-    /*
-     * Includes validation of extension class existence
-     * Policy expression validation
-     * Inter-Reference validation
-     */
-    private Map<String, List<String>> dataSourcesValidation = new HashMap<>();
-    private Map<String, List<String>> streamsValidation = new HashMap<>();
-    private Map<String, List<String>> policiesValidation = new HashMap<>();
-    private Map<String, List<String>> publishmentValidation = new HashMap<>();
-    private Map<String, List<String>> topoMetaValidation = new HashMap<>();
-
-    public void appendUnusedDatasource(String ds) {
-        unusedDataSources.add(ds);
-    }
-
-    public void appendUnusedStreams(String s) {
-        unusedStreams.add(s);
-    }
-
-    public void appendUnPublishedPolicies(String s) {
-        unPublishedPolicies.add(s);
-    }
-
-    public void appendDataSourceValidation(String name, String msg) {
-        if (!dataSourcesValidation.containsKey(name)) {
-            dataSourcesValidation.putIfAbsent(name, new LinkedList<>());
-        }
-        dataSourcesValidation.get(name).add(msg);
-    }
-
-    public void appendStreamValidation(String name, String msg) {
-        if (!streamsValidation.containsKey(name)) {
-            streamsValidation.putIfAbsent(name, new LinkedList<>());
-        }
-        streamsValidation.get(name).add(msg);
-    }
-
-    public void appendPolicyValidation(String name, String msg) {
-        if (!policiesValidation.containsKey(name)) {
-            policiesValidation.putIfAbsent(name, new LinkedList<>());
-        }
-        policiesValidation.get(name).add(msg);
-    }
-
-    public void appendPublishemtnValidation(String name, String msg) {
-        if (!publishmentValidation.containsKey(name)) {
-            publishmentValidation.putIfAbsent(name, new LinkedList<>());
-        }
-        publishmentValidation.get(name).add(msg);
-    }
-
-    public void appendTopoMetaValidation(String name, String msg) {
-        if (!topoMetaValidation.containsKey(name)) {
-            topoMetaValidation.putIfAbsent(name, new LinkedList<>());
-        }
-        topoMetaValidation.get(name).add(msg);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
deleted file mode 100644
index e50b64c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.impl;
-
-import static org.apache.eagle.alert.coordinator.CoordinatorConstants.*;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.coordination.model.internal.*;
-import org.apache.eagle.alert.coordinator.IPolicyScheduler;
-import org.apache.eagle.alert.coordinator.IScheduleContext;
-import org.apache.eagle.alert.coordinator.ScheduleOption;
-import org.apache.eagle.alert.coordinator.TopologyMgmtService;
-import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
-import org.apache.eagle.alert.coordinator.model.TopologyUsage;
-import org.apache.eagle.alert.coordinator.provider.InMemScheduleConext;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.utils.JsonUtils;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-/**
- * A simple greedy assigner. <br/>
- *
- * <p>A greedy assigner simply loops over the policies, finds the most suitable topology
- * to locate each policy first, then assigns the topics to the corresponding
- * spouts/group-by bolts.</p>
- *
- * <p>For each given policy, the greedy steps are</p>
- *
- * <ul>
- * <li>1. Find the same topology that already serves the policy without exceeding the load</li>
- * <li>2. Find the topology that already takes the source traffic without exceeding the load</li>
- * <li>3. Find a topology that is available to place the source topic without exceeding the load</li>
- * <li>4. Create a new topology and locate the policy there</li>
- * <li>Route table generated after all policies assigned</li>
- * </ul>
- * @since Mar 24, 2016
- */
-public class GreedyPolicyScheduler implements IPolicyScheduler {
-
-    private static final Logger LOG = LoggerFactory.getLogger(GreedyPolicyScheduler.class);
-
-    private int policiesPerBolt;
-    private int policyDefaultParallelism;
-    private int initialQueueSize;
-    private double boltLoadUpbound;
-
-    // copied context for scheduling
-    private IScheduleContext context;
-
-    private TopologyMgmtService mgmtService;
-
-    private ScheduleState state;
-
-    public GreedyPolicyScheduler() {
-        Config config = ConfigFactory.load().getConfig(CONFIG_ITEM_COORDINATOR);
-        policiesPerBolt = config.getInt(POLICIES_PER_BOLT);
-        policyDefaultParallelism = config.getInt(POLICY_DEFAULT_PARALLELISM);
-        initialQueueSize = policyDefaultParallelism;
-        boltLoadUpbound = config.getDouble(CONFIG_ITEM_BOLT_LOAD_UPBOUND);
-    }
-
-    public synchronized ScheduleState schedule(ScheduleOption option) {
-        // FIXME: never re-assign right now: sticky mode
-        // TODO: how to identify the over-heat nodes? not yet done #Scale of policies
-        // Answer: Use configured policiesPerBolt and configured bolt load up-bound
-        // FIXME: Here could be strategy to define the topology priorities
-        List<WorkItem> workSets = findWorkingSets();
-        /**
-         * <pre>
-         * <ul>
-         * <li>how to support take multiple "dumped" topology that consuming the same input as one available sets?</li>
-         * Answer: spout spec generated after policy assigned
-         * <li>How to define the input traffic partition?</li>
-         * Answer: input traffic might not be supported right now.
-         * <li>How to support traffic partition between topology?</li> 
-         * Answer: two possible place: a global route table will be generated, those target not in current topology tuples will be dropped. This make the partition for tuple to alert
-         * <li>How to support add topology on demand by evaluate the available topology bandwidth(need topology level load)?</li>
-         * Answer: Use configured topology load up-bound, when topology load is available, will adopt
-         * </ul>
-         * </pre>
-         */
-        List<ScheduleResult> results = new ArrayList<ScheduleResult>();
-        Map<String, PolicyAssignment> newAssignments = new HashMap<String, PolicyAssignment>();
-        for (WorkItem item : workSets) {
-            ScheduleResult r = schedulePolicy(item, newAssignments);
-            results.add(r);
-        }
-
-        state = generateMonitorMetadata(workSets, newAssignments);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("calculated schedule state: {}", JsonUtils.writeValueAsString(state));
-        }
-        return state;
-    }
-
-    private List<WorkItem> findWorkingSets() {
-        // find the unassigned definition
-        List<WorkItem> workSets = new LinkedList<WorkItem>();
-        for (PolicyDefinition def : context.getPolicies().values()) {
-            int expectParal = def.getParallelismHint();
-            if (expectParal == 0) {
-                expectParal = policyDefaultParallelism;
-            }
-            // how to handle expansion of a policy in a smooth transition manner
-            // TODO policy fix
-            PolicyAssignment assignment = context.getPolicyAssignments().get(def.getName());
-            if (assignment != null) {
-                LOG.info("policy {} already allocated", def.getName());
-                continue;
-            }
-
-            WorkItem item = new WorkItem(def, expectParal);
-            workSets.add(item);
-        }
-        LOG.info("work set calculation: {}", workSets);
-        return workSets;
-    }
-
-    private ScheduleState generateMonitorMetadata(List<WorkItem> expandworkSets,
-                                                  Map<String, PolicyAssignment> newAssignments) {
-        MonitorMetadataGenerator generator = new MonitorMetadataGenerator(context);
-        return generator.generate(expandworkSets);
-    }
-
-    private void placePolicy(PolicyDefinition def, AlertBoltUsage alertBoltUsage, Topology targetTopology,
-                             TopologyUsage usage) {
-        String policyName = def.getName();
-
-        // topology usage update
-        alertBoltUsage.addPolicies(def);
-
-        // update alert policy
-        usage.getPolicies().add(policyName);
-
-        // update source topics
-        updateDataSource(usage, def);
-
-        // update group-by
-        updateGrouping(usage, def);
-    }
-
-    private void updateGrouping(TopologyUsage usage, PolicyDefinition def) {
-        // groupByMeta is removed since group spec generation doesn't need it now.
-        //        List<StreamPartition> policyPartitionSpec = def.getPartitionSpec();
-        //        Map<String, List<StreamPartition>> groupByMeta = usage.getGroupByMeta();
-        //        for (StreamPartition par : policyPartitionSpec) {
-        //            List<StreamPartition> partitions = groupByMeta.get(par.getStreamId());
-        //            if (partitions == null) {
-        //                partitions = new ArrayList<StreamPartition>();
-        //                // de-dup of the partition on the list?
-        //                groupByMeta.put(par.getStreamId(), partitions);
-        //            }
-        //            if (!partitions.contains(par)) {
-        //                partitions.add(par);
-        //            }
-        //        }
-    }
-
-    private void updateDataSource(TopologyUsage usage, PolicyDefinition def) {
-        List<String> datasources = findDatasource(def);
-        usage.getDataSources().addAll(datasources);
-    }
-
-    private List<String> findDatasource(PolicyDefinition def) {
-        List<String> result = new ArrayList<String>();
-
-        List<String> inputStreams = def.getInputStreams();
-        Map<String, StreamDefinition> schemaMaps = context.getStreamSchemas();
-        for (String is : inputStreams) {
-            StreamDefinition ss = schemaMaps.get(is);
-            result.add(ss.getDataSource());
-        }
-        return result;
-    }
-
-    /**
-     * For each given policy, the greedy steps are
-     * <ul>
-     * <li>1. Find the same topology that already serves the policy</li>
-     * <li>2. Find the topology that already takes the source traffic</li>
-     * <li>3. Find a topology that is available to place the source topic</li>
-     * <li>4. Create a new topology and locate the policy there</li>
-     * <li>Route table generated after all policies assigned</li>
-     * </ul>
-     * <br/>
-     *
-     * @param newAssignments
-     */
-    private ScheduleResult schedulePolicy(WorkItem item, Map<String, PolicyAssignment> newAssignments) {
-        LOG.info(" schedule for {}", item);
-
-        String policyName = item.def.getName();
-        StreamGroup policyStreamPartition = new StreamGroup();
-        if (item.def.getPartitionSpec().isEmpty()) {
-            LOG.error(" policy {} partition spec is empty! ", policyName);
-            ScheduleResult result = new ScheduleResult();
-            result.policyName = policyName;
-            result.code = 400;
-            result.message = "policy doesn't have partition spec";
-            return result;
-        }
-        policyStreamPartition.addStreamPartitions(item.def.getPartitionSpec(), item.def.isDedicated());
-
-        MonitoredStream targetdStream = context.getMonitoredStreams().get(policyStreamPartition);
-        if (targetdStream == null) {
-            targetdStream = new MonitoredStream(policyStreamPartition);
-            context.getMonitoredStreams().put(policyStreamPartition, targetdStream);
-        }
-
-        ScheduleResult result = new ScheduleResult();
-        result.policyName = policyName;
-
-        StreamWorkSlotQueue queue = findWorkSlotQueue(targetdStream, item.def);
-        if (queue == null) {
-            result.code = 400;
-            result.message = String.format("unable to allocate work queue resource for policy %s !", policyName);
-        } else {
-            placePolicyToQueue(item.def, queue, newAssignments);
-            result.code = 200;
-            result.message = "OK";
-        }
-
-        LOG.info(" schedule result : {}", result);
-        return result;
-    }
-
-    private void placePolicyToQueue(PolicyDefinition def, StreamWorkSlotQueue queue,
-                                    Map<String, PolicyAssignment> newAssignments) {
-        for (WorkSlot slot : queue.getWorkingSlots()) {
-            Topology targetTopology = context.getTopologies().get(slot.getTopologyName());
-            TopologyUsage usage = context.getTopologyUsages().get(slot.getTopologyName());
-            AlertBoltUsage alertBoltUsage = usage.getAlertBoltUsage(slot.getBoltId());
-            placePolicy(def, alertBoltUsage, targetTopology, usage);
-        }
-        // queue.placePolicy(def);
-        PolicyAssignment assignment = new PolicyAssignment(def.getName(), queue.getQueueId());
-        context.getPolicyAssignments().put(def.getName(), assignment);
-        newAssignments.put(def.getName(), assignment);
-    }
-
-    private StreamWorkSlotQueue findWorkSlotQueue(MonitoredStream targetdStream, PolicyDefinition def) {
-        StreamWorkSlotQueue targetQueue = null;
-        for (StreamWorkSlotQueue queue : targetdStream.getQueues()) {
-            if (isQueueAvailable(queue, def)) {
-                targetQueue = queue;
-                break;
-            }
-        }
-
-        if (targetQueue == null) {
-            WorkQueueBuilder builder = new WorkQueueBuilder(context, mgmtService);
-            // TODO : get the properties from policy definition
-            targetQueue = builder.createQueue(targetdStream, def.isDedicated(), getQueueSize(def.getParallelismHint()),
-                new HashMap<String, Object>());
-        }
-        return targetQueue;
-    }
-
-    /**
-     * Some strategy to generate correct size in Strategy of queue builder.
-     */
-    private int getQueueSize(int hint) {
-        if (hint == 0) {
-            // some policies require single bolt to execute
-            return 1;
-        }
-        return initialQueueSize * ((hint + initialQueueSize - 1) / initialQueueSize);
-    }
-
-    private boolean isQueueAvailable(StreamWorkSlotQueue queue, PolicyDefinition def) {
-        if (queue.getQueueSize() < def.getParallelismHint()) {
-            return false;
-        }
-
-        for (WorkSlot slot : queue.getWorkingSlots()) {
-            TopologyUsage u = context.getTopologyUsages().get(slot.getTopologyName());
-            AlertBoltUsage usage = u.getAlertBoltUsage(slot.getBoltId());
-            if (!isBoltAvailable(usage, def)) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    private boolean isBoltAvailable(AlertBoltUsage boltUsage, PolicyDefinition def) {
-        // overload or over policy # or already contains
-        if (boltUsage == null || boltUsage.getLoad() > boltLoadUpbound
-            || boltUsage.getPolicies().size() >= policiesPerBolt || boltUsage.getPolicies().contains(def.getName())) {
-            return false;
-        }
-        return true;
-    }
-
-    public void init(IScheduleContext context, TopologyMgmtService mgmtService) {
-        this.context = new InMemScheduleConext(context);
-        this.mgmtService = mgmtService;
-    }
-
-    public IScheduleContext getContext() {
-        return context;
-    }
-
-    public ScheduleState getState() {
-        return state;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MetadataValdiator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MetadataValdiator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MetadataValdiator.java
deleted file mode 100644
index 5d7eeb1..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MetadataValdiator.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.impl;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.coordinator.IScheduleContext;
-import org.apache.eagle.alert.coordinator.ValidateState;
-import org.apache.eagle.alert.coordinator.provider.InMemScheduleConext;
-import org.apache.eagle.alert.coordinator.provider.ScheduleContextBuilder;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.service.IMetadataServiceClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.SiddhiManager;
-
-import java.util.*;
-import java.util.stream.Collectors;
-
-/**
- * Created on 10/1/16.
- */
-public class MetadataValdiator {
-    private static final Logger LOG = LoggerFactory.getLogger(MetadataValdiator.class);
-
-    /**
-     * Maps each stream column type to the type keyword used when building a stream definition.
-     */
-    private static final Map<StreamColumn.Type, String> _EAGLE_SIDDHI_TYPE_MAPPING = new HashMap<>();
-
-    static {
-        final Map<StreamColumn.Type, String> mapping = _EAGLE_SIDDHI_TYPE_MAPPING;
-        mapping.put(StreamColumn.Type.STRING, "STRING");
-        mapping.put(StreamColumn.Type.INT, "INT");
-        mapping.put(StreamColumn.Type.LONG, "LONG");
-        mapping.put(StreamColumn.Type.FLOAT, "FLOAT");
-        mapping.put(StreamColumn.Type.DOUBLE, "DOUBLE");
-        mapping.put(StreamColumn.Type.BOOL, "BOOL");
-        mapping.put(StreamColumn.Type.OBJECT, "OBJECT");
-    }
-
-    private IScheduleContext context;
-    private final ValidateState state;
-
-    /**
-     * Builds an in-memory schedule context from the metadata lists returned by the client.
-     *
-     * @param client metadata service client used to list topologies, data sources, streams,
-     *               policies and publishments
-     */
-    public MetadataValdiator(IMetadataServiceClient client) {
-        List<Topology> topologyList = client.listTopologies();
-        List<Kafka2TupleMetadata> dataSourceList = client.listDataSources();
-        List<StreamDefinition> streamList = client.listStreams();
-        // NOTE(review): this was labelled "filter out disabled policies", but no filtering is
-        // applied here; every policy returned by the client is validated. Confirm the intent.
-        List<PolicyDefinition> policyList = client.listPolicies();
-        List<Publishment> publishmentList = client.listPublishment();
-
-        this.context = new InMemScheduleConext(
-            ScheduleContextBuilder.listToMap(topologyList),
-            new HashMap<>(),
-            ScheduleContextBuilder.listToMap(dataSourceList),
-            ScheduleContextBuilder.listToMap(policyList),
-            ScheduleContextBuilder.listToMap(publishmentList),
-            ScheduleContextBuilder.listToMap(streamList),
-            new HashMap<>(),
-            new HashMap<>());
-        this.state = new ValidateState();
-    }
-
-    /**
-     * Creates a validator over an already built schedule context.
-     *
-     * @param context schedule context to validate
-     */
-    public MetadataValdiator(IScheduleContext context) {
-        this.state = new ValidateState();
-        this.context = context;
-    }
-
-
-    /**
-     * Runs all validations in order: topology, data sources, streams, policies, publishments.
-     *
-     * @return the state object the individual validations appended their findings to
-     */
-    public ValidateState validate() {
-        validateTopology();
-        validateDataSources();
-        validateStreams();
-        validatePolicies();
-        validatePublishments();
-        return this.state;
-    }
-
-    /**
-     * Validates every policy in the context.
-     *
-     * <p>For each policy this method:
-     * <ul>
-     * <li>records the policy as unpublished when no publishment lists its name;</li>
-     * <li>records a policy validation error when an input stream has no schema;</li>
-     * <li>otherwise builds the stream definitions plus the policy definition and asks Siddhi to
-     * create an execution plan runtime, recording the exception message on failure.</li>
-     * </ul>
-     */
-    private void validatePolicies() {
-        Collection<Publishment> pubs = context.getPublishments().values();
-        for (PolicyDefinition pd : context.getPolicies().values()) {
-            if (pubs.stream().noneMatch(p -> p.getPolicyIds().contains(pd.getName()))) {
-                state.appendUnPublishedPolicies(pd.getName());
-            }
-
-            boolean isStreamMiss = false;
-            StringBuilder builder = new StringBuilder();
-            for (String inputStream : pd.getInputStreams()) {
-                StreamDefinition schema = context.getStreamSchemas().get(inputStream);
-                if (schema == null) {
-                    // this is a policy problem, so it is reported as a policy validation
-                    // (it was previously filed under publishment validations by mistake)
-                    state.appendPolicyValidation(pd.getName(), String.format("policy %s contains unknown stream %s!", pd.getName(), inputStream));
-                    isStreamMiss = true;
-                    break;
-                }
-                builder.append(buildStreamDefinition(schema));
-                builder.append("\n");
-            }
-
-            if (isStreamMiss) {
-                continue;
-            }
-            builder.append(pd.getDefinition().getValue());
-            String executionPlan = builder.toString();
-
-            // now evaluate
-            SiddhiManager sm = null;
-            try {
-                sm = new SiddhiManager();
-                sm.createExecutionPlanRuntime(executionPlan);
-            } catch (Exception e) {
-                LOG.error("siddhi creation failed! {} ", executionPlan, e);
-                state.appendPolicyValidation(pd.getName(), e.getMessage());
-            } finally {
-                // release the runtimes created only for validation
-                if (sm != null) {
-                    sm.shutdown();
-                }
-            }
-        }
-    }
-
-    /**
-     * Renders a {@code define stream} statement for the given stream definition.
-     *
-     * @param streamDefinition stream whose id and columns are rendered
-     * @return a statement of the form {@code define stream <id>( <name> <type>,... );}; the
-     *         column list is empty when the stream has no columns
-     */
-    private String buildStreamDefinition(StreamDefinition streamDefinition) {
-        List<String> columns = new ArrayList<>();
-        if (streamDefinition.getColumns() != null) {
-            for (StreamColumn column : streamDefinition.getColumns()) {
-                // the mapping is keyed by the column type itself; looking it up by a lower-cased
-                // string never matched and always rendered the type as "null"
-                columns.add(String.format("%s %s", column.getName(), _EAGLE_SIDDHI_TYPE_MAPPING.get(column.getType())));
-            }
-        } else {
-            LOG.warn("No columns found for stream {}", streamDefinition.getStreamId());
-        }
-        return String.format("define stream %s( %s );", streamDefinition.getStreamId(), StringUtils.join(columns, ","));
-    }
-
-
-    /**
-     * Validates every publishment in the context: each policy id a publishment references must
-     * match the name of a policy in the context, otherwise a publishment validation is recorded.
-     */
-    private void validatePublishments() {
-        Collection<PolicyDefinition> definitions = context.getPolicies().values();
-
-        for (Publishment p : context.getPublishments().values()) {
-            //TODO: check type; check serializer types; check dedup fields existence; check extend deduplicator...
-            // keep only the ids that match NO policy; the previous anyMatch filter collected
-            // the known ids and reported them as unknown
-            Set<String> unknown = p.getPolicyIds().stream()
-                .filter(pid -> definitions.stream().noneMatch(pd -> pd.getName().equals(pid)))
-                .collect(Collectors.toSet());
-            if (!unknown.isEmpty()) {
-                state.appendPublishemtnValidation(p.getName(), String.format("publishment %s reference unknown/uneabled policy %s!", p.getName(), unknown));
-            }
-        }
-    }
-
-    /**
-     * Validates every stream schema in the context: its data source must exist, streams not
-     * used as input by any policy are recorded as unused, and it must declare columns.
-     */
-    private void validateStreams() {
-        Collection<Kafka2TupleMetadata> dataSources = context.getDataSourceMetadata().values();
-        Collection<PolicyDefinition> policies = context.getPolicies().values();
-        for (StreamDefinition stream : context.getStreamSchemas().values()) {
-            boolean dataSourceKnown = dataSources.stream().anyMatch(d -> d.getName().equals(stream.getDataSource()));
-            if (!dataSourceKnown) {
-                state.appendStreamValidation(stream.getStreamId(), String.format("stream %s reference unknown data source %s !", stream.getStreamId(), stream.getDataSource()));
-            }
-
-            boolean usedByPolicy = policies.stream().anyMatch(p -> p.getInputStreams().contains(stream.getStreamId()));
-            if (!usedByPolicy) {
-                state.appendUnusedStreams(stream.getStreamId());
-            }
-
-            // more on columns
-            if (stream.getColumns() == null || stream.getColumns().isEmpty()) {
-                state.appendStreamValidation(stream.getStreamId(), String.format("stream %s have empty columns!", stream.getStreamId()));
-            }
-        }
-    }
-
-    /**
-     * Validates every data source in the context: data sources referenced by no stream are
-     * recorded as unused, the type must be "KAFKA" (case-insensitive), and a codec must be set.
-     */
-    private void validateDataSources() {
-        Collection<StreamDefinition> sds = context.getStreamSchemas().values();
-        for (Kafka2TupleMetadata ds : context.getDataSourceMetadata().values()) {
-            // simply do a O(n^2) loop; the comparison is null-safe so that a stream without a
-            // data source is treated as "no match" instead of crashing the validator
-            if (sds.stream().noneMatch(t -> Objects.equals(t.getDataSource(), ds.getName()))) {
-                state.appendUnusedDatasource(ds.getName());
-            }
-
-            if (!"KAFKA".equalsIgnoreCase(ds.getType())) {
-                state.appendDataSourceValidation(ds.getName(), String.format(" unsupported data source type %s !", ds.getType()));
-            }
-
-            //scheme
-            //            String schemeCls = ds.getSchemeCls();
-            //            try {
-            //                Object scheme = Class.forName(schemeCls).getConstructor(String.class, Map.class).newInstance(ds.getTopic(), new HashMap<>());// coul only mock empty map
-            //                if (!(scheme instanceof MultiScheme || scheme instanceof Scheme)) {
-            //                    throw new IllegalArgumentException(" scheme class not subclass of Scheme or MultiScheme !");
-            //                }
-            //            } catch (Exception e) {
-            //                state.appendDataSourceValidation(ds.getName(), String.format("schemeCls %s expected to be qualified sub class name of %s or %s with given constructor signature!"
-            //                      +"Message: %s !",
-            //                    schemeCls, Scheme.class.getCanonicalName(), MultiScheme.class.getCanonicalName(), e.getMessage()));
-            //            }
-
-            // codec
-            if (ds.getCodec() == null) {
-                state.appendDataSourceValidation(ds.getName(), "codec of datasource must *not* be null!");
-            }
-            //            String selectCls = ds.getCodec().getStreamNameSelectorCls();
-            //            try {
-            //                StreamNameSelector cachedSelector = (StreamNameSelector) Class.forName(selectCls).getConstructor(Properties.class)
-            //                    .newInstance(ds.getCodec().getStreamNameSelectorProp());
-            //            } catch (Exception e) {
-            //                state.appendDataSourceValidation(ds.getName(), String.format("streamNameSelectorCls %s expected to be subclass of %s and with given constructor signature! Message: %s !",
-            //                    selectCls, StreamNameSelector.class.getCanonicalName(), e.getMessage()));
-            //            }
-
-        }
-    }
-
-    /**
-     * Placeholder for topology validation: iterates the topologies but performs no checks.
-     */
-    private void validateTopology() {
-        for (Topology topology : context.getTopologies().values()) {
-            // no topology-level rules are implemented
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
deleted file mode 100644
index fb20e66..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.impl;
-
-import org.apache.eagle.alert.coordination.model.*;
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.coordinator.IScheduleContext;
-import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
-import org.apache.eagle.alert.coordinator.model.TopologyUsage;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Given current policy placement, figure out monitor metadata
- * <p>TODO: refactor to eliminate the duplicate of stupid if-notInMap-then-create....
- * FIXME: too many duplicated code logic : check null; add list to map; add to list..</p>
- *
- * @since Apr 26, 2016
- */
-public class MonitorMetadataGenerator {
-
-    private static final Logger LOG = LoggerFactory.getLogger(MonitorMetadataGenerator.class);
-
-    private IScheduleContext context;
-
-    /**
-     * Creates a generator that reads from the given schedule context.
-     *
-     * @param scheduleContext schedule context the monitor metadata is derived from
-     */
-    public MonitorMetadataGenerator(IScheduleContext scheduleContext) {
-        this.context = scheduleContext;
-    }
-
-    /**
-     * Generates the complete schedule state from the current context.
-     *
-     * @param expandworkSets work items; not read by this method
-     * @return a schedule state carrying a fresh version string, the spout, router, alert bolt and
-     *         publish specs, plus the context's assignments, monitored streams, policies and
-     *         stream schemas
-     */
-    public ScheduleState generate(List<WorkItem> expandworkSets) {
-        // each map below is keyed by topology name
-        Map<String, SpoutSpec> spoutSpecs = generateSpoutMonitorMetadata();
-        Map<String, RouterSpec> routerSpecs = generateGroupbyMonitorMetadata();
-        Map<String, AlertBoltSpec> alertBoltSpecs = generateAlertMonitorMetadata();
-        Map<String, PublishSpec> publishSpecs = generatePublishMetadata();
-
-        String version = generateVersion();
-        return new ScheduleState(version,
-            spoutSpecs,
-            routerSpecs,
-            alertBoltSpecs,
-            publishSpecs,
-            context.getPolicyAssignments().values(),
-            context.getMonitoredStreams().values(),
-            context.getPolicies().values(),
-            context.getStreamSchemas().values());
-    }
-
-    /**
-     * Builds one publish spec per topology usage.
-     *
-     * <p>Each spec receives the publishments of every policy of that topology, skipping policies
-     * that have no definition in the context.</p>
-     *
-     * @return publish specs keyed by topology name
-     */
-    private Map<String, PublishSpec> generatePublishMetadata() {
-        // index publishments by the policy ids they reference
-        Map<String, List<Publishment>> publishmentsByPolicy = new HashMap<>();
-        for (Publishment publishment : context.getPublishments().values()) {
-            for (String policyId : publishment.getPolicyIds()) {
-                publishmentsByPolicy.computeIfAbsent(policyId, key -> new ArrayList<>()).add(publishment);
-            }
-        }
-
-        // build per topology
-        Map<String, PublishSpec> specsByTopology = new HashMap<>();
-        for (TopologyUsage usage : context.getTopologyUsages().values()) {
-            PublishSpec spec = specsByTopology.computeIfAbsent(usage.getTopoName(),
-                name -> new PublishSpec(name, context.getTopologies().get(name).getPubBoltId()));
-
-            for (String policyName : usage.getPolicies()) {
-                if (context.getPolicies().get(policyName) == null) {
-                    continue;
-                }
-                List<Publishment> targets = publishmentsByPolicy.get(policyName);
-                if (targets == null) {
-                    continue;
-                }
-                for (Publishment target : targets) {
-                    spec.addPublishment(target);
-                }
-            }
-        }
-        return specsByTopology;
-    }
-
-    /**
-     * Produces the spec version string: {@code spec_version_} followed by the current time in
-     * milliseconds.
-     * FIXME: add auto-increment version number?.
-     */
-    private String generateVersion() {
-        long now = System.currentTimeMillis();
-        return "spec_version_" + now;
-    }
-
-    /**
-     * Builds one alert bolt spec per topology usage.
-     *
-     * <p>For every policy placed on an alert bolt the policy is registered on that bolt, and a
-     * publish partition is added for each publishment referencing the policy and each of the
-     * publishment's stream ids (or the default stream name when it declares none). Policies
-     * without a definition in the context are logged and skipped, matching how
-     * {@code generatePublishMetadata} treats them.</p>
-     *
-     * @return alert bolt specs keyed by topology name
-     */
-    private Map<String, AlertBoltSpec> generateAlertMonitorMetadata() {
-        Map<String, AlertBoltSpec> alertSpecs = new HashMap<String, AlertBoltSpec>();
-        for (TopologyUsage u : context.getTopologyUsages().values()) {
-            AlertBoltSpec alertSpec = alertSpecs.get(u.getTopoName());
-            if (alertSpec == null) {
-                alertSpec = new AlertBoltSpec(u.getTopoName());
-                alertSpecs.put(u.getTopoName(), alertSpec);
-            }
-            for (AlertBoltUsage boltUsage : u.getAlertUsages().values()) {
-                for (String policyName : boltUsage.getPolicies()) {
-                    PolicyDefinition definition = context.getPolicies().get(policyName);
-                    if (definition == null) {
-                        // a usage may still name a policy that is gone from the context
-                        LOG.warn(" can not find definition for policy {} on bolt {} ! ", policyName, boltUsage.getBoltId());
-                        continue;
-                    }
-                    alertSpec.addBoltPolicy(boltUsage.getBoltId(), definition.getName());
-
-                    for (Publishment publish : context.getPublishments().values()) {
-                        if (!publish.getPolicyIds().contains(definition.getName())) {
-                            continue;
-                        }
-
-                        List<String> streamIds = new ArrayList<>();
-                        // add the publish to the bolt
-                        if (publish.getStreamIds() == null || publish.getStreamIds().isEmpty()) {
-                            streamIds.add(Publishment.STREAM_NAME_DEFAULT);
-                        } else {
-                            streamIds.addAll(publish.getStreamIds());
-                        }
-                        for (String streamId : streamIds) {
-                            alertSpec.addPublishPartition(streamId, policyName, publish.getName(), publish.getPartitionColumns());
-                        }
-                    }
-                }
-            }
-        }
-        return alertSpecs;
-    }
-
-    /**
-     * Builds one router spec per topology usage.
-     *
-     * <p>Every stream partition of every monitored stream becomes a stream router spec that
-     * carries one policy worker queue per work slot queue of that monitored stream.</p>
-     *
-     * @return router specs keyed by topology name
-     */
-    private Map<String, RouterSpec> generateGroupbyMonitorMetadata() {
-        Map<String, RouterSpec> routerSpecs = new HashMap<>();
-        for (TopologyUsage usage : context.getTopologyUsages().values()) {
-            RouterSpec routerSpec = routerSpecs.computeIfAbsent(usage.getTopoName(), name -> new RouterSpec(name));
-
-            for (MonitoredStream monitored : usage.getMonitoredStream()) {
-                // mutiple stream on the same policy group : for correlation group case:
-                for (StreamPartition streamPartition : monitored.getStreamGroup().getStreamPartitions()) {
-                    StreamRouterSpec streamRoute = new StreamRouterSpec();
-                    streamRoute.setPartition(streamPartition);
-                    streamRoute.setStreamId(streamPartition.getStreamId());
-
-                    for (StreamWorkSlotQueue slotQueue : monitored.getQueues()) {
-                        PolicyWorkerQueue workerQueue = new PolicyWorkerQueue();
-                        workerQueue.setWorkers(slotQueue.getWorkingSlots());
-                        workerQueue.setPartition(streamPartition);
-                        streamRoute.addQueue(workerQueue);
-                    }
-
-                    routerSpec.addRouterSpec(streamRoute);
-                }
-            }
-        }
-        return routerSpecs;
-    }
-
-    /**
-     * Builds one spout spec per topology usage.
-     *
-     * <p>For each topology the data sources in use are indexed by topic, and every stream
-     * partition of every policy on the topology gets a repartition strategy that points to the
-     * topology's group nodes. Metadata that cannot be resolved (topology, data source, policy
-     * definition, assignment, queue, stream schema) is logged and skipped, so that one dangling
-     * reference does not abort generation for everything else.</p>
-     *
-     * @return spout specs keyed by topology name
-     */
-    private Map<String, SpoutSpec> generateSpoutMonitorMetadata() {
-        Map<String, StreamWorkSlotQueue> queueMap = buildQueueMap();
-
-        Map<String, SpoutSpec> topoSpoutSpecsMap = new HashMap<String, SpoutSpec>();
-        // streamName -> StreamDefinition
-        Map<String, StreamDefinition> streamSchemaMap = context.getStreamSchemas();
-        Map<String, Kafka2TupleMetadata> datasourcesMap = context.getDataSourceMetadata();
-        for (TopologyUsage usage : context.getTopologyUsages().values()) {
-            Topology topo = context.getTopologies().get(usage.getTopoName());
-            if (topo == null) {
-                LOG.error(" can not find topology {} ! ", usage.getTopoName());
-                continue;
-            }
-
-            // based on data source schemas
-            // generate topic -> Kafka2TupleMetadata
-            // generate topic -> Tuple2StreamMetadata (actually the schema selector)
-            Map<String, Kafka2TupleMetadata> dss = new HashMap<String, Kafka2TupleMetadata>();
-            Map<String, Tuple2StreamMetadata> tss = new HashMap<String, Tuple2StreamMetadata>();
-            for (String dataSourceId : usage.getDataSources()) {
-                Kafka2TupleMetadata ds = datasourcesMap.get(dataSourceId);
-                if (ds == null) {
-                    LOG.error(" can not find data source {} used by topology {} ! ", dataSourceId, topo.getName());
-                    continue;
-                }
-                dss.put(ds.getTopic(), ds);
-                tss.put(ds.getTopic(), ds.getCodec());
-            }
-
-            // generate topicId -> StreamRepartitionMetadata
-            Map<String, List<StreamRepartitionMetadata>> streamsMap = new HashMap<String, List<StreamRepartitionMetadata>>();
-            for (String policyName : usage.getPolicies()) {
-                PolicyDefinition def = context.getPolicies().get(policyName);
-                if (def == null) {
-                    LOG.error(" can not find definition for policy {} ! ", policyName);
-                    continue;
-                }
-
-                PolicyAssignment assignment = context.getPolicyAssignments().get(policyName);
-                if (assignment == null) {
-                    LOG.error(" can not find assignment for policy {} ! ", policyName);
-                    continue;
-                }
-
-                // looked up once per policy; it does not depend on the partition
-                StreamWorkSlotQueue queue = queueMap.get(assignment.getQueueId());
-                if (queue == null) {
-                    LOG.error(" can not find queue {} assigned to policy {} ! ", assignment.getQueueId(), policyName);
-                    continue;
-                }
-
-                for (StreamPartition policyStreamPartition : def.getPartitionSpec()) {
-                    String stream = policyStreamPartition.getStreamId();
-                    StreamDefinition schema = streamSchemaMap.get(stream);
-                    if (schema == null) {
-                        LOG.error(" can not find schema of stream {} used by policy {} ! ", stream, policyName);
-                        continue;
-                    }
-                    Kafka2TupleMetadata streamDataSource = datasourcesMap.get(schema.getDataSource());
-                    if (streamDataSource == null) {
-                        LOG.error(" can not find data source {} of stream {} ! ", schema.getDataSource(), stream);
-                        continue;
-                    }
-                    String topic = streamDataSource.getTopic();
-
-                    // add stream name to tuple metadata
-                    if (tss.containsKey(topic)) {
-                        Tuple2StreamMetadata tupleMetadata = tss.get(topic);
-                        tupleMetadata.getActiveStreamNames().add(stream);
-                    }
-
-                    // grouping strategy
-                    StreamRepartitionStrategy gs = new StreamRepartitionStrategy();
-                    gs.partition = policyStreamPartition;
-                    gs.numTotalParticipatingRouterBolts = queue.getNumberOfGroupBolts();
-                    gs.startSequence = queue.getTopologyGroupStartIndex(topo.getName());
-                    gs.totalTargetBoltIds = new ArrayList<String>(topo.getGroupNodeIds());
-
-                    // add to map
-                    addGroupingStrategy(streamsMap, stream, schema, topic, schema.getDataSource(), gs);
-                }
-            }
-
-            SpoutSpec spoutSpec = new SpoutSpec(topo.getName(), streamsMap, tss, dss);
-            topoSpoutSpecsMap.put(topo.getName(), spoutSpec);
-        }
-        return topoSpoutSpecsMap;
-    }
-
-    /**
-     * Indexes every work slot queue of every monitored stream by its queue id. Queues hang off
-     * monitored streams rather than the context root, so this map allows direct lookup later.
-     *
-     * @return queues keyed by queue id
-     */
-    private Map<String, StreamWorkSlotQueue> buildQueueMap() {
-        Map<String, StreamWorkSlotQueue> queuesById = new HashMap<>();
-        for (MonitoredStream monitored : context.getMonitoredStreams().values()) {
-            for (StreamWorkSlotQueue slotQueue : monitored.getQueues()) {
-                String queueId = slotQueue.getQueueId();
-                queuesById.put(queueId, slotQueue);
-            }
-        }
-        return queuesById;
-    }
-
-    /**
-     * Registers a repartition strategy for a stream under its topic.
-     *
-     * <p>The per-topic list and the per-stream metadata entry are created on first use; the
-     * stream is matched by id ignoring case, and a strategy already present is not added again.</p>
-     *
-     * @param streamsMap     topic name to repartition metadata list, updated in place
-     * @param stream         stream id to look up within the topic
-     * @param schema         stream definition supplying the stream id for a new metadata entry
-     * @param topicName      topic the stream belongs to
-     * @param datasourceName not read by this method
-     * @param gs             strategy to register
-     */
-    private void addGroupingStrategy(Map<String, List<StreamRepartitionMetadata>> streamsMap, String stream,
-                                     StreamDefinition schema, String topicName, String datasourceName, StreamRepartitionStrategy gs) {
-        List<StreamRepartitionMetadata> topicStreams =
-            streamsMap.computeIfAbsent(topicName, key -> new ArrayList<StreamRepartitionMetadata>());
-
-        StreamRepartitionMetadata match = topicStreams.stream()
-            .filter(candidate -> stream.equalsIgnoreCase(candidate.getStreamId()))
-            .findFirst()
-            .orElse(null);
-        if (match == null) {
-            match = new StreamRepartitionMetadata(topicName, schema.getStreamId());
-            topicStreams.add(match);
-        }
-
-        boolean alreadyRegistered = match.groupingStrategies.contains(gs);
-        if (!alreadyRegistered) {
-            match.addGroupStrategy(gs);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java
deleted file mode 100644
index a46537d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.impl;
-
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import java.util.List;
-
-/**
- * Schedule result for one policy.
- *
- * @since Apr 26, 2016
- */
-public class ScheduleResult {
-    // result code of scheduling the policy
-    int code;
-    // message accompanying the result code
-    String message;
-    // name of the policy this result belongs to
-    String policyName;
-    StreamPartition partition;
-    int index;
-    List<PolicyAssignment> topoliciesScheduled;
-
-    /**
-     * Renders the policy name, result code and message.
-     *
-     * @return a one-line summary of this result
-     */
-    @Override
-    public String toString() {
-        // message was passed to format() before but had no placeholder, so it was never shown
-        return String.format("policy: %s, result code: %d, message: %s", policyName, code, message);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkItem.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkItem.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkItem.java
deleted file mode 100644
index baa489d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkItem.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.impl;
-
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-
-public class WorkItem {
-    public final PolicyDefinition def;
-    public final int requestParallelism;
-
-    public WorkItem(PolicyDefinition def, int workNum) {
-        this.def = def;
-        this.requestParallelism = workNum;
-    }
-
-    public String toString() {
-        return "policy name: " + def.getName() + "(" + requestParallelism + ")";
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
deleted file mode 100644
index a717b1c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.impl;
-
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
-import org.apache.eagle.alert.coordinator.IScheduleContext;
-import org.apache.eagle.alert.coordinator.TopologyMgmtService;
-import org.apache.eagle.alert.coordinator.impl.strategies.IWorkSlotStrategy;
-import org.apache.eagle.alert.coordinator.impl.strategies.SameTopologySlotStrategy;
-import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
-import org.apache.eagle.alert.coordinator.model.TopologyUsage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * @since Apr 27, 2016.
- */
-public class WorkQueueBuilder {
-
-    private static final Logger LOG = LoggerFactory.getLogger(WorkQueueBuilder.class);
-
-    private final IScheduleContext context;
-    private final TopologyMgmtService mgmtService;
-
-    public WorkQueueBuilder(IScheduleContext context, TopologyMgmtService mgmtService) {
-        this.context = context;
-        this.mgmtService = mgmtService;
-    }
-
-    public StreamWorkSlotQueue createQueue(MonitoredStream stream, boolean isDedicated, int size,
-                                           Map<String, Object> properties) {
-        // FIXME: make extensible and configurable
-        IWorkSlotStrategy strategy = new SameTopologySlotStrategy(context, stream.getStreamGroup(), mgmtService);
-        List<WorkSlot> slots = strategy.reserveWorkSlots(size, isDedicated, properties);
-        if (slots.size() < size) {
-            LOG.error("allocate stream work queue failed, required size");
-            return null;
-        }
-        StreamWorkSlotQueue queue = new StreamWorkSlotQueue(stream.getStreamGroup(), isDedicated, properties,
-            slots);
-        calculateGroupIndexAndCount(queue);
-        assignQueueSlots(stream, queue);// build reverse reference
-        stream.addQueues(queue);
-
-        return queue;
-    }
-
-    private void assignQueueSlots(MonitoredStream stream, StreamWorkSlotQueue queue) {
-        for (WorkSlot slot : queue.getWorkingSlots()) {
-            TopologyUsage u = context.getTopologyUsages().get(slot.getTopologyName());
-            AlertBoltUsage boltUsage = u.getAlertBoltUsage(slot.getBoltId());
-            boltUsage.addQueue(stream.getStreamGroup(), queue);
-            u.addMonitoredStream(stream);
-        }
-    }
-
-    private void calculateGroupIndexAndCount(StreamWorkSlotQueue queue) {
-        Map<String, Integer> result = new HashMap<String, Integer>();
-        int total = 0;
-        for (WorkSlot slot : queue.getWorkingSlots()) {
-            if (result.containsKey(slot.getTopologyName())) {
-                continue;
-            }
-            result.put(slot.getTopologyName(), total);
-            total += context.getTopologies().get(slot.getTopologyName()).getNumOfGroupBolt();
-        }
-
-        queue.setNumberOfGroupBolts(total);
-        queue.setTopoGroupStartIndex(result);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java
deleted file mode 100644
index 8528606..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.impl.strategies;
-
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-import java.util.List;
-import java.util.Map;
-
-/**
- * @since Apr 27, 2016.
- */
-public interface IWorkSlotStrategy {
-
-    List<WorkSlot> reserveWorkSlots(int size, boolean isDedicated, Map<String, Object> properties);
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
deleted file mode 100644
index e401e98..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.impl.strategies;
-
-import static org.apache.eagle.alert.coordinator.CoordinatorConstants.CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND;
-
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.coordinator.CoordinatorConstants;
-import org.apache.eagle.alert.coordinator.IScheduleContext;
-import org.apache.eagle.alert.coordinator.TopologyMgmtService;
-import org.apache.eagle.alert.coordinator.TopologyMgmtService.TopologyMeta;
-import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
-import org.apache.eagle.alert.coordinator.model.TopologyUsage;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-/**
- * A simple strategy that only find the bolts in the same topology as the
- * required work slots.
- * Invariant:<br/>
- * One slot queue only on the one topology.<br/>
- * One topology doesn't contains two same partition slot queues.
- * @since Apr 27, 2016
- */
-public class SameTopologySlotStrategy implements IWorkSlotStrategy {
-
-    private static final Logger LOG = LoggerFactory.getLogger(SameTopologySlotStrategy.class);
-
-    private final IScheduleContext context;
-    private final StreamGroup partitionGroup;
-    private final TopologyMgmtService mgmtService;
-
-    private final int numOfPoliciesBoundPerBolt;
-    private final double topoLoadUpbound;
-    private final boolean reuseBoltInStreams;
-    private final int streamsPerBolt;
-
-    public SameTopologySlotStrategy(IScheduleContext context, StreamGroup streamPartitionGroup,
-                                    TopologyMgmtService mgmtService) {
-        this.context = context;
-        this.partitionGroup = streamPartitionGroup;
-        this.mgmtService = mgmtService;
-
-        Config config = ConfigFactory.load().getConfig(CoordinatorConstants.CONFIG_ITEM_COORDINATOR);
-        numOfPoliciesBoundPerBolt = config.getInt(CoordinatorConstants.POLICIES_PER_BOLT);
-        topoLoadUpbound = config.getDouble(CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND);
-        if (config.hasPath(CoordinatorConstants.REUSE_BOLT_IN_STREAMS)) {
-            reuseBoltInStreams = config.getBoolean(CoordinatorConstants.REUSE_BOLT_IN_STREAMS);
-        } else {
-            reuseBoltInStreams = false;
-        }
-        if (config.hasPath(CoordinatorConstants.STREAMS_PER_BOLT)) {
-            streamsPerBolt = config.getInt(CoordinatorConstants.STREAMS_PER_BOLT);
-        } else {
-            streamsPerBolt = 1;
-        }
-    }
-
-    /**
-     * @param isDedicated - not used yet!.
-     */
-    @Override
-    public List<WorkSlot> reserveWorkSlots(int size, boolean isDedicated, Map<String, Object> properties) {
-        Iterator<Topology> it = context.getTopologies().values().stream().filter((t) -> t.getNumOfAlertBolt() >= size)
-            .iterator();
-        // priority strategy first???
-        List<WorkSlot> slots = new ArrayList<WorkSlot>();
-        while (it.hasNext()) {
-            Topology t = it.next();
-            if (getQueueOnTopology(size, slots, t)) {
-                break;
-            }
-        }
-
-        if (slots.size() == 0) {
-            int supportedSize = mgmtService.getNumberOfAlertBoltsInTopology();
-            if (size > supportedSize) {
-                LOG.error("can not find available slots for queue, required size {}, supported size {} !", size, supportedSize);
-                return Collections.emptyList();
-            }
-            TopologyMeta topoMeta = mgmtService.creatTopology();
-            if (topoMeta == null) {
-                LOG.error("can not create topology for given queue requirement, required size {}, requried partition group {} !", size, partitionGroup);
-                return Collections.emptyList();
-            }
-
-            context.getTopologies().put(topoMeta.topologyId, topoMeta.topology);
-            context.getTopologyUsages().put(topoMeta.topologyId, topoMeta.usage);
-            boolean placed = getQueueOnTopology(size, slots, topoMeta.topology);
-            if (!placed) {
-                LOG.error("can not find available slots from new created topology, required size {}. This indicates an error !", size);
-            }
-        }
-        return slots;
-    }
-
-    private boolean getQueueOnTopology(int size, List<WorkSlot> slots, Topology t) {
-        TopologyUsage u = context.getTopologyUsages().get(t.getName());
-        if (!isTopologyAvailable(u)) {
-            return false;
-        }
-
-        List<String> bolts = new ArrayList<String>();
-        for (AlertBoltUsage alertUsage : u.getAlertUsages().values()) {
-            if (isBoltAvailable(alertUsage)) {
-                bolts.add(alertUsage.getBoltId());
-            }
-
-            if (bolts.size() == size) {
-                break;
-            }
-        }
-
-        if (bolts.size() == size) {
-            for (String boltId : bolts) {
-                WorkSlot slot = new WorkSlot(t.getName(), boltId);
-                slots.add(slot);
-            }
-            return true;
-        }
-        return false;
-    }
-
-    private boolean isTopologyAvailable(TopologyUsage u) {
-        //        for (MonitoredStream stream : u.getMonitoredStream()) {
-        //            if (partition.equals(stream.getStreamParitition())) {
-        //                return false;
-        //            }
-        //        }
-        if (u == null || u.getLoad() > topoLoadUpbound) {
-            return false;
-        }
-
-        return true;
-    }
-
-    private boolean isBoltAvailable(AlertBoltUsage alertUsage) {
-        // FIXME : more detail to compare on queue exclusion check
-        if (alertUsage.getPartitions().stream().filter(partition -> partition.isDedicated()).count() > 0) {
-            return false;
-        }
-        if (!reuseBoltInStreams && alertUsage.getQueueSize() > 0) {
-            return false;
-        }
-        if (reuseBoltInStreams) {
-            if (alertUsage.getQueueSize() >= streamsPerBolt) {
-                return false;
-            }
-            if (alertUsage.getPartitions().contains(partitionGroup)) {
-                return false;
-            }
-        }
-        return alertUsage.getPolicies().size() < numOfPoliciesBoundPerBolt;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java
deleted file mode 100644
index 36c0bce..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.model;
-
-import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
-import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * @since Mar 28, 2016.
- */
-public class AlertBoltUsage {
-
-    private String boltId;
-    private List<String> policies = new ArrayList<String>();
-    // the stream partitions group that scheduled for this given alert bolt
-    private List<StreamGroup> partitions = new ArrayList<StreamGroup>();
-    // the slot queue that scheduled for this given aler bolt
-    private List<StreamWorkSlotQueue> referQueues = new ArrayList<StreamWorkSlotQueue>();
-    private double load;
-
-    public AlertBoltUsage(String anid) {
-        this.boltId = anid;
-    }
-
-    public String getBoltId() {
-        return boltId;
-    }
-
-    public void setBoltId(String boltId) {
-        this.boltId = boltId;
-    }
-
-    public List<String> getPolicies() {
-        return policies;
-    }
-
-    public void addPolicies(PolicyDefinition pd) {
-        policies.add(pd.getName());
-        // add first partition
-        //        for (StreamPartition par : pd.getPartitionSpec()) {
-        //            partitions.add(par);
-        //        }
-    }
-
-    public double getLoad() {
-        return load;
-    }
-
-    public void setLoad(double load) {
-        this.load = load;
-    }
-
-    public List<StreamGroup> getPartitions() {
-        return partitions;
-    }
-
-    public List<StreamWorkSlotQueue> getReferQueues() {
-        return referQueues;
-    }
-
-    public int getQueueSize() {
-        return referQueues.size();
-    }
-
-    public void addQueue(StreamGroup streamPartition, StreamWorkSlotQueue queue) {
-        this.referQueues.add(queue);
-        this.partitions.add(streamPartition);
-    }
-
-    public void removeQueue(StreamWorkSlotQueue queue) {
-        this.referQueues.remove(queue);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java
deleted file mode 100644
index 39788d5..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.model;
-
-/**
- * @since Mar 28, 2016.
- */
-public class GroupBoltUsage {
-
-    private String boltId;
-    private double load;
-
-    public GroupBoltUsage(String boltId) {
-        this.boltId = boltId;
-    }
-
-    //    private final Set<String> streams = new HashSet<String>();
-    //    private final Map<String, StreamFilter> filters = new HashMap<String, StreamFilter>();
-
-    //    private final Map<String, List<StreamPartition>> groupByMeta;
-
-    public double getLoad() {
-        return load;
-    }
-
-    public void setLoad(double load) {
-        this.load = load;
-    }
-
-    //    public Set<String> getStreams() {
-    //        return streams;
-    //    }
-    //
-    //
-    //    public Map<String, StreamFilter> getFilters() {
-    //        return filters;
-    //    }
-
-    //    public Map<String, List<StreamPartition>> getGroupByMeta() {
-    //        return groupByMeta;
-    //    }
-
-    public String getBoltId() {
-        return boltId;
-    }
-
-    public void setBoltId(String boltId) {
-        this.boltId = boltId;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java
deleted file mode 100644
index 3cfc505..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.model;
-
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * @since Mar 27, 2016.
- */
-public class TopologyUsage {
-    // topo info
-    private String topoName;
-    private final Set<String> datasources = new HashSet<String>();
-    // usage information
-    private final Set<String> policies = new HashSet<String>();
-    private final Map<String, AlertBoltUsage> alertUsages = new HashMap<String, AlertBoltUsage>();
-    private final Map<String, GroupBoltUsage> groupUsages = new HashMap<String, GroupBoltUsage>();
-    private final List<MonitoredStream> monitoredStream = new ArrayList<MonitoredStream>();
-
-    private double load;
-
-    /**
-     * This is to be the existing/previous meta-data. <br/>
-     * Only one group meta-data for all of the group bolts in this topology.
-     */
-
-    public TopologyUsage() {
-    }
-
-    public TopologyUsage(String name) {
-        this.topoName = name;
-    }
-
-    public String getTopoName() {
-        return topoName;
-    }
-
-    public void setTopoName(String topoId) {
-        this.topoName = topoId;
-    }
-
-    public Set<String> getDataSources() {
-        return datasources;
-    }
-
-    public Set<String> getPolicies() {
-        return policies;
-    }
-
-    public Map<String, AlertBoltUsage> getAlertUsages() {
-        return alertUsages;
-    }
-
-    public AlertBoltUsage getAlertBoltUsage(String boltId) {
-        return alertUsages.get(boltId);
-    }
-
-    public Map<String, GroupBoltUsage> getGroupUsages() {
-        return groupUsages;
-    }
-
-    public List<MonitoredStream> getMonitoredStream() {
-        return monitoredStream;
-    }
-
-    public void addMonitoredStream(MonitoredStream par) {
-        if (!this.monitoredStream.contains(par)) {
-            this.monitoredStream.add(par);
-        }
-    }
-
-    public double getLoad() {
-        return load;
-    }
-
-    public void setLoad(double load) {
-        this.load = load;
-    }
-
-}


Mime
View raw message