flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Fabian Hueske <fhue...@gmail.com>
Subject Re: Reg. custom sink for Flink streaming
Date Thu, 03 Nov 2016 13:50:26 GMT
Hi,

a MapFunction should be the way to go for this use case.
What exactly is not working? Do you get an exception? Is the map method not
called?

Best, Fabian

2016-11-03 0:00 GMT+01:00 Sandeep Vakacharla <svakacharla@fanatics.com>:

> Hi there,
>
>
>
> I have the following use case-
>
>
>
> I have data coming from Kafka which I need to stream and write each
> message to a database. I’m using kafka-flink connector for streaming data
> from Kafka. I don’t want to use flink sinks to write date from stream.
>
>
>
> I’m doing the following which doesn’t seem to work-
>
>
>
> messageStream
>         .rebalance()
>         .map(*new *MapFunction<String, Object>() {
>             @Override
>             *public *String map(String value) {
>                 getDbSession().execute(*"insert into TABLE_XXX (key,
> event_timeuuid, data) " *+
>                         *"VALUES ("*+ i+*",null, value); "*);
>                 *return *value;
>             }
>         })
>
>
>
> How can I iterate over each message in the stream and do something with
> that message?
>
>
>
> Thanks
>
>
> Information contained in this e-mail message is confidential. This e-mail
> message is intended only for the personal use of the recipient(s) named
> above. If you are not an intended recipient, do not read, distribute or
> reproduce this transmission (including any attachments). If you have
> received this email in error, please immediately notify the sender by email
> reply and delete the original message.
>

Mime
View raw message