From: Rong Rong <walterddr@gmail.com>
Date: Tue, 23 Jul 2019 08:37:25 -0700
Subject: Re: [Table API] ClassCastException when converting a table to DataStream
To: Dongwon Kim <eastcirclek@gmail.com>
Cc: Fabian Hueske <fhueske@gmail.com>, user@flink.apache.org

Hi Dongwon,

Sorry for the late reply. I did some experiments and it seems like you are
right: setting the `.returns()` type actually alters the underlying type of
the DataStream from a GenericType into a specific RowTypeInfo. Please see
the JIRA ticket [1] for more info.

Regarding the approach: yes, I think you cannot access the timer service
from the Table/SQL API at the moment, so that might be the best approach.
And as Fabian suggested, I don't think there's much of a problem as long as
you are not changing the underlying type info of your DataStream. I will
follow up on this in the JIRA ticket.

--
Rong

[1] https://issues.apache.org/jira/browse/FLINK-13389

On Tue, Jul 23, 2019 at 6:30 AM Dongwon Kim <eastcirclek@gmail.com> wrote:

> Hi Fabian,
>
> Thanks for the clarification :-)
> I can convert back and forth without worrying about it, as I keep using
> the Row type during the conversion (even though fields are added).
>
> Best,
>
> Dongwon
>
> On Tue, Jul 23, 2019 at 8:15 PM Fabian Hueske <fhueske@gmail.com> wrote:
>
>> Hi Dongwon,
>>
>> Regarding the question about the conversion: if you keep using the Row
>> type and don't add or remove fields, the conversion is pretty much for
>> free right now.
>> It will be a MapFunction (sometimes even no function at all) that should
>> be chained with the other operators. Hence, it should boil down to a
>> function call.
>>
>> Best, Fabian
>>
>> On Sat, Jul 20, 2019 at 03:58, Dongwon Kim <eastcirclek@gmail.com> wrote:
>>
>>> Hi Rong,
>>>
>>>> I have to dig deeper into the code to reproduce this error. This seems
>>>> to be a bug to me and will update once I find anything.
>>>
>>> Thanks a lot for spending your time on this.
>>>
>>>> However, from what you explained, if I understand correctly, you can
>>>> do all of your processing within the Table API scope without
>>>> converting it back and forth to DataStream.
>>>> E.g. if your "map(a -> a)" placeholder represents some sort of map
>>>> function that's simple enough, you can implement it and connect it
>>>> with the Table API via a UserDefinedFunction [1].
>>>> As the Table API becomes a first-class citizen [2,3,4], this would be
>>>> a much cleaner implementation from my perspective.
>>>
>>> I also agree with you that a first-class-citizen Table API will make
>>> everything not only easier but also a lot cleaner.
>>> We do, however, have some corner cases that force us to convert a Table
>>> from and to a DataStream.
>>> One such case is appending to a Table a column showing the current
>>> watermark of each record; there's no other way to do that, as a
>>> ScalarFunction doesn't let us get at the runtime context information
>>> the way a ProcessFunction does.
>>>
>>> I have a question regarding the conversion.
>>> Do I have to worry about a runtime performance penalty in case I cannot
>>> help but convert back and forth to DataStream?
>>>
>>> Best,
>>>
>>> Dongwon
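
A minimal sketch of the watermark-appending conversion Dongwon describes
above (assuming Flink 1.8; `tEnv` and `sourceTable` are as in the snippet
quoted later in this thread, and the two-field output layout is
hypothetical):

  import org.apache.flink.api.common.typeinfo.TypeInformation;
  import org.apache.flink.api.common.typeinfo.Types;
  import org.apache.flink.api.java.typeutils.RowTypeInfo;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.functions.ProcessFunction;
  import org.apache.flink.table.api.Table;
  import org.apache.flink.types.Row;
  import org.apache.flink.util.Collector;

  // Append the current watermark to every row; a ScalarFunction cannot do
  // this because it has no access to the runtime context.
  DataStream<Row> withWatermark = tEnv.toAppendStream(sourceTable, Row.class)
      .process(new ProcessFunction<Row, Row>() {
          @Override
          public void processElement(Row in, Context ctx, Collector<Row> out) {
              Row enriched = new Row(in.getArity() + 1);
              for (int i = 0; i < in.getArity(); i++) {
                  enriched.setField(i, in.getField(i));
              }
              enriched.setField(in.getArity(), ctx.timerService().currentWatermark());
              out.collect(enriched);
          }
      })
      // type hint so the result is a concrete RowTypeInfo, not a GenericType
      .returns(new RowTypeInfo(
          new TypeInformation<?>[] { Types.SQL_TIMESTAMP, Types.LONG },
          new String[] { "time1", "wm" }));
  Table anotherTable = tEnv.fromDataStream(withWatermark);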
>>>
>>> On Sat, Jul 20, 2019 at 12:41 AM Rong Rong <walterddr@gmail.com> wrote:
>>>
>>>> Hi Dongwon,
>>>>
>>>> I have to dig deeper into the code to reproduce this error. This seems
>>>> to be a bug to me and will update once I find anything.
>>>>
>>>> However, from what you explained, if I understand correctly, you can
>>>> do all of your processing within the Table API scope without
>>>> converting it back and forth to DataStream.
>>>> E.g. if your "map(a -> a)" placeholder represents some sort of map
>>>> function that's simple enough, you can implement it and connect it
>>>> with the Table API via a UserDefinedFunction [1].
>>>> As the Table API becomes a first-class citizen [2,3,4], this would be
>>>> a much cleaner implementation from my perspective.
>>>>
>>>> --
>>>> Rong
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#scalar-functions
>>>> [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
>>>> [3] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td27658.html
>>>> [4] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html
>>>>
>>>> On Thu, Jul 18, 2019 at 8:42 PM Dongwon Kim <eastcirclek@gmail.com> wrote:
>>>>
>>>>> Hi Rong,
>>>>>
>>>>> Thank you for the reply :-)
>>>>>
>>>>>> Which Flink version are you using?
>>>>>
>>>>> I'm using Flink 1.8.0.
>>>>>
>>>>>> What does "sourceTable.getSchema().toRowType()" return?
>>>>>
>>>>> Row(time1: TimeIndicatorTypeInfo(rowtime))
>>>>>
>>>>>> What does the line ".map(a -> a)" do, and can you remove it?
>>>>>
>>>>> ".map(a -> a)" is just there to illustrate the problem.
>>>>> My actual code contains a process function (instead of the .map() in
>>>>> the snippet) which appends to each row a new field containing the
>>>>> watermark.
>>>>> If there were a way to get the watermark inside a scalar UDF, I
>>>>> wouldn't convert the table to a datastream and vice versa.
>>>>>
>>>>>> If I understand correctly, you are also using "time1" as the
>>>>>> rowtime; is your intention to use it later as well?
>>>>>
>>>>> Yup :-)
>>>>>
>>>>>> As far as I know, ".returns(sourceTable.getSchema().toRowType());"
>>>>>> only adds a type information hint about the return type of this
>>>>>> operator. It is used in cases where Flink cannot determine the type
>>>>>> automatically [1].
>>>>>
>>>>> The reason why I specify ".returns(sourceTable.getSchema().toRowType());"
>>>>> is to give a type information hint, as you said.
>>>>> It is needed later when I make another table, like
>>>>>   "Table anotherTable = tEnv.fromDataStream(stream);".
>>>>> Without the type information hint, I get the error
>>>>>   "An input of GenericTypeInfo<Row> cannot be converted to Table.
>>>>>   Please specify the type of the input with a RowTypeInfo."
>>>>> That's why I give a type information hint in that way.
>>>>>
>>>>> Best,
>>>>>
>>>>> Dongwon
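
As an aside, a minimal sketch of the ScalarFunction route Rong suggests
above, usable when the per-record logic does not need the watermark (the
class name and registration below are illustrative, not from the thread):

  import java.sql.Timestamp;
  import org.apache.flink.table.functions.ScalarFunction;

  // A trivial scalar UDF standing in for "map(a -> a)"-style logic. It
  // stays entirely within the Table API, but it cannot observe watermarks
  // or any other runtime context.
  public static class PassThrough extends ScalarFunction {
      public Timestamp eval(Timestamp t) {
          return t;
      }
  }

  // Registration and use:
  //   tEnv.registerFunction("passThrough", new PassThrough());
  //   Table result = sourceTable.select("passThrough(time1) as time1");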
>>>>>
>>>>> On Fri, Jul 19, 2019 at 12:39 AM Rong Rong <walterddr@gmail.com> wrote:
>>>>>
>>>>>> Hi Dongwon,
>>>>>>
>>>>>> Can you provide a bit more information:
>>>>>> Which Flink version are you using?
>>>>>> What does "sourceTable.getSchema().toRowType()" return?
>>>>>> What does the line ".map(a -> a)" do, and can you remove it?
>>>>>> If I understand correctly, you are also using "time1" as the
>>>>>> rowtime; is your intention to use it later as well?
>>>>>>
>>>>>> As far as I know, ".returns(sourceTable.getSchema().toRowType());"
>>>>>> only adds a type information hint about the return type of this
>>>>>> operator. It is used in cases where Flink cannot determine the type
>>>>>> automatically [1].
>>>>>>
>>>>>> Thanks,
>>>>>> Rong
>>>>>>
>>>>>> [1] https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351
>>>>>>
>>>>>> On Wed, Jul 17, 2019 at 1:29 AM Dongwon Kim <eastcirclek@gmail.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> Consider the following snippet:
>>>>>>>
>>>>>>>   Table sourceTable = getKafkaSource0(tEnv);
>>>>>>>   DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
>>>>>>>       .map(a -> a)
>>>>>>>       .returns(sourceTable.getSchema().toRowType());
>>>>>>>   stream.print();
>>>>>>>
>>>>>>> where sourceTable.printSchema() shows:
>>>>>>>
>>>>>>>   root
>>>>>>>    |-- time1: TimeIndicatorTypeInfo(rowtime)
>>>>>>>
>>>>>>> This program throws the following exception:
>>>>>>>
>>>>>>>   Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>>>>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>>>>>>     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
>>>>>>>     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>>>>>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
>>>>>>>     at app.metatron.test.Main2.main(Main2.java:231)
>>>>>>>   Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long
>>>>>>>     at org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)
>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
>>>>>>>     ...
>>>>>>>
>>>>>>> The row serializer seems to try to deep-copy an instance of
>>>>>>> java.sql.Timestamp using LongSerializer instead of
>>>>>>> SqlTimestampSerializer.
>>>>>>> Could anybody help me?
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> - Dongwon
>>>>>>>
>>>>>>> p.s. Though removing .returns() makes everything okay, I need it
>>>>>>> because I want to convert the DataStream<Row> into another table
>>>>>>> later.
>>>>>>> p.s. The source table is created as follows:
>>>>>>>
>>>>>>>   private static final Table getKafkaSource0(StreamTableEnvironment tEnv) {
>>>>>>>     ConnectorDescriptor connectorDescriptor = new Kafka()
>>>>>>>         .version("universal")
>>>>>>>         .topic("mytopic")
>>>>>>>         .property("bootstrap.servers", "localhost:9092")
>>>>>>>         .property("group.id", "mygroup")
>>>>>>>         .startFromEarliest();
>>>>>>>     FormatDescriptor formatDescriptor = new Csv()
>>>>>>>         .deriveSchema()
>>>>>>>         .ignoreParseErrors()
>>>>>>>         .fieldDelimiter(',');
>>>>>>>     Schema schemaDescriptor = new Schema()
>>>>>>>         .field("time1", SQL_TIMESTAMP())
>>>>>>>         .rowtime(
>>>>>>>             new Rowtime()
>>>>>>>                 .timestampsFromField("rowTime")
>>>>>>>                 .watermarksPeriodicBounded(100)
>>>>>>>         );
>>>>>>>     tEnv.connect(connectorDescriptor)
>>>>>>>         .withFormat(formatDescriptor)
>>>>>>>         .withSchema(schemaDescriptor)
>>>>>>>         .inAppendMode()
>>>>>>>         .registerTableSource("mysrc");
>>>>>>>     return tEnv.scan("mysrc");
>>>>>>>   }
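
For completeness, the serializer mismatch above suggests one possible
workaround: hint a plain SQL_TIMESTAMP type for the rowtime field instead of
reusing the schema's TimeIndicatorTypeInfo. This is only a sketch of that
idea, not a fix confirmed in this thread or in FLINK-13389:

  import org.apache.flink.api.common.typeinfo.TypeInformation;
  import org.apache.flink.api.common.typeinfo.Types;
  import org.apache.flink.api.java.typeutils.RowTypeInfo;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.table.api.Table;
  import org.apache.flink.types.Row;

  // Build the type hint by hand: "time1" as a plain SQL_TIMESTAMP, so the
  // RowSerializer copies it with SqlTimestampSerializer rather than the
  // LongSerializer backing TimeIndicatorTypeInfo.
  RowTypeInfo hint = new RowTypeInfo(
      new TypeInformation<?>[] { Types.SQL_TIMESTAMP },
      new String[] { "time1" });
  DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
      .map(a -> a)
      .returns(hint);
  Table anotherTable = tEnv.fromDataStream(stream);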