flink-dev mailing list archives

From Meghashyam Sandeep V <vr1meghash...@gmail.com>
Subject Re: Reg. custom sinks in Flink
Date Mon, 12 Dec 2016 15:15:39 GMT
Hi Till,

Thanks for the information.

1. What do you mean by 'subtask'? Is it every partition or every message in
the stream?

2. I tried using CassandraSink with a POJO. Is there a way to specify a TTL,
given that I can't use a query when my datastream carries POJOs?

CassandraSink.addSink(messageStream)
         .setClusterBuilder(new ClusterBuilder() {
             @Override
             protected Cluster buildCluster(Cluster.Builder builder) {
                 return buildCassandraCluster();
             }
         })
         .build();
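If there is no POJO-based way, a fallback I'm considering is a custom
RichSinkFunction that writes through a prepared INSERT ... USING TTL instead
of the POJO mapper. The sketch below is illustrative only: the keyspace,
table, columns, Message type, and contact point are all made up.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;

// Sketch: set a per-record TTL by writing through CQL instead of the POJO mapper.
public class TtlCassandraSink extends RichSinkFunction<TtlCassandraSink.Message> {

    // Placeholder POJO standing in for the real message type.
    public static class Message {
        public String id;
        public String payload;
    }

    private static final int TTL_SECONDS = 86400; // e.g. one day

    private transient Cluster cluster;
    private transient Session session;
    private transient PreparedStatement insert;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Swap in your SSL-aware cluster builder here.
        cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
        session = cluster.connect("my_keyspace");
        insert = session.prepare(
                "INSERT INTO messages (id, payload) VALUES (?, ?) USING TTL " + TTL_SECONDS);
    }

    @Override
    public void invoke(Message value) throws Exception {
        session.execute(insert.bind(value.id, value.payload));
    }

    @Override
    public void close() throws Exception {
        if (session != null) { session.close(); }
        if (cluster != null) { cluster.close(); }
    }
}
```

It would then be attached with messageStream.addSink(new TtlCassandraSink()).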


Thanks,


On Mon, Dec 12, 2016 at 3:17 AM, Till Rohrmann <trohrmann@apache.org> wrote:

> Hi Meghashyam,
>
>    1. You can perform initializations in the open method of the
>    RichSinkFunction interface. The open method is called once for every
>    subtask when it is initialized. If you want to share the resource
>    across multiple subtasks running in the same JVM, you can also store
>    the dbSession in a class variable.
>
>    2. The Flink community is currently working on adding security
>    support, including SSL encryption, to Flink. So maybe in the future
>    you can use Flink’s Cassandra sink again.
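A minimal sketch of point 1 (the contact point and query below are
placeholders, not from this thread): the session lives in a static field, so
the first subtask to reach open() creates it and other subtasks in the same
JVM reuse it.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

public class SharedSessionSink extends RichSinkFunction<String> {

    // Static, so it is shared by all subtasks running in the same task manager JVM.
    private static Session sharedSession;

    @Override
    public void open(Configuration parameters) throws Exception {
        // open() is called once per parallel subtask; guard the shared init.
        synchronized (SharedSessionSink.class) {
            if (sharedSession == null) {
                Cluster cluster = Cluster.builder()
                        .addContactPoint("127.0.0.1") // placeholder contact point
                        .build();
                sharedSession = cluster.connect();
            }
        }
    }

    @Override
    public void invoke(String value) throws Exception {
        sharedSession.execute("QUERY_TO_EXEC"); // placeholder query, as in the thread
    }
}
```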
>
> Cheers,
> Till
>
> On Fri, Dec 9, 2016 at 8:05 PM, Meghashyam Sandeep V <
> vr1meghashyam@gmail.com> wrote:
>
>> Thanks a lot for the quick reply Shannon.
>>
>> 1. I will create a class that extends SinkFunction and write my
>> connection logic there. My only question here is: will a dbSession be
>> created for each message/partition, which might affect performance?
>> That's the reason I added this line, so a connection is created once and
>> reused along the datastream: if (dbSession == null && store != null) {
>> dbSession = getSession(); }
>>
>> 2. I couldn't use flink-connector-cassandra because I have SSL enabled
>> on my C* cluster and couldn't get it to work with all my SSL config
>> (truststore, keystore, etc.) added to the cluster building. I didn't
>> find a proper example of flink-connector-cassandra with SSL enabled.
>>
>>
>> Thanks
>>
>>
>>
>>
>> On Fri, Dec 9, 2016 at 10:54 AM, Shannon Carey <scarey@expedia.com>
>> wrote:
>>
>>> You haven't really added a sink in Flink terminology; you're just
>>> performing a side effect within a map operator. So while it may work,
>>> if you want to add a proper sink you need to have an object that extends
>>> SinkFunction or RichSinkFunction. The method call on the stream should be
>>> ".addSink(…)".
>>>
>>> Also, the dbSession isn't really Flink state, as it doesn't vary with
>>> the position in, or the content of, the stream. It's a necessary helper
>>> object, yes, but you don't need Flink to checkpoint it.
>>>
>>> You can still use the sinks provided with flink-connector-cassandra and
>>> customize the cluster building by passing your own ClusterBuilder into the
>>> constructor.
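For example, a ClusterBuilder that wires a JKS truststore into the driver's
SSL options might look like the sketch below; the truststore path, password,
and host are placeholders, and this assumes the DataStax 3.x driver's
JdkSSLOptions.

```java
import java.io.FileInputStream;
import java.security.KeyStore;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.JdkSSLOptions;

public class SslClusterBuilderFactory {

    // Builds a ClusterBuilder whose Cluster talks TLS using the given JKS truststore.
    public static ClusterBuilder create(final String truststorePath,
                                        final String truststorePassword) {
        return new ClusterBuilder() {
            @Override
            protected Cluster buildCluster(Cluster.Builder builder) {
                try {
                    KeyStore trustStore = KeyStore.getInstance("JKS");
                    try (FileInputStream in = new FileInputStream(truststorePath)) {
                        trustStore.load(in, truststorePassword.toCharArray());
                    }
                    TrustManagerFactory tmf = TrustManagerFactory
                            .getInstance(TrustManagerFactory.getDefaultAlgorithm());
                    tmf.init(trustStore);
                    SSLContext sslContext = SSLContext.getInstance("TLS");
                    sslContext.init(null, tmf.getTrustManagers(), null);
                    return builder
                            .addContactPoint("cassandra-host") // placeholder
                            .withSSL(JdkSSLOptions.builder()
                                    .withSSLContext(sslContext)
                                    .build())
                            .build();
                } catch (Exception e) {
                    throw new RuntimeException("Failed to build SSL-enabled cluster", e);
                }
            }
        };
    }
}
```

It would then be passed to the sink with
CassandraSink.addSink(stream).setClusterBuilder(SslClusterBuilderFactory.create(path, password)).build().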
>>>
>>> -Shannon
>>>
>>> From: Meghashyam Sandeep V <vr1meghashyam@gmail.com>
>>> Date: Friday, December 9, 2016 at 12:26 PM
>>> To: <user@flink.apache.org>, <dev@flink.apache.org>
>>> Subject: Reg. custom sinks in Flink
>>>
>>> Hi there,
>>>
>>> I have a Flink streaming app where my source is Kafka and my sink is a
>>> custom sink to Cassandra (I can't use the standard C* sink that comes
>>> with Flink, as I have customized auth to C*). I currently have the
>>> following:
>>>
>>> messageStream
>>>         .rebalance()
>>>         .map(s -> mapper.readValue(s, JsonNode.class))
>>>         .filter(/* filter some messages */)
>>>         .map((MapFunction<JsonNode, String>) message -> {
>>>             getDbSession().execute("QUERY_TO_EXEC");
>>>             return message.toString(); // a MapFunction must return a value
>>>         });
>>>
>>> private static Session getDbSession() {
>>>     if(dbSession == null && store!=null) {
>>>         dbSession = getSession();
>>>     }
>>>
>>>     return dbSession;
>>> }
>>>
>>> 1. Is this the right way to add a custom sink? As you can see, I have
>>> dbSession as a class variable here and I'm storing its state.
>>>
>>> 2. This setup works fine in standalone Flink (java -jar MyJar.jar).
>>> When I run Flink with YARN on EMR, I get an NPE at the session, which
>>> is kind of weird.
>>>
>>>
>>> Thanks
>>>
>>>
>>
>
