From: garrettlish
To: issues@eagle.incubator.apache.org
Reply-To: issues@eagle.incubator.apache.org
Subject: [GitHub] incubator-eagle pull request #732: [EAGLE-837] Stream definition change does...
Content-Type: text/plain
Message-Id: <20161216021603.D650AE3908@git1-us-west.apache.org>
Date: Fri, 16 Dec 2016 02:16:03 +0000 (UTC)

Github user garrettlish commented on a diff in the pull request:

    https://github.com/apache/incubator-eagle/pull/732#discussion_r92740230

    --- Diff: eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java ---
    @@ -443,6 +447,83 @@ public void reportError(Throwable error) {
             Assert.assertTrue(recieved.get());
         }
    +
    +    @Test
    +    public void testStreamDefinitionChange() throws IOException {
    +        PolicyDefinition def = new PolicyDefinition();
    +        def.setName("policy-definition");
    +        def.setInputStreams(Arrays.asList(TEST_STREAM));
    +
    +        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
    +        definition.setType(PolicyStreamHandlers.CUSTOMIZED_ENGINE);
    +        definition.setHandlerClass("org.apache.eagle.alert.engine.router.CustomizedHandler");
    +        definition.setValue("PT0M,plain,1,host,host1");
    +        def.setDefinition(definition);
    +        def.setPartitionSpec(Arrays.asList(createPartition()));
    +
    +        AlertBoltSpec boltSpecs = new AlertBoltSpec();
    +
    +        AtomicBoolean recieved = new AtomicBoolean(false);
    +        OutputCollector collector = new OutputCollector(new IOutputCollector() {
    +            @Override
    +            public List emit(String streamId, Collection anchors, List tuple) {
    +                recieved.set(true);
    +                return Collections.emptyList();
    +            }
    +
    +            @Override
    +            public void emitDirect(int taskId, String streamId, Collection anchors, List tuple) {
    +            }
    +
    +            @Override
    +            public void ack(Tuple input) {
    +            }
    +
    +            @Override
    +            public void fail(Tuple input) {
    +            }
    +
    +            @Override
    +            public void reportError(Throwable error) {
    +            }
    +        });
    +        AlertBolt bolt = createAlertBolt(collector);
    +
    +        boltSpecs.getBoltPoliciesMap().put(bolt.getBoltId(), Arrays.asList(def));
    +        boltSpecs.setVersion("spec_" + System.currentTimeMillis());
    +        // stream def map
    +        Map sds = new HashMap();
    +        StreamDefinition sdTest = new StreamDefinition();
    +        sdTest.setStreamId(TEST_STREAM);
    +        sds.put(sdTest.getStreamId(), sdTest);
    +
    +        boltSpecs.addPublishPartition(TEST_STREAM, "policy-definition", "testAlertPublish", null);
    +
    +        bolt.onAlertBoltSpecChange(boltSpecs, sds);
    +
    +        // how to assert
    +        Tuple t = createTuple(bolt, boltSpecs.getVersion());
    +
    +        bolt.execute(t);
    +
    +        Assert.assertTrue(recieved.get());
    +
    +        LOG.info("Update stream");
    +        sds = new HashMap();
    +        sdTest = new StreamDefinition();
    +        sdTest.setStreamId(TEST_STREAM);
    +        sds.put(sdTest.getStreamId(), sdTest);
    +        sdTest.setDescription("update the stream");
    +        bolt.onAlertBoltSpecChange(boltSpecs, sds);
    +
    +        LOG.info("No any change");
    +        sds = new HashMap();
    +        sdTest = new StreamDefinition();
    +        sdTest.setStreamId(TEST_STREAM);
    +        sds.put(sdTest.getStreamId(), sdTest);
    +        sdTest.setDescription("update the stream");
    +        bolt.onAlertBoltSpecChange(boltSpecs, sds);
    --- End diff --

    asserts updated in https://github.com/apache/incubator-eagle/pull/738
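
Editor's note: the last two steps of the quoted test ("Update stream" and "No any change") call onAlertBoltSpecChange without asserting anything. The sketch below is only an illustration of what such assertions could distinguish, not the change made in pull request #738. It assumes a hypothetical StreamDef value class and a hypothetical changedStreamIds helper; neither is part of Eagle, and the stream id "test-stream" is a placeholder for the test's TEST_STREAM constant, whose value does not appear in the diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for a stream definition; this is NOT Eagle's StreamDefinition.
final class StreamDef {
    final String streamId;
    final String description;

    StreamDef(String streamId, String description) {
        this.streamId = streamId;
        this.description = description;
    }

    // Value equality, so that two separately built but identical definitions compare equal.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof StreamDef)) {
            return false;
        }
        StreamDef other = (StreamDef) o;
        return Objects.equals(streamId, other.streamId)
            && Objects.equals(description, other.description);
    }

    @Override
    public int hashCode() {
        return Objects.hash(streamId, description);
    }
}

// Hypothetical helper; the name and signature are assumptions made for this sketch.
final class StreamDefinitionDiff {
    private StreamDefinitionDiff() {
    }

    // Returns the ids of streams that were added, removed, or modified between two maps.
    static Set<String> changedStreamIds(Map<String, StreamDef> before, Map<String, StreamDef> after) {
        Set<String> changed = new TreeSet<>();
        for (Map.Entry<String, StreamDef> e : after.entrySet()) {
            // Added (absent before) or modified (present but not equal).
            if (!Objects.equals(e.getValue(), before.get(e.getKey()))) {
                changed.add(e.getKey());
            }
        }
        for (String id : before.keySet()) {
            // Removed (present before, absent now).
            if (!after.containsKey(id)) {
                changed.add(id);
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        String streamId = "test-stream"; // placeholder id, assumed for illustration

        Map<String, StreamDef> initial = new HashMap<>();
        initial.put(streamId, new StreamDef(streamId, null));

        // Mirrors the "Update stream" step: same id, new description.
        Map<String, StreamDef> updated = new HashMap<>();
        updated.put(streamId, new StreamDef(streamId, "update the stream"));

        // Mirrors the "No any change" step: a fresh but identical definition.
        Map<String, StreamDef> unchanged = new HashMap<>();
        unchanged.put(streamId, new StreamDef(streamId, "update the stream"));

        System.out.println(changedStreamIds(initial, updated));   // prints [test-stream]
        System.out.println(changedStreamIds(updated, unchanged)); // prints []
    }
}
```

In a JUnit test, the two printed sets would become the assertion targets: one changed stream id after the update step, and an empty set after the step that rebuilds an identical definition.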