Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3D949200C82 for ; Sat, 27 May 2017 10:03:54 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 3C2ED160BD7; Sat, 27 May 2017 08:03:54 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5C22C160BC6 for ; Sat, 27 May 2017 10:03:53 +0200 (CEST) Received: (qmail 56921 invoked by uid 500); 27 May 2017 08:03:52 -0000 Mailing-List: contact commits-help@eagle.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.apache.org Delivered-To: mailing list commits@eagle.apache.org Received: (qmail 56912 invoked by uid 99); 27 May 2017 08:03:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 27 May 2017 08:03:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7944BDFDD5; Sat, 27 May 2017 08:03:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: qingwzhao@apache.org To: commits@eagle.apache.org Message-Id: <161f9630cbb541ba883d57463d0d86a9@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: eagle git commit: [EAGLE-1029] fix bug in generateGroupbyMonitorMetadata Date: Sat, 27 May 2017 08:03:52 +0000 (UTC) archived-at: Sat, 27 May 2017 08:03:54 -0000 Repository: eagle Updated Branches: refs/heads/branch-0.5 cf3e0b21d -> d6d859fbd [EAGLE-1029] fix bug in generateGroupbyMonitorMetadata https://issues.apache.org/jira/browse/EAGLE-1029 Author: Zhao, Qingwen Closes #939 from qingwen220/EAGLE-1029. (cherry picked from commit 1fb60cc762da24625f37c599c2ea89d693adad67) Signed-off-by: Zhao, Qingwen Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/d6d859fb Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/d6d859fb Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/d6d859fb Branch: refs/heads/branch-0.5 Commit: d6d859fbdbd8d3d86cd4595303a9101d0ae0dddd Parents: cf3e0b2 Author: Zhao, Qingwen Authored: Sat May 27 16:03:19 2017 +0800 Committer: Zhao, Qingwen Committed: Sat May 27 16:03:43 2017 +0800 ---------------------------------------------------------------------- .../impl/MonitorMetadataGenerator.java | 10 +++-- .../apache/alert/coordinator/SchedulerTest.java | 41 ++++++++++++++++++++ .../src/test/resources/application.conf | 2 +- 3 files changed, 48 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/d6d859fb/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java index fb20e66..0b63f47 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java @@ -175,10 +175,12 @@ public class MonitorMetadataGenerator { routeSpec.setStreamId(partiton.getStreamId()); for (StreamWorkSlotQueue sq : ms.getQueues()) { - PolicyWorkerQueue queue = new PolicyWorkerQueue(); - queue.setWorkers(sq.getWorkingSlots()); - queue.setPartition(partiton); - routeSpec.addQueue(queue); + if (sq.getTopoGroupStartIndex().containsKey(u.getTopoName())) { + PolicyWorkerQueue queue = new PolicyWorkerQueue(); + queue.setWorkers(sq.getWorkingSlots()); + queue.setPartition(partiton); + routeSpec.addQueue(queue); + } } spec.addRouterSpec(routeSpec); http://git-wip-us.apache.org/repos/asf/eagle/blob/d6d859fb/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java index 1bfdd7b..1e61de8 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java @@ -63,6 +63,8 @@ public class SchedulerTest { private static final String TEST_POLICY_1 = "test-policy1"; private static final String TEST_POLICY_2 = "test-policy2"; private static final String TEST_POLICY_3 = "test-policy3"; + private static final String TEST_POLICY_4 = "test-policy4"; + private static final String TEST_POLICY_5 = "test-policy5"; private static final String STREAM1 = "stream1"; private static final String DS_NAME = "ds1"; private static ObjectMapper mapper = new ObjectMapper(); @@ -189,6 +191,45 @@ public class SchedulerTest { } } + @Test + public void testMonitorMetadataGenerator() { + TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(6, 10); + + GreedyPolicyScheduler ps = new GreedyPolicyScheduler(); + + // topology has + InMemScheduleConext context = createScheduleContext(mgmtService); + createSamplePolicy(context, TEST_POLICY_1, STREAM1, PARALELLISM); + createSamplePolicy(context, TEST_POLICY_2, STREAM1, PARALELLISM); + createSamplePolicy(context, TEST_POLICY_3, STREAM1, PARALELLISM); + createSamplePolicy(context, TEST_POLICY_4, STREAM1, PARALELLISM); + + ps.init(context, mgmtService); + ScheduleOption option = new ScheduleOption(); + option.setPoliciesPerBolt(1); + ps.schedule(option); + ScheduleState state = ps.getState(); + + Assert.assertTrue(state.getGroupSpecs().get("topo2").getRouterSpecs().size() == 0); + + context = createScheduleContext(mgmtService); + createSamplePolicy(context, TEST_POLICY_1, STREAM1, PARALELLISM); + createSamplePolicy(context, TEST_POLICY_2, STREAM1, PARALELLISM); + createSamplePolicy(context, TEST_POLICY_3, STREAM1, PARALELLISM); + createSamplePolicy(context, TEST_POLICY_4, STREAM1, PARALELLISM); + createSamplePolicy(context, TEST_POLICY_5, STREAM1, PARALELLISM); + + ps.init(context, mgmtService); + ps.schedule(option); + state = ps.getState(); + + for(StreamRouterSpec spec : state.getGroupSpecs().get("topo2").getRouterSpecs()) { + if (spec.getStreamId().equals(STREAM1)) { + Assert.assertTrue(spec.getTargetQueue().size() == 1); + } + } + } + private TestTopologyMgmtService createMgmtService() { TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(); return mgmtService; http://git-wip-us.apache.org/repos/asf/eagle/blob/d6d859fb/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf index 5d4da38..b92d3eb 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf @@ -15,7 +15,7 @@ { "coordinator": { - "policiesPerBolt": 5, + "policiesPerBolt": 2, "boltParallelism": 5, "policyDefaultParallelism": 5, "boltLoadUpbound": 0.8,