From: Jark Wu
Date: Thu, 18 Jun 2020 10:23:10 +0800
Subject: Re: Blink Planner Retracting Streams
To: John Mathews
Cc: godfrey he, user <user@flink.apache.org>

Hi John,

Maybe I misunderstand something, but CRow doesn't have a `getSchema()`
method. You can call getSchema() on the Table; this also works if you
convert the table into Tuple2<Boolean, Row>.
Actually, there is no big difference between CRow and Tuple2<Boolean, Row>:
they both wrap the change flag and the Row.

Best,
Jark

On Thu, 18 Jun 2020 at 06:39, John Mathews <jmathews3773@gmail.com> wrote:

> Hello Godfrey,
>
> Thanks for the response!
>
> I think the problem with Tuple2 is that, if my understanding of how CRow
> worked is correct, calling CRow's getSchema() would return the underlying
> schema of the row it contained. Tuple2 doesn't do that, so it
> changes/breaks a lot of our downstream code that relies on the
> TableSchema to return the underlying row's schema, and not a Tuple schema.
>
> Any thoughts on that issue?
>
> On Wed, Jun 17, 2020 at 12:16 AM godfrey he <godfreyhe@gmail.com> wrote:
>
>> Hi John,
>>
>> You can use Tuple2[Boolean, Row] to replace CRow; the
>> StreamTableEnvironment#toRetractStream method returns
>> DataStream[(Boolean, T)].
>>
>> The code looks like:
>>
>> tEnv.toRetractStream[Row](table).map(new MapFunction[(Boolean, Row), R] {
>>   override def map(value: (Boolean, Row)): R = ...
>> })
>>
>> Bests,
>> Godfrey
>>
>> John Mathews <jmathews3773@gmail.com> wrote on Wed, Jun 17, 2020 at 12:13 PM:
>>
>>> Hello,
>>>
>>> I am working on migrating from the flink table-planner to the new blink
>>> one, and one problem I am running into is that it doesn't seem like
>>> Blink has a concept of a CRow, unlike the original table-planner.
>>>
>>> I am therefore struggling to figure out how to properly convert a
>>> retracting stream to a SingleOutputStreamOperator when using just the
>>> Blink planner libraries.
>>>
>>> E.g. in the old planner I could do something like this:
>>>
>>> SingleOutputStreamOperator<CRow> stream =
>>>     tableEnvironment.toRetractStream(table, typeInfo)
>>>         .map(value -> new CRow(value.f1, value.f0));
>>>
>>> but without the CRow, I'm not sure how to accomplish this.
>>>
>>> Any suggestions?
>>>
>>> Thanks!
>>> John
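Jark's point that CRow and Tuple2<Boolean, Row> both just pair a change flag with a row can be illustrated without Flink on the classpath. The sketch below is only an assumption-laden model, not Flink code: `RetractDemo`, `Change`, `materialize`, and the sample field names are hypothetical names made up for illustration. It shows the schema being read once and kept beside the stream (in Flink, it would come from the Table), while each change record carries only the flag and the field values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RetractDemo {

    /** Hypothetical stand-in for CRow or Tuple2<Boolean, Row>: a change flag plus a row. */
    static final class Change {
        final boolean isAdd;   // true = add message, false = retract message
        final Object[] row;    // field values only; no schema is stored here

        Change(boolean isAdd, Object... row) {
            this.isAdd = isAdd;
            this.row = row;
        }
    }

    /** Applies add/retract messages in order and returns the rows still present. */
    static List<List<Object>> materialize(List<Change> changes) {
        List<List<Object>> current = new ArrayList<>();
        for (Change c : changes) {
            List<Object> row = Arrays.asList(c.row);
            if (c.isAdd) {
                current.add(row);
            } else {
                current.remove(row); // removes the first equal row, if any
            }
        }
        return current;
    }

    public static void main(String[] args) {
        // Assumption: the schema is obtained once and kept next to the stream,
        // instead of being asked of each wrapped record.
        String[] fieldNames = {"user", "cnt"};

        List<Change> changes = Arrays.asList(
            new Change(true, "alice", 1L),
            new Change(false, "alice", 1L), // retract the old count
            new Change(true, "alice", 2L),
            new Change(true, "bob", 1L));

        System.out.println(Arrays.toString(fieldNames) + " -> " + materialize(changes));
    }
}
```

Downstream code that previously asked the wrapper for a schema could, under this model, be handed the schema as a separate argument; whether that fits John's codebase is not something the thread settles.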