From user-return-4034-archive-asf-public=cust-asf.ponee.io@beam.apache.org Mon Oct 1 23:18:21 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id EA0F018077A for ; Mon, 1 Oct 2018 23:18:20 +0200 (CEST) Received: (qmail 75767 invoked by uid 500); 1 Oct 2018 21:18:19 -0000 Mailing-List: contact user-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: user@beam.apache.org Delivered-To: mailing list user@beam.apache.org Received: (qmail 75756 invoked by uid 99); 1 Oct 2018 21:18:19 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 01 Oct 2018 21:18:19 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 6EEDDC5DFA for ; Mon, 1 Oct 2018 21:18:19 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -12.863 X-Spam-Level: X-Spam-Status: No, score=-12.863 tagged_above=-999 required=6.31 tests=[DKIM_SIGNED=0.1, DKIM_VALID=-0.1, DKIM_VALID_AU=-0.1, ENV_AND_HDR_SPF_MATCH=-0.5, HTML_IMAGE_ONLY_28=0.726, HTML_MESSAGE=2, RCVD_IN_DNSWL_NONE=-0.0001, RCVD_IN_MSPIKE_H3=0.001, RCVD_IN_MSPIKE_WL=0.001, SPF_PASS=-0.001, T_REMOTE_IMAGE=0.01, USER_IN_DEF_DKIM_WL=-7.5, USER_IN_DEF_SPF_WL=-7.5] autolearn=disabled Authentication-Results: spamd1-us-west.apache.org (amavisd-new); dkim=pass (2048-bit key) header.d=google.com Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id Jud3Pw2NqWKW for ; Mon, 1 Oct 2018 21:18:17 +0000 (UTC) Received: from mail-ot1-f52.google.com (mail-ot1-f52.google.com [209.85.210.52]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTPS id 589A05F24A for ; Mon, 1 Oct 2018 21:18:17 +0000 (UTC) Received: by mail-ot1-f52.google.com with SMTP id o13-v6so14678789otl.4 for ; Mon, 01 Oct 2018 14:18:17 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=google.com; s=20161025; h=mime-version:references:in-reply-to:from:date:message-id:subject:to; bh=MBgCghqEMrY2kI7YPYIdVRTdKoEpA+uRPCoeofDBXnw=; b=sZ7sFPRUziDzzv/mLhnKJE0vDsX1kEytYfSugbYXycYr7UAVJ0rCLZiHNXGCxqEQVe A/nsmDwQj5Fk+MjmQ3chhlhiQ0jyO/nFEXUm7cq5uOg36ov6P8BbU2lp3ei8/I5GFnx9 HNe8CMTq+37xHuCSP/TlQGZNy+TTOltt1IyL83RdUDNLoyY+sGHnBQ9nSeydXpJG2xzl Fp4i8v3u3ADH8WqNIovf8tmfZxH0jEOJxGPeN/uekpVN86+PzCP3B/b0PPuxLbgCcL9d Am993sOzd91wJ91WZ/earr4cgaVDcLmtXpVlCaiMzDg3cuSXBD3yxMZb4VlG+Lk51H8r PB2Q== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:mime-version:references:in-reply-to:from:date :message-id:subject:to; bh=MBgCghqEMrY2kI7YPYIdVRTdKoEpA+uRPCoeofDBXnw=; b=kNlIGNtr0GyvO3QdHEhK9AxW3tfVrCtD8Ct/UPQnaC9rufyOI770eS8lyffZv2qf1U iCRK3DMYZZvzQ0w7p6YuxLa/RCZlcxzYaFJmVPvEEZKnfONCPYMApB3mZrxh0cDFoU3E ZrGC87nSG6iQKrsukdKT7BWxqMPnW9mlrpnxvlK4ULR9GYJ23RF/fm227K/f8YJ1E46a oT0c5xPO5udqfq+W6DN5xcpR7PMGOh9vMI1EOLvCrBC5BxX6CtsHMBlEYz/ylqnmAhdH TjbVW2u2DR4cvLO6mUizM7TWC2IOEBVqXx4VmRpkktGKq3dowPgGb8JHFQroqoMEeZs6 PLxA== X-Gm-Message-State: ABuFfogYWHhFjzavzOUvXc9pnq1rS88sgZ/6WVm3VygAS05b0YpPJUFs wcHaEGHZ9GwO8sIhAe1BbQZFAaBumz/P+/ot9CsBLkI4 X-Google-Smtp-Source: ACcGV61VVuXM73ytg2veQGNsgUzMlbkzGlS8nth5je/1pEVZ27ya/g/WgCif5W8KgbBi+2WzpYjyGgNx/qNVqM3itlY= X-Received: by 2002:a9d:70cb:: with SMTP id w11-v6mr1918792otj.349.1538428696142; Mon, 01 Oct 2018 14:18:16 -0700 (PDT) MIME-Version: 1.0 References: In-Reply-To: From: Ankur Goenka Date: Mon, 1 Oct 2018 14:17:40 -0700 Message-ID: Subject: Re: CoGroupByKey only on window end. To: user@beam.apache.org Content-Type: multipart/alternative; boundary="000000000000f2876305773157c8" --000000000000f2876305773157c8 Content-Type: text/plain; charset="UTF-8" CoGBK and GBK need consistent windowing in PCollection. In your case, a custom solution is needed. Here is another way which only need pipeline orchestration and might be simpler. Lets say you have pcollection A with 15 min window and pcollection B with 1 min window Step 1: GBK pcollection A for 15 min window. Step 2: Read GBK A and re-emit same value for 15 x 1 min windows. Lets call this pcollection A' Step 3: Now A' and B have same window. Do CoGBK on A' and B. ... Thanks, Ankur On Mon, Oct 1, 2018 at 9:52 AM Akshay Balwally wrote: > Hi everyone, > > I would like to use a CoGroupByKey statement on unevenly windowed streams > (one of size 15 minutes, one of size 1 minute). As I understand it, > CoGroupByKey groups first by key, then by window. But of course since the > windows are not the same, my CoGroupByKey does not successfully join the > streams. > > One idea I had is to extend CoGroupByKey to make some > "CoGroupByKeyWindowEnd", that groups first by key, then by window.end. I > just wanted to check first- is there a better way to do this? Or something > natively supported by Beam? > > Thanks, > Akshay > -- > Akshay Balwally > Software Engineer > 9372716469 | > > > --000000000000f2876305773157c8 Content-Type: text/html; charset="UTF-8" Content-Transfer-Encoding: quoted-printable
CoGBK and GBK need consistent windowing in PCollection. In= your case, a custom solution is needed.=C2=A0
Here is another way whic= h only need pipeline orchestration and might be simpler.

Lets say you have pcollection A with 15 min window and pcollection B with = 1 min window
Step 1: GBK pcollection A for 15 min window.
Ste= p 2: Read GBK A and re-emit same value for 15 x 1 min windows. Lets call th= is pcollection A'
Step 3: Now A' and B have same wi= ndow. Do CoGBK on A' and B.
...

Than= ks,
Ankur

On Mon, Oct 1, 2018 at 9:52 AM Akshay Balwally <abalwally@lyft.com> wrote:=
Hi everyone,
=
I would like to use a CoGroupByKey statement on unevenly win= dowed streams (one of size 15 minutes, one of size 1 minute). As I understa= nd it, CoGroupByKey groups first by key, then by window. But of course sinc= e the windows are not the same, my CoGroupByKey does not successfully join = the streams.

One idea I had is to extend CoGroupBy= Key to make some "CoGroupByKeyWindowEnd", that groups first by ke= y, then by window.end. I just wanted to check first- is there a better way = to do this? Or something natively supported by Beam?

Thanks,
Akshay
--
Akshay Balwally
Software Engineer=C2=A0=C2=A0
9372716469=C2=A0|=C2=A0
=C2=A0
--000000000000f2876305773157c8--