eagle-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (EAGLE-837) Stream definition change does not reflect in AlertBolt
Date Fri, 16 Dec 2016 02:16:58 GMT

    [ https://issues.apache.org/jira/browse/EAGLE-837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15753177#comment-15753177 ]

ASF GitHub Bot commented on EAGLE-837:
--------------------------------------

Github user garrettlish commented on a diff in the pull request:

    https://github.com/apache/incubator-eagle/pull/732#discussion_r92740230
  
    --- Diff: eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java ---
    @@ -443,6 +447,83 @@ public void reportError(Throwable error) {
     
             Assert.assertTrue(recieved.get());
         }
    +    
    +    @Test
    +    public void testStreamDefinitionChange() throws IOException {
    +        PolicyDefinition def = new PolicyDefinition();
    +        def.setName("policy-definition");
    +        def.setInputStreams(Arrays.asList(TEST_STREAM));
    +
    +        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
    +        definition.setType(PolicyStreamHandlers.CUSTOMIZED_ENGINE);
    +        definition.setHandlerClass("org.apache.eagle.alert.engine.router.CustomizedHandler");
    +        definition.setValue("PT0M,plain,1,host,host1");
    +        def.setDefinition(definition);
    +        def.setPartitionSpec(Arrays.asList(createPartition()));
    +
    +        AlertBoltSpec boltSpecs = new AlertBoltSpec();
    +
    +        AtomicBoolean recieved = new AtomicBoolean(false);
    +        OutputCollector collector = new OutputCollector(new IOutputCollector() {
    +            @Override
    +            public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
    +                recieved.set(true);
    +                return Collections.emptyList();
    +            }
    +
    +            @Override
    +            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
    +            }
    +
    +            @Override
    +            public void ack(Tuple input) {
    +            }
    +
    +            @Override
    +            public void fail(Tuple input) {
    +            }
    +
    +            @Override
    +            public void reportError(Throwable error) {
    +            }
    +        });
    +        AlertBolt bolt = createAlertBolt(collector);
    +
    +        boltSpecs.getBoltPoliciesMap().put(bolt.getBoltId(), Arrays.asList(def));
    +        boltSpecs.setVersion("spec_" + System.currentTimeMillis());
    +        // stream def map
    +        Map<String, StreamDefinition> sds = new HashMap();
    +        StreamDefinition sdTest = new StreamDefinition();
    +        sdTest.setStreamId(TEST_STREAM);
    +        sds.put(sdTest.getStreamId(), sdTest);
    +        
    +        boltSpecs.addPublishPartition(TEST_STREAM, "policy-definition", "testAlertPublish", null);
    +
    +        bolt.onAlertBoltSpecChange(boltSpecs, sds);
    +
    +        // how to assert
    +        Tuple t = createTuple(bolt, boltSpecs.getVersion());
    +
    +        bolt.execute(t);
    +
    +        Assert.assertTrue(recieved.get());
    +        
    +        LOG.info("Update stream");
    +        sds = new HashMap();
    +        sdTest = new StreamDefinition();
    +        sdTest.setStreamId(TEST_STREAM);
    +        sds.put(sdTest.getStreamId(), sdTest);
    +        sdTest.setDescription("update the stream");
    +        bolt.onAlertBoltSpecChange(boltSpecs, sds);
    +        
    +        LOG.info("No any change");
    +        sds = new HashMap();
    +        sdTest = new StreamDefinition();
    +        sdTest.setStreamId(TEST_STREAM);
    +        sds.put(sdTest.getStreamId(), sdTest);
    +        sdTest.setDescription("update the stream");
    +        bolt.onAlertBoltSpecChange(boltSpecs, sds);
    --- End diff --
    
    asserts updated in https://github.com/apache/incubator-eagle/pull/738


> Stream definition change does not reflect in AlertBolt
> ------------------------------------------------------
>
>                 Key: EAGLE-837
>                 URL: https://issues.apache.org/jira/browse/EAGLE-837
>             Project: Eagle
>          Issue Type: Bug
>            Reporter: Garrett Li
>            Assignee: Garrett Li
>
> A stream definition change only triggers an update of the router bolt and the publisher;
> the corresponding stream definition references in the alert bolt are not updated. As a
> result, the alert bolt keeps using the old stream definition references, which can produce
> an array index out of bounds exception.
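
The failure mode described above can be illustrated with a minimal, self-contained sketch. It does not use Eagle's classes: `StreamDef`, `Bolt`, `onSpecChange`, `read`, and the stream name "test-stream" are hypothetical stand-ins, and the column layout is an assumption chosen only to show one way a stale stream definition reference could lead to an array index out of bounds exception, and how refreshing the reference on a spec change avoids it.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class StaleStreamDefinitionSketch {

    /** Hypothetical stand-in for a stream definition: an ordered list of column names. */
    static final class StreamDef {
        final List<String> columns;

        StreamDef(String... columns) {
            this.columns = Arrays.asList(columns);
        }
    }

    /** Hypothetical stand-in for a bolt that holds references to stream definitions. */
    static final class Bolt {
        private Map<String, StreamDef> definitions;
        private final boolean refreshOnChange;

        Bolt(Map<String, StreamDef> initial, boolean refreshOnChange) {
            this.definitions = new HashMap<>(initial);
            this.refreshOnChange = refreshOnChange;
        }

        /** Called when the spec changes; the buggy variant ignores the new definitions. */
        void onSpecChange(Map<String, StreamDef> updated) {
            if (refreshOnChange) {
                this.definitions = new HashMap<>(updated);
            }
        }

        /** Resolves a column to a position using the held definition, then reads the event. */
        Object read(String streamId, String column, Object[] event) {
            int index = definitions.get(streamId).columns.indexOf(column);
            return event[index];
        }
    }

    /** Runs the scenario and returns the value read, or the name of the exception thrown. */
    static String readAfterChange(boolean refreshOnChange) {
        Map<String, StreamDef> sds = new HashMap<>();
        sds.put("test-stream", new StreamDef("name", "host"));
        Bolt bolt = new Bolt(sds, refreshOnChange);

        // The stream is redefined with fewer columns, so "host" moves from index 1 to 0.
        Map<String, StreamDef> updated = new HashMap<>();
        updated.put("test-stream", new StreamDef("host"));
        bolt.onSpecChange(updated);

        Object[] event = {"host1"}; // event laid out according to the new definition
        try {
            return String.valueOf(bolt.read("test-stream", "host", event));
        } catch (ArrayIndexOutOfBoundsException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println("stale references:     " + readAfterChange(false));
        System.out.println("refreshed references: " + readAfterChange(true));
    }
}
```

In this sketch the bolt that ignores the change still resolves "host" to index 1 from the old definition and fails on the one-element event, while the bolt that replaces its references resolves it to index 0 and reads the value.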



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
