flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Kenny Gorman <ke...@eventador.io>
Subject Re: Stumped writing to KafkaJSONSink
Date Wed, 18 Oct 2017 13:07:24 GMT
Yep we hung out and got it working. I should have replied sooner! Thx for the reply.

-kg

> On Oct 18, 2017, at 7:06 AM, Fabian Hueske <fhueske@gmail.com> wrote:
> 
> Hi Kenny,
> 
> this look almost correct. 
> The Table class has a method writeToSink(TableSink) that should address your use case
(so the same as yours but without the TableEnvironment argument).
> 
> Does that work for you?
> If not what kind of error and error message do you get?
> 
> Best, Fabian
> 
> 2017-10-18 1:28 GMT+02:00 Kenny Gorman <kenny@eventador.io>:
>> I am hoping you guys can help me. I am stumped how to actually write to Kafka using
Kafka09JsonTableSink using the Table API. Here is my code below, I am hoping you guys can
shed some light on how this should be done. I don’t see any methods for the actual write
to Kafka. I am probably doing something stupid. TIA.
>> 
>> Thanks!
>> Kenny
>> 
>> // run some SQL to filter results where a key is not null
>> String sql = "SELECT icao FROM flights WHERE icao is not null";
>> tableEnv.registerTableSource("flights", kafkaTableSource);
>> Table result = tableEnv.sql(sql);
>> 
>> // create a partition for the data going into kafka
>> FlinkFixedPartitioner partition =  new FlinkFixedPartitioner();
>> 
>> // create new tablesink of JSON to kafka
>> KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
>>         params.getRequired("write-topic"),
>>         params.getProperties(),
>>         partition);
>> 
>> result.writeToSink(tableEnv, kafkaTableSink);  // Logically, I want to do this, but
no such method..
> 

Mime
View raw message