flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Till Rohrmann <trohrm...@apache.org>
Subject Re: Reg. custom sinks in Flink
Date Mon, 12 Dec 2016 15:56:43 GMT
(1) A subtask is a parallel instance of an operator and thus responsible
for a partition (possibly infinite) of the whole DataStream/DataSet.

(2) Maybe you can add this feature to Flink's Cassandra Sink.

Cheers,
Till

On Mon, Dec 12, 2016 at 4:30 PM, Meghashyam Sandeep V <
vr1meghashyam@gmail.com> wrote:

> Data piles up in Cassandra without TTL. Is there a workaround for this
> problem? Is there a way to specify my query and still use Pojo?
>
> Thanks,
>
> On Mon, Dec 12, 2016 at 7:26 AM, Chesnay Schepler <chesnay@apache.org>
> wrote:
>
>> Regarding 2) I don't think so. That would require access to the datastax
>> MappingManager.
>> We could add something similar as the ClusterBuilder for that though.
>>
>> Regards,
>> Chesnay
>>
>>
>> On 12.12.2016 16:15, Meghashyam Sandeep V wrote:
>>
>> Hi Till,
>>
>> Thanks for the information.
>>
>> 1. What do you mean by 'subtask', is it every partition or every message
>> in the stream?
>>
>> 2. I tried using CassandraSink with a Pojo. Is there a way to specify TTL
>> as I can't use a query when I have a datastream with Pojo?
>>
>> CassandraSink.addSink(messageStream)
>>          .setClusterBuilder(new ClusterBuilder() {
>>              @Override             protected Cluster buildCluster(Cluster.Builder
builder) {
>>                  return buildCassandraCluster();
>>              }
>>          })
>>          .build();
>>
>> Thanks,
>>
>>
>> On Mon, Dec 12, 2016 at 3:17 AM, Till Rohrmann <trohrmann@apache.org>
>> wrote:
>>
>>> Hi Meghashyam,
>>>
>>>    1.
>>>
>>>    You can perform initializations in the open method of the
>>>    RichSinkFunction interface. The open method will be called once for
>>>    every sub task when initializing it. If you want to share the resource
>>>    across multiple sub tasks running in the same JVM you can also store the
>>>    dbSession in a class variable.
>>>    2.
>>>
>>>    The Flink community is currently working on adding security support
>>>    including ssl encryption to Flink. So maybe in the future you can use
>>>    Flink’s Cassandra sink again.
>>>
>>> Cheers,
>>> Till
>>> ​
>>>
>>> On Fri, Dec 9, 2016 at 8:05 PM, Meghashyam Sandeep V <
>>> <vr1meghashyam@gmail.com>vr1meghashyam@gmail.com> wrote:
>>>
>>>> Thanks a lot for the quick reply Shannon.
>>>>
>>>> 1. I will create a class that extends SinkFunction and write my
>>>> connection logic there. My only question here is- will a dbSession be
>>>> created for each message/partition which might affect the performance?
>>>> Thats the reason why I added this line to create a connection once and use
>>>> it along the datastream. if(dbSession == null && store!=null) { dbSession
>>>> = getSession();}
>>>> 2. I couldn't use flink-connector-cassandra as I have SSL enabled for
>>>> my C* cluster and I couldn't get it work with all my SSL
>>>> config(truststore,keystore etc) added to cluster building. I didn't find
a
>>>> proper example with SSL enabled flink-connector-cassandra
>>>>
>>>>
>>>> Thanks
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Dec 9, 2016 at 10:54 AM, Shannon Carey <scarey@expedia.com>
>>>> wrote:
>>>>
>>>>> You haven't really added a sink in Flink terminology, you're just
>>>>> performing a side effect within a map operator. So while it may work,
if
>>>>> you want to add a sink proper you need have an object that extends
>>>>> SinkFunction or RichSinkFunction. The method call on the stream should
be
>>>>> ".addSink(…)".
>>>>>
>>>>> Also, the dbSession isn't really Flink state as it will not vary based
>>>>> on the position in or content in the stream. It's a necessary helper
>>>>> object, yes, but you don't need Flink to checkpoint it.
>>>>>
>>>>> You can still use the sinks provided with flink-connector-cassandra
>>>>> and customize the cluster building by passing your own ClusterBuilder
into
>>>>> the constructor.
>>>>>
>>>>> -Shannon
>>>>>
>>>>> From: Meghashyam Sandeep V < <vr1meghashyam@gmail.com>
>>>>> vr1meghashyam@gmail.com>
>>>>> Date: Friday, December 9, 2016 at 12:26 PM
>>>>> To: < <user@flink.apache.org>user@flink.apache.org>, <
>>>>> dev@flink.apache.org>
>>>>> Subject: Reg. custom sinks in Flink
>>>>>
>>>>> Hi there,
>>>>>
>>>>> I have a flink streaming app where my source is Kafka and a custom
>>>>> sink to Cassandra(I can't use standard C* sink that comes with flink
as I
>>>>> have customized auth to C*). I'm currently have the following:
>>>>>
>>>>> messageStream
>>>>>         .rebalance()
>>>>>
>>>>>         .map( s-> {
>>>>>
>>>>>     return mapper.readValue(s, JsonNode.class);)
>>>>>
>>>>>         .filter(//filter some messages)
>>>>>
>>>>>         .map(
>>>>>
>>>>>          (MapFunction<JsonNode, String>) message -> {
>>>>>
>>>>>          getDbSession.execute("QUERY_TO_EXEC")
>>>>>
>>>>>          })
>>>>>
>>>>> private static Session getDbSession() {
>>>>>     if(dbSession == null && store!=null) {
>>>>>         dbSession = getSession();
>>>>>     }
>>>>>
>>>>>     return dbSession;
>>>>> }
>>>>>
>>>>> 1. Is this the right way to add a custom sink? As you can see, I have
dbSession as a class variable here and I'm storing its state.
>>>>>
>>>>> 2. This setup works fine in a standalone flink (java -jar MyJar.jar).
When I run using flink with YARN on EMR I get a NPE at the session which is kind of weird.
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>

Mime
View raw message