flink-user mailing list archives

From Meghashyam Sandeep V <vr1meghash...@gmail.com>
Subject Re: Reg. custom sinks in Flink
Date Fri, 09 Dec 2016 19:05:30 GMT
Thanks a lot for the quick reply, Shannon.

1. I will create a class that extends SinkFunction and write my connection
logic there. My only question here is: will a dbSession be created for each
message/partition, which might affect performance? That's the reason why
I added this line, to create a connection once and reuse it along the
datastream: if(dbSession == null && store!=null) { dbSession =
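For point 1, the usual Flink answer is a RichSinkFunction whose open() method runs once per parallel subtask, so the session is created once per task slot rather than once per message. Below is a minimal, self-contained sketch of that pattern; the class names DbSession, RichSinkFunctionStub, and CassandraJsonSink are stand-ins I made up so the snippet compiles without Flink or the Cassandra driver on the classpath. In a real job you would extend org.apache.flink.streaming.api.functions.sink.RichSinkFunction and open a com.datastax.driver.core.Session inside open().

```java
// Minimal, self-contained sketch of the "one session per subtask" pattern.
// DbSession and RichSinkFunctionStub are local stand-ins so this compiles
// without Flink or the Cassandra driver; in a real job, extend
// org.apache.flink.streaming.api.functions.sink.RichSinkFunction instead.

// Stand-in for com.datastax.driver.core.Session.
interface DbSession {
    void execute(String query);
    void close();
}

// Stand-in for Flink's RichSinkFunction lifecycle.
abstract class RichSinkFunctionStub<T> {
    public abstract void open();          // Flink calls this once per parallel subtask
    public abstract void invoke(T value); // called once per record
    public abstract void close();
}

class CassandraJsonSink extends RichSinkFunctionStub<String> {
    private transient DbSession session;
    private int executed = 0;

    @Override
    public void open() {
        // Runs on the task manager, once per subtask: the session is
        // created exactly once per slot, not once per message.
        session = new DbSession() {
            public void execute(String query) { /* would hit Cassandra here */ }
            public void close() { /* would close driver resources here */ }
        };
    }

    @Override
    public void invoke(String value) {
        session.execute("QUERY_TO_EXEC"); // placeholder query from this thread
        executed++;
    }

    @Override
    public void close() {
        if (session != null) session.close();
    }

    // Only for the sketch, to observe how many records were written.
    public int executedCount() { return executed; }
}
```

Building the session inside open() may also explain the YARN NPE from the original message: a static field initialized on the submitting JVM is not available on the task managers, whereas open() runs on the machine that actually executes the sink.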

2. I couldn't use flink-connector-cassandra because I have SSL enabled for my C*
cluster, and I couldn't get it to work with all my SSL
config (truststore, keystore, etc.) added to the cluster building. I didn't find a
proper example of flink-connector-cassandra with SSL enabled.
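For point 2, one way to structure this is to build the SSLContext from the truststore with plain JDK classes and then hand it to the driver through the custom ClusterBuilder that flink-connector-cassandra accepts. The runnable part below uses only the JDK; the ClusterBuilder wiring is shown as a comment because it needs the connector and driver on the classpath, and the class name SslContextForCassandra and method fromTrustStore are my own illustrative names, not connector API.

```java
import java.security.KeyStore;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

class SslContextForCassandra {
    // Builds an SSLContext from a truststore using only JDK classes;
    // no driver dependency is needed for this part.
    static SSLContext fromTrustStore(KeyStore trustStore) throws Exception {
        TrustManagerFactory tmf =
                TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(trustStore);
        SSLContext ctx = SSLContext.getInstance("TLS");
        ctx.init(null, tmf.getTrustManagers(), null);
        return ctx;
    }

    // Hypothetical wiring into flink-connector-cassandra (sketch only,
    // shown as a comment since it needs the connector on the classpath):
    //
    //   CassandraSink.addSink(stream)
    //       .setClusterBuilder(new ClusterBuilder() {
    //           protected Cluster buildCluster(Cluster.Builder builder) {
    //               return builder.addContactPoint("host")
    //                       .withSSL(JdkSSLOptions.builder()
    //                               .withSSLContext(ctx).build())
    //                       .build();
    //           }
    //       })
    //       .build();
}
```

Keeping the SSL setup inside the ClusterBuilder means it runs on the task managers where the connection is actually opened, so the truststore path just has to be readable there.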


On Fri, Dec 9, 2016 at 10:54 AM, Shannon Carey <scarey@expedia.com> wrote:

> You haven't really added a sink in Flink terminology; you're just
> performing a side effect within a map operator. So while it may work, if
> you want to add a proper sink you need to have an object that extends
> SinkFunction or RichSinkFunction. The method call on the stream should be
> ".addSink(…)".
> Also, the dbSession isn't really Flink state as it will not vary based on
> the position in or content in the stream. It's a necessary helper object,
> yes, but you don't need Flink to checkpoint it.
> You can still use the sinks provided with flink-connector-cassandra and
> customize the cluster building by passing your own ClusterBuilder into the
> constructor.
> -Shannon
> From: Meghashyam Sandeep V <vr1meghashyam@gmail.com>
> Date: Friday, December 9, 2016 at 12:26 PM
> To: <user@flink.apache.org>, <dev@flink.apache.org>
> Subject: Reg. custom sinks in Flink
> Hi there,
> I have a Flink streaming app where my source is Kafka and a custom sink to
> Cassandra (I can't use the standard C* sink that comes with Flink as I have
> customized auth to C*). I currently have the following:
> messageStream
>         .rebalance()
>         .map(s -> mapper.readValue(s, JsonNode.class))
>         .filter(/* filter some messages */)
>         .map((MapFunction<JsonNode, String>) message -> {
>             getDbSession().execute("QUERY_TO_EXEC");
>             return message.toString(); // MapFunction<JsonNode, String> must return a String
>         });
>
> private static Session getDbSession() {
>     if (dbSession == null && store != null) {
>         dbSession = getSession();
>     }
>     return dbSession;
> }
> 1. Is this the right way to add a custom sink? As you can see, I have
> dbSession as a class variable here and I'm storing its state.
> 2. This setup works fine in standalone Flink (java -jar MyJar.jar). When I
> run with Flink on YARN on EMR, I get an NPE at the session, which is kind
> of weird.
> Thanks
