From: Fabian Hueske
Date: Tue, 24 Apr 2018 12:01:47 +0200
Subject: Re: data enrichment with SQL use case
To: Alexander Smirnov
Cc: Ken Krugler, user@flink.apache.org, miki haiat

Hi Alex,

An operator that has to join two input streams obviously requires two
inputs. In the case of an enrichment join, the operator should first read
the meta-data stream and build up a data structure as state, against which
the other input is then joined. If the meta data is (infrequently) updated,
these updates should be integrated into the state.

The problem is that it is currently not possible to implement such an
operator with Flink, because operators cannot decide from which input to
read, i.e., they have to process whatever data is given to them. Hence, it
is not possible to build up a data structure from the meta-data stream
before consuming the other stream.

There are a few workarounds that work in special cases.

1) The meta data is rather small and never updated. You put the meta data
as a file into a (distributed) file system and read it from each function
instance when it is initialized, i.e., in open(), and put it into a hash
map. Each function instance will hold the complete meta data in memory (on
the heap). Since the meta data is effectively broadcast, the other stream
does not need to be partitioned to join against the meta data in the hash
map. You can implement this function as a FlatMapFunction or
ProcessFunction, as sketched below.
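A minimal sketch of this approach, assuming the meta data is a simple
key,value text file and the events are (key, count) pairs; the file path,
file layout, and type choices here are all illustrative:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Collector;

// Workaround 1: each parallel instance loads the full meta data file in
// open() and joins against it in memory. No shuffle is needed because
// every instance holds the complete meta data.
public class StaticMetaDataJoin
        extends RichFlatMapFunction<Tuple2<String, Integer>,
                                    Tuple3<String, Integer, String>> {

    private final String metaDataPath;  // e.g. "hdfs:///meta/dimensions.csv"
    private transient Map<String, String> metaData;

    public StaticMetaDataJoin(String metaDataPath) {
        this.metaDataPath = metaDataPath;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Read the complete meta data file into a heap hash map.
        metaData = new HashMap<>();
        Path path = new Path(metaDataPath);
        FileSystem fs = path.getFileSystem();
        try (BufferedReader reader =
                 new BufferedReader(new InputStreamReader(fs.open(path)))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.split(",", 2);
                metaData.put(fields[0], fields[1]);
            }
        }
    }

    @Override
    public void flatMap(Tuple2<String, Integer> event,
                        Collector<Tuple3<String, Integer, String>> out) {
        String meta = metaData.get(event.f0);
        if (meta != null) {
            out.collect(Tuple3.of(event.f0, event.f1, meta));
        }
    }
}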
2) The meta data is too large and/or is updated. In this case, you need a
function with two inputs. Both inputs are keyed (keyBy()) on a join
attribute. Since you cannot hold back the non-meta-data stream, you need
to buffer it in (keyed) state until you've read the meta-data stream up to
a point where you can start processing the other stream. If the meta data
is updated at some point, you can just add the new data to the state. The
benefit of this approach is that the state is shared across all operators
and can be updated. However, you might need to initially buffer quite a
bit of data in state if the non-meta-data stream has a high volume. A
sketch follows below.
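A sketch of this two-input variant as a CoProcessFunction on two connected
keyed streams; the event and meta-data record types, state names, and key
fields are illustrative:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Workaround 2: events that arrive before the meta data for their key are
// buffered in keyed state and flushed once the meta data shows up. Updated
// meta data simply overwrites the old value.
// Usage (both inputs keyed on the join attribute):
//   events.keyBy(t -> t.f0)
//         .connect(metaData.keyBy(t -> t.f0))
//         .process(new BufferingMetaDataJoin());
public class BufferingMetaDataJoin
        extends CoProcessFunction<Tuple2<String, Integer>,   // events
                                  Tuple2<String, String>,    // meta data
                                  Tuple3<String, Integer, String>> {

    private transient ValueState<String> metaState;
    private transient ListState<Tuple2<String, Integer>> bufferedEvents;

    @Override
    public void open(Configuration parameters) {
        metaState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("meta", String.class));
        bufferedEvents = getRuntimeContext().getListState(
            new ListStateDescriptor<>("buffer",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})));
    }

    @Override
    public void processElement1(Tuple2<String, Integer> event, Context ctx,
            Collector<Tuple3<String, Integer, String>> out) throws Exception {
        String meta = metaState.value();
        if (meta != null) {
            out.collect(Tuple3.of(event.f0, event.f1, meta));
        } else {
            // Meta data for this key hasn't arrived yet: buffer the event.
            bufferedEvents.add(event);
        }
    }

    @Override
    public void processElement2(Tuple2<String, String> metaRecord, Context ctx,
            Collector<Tuple3<String, Integer, String>> out) throws Exception {
        // New or updated meta data: remember it and flush buffered events.
        metaState.update(metaRecord.f1);
        for (Tuple2<String, Integer> event : bufferedEvents.get()) {
            out.collect(Tuple3.of(event.f0, event.f1, metaRecord.f1));
        }
        bufferedEvents.clear();
    }
}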
Hope that one of these approaches works for your use case.

Best, Fabian

2018-04-23 13:29 GMT+02:00 Alexander Smirnov:

> Hi Fabian,
>
> please share the workarounds; they should be helpful for my case as well.
>
> Thank you,
> Alex
>
> On Mon, Apr 23, 2018 at 2:14 PM Fabian Hueske wrote:
>
>> Hi Miki,
>>
>> Sorry for the late response.
>> There are basically two ways to implement an enrichment join as in your
>> use case.
>>
>> 1) Keep the meta data in the database and implement a job that reads
>> the stream from Kafka and queries the database in an AsyncIO operator
>> for every stream record. This is the easier implementation, but it will
>> send one query to the DB for each streamed record.
>> 2) Replicate the meta data into Flink state and join the streamed
>> records with the state. This solution is more complex because you need
>> to propagate updates of the meta data (if there are any) into the Flink
>> state. At the moment, Flink lacks a few features needed for a good
>> implementation of this approach, but there are some workarounds that
>> help in certain cases.
>>
>> Note that Flink's SQL support does not add advantages for either of
>> these approaches. You should use the DataStream API (and possibly
>> ProcessFunctions).
>>
>> I'd go for the first approach if one query per record is feasible.
>> Let me know if you need to tackle the second approach and I can give
>> some details on the workarounds I mentioned.
>>
>> Best, Fabian
>>
>> 2018-04-16 20:38 GMT+02:00 Ken Krugler <kkrugler_lists@transpac.com>:
>>
>>> Hi Miki,
>>>
>>> I haven't tried mixing AsyncFunctions with SQL queries.
>>>
>>> Normally I'd create a regular DataStream workflow that first reads
>>> from Kafka, then has an AsyncFunction to read from the SQL database.
>>>
>>> If there are often duplicate keys in the Kafka-based stream, you could
>>> keyBy(key) before the AsyncFunction, and then cache the result of the
>>> SQL query.
>>>
>>> — Ken
>>>
>>> On Apr 16, 2018, at 11:19 AM, miki haiat <miko5054@gmail.com> wrote:
>>>
>>> Hi, thanks for the reply. I will try to break your reply down into the
>>> flow execution order:
>>>
>>> The first data stream will use AsyncIO to select from the table, the
>>> second stream will be Kafka, and then I can join the streams and map
>>> the result?
>>>
>>> If that is the case, will I select the table only once, on load?
>>> How can I make sure that my stream table is "fresh"?
>>>
>>> I'm thinking to myself: is there a way to use Flink's state backend
>>> (RocksDB) and create a read/write-through mechanism?
>>>
>>> Thanks,
>>>
>>> miki
>>>
>>> On Mon, Apr 16, 2018 at 2:45 AM, Ken Krugler
>>> <kkrugler_lists@transpac.com> wrote:
>>>
>>>> If the SQL data is all (or mostly all) needed to join against the
>>>> data from Kafka, then I might try a regular join.
>>>>
>>>> Otherwise it sounds like you want to use an AsyncFunction to do ad
>>>> hoc queries (in parallel) against your SQL DB.
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/asyncio.html
>>>>
>>>> — Ken
>>>>
>>>> On Apr 15, 2018, at 12:15 PM, miki haiat <miko5054@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I have a case of meta data enrichment and I'm wondering if my
>>>> approach is the correct way:
>>>>
>>>>   1. input stream from Kafka
>>>>   2. MD (meta data) in MS SQL
>>>>   3. map to a new POJO
>>>>
>>>> I need to extract a key from the Kafka stream and use it to select
>>>> some values from the SQL table.
>>>>
>>>> So I thought to use the Table/SQL API in order to select the MD
>>>> table, then convert the Kafka stream to a table and join the data by
>>>> the stream key.
>>>>
>>>> At the end I need to map the joined data to a new POJO and send it to
>>>> Elasticsearch.
>>>>
>>>> Any suggestions or different ways to solve this use case?
>>>>
>>>> thanks,
>>>> Miki
>>>>
>>>> --------------------------
>>>> Ken Krugler
>>>> http://www.scaleunlimited.com
>>>> custom big data solutions & training
>>>> Hadoop, Cascading, Cassandra & Solr
>>>
>>> --------------------------------------------
>>> http://about.me/kkrugler
>>> +1 530-210-6378
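A minimal sketch of the per-record lookup approach from the quoted replies
(Fabian's option 1 and Ken's AsyncFunction suggestion). This uses the
ResultFuture-based async I/O API of Flink 1.5 (the 1.4 docs linked above
use AsyncCollector instead); the thread-pool-backed lookup stands in for a
real, ideally asynchronous, database client, and all names are
illustrative:

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// One database query per streamed key, executed asynchronously so the
// operator thread is never blocked.
// Usage:
//   AsyncDataStream.unorderedWait(
//       keyStream, new MetaDataLookup(), 1, TimeUnit.SECONDS, 100);
public class MetaDataLookup
        extends RichAsyncFunction<String, Tuple2<String, String>> {

    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) {
        executor = Executors.newFixedThreadPool(10);
    }

    @Override
    public void close() {
        executor.shutdown();
    }

    @Override
    public void asyncInvoke(String key,
                            ResultFuture<Tuple2<String, String>> resultFuture) {
        // Run the blocking lookup on a separate pool and complete the
        // future when the result arrives.
        CompletableFuture
            .supplyAsync(() -> queryDatabase(key), executor)
            .thenAccept(meta ->
                resultFuture.complete(Collections.singleton(Tuple2.of(key, meta))));
    }

    private String queryDatabase(String key) {
        // Placeholder for a JDBC SELECT against the meta data table.
        return "meta-for-" + key;
    }
}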