Date: Fri, 21 Aug 2015 19:56:46 +0700
Subject: Re: Keep Model in Operator instance up to date
From: Welly Tambunan
To: user@flink.apache.org

Hi Gyula,

Thanks a lot. That really helps a lot!

Have a great vacation.

Cheers

On Fri, Aug 21, 2015 at 7:47 PM, Gyula Fóra wrote:
> Hi
>
> You are right: if all operators need continuous updates, then the most
> straightforward way is to push all the updates to the operators as you
> described.
>
> If the cached data is the same for all operators and is small enough, you
> can centralize the updates in a dedicated operator and push the updated
> data to the operators every once in a while.
>
> Cheers
> Gyula
>
> On Thu, Aug 20, 2015 at 4:31 PM Welly Tambunan wrote:
>
>> Hi Gyula,
>>
>> I have a couple of operators in the pipeline: filter, mapper, and flatMap.
>> Each of these operators contains some cached data.
>>
>> So I think that means that for every other operator in the pipeline, I
>> will need to add a new stream to update each operator's cached data.
>>
>> Cheers
>>
>> On Thu, Aug 20, 2015 at 5:33 PM, Gyula Fóra wrote:
>>
>>> Hi,
>>>
>>> I don't think I fully understand your question; could you please try to
>>> be a little more specific?
>>>
>>> I assume that by caching you mean that you keep the current model as
>>> operator state. Why would you need to add new streams in this case?
>>>
>>> I might be slow to answer, as I am currently on vacation without a
>>> stable internet connection.
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Thu, Aug 20, 2015 at 5:36 AM Welly Tambunan wrote:
>>>
>>>> Hi Gyula,
>>>>
>>>> I have another question. If I cache something in an operator, will I
>>>> always need to add and connect another stream of changes to the
>>>> operator to keep that cache up to date?
>>>>
>>>> Is this right for every case?
>>>>
>>>> Cheers
>>>>
>>>> On Wed, Aug 19, 2015 at 3:21 PM, Welly Tambunan wrote:
>>>>
>>>>> Hi Gyula,
>>>>>
>>>>> That's really helpful. The docs have improved so much since the last
>>>>> time (0.9).
>>>>>
>>>>> Thanks a lot!
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Wed, Aug 19, 2015 at 3:07 PM, Gyula Fóra wrote:
>>>>>
>>>>>> Hey,
>>>>>>
>>>>>> If it is always better to check the events against a more up-to-date
>>>>>> model (even if the events we are checking arrived before the update),
>>>>>> then it is fine to keep the model outside of the system.
>>>>>>
>>>>>> In this case we need to make sure that we can push the updates to the
>>>>>> external system consistently. If you are using the PersistentKafkaSource,
>>>>>> for instance, some messages can be replayed in case of failure. In this
>>>>>> case you need to make sure that you remove duplicate updates or have
>>>>>> idempotent updates.
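The replay problem described above can be made harmless by versioning the updates. Below is a minimal, framework-free Java sketch, not code from this thread: it assumes (hypothetically) that each model update carries a key and a per-key version number that only increases, and `VersionedModelStore` / `ModelUpdate` are illustrative names for whatever external cache holds the model. An update is applied only when its version is newer than the stored one, so a message replayed after a failure changes nothing.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: an external model store on which replayed updates are no-ops.
// Assumption (not from the thread): every update carries a key and a version number
// that increases monotonically per key, assigned by whoever publishes the change.
class VersionedModelStore {

    // One model update; the field names are illustrative only.
    static final class ModelUpdate {
        final String key;
        final long version;
        final String value;

        ModelUpdate(String key, long version, String value) {
            this.key = key;
            this.version = version;
            this.value = value;
        }
    }

    // Latest applied version and value per key.
    private final Map<String, Long> versions = new HashMap<>();
    private final Map<String, String> values = new HashMap<>();

    // Applies the update only if it is newer than what is already stored.
    // Returns false for duplicates and stale (replayed) updates, which are ignored.
    synchronized boolean apply(ModelUpdate update) {
        long current = versions.getOrDefault(update.key, Long.MIN_VALUE);
        if (update.version <= current) {
            return false;
        }
        versions.put(update.key, update.version);
        values.put(update.key, update.value);
        return true;
    }

    // Read-only lookup, as used by the operator that validates events.
    synchronized String get(String key) {
        return values.get(key);
    }

    public static void main(String[] args) {
        VersionedModelStore store = new VersionedModelStore();
        ModelUpdate first = new ModelUpdate("maxTemperature", 1, "80");
        ModelUpdate second = new ModelUpdate("maxTemperature", 2, "75");

        System.out.println(store.apply(first));  // true
        System.out.println(store.apply(second)); // true
        System.out.println(store.apply(first));  // false: replay of an older update
        System.out.println(store.apply(second)); // false: exact duplicate
        System.out.println(store.get("maxTemperature")); // 75
    }
}
```

In a real shared cache, the version check and the write would have to happen atomically on the cache side (for example through a conditional write, if the store offers one); the `synchronized` methods here only protect a single JVM.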
>>>>>>
>>>>>> You can read about the checkpoint mechanism on the Flink website:
>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
>>>>>>
>>>>>> Cheers,
>>>>>> Gyula
>>>>>>
>>>>>> On Wed, Aug 19, 2015 at 9:56 AM Welly Tambunan wrote:
>>>>>>
>>>>>>> Thanks Gyula,
>>>>>>>
>>>>>>> I have another question.
>>>>>>>
>>>>>>> > ... while external model updates would be *tricky* to keep consistent.
>>>>>>> Is that still the case if the operator treats the external model as
>>>>>>> read-only? We create another stream that updates the external model
>>>>>>> separately.
>>>>>>>
>>>>>>> Could you please elaborate on this one?
>>>>>>>
>>>>>>> Cheers
>>>>>>>
>>>>>>> On Wed, Aug 19, 2015 at 2:52 PM, Gyula Fóra wrote:
>>>>>>>
>>>>>>>> In that case I would apply a map to wrap the updates in some common
>>>>>>>> type, like an Either<t1,t2>, before the union.
>>>>>>>>
>>>>>>>> And then in the coflatmap you can unwrap it.
>>>>>>>>
>>>>>>>> On Wed, Aug 19, 2015 at 9:50 AM Welly Tambunan wrote:
>>>>>>>>
>>>>>>>>> Hi Gyula,
>>>>>>>>>
>>>>>>>>> Thanks.
>>>>>>>>>
>>>>>>>>> However, update1 and update2 have different types. Based on my
>>>>>>>>> understanding, I don't think we can use union. How can we handle
>>>>>>>>> this one?
>>>>>>>>>
>>>>>>>>> We like to make our events strongly typed so that they capture the
>>>>>>>>> domain language.
>>>>>>>>>
>>>>>>>>> Cheers
>>>>>>>>>
>>>>>>>>> On Wed, Aug 19, 2015 at 2:37 PM, Gyula Fóra wrote:
>>>>>>>>>
>>>>>>>>>> Hey,
>>>>>>>>>>
>>>>>>>>>> If I understand correctly, one input of your co-flatmap would be
>>>>>>>>>> model updates and the other input would be events to check against
>>>>>>>>>> the model.
>>>>>>>>>>
>>>>>>>>>> This means that if your model updates come from more than one
>>>>>>>>>> stream, you need to union them into a single stream before
>>>>>>>>>> connecting them with the event stream and applying the coflatmap.
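The wrap-union-unwrap approach from the messages above can be sketched without Flink as follows. This is an illustration under assumptions, not code from the thread: `ThresholdUpdate`, `BlocklistUpdate`, `Event`, the small `Either` class, and `ModelValidator` are hypothetical, and plain lists stand in for the streams. The two methods are named after the `flatMap1`/`flatMap2` methods of Flink's `CoFlatMapFunction`, with input 1 carrying events and input 2 the unioned updates, matching `events.connect(...)`; in a real job the update stream would also be broadcast so that every parallel instance sees every update.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Framework-free sketch of "map each update stream into an Either, union them,
// then unwrap inside the co-flatmap". All type names are hypothetical.
class EitherUnionSketch {

    // Minimal Either: holds exactly one of two values.
    static final class Either<L, R> {
        private final L left;
        private final R right;
        private final boolean isLeft;

        private Either(L left, R right, boolean isLeft) {
            this.left = left;
            this.right = right;
            this.isLeft = isLeft;
        }

        static <L, R> Either<L, R> ofLeft(L value) { return new Either<>(value, null, true); }
        static <L, R> Either<L, R> ofRight(R value) { return new Either<>(null, value, false); }

        boolean isLeft() { return isLeft; }
        L getLeft() { return left; }
        R getRight() { return right; }
    }

    // Two strongly typed kinds of model update, plus the events to validate.
    static final class ThresholdUpdate {
        final int maxValue;
        ThresholdUpdate(int maxValue) { this.maxValue = maxValue; }
    }

    static final class BlocklistUpdate {
        final String sensorId;
        BlocklistUpdate(String sensorId) { this.sensorId = sensorId; }
    }

    static final class Event {
        final String sensorId;
        final int value;
        Event(String sensorId, int value) { this.sensorId = sensorId; this.value = value; }
    }

    // Mirrors a co-flatmap: input 1 carries events, input 2 the unioned model updates.
    // The List<String> plays the role of the output collector.
    static final class ModelValidator {
        private int maxValue = Integer.MAX_VALUE;            // model part 1
        private final Set<String> blocked = new HashSet<>(); // model part 2

        // Input 1: check an event against the current model; emit the invalid ones.
        void flatMap1(Event event, List<String> out) {
            if (blocked.contains(event.sensorId) || event.value > maxValue) {
                out.add("invalid: " + event.sensorId + "=" + event.value);
            }
        }

        // Input 2: unwrap the Either and update the matching part of the model (emits nothing).
        void flatMap2(Either<ThresholdUpdate, BlocklistUpdate> update, List<String> out) {
            if (update.isLeft()) {
                maxValue = update.getLeft().maxValue;
            } else {
                blocked.add(update.getRight().sensorId);
            }
        }
    }

    public static void main(String[] args) {
        List<ThresholdUpdate> updates1 = Arrays.asList(new ThresholdUpdate(100));
        List<BlocklistUpdate> updates2 = Arrays.asList(new BlocklistUpdate("sensor-7"));

        // "map + union": wrap both typed streams into one stream of a common type.
        List<Either<ThresholdUpdate, BlocklistUpdate>> unioned = new ArrayList<>();
        for (ThresholdUpdate t : updates1) unioned.add(Either.ofLeft(t));
        for (BlocklistUpdate b : updates2) unioned.add(Either.ofRight(b));

        ModelValidator validator = new ModelValidator();
        List<String> out = new ArrayList<>();
        for (Either<ThresholdUpdate, BlocklistUpdate> u : unioned) validator.flatMap2(u, out);
        validator.flatMap1(new Event("sensor-1", 42), out);
        validator.flatMap1(new Event("sensor-1", 150), out);
        validator.flatMap1(new Event("sensor-7", 10), out);
        System.out.println(out); // [invalid: sensor-1=150, invalid: sensor-7=10]
    }
}
```

The sketch applies all updates before the events; in a running job the two inputs interleave nondeterministically, so an event may be checked against a slightly older model.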
>>>>>>>>>>
>>>>>>>>>> DataStream updates1 = ....
>>>>>>>>>> DataStream updates2 = ....
>>>>>>>>>> DataStream events = ....
>>>>>>>>>>
>>>>>>>>>> events.connect(updates1.union(updates2).broadcast()).flatMap(...)
>>>>>>>>>>
>>>>>>>>>> Does this answer your question?
>>>>>>>>>>
>>>>>>>>>> Gyula
>>>>>>>>>>
>>>>>>>>>> On Wednesday, August 19, 2015, Welly Tambunan wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Gyula,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your response.
>>>>>>>>>>>
>>>>>>>>>>> However, the model can receive updates from multiple event
>>>>>>>>>>> streams. How can we do that with a co-flatmap, since as far as I
>>>>>>>>>>> can see the connect API only accepts a single DataStream?
>>>>>>>>>>>
>>>>>>>>>>> > ... while external model updates would be tricky to keep consistent.
>>>>>>>>>>> Is that still the case if the operator treats the external model
>>>>>>>>>>> as read-only? We create another stream that updates the external
>>>>>>>>>>> model separately.
>>>>>>>>>>>
>>>>>>>>>>> Cheers
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Aug 19, 2015 at 2:05 PM, Gyula Fóra wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hey!
>>>>>>>>>>>>
>>>>>>>>>>>> I think it is safe to say that the best approach in this case is
>>>>>>>>>>>> to create a co-flatmap that receives updates on one input. The
>>>>>>>>>>>> events should probably be broadcast in this case so you can check
>>>>>>>>>>>> in parallel.
>>>>>>>>>>>>
>>>>>>>>>>>> This approach can be used effectively with Flink's checkpoint
>>>>>>>>>>>> mechanism, while external model updates would be tricky to keep
>>>>>>>>>>>> consistent.
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Gyula
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Aug 19, 2015 at 8:44 AM Welly Tambunan <if05041@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>>
>>>>>>>>>>>>> We have a streaming computation that needs to validate the data
>>>>>>>>>>>>> stream against a model provided by the user.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Right now, what I have done is load the model into a Flink
>>>>>>>>>>>>> operator and validate against it. However, the model can be
>>>>>>>>>>>>> updated and changed frequently. Fortunately, we always publish
>>>>>>>>>>>>> these change events to RabbitMQ.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think we can:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. Create a RabbitMQ listener for model-changed events inside
>>>>>>>>>>>>>    the operator, then update the model when an event arrives.
>>>>>>>>>>>>>
>>>>>>>>>>>>>    But I think this will create a race condition if not handled
>>>>>>>>>>>>>    correctly, and it seems odd to keep this.
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2. Move the model into an external in-memory cache and keep the
>>>>>>>>>>>>>    model up to date using Flink, so the operator retrieves it
>>>>>>>>>>>>>    from the in-memory cache.
>>>>>>>>>>>>>
>>>>>>>>>>>>> 3. Create two streams and use a co-operator to manage the
>>>>>>>>>>>>>    shared state.
>>>>>>>>>>>>>
>>>>>>>>>>>>> What is your suggestion for keeping the state up to date from
>>>>>>>>>>>>> external events? Is there some kind of best practice for keeping
>>>>>>>>>>>>> a model up to date in a streaming operator?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks a lot
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Welly Tambunan
>>>>>>>>>>>>> Triplelands
>>>>>>>>>>>>>
>>>>>>>>>>>>> http://weltam.wordpress.com
>>>>>>>>>>>>> http://www.triplelands.com
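Gyula's last suggestion, a dedicated operator that centralizes the updates and every once in a while pushes the (small, shared) cached data to the other operators, can be sketched in plain Java as follows. The names `ModelCoordinator` and `CachingOperator` are hypothetical, and interpreting "every once in a while" as "every N updates" is an assumption (a timer would work as well); in a Flink job the snapshots would reach the filter, map, and flatMap operators as a broadcast stream rather than through direct method calls.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: one dedicated operator owns the shared model and periodically
// pushes an immutable snapshot to every operator that caches it.
// Assumption: "every once in a while" is modelled as "every N updates".
class CentralizedModelSketch {

    static final class ModelCoordinator {
        private final Map<String, String> model = new HashMap<>();
        private final List<CachingOperator> subscribers = new ArrayList<>();
        private final int publishEvery;
        private int pending = 0;

        ModelCoordinator(int publishEvery) { this.publishEvery = publishEvery; }

        void subscribe(CachingOperator op) { subscribers.add(op); }

        // Every raw update goes here only; downstream operators never see it directly.
        void onUpdate(String key, String value) {
            model.put(key, value);
            pending++;
            if (pending >= publishEvery) {
                publish();
            }
        }

        // Copies the model so later updates cannot leak into an already-published snapshot.
        void publish() {
            Map<String, String> snapshot = Collections.unmodifiableMap(new HashMap<>(model));
            for (CachingOperator op : subscribers) {
                op.replaceCache(snapshot);
            }
            pending = 0;
        }
    }

    // Stands in for a filter, map, or flatMap that only reads its cached copy.
    static final class CachingOperator {
        private Map<String, String> cache = Collections.emptyMap();

        void replaceCache(Map<String, String> snapshot) { cache = snapshot; }

        String lookup(String key) { return cache.get(key); }
    }

    public static void main(String[] args) {
        ModelCoordinator coordinator = new ModelCoordinator(2);
        CachingOperator filter = new CachingOperator();
        CachingOperator mapper = new CachingOperator();
        coordinator.subscribe(filter);
        coordinator.subscribe(mapper);

        coordinator.onUpdate("limit", "10");
        System.out.println(filter.lookup("limit")); // null (nothing published yet)
        coordinator.onUpdate("unit", "kg");
        System.out.println(filter.lookup("limit")); // 10
        System.out.println(mapper.lookup("unit"));  // kg
    }
}
```

Because every operator holds a reference to an immutable snapshot, it never sees a half-applied update; the trade-off, as noted in the thread, is that this only fits cached data that is small and identical for all operators.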