flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Casting from org.apache.flink.api.java.tuple.Tuple2 to scala.Product; using java and scala in flink job
Date Wed, 06 May 2020 16:09:49 GMT
No, I think that should be all right.

On 06.05.20 16:57, Vishwas Siravara wrote:
> Thanks, I figured that would be the case. I'm using the Flink tuple type in
> the map functions, so there is no casting required now. Can you think of
> any downsides to using Flink tuples in Scala code, especially since the
> Flink tuple is in the Java API package in Flink?
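> 
> For reference, this is roughly the shape of what I am doing now (an
> untested sketch; stream is the DataStream coming from the source, and
> enrich stands in for my actual per-record logic):
> 
> import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
> import org.apache.flink.streaming.api.scala._
> 
> stream.map { (t: JTuple2[EventMetaData, GenericRecord]) =>
>   // read and write the Flink tuple fields directly; no casting involved
>   new JTuple2(t.f0, enrich(t.f1)) // enrich is a placeholder
> }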
> 
> Best,
> Nick.
> 
> On Wed, May 6, 2020 at 9:52 AM Aljoscha Krettek <aljoscha@apache.org> wrote:
> 
>> Hi,
>>
>> Flink will not do any casting between types. You either need to output
>> the correct (Scala) tuple type from the deserialization schema or insert
>> a step (say, a map function) that converts between the two types. The
>> Java Tuple2 type and the Scala tuple type, i.e. (foo, bar), have nothing
>> in common as far as the type system is concerned.
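>>
>> Untested sketch of the conversion step, assuming the source is typed on
>> the Java Tuple2 that your schema produces (env and consumer as in your
>> code; the Scala API import supplies the implicit type information):
>>
>> import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
>> import org.apache.flink.streaming.api.scala._
>>
>> val javaTupleStream: DataStream[JTuple2[EventMetaData, GenericRecord]] =
>>   env.addSource(consumer)
>>
>> // convert field by field into a Scala tuple for the rest of the job
>> val scalaTupleStream: DataStream[(EventMetaData, GenericRecord)] =
>>   javaTupleStream.map(t => (t.f0, t.f1))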
>>
>> Best,
>> Aljoscha
>>
>> On 06.05.20 01:42, Nick Bendtner wrote:
>>> Hi guys,
>>> In our Flink job we use a Java source for deserializing messages from
>>> Kafka using a Kafka deserializer. The signature is as below.
>>>
>>>
>>> public class CustomAvroDeserializationSchema implements
>>>         KafkaDeserializationSchema<Tuple2<EventMetaData, GenericRecord>>
>>>
>>> The other parts of the streaming job are in Scala. When the data has to
>>> be serialized, I get this exception:
>>>
>>> java.lang.RuntimeException: org.apache.flink.api.java.tuple.Tuple2
>>> cannot be cast to scala.Product
>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>>>
>>> Here is how I provide the type info for serialization in the Java
>>> deserialization class:
>>>
>>> @Override
>>> public TypeInformation<Tuple2<EventMetaData, GenericRecord>> getProducedType() {
>>>     return new TupleTypeInfo<>(
>>>         TypeInformation.of(EventMetaData.class),
>>>         new GenericRecordAvroTypeInfo(this.writer));
>>> }
>>>
>>> Here is how I add the Kafka source in Scala:
>>>
>>> private[flink] def sourceType(
>>>     deserialization: KafkaDeserializationSchema[(EventMetaData, GenericRecord)],
>>>     properties: Properties): FlinkKafkaConsumer[(EventMetaData, GenericRecord)] = {
>>>   val consumer = new FlinkKafkaConsumer[(EventMetaData, GenericRecord)](
>>>     source.asJava,
>>>     deserialization,
>>>     properties)
>>>   consumer
>>> }
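>>>
>>> (For comparison, typing the consumer on the Java Tuple2 that the schema
>>> above actually produces would presumably look like the sketch below,
>>> with JTuple2 as a rename of org.apache.flink.api.java.tuple.Tuple2:)
>>>
>>> import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
>>>
>>> private[flink] def sourceType(
>>>     deserialization: KafkaDeserializationSchema[JTuple2[EventMetaData, GenericRecord]],
>>>     properties: Properties): FlinkKafkaConsumer[JTuple2[EventMetaData, GenericRecord]] =
>>>   new FlinkKafkaConsumer[JTuple2[EventMetaData, GenericRecord]](
>>>     source.asJava,
>>>     deserialization,
>>>     properties)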
>>>
>>> Any thoughts on how to interoperate between the Java Tuple2 and a Scala
>>> case class? Also, I am using version 1.9.1 of flink-connector-kafka
>>> while the rest of the cluster uses Flink version 1.7.2.
>>>
>>> Best,
>>> Nick.
>>>
>>
>>
> 

