flink-user mailing list archives

From Chao Wang <chaow...@wustl.edu>
Subject Re: Flink multithreading, CoFlatMapFunction, CoProcessFunction, internal state
Date Thu, 17 Aug 2017 03:06:07 GMT
Thank you, Nico! That helps me a lot.

2a) That really clarifies my understanding of Flink. Yes, I think I 
have effectively used a static reference: I invoke a native function 
(implemented through JNI), which I believe has only one instance per 
process. I guess the Java synchronization mechanisms were in vain 
because Flink uses separate function objects at runtime, which results 
in separate lock objects. I now use a C++ mutex within the native 
function, and that resolves my case.
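
A minimal sketch of why per-object locking does not protect a process-wide resource, under stated assumptions: `FunctionCopy` is a hypothetical stand-in for a user function (not a Flink class), and `ReentrantLock` stands in for the monitor of a `synchronized` method so that `tryLock()` can probe it without blocking.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

public class LockScopeSketch {

    /** Hypothetical stand-in for a user function; one copy exists per parallel subtask. */
    static class FunctionCopy {
        // Shared by every copy in the JVM, like the single native library instance.
        static final ReentrantLock SHARED = new ReentrantLock();
        // Belongs to this copy only, like the monitor of a synchronized method.
        final ReentrantLock own = new ReentrantLock();
    }

    /** Returns true if a second thread can take the given lock right now. */
    static boolean otherThreadCanAcquire(ReentrantLock lock) {
        AtomicBoolean acquired = new AtomicBoolean(false);
        Thread other = new Thread(() -> {
            if (lock.tryLock()) {
                acquired.set(true);
                lock.unlock();
            }
        });
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return acquired.get();
    }

    /** Holds copy a's own lock, then checks whether copy b's own lock is still free. */
    public static boolean perObjectLockLeavesOtherCopyFree() {
        FunctionCopy a = new FunctionCopy();
        FunctionCopy b = new FunctionCopy();
        a.own.lock();
        try {
            return otherThreadCanAcquire(b.own);
        } finally {
            a.own.unlock();
        }
    }

    /** Holds the static lock, then checks whether another thread is kept out. */
    public static boolean sharedLockKeepsOtherThreadOut() {
        FunctionCopy.SHARED.lock();
        try {
            return !otherThreadCanAcquire(FunctionCopy.SHARED);
        } finally {
            FunctionCopy.SHARED.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("per-object lock leaves other copy free: "
                + perObjectLockLeavesOtherCopyFree());
        System.out.println("shared lock keeps other thread out: "
                + sharedLockKeepsOtherThreadOut());
    }
}
```

Holding the lock of one copy does not stop a thread working on another copy, whereas a single static lock (or a mutex inside the native code) covers every caller in the process.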

BTW, could you elaborate a bit on what you mean by a "per-record 
base"? What do you mean by a record?

3) I do not intend to store the CoProcessFunction.Context. I was just 
wondering: since the documentation says it is only valid during the 
invocation, I guess I cannot use it to maintain custom state for my 
program logic.
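
A minimal sketch of the distinction in 3), using hypothetical stand-in types rather than the Flink API: custom state lives in plain fields of the function object and survives across calls, while the per-call context (`Ctx` here, an assumed interface) is only read inside the call and never stored. `PendingPairSketch`, `onLeft`, and `onRight` are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class PendingPairSketch {

    /** Hypothetical per-call context: read it during the call, never keep a reference. */
    interface Ctx {
        long timestamp();
    }

    // Custom state as plain fields; they persist across calls on this object.
    // Nothing here registers them with any state backend.
    private Integer pendingLeft;
    private Integer pendingRight;

    /** Called for each record of the first input. */
    void onLeft(int value, Ctx ctx, Consumer<String> out) {
        if (pendingRight != null) {
            out.accept(value + "," + pendingRight + "@" + ctx.timestamp());
            pendingRight = null;
        } else {
            pendingLeft = value; // a newer record replaces an older pending one
        }
    }

    /** Called for each record of the second input. */
    void onRight(int value, Ctx ctx, Consumer<String> out) {
        if (pendingLeft != null) {
            out.accept(pendingLeft + "," + value + "@" + ctx.timestamp());
            pendingLeft = null;
        } else {
            pendingRight = value;
        }
    }

    /** Feeds one record per input and returns what was emitted. */
    public static List<String> demo() {
        PendingPairSketch f = new PendingPairSketch();
        List<String> out = new ArrayList<>();
        f.onLeft(1, () -> 10L, out::add);  // nothing pending on the right: buffer 1
        f.onRight(2, () -> 20L, out::add); // pairs with the buffered 1
        return out;
    }
}
```

This relies on the calls for both inputs being serialized on the same object, as described in 2b).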

Thank you,

On 08/16/2017 03:31 AM, Nico Kruber wrote:
> Hi Chao,
> 1) regarding the difference of CoFlatMapFunction/CoProcessFunction, let me
> quote the javadoc of the CoProcessFunction:
> "Contrary to the {@link CoFlatMapFunction}, this function can also query the
> time (both event and processing) and set timers, through the provided {@link
> Context}. When reacting to the firing of set timers the function can emit yet
> more elements."
> So, imho, both deliver a different level of abstraction and control (high- vs.
> low-level). Also note the different methods available for you to implement.
> 2a) In general, Flink calls functions on a per-record base in a serialized
> fashion per task. For each task at a TaskManager, in case of it having
> multiple slots, separate function objects are used where you should only get
> in trouble if you share static references. Otherwise you do not need to worry
> about thread-safety.
> 2b) From what I see in the code (StreamTwoInputProcessor), the same should
> apply to CoFlatMapFunction and CoProcessFunction so that calls to flatMap1/2
> and processElement1/2 are not called in parallel!
> 3) why would you want to store the CoProcessFunction.Context?
> Nico
> On Monday, 14 August 2017 18:36:38 CEST Chao Wang wrote:
>> Hi,
>> I'd like to know if CoFlatMapFunction/CoProcessFunction is thread-safe,
>> and to what extent? What's the difference between the two Functions? and
>> in general, how does Flink prevent race conditions? Here's my case:
>> I tried to condition on two input streams and produce the third stream
>> if the condition is met. I implemented CoFlatMapFunction and tried to
>> monitor a state using a field in the implemented class (I want to
>> isolate my application from the checkpointing feature, and therefore I
>> do not use the states as documented here
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html).
>> The field served as a flag indicating whether there are some pending
>> data from either input stream, and if yes, processing it along with the
>> arriving data from the other input stream (the processing invokes a native
>> function).
>> But then I got double free error and segmentation fault, which I believe
>> was due to unintentional concurrent access to the native function. Then
>> I tried to wrap the access into a synchronized method, as well as
>> explicitly lock/unlock the flatMap1/flatMap2 methods, but all in vain
>> and the error remained.
>> I considered using CoProcessFunction in my case, but seems to me that it
>> does not handle customary internal states, stating in the javadoc "The
>> context [CoProcessFunction.Context] is only valid during the invocation
>> of this method, do not store it."
>> Thanks,
>> Chao
