Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (unknown [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 92DB3200BAE for ; Fri, 28 Oct 2016 09:23:18 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 91750160AE3; Fri, 28 Oct 2016 07:23:18 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B379C160ADD for ; Fri, 28 Oct 2016 09:23:17 +0200 (CEST) Received: (qmail 56777 invoked by uid 500); 28 Oct 2016 07:23:16 -0000 Mailing-List: contact commits-help@eagle.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.incubator.apache.org Delivered-To: mailing list commits@eagle.incubator.apache.org Received: (qmail 56768 invoked by uid 99); 28 Oct 2016 07:23:16 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 28 Oct 2016 07:23:16 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 7A716C16A5 for ; Fri, 28 Oct 2016 07:23:16 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id e_NsD5bCtxgJ for ; Fri, 28 Oct 2016 07:23:15 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id 143F95F477 for ; Fri, 28 Oct 2016 07:23:14 +0000 (UTC) Received: (qmail 98735 invoked by uid 99); 28 Oct 2016 06:54:58 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 28 Oct 2016 06:54:58 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 289D1E5CE1; Fri, 28 Oct 2016 06:54:58 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: jinhuwu@apache.org To: commits@eagle.incubator.apache.org Message-Id: <0de7b378330848d3b8704735cf6f9666@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: =?utf-8?q?incubator-eagle_git_commit=3A_=5BEAGLE-694=5D_alert_engi?= =?utf-8?q?ne_could_not_reduce_alert_bolt_number_when_parallelism_of_p?= =?utf-8?b?4oCm?= Date: Fri, 28 Oct 2016 06:54:58 +0000 (UTC) archived-at: Fri, 28 Oct 2016 07:23:18 -0000 Repository: incubator-eagle Updated Branches: refs/heads/master ab1c9b64f -> c4a0b94d7 [EAGLE-694] alert engine could not reduce alert bolt number when parallelism of p… Author: wujinhu Closes #577 from wujinhu/EAGLE-694. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/c4a0b94d Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/c4a0b94d Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/c4a0b94d Branch: refs/heads/master Commit: c4a0b94d760b54b3613e11d67e355e982a18910f Parents: ab1c9b6 Author: wujinhu Authored: Fri Oct 28 14:54:51 2016 +0800 Committer: wujinhu Committed: Fri Oct 28 14:54:51 2016 +0800 ---------------------------------------------------------------------- .../alert/engine/coordinator/PolicyDefinition.java | 2 +- .../coordinator/provider/ScheduleContextBuilder.java | 4 ++-- .../alert/coordinator/ScheduleContextBuilderTest.java | 13 +++++++------ 3 files changed, 10 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c4a0b94d/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java index 6df682a..cfd7fef 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java @@ -153,7 +153,7 @@ public class PolicyDefinition implements Serializable { && (another.definition != null && another.definition.equals(this.definition)) && Objects.equals(this.definition, another.definition) && CollectionUtils.isEqualCollection(another.partitionSpec, this.partitionSpec) - // && another.parallelismHint == this.parallelismHint + && another.parallelismHint == this.parallelismHint ) { return true; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c4a0b94d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java index 69225da..98b598a 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java @@ -224,8 +224,8 @@ public class ScheduleContextBuilder { } else { StreamWorkSlotQueue queue = queueMap.get(assignment.getQueueId()); if (queue == null - || policies.get(assignment.getPolicyName()).getParallelismHint() > queue.getQueueSize()) { - // queue not found or policy has hint bigger than queue (possible a poilcy update) + || policies.get(assignment.getPolicyName()).getParallelismHint() != queue.getQueueSize()) { + // queue not found or policy has hint not equal to queue (possible a poilcy update) LOG.info("Policy assignment {} 's policy doesnt match queue: {}!", assignment, queue); paIt.remove(); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c4a0b94d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java index ac83c73..e7efbd7 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java @@ -157,14 +157,15 @@ public class ScheduleContextBuilderTest { StreamWorkSlotQueue queue = SchedulerTest.getQueue(context, assignment1.getQueueId()).getRight(); PolicyDefinition pd1 = client.listPolicies().get(0); - pd1.setParallelismHint(4); // default queue is 5 , change to smaller, has no effect + pd1.setParallelismHint(4); // default queue is 5 , change to smaller, same like change bigger context = builder.buildContext(); - PolicyAssignment assignmentNew = context.getPolicyAssignments().values().iterator().next(); - StreamWorkSlotQueue queueNew = SchedulerTest.getQueue(context, assignmentNew.getQueueId()).getRight(); - Assert.assertNotNull(queueNew); + Assert.assertFalse(context.getPolicyAssignments().values().iterator().hasNext()); + //PolicyAssignment assignmentNew = context.getPolicyAssignments().values().iterator().next(); + //StreamWorkSlotQueue queueNew = SchedulerTest.getQueue(context, assignmentNew.getQueueId()).getRight(); + //Assert.assertNotNull(queueNew); // just to make sure queueNew is present - Assert.assertEquals(queue.getQueueId(), queueNew.getQueueId()); + //Assert.assertEquals(queue.getQueueId(), queueNew.getQueueId()); // default queue is 5 , change to bigger 6, policy assignment removed pd1.setParallelismHint(queue.getQueueSize() + 1); @@ -333,7 +334,7 @@ public class ScheduleContextBuilderTest { slots.add(slot2); slots.add(slot3); slots.add(slot4); - slots.add(slot5); + //slots.add(slot5); StreamWorkSlotQueue q = new StreamWorkSlotQueue(streamGroup, false, new HashMap<>(), slots); ms.addQueues(q);