flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Shannon Carey <sca...@expedia.com>
Subject Re: Reg. custom sinks in Flink
Date Fri, 09 Dec 2016 18:54:59 GMT
You haven't really added a sink in Flink terminology, you're just performing a side effect
within a map operator. So while it may work, if you want to add a sink proper you need have
an object that extends SinkFunction or RichSinkFunction. The method call on the stream should
be ".addSink(…)".

Also, the dbSession isn't really Flink state as it will not vary based on the position in
or content in the stream. It's a necessary helper object, yes, but you don't need Flink to
checkpoint it.

You can still use the sinks provided with flink-connector-cassandra and customize the cluster
building by passing your own ClusterBuilder into the constructor.

-Shannon

From: Meghashyam Sandeep V <vr1meghashyam@gmail.com<mailto:vr1meghashyam@gmail.com>>
Date: Friday, December 9, 2016 at 12:26 PM
To: <user@flink.apache.org<mailto:user@flink.apache.org>>, <dev@flink.apache.org<mailto:dev@flink.apache.org>>
Subject: Reg. custom sinks in Flink

Hi there,

I have a flink streaming app where my source is Kafka and a custom sink to Cassandra(I can't
use standard C* sink that comes with flink as I have customized auth to C*). I'm currently
have the following:


messageStream
        .rebalance()

        .map( s-> {

    return mapper.readValue(s, JsonNode.class);)

        .filter(//filter some messages)

        .map(

         (MapFunction<JsonNode, String>) message -> {

         getDbSession.execute("QUERY_TO_EXEC")

         })

private static Session getDbSession() {
    if(dbSession == null && store!=null) {
        dbSession = getSession();
    }

    return dbSession;
}

1. Is this the right way to add a custom sink? As you can see, I have dbSession as a class
variable here and I'm storing its state.

2. This setup works fine in a standalone flink (java -jar MyJar.jar). When I run using flink
with YARN on EMR I get a NPE at the session which is kind of weird.


Thanks
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message