From: Igor Berman
To: user@flink.apache.org
Date: Mon, 9 Jan 2017 20:34:26 +0200
Subject: Re: Joining two kafka streams

Hi Tzu-Li,
Huge thanks for the input. I'll try to implement a prototype of your idea and see if it answers my requirements.


On 9 January 2017 at 08:02, Tzu-Li (Gordon) Tai <tzulitai@apache.org> wrote:
Hi Igor!

What you can actually do is let a single FlinkKafkaConsumer consume from both topics, producing a single DataStream which you can keyBy afterwards.
All versions of the FlinkKafkaConsumer support consuming multiple Kafka topics simultaneously. This is logically the same as a union and then a keyBy, like what you described.
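A plain-Java sketch (no Flink dependency) of why keying the merged stream gives the locality Igor asks for: the instance that handles a record is chosen from its key alone, so records with the same key from either topic end up on the same instance. The `Record` type, the topic names and the `partitionFor`/`route` helpers are hypothetical, and the modulo-hash routing is a simplification; Flink's real key-to-instance assignment differs in detail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class KeyLocality {

    // A record as it might come out of either topic (hypothetical type).
    static final class Record {
        final String topic;
        final String key;
        final String payload;

        Record(String topic, String key, String payload) {
            this.topic = topic;
            this.key = key;
            this.payload = payload;
        }

        @Override
        public String toString() {
            return topic + ":" + key + "=" + payload;
        }
    }

    // Simplified key-based routing: the target instance depends only on the key,
    // never on which topic the record came from.
    static int partitionFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // "Union, then keyBy": merge both topics and group records by the instance
    // that would process them.
    static Map<Integer, List<Record>> route(List<Record> topicA, List<Record> topicB, int parallelism) {
        List<Record> merged = new ArrayList<>(topicA);
        merged.addAll(topicB);
        Map<Integer, List<Record>> byInstance = new TreeMap<>();
        for (Record r : merged) {
            byInstance.computeIfAbsent(partitionFor(r.key, parallelism), i -> new ArrayList<>()).add(r);
        }
        return byInstance;
    }

    public static void main(String[] args) {
        List<Record> orders = List.of(new Record("orders", "user-1", "o1"), new Record("orders", "user-2", "o2"));
        List<Record> payments = List.of(new Record("payments", "user-1", "p1"), new Record("payments", "user-2", "p2"));
        route(orders, payments, 4).forEach((instance, records) ->
                System.out.println("instance " + instance + " -> " + records));
    }
}
```

Whatever instance numbers get printed, both `user-1` records share one instance and both `user-2` records share one instance, which is the property that avoids cross-thread races on a key.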

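Note that this approach requires that the records in both of your Kafka topics are of the same type when consumed into Flink (e.g., the same POJO classes, or simply both as Strings, etc.).
If that isn’t possible and you have different data types / schemas for the topics, you’d probably need to use “connect” and then a keyBy.
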
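When the two topics carry different record types, one consumer with one schema won't do; an alternative is to connect the two streams and key both sides by the join field, so a single stateful function sees both inputs for a given key. The sketch below models that in plain Java: the method names echo Flink's `CoFlatMapFunction` (`flatMap1`/`flatMap2`), the per-key `HashMap`s stand in for Flink keyed state, and the `Order`/`Payment` types are hypothetical. It keeps only the latest element per key and side and never expires state, which a real job would need to handle.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConnectedJoin {

    // Two hypothetical record types with different schemas but a shared join field.
    static final class Order {
        final String userId;
        final double amount;
        Order(String userId, double amount) { this.userId = userId; this.amount = amount; }
    }

    static final class Payment {
        final String userId;
        final String method;
        Payment(String userId, String method) { this.userId = userId; this.method = method; }
    }

    // Per-key state, as a keyed co-operator would hold it after connect + keyBy.
    private final Map<String, Order> lastOrder = new HashMap<>();
    private final Map<String, Payment> lastPayment = new HashMap<>();
    final List<String> out = new ArrayList<>();

    // Elements of the first input (named after CoFlatMapFunction.flatMap1).
    void flatMap1(Order o) {
        lastOrder.put(o.userId, o);
        Payment p = lastPayment.get(o.userId);
        if (p != null) emit(o, p);
    }

    // Elements of the second input (named after CoFlatMapFunction.flatMap2).
    void flatMap2(Payment p) {
        lastPayment.put(p.userId, p);
        Order o = lastOrder.get(p.userId);
        if (o != null) emit(o, p);
    }

    private void emit(Order o, Payment p) {
        out.add(o.userId + ": " + o.amount + " via " + p.method);
    }

    public static void main(String[] args) {
        ConnectedJoin join = new ConnectedJoin();
        join.flatMap1(new Order("user-1", 9.5));
        join.flatMap2(new Payment("user-2", "card"));   // no order for user-2 yet
        join.flatMap2(new Payment("user-1", "paypal")); // matches the buffered order
        join.out.forEach(System.out::println);          // prints: user-1: 9.5 via paypal
    }
}
```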
If you’re applying a window directly after joining the two topic streams, you could also use a window join:
dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new JoinFunction() {...});

The “where” specifies how to select the key from the first stream, and “equalTo” from the second one.
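To spell out what that window join computes, here is a plain-Java model (not Flink code) of an inner join over tumbling event-time windows: two elements are joined when their keys match (the `where`/`equalTo` selectors) and their timestamps fall into the same window, and every such pair is handed to the join logic. The `Event` type and the string output standing in for a `JoinFunction` are hypothetical, and the nested loop is for clarity only; Flink buffers elements per key and window instead of comparing all pairs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class TumblingWindowJoin {

    // Hypothetical event: join key, event-time timestamp in milliseconds, value.
    static final class Event {
        final String key;
        final long timestamp;
        final String value;
        Event(String key, long timestamp, String value) {
            this.key = key;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Start of the tumbling window containing the timestamp (no offset).
    static long windowStart(long timestamp, long sizeMillis) {
        return timestamp - Math.floorMod(timestamp, sizeMillis);
    }

    // Inner join: every pair with equal keys whose timestamps share a window.
    static List<String> join(List<Event> left, List<Event> right, long sizeMillis) {
        List<String> joined = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = Objects.equals(l.key, r.key); // where(...).equalTo(...)
                boolean sameWindow = windowStart(l.timestamp, sizeMillis) == windowStart(r.timestamp, sizeMillis);
                if (sameKey && sameWindow) {
                    joined.add(l.key + ": " + l.value + "+" + r.value); // stand-in for the join function
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        long size = 3_000; // 3 seconds, as with Time.seconds(3) in the example
        List<Event> a = List.of(new Event("k1", 1_000, "a1"), new Event("k1", 4_000, "a2"));
        List<Event> b = List.of(new Event("k1", 2_500, "b1"), new Event("k2", 2_000, "b2"));
        join(a, b, size).forEach(System.out::println); // prints: k1: a1+b1
    }
}
```

Here `a2` (t=4000) lands in the [3000, 6000) window while `b1` (t=2500) is in [0, 3000), so they are not joined even though the keys match.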
Hope this helps, let me know if you have other questions!

Cheers,
Gordon

On January 9, 2017 at 4:06:34 AM, igor.berman (igor.berman@gmail.com) wrote:

Hi,
I have a use case where I need to join two Kafka topics together by some fields.
In general, I could put the content of one topic into the other and partition by
the same key, but I can't touch those two topics (i.e. there are other consumers
of those topics). On the other hand, it's essential to process the same keys in
the same "thread" to achieve locality and to avoid races when working with the
same key from different machines/threads.

My idea is to use a union of the two streams and then key by the field,
but is there a better approach to achieve "locality"?

Any input will be appreciated.
Igor



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Joining-two-kafka-streams-tp10912.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
