flink-user mailing list archives

From Rong Rong <walter...@gmail.com>
Subject Re: [Table API] ClassCastException when converting a table to DataStream<Row>
Date Tue, 23 Jul 2019 15:37:25 GMT
Hi Dongwon,

Sorry for the late reply. I did try some experiments and it seems like you
are right:
Setting the type via `.returns()` actually alters the underlying type of the
DataStream from a GenericType into a specific RowTypeInfo. Please see the
JIRA ticket [1] for more info.
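
For reference, a minimal sketch of the conversion in question (based on the
snippet later in this thread; Flink 1.8, names taken from that snippet):

```java
// Without the hint, .map(a -> a) leaves the stream typed as
// GenericType<Row>; the .returns(...) hint restores a concrete RowTypeInfo.
Table sourceTable = tEnv.scan("mysrc");
DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
    .map(a -> a)
    .returns(sourceTable.getSchema().toRowType()); // alters the underlying type
```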

Regarding the approach: yes, I think you cannot access the timer service
from the Table/SQL API at this moment, so that might be the best approach.
And as Fabian suggested, I don't think there's much of a problem as long as
you are not changing the underlying type info of your DataStream. I will
follow up on this in the JIRA ticket.

--
Rong

[1] https://issues.apache.org/jira/browse/FLINK-13389

On Tue, Jul 23, 2019 at 6:30 AM Dongwon Kim <eastcirclek@gmail.com> wrote:

> Hi Fabian,
>
> Thanks for clarification :-)
> I can convert back and forth without worrying about it, as I keep using
> the Row type during the conversion (even though fields are added).
>
> Best,
>
> Dongwon
>
>
>
> On Tue, Jul 23, 2019 at 8:15 PM Fabian Hueske <fhueske@gmail.com> wrote:
>
>> Hi Dongwon,
>>
>> Regarding the question about the conversion: If you keep using the Row
>> type and are not adding or removing fields, the conversion is pretty much
>> free right now.
>> It will be a MapFunction (sometimes even no function at all) that should
>> be chained with the other operators. Hence, it should boil down to a
>> function call.
>>
>> Best, Fabian
>>
>> Am Sa., 20. Juli 2019 um 03:58 Uhr schrieb Dongwon Kim <
>> eastcirclek@gmail.com>:
>>
>>> Hi Rong,
>>>
>>> I have to dig deeper into the code to reproduce this error. This seems
>>>> to be a bug to me and I will update once I find anything.
>>>
>>> Thanks a lot for spending your time on this.
>>>
>>> However, from what you explained, if I understand correctly you can do
>>>> all of your processing within the Table API scope without converting back
>>>> and forth to DataStream.
>>>> E.g. if your "map(a -> a)" placeholder represents some sort of map
>>>> function that's simple enough, you can implement it and connect it with
>>>> the Table API via a UserDefinedFunction [1].
>>>> As the Table API is becoming a first-class citizen [2,3,4], this would be
>>>> a much cleaner implementation from my perspective.
>>>
>>> I also agree with you that a first-class-citizen Table API will
>>> make everything not only easier but also a lot cleaner.
>>> We do, however, have some corner cases that force us to convert a Table
>>> to and from a DataStream.
>>> One such case is appending to a Table a column showing the current
>>> watermark of each record; there's no other way to do that, as
>>> ScalarFunction doesn't give us access to the runtime context information
>>> the way ProcessFunction does.
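
To illustrate (a sketch only, not our actual code; the class name and field
layout are placeholders), the kind of ProcessFunction we use looks like:

```java
// Sketch: append the current watermark as an extra trailing field of each Row.
// ProcessFunction exposes the watermark via ctx.timerService().currentWatermark();
// a Table API ScalarFunction has no comparable hook into the runtime context.
public class AppendWatermark extends ProcessFunction<Row, Row> {
    @Override
    public void processElement(Row in, Context ctx, Collector<Row> out) {
        Row result = new Row(in.getArity() + 1);
        for (int i = 0; i < in.getArity(); i++) {
            result.setField(i, in.getField(i));
        }
        // Last field: the watermark observed by this operator at this moment.
        result.setField(in.getArity(), ctx.timerService().currentWatermark());
        out.collect(result);
    }
}
```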
>>>
>>> I have a question regarding the conversion.
>>> Do I have to worry about a runtime performance penalty in case I cannot
>>> help but convert back and forth to DataStream?
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> On Sat, Jul 20, 2019 at 12:41 AM Rong Rong <walterddr@gmail.com> wrote:
>>>
>>>> Hi Dongwon,
>>>>
>>>> I have to dig deeper into the code to reproduce this error. This seems
>>>> to be a bug to me and I will update once I find anything.
>>>>
>>>> However, from what you explained, if I understand correctly you can do
>>>> all of your processing within the Table API scope without converting back
>>>> and forth to DataStream.
>>>> E.g. if your "map(a -> a)" placeholder represents some sort of map
>>>> function that's simple enough, you can implement it and connect it with
>>>> the Table API via a UserDefinedFunction [1].
>>>> As the Table API is becoming a first-class citizen [2,3,4], this would be
>>>> a much cleaner implementation from my perspective.
>>>>
>>>> --
>>>> Rong
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#scalar-functions
>>>> [2]
>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
>>>> [3]
>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td27658.html
>>>> [4]
>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html
>>>>
>>>>
>>>> On Thu, Jul 18, 2019 at 8:42 PM Dongwon Kim <eastcirclek@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Rong,
>>>>>
>>>>> Thank you for reply :-)
>>>>>
>>>>> which Flink version are you using?
>>>>>
>>>>> I'm using Flink-1.8.0.
>>>>>
>>>>> what is the "sourceTable.getSchema().toRowType()" return?
>>>>>
>>>>> Row(time1: TimeIndicatorTypeInfo(rowtime))
>>>>>
>>>>> what does the line ".map(a -> a)" do and can you remove it?
>>>>>
>>>>> ".map(a -> a)" is just there to illustrate the problem.
>>>>> My actual code contains a process function (instead of .map() in the
>>>>> snippet) which appends a new field containing the watermark to each row.
>>>>> If there were a way to get the watermark inside a scalar UDF, I wouldn't
>>>>> convert the table to a datastream and back.
>>>>>
>>>>> if I am understanding correctly, you are also using "time1" as the
>>>>>> rowtime; is it your intention to use it later as well?
>>>>>
>>>>> yup :-)
>>>>>
>>>>> As far as I know ".returns(sourceTable.getSchema().toRowType());" only
>>>>>> adds a type information hint about the return type of this operator. It
>>>>>> is used in cases where Flink cannot determine it automatically [1].
>>>>>
>>>>> The reason why I specify
>>>>> ".returns(sourceTable.getSchema().toRowType());" is to give a type
>>>>> information hint, as you said.
>>>>> That is needed later when I make another table like
>>>>>    "Table anotherTable = tEnv.fromDataStream(stream);".
>>>>> Without the type information hint, I get the error
>>>>>    "An input of GenericTypeInfo<Row> cannot be converted to Table.
>>>>> Please specify the type of the input with a RowTypeInfo."
>>>>> That's why I give the type information hint in that way.
>>>>>
>>>>> Best,
>>>>>
>>>>> Dongwon
>>>>>
>>>>> On Fri, Jul 19, 2019 at 12:39 AM Rong Rong <walterddr@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Dongwon,
>>>>>>
>>>>>> Can you provide a bit more information:
>>>>>> which Flink version are you using?
>>>>>> what is the "sourceTable.getSchema().toRowType()" return?
>>>>>> what does the line ".map(a -> a)" do and can you remove it?
>>>>>> if I am understanding correctly, you are also using "time1" as the
>>>>>> rowtime; is it your intention to use it later as well?
>>>>>>
>>>>>> As far as I know ".returns(sourceTable.getSchema().toRowType());"
>>>>>> only adds a type information hint about the return type of this operator.
>>>>>> It is used in cases where Flink cannot determine it automatically [1].
>>>>>>
>>>>>> Thanks,
>>>>>> Rong
>>>>>>
>>>>>> --
>>>>>> [1]
>>>>>> https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351
>>>>>>
>>>>>>
>>>>>> On Wed, Jul 17, 2019 at 1:29 AM Dongwon Kim <eastcirclek@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> Consider the following snippet:
>>>>>>>
>>>>>>>>     Table sourceTable = getKafkaSource0(tEnv);
>>>>>>>>     DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
>>>>>>>>       .map(a -> a)
>>>>>>>>       .returns(sourceTable.getSchema().toRowType());
>>>>>>>>     stream.print();
>>>>>>>>
>>>>>>> where sourceTable.printSchema() shows:
>>>>>>>
>>>>>>>> root
>>>>>>>>  |-- time1: TimeIndicatorTypeInfo(rowtime)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> This program throws the following exception:
>>>>>>>
>>>>>>>> Exception in thread "main"
>>>>>>>> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>>>>>   at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>>>>>>>   at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
>>>>>>>>   at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>>>>>>>>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
>>>>>>>>   at app.metatron.test.Main2.main(Main2.java:231)
>>>>>>>> Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long
>>>>>>>>   at org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)
>>>>>>>>   at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
>>>>>>>>   at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
>>>>>>>>   ...
>>>>>>>
>>>>>>>
>>>>>>> The row serializer seems to try to deep-copy an instance of
>>>>>>> java.sql.Timestamp using LongSerializer instead of SqlTimestampSerializer.
>>>>>>> Could anybody help me?
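>>>>>>>
>>>>>>> For what it's worth, a workaround I have been considering (an
>>>>>>> untested sketch, not a confirmed fix): build the type hint with a
>>>>>>> plain SQL_TIMESTAMP in place of the TimeIndicatorTypeInfo, so that
>>>>>>> the RowSerializer picks SqlTimestampSerializer rather than
>>>>>>> LongSerializer:

```java
// Untested sketch: replace any TimeIndicatorTypeInfo field with
// Types.SQL_TIMESTAMP before using the schema as a .returns(...) hint.
TypeInformation<?>[] fieldTypes = sourceTable.getSchema().getFieldTypes();
for (int i = 0; i < fieldTypes.length; i++) {
    if (fieldTypes[i] instanceof TimeIndicatorTypeInfo) {
        fieldTypes[i] = Types.SQL_TIMESTAMP;
    }
}
RowTypeInfo hint =
    new RowTypeInfo(fieldTypes, sourceTable.getSchema().getFieldNames());
```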
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> - Dongwon
>>>>>>>
>>>>>>> p.s. though removing .returns() makes everything okay, I need to do
>>>>>>> that as I want to convert DataStream<Row> into another table later.
>>>>>>> p.s. the source table is created as follows:
>>>>>>>
>>>>>>>> private static final Table getKafkaSource0(StreamTableEnvironment tEnv) {
>>>>>>>>     ConnectorDescriptor connectorDescriptor = new Kafka()
>>>>>>>>       .version("universal")
>>>>>>>>       .topic("mytopic")
>>>>>>>>       .property("bootstrap.servers", "localhost:9092")
>>>>>>>>       .property("group.id", "mygroup")
>>>>>>>>       .startFromEarliest();
>>>>>>>>     FormatDescriptor formatDescriptor = new Csv()
>>>>>>>>       .deriveSchema()
>>>>>>>>       .ignoreParseErrors()
>>>>>>>>       .fieldDelimiter(',');
>>>>>>>>     Schema schemaDescriptor = new Schema()
>>>>>>>>       .field("time1", SQL_TIMESTAMP())
>>>>>>>>       .rowtime(
>>>>>>>>         new Rowtime()
>>>>>>>>           .timestampsFromField("rowTime")
>>>>>>>>           .watermarksPeriodicBounded(100)
>>>>>>>>       );
>>>>>>>>     tEnv.connect(connectorDescriptor)
>>>>>>>>       .withFormat(formatDescriptor)
>>>>>>>>       .withSchema(schemaDescriptor)
>>>>>>>>       .inAppendMode()
>>>>>>>>       .registerTableSource("mysrc");
>>>>>>>>     return tEnv.scan("mysrc");
>>>>>>>>   }
>>>>>>>
>>>>>>>
