flink-user mailing list archives

From Nico Kruber <n...@data-artisans.com>
Subject Re: Flink multithreading, CoFlatMapFunction, CoProcessFunction, internal state
Date Wed, 16 Aug 2017 08:31:42 GMT
Hi Chao,

1) Regarding the difference between CoFlatMapFunction and CoProcessFunction,
let me quote the javadoc of the CoProcessFunction:

"Contrary to the {@link CoFlatMapFunction}, this function can also query the 
time (both event and processing) and set timers, through the provided {@link 
Context}. When reacting to the firing of set timers the function can emit yet 
more elements."

So, imho, the two offer different levels of abstraction and control
(CoFlatMapFunction being the high-level one, CoProcessFunction the low-level
one). Also note that each has different methods for you to implement.

2a) In general, Flink calls a function one record at a time, serially, within
each task. If a TaskManager has multiple slots, each task running there uses
its own function object, so you should only get into trouble if those objects
share state through static references. Otherwise you do not need to worry
about thread-safety.
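
To make the static-reference caveat concrete, here is a minimal sketch in
plain Java (no Flink dependency). CountingFunction and StaticSharingDemo are
made-up names, and each thread merely plays the role of one parallel task
driving its own function object. Per-instance fields need no locking, while
anything static (here a counter standing in for, say, a native library with
global state) is shared by all instances and needs a class-level lock. A
synchronized instance method would lock on `this` and so would not exclude a
second instance.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for a user function: one object per parallel task.
class CountingFunction {
    // Static members are shared by ALL instances in the JVM, so they need
    // a guard that is itself static.
    private static final ReentrantLock SHARED_LOCK = new ReentrantLock();
    private static long sharedCalls = 0;

    // Per-instance field: only the thread driving this object touches it.
    private long localCalls = 0;

    void process(int record) {
        localCalls++; // safe without locking, calls on one object are serial
        SHARED_LOCK.lock();
        try {
            sharedCalls++; // stand-in for a call into non-reentrant code
        } finally {
            SHARED_LOCK.unlock();
        }
    }

    long localCalls() {
        return localCalls;
    }

    static long sharedCalls() {
        SHARED_LOCK.lock();
        try {
            return sharedCalls;
        } finally {
            SHARED_LOCK.unlock();
        }
    }
}

class StaticSharingDemo {
    // Returns one local count per simulated task, followed by the number of
    // shared calls made during this run.
    static long[] run(int tasks, int recordsPerTask) {
        long before = CountingFunction.sharedCalls();
        CountingFunction[] functions = new CountingFunction[tasks];
        Thread[] threads = new Thread[tasks];
        for (int i = 0; i < tasks; i++) {
            final CountingFunction fn = new CountingFunction();
            functions[i] = fn;
            threads[i] = new Thread(() -> {
                for (int r = 0; r < recordsPerTask; r++) {
                    fn.process(r);
                }
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join(); // join makes each task's writes visible here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        long[] result = new long[tasks + 1];
        for (int i = 0; i < tasks; i++) {
            result[i] = functions[i].localCalls();
        }
        result[tasks] = CountingFunction.sharedCalls() - before;
        return result;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run(4, 100000)));
    }
}
```

If the job runs with a parallelism greater than one inside a single JVM and
the native code is not reentrant (both are assumptions on my side), this
would also explain why synchronizing flatMap1/flatMap2 on the instance did
not help: each function object has its own monitor.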

2b) From what I see in the code (StreamTwoInputProcessor), the same should
apply to CoFlatMapFunction and CoProcessFunction: flatMap1/flatMap2 and
processElement1/processElement2 should not be invoked in parallel on the
same function object.
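
As an illustration of what that serial calling order permits, here is a small
plain-Java sketch of the "pending element" pattern you describe. It does not
use the Flink API: PairingFunction and PairingDemo are hypothetical names, a
List stands in for Flink's Collector, and a single thread alternates between
the two inputs the way I would expect a task thread to. The state lives in
ordinary fields and needs no locking as long as only one thread calls into
the object.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical two-input function: pairs an element from input 1 with an
// element from input 2, buffering whichever side arrives first.
class PairingFunction {
    private Integer pendingLeft;  // unmatched element from input 1, or null
    private String pendingRight;  // unmatched element from input 2, or null

    void flatMap1(Integer left, List<String> out) {
        if (pendingRight != null) {
            out.add(left + ":" + pendingRight);
            pendingRight = null;
        } else {
            pendingLeft = left; // a newer element replaces an older pending one
        }
    }

    void flatMap2(String right, List<String> out) {
        if (pendingLeft != null) {
            out.add(pendingLeft + ":" + right);
            pendingLeft = null;
        } else {
            pendingRight = right; // a newer element replaces an older pending one
        }
    }
}

class PairingDemo {
    // One thread feeds both inputs in turn, so the calls never overlap.
    static List<String> drive() {
        PairingFunction fn = new PairingFunction();
        List<String> out = new ArrayList<>();
        fn.flatMap1(1, out);   // buffered, nothing emitted yet
        fn.flatMap2("a", out); // matches the pending 1
        fn.flatMap2("b", out); // buffered
        fn.flatMap1(2, out);   // matches the pending "b"
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drive()); // prints [1:a, 2:b]
    }
}
```

Keep in mind that plain fields like these are not part of Flink's
checkpoints, so a pending element is lost if the job is restarted from a
checkpoint.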

3) Why would you want to store the CoProcessFunction.Context?


On Monday, 14 August 2017 18:36:38 CEST Chao Wang wrote:
> Hi,
> I'd like to know if CoFlatMapFunction/CoProcessFunction is thread-safe,
> and to what extent? What's the difference between the two Functions? and
> in general, how does Flink prevent race conditions? Here's my case:
> I tried to condition on two input streams and produce the third stream
> if the condition is met. I implemented CoFlatMapFunction and tried to
> monitor a state using a field in the implemented class (I want to
> isolate my application from the checkpointing feature, and therefore I
> do not use the states as documented here
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html).
> The field served as a flag indicating whether there are some pending
> data from either input stream, and if yes, processing it along with the
> arriving data from the other input stream (the processing invokes a native
> function).
> But then I got double free error and segmentation fault, which I believe
> was due to unintentional concurrent access to the native function. Then
> I tried to wrap the access into a synchronized method, as well as
> explicitly lock/unlock the flatMap1/flatMap2 methods, but all in vain
> and the error remained.
> I considered using CoProcessFunction in my case, but seems to me that it
> does not handle customary internal states, stating in the javadoc "The
> context [CoProcessFunction.Context] is only valid during the invocation
> of this method, do not store it."
> Thanks,
> Chao
