flink-user mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: Reg. custom sinks in Flink
Date Mon, 12 Dec 2016 15:26:43 GMT
Regarding 2) I don't think so. That would require access to the DataStax
MappingManager.
We could add something similar to the ClusterBuilder for that, though.

Regards,
Chesnay

On 12.12.2016 16:15, Meghashyam Sandeep V wrote:
> Hi Till,
>
> Thanks for the information.
>
> 1. What do you mean by 'subtask', is it every partition or every 
> message in the stream?
>
> 2. I tried using CassandraSink with a Pojo. Is there a way to specify 
> TTL as I can't use a query when I have a datastream with Pojo?
>
> CassandraSink.addSink(messageStream)
>           .setClusterBuilder(new ClusterBuilder() {
>               @Override protected Cluster buildCluster(Cluster.Builder builder) {
>                   return buildCassandraCluster();
>               }
>           })
>           .build();
>
> Thanks,
>
> On Mon, Dec 12, 2016 at 3:17 AM, Till Rohrmann <trohrmann@apache.org> wrote:
>
>     Hi Meghashyam,
>
>     1.
>
>         You can perform initializations in the open method of the
>         |RichSinkFunction| interface. The |open| method will be called
>         once for every sub task when initializing it. If you want to
>         share the resource across multiple sub tasks running in the
>         same JVM you can also store the |dbSession| in a class variable.
>
>     2.
>
>         The Flink community is currently working on adding security
>         support including ssl encryption to Flink. So maybe in the
>         future you can use Flink’s Cassandra sink again.
>
>     Cheers,
>     Till
>
>     On Fri, Dec 9, 2016 at 8:05 PM, Meghashyam Sandeep V
>     <vr1meghashyam@gmail.com> wrote:
>
>         Thanks a lot for the quick reply Shannon.
>
>         1. I will create a class that extends SinkFunction and write
>         my connection logic there. My only question here is: will a
>         dbSession be created for each message/partition, which might
>         affect the performance? That's the reason why I added this line
>         to create a connection once and use it along the datastream.
>         if(dbSession == null && store!=null) { dbSession = getSession();}
>         2. I couldn't use flink-connector-cassandra as I have SSL
>         enabled for my C* cluster and I couldn't get it to work with all
>         my SSL config (truststore, keystore etc.) added to cluster
>         building. I didn't find a proper example with SSL enabled
>         flink-connector-cassandra.
>
>         Thanks
>
>         On Fri, Dec 9, 2016 at 10:54 AM, Shannon Carey
>         <scarey@expedia.com> wrote:
>
>             You haven't really added a sink in Flink terminology,
>             you're just performing a side effect within a map
>             operator. So while it may work, if you want to add a sink
>             proper you need to have an object that extends SinkFunction
>             or RichSinkFunction. The method call on the stream should
>             be ".addSink(…)".
>
>             Also, the dbSession isn't really Flink state as it will
>             not vary based on the position in or content in the
>             stream. It's a necessary helper object, yes, but you don't
>             need Flink to checkpoint it.
>
>             You can still use the sinks provided with
>             flink-connector-cassandra and customize the cluster
>             building by passing your own ClusterBuilder into the
>             constructor.
>
>             -Shannon
>
>             From: Meghashyam Sandeep V <vr1meghashyam@gmail.com>
>             Date: Friday, December 9, 2016 at 12:26 PM
>             To: <user@flink.apache.org>, <dev@flink.apache.org>
>             Subject: Reg. custom sinks in Flink
>
>             Hi there,
>
>             I have a Flink streaming app where my source is Kafka and
>             the sink is a custom sink to Cassandra (I can't use the standard
>             C* sink that comes with Flink as I have customized auth to C*).
>             I currently have the following:
>
>             messageStream
>                      .rebalance()
>                      .map(s -> {
>                          return mapper.readValue(s, JsonNode.class);
>                      })
>                      .filter(//filter some messages)
>                      .map((MapFunction<JsonNode, String>) message -> {
>                          getDbSession().execute("QUERY_TO_EXEC");
>                      })
>
>             private static Session getDbSession() {
>                  if (dbSession == null && store != null) {
>                      dbSession = getSession();
>                  }
>
>                  return dbSession;
>             }
>
>             1. Is this the right way to add a custom sink? As you can see, I have dbSession
>             as a class variable here and I'm storing its state.
>
>             2. This setup works fine in a standalone Flink (java -jar MyJar.jar). When
>             I run using Flink with YARN on EMR I get an NPE at the session, which is kind of weird.
>
>             Thanks
>
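
Editorial illustration of the lifecycle Till describes in the quoted thread (open is called once per subtask, the sink is then invoked once per record): the following is a minimal, dependency-free sketch, not Flink code. FakeSession, SessionSink and SinkLifecycleDemo are hypothetical stand-ins invented for this example; they only model the open/invoke/close order so the "create the session once, reuse it for every record" idea can be seen in isolation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a Cassandra session; it only records the queries it receives.
class FakeSession {
    final List<String> executed = new ArrayList<>();
    boolean closed = false;

    void execute(String query) {
        if (closed) {
            throw new IllegalStateException("session is closed");
        }
        executed.add(query);
    }

    void close() {
        closed = true;
    }
}

// Hypothetical model of a rich sink: open() once per subtask,
// invoke() once per record, close() when the subtask shuts down.
class SessionSink {
    int openCalls = 0;
    private FakeSession session; // per-subtask resource, created in open()

    void open() {
        openCalls++;
        session = new FakeSession();
    }

    void invoke(String record) {
        // Reuses the session created in open(); nothing is created per record.
        session.execute("QUERY_TO_EXEC:" + record);
    }

    void close() {
        if (session != null) {
            session.close();
        }
    }

    FakeSession session() {
        return session;
    }
}

class SinkLifecycleDemo {
    // Simulates what a runtime would do for a single subtask.
    static SessionSink runSubtask(List<String> records) {
        SessionSink sink = new SessionSink();
        sink.open();
        for (String record : records) {
            sink.invoke(record);
        }
        sink.close();
        return sink;
    }

    public static void main(String[] args) {
        SessionSink sink = runSubtask(Arrays.asList("a", "b", "c"));
        System.out.println("open calls: " + sink.openCalls);
        System.out.println("queries executed: " + sink.session().executed.size());
    }
}
```

In an actual job, the equivalent class would extend RichSinkFunction and be attached with ".addSink(…)" as Shannon notes, with the real session created in open rather than lazily inside a map operator.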
