beam-commits mailing list archives

From "Dawid Wysakowicz (JIRA)" <j...@apache.org>
Subject [jira] [Assigned] (BEAM-3414) AfterProcessingTime trigger issue with Flink Runner
Date Mon, 19 Mar 2018 14:37:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dawid Wysakowicz reassigned BEAM-3414:
--------------------------------------

    Assignee: Dawid Wysakowicz  (was: Aljoscha Krettek)

> AfterProcessingTime trigger issue with Flink Runner
> ---------------------------------------------------
>
>                 Key: BEAM-3414
>                 URL: https://issues.apache.org/jira/browse/BEAM-3414
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core, runner-flink
>    Affects Versions: 2.2.0
>         Environment: idea, ubuntu 16.04, FlinkRunner
>            Reporter: huangjianhuang
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>
> In my demo, I read data from Kafka, count globally, and finally output the total count
of received data, as follows:
> {code:java}
>         FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
>                 .as(FlinkPipelineOptions.class);
>         options.setStreaming(true);
>         options.setRunner(FlinkRunner.class);
>         Pipeline pipeline = Pipeline.create(options);
>         pipeline
>                 .apply("Read from kafka",
>                         KafkaIO.<String, String>read()
> //                                .withTimestampFn(kafkaData -> TimeUtil.timeMillisToInstant(kafkaData.getKey()))
>                                 .withBootstrapServers("localhost:9092")
>                                 .withTopic("recharge")
>                                 .withKeyDeserializer(StringDeserializer.class)
>                                 .withValueDeserializer(StringDeserializer.class)
>                                 .withoutMetadata()
>                 )
>                 .apply(Values.create())
>                 .apply(Window.<String>into(new GlobalWindows())
>                                 .triggering(Repeatedly.forever(
>                                         AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))))
>                                 .accumulatingFiredPanes()
>                 )
>                 .apply(Count.globally())
>                 .apply("output",
>                         ParDo.of(new DoFn<Long, Void>() {
>                             @ProcessElement
>                             public void process(ProcessContext context) {
>                                 System.out.println("---get at: " + Instant.now() + "------");
>                                 System.out.println(context.element());
>                             }
>                         }));
> {code}
> The result should be displayed 5 seconds after I send the first data, but sometimes
nothing is displayed after I send data. These are the outputs I got in one test
> (I can't upload a picture, so they are given as text):
> {code:java}
> Send 681Msg at: 2018-01-05T06:34:31.436
> 	---get at: 2018-01-05T06:34:36.668Z------
> 	681
> Send 681Msg at: 2018-01-05T06:34:47.166
> 	---get at: 2018-01-05T06:34:52.284Z------
> 	1362
> Send 681Msg at: 2018-01-05T06:34:55.505
> Send 681Msg at: 2018-01-05T06:35:22.068
> 	---get at: 2018-01-05T06:35:22.112Z------
> 	2044
> {code}
> By the way, the code works fine with the direct runner.
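
The timing in the log above can be checked against a small plain-Java model of the trigger. This is only a sketch and does not use Beam: the class `ExpectedFirings` and the method `expectedFirings` are hypothetical names introduced for illustration. It assumes each "Send 681Msg" batch arrives at its logged send time, that the send times are in UTC like the "get at" times, and that the first element of a pane arms a timer for arrival time plus the delay, with the next element after a firing starting a new pane.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch (no Beam dependency) of when the reported trigger would be expected to fire. */
class ExpectedFirings {

    /**
     * Simplified model of
     * Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay)).
     * Arrivals must be sorted in ascending order.
     */
    static List<Instant> expectedFirings(List<Instant> arrivals, Duration delay) {
        List<Instant> firings = new ArrayList<>();
        Instant pending = null; // firing time of the currently armed timer, if any
        for (Instant arrival : arrivals) {
            if (pending != null && !arrival.isBefore(pending)) {
                firings.add(pending); // the timer fired before this arrival
                pending = null;
            }
            if (pending == null) {
                pending = arrival.plus(delay); // first element of a new pane arms the timer
            }
        }
        if (pending != null) {
            firings.add(pending); // the last armed timer still fires
        }
        return firings;
    }

    public static void main(String[] args) {
        // Send times copied from the log in the report (assumed UTC).
        List<Instant> sends = Arrays.asList(
                Instant.parse("2018-01-05T06:34:31.436Z"),
                Instant.parse("2018-01-05T06:34:47.166Z"),
                Instant.parse("2018-01-05T06:34:55.505Z"),
                Instant.parse("2018-01-05T06:35:22.068Z"));
        for (Instant firing : expectedFirings(sends, Duration.ofSeconds(5))) {
            System.out.println("expected firing at about " + firing);
        }
    }
}
```

Under these assumptions, the batch sent at 06:34:55.505 would be expected to produce output at about 06:35:00.505, whereas the log shows no output until 06:35:22.112, right after the next batch was sent.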



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
