flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Casting from org.apache.flink.api.java.tuple.Tuple2 to scala.Product; using java and scala in flink job
Date Wed, 06 May 2020 14:52:33 GMT
Hi,

Flink will not do any casting between types. You either need to output 
to correct (Scala) Tuple type from the deserialization schema or insert 
a step (say a map function) that converts between the two types. The 
Tuple2 type and the Scala tuple type, i.e. (foo, bar) have nothing in 
common when it comes to the type system.

Best,
Aljoscha

On 06.05.20 01:42, Nick Bendtner wrote:
> Hi guys,
> In our flink job we use java source for deserializing a message from kafka
> using a kafka deserializer. Signature is as below.
> 
> 
> public class CustomAvroDeserializationSchema implements
>          KafkaDeserializationSchema<Tuple2<EventMetaData,GenericRecord>>
> 
> The other parts of the streaming job are in scala. When data has to be
> serialized I get this exception
> 
> 
> 
> 
> *java.lang.RuntimeException: org.apache.flink.api.java.tuple.Tuple2 cannot
> be cast to scala.Product at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)*
> 
> Here is how I provide type info for serialization in the java
> deserialization class:
> 
> @Override
> public TypeInformation<Tuple2<EventMetaData, GenericRecord>> getProducedType()
{
>      return new TupleTypeInfo(TypeInformation.of(EventMetaData.class),new
> GenericRecordAvroTypeInfo(this
>              .writer));
> 
> Here is how I add the kafka source in scala :
> 
> private[flink] def sourceType(
>    deserialization: KafkaDeserializationSchema[(EventMetaData, GenericRecord)],
>    properties: Properties): FlinkKafkaConsumer[(EventMetaData,
> GenericRecord)] = {
>    val consumer = new FlinkKafkaConsumer[(EventMetaData, GenericRecord)](
>      source.asJava,
>      deserialization,
>      properties)
>    consumer
> }
> 
> Any idea thoughts on how to interoperate between java tuple2 and scala case
> class ? Also using 1.9.1 version of flink-connector-kafka while the rest of
> the cluster uses 1.7.2 version of flink.
> 
> Best,
> Nick.
> 


Mime
View raw message