flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Issue with sharing state in CoFlatMapFunction
Date Tue, 17 Nov 2015 11:59:49 GMT
Hi!

Can you give us a bit more context? For example, can you share the structure
of the program (which streams get windowed and connected, and in what way)?

I would guess that the following is the problem:

When you connect one stream to another, partition n of the first stream
connects with partition n of the other stream.
When you do a keyBy().window(), the system reshuffles the data, so the
records end up in different partitions and arrive at other instances of the
CoFlatMapFunction. The instance that received the model update is then not
necessarily the one that processes a given feature record.

You can also call keyBy() on both inputs before connecting them, to make
sure that the records are properly routed...
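
To illustrate the guess above, here is a minimal plain-Java sketch (no Flink
dependency). Everything in it is hypothetical: the class and method names,
the key "sensor-a", the parallelism of 4, and the hashCode-modulo routing,
which is only a simplified stand-in for how Flink actually assigns keys to
parallel instances. It also assumes that the model records carry the same key
as the feature records. Each Instance plays the role of one parallel copy of
the CoFlatMapFunction with its own b0/b1.

```java
import java.util.ArrayList;
import java.util.List;

class PartitionRoutingSketch {

    /** Stand-in for one parallel function instance; each keeps its own b0/b1. */
    static final class Instance {
        double b0 = 0.0;
        double b1 = 0.0;

        // Plays the role of flatMap2: update the local model parameters.
        void onModel(double newB0, double newB1) {
            b0 = newB0;
            b1 = newB1;
        }

        // Plays the role of flatMap1: apply the locally stored model.
        double onFeature(double x) {
            return b0 + b1 * x;
        }
    }

    /** Simplified key routing (an assumption): same key, same instance index. */
    static int routeByKey(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    static List<Instance> newInstances(int parallelism) {
        List<Instance> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new Instance());
        }
        return instances;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        String key = "sensor-a";

        // Case 1: the model arrives at instance 0, but the reshuffled feature
        // record lands on a different instance, which still has b0 = b1 = 0.
        List<Instance> unaligned = newInstances(parallelism);
        int modelPartition = 0;
        int featurePartition = (modelPartition + 1) % parallelism;
        unaligned.get(modelPartition).onModel(1.0, 2.0);
        double stale = unaligned.get(featurePartition).onFeature(3.0);
        System.out.println("unaligned result: " + stale);

        // Case 2: both inputs are routed by the same key, so the model update
        // and the feature record meet in the same instance.
        List<Instance> aligned = newInstances(parallelism);
        aligned.get(routeByKey(key, parallelism)).onModel(1.0, 2.0);
        double fresh = aligned.get(routeByKey(key, parallelism)).onFeature(3.0);
        System.out.println("aligned result: " + fresh);
    }
}
```

In the first case the feature is scored with the initial zeros (result 0.0),
which matches the symptom you describe; in the second case it sees the
updated model (result 7.0). In the real job this would correspond to keying
both streams before connect(), provided the model stream has a suitable key.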

Greetings,
Stephan



On Tue, Nov 17, 2015 at 12:29 PM, Vladimir Stoyak <vstoyak@yahoo.com> wrote:

> Got stuck a bit with CoFlatMapFunction. It seems to work fine if I place
> it on the DataStream before window but fails if placed after window's
> “apply” function.
> I was testing two streams, main “Features” on flatMap1 constantly
> ingesting data and control stream “Model” on flatMap2 changing the model on
> request.
> I am able to set b0/b1 in flatMap2 and see them set properly there, but
> flatMap1 always sees b0 and b1 as 0, the values set at initialization.
> Am I missing something obvious here?
> Thanks a lot, Vladimir
>
> public static class applyModel implements CoFlatMapFunction<Features, Model, EnrichedFeatures> {
>     private static final long serialVersionUID = 1L;
>
>     Double b0;
>     Double b1;
>
>     public applyModel(){
>         b0=0.0;
>         b1=0.0;
>     }
>
>     @Override
>     public void flatMap1(Features value, Collector<EnrichedFeatures> out) {
>         System.out.print("Main: " + this + "\n");
>     }
>
>     @Override
>     public void flatMap2(Model value, Collector<EnrichedFeatures> out) {
>         System.out.print("Old Model: " + this + "\n");
>         b0 = value.getB0();
>         b1 = value.getB1();
>         System.out.print("New Model: " + this + "\n");
>     }
>
>     @Override
>     public String toString(){
>         return "CoFlatMapFunction: {b0: " + b0 + ", b1: " + b1 + "}";
>     }
> }
>
>
