From: Jingsong Li <jingsonglee0@gmail.com>
Date: Tue, 12 May 2020 16:04:12 +0800
Subject: Re: Not able to implement a use case
To: Khachatryan Roman
Cc: Jaswin Shah,
"user@flink.apache.org" , Rui Li Content-Type: multipart/alternative; boundary="000000000000623bd405a56ee996" --000000000000623bd405a56ee996 Content-Type: text/plain; charset="UTF-8" Thanks Roman for involving me. Hi Jaswin, FLIP-115[1] will finish Kafka -> Hive/Filesystem. And will be released in 1.11. We will provide two connectors in table: - file system connector, this connector manage partitions and files by file system paths. You can define a file system table with parquet/orc format, this should be consistent with hive exclude hive metastore support. - hive connector, this connector manage partitions and files by hive metastore, support automatic adding partition to hive metastore. [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table Best, Jingsong Lee On Tue, May 12, 2020 at 3:52 PM Khachatryan Roman < khachatryan.roman@gmail.com> wrote: > AFAIK, yes, you can write streams. > > I'm pulling in Jingsong Li and Rui Li as they might know better. > > Regards, > Roman > > > On Mon, May 11, 2020 at 10:21 PM Jaswin Shah > wrote: > >> If I go with table apis, can I write the streams to hive or it is only >> for batch processing as of now. >> >> Get Outlook for Android >> >> ------------------------------ >> *From:* Khachatryan Roman >> *Sent:* Tuesday, May 12, 2020 1:49:10 AM >> *To:* Jaswin Shah >> *Cc:* user@flink.apache.org >> *Subject:* Re: Not able to implement an usecase >> >> Hi Jaswin, >> >> Currently, DataStream API doesn't support outer joins. >> As a workaround, you can use coGroup function [1]. >> >> Hive is also not supported by DataStream API though it's supported by >> Table API [2]. 
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/CoGroupFunction.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/read_write_hive.html
>>
>> Regards,
>> Roman
>>
>> On Mon, May 11, 2020 at 6:03 PM Jaswin Shah wrote:
>>
>> Hi,
>> I want to implement the following use case in my application:
>> I am doing an interval join between two data streams and then, in a
>> process function, catching the discrepant results of the join. The join
>> is done on the key orderId. Now, I want to identify all the messages in
>> both data streams that are not joined. That is, for a message in the
>> left stream, if I do not find any message in the right stream over the
>> defined interval, that message should be caught; likewise for the right
>> stream, if there are messages with no corresponding messages in the left
>> stream, catch them too. I need help achieving this use case. I know it
>> could be done with an outer join, but interval joins and tumbling
>> event-time window joins only support inner joins, as far as I know. I do
>> not want to use the Table/SQL API here; I want to do this with the
>> DataStream API only.
>>
>> Currently I am using the code below, which works for 90% of cases, but
>> the 10% of cases where a very large delay can happen and messages in the
>> left or right stream are missing are not covered by this implementation:
>>
>> /**
>>  * Join cart and pg streams on mid and orderId, over the interval specified.
>>  *
>>  * @param leftStream
>>  * @param rightStream
>>  * @return
>>  */
>> public SingleOutputStreamOperator<ResultMessage> intervalJoinCartAndPGStreams(DataStream<CartMessage> leftStream, DataStream<PGMessage> rightStream, ParameterTool parameter) {
>>     // Discrepant results are sent to Kafka from CartPGProcessFunction.
>>     return leftStream
>>         .keyBy(new CartJoinColumnsSelector())
>>         .intervalJoin(rightStream.keyBy(new PGJoinColumnsSelector()))
>>         .between(Time.milliseconds(Long.parseLong(parameter.get(Constants.INTERVAL_JOIN_LOWERBOUND))),
>>                  Time.milliseconds(Long.parseLong(parameter.get(Constants.INTERVAL_JOIN_UPPERBOUND))))
>>         .process(new CartPGProcessFunction());
>> }
>>
>> Secondly, I am unable to find streaming support for writing the data
>> streams I am reading from Kafka out to Hive, which I then want to batch
>> process with Flink.
>>
>> Please help me resolve these use cases.
>>
>> Thanks,
>> Jaswin

--
Best, Jingsong Lee
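[Editor's note] Roman's coGroup suggestion above can be sketched without any Flink dependency. The snippet below is plain Java (the Msg type and the MATCHED/LEFT_ONLY/RIGHT_ONLY labels are hypothetical, not from the thread); it only illustrates the per-key semantics a CoGroupFunction implements inside a keyed window: unlike a join, coGroup is also invoked when one side's group is empty, which is exactly where the unmatched messages an interval join silently drops can be caught.

```java
import java.util.ArrayList;
import java.util.List;

public class CoGroupSketch {

    // Hypothetical stand-in for CartMessage/PGMessage; only the join key matters here.
    static class Msg {
        final String orderId;
        Msg(String orderId) { this.orderId = orderId; }
    }

    // What a CoGroupFunction sees for one key in one window: all left and all
    // right elements carrying that key. It is called even when one side is
    // empty, so one-sided groups can be emitted as "unmatched".
    static List<String> coGroup(String key, List<Msg> lefts, List<Msg> rights) {
        List<String> out = new ArrayList<>();
        if (rights.isEmpty()) {
            for (Msg m : lefts) out.add("LEFT_ONLY:" + key);   // cart message with no pg match
        } else if (lefts.isEmpty()) {
            for (Msg m : rights) out.add("RIGHT_ONLY:" + key); // pg message with no cart match
        } else {
            out.add("MATCHED:" + key);                         // inner-join case
        }
        return out;
    }

    public static void main(String[] args) {
        // o1 exists on both sides, o2 only in the cart stream, o3 only in the pg stream.
        System.out.println(coGroup("o1", List.of(new Msg("o1")), List.of(new Msg("o1"))));
        System.out.println(coGroup("o2", List.of(new Msg("o2")), List.of()));
        System.out.println(coGroup("o3", List.of(), List.of(new Msg("o3"))));
    }
}
```

In actual Flink code this logic would live in the `CoGroupFunction` passed to `leftStream.coGroup(rightStream).where(...).equalTo(...).window(...).apply(...)`; the sketch only shows the apply-side decision, not the windowing.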
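[Editor's note] Jingsong's file system connector description can be made concrete with a DDL sketch. This is a hedged illustration of what a FLIP-115-style table definition was expected to look like in 1.11; table name, columns, path, and option values here are invented for illustration, not syntax confirmed in this thread.

```sql
-- Hypothetical filesystem table; partitions map to directory paths.
CREATE TABLE fs_orders (
  order_id STRING,
  amount   DOUBLE,
  dt       STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
  'path'      = 'hdfs:///warehouse/orders',  -- illustrative path
  'format'    = 'parquet'
);

-- Streaming insert from a Kafka-backed table into the filesystem table.
INSERT INTO fs_orders SELECT order_id, amount, dt FROM kafka_orders;
```

The Hive connector variant would instead register the same partitions in the Hive metastore, so downstream Hive batch jobs see them automatically.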