Hi Terry Wang,

Thanks for quick reply.

I would like to understand more on your line " If the target table is a type of Kafka which implments AppendStreamTableSink, the update-mode will be append only".
If your statement defines retract mode could not be used for Kafka sinks as it implements AppendStreamTableSink, but then the below code is working for me, dumping data to Kafka:
DataStream<String> outStreamAgg = tableEnv.toRetractStream(resultTable, Row.class).map(t -> {
Row r = t.f1;
ObjectNode node = mapper.createObjectNode();
node.put("source.ip", r.getField(0).toString());
node.put("destination.ip", r.getField(1).toString());
node.put("cnt", Long.parseLong(r.getField(2).toString()));
return node.toString();
});
Properties kafkaProducerProperties = new Properties();
kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "host:9092");
kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");

outStreamAgg.addSink(new FlinkKafkaProducer<String>("reconMultiAttempFail", new SimpleStringSchema(),
kafkaProducerProperties));

Is it that the above functionality works only with Table API and not with SQL?
Please explain.

Thanks
Srikanth



On Thu, Sep 26, 2019 at 1:57 PM Terry Wang <zjuwangg@gmail.com> wrote:
Hi srikanth~

The Flink SQL update-mode is inferred from the target table type.
For now, there are three StreamTableSink type, `AppendStreamTableSink` `UpsertStreamTableSink` and `RetractStreamTableSink`. 
If the target table is a type of Kafka which implments AppendStreamTableSink, the update-mode will be append only. 
So if you want enable retract-mode you may need to insert into one kind of RetractStreamTableSink.
Hope it helps you ~



Best,
Terry Wang



在 2019年9月26日,下午2:50,srikanth flink <flink.devv@gmail.com> 写道:

How could I configure environment file for Flink SQL, update-mode: retract?

I have this for append:
properties:        
        - key: zookeeper.connect
          value: localhost:2181
        - key: bootstrap.servers
          value: localhost:9092
        - key: group.id
          value: reconMultiAttempFail
    format:
      type: json
      fail-on-missing-field: false
      json-schema: >
        {
          type: 'object',
          properties: {
            'a': {
               type: 'string'
            },
            'b': {
               type: 'string'
            },
            'cnt': {
               type: 'string'
            }
          }
        }
      derive-schema: false

    schema:
      - name: 'a'
        type: VARCHAR
     - name: 'b'
        type: VARCHAR
      - name: 'cnt'
        type: BIGINT

Couldn't find any document for the same.

someone help me with the syntax.

Thanks
Srikanth