From: Dominik Wosiński
Date: Tue, 17 Mar 2020 13:32:30 +0100
Subject: Re: Issues with Watermark generation after join
To: Kurt Young
Cc: Theo Diefenthal, user

Hey, sure,
the original Temporal Table SQL is:

SELECT e.*, f.level AS level
FROM enablers AS e,
     LATERAL TABLE (Detectors(e.timestamp)) AS f
WHERE e.id = f.id

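To make the intent of that query concrete, here is a minimal plain-Python sketch of what the temporal table join is meant to compute: each enabler row is matched with the Detectors version that was valid at the row's timestamp. This is not Flink code. The function name, the `valid_from` field, and the sample data are assumptions made up for illustration.

```python
from bisect import bisect_right
from collections import defaultdict


def temporal_join(enablers, detector_versions):
    """Join each enabler row with the detector version valid at its timestamp.

    enablers: list of dicts with keys "id" and "timestamp".
    detector_versions: list of (id, valid_from, level) tuples (hypothetical shape).
    """
    # Group the versions per id, ordered by the time they became valid.
    history = defaultdict(list)
    for det_id, valid_from, level in sorted(detector_versions, key=lambda v: v[1]):
        history[det_id].append((valid_from, level))

    joined = []
    for row in enablers:
        versions = history.get(row["id"], [])
        times = [valid_from for valid_from, _ in versions]
        # Index of the latest version with valid_from <= row timestamp.
        idx = bisect_right(times, row["timestamp"]) - 1
        if idx >= 0:  # inner join: rows without a valid version are dropped
            joined.append({**row, "level": versions[idx][1]})
    return joined


# Hypothetical sample data.
enablers = [
    {"id": "a", "timestamp": 5},
    {"id": "a", "timestamp": 15},
    {"id": "b", "timestamp": 7},
]
detector_versions = [("a", 0, "low"), ("a", 10, "high")]
result = temporal_join(enablers, detector_versions)
```

With this data, the row at timestamp 5 picks up the "low" version, the row at timestamp 15 picks up "high", and the row for id "b" is dropped because no Detectors version exists for it.
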
And the previous SQL query to join A and B is something like:

SELECT *
FROM A te,
     B s
WHERE s.id = te.id AND s.level = te.level AND s.timestamp = te.timestamp

Also, if I replace the SQL join of A and B with a BroadcastProcessFunction, this works like a charm and everything is calculated correctly, even if I don't change the parallelism.

I have noticed one more weird behavior. After the temporal table join I have a windowing function to process the data, and I have two options. In the TTF I can select the rowtime with type Timestamp and assign it to a field in the output class; this automatically passes the timestamp over, so I don't need to assign it again. Alternatively, I can select just a Long field that is not marked as rowtime (it has the same value, but the field was not marked with *.rowtime* on declaration), and then I need to assign the timestamps and watermarks again, since Flink doesn't know which field is the timestamp.

The former solution works like a charm, but for the latter there is no output visible from the windowing function at all. My expectation is that both solutions should work exactly the same and pass the timestamps in the same manner, but apparently they don't.
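
For reference, this is roughly what I mean by assigning timestamps and watermarks again from the Long field. It is a simplified plain-Python sketch, not Flink API and not a diagnosis of the problem: it assumes a watermark that trails the highest timestamp seen by a fixed delay, and a tumbling window that emits only once the watermark reaches the window end. All names and numbers are hypothetical.

```python
def tumbling_window_counts(events, window_size, max_out_of_orderness):
    """Count events per tumbling event-time window.

    events: iterable of (timestamp, value) pairs in arrival order.
    Returns (fired, pending): windows already emitted as (start, count)
    pairs, and windows still waiting for the watermark as {start: count}.
    """
    pending = {}                 # window start -> count of buffered events
    fired = []                   # (window start, count) in firing order
    watermark = float("-inf")
    for ts, _value in events:
        start = ts - ts % window_size
        if start + window_size <= watermark:
            continue             # late event: its window has already fired
        pending[start] = pending.get(start, 0) + 1
        # Simplified watermark: highest timestamp seen minus a fixed delay.
        watermark = max(watermark, ts - max_out_of_orderness)
        # Emit every window whose end the watermark has reached.
        for w in sorted(pending):
            if w + window_size <= watermark:
                fired.append((w, pending.pop(w)))
    return fired, pending


# The watermark passes the end of the first 10 s window, so it fires.
fired, pending = tumbling_window_counts(
    [(1000, "a"), (4000, "b"), (9000, "c"), (12000, "d")],
    window_size=10000,
    max_out_of_orderness=1000,
)

# The watermark never reaches the window end, so nothing is emitted.
stalled_fired, stalled_pending = tumbling_window_counts(
    [(1000, "a"), (4000, "b"), (9000, "c")],
    window_size=10000,
    max_out_of_orderness=1000,
)
```

In the second call the events are buffered but never emitted, which is the kind of "no output" I would only expect if the watermark did not advance far enough.
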

Best Regards,
Dom.