From: Nico Kruber
To: Chao Wang
Cc: user@flink.apache.org
Subject: Re: Flink multithreading, CoFlatMapFunction, CoProcessFunction, internal state
Date: Mon, 21 Aug 2017 16:35:38 +0200
Organization: data Artisans

Hi Chao,

what I meant by "per-record base" was actually supposed to be "per-event base"
(event = one entity of whatever the stream contains). As the API suggests,
events are processed one at a time, and this is also what happens internally.

Nico

On Thursday, 17 August 2017 05:06:07 CEST Chao Wang wrote:
> Thank you, Nico! That helps me a lot!
>
> 2a) That really clarifies my understanding of Flink. Yes, I think I
> have used static references, since I invoked a native function
> (implemented through JNI), which I believe has only one instance per
> process. And I guess the reason why those Java synchronization
> mechanisms were in vain is the separate function objects at
> runtime, which result in separate lock objects. Now I use a C++ mutex
> within the native function, and that resolves my case.
>
> BTW, could you elaborate a bit more on what you mean by
> "per-record base"? What do you mean by a record?
>
> 3) I do not intend to store the CoProcessFunction.Context.
> I was just wondering about it: since the documentation says it is only
> valid during the invocation, I guess I cannot use it to maintain custom
> state for my program logic.
>
>
> Thank you,
> Chao
>
> On 08/16/2017 03:31 AM, Nico Kruber wrote:
> > Hi Chao,
> >
> > 1) regarding the difference between CoFlatMapFunction and
> > CoProcessFunction, let me quote the javadoc of the CoProcessFunction:
> >
> > "Contrary to the {@link CoFlatMapFunction}, this function can also query
> > the time (both event and processing) and set timers, through the provided
> > {@link Context}. When reacting to the firing of set timers the function
> > can emit yet more elements."
> >
> > So, imho, the two offer different levels of abstraction and control
> > (high- vs. low-level). Also note the different methods available for
> > you to implement.
> >
> > 2a) In general, Flink calls functions on a per-record base in a serialized
> > fashion per task. For each task on a TaskManager with multiple slots,
> > separate function objects are used, so you should only get into trouble
> > if you share static references. Otherwise you do not need to worry about
> > thread-safety.
> >
> > 2b) From what I see in the code (StreamTwoInputProcessor), the same should
> > apply to CoFlatMapFunction and CoProcessFunction, so flatMap1/2 and
> > processElement1/2 are not called in parallel!
> >
> > 3) Why would you want to store the CoProcessFunction.Context?
> >
> >
> > Nico
> >
> > On Monday, 14 August 2017 18:36:38 CEST Chao Wang wrote:
> >> Hi,
> >>
> >> I'd like to know whether CoFlatMapFunction/CoProcessFunction is
> >> thread-safe, and to what extent. What's the difference between the two
> >> functions? And in general, how does Flink prevent race conditions?
> >> Here's my case:
> >>
> >> I tried to condition on two input streams and produce a third stream
> >> if the condition is met.
> >> I implemented CoFlatMapFunction and tried to
> >> monitor a state using a field in the implemented class (I want to
> >> isolate my application from the checkpointing feature, and therefore I
> >> do not use the states as documented here
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html).
> >> The field served as a flag indicating whether there is pending data
> >> from either input stream, and if so, that data is processed along with
> >> the arriving data from the other input stream (the processing invokes
> >> a native function).
> >>
> >> But then I got a double free error and a segmentation fault, which I
> >> believe was due to unintentional concurrent access to the native
> >> function. Then I tried to wrap the access in a synchronized method, as
> >> well as to explicitly lock/unlock in the flatMap1/flatMap2 methods, but
> >> all in vain: the error remained.
> >>
> >> I considered using CoProcessFunction in my case, but it seems to me that
> >> it does not handle custom internal state, as the javadoc states: "The
> >> context [CoProcessFunction.Context] is only valid during the invocation
> >> of this method, do not store it."
> >>
> >>
> >>
> >> Thanks,
> >> Chao
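Chao's diagnosis in 2a (separate function objects, hence separate lock objects, while the JNI library exists once per process) can be reproduced without Flink. This is a minimal pure-Java sketch, not Flink code: `NativeStandIn` is a hypothetical stand-in for the JNI library, and each `FunctionInstance` (also hypothetical) plays the role of one parallel function object inside the same TaskManager JVM. A method synchronized on `this` does not exclude the other instances; a lock held in a static field is shared by all of them, which is roughly the Java-side counterpart of the C++ mutex Chao moved into the native code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedNativeLockDemo {

    /** Hypothetical stand-in for a JNI library: one copy per JVM, not thread-safe. */
    static final class NativeStandIn {
        private static final AtomicInteger active = new AtomicInteger();
        static final AtomicInteger overlaps = new AtomicInteger();
        static long calls = 0; // plain field, deliberately unsynchronized

        static void process() {
            if (active.incrementAndGet() > 1) {
                overlaps.incrementAndGet(); // another thread was already inside
            }
            calls++;        // updates can be lost unless callers share one lock
            Thread.yield(); // widen the window for overlapping calls
            active.decrementAndGet();
        }
    }

    /** Hypothetical stand-in for one parallel instance of a CoFlatMapFunction. */
    static final class FunctionInstance {
        // One lock object per JVM, shared by every FunctionInstance.
        private static final Object NATIVE_LOCK = new Object();

        // Locks only this object, so it does NOT exclude other instances.
        synchronized void callWithInstanceLock() {
            NativeStandIn.process();
        }

        // Locks the shared static object, so it excludes all instances in this JVM.
        void callWithStaticLock() {
            synchronized (NATIVE_LOCK) {
                NativeStandIn.process();
            }
        }
    }

    /** Runs one thread per instance; returns how often native calls overlapped. */
    static int run(int instances, int callsEach, boolean useStaticLock) {
        NativeStandIn.overlaps.set(0);
        NativeStandIn.calls = 0;
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            FunctionInstance fn = new FunctionInstance(); // separate object per "slot"
            Thread t = new Thread(() -> {
                for (int j = 0; j < callsEach; j++) {
                    if (useStaticLock) {
                        fn.callWithStaticLock();
                    } else {
                        fn.callWithInstanceLock();
                    }
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return NativeStandIn.overlaps.get();
    }

    public static void main(String[] args) {
        System.out.println("overlaps with per-instance lock: " + run(4, 10_000, false));
        System.out.println("overlaps with static lock:       " + run(4, 10_000, true));
    }
}
```

The per-instance count varies from run to run (and can occasionally be zero), so only the static-lock result is deterministic: no overlaps and no lost updates. A static lock only spans one JVM, which matches a native library loaded once per TaskManager process; it also serializes all parallel instances on that TaskManager, trading throughput for safety.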
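Following Nico's points 2a and 2b, one function object only ever sees one event at a time, so the pending-data flag Chao describes can live in plain instance fields without any locking. The sketch below mimics that in plain Java rather than using Flink: `CoFlatMap` is a hypothetical interface mirroring the shape of Flink's `CoFlatMapFunction` (which passes a `Collector<OUT>` rather than a `Consumer`), `PairOnMatch` is a hypothetical function that buffers at most one pending element from either input, and `drive` plays the role of the task thread handing events over one by one. The string concatenation stands in for the native call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class PendingFlagSketch {

    /** Hypothetical mirror of Flink's CoFlatMapFunction<IN1, IN2, OUT>. */
    interface CoFlatMap<IN1, IN2, OUT> {
        void flatMap1(IN1 value, Consumer<OUT> out);
        void flatMap2(IN2 value, Consumer<OUT> out);
    }

    /**
     * Keeps at most one pending element from either input in plain fields.
     * No locking: the driver never calls flatMap1 and flatMap2 concurrently.
     */
    static final class PairOnMatch implements CoFlatMap<String, Integer, String> {
        private String pendingLeft;   // set when input 1 arrives first
        private Integer pendingRight; // set when input 2 arrives first

        @Override
        public void flatMap1(String left, Consumer<String> out) {
            if (pendingRight != null) {
                out.accept(left + ":" + pendingRight); // stand-in for the native call
                pendingRight = null;
            } else {
                pendingLeft = left; // overwrites: only the latest element is kept
            }
        }

        @Override
        public void flatMap2(Integer right, Consumer<String> out) {
            if (pendingLeft != null) {
                out.accept(pendingLeft + ":" + right);
                pendingLeft = null;
            } else {
                pendingRight = right;
            }
        }
    }

    /** Simulates the task thread: hands events to the function strictly one at a time. */
    static List<String> drive(CoFlatMap<String, Integer, String> fn, List<Object> events) {
        List<String> results = new ArrayList<>();
        for (Object e : events) {
            if (e instanceof String) {
                fn.flatMap1((String) e, results::add);
            } else {
                fn.flatMap2((Integer) e, results::add);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<Object> events = Arrays.asList("a", 1, 2, "b", "c");
        System.out.println(drive(new PairOnMatch(), events)); // prints [a:1, b:2]
    }
}
```

Two caveats follow from how Flink treats plain fields: they are not part of checkpoints, so a pending element is lost on recovery (which is what Chao opted for by avoiding managed state), and on a keyed stream a field is shared by all keys handled by that parallel instance rather than kept per key.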