From: Arvid Heise
Date: Wed, 26 Feb 2020 15:41:11 +0100
Subject: Re: async io parallelism
To: Alexey Trenikhun
Cc: user@flink.apache.org

Hi Alexey,

No, there are as many instances as configured, but each instance operates on its own key group range and maintains the order within that range.
So messages with the same key are never reordered by ordered async IO. Messages with different keys may be processed independently and can change their relative order.

Best,

Arvid
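A rough way to picture the point about key group ranges is the simplified simulation below, in plain Java rather than Flink code. It assumes a max parallelism of 128 and replaces Flink's key hashing with a plain floorMod of hashCode() (Flink additionally hashes the key's hashCode() before taking the modulus). Each subtask owns a contiguous range of key groups and is assumed to emit its own records in input order, as ordered async IO does per instance. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation (not Flink code) of keyed records reaching
// parallel async IO instances.
public class KeyGroupOrderSketch {

    record Event(int key, int seq) {}

    // Assumed max parallelism for this sketch.
    static final int MAX_PARALLELISM = 128;

    // Simplified key-group assignment: Flink additionally hashes the key's
    // hashCode(); this sketch takes floorMod of hashCode() directly.
    static int keyGroup(int key) {
        return Math.floorMod(Integer.hashCode(key), MAX_PARALLELISM);
    }

    // Each subtask owns a contiguous range of key groups.
    static int subtaskFor(int keyGroup, int parallelism) {
        return keyGroup * parallelism / MAX_PARALLELISM;
    }

    // Routes records to subtasks. Each subtask is assumed to emit its records
    // in input order (what ordered async IO guarantees per instance). The
    // subtasks' outputs are then concatenated in reverse subtask order to
    // mimic one possible interleaving seen downstream.
    static List<Event> simulate(List<Event> input, int parallelism) {
        List<List<Event>> perSubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        for (Event e : input) {
            perSubtask.get(subtaskFor(keyGroup(e.key()), parallelism)).add(e);
        }
        List<Event> merged = new ArrayList<>();
        for (int i = parallelism - 1; i >= 0; i--) {
            merged.addAll(perSubtask.get(i));
        }
        return merged;
    }

    // True if, for every key, sequence numbers never go backwards.
    static boolean perKeyOrderPreserved(List<Event> events) {
        Map<Integer, Integer> lastSeq = new HashMap<>();
        for (Event e : events) {
            Integer prev = lastSeq.put(e.key(), e.seq());
            if (prev != null && prev > e.seq()) {
                return false;
            }
        }
        return true;
    }

    static List<Event> sampleInput() {
        return List.of(new Event(1, 1), new Event(50, 1), new Event(100, 1),
                new Event(1, 2), new Event(50, 2), new Event(1, 3));
    }

    public static void main(String[] args) {
        List<Event> input = sampleInput();
        List<Event> output = simulate(input, 3);
        System.out.println("input:  " + input);
        System.out.println("output: " + output);
        System.out.println("per-key order preserved: " + perKeyOrderPreserved(output));
        System.out.println("global order preserved:  " + input.equals(output));
    }
}
```

With parallelism 3, keys 1, 50 and 100 land on subtasks 0, 1 and 2, so the sample run reports per-key order as preserved while the global order differs from the input.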
On Mon, Feb 24, 2020 at 4:43 PM Alexey Trenikhun <yender@msn.com> wrote:
Arvid, thank you.
So there is a single instance of the FIFO per async IO operator, regardless of the parallelism of the async IO operator?
Thanks,
Alexey


From: Arvid Heise <arvid@ververica.com>
Sent: Saturday, February 22, 2020 1:23:01 PM
To: Alexey Trenikhun <yender@msn.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: async io parallelism

Hi Alexey,

The short answer is: order is preserved in all cases.

Basically, ordered async IO maintains an internal FIFO queue that holds all pending elements. All async results are stored in this queue, but elements are only emitted from the head of the queue, once the head element has a result.
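A minimal sketch of such a queue, assuming one queue per parallel instance. The class and method names are hypothetical and this is not Flink's internal implementation: entries are appended in arrival order, results may complete in any order, and only a completed head (plus any completed entries directly behind it) is emitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of an ordered result queue for one parallel instance;
// not Flink's internal implementation.
public class OrderedFifoSketch {

    // One pending element and, once its async call completes, its result.
    static final class Entry {
        final String input;
        String result;
        boolean done;

        Entry(String input) {
            this.input = input;
        }
    }

    private final Deque<Entry> queue = new ArrayDeque<>();

    // An input element arrives: append it at the tail, in arrival order.
    Entry add(String input) {
        Entry entry = new Entry(input);
        queue.addLast(entry);
        return entry;
    }

    // An async result arrives (in any order): store it, then emit completed
    // entries from the head. Anything behind an incomplete head keeps waiting.
    List<String> complete(Entry entry, String result) {
        entry.result = result;
        entry.done = true;
        List<String> emitted = new ArrayList<>();
        while (!queue.isEmpty() && queue.peekFirst().done) {
            emitted.add(queue.pollFirst().result);
        }
        return emitted;
    }

    int pending() {
        return queue.size();
    }

    public static void main(String[] args) {
        OrderedFifoSketch fifo = new OrderedFifoSketch();
        Entry a = fifo.add("a");
        Entry b = fifo.add("b");
        Entry c = fifo.add("c");
        Entry d = fifo.add("d");
        System.out.println(fifo.complete(c, "C")); // [] (head "a" still pending)
        System.out.println(fifo.complete(a, "A")); // [A]
        System.out.println(fifo.complete(d, "D")); // [] (head "b" still pending)
        System.out.println(fifo.complete(b, "B")); // [B, C, D]
        System.out.println(fifo.pending());        // 0
    }
}
```

The drain loop is what makes one slow request hold back every later result in the same instance, which is the cost of ordered mode.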

So assume you have three input records i1, i2, i3, and their results arrive asynchronously in the order o2, o1, o3, 100 ms apart. Then there is no output when o2 arrives at 100 ms, o1 and o2 are emitted together at 200 ms, and o3 is emitted at 300 ms.
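The same timeline can be replayed in a small hypothetical simulation that also contrasts ordered with unordered emission. It models processing time only: in Flink's unordered mode under event time, watermarks still act as ordering boundaries, which this sketch ignores. Arrivals are assumed to be supplied sorted by arrival time, and the class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical replay of the i1, i2, i3 example: results arrive as
// o2 at 100 ms, o1 at 200 ms, o3 at 300 ms (processing time only).
public class AsyncTimelineSketch {

    // The result for input `index` (0-based) arriving at `timeMs`.
    record Arrival(int index, long timeMs) {}

    // Ordered: a result is emitted only once all earlier inputs have results.
    static List<String> ordered(int inputs, List<Arrival> arrivals) {
        boolean[] done = new boolean[inputs];
        int head = 0;
        List<String> emitted = new ArrayList<>();
        for (Arrival a : arrivals) {
            done[a.index()] = true;
            while (head < inputs && done[head]) {
                emitted.add("o" + (head + 1) + "@" + a.timeMs());
                head++;
            }
        }
        return emitted;
    }

    // Unordered: each result is emitted as soon as it arrives.
    static List<String> unordered(List<Arrival> arrivals) {
        List<String> emitted = new ArrayList<>();
        for (Arrival a : arrivals) {
            emitted.add("o" + (a.index() + 1) + "@" + a.timeMs());
        }
        return emitted;
    }

    // The example timeline, sorted by arrival time.
    static List<Arrival> example() {
        return List.of(new Arrival(1, 100), new Arrival(0, 200), new Arrival(2, 300));
    }

    public static void main(String[] args) {
        System.out.println("ordered:   " + ordered(3, example()));   // [o1@200, o2@200, o3@300]
        System.out.println("unordered: " + unordered(example()));    // [o2@100, o1@200, o3@300]
    }
}
```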

Best,

Arvid

On Sat, Feb 22, 2020 at 2:22 AM Alexey Trenikhun <yender@msn.com> wrote:
Hello,
Let's say my elements are simple key-value pairs coming from Kafka, where they were partitioned by "key". I process them using a KeyedProcessFunction (keyed by the same "key"), then enrich them using an ordered RichAsyncFunction, then send them to another KeyedProcessFunction (keyed by the same "key"), and finally write them to a Kafka topic, again partitioned by the same "key". Something like this:

FlinkKafkaConsumer -> keyBy("key") -> Intake(KeyedProcessFunction) -> AsyncDataStream.orderedWait() -> keyBy("key") -> Output(KeyedProcessFunction) -> FlinkKafkaProducer

Will it preserve the order of events with the same "key"?
  • Will the Output function receive elements with the same "key" in the same order as they originally were in Kafka?
  • Will FlinkKafkaProducer write elements with the same "key" in the same order as they originally were in Kafka?
  • Does it depend on the parallelism of async IO? The documentation says "the stream order is preserved", but if there are multiple parallel instances of the async function, does that mean the order relative to each single instance, or the total stream order?
Thanks,
Alexey