flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Kenny Gorman <ke...@eventador.io>
Subject Stumped writing to KafkaJSONSink
Date Tue, 17 Oct 2017 23:28:09 GMT
I am hoping you guys can help me. I am stumped how to actually write to Kafka using Kafka09JsonTableSink
using the Table API. Here is my code below, I am hoping you guys can shed some light on how
this should be done. I don’t see any methods for the actual write to Kafka. I am probably
doing something stupid. TIA.

Thanks!
Kenny

// run some SQL to filter results where a key is not null
String sql = "SELECT icao FROM flights WHERE icao is not null";
tableEnv.registerTableSource("flights", kafkaTableSource);
Table result = tableEnv.sql(sql);

// create a partition for the data going into kafka
FlinkFixedPartitioner partition =  new FlinkFixedPartitioner();

// create new tablesink of JSON to kafka
KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
        params.getRequired("write-topic"),
        params.getProperties(),
        partition);

result.writeToSink(tableEnv, kafkaTableSink);  // Logically, I want to do this, but no such
method..
Mime
View raw message