flink-user mailing list archives

From srikanth flink <flink.d...@gmail.com>
Subject Re: Flink SQL update-mode set to retract in env file.
Date Thu, 26 Sep 2019 09:47:05 GMT
Hi Terry Wang,

Thanks for the quick reply.

I would like to understand more about your line "If the target table is a
type of Kafka which implements AppendStreamTableSink, the update-mode will
be append only".
If that means retract mode cannot be used with Kafka sinks because they
implement AppendStreamTableSink, then why does the code below work for me?
It writes data to Kafka:
// toRetractStream yields Tuple2<Boolean, Row>: f0 is the add/retract flag, f1 is the row.
DataStream<String> outStreamAgg = tableEnv
        .toRetractStream(resultTable, Row.class)
        .map(t -> {
            // Only the row is used; the retract flag (t.f0) is dropped here.
            Row r = t.f1;
            ObjectNode node = mapper.createObjectNode();
            node.put("source.ip", r.getField(0).toString());
            node.put("destination.ip", r.getField(1).toString());
            node.put("cnt", Long.parseLong(r.getField(2).toString()));
            return node.toString();
        });

Properties kafkaProducerProperties = new Properties();
kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "host:9092");
kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");

// The stream of JSON strings is written to Kafka as plain appended records.
outStreamAgg.addSink(new FlinkKafkaProducer<String>(
        "reconMultiAttempFail",
        new SimpleStringSchema(),
        kafkaProducerProperties));
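
For reference, here is a plain-Java sketch (no Flink dependency) of what the snippet above appears to do with the retract flag. It assumes that a running count emits a retract message for the old value followed by an add message for the new one. The class RetractFlagSketch, the Change type, and the "key=count" output format are hypothetical names used only for illustration; they are not part of Flink.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RetractFlagSketch {
    /** Hypothetical stand-in for Tuple2<Boolean, Row>: add/retract flag plus (key, count). */
    static final class Change {
        final boolean isAdd;
        final String key;
        final long cnt;

        Change(boolean isAdd, String key, long cnt) {
            this.isAdd = isAdd;
            this.key = key;
            this.cnt = cnt;
        }
    }

    /** Models a retract stream for a running COUNT per key (assumed semantics). */
    static List<Change> countAsRetractStream(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (String key : keys) {
            Long old = counts.get(key);
            if (old != null) {
                out.add(new Change(false, key, old)); // retract the previous result
            }
            long next = (old == null) ? 1L : old + 1L;
            counts.put(key, next);
            out.add(new Change(true, key, next)); // add the updated result
        }
        return out;
    }

    /** Mirrors the map() above: the flag is ignored, so every change becomes a record. */
    static List<String> dropFlag(List<Change> changes) {
        List<String> out = new ArrayList<>();
        for (Change c : changes) {
            out.add(c.key + "=" + c.cnt);
        }
        return out;
    }

    /** One possible alternative: forward only the add messages. */
    static List<String> addsOnly(List<Change> changes) {
        List<String> out = new ArrayList<>();
        for (Change c : changes) {
            if (c.isAdd) {
                out.add(c.key + "=" + c.cnt);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        keys.add("a");
        keys.add("a");
        keys.add("b");
        List<Change> changes = countAsRetractStream(keys);
        System.out.println(dropFlag(changes));
        System.out.println(addsOnly(changes));
    }
}
```

Under these assumptions, dropping the flag means the topic receives retracted values as ordinary records, indistinguishable from added ones, so a consumer cannot tell which count is current without extra logic. Filtering on the flag, or writing the flag into the record, are two ways to keep that information.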

Does the above work only with the Table API and not with SQL?
Please explain.

Thanks
Srikanth



On Thu, Sep 26, 2019 at 1:57 PM Terry Wang <zjuwangg@gmail.com> wrote:

> Hi srikanth~
>
> The Flink SQL update-mode is inferred from the target table type.
> For now, there are three StreamTableSink types: `AppendStreamTableSink`,
> `UpsertStreamTableSink`, and `RetractStreamTableSink`.
> If the target table is a Kafka table, which implements
> AppendStreamTableSink, the update-mode will be append only.
> So if you want to enable retract mode, you need to insert into a table
> backed by some kind of RetractStreamTableSink.
> Hope it helps you ~
>
>
>
> Best,
> Terry Wang
>
>
>
> On Sep 26, 2019, at 2:50 PM, srikanth flink <flink.devv@gmail.com> wrote:
>
> How can I configure the environment file for Flink SQL with update-mode: retract?
>
> I have this for append:
> properties:
>         - key: zookeeper.connect
>           value: localhost:2181
>         - key: bootstrap.servers
>           value: localhost:9092
>         - key: group.id
>           value: reconMultiAttempFail
>     format:
>       type: json
>       fail-on-missing-field: false
>       json-schema: >
>         {
>           type: 'object',
>           properties: {
>             'a': {
>                type: 'string'
>             },
>             'b': {
>                type: 'string'
>             },
>             'cnt': {
>                type: 'string'
>             }
>           }
>         }
>       derive-schema: false
>
>     schema:
>       - name: 'a'
>         type: VARCHAR
>       - name: 'b'
>         type: VARCHAR
>       - name: 'cnt'
>         type: BIGINT
>
> I couldn't find any documentation for this.
>
> Could someone help me with the syntax?
>
> Thanks
> Srikanth
>
>
>
