From: Xingcan Cui
Subject: Re: CoFlatMapFunction with more than two input streams
Date: Wed, 15 Aug 2018 14:55:05 +0800
To: vino yang
Cc: Averell, user

Hi Averell,

I am also in favor of option 2. Besides, you could use CoProcessFunction instead of CoFlatMapFunction and try to wrap the elements of stream_A and stream_B using the `Either` class.
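
The `Either` idea can be sketched without any Flink dependency. This is only a rough model under stated assumptions: the fields of `Dog`, `Cat` and `NameMapping`, the `EitherSketch` object, and the mutable map standing in for Flink keyed state are all hypothetical. In a real job, `onAnimal` and `onMapping` would roughly correspond to the two element handlers of a CoProcessFunction.

```scala
// Hypothetical domain types; only the type names come from the thread.
final case class Dog(id: Int, name: String)
final case class Cat(id: Int, name: String)
final case class NameMapping(id: Int, newName: String)

object EitherSketch {
  // Stand-in for keyed state: the latest mapping seen for each key.
  private val mappings = scala.collection.mutable.Map.empty[Int, String]

  // Role of the stream_C handler: update state only, emit nothing.
  def onMapping(m: NameMapping): Unit = mappings.update(m.id, m.newName)

  // Role of the stream_A/stream_B handler: one function serves both
  // input types, and the Left/Right tag is preserved in the output.
  def onAnimal(in: Either[Dog, Cat]): Either[Dog, Cat] = in match {
    case Left(d)  => Left(d.copy(name = mappings.getOrElse(d.id, d.name)))
    case Right(c) => Right(c.copy(name = mappings.getOrElse(c.id, c.name)))
  }
}
```

Because the output keeps the `Left`/`Right` tag, the downstream split needs no type cast. A single state table then serves both inputs.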

Best,
Xingcan

On Aug 15, 2018, at 2:24 PM, vino yang <yanghua1127@gmail.com> wrote:

Hi Averell,

As far as these two solutions are concerned, I think you can only choose option 2, because, as you have stated, the current Flink DataStream API does not support replacing one of the input stream types of a CoFlatMapFunction. Two other choices:

1. Split it into two separate jobs. In comparison, though, I still think that option 2 is better.
2. Since you said that stream_C is slower and has fewer updates, if it is not very large, you can store it in an RDBMS and then join it with stream_A and stream_B respectively (using CoFlatMapFunction as well).

I think you should give priority to your option 2.

Thanks, vino.

Averell <lvhuyen@gmail.com> wrote on Wed, Aug 15, 2018 at 1:51 PM:
Hi,

I have stream_A of type "Dog", which needs to be transformed using data from
stream_C of type "Name_Mapping". As stream_C is a slow one (its data is not
updated frequently), to do the transformation I connect the two streams,
do a keyBy, and then use a RichCoFlatMapFunction in which the mapping data from
stream_C is saved into State (flatMap1 generates one output, while flatMap2
only updates the State table and generates no output).

Now I have another stream, stream_B of type "Cat", which also needs to be
transformed using data from stream_C. After that transformation,
transformed_B will go through a completely different pipeline from
transformed_A.

I can see two approaches for this:
1. Duplicate stream_C and the RichCoFlatMapFunction and apply them to stream_B.
2. Create a new stream_D of type "Animal", transform it with stream_C, then split
the result into two streams with split/select, using case class pattern
matching.
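
The data flow of option 2 can be illustrated in plain Scala with no Flink dependency. This is a sketch under assumptions: the `Animal` trait, the case class fields, and the `UnionSplitSketch` helpers are hypothetical. Ordinary collections stand in for the streams, and an immutable `Map` stands in for the State table.

```scala
// Hypothetical common supertype so that both streams can be unioned.
sealed trait Animal { def id: Int; def name: String }
final case class Dog(id: Int, name: String) extends Animal
final case class Cat(id: Int, name: String) extends Animal

object UnionSplitSketch {
  // Apply the shared transformation once, on the common supertype.
  def rename(a: Animal, mapping: Map[Int, String]): Animal = a match {
    case d: Dog => d.copy(name = mapping.getOrElse(d.id, d.name))
    case c: Cat => c.copy(name = mapping.getOrElse(c.id, c.name))
  }

  // Split the unified result back into two typed collections;
  // the pattern match recovers the concrete type without a cast.
  def split(all: Seq[Animal]): (Seq[Dog], Seq[Cat]) =
    (all.collect { case d: Dog => d }, all.collect { case c: Cat => c })
}
```

For example, `UnionSplitSketch.split(animals.map(UnionSplitSketch.rename(_, mapping)))` yields the transformed dogs and cats separately, mirroring transformed_A and transformed_B.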

My question is which option should I choose?
With option 1, I need to maintain at least two State tables, let alone the
cost of duplicating the stream (I am not sure how expensive this is in terms of
resources) and the need to duplicate the CoFlatMapFunction (*).
With option 2, there's additional cost coming from unioning,
splitting/selecting, and type-casting at the final streams.
Is there any better option for me?

Thank you very much for your support.
Regards,
Averell

(*) I am using Scala, and I tried to create a RichCoFlatMapFunction of type
[Animal, Name_Mapping] but it cannot be used for a stream of [Dog,
Name_Mapping] or [Cat, Name_Mapping]. Thus I needed to duplicate the
Function as well.
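
One possible way to avoid copying the function body, presumably applicable here because the mismatch comes from invariant type parameters, is to write the function once with a bounded type parameter and instantiate it per input type. The sketch below is plain Scala: `CoFn` is a hypothetical stand-in for Flink's CoFlatMapFunction, and `Enricher`, `GenericSketch` and the case class fields are likewise assumptions. Note that each instance still holds its own state.

```scala
sealed trait Animal { def id: Int }
final case class Dog(id: Int, name: String) extends Animal
final case class Cat(id: Int, name: String) extends Animal
final case class NameMapping(id: Int, newName: String)

// Hypothetical stand-in for a two-input function interface.
// It is invariant in A, B and O, as a Java interface would be in Scala.
trait CoFn[A, B, O] {
  def flatMap1(a: A): Seq[O]
  def flatMap2(b: B): Unit
}

// Written once; T is fixed to a concrete type at instantiation.
class Enricher[T <: Animal](rename: (T, String) => T)
    extends CoFn[T, NameMapping, T] {
  // Stand-in for keyed state, private to this instance.
  private val names = scala.collection.mutable.Map.empty[Int, String]

  def flatMap1(a: T): Seq[T] =
    Seq(names.get(a.id).map(rename(a, _)).getOrElse(a))

  def flatMap2(b: NameMapping): Unit = names.update(b.id, b.newName)
}

object GenericSketch {
  val dogFn: CoFn[Dog, NameMapping, Dog] =
    new Enricher[Dog]((d, n) => d.copy(name = n))
  val catFn: CoFn[Cat, NameMapping, Cat] =
    new Enricher[Cat]((c, n) => c.copy(name = n))
}
```

This removes the duplicated code but not the duplicated state, so it addresses only the (*) concern about option 1.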



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
