flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Joe Malt <jm...@yelp.com>
Subject ClassCastException when writing to a Kafka stream with Flink + Python
Date Tue, 14 Aug 2018 23:16:07 GMT
Hi,

I'm trying to write to a Kafka stream in a Flink job using the new Python
streaming API.

My program looks like this:

def main(factory):

    props = Properties()
    props.setProperty("bootstrap.servers",configs['kafkaBroker'])

    consumer = FlinkKafkaConsumer010([configs['kafkaReadTopic']],
SimpleStringSchema(), props)
    producer = FlinkKafkaProducer010(configs['kafkaWriteTopic'],
SimpleStringSchema(), props)

    env = factory.get_execution_environment()

    stream = env.add_java_source(consumer)

    stream.output() # this works (prints to a .out file)
    stream.add_sink(producer) # producing to this causes the exception

    env.execute()

I'm getting a ClassCastException when trying to output to the
FlinkKafkaProducer:

java.lang.ClassCastException: org.python.core.PyUnicode cannot be cast
to java.lang.String
	at org.apache.flink.api.common.serialization.SimpleStringSchema.serialize(SimpleStringSchema.java:36)
	at org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper.serializeValue(KeyedSerializationSchemaWrapper.java:46)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:355)
	at org.apache.flink.streaming.python.api.functions.PythonSinkFunction.invoke(PythonSinkFunction.java:48)
	at org.apache.flink.streaming.python.api.functions.PythonSinkFunction.invoke(PythonSinkFunction.java:37)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)


It seems that the Python string isn't getting converted to a
java.lang.String, which should happen automatically in Jython.

I've tried adding a MapFunction that maps each input to String(input)where
String is the constructor for java.lang.String. This made no difference; I
get the same error.

Any ideas?

Thanks,

Joe Malt

Software Engineering Intern
Yelp

Mime
View raw message