flink-user mailing list archives

From Terry Wang <zjuwa...@gmail.com>
Subject Re: Flink SQL update-mode set to retract in env file.
Date Thu, 26 Sep 2019 12:19:56 GMT
Hi, Srikanth~

In your code,
`DataStream<String> outStreamAgg = tableEnv.toRetractStream(resultTable, Row.class).map(t -> {});`
converts the resultTable into a DataStream, which is no longer tied to the Table API.
The following `outStreamAgg.addSink(…)` call is then just a normal DataStream write to a Flink Kafka sink function, so the table sink's update-mode does not come into play.
Your program is a mixture of the Table API and DataStream programming, not a pure Table API program.
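
To illustrate why the write appears to work, here is a minimal plain-Java sketch of what a retract stream carries. It is only a model: `RetractStreamSketch`, `Change`, `countPerKey`, `dropFlag` and `addsOnly` are hypothetical names, not Flink classes. In Flink, `toRetractStream` yields `Tuple2<Boolean, Row>` records, where `f0 == true` is an add message and `f0 == false` is a retract message. A `map` that reads only `t.f1` forwards both kinds to Kafka as ordinary records.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a retract stream; none of these types are Flink classes.
final class RetractStreamSketch {

    /** One retract-stream message: add == true is an add, add == false is a retraction. */
    static final class Change {
        final boolean add;
        final String key;
        final long cnt;

        Change(boolean add, String key, long cnt) {
            this.add = add;
            this.key = key;
            this.cnt = cnt;
        }
    }

    /** Emulates a grouped COUNT(*): each update first retracts the old count, then adds the new one. */
    static List<Change> countPerKey(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (String key : keys) {
            Long old = counts.get(key);
            if (old != null) {
                out.add(new Change(false, key, old)); // retract the previous result
            }
            long updated = (old == null) ? 1L : old + 1L;
            counts.put(key, updated);
            out.add(new Change(true, key, updated)); // add the new result
        }
        return out;
    }

    /** Mirrors a map() that ignores the flag and serializes only the row. */
    static List<String> dropFlag(List<Change> changes) {
        List<String> out = new ArrayList<>();
        for (Change c : changes) {
            out.add(c.key + "," + c.cnt);
        }
        return out;
    }

    /** Keeps only add messages, so retractions never reach the sink. */
    static List<String> addsOnly(List<Change> changes) {
        List<String> out = new ArrayList<>();
        for (Change c : changes) {
            if (c.add) {
                out.add(c.key + "," + c.cnt);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Change> changes = countPerKey(Arrays.asList("a", "a", "b"));
        System.out.println(dropFlag(changes)); // retractions look like normal records
        System.out.println(addsOnly(changes)); // retractions filtered out
    }
}
```

If the consumers of the topic should not see retracted values, one option is to filter on the flag before the `map`, as `addsOnly` does in the sketch. Whether that is acceptable depends on how the downstream reader interprets repeated keys.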

Best,
Terry Wang



> On Sep 26, 2019, at 5:47 PM, srikanth flink <flink.devv@gmail.com> wrote:
> 
> Hi Terry Wang,
> 
> Thanks for the quick reply.
> 
> I would like to understand more about your line "If the target table is a type of Kafka which implements AppendStreamTableSink, the update-mode will be append only".
> If your statement means that retract mode cannot be used for Kafka sinks because they implement AppendStreamTableSink, then why does the code below work for me, writing data to Kafka?
> DataStream<String> outStreamAgg = tableEnv.toRetractStream(resultTable, Row.class).map(t -> {
> Row r = t.f1;
> ObjectNode node = mapper.createObjectNode();
> node.put("source.ip", r.getField(0).toString());
> node.put("destination.ip", r.getField(1).toString());
> node.put("cnt", Long.parseLong(r.getField(2).toString()));
> return node.toString();
> }); 
> Properties kafkaProducerProperties = new Properties();
> kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "host:9092");
> kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
> kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
> 
> outStreamAgg.addSink(new FlinkKafkaProducer<String>("reconMultiAttempFail", new SimpleStringSchema(),
> kafkaProducerProperties));
> 
> Is it that the above functionality works only with Table API and not with SQL?
> Please explain.
> 
> Thanks
> Srikanth
> 
> 
> 
> On Thu, Sep 26, 2019 at 1:57 PM Terry Wang <zjuwangg@gmail.com> wrote:
> Hi srikanth~
> 
> The Flink SQL update-mode is inferred from the target table type.
> For now, there are three StreamTableSink types: `AppendStreamTableSink`, `UpsertStreamTableSink`, and `RetractStreamTableSink`.
> If the target table is a Kafka table, which implements AppendStreamTableSink, the update-mode will be append only.
> So if you want to enable retract mode, you may need to insert into some kind of RetractStreamTableSink.
> Hope it helps you ~
> 
> 
> 
> Best,
> Terry Wang
> 
> 
> 
>> On Sep 26, 2019, at 2:50 PM, srikanth flink <flink.devv@gmail.com> wrote:
>> 
>> How could I configure environment file for Flink SQL, update-mode: retract?
>> 
>> I have this for append:
>> properties:         
>>         - key: zookeeper.connect
>>           value: localhost:2181
>>         - key: bootstrap.servers
>>           value: localhost:9092
>>         - key: group.id
>>           value: reconMultiAttempFail
>>     format:
>>       type: json
>>       fail-on-missing-field: false
>>       json-schema: >
>>         {
>>           type: 'object',
>>           properties: {
>>             'a': {
>>                type: 'string'
>>             },
>>             'b': {
>>                type: 'string'
>>             },
>>             'cnt': {
>>                type: 'string'
>>             }
>>           }
>>         }
>>       derive-schema: false
>> 
>>     schema:
>>       - name: 'a'
>>         type: VARCHAR
>>       - name: 'b'
>>         type: VARCHAR
>>       - name: 'cnt'
>>         type: BIGINT
>> 
>> I couldn't find any documentation for this.
>> 
>> Could someone help me with the syntax?
>> 
>> Thanks
>> Srikanth
>> 
> 

