flink-user mailing list archives

From Nico Kruber <n...@data-artisans.com>
Subject Re: Flink multithreading, CoFlatMapFunction, CoProcessFunction, internal state
Date Mon, 21 Aug 2017 14:35:38 GMT
Hi Chao,
by "per-record base" I actually meant "per-event basis" (event = one element
of whatever the stream contains). As the API suggests, processing happens one
event at a time, and that is also what Flink does internally.

Nico

On Thursday, 17 August 2017 05:06:07 CEST Chao Wang wrote:
> Thank you! Nico. That helps me a lot!
> 
> 2a) That really clarifies my understanding about Flink. Yes, I think I
> have used static references, since I invoked a native function
> (implemented through JNI) which I believe only has one instance per
> process. And I guess the reason why those Java synchronization
> mechanisms were in vain is because of separate function objects at
> runtime, which results in separate lock objects. Now I use c++ mutex
> within the native function and it resolves my case.
> 
> BTW, could you elaborate a bit more on what you mean by
> "per-record base"? What do you mean by a record?
> 
> 3) I do not intend to store the CoProcessFunction.Context. I was just
> wondering that since the document said it is only valid during the
> invocation, for maintaining custom states of my program logic I guess I
> cannot use it.
> 
> 
> Thank you,
> Chao
> 
> On 08/16/2017 03:31 AM, Nico Kruber wrote:
> > Hi Chao,
> > 
> > 1) regarding the difference of CoFlatMapFunction/CoProcessFunction, let me
> > quote the javadoc of the CoProcessFunction:
> > 
> > "Contrary to the {@link CoFlatMapFunction}, this function can also query
> > the time (both event and processing) and set timers, through the provided
> > {@link Context}. When reacting to the firing of set timers the function
> > can emit yet more elements."
> > 
> > So, imho, both deliver a different level of abstraction and control (high-
> > vs. low-level). Also note the different methods available for you to
> > implement.
> > 
> > 2a) In general, Flink calls functions on a per-record base, serially
> > within each task. If a TaskManager has multiple slots, each task gets its
> > own function object, so you should only get into trouble if you share
> > static references. Otherwise you do not need to worry about
> > thread-safety.
> > 
> > 2b) From what I see in the code (StreamTwoInputProcessor), the same should
> > apply to CoFlatMapFunction and CoProcessFunction, so flatMap1/2 and
> > processElement1/2 are not called in parallel!
> > 
> > 3) why would you want to store the CoProcessFunction.Context?
> > 
> > 
> > Nico
> > 
> > On Monday, 14 August 2017 18:36:38 CEST Chao Wang wrote:
> >> Hi,
> >> 
> >> I'd like to know if CoFlatMapFunction/CoProcessFunction is thread-safe,
> >> and to what extent? What's the difference between the two Functions? and
> >> in general, how does Flink prevent race conditions? Here's my case:
> >> 
> >> I tried to condition on two input streams and produce the third stream
> >> if the condition is met. I implemented CoFlatMapFunction and tried to
> >> monitor a state using a field in the implemented class (I want to
> >> isolate my application from the checkpointing feature, and therefore I
> >> do not use the states as documented here
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html).
> >> The field served as a flag indicating whether there are some
> >> pending data from either input stream, and if yes, processing it along
> >> with the arriving data from the other input stream (the processing
> >> invokes a native function).
> >> 
> >> But then I got double free error and segmentation fault, which I believe
> >> was due to unintentional concurrent access to the native function. Then
> >> I tried to wrap the access into a synchronized method, as well as
> >> explicitly lock/unlock the flatMap1/flatMap2 methods, but all in vain
> >> and the error remained.
> >> 
> >> I considered using CoProcessFunction in my case, but it seems to me that
> >> it does not handle custom internal state, since the javadoc states "The
> >> context [CoProcessFunction.Context] is only valid during the invocation
> >> of this method, do not store it."
> >> 
> >> 
> >> 
> >> Thanks,
> >> Chao

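The locking issue from point 2a) can be sketched in plain Java, without Flink. This is only an illustration under stated assumptions: `LockScopeDemo`, `Worker`, and `overlaps` are hypothetical names, `Worker` stands in for a user function of which one object exists per parallel task, and the barrier stands in for the shared native call. It shows that a `synchronized` instance method does not exclude calls made through a different object, while a lock held in a static field does (assuming all objects are loaded by the same class loader, which is not guaranteed in every deployment).

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class LockScopeDemo {
    // One lock object shared by every Worker loaded by this class loader.
    private static final Object SHARED_LOCK = new Object();

    /** Hypothetical stand-in for a user function: one object per parallel task. */
    static final class Worker {
        private final CyclicBarrier meetingPoint;
        volatile boolean overlapped;

        Worker(CyclicBarrier meetingPoint) {
            this.meetingPoint = meetingPoint;
        }

        // Locks on `this`, so two Worker objects use two different monitors.
        synchronized void callWithInstanceLock() {
            overlapped = criticalSection();
        }

        // Locks on the static field, so all Worker objects use the same monitor.
        void callWithSharedLock() {
            synchronized (SHARED_LOCK) {
                overlapped = criticalSection();
            }
        }

        // Returns true only if the other thread is inside this section at the same time.
        private boolean criticalSection() {
            try {
                meetingPoint.await(500, TimeUnit.MILLISECONDS);
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            } catch (BrokenBarrierException | TimeoutException e) {
                return false; // the other thread never got in while we were here
            }
        }
    }

    private static Thread start(Worker w, boolean useSharedLock) {
        Thread t = new Thread(() -> {
            if (useSharedLock) {
                w.callWithSharedLock();
            } else {
                w.callWithInstanceLock();
            }
        });
        t.start();
        return t;
    }

    /** Runs two workers on two threads and reports whether their critical sections overlapped. */
    static boolean overlaps(boolean useSharedLock) {
        CyclicBarrier barrier = new CyclicBarrier(2);
        Worker a = new Worker(barrier);
        Worker b = new Worker(barrier);
        Thread t1 = start(a, useSharedLock);
        Thread t2 = start(b, useSharedLock);
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return a.overlapped && b.overlapped;
    }

    public static void main(String[] args) {
        System.out.println("instance locks overlap: " + overlaps(false));
        System.out.println("shared lock overlaps:   " + overlaps(true));
    }
}
```

With per-instance locks both threads are expected to meet inside the critical section; with the shared static lock the first thread times out at the barrier before the second can enter. For a resource that exists once per process, such as a JNI library, a mutex inside the native code (the fix Chao settled on) does not depend on how the Java classes are loaded.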
