eagle-commits mailing list archives

From h..@apache.org
Subject [31/84] [partial] eagle git commit: Clean repo for eagle site
Date Mon, 03 Apr 2017 11:54:39 GMT
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
deleted file mode 100644
index e7efbd7..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
+++ /dev/null
@@ -1,419 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.alert.coordinator;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.alert.coordinator.mock.InMemMetadataServiceClient;
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
-import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.coordinator.IScheduleContext;
-import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
-import org.apache.eagle.alert.coordinator.model.TopologyUsage;
-import org.apache.eagle.alert.coordinator.provider.ScheduleContextBuilder;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition.Definition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn.Type;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-/**
- * @since May 5, 2016
- */
-public class ScheduleContextBuilderTest {
-
-    Config config = ConfigFactory.load().getConfig("coordinator");
-
-    @Test
-    public void test() {
-        InMemMetadataServiceClient client = getSampleMetadataService();
-
-        ScheduleContextBuilder builder = new ScheduleContextBuilder(config, client);
-
-        IScheduleContext context = builder.buildContext();
-
-        // assert topology usage
-        Map<String, TopologyUsage> usages = context.getTopologyUsages();
-        Assert.assertEquals(1, usages.get(TOPO1).getMonitoredStream().size());
-        Assert.assertTrue(usages.get(TOPO1).getPolicies().contains(TEST_POLICY_1));
-
-        String alertBolt0 = TOPO1 + "-alert-" + "0";
-        String alertBolt1 = TOPO1 + "-alert-" + "1";
-        String alertBolt2 = TOPO1 + "-alert-" + "2";
-        for (AlertBoltUsage u : usages.get(TOPO1).getAlertUsages().values()) {
-            if (u.getBoltId().equals(alertBolt0) || u.getBoltId().equals(alertBolt1)
-                || u.getBoltId().equals(alertBolt2)) {
-                Assert.assertEquals(1, u.getPolicies().size());
-                Assert.assertTrue(u.getPolicies().contains(TEST_POLICY_1));
-                Assert.assertEquals(1, u.getPartitions().size());
-                Assert.assertEquals(1, u.getReferQueues().size());
-            }
-        }
-    }
-
-    @Test
-    public void test_remove_policy() {
-        InMemMetadataServiceClient client = getSampleMetadataService();
-        ScheduleContextBuilder builder = new ScheduleContextBuilder(config, client);
-
-        PolicyAssignment assignment1 = client.getVersionedSpec().getAssignments().get(0);
-
-        IScheduleContext context = builder.buildContext();
-        Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
-        StreamWorkSlotQueue queue = SchedulerTest.getQueue(context, assignment1.getQueueId()).getRight();
-
-        client.removePolicy(0);
-        context = builder.buildContext();
-        Assert.assertFalse(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
-
-        WorkSlot slot = queue.getWorkingSlots().get(0);
-        Set<String> topoPolicies = context.getTopologyUsages().get(slot.topologyName).getPolicies();
-        Assert.assertFalse(topoPolicies.contains(TEST_POLICY_1));
-        Assert.assertEquals(0, topoPolicies.size());
-    }
-
-    @Test
-    public void test_changed_policy_partition() {
-        InMemMetadataServiceClient client = getSampleMetadataService();
-        ScheduleContextBuilder builder = new ScheduleContextBuilder(config, client);
-        PolicyAssignment assignment1 = client.getVersionedSpec().getAssignments().get(0);
-
-        IScheduleContext context = builder.buildContext();
-        Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
-
-        StreamWorkSlotQueue queue = SchedulerTest.getQueue(context, assignment1.getQueueId()).getRight();
-
-        PolicyDefinition pd1 = client.listPolicies().get(0);
-        // add a new group-by column: the partition spec must be replaced (not
-        // mutated), because everything runs in one JVM here and objects are
-        // shared by reference (no serialization/deserialization round trip)
-        StreamPartition par = new StreamPartition(pd1.getPartitionSpec().get(0));
-        par.getColumns().add("s1");
-        pd1.getPartitionSpec().clear();
-        pd1.getPartitionSpec().add(par);
-
-        context = builder.buildContext();
-
-        // assert the policy assignment is removed
-        Assert.assertFalse(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
-        // assert the monitored stream is removed since no policy references it now
-        Assert.assertEquals(0, context.getMonitoredStreams().size());
-        // assert the topology usage doesn't contain policy
-        WorkSlot slot = queue.getWorkingSlots().get(0);
-        TopologyUsage topologyUsage = context.getTopologyUsages().get(slot.topologyName);
-        Set<String> topoPolicies = topologyUsage.getPolicies();
-        Assert.assertFalse(topoPolicies.contains(TEST_POLICY_1));
-        Assert.assertEquals(0, topoPolicies.size());
-        // assert the topology usage doesn't contain the monitored stream
-        Assert.assertEquals(0, topologyUsage.getMonitoredStream().size());
-        // assert the alert bolt usage doesn't have the queue reference
-        Assert.assertEquals(0, topologyUsage.getAlertBoltUsage(slot.getBoltId()).getReferQueues().size());
-    }
-
-    @Test
-    public void test_changed_policy_parallelism() {
-        InMemMetadataServiceClient client = getSampleMetadataService();
-        ScheduleContextBuilder builder = new ScheduleContextBuilder(config, client);
-        PolicyAssignment assignment1 = client.getVersionedSpec().getAssignments().get(0);
-
-        IScheduleContext context = builder.buildContext();
-        Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
-
-        StreamWorkSlotQueue queue = SchedulerTest.getQueue(context, assignment1.getQueueId()).getRight();
-
-        PolicyDefinition pd1 = client.listPolicies().get(0);
-        pd1.setParallelismHint(4); // default queue size is 5; shrink it (growing behaves the same way)
-
-        context = builder.buildContext();
-        Assert.assertFalse(context.getPolicyAssignments().values().iterator().hasNext());
-        //PolicyAssignment assignmentNew = context.getPolicyAssignments().values().iterator().next();
-        //StreamWorkSlotQueue queueNew = SchedulerTest.getQueue(context, assignmentNew.getQueueId()).getRight();
-        //Assert.assertNotNull(queueNew);
-        // just to make sure queueNew is present
-        //Assert.assertEquals(queue.getQueueId(), queueNew.getQueueId());
-
-        // default queue size is 5; grow to 6, so the policy assignment is removed
-        pd1.setParallelismHint(queue.getQueueSize() + 1);
-        context = builder.buildContext();
-
-        Assert.assertFalse(context.getPolicyAssignments().values().iterator().hasNext());
-    }
-
-    @Test
-    public void test_changed_policy_definition() {
-        InMemMetadataServiceClient client = getSampleMetadataService();
-        ScheduleContextBuilder builder = new ScheduleContextBuilder(config, client);
-        PolicyAssignment assignment1 = client.getVersionedSpec().getAssignments().get(0);
-
-        IScheduleContext context = builder.buildContext();
-        Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
-
-        StreamWorkSlotQueue queue = SchedulerTest.getQueue(context, assignment1.getQueueId()).getRight();
-
-        PolicyDefinition pd1 = client.listPolicies().get(0);
-        pd1.getDefinition().value = "define.. new...";
-
-        context = builder.buildContext();
-        PolicyAssignment assignmentNew = context.getPolicyAssignments().values().iterator().next();
-        StreamWorkSlotQueue queueNew = SchedulerTest.getQueue(context, assignmentNew.getQueueId()).getRight();
-        Assert.assertNotNull(queueNew);
-        // just to make sure queueNew is present
-        Assert.assertEquals(queue.getQueueId(), queueNew.getQueueId());
-    }
-
-    @Test
-    public void test_stream_noalert_policies_generation() throws Exception {
-        InMemMetadataServiceClient client = getSampleMetadataServiceWithNodataAlert();
-
-        ScheduleContextBuilder builder = new ScheduleContextBuilder(config, client);
-        IScheduleContext context = builder.buildContext();
-
-        PolicyDefinition policyDefinition = null;
-        PolicyDefinition aggrPolicyDefinition = null;
-        for (Entry<String, PolicyDefinition> entry : context.getPolicies().entrySet()) {
-            if (entry.getKey().endsWith("_nodata_alert")) {
-                policyDefinition = entry.getValue();
-                continue;
-            }
-            if (entry.getKey().endsWith("_aggregation_stream_policy")) {
-                aggrPolicyDefinition = entry.getValue();
-                continue;
-            }
-        }
-        Assert.assertEquals(3, context.getPolicies().size());
-
-        Assert.assertNotNull(policyDefinition);
-        Assert.assertEquals("nodataalert", policyDefinition.getDefinition().getType());
-        Assert.assertEquals("PT5S,dynamic,1," + COL1, policyDefinition.getDefinition().getValue());
-
-        Assert.assertNotNull(aggrPolicyDefinition);
-        Assert.assertEquals("siddhi", aggrPolicyDefinition.getDefinition().getType());
-
-        Kafka2TupleMetadata datasource = null;
-        for (Entry<String, Kafka2TupleMetadata> entry : context.getDataSourceMetadata().entrySet()) {
-            if ("nodata_alert_aggregation_ds".equals(entry.getKey())) {
-                datasource = entry.getValue();
-                break;
-            }
-        }
-        Assert.assertNotNull(datasource);
-
-        String publishmentName = policyDefinition.getName() + "_publish";
-        Publishment publishment = null;
-        for (Entry<String, Publishment> entry : context.getPublishments().entrySet()) {
-            if (publishmentName.equals(entry.getKey())) {
-                publishment = entry.getValue();
-                break;
-            }
-        }
-        Assert.assertNotNull(publishment);
-    }
-
-    @Test
-    public void test_renamed_topologies() {
-        InMemMetadataServiceClient client = getSampleMetadataService();
-        ScheduleContextBuilder builder = new ScheduleContextBuilder(config, client);
-
-        IScheduleContext context = builder.buildContext();
-        Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
-
-        Topology t = client.listTopologies().get(0);
-        t.setName("newName");
-
-        context = builder.buildContext();
-        Assert.assertFalse(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
-    }
-
-    private static final String TOPO1 = "topo1";
-    private static final String V1 = "v1";
-    private static final String COL1 = "col1";
-    private static final String OUT_STREAM1 = "out-stream1";
-    private static final String TEST_POLICY_1 = "test-policy-1";
-    private static final String TEST_STREAM_DEF_1 = "testStreamDef";
-    private static final String TEST_DATASOURCE_1 = "test-datasource-1";
-    private static StreamPartition par;
-    private static String queueId;
-    private static StreamGroup streamGroup;
-
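-    /** Builds an in-memory metadata service preloaded with a sample topology, datasource, policy, publishment, stream definition and schedule state. */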
-    public static InMemMetadataServiceClient getSampleMetadataService() {
-        InMemMetadataServiceClient client = new InMemMetadataServiceClient();
-        client.addTopology(createSampleTopology());
-        client.addDataSource(createKafka2TupleMetadata());
-        client.addPolicy(createPolicy());
-        client.addPublishment(createPublishment());
-        client.addStreamDefinition(createStreamDefinition());
-        client.addScheduleState(createScheduleState());
-        return client;
-    }
-
-    public static InMemMetadataServiceClient getSampleMetadataServiceWithNodataAlert() {
-        InMemMetadataServiceClient client = new InMemMetadataServiceClient();
-        client.addTopology(createSampleTopology());
-        client.addDataSource(createKafka2TupleMetadata());
-        client.addPolicy(createPolicy());
-        client.addPublishment(createPublishment());
-        client.addStreamDefinition(createStreamDefinitionWithNodataAlert());
-        client.addScheduleState(createScheduleState());
-        return client;
-    }
-
-    private static StreamDefinition createStreamDefinitionWithNodataAlert() {
-        StreamDefinition def = new StreamDefinition();
-        def.setStreamId(TEST_STREAM_DEF_1);
-        def.setDataSource(TEST_DATASOURCE_1);
-
-        StreamColumn col = new StreamColumn();
-        col.setName(COL1);
-        col.setRequired(true);
-        col.setType(Type.STRING);
-        col.setNodataExpression("PT5S,dynamic,1," + COL1);
-        def.getColumns().add(col);
-
-        return def;
-    }
-
-
-    private static ScheduleState createScheduleState() {
-        ScheduleState ss = new ScheduleState();
-        ss.setVersion(V1);
-
-        ss.getMonitoredStreams().add(createMonitoredStream());
-        ss.getAssignments().add(createAssignment());
-
-        return ss;
-    }
-
-    private static MonitoredStream createMonitoredStream() {
-        MonitoredStream ms = new MonitoredStream(streamGroup);
-        ms.setVersion(V1);
-
-        List<WorkSlot> slots = new ArrayList<WorkSlot>();
-        WorkSlot slot0 = new WorkSlot(TOPO1, TOPO1 + "-alert-" + 0);
-        WorkSlot slot1 = new WorkSlot(TOPO1, TOPO1 + "-alert-" + 1);
-        WorkSlot slot2 = new WorkSlot(TOPO1, TOPO1 + "-alert-" + 2);
-        WorkSlot slot3 = new WorkSlot(TOPO1, TOPO1 + "-alert-" + 3);
-        WorkSlot slot4 = new WorkSlot(TOPO1, TOPO1 + "-alert-" + 4);
-        WorkSlot slot5 = new WorkSlot(TOPO1, TOPO1 + "-alert-" + 5);
-        slots.add(slot0);
-        slots.add(slot1);
-        slots.add(slot2);
-        slots.add(slot3);
-        slots.add(slot4);
-        //slots.add(slot5);
-
-        StreamWorkSlotQueue q = new StreamWorkSlotQueue(streamGroup, false, new HashMap<>(), slots);
-        ms.addQueues(q);
-        queueId = q.getQueueId();
-        return ms;
-    }
-
-    private static PolicyAssignment createAssignment() {
-        PolicyAssignment pa = new PolicyAssignment(TEST_POLICY_1, queueId);
-        return pa;
-    }
-
-    private static PolicyDefinition createPolicy() {
-        PolicyDefinition def = new PolicyDefinition();
-        def.setName(TEST_POLICY_1);
-        def.setInputStreams(Arrays.asList(TEST_STREAM_DEF_1));
-        def.setOutputStreams(Arrays.asList(OUT_STREAM1));
-        def.setParallelismHint(5);
-        def.setDefinition(new Definition());
-
-        streamGroup = new StreamGroup();
-        par = new StreamPartition();
-        par.setStreamId(TEST_STREAM_DEF_1);
-        par.getColumns().add(COL1);
-        StreamSortSpec sortSpec = new StreamSortSpec();
-//        sortSpec.setColumn("col1");
-        sortSpec.setWindowMargin(3);
-        sortSpec.setWindowPeriod("PT1M");
-
-        par.setSortSpec(sortSpec);
-        streamGroup.addStreamPartition(par);
-
-        List<StreamPartition> lists = new ArrayList<StreamPartition>();
-        lists.add(par);
-        def.setPartitionSpec(lists);
-        return def;
-    }
-
-    private static StreamDefinition createStreamDefinition() {
-        StreamDefinition def = new StreamDefinition();
-        def.setStreamId(TEST_STREAM_DEF_1);
-        def.setDataSource(TEST_DATASOURCE_1);
-
-        StreamColumn col = new StreamColumn();
-        col.setName(COL1);
-        col.setRequired(true);
-        col.setType(Type.STRING);
-        def.getColumns().add(col);
-
-        return def;
-    }
-
-    private static Publishment createPublishment() {
-        Publishment pub = new Publishment();
-        pub.setType("KAFKA");
-        pub.setName("test-stream-output");
-        pub.setPolicyIds(Arrays.asList(TEST_POLICY_1));
-        return pub;
-    }
-
-    private static Kafka2TupleMetadata createKafka2TupleMetadata() {
-        Kafka2TupleMetadata ktm = new Kafka2TupleMetadata();
-        ktm.setName(TEST_DATASOURCE_1);
-        ktm.setSchemeCls("SchemeClass");
-        ktm.setTopic("tupleTopic");
-        ktm.setType("KAFKA");
-        ktm.setCodec(new Tuple2StreamMetadata());
-        return ktm;
-    }
-
-    private static Topology createSampleTopology() {
-        Topology t = new Topology(TOPO1, 3, 10);
-        for (int i = 0; i < t.getNumOfGroupBolt(); i++) {
-            t.getGroupNodeIds().add(t.getName() + "-grp-" + i);
-        }
-        for (int i = 0; i < t.getNumOfAlertBolt(); i++) {
-            t.getAlertBoltIds().add(t.getName() + "-alert-" + i);
-        }
-        return t;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java
deleted file mode 100644
index 1bfdd7b..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java
+++ /dev/null
@@ -1,724 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.alert.coordinator;
-
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.CollectionType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.alert.coordinator.mock.TestTopologyMgmtService;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.eagle.alert.coordination.model.*;
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.coordinator.IScheduleContext;
-import org.apache.eagle.alert.coordinator.ScheduleOption;
-import org.apache.eagle.alert.coordinator.impl.GreedyPolicyScheduler;
-import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
-import org.apache.eagle.alert.coordinator.model.GroupBoltUsage;
-import org.apache.eagle.alert.coordinator.model.TopologyUsage;
-import org.apache.eagle.alert.coordinator.provider.InMemScheduleConext;
-import org.apache.eagle.alert.engine.coordinator.*;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition.Definition;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn.Type;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.*;
-
-/**
- * @since Apr 22, 2016
- */
-public class SchedulerTest {
-
-    private static final String TOPO2 = "topo2";
-    private static final String TOPO1 = "topo1";
-    private static final int PARALLELISM = 5;
-    private static final String STREAM2 = "stream2";
-    private static final String JOIN_POLICY_1 = "join-policy-1";
-    private static final String TEST_TOPIC = "test-topic";
-    private static final String TEST_POLICY_1 = "test-policy1";
-    private static final String TEST_POLICY_2 = "test-policy2";
-    private static final String TEST_POLICY_3 = "test-policy3";
-    private static final String STREAM1 = "stream1";
-    private static final String DS_NAME = "ds1";
-    private static ObjectMapper mapper = new ObjectMapper();
-    private static final Logger LOG = LoggerFactory.getLogger(SchedulerTest.class);
-
-    @BeforeClass
-    public static void setup() {
-        ConfigFactory.invalidateCaches();
-        System.setProperty("config.resource", "/application.conf");
-    }
-
-    @Test
-    public void test01_simple() throws Exception {
-        GreedyPolicyScheduler ps = new GreedyPolicyScheduler();
-        TestTopologyMgmtService mgmtService = createMgmtService();
-        IScheduleContext context = createScheduleContext(mgmtService);
-        ps.init(context, mgmtService);
-        ps.schedule(new ScheduleOption());
-
-        ScheduleState status = ps.getState();
-        context = ps.getContext(); // context updated!
-        Map<String, SpoutSpec> spec = status.getSpoutSpecs();
-
-        LOG.info(mapper.writeValueAsString(spec));
-        Assert.assertEquals(2, spec.size());
-        Assert.assertTrue(spec.containsKey(TOPO1));
-        assertFirstPolicyScheduled(context, status);
-    }
-
-    private void assertFirstPolicyScheduled(IScheduleContext context, ScheduleState status) {
-        String version = status.getVersion();
-        // assert spout spec
-        {
-            Iterator<SpoutSpec> it = status.getSpoutSpecs().values().iterator();
-            {
-                // assert spout 1
-                SpoutSpec ss = it.next();
-                Assert.assertEquals(version, ss.getVersion());
-                Assert.assertEquals(1, ss.getKafka2TupleMetadataMap().size());
-                Assert.assertEquals(TEST_TOPIC, ss.getKafka2TupleMetadataMap().keySet().iterator().next());
-
-                Assert.assertEquals(1, ss.getStreamRepartitionMetadataMap().size());
-                List<StreamRepartitionMetadata> metas = ss.getStreamRepartitionMetadataMap().values().iterator().next();
-                Assert.assertEquals(1, metas.size());
-
-                StreamRepartitionMetadata streamMeta = metas.iterator().next();
-                Assert.assertEquals(STREAM1, streamMeta.getStreamId());
-                Assert.assertEquals(TEST_TOPIC, streamMeta.getTopicName());
-                Assert.assertEquals(1, streamMeta.groupingStrategies.size());
-
-                StreamRepartitionStrategy gs = streamMeta.groupingStrategies.iterator().next();
-                Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.numTotalParticipatingRouterBolts);
-                Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.totalTargetBoltIds.size());
-                Assert.assertEquals(0, gs.startSequence);
-
-                Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
-            }
-            {
-                // assert spout 2
-                SpoutSpec ss = it.next();
-                Assert.assertEquals(version, ss.getVersion());
-                Assert.assertEquals(0, ss.getKafka2TupleMetadataMap().size());
-            }
-        }
-        // assert grp-by spec
-        {
-            Iterator<RouterSpec> gsit = status.getGroupSpecs().values().iterator();
-            {
-                Assert.assertEquals(2, status.getGroupSpecs().values().size());
-                Assert.assertTrue(gsit.hasNext());
-                RouterSpec gspec = gsit.next();
-                Assert.assertEquals(version, gspec.getVersion());
-                String topo1 = gspec.getTopologyName();
-                LOG.info("group spec topology name:", topo1);
-                List<StreamRouterSpec> routeSpecs = gspec.getRouterSpecs();
-                Assert.assertEquals(1, routeSpecs.size());
-                for (StreamRouterSpec spec : routeSpecs) {
-                    StreamPartition par = spec.getPartition();
-                    Assert.assertEquals(STREAM1, par.getStreamId());
-                    Assert.assertEquals(Arrays.asList("col1"), par.getColumns());
-                    Assert.assertEquals(STREAM1, spec.getStreamId());
-
-                    Assert.assertEquals(1, spec.getTargetQueue().size());
-                    List<PolicyWorkerQueue> queues = spec.getTargetQueue();
-                    Assert.assertEquals(1, queues.size());
-                    Assert.assertEquals(5, queues.get(0).getWorkers().size());
-                    for (WorkSlot slot : queues.get(0).getWorkers()) {
-                        Assert.assertEquals(topo1, slot.getTopologyName());
-                        LOG.info(slot.getBoltId());
-                    }
-                }
-            }
-            // grp-spec2
-            {
-                RouterSpec gs2 = gsit.next();
-                Assert.assertEquals(version, gs2.getVersion());
-                List<StreamRouterSpec> routeSpecs = gs2.getRouterSpecs();
-                Assert.assertEquals(0, routeSpecs.size());
-            }
-        }
-        // alert spec
-        {
-            Assert.assertEquals(2, status.getAlertSpecs().values().size());
-            Iterator<AlertBoltSpec> asit = status.getAlertSpecs().values().iterator();
-            // topo1
-            {
-                AlertBoltSpec alertSpec = asit.next();
-                Assert.assertEquals(version, alertSpec.getVersion());
-                String topo1 = alertSpec.getTopologyName();
-                LOG.info("alert spec topology name {}", topo1);
-                for (List<String> definitions : alertSpec.getBoltPolicyIdsMap().values()) {
-                    Assert.assertEquals(1, definitions.size());
-                    Assert.assertEquals(TEST_POLICY_1, definitions.get(0));
-                }
-            }
-            // topo2
-            {
-                AlertBoltSpec alertSpec = asit.next();
-                Assert.assertEquals(version, alertSpec.getVersion());
-                String topo1 = alertSpec.getTopologyName();
-                LOG.info("alert spec topology name {}", topo1);
-                Assert.assertEquals(0, alertSpec.getBoltPolicyIdsMap().size());
-            }
-        }
-    }
-
-    private TestTopologyMgmtService createMgmtService() {
-        TestTopologyMgmtService mgmtService = new TestTopologyMgmtService();
-        return mgmtService;
-    }
-
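-    /** Builds an in-memory schedule context holding two empty topologies, one sample policy, a Kafka datasource, and schemas for stream1 and stream2. */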
-    private InMemScheduleConext createScheduleContext(TestTopologyMgmtService mgmtService) {
-        InMemScheduleConext context = new InMemScheduleConext();
-        // topo
-        Pair<Topology, TopologyUsage> pair1 = mgmtService.createEmptyTopology(TOPO1);
-        Pair<Topology, TopologyUsage> pair2 = mgmtService.createEmptyTopology(TOPO2);
-        context.addTopology(pair1.getLeft());
-        context.addTopologyUsages(pair1.getRight());
-        context.addTopology(pair2.getLeft());
-        context.addTopologyUsages(pair2.getRight());
-
-        // policy
-        createSamplePolicy(context, TEST_POLICY_1, STREAM1, PARALLELISM);
-
-        // data source
-        Kafka2TupleMetadata ds = new Kafka2TupleMetadata();
-        ds.setName(DS_NAME);
-        ds.setTopic(TEST_TOPIC);
-        ds.setCodec(new Tuple2StreamMetadata());
-        context.addDataSource(ds);
-
-        // schema
-        {
-            StreamDefinition schema = new StreamDefinition();
-            {
-                StreamColumn c = new StreamColumn();
-                c.setName("col1");
-                c.setType(Type.STRING);
-                c.setDefaultValue("dflt");
-                schema.getColumns().add(c);
-            }
-            {
-                StreamColumn c = new StreamColumn();
-                c.setName("col2");
-                c.setType(Type.DOUBLE);
-                c.setDefaultValue("0.0");
-                schema.getColumns().add(c);
-            }
-            schema.setStreamId(STREAM1);
-            schema.setValidate(false);
-            schema.setDataSource(DS_NAME);
-            context.addSchema(schema);
-        }
-        {
-            StreamDefinition schema = new StreamDefinition();
-            {
-                StreamColumn c = new StreamColumn();
-                c.setName("col1");
-                c.setType(Type.STRING);
-                c.setDefaultValue("dflt");
-                schema.getColumns().add(c);
-            }
-            schema.setStreamId(STREAM2);
-            schema.setValidate(false);
-            schema.setDataSource(DS_NAME);
-            context.addSchema(schema);
-        }
-
-        return context;
-    }
-
-    /**
-     * Adds a second policy after the first one has been scheduled.
-     */
-    @Test
-    public void test_schedule_add2() {
-        TestTopologyMgmtService mgmtService = createMgmtService();
-        IScheduleContext context = createScheduleContext(mgmtService);
-        GreedyPolicyScheduler ps = new GreedyPolicyScheduler();
-        ps.init(context, mgmtService);
-
-        ScheduleOption option = new ScheduleOption();
-        ps.schedule(option);
-        ScheduleState status = ps.getState();
-        context = ps.getContext(); // context updated!
-        assertFirstPolicyScheduled(context, status);
-
-        createSamplePolicy((InMemScheduleConext) context, TEST_POLICY_2, STREAM1, PARALLELISM);
-
-        ps.init(context, mgmtService); // reinit
-        ps.schedule(option);
-        status = ps.getState();
-        context = ps.getContext(); // context updated!
-        // now assert that the two policies share the same queue
-        assertSecondPolicyCreated(context, status);
-
-        // add one policy on different stream of the same topic
-        createSamplePolicy((InMemScheduleConext) context, TEST_POLICY_3, STREAM2, PARALLELISM);
-
-        ps.init(context, mgmtService); // re-init
-        ps.schedule(option);
-        status = ps.getState();
-        context = ps.getContext(); // context updated!
-        assertThirdPolicyScheduled(context, status);
-    }
-
-    private void assertThirdPolicyScheduled(IScheduleContext context, ScheduleState status) {
-        {
-            // now assert that the first two policies share the same queue
-            Assert.assertEquals(2, status.getSpoutSpecs().values().size());
-            Iterator<SpoutSpec> it = status.getSpoutSpecs().values().iterator();
-            {
-                // assert spout 1
-                SpoutSpec ss = it.next();
-                Assert.assertEquals(1, ss.getKafka2TupleMetadataMap().size());
-                Assert.assertEquals(TEST_TOPIC, ss.getKafka2TupleMetadataMap().keySet().iterator().next());
-
-                Assert.assertEquals(1, ss.getStreamRepartitionMetadataMap().size());
-                List<StreamRepartitionMetadata> metas = ss.getStreamRepartitionMetadataMap().values().iterator().next();
-                Assert.assertEquals(1, metas.size());
-
-                StreamRepartitionMetadata streamMeta = metas.iterator().next();
-                Assert.assertEquals(STREAM1, streamMeta.getStreamId());
-                Assert.assertEquals(TEST_TOPIC, streamMeta.getTopicName());
-                Assert.assertEquals(1, streamMeta.groupingStrategies.size());
-
-                StreamRepartitionStrategy gs = streamMeta.groupingStrategies.iterator().next();
-                Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.numTotalParticipatingRouterBolts);
-                Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.totalTargetBoltIds.size());
-                Assert.assertEquals(0, gs.startSequence);
-
-                PolicyAssignment pa1 = context.getPolicyAssignments().get(TEST_POLICY_1);
-                PolicyAssignment pa2 = context.getPolicyAssignments().get(TEST_POLICY_2);
-                Assert.assertNotNull(pa1);
-                Assert.assertNotNull(pa2);
-                Assert.assertEquals(pa1.getQueueId(), pa2.getQueueId());
-            }
-            {
-                // assert spout 2
-                SpoutSpec ss = it.next();
-                Assert.assertEquals(1, ss.getKafka2TupleMetadataMap().size());
-
-                Assert.assertEquals(TEST_TOPIC, ss.getKafka2TupleMetadataMap().keySet().iterator().next());
-
-                Assert.assertEquals(1, ss.getStreamRepartitionMetadataMap().size());
-                List<StreamRepartitionMetadata> metas = ss.getStreamRepartitionMetadataMap().values().iterator().next();
-                Assert.assertEquals(1, metas.size());
-
-                StreamRepartitionMetadata streamMeta = metas.iterator().next();
-                Assert.assertEquals(STREAM2, streamMeta.getStreamId());
-                Assert.assertEquals(TEST_TOPIC, streamMeta.getTopicName());
-                Assert.assertEquals(1, streamMeta.groupingStrategies.size());
-
-                StreamRepartitionStrategy gs = streamMeta.groupingStrategies.iterator().next();
-                Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.numTotalParticipatingRouterBolts);
-                Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.totalTargetBoltIds.size());
-                Assert.assertEquals(0, gs.startSequence);
-
-                // assert policy assignment for the three policies
-                PolicyAssignment pa1 = context.getPolicyAssignments().get(TEST_POLICY_1);
-                PolicyAssignment pa2 = context.getPolicyAssignments().get(TEST_POLICY_2);
-                PolicyAssignment pa3 = context.getPolicyAssignments().get(TEST_POLICY_3);
-                Assert.assertNotNull(pa1);
-                Assert.assertNotNull(pa2);
-                Assert.assertNotNull(pa3);
-                Assert.assertEquals(pa1.getQueueId(), pa2.getQueueId());
-                Assert.assertNotEquals(pa1.getQueueId(), pa3.getQueueId());
-                StreamWorkSlotQueue queue1 = getQueue(context, pa1.getQueueId()).getRight();
-                StreamWorkSlotQueue queue3 = getQueue(context, pa3.getQueueId()).getRight();
-                Assert.assertNotEquals(queue1.getWorkingSlots().get(0).getTopologyName(),
-                    queue3.getWorkingSlots().get(0).getTopologyName());
-            }
-        }
-        // group spec
-        {
-            Iterator<RouterSpec> gsit = status.getGroupSpecs().values().iterator();
-            Assert.assertEquals(2, status.getGroupSpecs().values().size());
-            {
-                // first topology's grp - spec
-                gsit.next();
-                // should be the same as after the second policy was scheduled; not asserted here
-            }
-            {
-                // second topology's grp - spec
-                RouterSpec spec = gsit.next();
-                Assert.assertEquals(1, spec.getRouterSpecs().size());
-                StreamRouterSpec routeSpec = spec.getRouterSpecs().get(0);
-                Assert.assertEquals(STREAM2, routeSpec.getStreamId());
-                Assert.assertEquals(Arrays.asList("col1"), routeSpec.getPartition().getColumns());
-            }
-        }
-        // alert spec
-        {
-            Assert.assertEquals(2, status.getAlertSpecs().values().size());
-            Iterator<AlertBoltSpec> asit = status.getAlertSpecs().values().iterator();
-            {
-                // same as the two-policy case; not asserted here
-                asit.next();
-            }
-            {
-                // second topology's alert spec
-                AlertBoltSpec as = asit.next();
-                Assert.assertEquals(5, as.getBoltPolicyIdsMap().size());
-                for (List<String> pdList : as.getBoltPolicyIdsMap().values()) {
-                    Assert.assertEquals(1, pdList.size());
-                    Assert.assertEquals(TEST_POLICY_3, pdList.get(0));
-                }
-            }
-        }
-    }
-
-    private void assertSecondPolicyCreated(IScheduleContext context, ScheduleState status) {
-        String version = status.getVersion();
-        {
-            // spout: assert that the two policies land on the same topology
-            // (same worker queue)
-            Iterator<SpoutSpec> it = status.getSpoutSpecs().values().iterator();
-            {
-                // assert spout 1 has two policies
-                SpoutSpec ss = it.next();
-                Assert.assertEquals(1, ss.getKafka2TupleMetadataMap().size());
-                Assert.assertEquals(TEST_TOPIC, ss.getKafka2TupleMetadataMap().keySet().iterator().next());
-
-                Assert.assertEquals(1, ss.getStreamRepartitionMetadataMap().size());
-                List<StreamRepartitionMetadata> metas = ss.getStreamRepartitionMetadataMap().values().iterator().next();
-                Assert.assertEquals(1, metas.size());
-
-                StreamRepartitionMetadata streamMeta = metas.iterator().next();
-                Assert.assertEquals(STREAM1, streamMeta.getStreamId());
-                Assert.assertEquals(TEST_TOPIC, streamMeta.getTopicName());
-                Assert.assertEquals(1, streamMeta.groupingStrategies.size());
-
-                StreamRepartitionStrategy gs = streamMeta.groupingStrategies.iterator().next();
-                Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.numTotalParticipatingRouterBolts);
-                Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.totalTargetBoltIds.size());
-                Assert.assertEquals(0, gs.startSequence);
-
-                // assert the two policies share the same queue
-                PolicyAssignment pa1 = context.getPolicyAssignments().get(TEST_POLICY_1);
-                PolicyAssignment pa2 = context.getPolicyAssignments().get(TEST_POLICY_2);
-                Assert.assertNotNull(pa1);
-                Assert.assertNotNull(pa2);
-                Assert.assertEquals(pa1.getQueueId(), pa2.getQueueId());
-                StreamWorkSlotQueue queue = getQueue(context, pa1.getQueueId()).getRight();
-                Assert.assertNotNull(queue);
-            }
-            {
-                // assert spout 2 is still empty
-                SpoutSpec ss = it.next();
-                Assert.assertEquals(0, ss.getKafka2TupleMetadataMap().size());
-            }
-        }
-
-        // assert grp-by spec; nothing differs compared to the first policy
-        {
-            Iterator<RouterSpec> gsit = status.getGroupSpecs().values().iterator();
-            {
-                Assert.assertEquals(2, status.getGroupSpecs().values().size());
-                Assert.assertTrue(gsit.hasNext());
-                RouterSpec gspec = gsit.next();
-                Assert.assertEquals(version, gspec.getVersion());
-                String topo1 = gspec.getTopologyName();
-                LOG.info("group spec topology name:", topo1);
-                List<StreamRouterSpec> routeSpecs = gspec.getRouterSpecs();
-                Assert.assertEquals(1, routeSpecs.size());
-                for (StreamRouterSpec spec : routeSpecs) {
-                    StreamPartition par = spec.getPartition();
-                    Assert.assertEquals(STREAM1, par.getStreamId());
-                    Assert.assertEquals(Arrays.asList("col1"), par.getColumns());
-                    Assert.assertEquals(STREAM1, spec.getStreamId());
-
-                    Assert.assertEquals(1, spec.getTargetQueue().size());
-                    List<PolicyWorkerQueue> queues = spec.getTargetQueue();
-                    Assert.assertEquals(1, queues.size());
-                    Assert.assertEquals(5, queues.get(0).getWorkers().size());
-                    for (WorkSlot slot : queues.get(0).getWorkers()) {
-                        Assert.assertEquals(topo1, slot.getTopologyName());
-                        LOG.info(slot.getBoltId());
-                    }
-                }
-            }
-            // grp-spec for second topology is still empty
-            {
-                RouterSpec gs2 = gsit.next();
-                Assert.assertEquals(version, gs2.getVersion());
-                List<StreamRouterSpec> routeSpecs = gs2.getRouterSpecs();
-                Assert.assertEquals(0, routeSpecs.size());
-            }
-        }
-        // alert spec
-        {
-            Assert.assertEquals(2, status.getAlertSpecs().values().size());
-            Iterator<AlertBoltSpec> asit = status.getAlertSpecs().values().iterator();
-            {
-                AlertBoltSpec alertSpec = asit.next();
-                Assert.assertEquals(version, alertSpec.getVersion());
-                String topo1 = alertSpec.getTopologyName();
-                LOG.info("alert spec topology name {}", topo1);
-                for (List<String> definitions : alertSpec.getBoltPolicyIdsMap().values()) {
-                    Assert.assertEquals(2, definitions.size());
-                    // List<String> names = Arrays.asList(definitions.stream().map((t) ->
-                    // t.getName()).toArray(String[]::new));
-                    Assert.assertTrue(definitions.contains(TEST_POLICY_1));
-                    Assert.assertTrue(definitions.contains(TEST_POLICY_2));
-                }
-            }
-            // second spout
-            {
-                AlertBoltSpec spec = asit.next();
-                Assert.assertEquals(0, spec.getBoltPolicyIdsMap().size());
-            }
-        }
-    }
-
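-    /** Scans all monitored streams for the queue with the given id; returns the (stream, queue) pair, or null if none matches. */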
-    public static Pair<MonitoredStream, StreamWorkSlotQueue> getQueue(IScheduleContext context, String queueId) {
-        for (MonitoredStream ms : context.getMonitoredStreams().values()) {
-            for (StreamWorkSlotQueue q : ms.getQueues()) {
-                if (q.getQueueId().equals(queueId)) {
-                    return Pair.of(ms, q);
-                }
-            }
-        }
-        return null;
-    }
-
-    @Test
-    public void testGroupEquals() {
-        StreamRepartitionStrategy gs1 = new StreamRepartitionStrategy();
-        StreamPartition sp = new StreamPartition();
-        sp.setColumns(Arrays.asList("col1"));
-        sp.setSortSpec(new StreamSortSpec());
-        sp.setStreamId("testStream");
-        sp.setType(StreamPartition.Type.GROUPBY);
-        gs1.partition = sp;
-
-        StreamRepartitionStrategy gs2 = new StreamRepartitionStrategy();
-        sp = new StreamPartition();
-        sp.setColumns(Arrays.asList("col1"));
-        sp.setSortSpec(new StreamSortSpec());
-        sp.setStreamId("testStream");
-        sp.setType(StreamPartition.Type.GROUPBY);
-        gs2.partition = sp;
-
-        Assert.assertTrue(gs1.equals(gs2));
-        List<StreamRepartitionStrategy> list = new ArrayList<StreamRepartitionStrategy>();
-        list.add(gs1);
-        Assert.assertTrue(list.contains(gs2));
-    }
-
-    private void createSamplePolicy(InMemScheduleConext context, String policyName, String stream, int hint) {
-        PolicyDefinition pd = new PolicyDefinition();
-        pd.setParallelismHint(hint);
-        Definition def = new Definition();
-        pd.setDefinition(def);
-        pd.setName(policyName);
-        pd.setInputStreams(Arrays.asList(stream));
-        pd.setOutputStreams(Arrays.asList("outputStream2"));
-        StreamPartition par = new StreamPartition();
-        par.setColumns(Arrays.asList("col1"));
-        par.setType(StreamPartition.Type.GLOBAL);
-        par.setStreamId(stream);
-        pd.setPartitionSpec(Arrays.asList(par));
-        context.addPoilcy(pd);
-    }
-
-    /**
-     * Add and remove
-     */
-    @Test
-    public void test_schedule2_remove() {
-        // TODO
-    }
-
-    @Test
-    public void test_schedule_updatePartition() {
-        // This case is now covered by the outer logic tests of ScheduleContextBuilder
-    }
-
-    @Test
-    public void test_schedule_updateDefinition() {
-        // This case is now covered by the outer logic tests of ScheduleContextBuilder
-    }
-
-    @Test
-    public void test_schedule_nogroupby() {
-        // TODO
-    }
-
-    @SuppressWarnings("unused")
-    @Test
-    public void test_schedule_multipleStream() throws Exception {
-        TestTopologyMgmtService mgmtService = new TestTopologyMgmtService();
-        IScheduleContext context = createScheduleContext(mgmtService);
-        GreedyPolicyScheduler ps = new GreedyPolicyScheduler();
-
-        createJoinPolicy((InMemScheduleConext) context, JOIN_POLICY_1, Arrays.asList(STREAM1, STREAM2));
-
-        ps.init(context, mgmtService);
-        ScheduleOption option = new ScheduleOption();
-        ps.schedule(option);
-        ScheduleState state = ps.getState();
-
-        context = ps.getContext(); // context updated!
-        // assert
-        Assert.assertTrue(context.getPolicyAssignments().containsKey(JOIN_POLICY_1));
-        Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
-        PolicyAssignment pa1 = context.getPolicyAssignments().get(JOIN_POLICY_1);
-        PolicyAssignment pa2 = context.getPolicyAssignments().get(TEST_POLICY_1);
-        Assert.assertNotEquals(pa1.getQueueId(), pa2.getQueueId());
-
-        StreamWorkSlotQueue joinPair = getQueue(context, pa1.getQueueId()).getRight();
-        String joinTopo = joinPair.getWorkingSlots().get(0).topologyName;
-        StreamWorkSlotQueue streamPair = getQueue(context, pa2.getQueueId()).getRight();
-        String streamTopo = streamPair.getWorkingSlots().get(0).topologyName;
-        Assert.assertNotEquals(joinTopo, streamTopo);
-
-        // TODO: more assertions on state
-        SpoutSpec joinSpout = state.getSpoutSpecs().get(joinTopo);
-        RouterSpec groupSpec = state.getGroupSpecs().get(joinTopo);
-        AlertBoltSpec alertSpec = state.getAlertSpecs().get(joinTopo);
-
-        Assert.assertEquals(1, joinSpout.getStreamRepartitionMetadataMap().size());
-        Assert.assertEquals(2, joinSpout.getStreamRepartitionMetadataMap().get(TEST_TOPIC).size());
-
-        Assert.assertEquals(2, groupSpec.getRouterSpecs().size());
-
-        LOG.info(new ObjectMapper().writeValueAsString(state));
-    }
-
-    private void createJoinPolicy(InMemScheduleConext context, String policyName, List<String> asList) {
-        PolicyDefinition pd = new PolicyDefinition();
-        pd.setParallelismHint(5);
-        Definition def = new Definition();
-        pd.setDefinition(def);
-        pd.setName(policyName);
-        pd.setInputStreams(asList);
-        pd.setOutputStreams(Arrays.asList("outputStream2"));
-        for (String streamId : pd.getInputStreams()) {
-            StreamPartition par = new StreamPartition();
-            par.setColumns(Arrays.asList("col1"));
-            par.setType(StreamPartition.Type.GROUPBY);
-            par.setStreamId(streamId);
-            pd.addPartition(par);
-        }
-        context.addPoilcy(pd);
-    }
-
-    @Test
-    public void testIrregularPolicyParallelismHint() {
-        Config config = ConfigFactory.load();
-        int defaultParallelism = config.getInt("coordinator.policyDefaultParallelism");
-        TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(5, 12);
-        InMemScheduleConext context = createScheduleContext(mgmtService);
-        // recreate the test policy
-        context.getPolicies().clear();
-        // make the hint bigger than the bolt number
-        int irregularParallelism = defaultParallelism + 2;
-        createSamplePolicy(context, "irregularPolicy", STREAM1, irregularParallelism);
-        GreedyPolicyScheduler ps = new GreedyPolicyScheduler();
-
-        ps.init(context, mgmtService);
-
-        ScheduleState scheduled = ps.schedule(new ScheduleOption());
-        Assert.assertEquals(2, scheduled.getSpoutSpecs().size());
-        Assert.assertEquals(2, scheduled.getGroupSpecs().size());
-        Assert.assertEquals(2, scheduled.getAlertSpecs().size());
-        // assertion
-        RouterSpec spec = scheduled.getGroupSpecs().get(TOPO1);
-        Assert.assertTrue(spec.getRouterSpecs().size() > 0); // must be allocated
-        for (StreamRouterSpec routerSpec : spec.getRouterSpecs()) {
-            Assert.assertEquals(1, routerSpec.getTargetQueue().size());
-            // irregularParallelism is promoted to 2 * defaultParallelism = 10
-            Assert.assertEquals(10, routerSpec.getTargetQueue().get(0).getWorkers().size());
-        }
-    }
-
-    @Test
-    public void testDataSources() throws Exception {
-        InMemScheduleConext context = loadContext("/multi/");
-        TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(4, 10);
-
-        GreedyPolicyScheduler ps = new GreedyPolicyScheduler();
-        ps.init(context, mgmtService);
-
-        ScheduleState state = ps.schedule(new ScheduleOption());
-        Assert.assertNotNull(state);
-        Assert.assertEquals(2, state.getAssignments().size());
-        Assert.assertEquals(1, state.getAlertSpecs().size());
-        Assert.assertEquals(10, state.getAlertSpecs().get("alertUnitTopology_1").getBoltPolicyIdsMap().size());
-    }
-
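-    /** Loads datasources, policies, publishments, stream definitions and topologies from JSON resources under the given base path. */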
-    private InMemScheduleConext loadContext(String base) throws Exception {
-        InMemScheduleConext context = new InMemScheduleConext();
-
-        List<Kafka2TupleMetadata> metadata = loadEntities(base + "datasources.json", Kafka2TupleMetadata.class);
-        for (Kafka2TupleMetadata k : metadata) {
-            context.addDataSource(k);
-        }
-
-        List<PolicyDefinition> policies = loadEntities(base + "policies.json", PolicyDefinition.class);
-        for (PolicyDefinition p : policies) {
-            context.addPoilcy(p);
-        }
-
-        List<Publishment> pubs = loadEntities(base + "publishments.json", Publishment.class);
-        for (Publishment pub : pubs) {
-            context.addPublishment(pub);
-        }
-
-        List<StreamDefinition> defs = loadEntities(base + "streamdefinitions.json", StreamDefinition.class);
-        for (StreamDefinition def : defs) {
-            context.addSchema(def);
-        }
-
-        List<Topology> topos = loadEntities(base + "topologies.json", Topology.class);
-        for (Topology t : topos) {
-            context.addTopology(t);
-
-            TopologyUsage u = new TopologyUsage(t.getName());
-            for (String gnid : t.getGroupNodeIds()) {
-                u.getGroupUsages().put(gnid, new GroupBoltUsage(gnid));
-            }
-            for (String anid : t.getAlertBoltIds()) {
-                u.getAlertUsages().put(anid, new AlertBoltUsage(anid));
-            }
-            context.addTopologyUsages(u);
-        }
-
-        return context;
-    }
-
-    public static <T> List<T> loadEntities(String path, Class<T> tClz) throws Exception {
-        System.out.println(FileUtils.readFileToString(new File(SchedulerTest.class.getResource(path).getPath())));
-        JavaType type = CollectionType.construct(List.class, SimpleType.construct(tClz));
-        List<T> l = mapper.readValue(SchedulerTest.class.getResourceAsStream(path), type);
-        return l;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestExclusiveExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestExclusiveExecutor.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestExclusiveExecutor.java
deleted file mode 100644
index 1f3baf5..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestExclusiveExecutor.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.alert.coordinator;
-
-import com.google.common.base.Joiner;
-import org.apache.eagle.alert.config.ZKConfig;
-import org.apache.eagle.alert.coordinator.ExclusiveExecutor;
-import org.apache.eagle.alert.utils.ZookeeperEmbedded;
-import org.junit.*;
-
-import java.io.BufferedReader;
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-import java.io.StringReader;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-@Ignore
-public class TestExclusiveExecutor {
-
-    ZookeeperEmbedded zkEmbed;
-
-    @Before
-    public void setUp() throws Exception {
-        zkEmbed = new ZookeeperEmbedded(2181);
-        zkEmbed.start();
-
-        Thread.sleep(2000);
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        zkEmbed.shutdown();
-    }
-
-    @Test
-    public void testConcurrency() throws Exception {
-        ByteArrayOutputStream newStreamOutput = new ByteArrayOutputStream();
-        PrintStream newStream = new PrintStream(newStreamOutput);
-        PrintStream oldStream = System.out;
-
-        System.setOut(newStream);
-
-        ZKConfig zkConfig = new ZKConfig();
-        zkConfig.zkQuorum = "127.0.0.1:2181";
-        zkConfig.zkRetryTimes = 3;
-        zkConfig.zkRoot = "/";
-        zkConfig.connectionTimeoutMs = 3000;
-        zkConfig.zkRetryInterval = 1000;
-        zkConfig.zkSessionTimeoutMs = 5000;
-
-        String path = "/concurrenty";
-        AtomicBoolean lock1 = new AtomicBoolean(false);
-        Runnable runnableOne = () -> { System.out.println("this is thread one"); lock1.set(true);};
-        new Thread(() -> {
-            ExclusiveExecutor executor = new ExclusiveExecutor(zkConfig);
-            try {
-                executor.execute(path, runnableOne);
-            } catch (TimeoutException e) {
-            }
-        }).start();
-
-
-        AtomicBoolean lock2 = new AtomicBoolean();
-        Runnable runnableTwo = () ->  { System.out.println("this is thread two"); lock2.set(true);};
-        new Thread(() -> {
-            ExclusiveExecutor executor = new ExclusiveExecutor(zkConfig);
-            try {
-                executor.execute(path, runnableTwo);
-            } catch (TimeoutException e) {
-                // ignored, as above
-            }
-        }).start();
-
-        Thread.sleep(2000);
-
-        System.out.flush();
-        BufferedReader br = new BufferedReader(new StringReader(newStreamOutput.toString()));
-        List<String> logs = new ArrayList<String>();
-        String line = null;
-        while ((line = br.readLine()) != null) {
-            logs.add(line);
-        }
-
-        System.setOut(oldStream);
-        System.out.println("Cached logs: " + Joiner.on("\n").join(logs));
-
-        Assert.assertTrue(logs.stream().anyMatch((log) -> log.contains("this is thread one")));
-        Assert.assertTrue(logs.stream().anyMatch((log) -> log.contains("this is thread two")));
-
-        Assert.assertTrue(lock1.get());
-        Assert.assertTrue(lock2.get());
-    }
-
-}

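Aside: the behavior this test exercises (two processes race on the same path, both runnables eventually execute, one at a time) is the classic ZooKeeper mutual-exclusion recipe. A minimal sketch of that pattern using Apache Curator's InterProcessMutex — an illustration of the idea, not Eagle's actual ExclusiveExecutor internals; the connect string and lock path are assumptions:

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    public final class ExclusiveRunSketch {
        public static void runExclusively(String zkQuorum, String lockPath, Runnable task) throws Exception {
            CuratorFramework client = CuratorFrameworkFactory.newClient(
                zkQuorum, new ExponentialBackoffRetry(1000, 3)); // 1s base delay, 3 retries
            client.start();
            InterProcessMutex lock = new InterProcessMutex(client, lockPath);
            lock.acquire(); // blocks until this process holds the ZooKeeper lock
            try {
                task.run(); // at most one process runs here per lockPath at a time
            } finally {
                lock.release();
                client.close();
            }
        }
    }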
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java
deleted file mode 100644
index 875bb81..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.alert.coordinator;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.config.ZKConfig;
-import org.apache.eagle.alert.config.ZKConfigBuilder;
-import org.apache.eagle.alert.coordinator.ExclusiveExecutor;
-import org.apache.eagle.alert.utils.ZookeeperEmbedded;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class TestGreedyScheduleCoordinator {
-
-    public static class GreedyScheduleCoordinator {
-
-        public int schedule(int input) throws TimeoutException {
-            Config config = ConfigFactory.load().getConfig("coordinator");
-            ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
-            ExclusiveExecutor executor = new ExclusiveExecutor(zkConfig);
-            final AtomicInteger r = new AtomicInteger();
-            executor.execute("/alert/test", () -> {
-                try {
-                    Thread.sleep(input);
-                } catch (Exception e){
-                }
-
-                r.set(input);
-            });
-            try {
-                executor.close();
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-            throw new RuntimeException("Acquire greedy scheduler lock failed, please retry later");
-        }
-    }
-
-    ZookeeperEmbedded zkEmbed;
-
-    @Before
-    public void setUp() throws Exception {
-        zkEmbed = new ZookeeperEmbedded(2181);
-        zkEmbed.start();
-
-        Thread.sleep(2000);
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        zkEmbed.shutdown();
-    }
-
-    @Test
-    public void testMain() throws Exception {
-        final GreedyScheduleCoordinator coordinator = new GreedyScheduleCoordinator();
-
-        for (int input = 1; input <= 3; input++) {
-            final int in = input;
-            new Thread(() -> {
-                try {
-                    System.out.println("output: " + coordinator.schedule(in));
-                } catch (TimeoutException e) {
-                    // lock acquisition timed out; this smoke test simply moves on
-                }
-                try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                }
-            }).start();
-        }
-
-        Thread.sleep(15000);
-    }
-
-}

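Aside: the fixed Thread.sleep(15000) above makes the test slow and still timing-sensitive. A common alternative is to have each worker count down a latch and assert on it with a timeout. A sketch under the same three-thread setup, assuming org.junit.Assert is imported; the latch wiring is illustrative:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    CountDownLatch done = new CountDownLatch(3);
    for (int input = 1; input <= 3; input++) {
        final int in = input;
        new Thread(() -> {
            try {
                System.out.println("output: " + coordinator.schedule(in));
            } catch (Exception e) {
                // a real test would record this and fail the assertion phase
            } finally {
                done.countDown(); // signal completion regardless of outcome
            }
        }).start();
    }
    // Fail fast if the three schedule() calls do not finish within 15 seconds.
    Assert.assertTrue(done.await(15, TimeUnit.SECONDS));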
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestMetadataValidator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestMetadataValidator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestMetadataValidator.java
deleted file mode 100644
index c9d3b5e..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestMetadataValidator.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.alert.coordinator;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.eagle.alert.coordinator.impl.MetadataValdiator;
-import org.apache.eagle.alert.coordinator.provider.InMemScheduleConext;
-import org.junit.Test;
-
-/**
- * Created on 10/2/16.
- */
-public class TestMetadataValidator {
-
-    private static final ObjectMapper om = new ObjectMapper();
-
-    @Test
-    public void validate() throws Exception {
-        InMemScheduleConext context = new InMemScheduleConext();
-        MetadataValdiator mv = new MetadataValdiator(context);
-
-
-        // om.readValue(TestMetadataValidator.class.getResourceAsStream("/validation/datasources.json"), new Gene);
-        // TODO: add real validation tests here; the readValue call above was left unfinished.
-
-    }
-}

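Aside: if the intent of the unfinished readValue call in the test above was to deserialize the datasource fixtures, Jackson's TypeReference is the usual way to get a typed list. A hypothetical completion, reusing the test's om field; the element type and the /validation/datasources.json resource are assumptions carried over from the comment:

    import com.fasterxml.jackson.core.type.TypeReference;
    import java.util.List;

    // Hypothetical: load the fixture into a typed list for validation.
    List<Kafka2TupleMetadata> sources = om.readValue(
        TestMetadataValidator.class.getResourceAsStream("/validation/datasources.json"),
        new TypeReference<List<Kafka2TupleMetadata>>() {});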
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java
deleted file mode 100644
index 56ee980..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.alert.coordinator;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.alert.coordinator.mock.TestTopologyMgmtService;
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
-import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
-import org.apache.eagle.alert.coordinator.impl.WorkQueueBuilder;
-import org.apache.eagle.alert.coordinator.impl.strategies.SameTopologySlotStrategy;
-import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
-import org.apache.eagle.alert.coordinator.model.TopologyUsage;
-import org.apache.eagle.alert.coordinator.provider.InMemScheduleConext;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.ConfigFactory;
-
-/**
- * @since Apr 27, 2016
- */
-public class WorkSlotStrategyTest {
-
-    private static final Logger LOG = LoggerFactory.getLogger(WorkSlotStrategyTest.class);
-
-    @Test
-    public void test() {
-        InMemScheduleConext context = new InMemScheduleConext();
-
-        StreamPartition partition = new StreamPartition();
-        partition.setType(StreamPartition.Type.GLOBAL);
-        partition.setStreamId("s1");
-        partition.setColumns(Arrays.asList("f1", "f2"));
-
-        StreamGroup group = new StreamGroup();
-        group.addStreamPartition(partition);
-
-
-        {
-            TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(3, 3, "prefix-time1", true);
-            SameTopologySlotStrategy strategy = new SameTopologySlotStrategy(context, group, mgmtService);
-            List<WorkSlot> slots = strategy.reserveWorkSlots(5, false, new HashMap<String, Object>());
-            Assert.assertEquals(0, slots.size());
-            Assert.assertEquals(1, context.getTopologies().size());
-        }
-
-        {
-            TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(5, 5, "prefix-time2", true);
-            SameTopologySlotStrategy strategy = new SameTopologySlotStrategy(context, group, mgmtService);
-            List<WorkSlot> slots = strategy.reserveWorkSlots(5, false, new HashMap<String, Object>());
-            Assert.assertEquals(5, slots.size());
-            LOG.info(slots.get(0).getTopologyName());
-            Assert.assertEquals(2, context.getTopologies().size());
-            Assert.assertEquals(2, context.getTopologyUsages().size());
-
-            // assert all on same topology
-            for (WorkSlot ws : slots) {
-                Assert.assertEquals(slots.get(0).getTopologyName(), ws.getTopologyName());
-            }
-            Iterator<TopologyUsage> it = context.getTopologyUsages().values().iterator();
-            TopologyUsage usage = it.next();
-            for (AlertBoltUsage u : usage.getAlertUsages().values()) {
-                Assert.assertEquals(0, u.getPartitions().size());
-                Assert.assertEquals(0, u.getQueueSize());
-            }
-            // the second topology's alert bolts must start out unused as well
-            usage = it.next();
-            for (AlertBoltUsage u : usage.getAlertUsages().values()) {
-                LOG.info(u.getBoltId());
-                Assert.assertEquals(0, u.getPartitions().size());
-                Assert.assertEquals(u.getBoltId(), 0, u.getQueueSize());
-            }
-        }
-    }
-
-    @SuppressWarnings("unused")
-    @Test
-    public void test2_overlap() {
-        InMemScheduleConext context = new InMemScheduleConext();
-
-        StreamPartition partition = new StreamPartition();
-        partition.setType(StreamPartition.Type.GLOBAL);
-        partition.setStreamId("s1");
-        partition.setColumns(Arrays.asList("f1", "f2"));
-        StreamGroup sg = new StreamGroup();
-        sg.addStreamPartition(partition, false);
-
-        MonitoredStream ms1 = new MonitoredStream(sg);
-
-        TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(5, 5, "prefix-3", true);
-
-        String topo1 = null;
-        String bolt1 = null;
-        WorkQueueBuilder wrb = new WorkQueueBuilder(context, mgmtService);
-        StreamWorkSlotQueue queue = wrb.createQueue(ms1, false, 5, new HashMap<String, Object>());
-        {
-            Assert.assertEquals(5, queue.getWorkingSlots().size());
-            topo1 = queue.getWorkingSlots().get(0).getTopologyName();
-            bolt1 = queue.getWorkingSlots().get(0).getBoltId();
-            Assert.assertEquals(1, context.getTopologies().size());
-            Assert.assertEquals(1, context.getTopologyUsages().size());
-            LOG.info(queue.getWorkingSlots().get(0).getTopologyName());
-            for (WorkSlot ws : queue.getWorkingSlots()) {
-                Assert.assertEquals(topo1, ws.getTopologyName());
-            }
-
-            TopologyUsage usage = context.getTopologyUsages().values().iterator().next();
-            for (AlertBoltUsage u : usage.getAlertUsages().values()) {
-                Assert.assertTrue(u.getPartitions().size() > 0);
-                Assert.assertTrue(u.getBoltId(), u.getQueueSize() > 0);
-            }
-        }
-
-        // second partition
-        StreamPartition partition2 = new StreamPartition();
-        partition2.setType(StreamPartition.Type.GLOBAL);
-        partition2.setStreamId("s2");
-        partition2.setColumns(Arrays.asList("f1", "f2"));
-
-        StreamGroup sg2 = new StreamGroup();
-        sg2.addStreamPartition(partition2);
-        MonitoredStream ms2 = new MonitoredStream(sg2);
-        queue = wrb.createQueue(ms2, false, 5, new HashMap<String, Object>());
-        {
-            Assert.assertEquals(5, queue.getWorkingSlots().size());
-            Assert.assertEquals(2, context.getTopologies().size());
-            Assert.assertEquals(2, context.getTopologyUsages().size());
-
-            String topo2 = queue.getWorkingSlots().get(0).getTopologyName();
-            String bolt2 = queue.getWorkingSlots().get(0).getBoltId();
-            for (WorkSlot ws : queue.getWorkingSlots()) {
-                Assert.assertEquals(topo2, ws.getTopologyName());
-            }
-            Assert.assertNotEquals(topo1, topo2);
-        }
-    }
-    
-    @Test
-    public void testMultipleStreams() {
-        ConfigFactory.invalidateCaches();
-        System.setProperty("config.resource", "/application-multiplestreams.conf");
-
-        InMemScheduleConext context = new InMemScheduleConext();
-
-        StreamGroup group1 = createStreamGroup("s1", Arrays.asList("f1", "f2"), true);
-        StreamGroup group2 = createStreamGroup("s2", Arrays.asList("f2", "f3"), false);
-        StreamGroup group3 = createStreamGroup("s3", Arrays.asList("f4"), false);
-        StreamGroup group4 = createStreamGroup("s4", Arrays.asList("f5"), false);
-
-        TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(3, 4, "prefix-time1", true);
-        WorkQueueBuilder wrb = new WorkQueueBuilder(context, mgmtService);
-        {
-            StreamWorkSlotQueue queue = wrb.createQueue(new MonitoredStream(group1), group1.isDedicated(), 2, new HashMap<String, Object>());
-            print(context.getTopologyUsages().values());
-            
-            TopologyUsage usage = context.getTopologyUsages().values().iterator().next();
-            
-            Assert.assertTrue(getMonitorStream(usage.getMonitoredStream()).containsKey(group1));
-            Assert.assertEquals(1, getMonitorStream(usage.getMonitoredStream()).get(group1).getQueues().size());
-            Assert.assertEquals(2, getMonitorStream(usage.getMonitoredStream()).get(group1).getQueues().get(0).getWorkingSlots().size());
-            
-            List<String> group1Slots = new ArrayList<String>();
-            getMonitorStream(usage.getMonitoredStream()).get(group1).getQueues().get(0).getWorkingSlots().forEach(slot -> {
-                group1Slots.add(slot.getBoltId());
-            });
-         
-            StreamWorkSlotQueue queue2 = wrb.createQueue(new MonitoredStream(group2), group2.isDedicated(), 2, new HashMap<String, Object>());
-            print(context.getTopologyUsages().values());
-            
-            Assert.assertTrue(getMonitorStream(usage.getMonitoredStream()).containsKey(group2));
-            Assert.assertEquals(1, getMonitorStream(usage.getMonitoredStream()).get(group2).getQueues().size());
-            Assert.assertEquals(2, getMonitorStream(usage.getMonitoredStream()).get(group2).getQueues().get(0).getWorkingSlots().size());
-            getMonitorStream(usage.getMonitoredStream()).get(group2).getQueues().get(0).getWorkingSlots().forEach(slot -> {
-                Assert.assertFalse(group1Slots.contains(slot.getBoltId()));
-            });
-            
-
-            StreamWorkSlotQueue queue3 = wrb.createQueue(new MonitoredStream(group3), group3.isDedicated(), 2, new HashMap<String, Object>());
-            print(context.getTopologyUsages().values());
-            
-            Assert.assertTrue(getMonitorStream(usage.getMonitoredStream()).containsKey(group3));
-            Assert.assertEquals(1, getMonitorStream(usage.getMonitoredStream()).get(group3).getQueues().size());
-            Assert.assertEquals(2, getMonitorStream(usage.getMonitoredStream()).get(group3).getQueues().get(0).getWorkingSlots().size());
-            getMonitorStream(usage.getMonitoredStream()).get(group3).getQueues().get(0).getWorkingSlots().forEach(slot -> {
-                Assert.assertFalse(group1Slots.contains(slot.getBoltId()));
-            });
-            
-            StreamWorkSlotQueue queue4 = wrb.createQueue(new MonitoredStream(group4), group4.isDedicated(), 2, new HashMap<String, Object>());
-            print(context.getTopologyUsages().values());
-            
-            Assert.assertFalse(getMonitorStream(usage.getMonitoredStream()).containsKey(group4));
-            
-        }
-    }
-    
-    @Test
-    public void testMultipleStreamsWithoutReuse() {
-        ConfigFactory.invalidateCaches();
-        System.setProperty("config.resource", "/application-multiplestreams2.conf");
-
-        InMemScheduleConext context = new InMemScheduleConext();
-
-        StreamGroup group1 = createStreamGroup("s1", Arrays.asList("f1", "f2"), true);
-        StreamGroup group2 = createStreamGroup("s2", Arrays.asList("f2", "f3"), false);
-        StreamGroup group3 = createStreamGroup("s3", Arrays.asList("f4"), false);
-        StreamGroup group4 = createStreamGroup("s4", Arrays.asList("f5"), false);
-
-        TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(3, 4, "prefix-time1", true);
-        WorkQueueBuilder wrb = new WorkQueueBuilder(context, mgmtService);
-        {
-            StreamWorkSlotQueue queue = wrb.createQueue(new MonitoredStream(group1), group1.isDedicated(), 2, new HashMap<String, Object>());
-            print(context.getTopologyUsages().values());
-            
-            TopologyUsage usage = context.getTopologyUsages().values().iterator().next();
-            
-            Assert.assertTrue(getMonitorStream(usage.getMonitoredStream()).containsKey(group1));
-            Assert.assertEquals(1, getMonitorStream(usage.getMonitoredStream()).get(group1).getQueues().size());
-            Assert.assertEquals(2, getMonitorStream(usage.getMonitoredStream()).get(group1).getQueues().get(0).getWorkingSlots().size());
-            
-            List<String> group1Slots = new ArrayList<String>();
-            getMonitorStream(usage.getMonitoredStream()).get(group1).getQueues().get(0).getWorkingSlots().forEach(slot -> {
-                group1Slots.add(slot.getBoltId());
-            });
-         
-            StreamWorkSlotQueue queue2 = wrb.createQueue(new MonitoredStream(group2), group2.isDedicated(), 2, new HashMap<String, Object>());
-            print(context.getTopologyUsages().values());
-            
-            Assert.assertTrue(getMonitorStream(usage.getMonitoredStream()).containsKey(group2));
-            Assert.assertEquals(1, getMonitorStream(usage.getMonitoredStream()).get(group2).getQueues().size());
-            Assert.assertEquals(2, getMonitorStream(usage.getMonitoredStream()).get(group2).getQueues().get(0).getWorkingSlots().size());
-            getMonitorStream(usage.getMonitoredStream()).get(group2).getQueues().get(0).getWorkingSlots().forEach(slot -> {
-                Assert.assertFalse(group1Slots.contains(slot.getBoltId()));
-            });
-            
-
-            StreamWorkSlotQueue queue3 = wrb.createQueue(new MonitoredStream(group3), group3.isDedicated(), 2, new HashMap<String, Object>());
-            print(context.getTopologyUsages().values());
-            
-            Assert.assertFalse(getMonitorStream(usage.getMonitoredStream()).containsKey(group3));
-            
-            StreamWorkSlotQueue queue4 = wrb.createQueue(new MonitoredStream(group4), group4.isDedicated(), 2, new HashMap<String, Object>());
-            print(context.getTopologyUsages().values());
-            
-            Assert.assertFalse(getMonitorStream(usage.getMonitoredStream()).containsKey(group4));
-            
-        }
-    }
-    
-    private Map<StreamGroup, MonitoredStream> getMonitorStream(List<MonitoredStream> monitorStreams) {
-        Map<StreamGroup, MonitoredStream> result = new HashMap<StreamGroup, MonitoredStream>();
-        monitorStreams.forEach(monitorStream -> result.put(monitorStream.getStreamGroup(), monitorStream));
-        return result;
-    }
-
-    private StreamGroup createStreamGroup(String streamId, List<String> columns, boolean dedicated) {
-        StreamPartition partition = new StreamPartition();
-        partition.setType(StreamPartition.Type.GLOBAL);
-        partition.setStreamId(streamId);
-        partition.setColumns(columns);
-
-        StreamGroup group = new StreamGroup();
-        group.addStreamPartition(partition, dedicated);
-        return group;
-    }
-
-    private void print(Collection<TopologyUsage> usages) {
-        try {
-            ObjectMapper om = new ObjectMapper();
-            LOG.info(">>>" + om.writeValueAsString(usages));
-        } catch (Exception e) {
-            // log (rather than swallow) serialization failures
-            LOG.warn("failed to serialize topology usages", e);
-        }
-    }
-
-}

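Aside: the getMonitorStream helper above indexes monitored streams by their StreamGroup with an explicit forEach. The same map builds in one expression with a stream collector; a near-equivalent sketch (note that Collectors.toMap throws on duplicate keys, while the original put() silently overwrites — the groups in these tests are distinct, so either behaves the same here):

    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;
    import java.util.stream.Collectors;

    private Map<StreamGroup, MonitoredStream> getMonitorStream(List<MonitoredStream> monitorStreams) {
        // key each monitored stream by its stream group
        return monitorStreams.stream()
            .collect(Collectors.toMap(MonitoredStream::getStreamGroup, Function.identity()));
    }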
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java
deleted file mode 100644
index 826cde4..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.alert.coordinator.mock;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.SpoutSpec;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
-import org.apache.eagle.alert.engine.model.AlertPublishEvent;
-import org.apache.eagle.alert.service.IMetadataServiceClient;
-
-/**
- * Per the metadata service client semantics, mutating a value returned from
- * this interface must not directly change the client's internal state.
- *
- * @since May 5, 2016
- */
-@SuppressWarnings("serial")
-public class InMemMetadataServiceClient implements IMetadataServiceClient {
-
-    private List<StreamingCluster> clusters = new ArrayList<StreamingCluster>();
-    private List<Topology> topologies = new ArrayList<Topology>();
-    private List<PolicyDefinition> policies = new ArrayList<PolicyDefinition>();
-    private List<StreamDefinition> definitions = new ArrayList<StreamDefinition>();
-    private List<Kafka2TupleMetadata> datasources = new ArrayList<Kafka2TupleMetadata>();
-
-    private SortedMap<String, ScheduleState> scheduleStates = new TreeMap<String, ScheduleState>();
-    private List<SpoutSpec> spoutSpecs = new ArrayList<SpoutSpec>();
-    private List<Publishment> publishments = new ArrayList<Publishment>();
-    private List<AlertPublishEvent> alerts = new ArrayList<>();
-
-    @Override
-    public void close() throws IOException {
-    }
-
-    @Override
-    public List<StreamingCluster> listClusters() {
-        return Collections.unmodifiableList(clusters);
-    }
-
-    @Override
-    public List<Topology> listTopologies() {
-        return Collections.unmodifiableList(topologies);
-    }
-
-    @Override
-    public List<PolicyDefinition> listPolicies() {
-        return Collections.unmodifiableList(policies);
-    }
-
-    public void removePolicy(int idx) {
-        policies.remove(idx);
-    }
-
-    @Override
-    public List<StreamDefinition> listStreams() {
-        return Collections.unmodifiableList(definitions);
-    }
-
-    @Override
-    public List<Kafka2TupleMetadata> listDataSources() {
-        return Collections.unmodifiableList(datasources);
-    }
-
-    @Override
-    public List<Publishment> listPublishment() {
-        return Collections.unmodifiableList(publishments);
-    }
-
-    @Override
-    public List<SpoutSpec> listSpoutMetadata() {
-        return Collections.unmodifiableList(spoutSpecs);
-    }
-
-    @Override
-    public ScheduleState getVersionedSpec() {
-        // note: TreeMap iteration starts at the smallest version key, so this
-        // returns the oldest stored state, not the most recent one
-        Iterator<Entry<String, ScheduleState>> it = scheduleStates.entrySet().iterator();
-        if (it.hasNext()) {
-            return it.next().getValue();
-        }
-        return null;
-    }
-
-    @Override
-    public ScheduleState getVersionedSpec(String version) {
-        return scheduleStates.get(version);
-    }
-
-    @Override
-    public void addScheduleState(ScheduleState state) {
-        scheduleStates.put(state.getVersion(), state);
-    }
-
-    @Override
-    public void addStreamingCluster(StreamingCluster cluster) {
-        clusters.add(cluster);
-    }
-
-    @Override
-    public void addStreamingClusters(List<StreamingCluster> clusters) {
-        this.clusters.addAll(clusters);
-    }
-
-    @Override
-    public void addTopology(Topology t) {
-        topologies.add(t);
-    }
-
-    @Override
-    public void addTopologies(List<Topology> topologies) {
-        this.topologies.addAll(topologies);
-    }
-
-    @Override
-    public void addPolicy(PolicyDefinition policy) {
-        policies.add(policy);
-    }
-
-    @Override
-    public void addPolicies(List<PolicyDefinition> policies) {
-        this.policies.addAll(policies);
-    }
-
-    @Override
-    public void addStreamDefinition(StreamDefinition streamDef) {
-        definitions.add(streamDef);
-    }
-
-    @Override
-    public void addStreamDefinitions(List<StreamDefinition> streamDefs) {
-        this.definitions.addAll(streamDefs);
-    }
-
-    @Override
-    public void addDataSource(Kafka2TupleMetadata k2t) {
-        datasources.add(k2t);
-    }
-
-    @Override
-    public void addDataSources(List<Kafka2TupleMetadata> k2ts) {
-        this.datasources.addAll(k2ts);
-    }
-
-    @Override
-    public void addPublishment(Publishment pub) {
-        publishments.add(pub);
-    }
-
-    @Override
-    public void addPublishments(List<Publishment> pubs) {
-        this.publishments.addAll(pubs);
-    }
-
-    @Override
-    public void clear() {
-        // do nothing
-    }
-
-    @Override
-    public void clearScheduleState(int maxCapacity) {
-        // no-op for the in-memory mock
-    }
-
-    @Override
-    public List<AlertPublishEvent> listAlertPublishEvent() {
-        return this.alerts;
-    }
-
-    @Override
-    public void addAlertPublishEvent(AlertPublishEvent event) {
-        this.alerts.add(event);
-    }
-
-    @Override
-    public void addAlertPublishEvents(List<AlertPublishEvent> events) {
-        this.alerts.addAll(events);
-    }
-
-}

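Aside: a typical test interaction with the in-memory mock is to seed metadata through the add* methods and read it back through the listing interface; the unmodifiable wrappers are what enforce the "do not mutate returned state" contract from the class javadoc. A short usage sketch, assuming JUnit's Assert; the policy name is illustrative:

    // Seed the mock, then read back through the IMetadataServiceClient interface.
    InMemMetadataServiceClient client = new InMemMetadataServiceClient();
    PolicyDefinition pd = new PolicyDefinition();
    pd.setName("sample-policy"); // illustrative name
    client.addPolicy(pd);
    Assert.assertEquals(1, client.listPolicies().size());
    // listPolicies() is unmodifiable: adding to the returned list throws
    // UnsupportedOperationException, protecting the client's internal state.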
