flink-user mailing list archives

From Boris Lublinsky <boris.lublin...@lightbend.com>
Subject FlinkKafkaProducer usage
Date Thu, 02 Feb 2017 00:07:41 GMT
I am trying to write a quick streaming word-count sample using the Beam APIs and the FlinkBeamRunner.
The problem I am running into is that
apply("Write to Kafka", Write.to(UnboundedFlinkSink.of(kafkaSink)))
does not work here - Write assumes a bounded collection, and mine is unbounded.

I have not found an unbounded equivalent of Write, so I tried to implement a custom ParDo
function:

/**
 * Writes each element to Kafka via a Flink producer sink.
 */
static class WriteToKafkaFn extends DoFn<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    private final FlinkKafkaProducer09<Tuple2<String, Integer>> kafkaSink;
    private boolean opened = false;

    public WriteToKafkaFn(FlinkKafkaProducer09<Tuple2<String, Integer>> kafkaSink) {
        this.kafkaSink = kafkaSink;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // Lazily open the sink on the first element, since the DoFn gives us
        // no Flink lifecycle hook to do it in.
        if (!opened) {
            kafkaSink.open(new Configuration());
            opened = true;
        }
        Tuple2<String, Integer> record = c.element();
        try {
            kafkaSink.invoke(record);
        } catch (Throwable t) {
            System.out.println("Error writing record " + record + " to Kafka");
            t.printStackTrace();
        }
    }
}


The problem with this approach is that the ParDo is not initialized with the streaming runtime
context that FlinkKafkaProducer relies on, so open() fails.
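To illustrate the failure mode, here is a minimal plain-Java analogue (hypothetical classes, not the real Flink API): a "rich" sink expects the framework to inject its runtime context before open() runs, so calling open() by hand from a DoFn throws before any Kafka code executes.

```java
import java.util.Properties;

// Hypothetical stand-in for Flink's runtime context.
class RuntimeContext {
    final int subtaskIndex;
    RuntimeContext(int subtaskIndex) { this.subtaskIndex = subtaskIndex; }
}

// Hypothetical stand-in for a RichSinkFunction: the context is injected
// by the framework when the operator is deployed, never by user code.
abstract class RichSink {
    private RuntimeContext ctx;

    void setRuntimeContext(RuntimeContext ctx) { this.ctx = ctx; }

    RuntimeContext getRuntimeContext() {
        if (ctx == null) {
            throw new IllegalStateException("Runtime context has not been initialized.");
        }
        return ctx;
    }

    abstract void open(Properties config);
}

class KafkaLikeSink extends RichSink {
    @Override
    void open(Properties config) {
        // A Kafka-style sink consults its runtime context while opening
        // (e.g. to learn its subtask index), so open() outside a running
        // job fails at this line.
        int subtask = getRuntimeContext().subtaskIndex;
        System.out.println("opened on subtask " + subtask);
    }
}

public class OpenWithoutContext {
    public static void main(String[] args) {
        KafkaLikeSink sink = new KafkaLikeSink();
        try {
            // What the ParDo above effectively does: open() with no context.
            sink.open(new Properties());
        } catch (IllegalStateException e) {
            System.out.println("open() failed: " + e.getMessage());
        }
    }
}
```

Once setRuntimeContext has been called by the framework, the same open() call succeeds, which is why the sink works inside a native Flink job but not inside a hand-driven DoFn.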


Any suggestions? 