flink-user mailing list archives

From Vladimir Stoyak <vsto...@yahoo.com>
Subject Re: Issue with sharing state in CoFlatMapFunction
Date Tue, 17 Nov 2015 13:49:02 GMT
I know I can use broadcast(), but I was wondering if there is a better way:


DataStream<Model> control_stream = env
    .addSource(new FlinkKafkaConsumer082<Model>(control_topic,
        new AvroDeserializationSchema(Model.class), properties))
    .broadcast();
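
To illustrate what broadcast() changes, here is a minimal plain-Java simulation. It does not use any Flink API: the Instance class, newInstances() and broadcast() helper, and the coefficient values are all hypothetical stand-ins. The assumption is that broadcasting delivers a copy of each control record to every parallel instance, so each instance updates its own private copy of the model.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation of broadcasting a control record; not Flink API. */
public class BroadcastSketch {

    /** Stands in for one parallel instance holding its own copy of the model. */
    static final class Instance {
        double b0 = 0.0;
        double b1 = 0.0;
    }

    /** Creates the given number of independent instances. */
    static List<Instance> newInstances(int parallelism) {
        List<Instance> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new Instance());
        }
        return instances;
    }

    /** Delivers one model update to every instance, as a broadcast partitioning would. */
    static void broadcast(List<Instance> instances, double b0, double b1) {
        for (Instance instance : instances) {
            instance.b0 = b0;
            instance.b1 = b1;
        }
    }

    public static void main(String[] args) {
        List<Instance> instances = newInstances(3);
        broadcast(instances, 0.5, 2.0); // made-up coefficient values
        for (int i = 0; i < instances.size(); i++) {
            Instance instance = instances.get(i);
            System.out.println("instance " + i + ": b0 = " + instance.b0 + ", b1 = " + instance.b1);
        }
    }
}
```

Note that this is replication rather than shared state: every instance keeps its own copy, so the number of delivered control records grows with the parallelism.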




On Tuesday, November 17, 2015 2:45 PM, Vladimir Stoyak <vstoyak@yahoo.com> wrote:



Not that I necessarily need it for this particular example, but is there a global state
available?

I.e., how can I make state available across all parallel instances of an operator?



On Tuesday, November 17, 2015 1:49 PM, Vladimir Stoyak <vstoyak@yahoo.com> wrote:



Perfect! It does explain my problem.

Thanks a lot



On Tuesday, November 17, 2015 1:43 PM, Stephan Ewen <sewen@apache.org> wrote:



Is the CoFlatMapFunction intended to be executed in parallel?

If yes, you need some way to deterministically assign which record goes to which parallel
instance. In a way, the CoFlatMapFunction performs a parallel (partitioned) join between the
model and the result of the session windows, so you need some form of key that selects which
partition the elements go to. Does that make sense?

If not, try to set it to parallelism 1 explicitly.

Greetings,
Stephan
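
The routing issue described above can be sketched with a small plain-Java simulation. It uses no Flink API: the Instance class, the route() hash partitioner, the key "user-a", and the value 1.5 are hypothetical stand-ins chosen for illustration. The assumption is that each parallel instance of the function keeps its own fields, so an update that reaches one instance is invisible to the others unless both inputs are routed by the same key.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation of parallel operator instances; not Flink API. */
public class PartitionedStateSketch {

    /** Stands in for one parallel instance of a CoFlatMapFunction such as applyModel. */
    static final class Instance {
        double b0 = 0.0; // held per instance, never shared

        void onModel(double newB0) { b0 = newB0; } // plays the role of flatMap2
        double onFeature() { return b0; }          // plays the role of flatMap1
    }

    /** Creates the given number of independent instances. */
    static List<Instance> newInstances(int parallelism) {
        List<Instance> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new Instance());
        }
        return instances;
    }

    /** Hypothetical hash partitioner: the same key always maps to the same instance. */
    static int route(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;

        // Case 1: the model update is not keyed and lands on instance 0 only.
        List<Instance> unkeyed = newInstances(parallelism);
        unkeyed.get(0).onModel(1.5);
        for (int i = 0; i < parallelism; i++) {
            System.out.println("unkeyed: instance " + i + " sees b0 = " + unkeyed.get(i).onFeature());
        }

        // Case 2: model and feature records carry the same key, so both are routed identically.
        List<Instance> keyed = newInstances(parallelism);
        String key = "user-a"; // made-up key value
        keyed.get(route(key, parallelism)).onModel(1.5);
        double seen = keyed.get(route(key, parallelism)).onFeature();
        System.out.println("keyed: feature for " + key + " sees b0 = " + seen);
    }
}
```

In the first case only one instance ever observes the new coefficient, which matches the symptom of flatMap1 still printing the initial values. In the second case the feature record is guaranteed to meet the model record because both pass through the same routing function.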



On Tue, Nov 17, 2015 at 1:11 PM, Vladimir Stoyak <vstoyak@yahoo.com> wrote:

My model DataStream is not keyed and does not have any windows; only the main stream has windows
and an apply function.
>
>
>I have two Kafka streams, one for events and one for the model:
>
>
>DataStream<Model> model_stream = env.addSource(new FlinkKafkaConsumer082<Model>(model_topic,
>    new AvroDeserializationSchema(Model.class), properties));
>DataStream<Raw> main_stream = env.addSource(new FlinkKafkaConsumer082<Raw>(raw_topic,
>    new AvroDeserializationSchema(Raw.class), properties));
>
>
>
>
>
>My topology looks like this:
>main_stream 
>.assignTimestamps(new myTimeExtractor()) 
>.keyBy("event_key") 
>.window(GlobalWindows.create()) 
>.trigger(new sessionTrigger(session_timeout)) 
>.apply(new AggFunction()) 
>.connect(model_stream) 
>.flatMap(new applyModel()) 
>.print();
>
>
> AggFunction is a simple aggregate function:
>Long start_ts=Long.MAX_VALUE; 
>        Long end_ts=Long.MIN_VALUE; 
>        Long dwell_time=0L,last_event_ts=0L; 
>        int size = Lists.newArrayList(values).size(); 
>
>
>        for (Raw value: values) { 
>            if(value.getTs() > end_ts) end_ts = value.getTs(); 
>            if (value.getTs() < start_ts) start_ts = value.getTs(); 
>
>
>            if(last_event_ts == 0L){ 
>                last_event_ts = value.getTs(); 
>            } else { 
>                dwell_time += value.getTs() - last_event_ts; 
>                last_event_ts = value.getTs(); 
>            } 
>        } 
>
>
>        out.collect(new Features(tuple.getField(0), tuple.getField(2), tuple.getField(1),
>            start_ts, end_ts, size, dwell_time, Boolean.FALSE));
>
>
>
>
>
>On Tuesday, November 17, 2015 12:59 PM, Stephan Ewen <sewen@apache.org> wrote:
> 
>
>
>Hi!
>
>
>Can you give us a bit more context? For example, can you share the structure of the program
>(which streams get windowed and connected, and in what way)?
>
>
>I would guess that the following is the problem:
>
>
>When you connect one stream to another, then partition n of the first stream connects
>with partition n of the other stream.
>When you do a keyBy().window() then the system reshuffles the data, and the records are
>in different partitions, meaning that they arrive in other instances of the CoFlatMapFunction.
>
>
>You can also call keyBy() on both inputs to make sure that the records are properly
>routed...
>
>
>Greetings,
>Stephan
>
>
>
>
>
>
>On Tue, Nov 17, 2015 at 12:29 PM, Vladimir Stoyak <vstoyak@yahoo.com> wrote:
>
>>Got stuck a bit with CoFlatMapFunction. It seems to work fine if I place it on the DataStream
>>before the window, but it fails if placed after the window's “apply” function.
>>I was testing two streams: the main “Features” stream on flatMap1, constantly ingesting data,
>>and the control stream “Model” on flatMap2, changing the model on request.
>>I am able to set and see b0/b1 properly set in flatMap2, but flatMap1 always sees b0
>>and b1 as they were set at initialization (0).
>>Am I missing something obvious here?
>>Thanks a lot, Vladimir
>>public static class applyModel implements CoFlatMapFunction<Features, Model, EnrichedFeatures> {
>>    private static final long serialVersionUID = 1L;
>>
>>    Double b0;
>>    Double b1;
>>
>>    public applyModel() {
>>        b0 = 0.0;
>>        b1 = 0.0;
>>    }
>>
>>    @Override
>>    public void flatMap1(Features value, Collector<EnrichedFeatures> out) {
>>        System.out.print("Main: " + this + "\n");
>>    }
>>
>>    @Override
>>    public void flatMap2(Model value, Collector<EnrichedFeatures> out) {
>>        System.out.print("Old Model: " + this + "\n");
>>        b0 = value.getB0();
>>        b1 = value.getB1();
>>        System.out.print("New Model: " + this + "\n");
>>    }
>>
>>    @Override
>>    public String toString() {
>>        return "CoFlatMapFunction: {b0: " + b0 + ", b1: " + b1 + "}";
>>    }
>>}
>
>
>
