flink-user mailing list archives

From Boris Lublinsky <boris.lublin...@lightbend.com>
Subject FlinkKafkaProducer usage
Date Thu, 02 Feb 2017 00:07:41 GMT
I am trying to write a quick streaming word count sample using the Beam APIs and the Flink Beam runner.
The problem I am running into is that
apply("Write to Kafka", Write.to(UnboundedFlinkSink.of(kafkaSink)))
does not work this way: Write.to assumes a bounded collection, and mine is unbounded.

I have not found an unbounded equivalent of Write, so I tried to implement a custom ParDo:

/**
 * Writes its input elements to Kafka.
 */
static class WriteToKafkaFn extends DoFn<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    private final FlinkKafkaProducer09<Tuple2<String, Integer>> kafkaSink;
    private boolean opened = false;

    public WriteToKafkaFn(FlinkKafkaProducer09<Tuple2<String, Integer>> kafkaSink) {
        this.kafkaSink = kafkaSink;
    }

    public void processElement(ProcessContext c) {
        // Lazily open the producer on the first element.
        if (!opened) {
            kafkaSink.open(new Configuration());
            opened = true;
        }
        Tuple2<String, Integer> record = c.element();
        try {
            // Hand the element to the Flink producer.
            kafkaSink.invoke(record);
        } catch (Throwable t) {
            System.out.println("Error writing record " + record + " to Kafka");
        }
    }
}
The problem with this approach is that the ParDo is not initialized with the streaming runtime context that
FlinkKafkaProducer relies upon, so open fails.
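To make the failure mode concrete, here is a minimal sketch of the lifecycle issue, using plain Java stand-ins rather than the real Flink classes (RuntimeContext, RichFunction, and the sink below are simplified mocks, not Flink's API): a Rich*-style function expects the runtime to inject its context via setRuntimeContext before open() is called, and opening the sink by hand inside a DoFn skips that step.

```java
public class LifecycleSketch {

    // Simplified stand-in for Flink's RuntimeContext.
    static class RuntimeContext {}

    // Simplified stand-in for Flink's RichFunction lifecycle:
    // the runtime injects the context, then calls open().
    static abstract class RichFunction {
        private RuntimeContext ctx; // normally injected by the runner
        void setRuntimeContext(RuntimeContext ctx) { this.ctx = ctx; }
        RuntimeContext getRuntimeContext() {
            if (ctx == null) {
                throw new IllegalStateException("RuntimeContext not set");
            }
            return ctx;
        }
        abstract void open();
    }

    // Mock sink that, like the Kafka producer, consults its
    // runtime context while opening.
    static class FakeKafkaSink extends RichFunction {
        @Override
        void open() {
            getRuntimeContext();
        }
    }

    public static void main(String[] args) {
        FakeKafkaSink sink = new FakeKafkaSink();
        try {
            sink.open(); // fails: no context injected, as in the ParDo
        } catch (IllegalStateException e) {
            System.out.println("open failed: " + e.getMessage());
        }
        sink.setRuntimeContext(new RuntimeContext()); // what the runner would do
        sink.open(); // succeeds once the context is present
        System.out.println("open succeeded after context injection");
    }
}
```

This is only an illustration of why the manual open() call cannot work from inside a DoFn; the real producer's context is wired up by the Flink runtime, which never sees a sink that is constructed and opened by user code.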

Any suggestions? 