flink-user mailing list archives

From vinay patil <vinay18.pa...@gmail.com>
Subject Re: Flink Checkpoint runs slow for low load stream
Date Tue, 27 Sep 2016 15:20:45 GMT
Hi Stephan,

OK, I think that may be what is taking a lot of time. When you say that it
stores everything, do you mean that all of the input to the window is stored
in the state backend?

For example, the input to my apply function is an Iterable<DTO>, which can
contain multiple elements, and each DTO contains roughly 50 fields.

So do you mean that the complete DTO will be stored in the state backend?
If so, it is probably better to use RocksDB as the state backend.
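
To make the question concrete, here is a rough, self-contained Java sketch (plain Java, not Flink code) of the difference between a window that buffers every element until it fires, as happens when the window function receives an Iterable, and one that keeps only a running aggregate. The class names, the long[] stand-in for the DTO, and the figures of 1,000 events/s and 400 bytes per DTO are all hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: mimics what a window keeps as state. Not Flink code. */
class WindowStateSketch {

    /** Buffers every element, like a window evaluated over an Iterable. */
    static final class BufferingWindow {
        private final List<long[]> elements = new ArrayList<>();

        void add(long[] dto) {
            elements.add(dto);
        }

        int storedElements() {
            return elements.size();
        }
    }

    /** Keeps a single running aggregate, like an incrementally reduced window. */
    static final class ReducingWindow {
        private long sum;
        private boolean hasValue;

        void add(long[] dto) {
            sum += dto[0];
            hasValue = true;
        }

        int storedElements() {
            return hasValue ? 1 : 0;
        }

        long sum() {
            return sum;
        }
    }

    /** Rough size of buffered state: rate x window length x bytes per element. */
    static long estimateBufferedBytes(long eventsPerSecond, long windowSeconds,
                                      long bytesPerElement) {
        return eventsPerSecond * windowSeconds * bytesPerElement;
    }

    public static void main(String[] args) {
        BufferingWindow buffering = new BufferingWindow();
        ReducingWindow reducing = new ReducingWindow();
        for (int i = 0; i < 1000; i++) {
            long[] dto = new long[50]; // stand-in for a DTO with roughly 50 fields
            dto[0] = i;
            buffering.add(dto);
            reducing.add(dto);
        }
        System.out.println("buffered elements: " + buffering.storedElements());
        System.out.println("reduced elements:  " + reducing.storedElements());
        // Hypothetical figures: 1,000 events/s, 1 s window, 400 bytes per DTO.
        System.out.println("estimated bytes:   " + estimateBufferedBytes(1000, 1, 400));
    }
}
```

If the real per-window figure from such an estimate is small, state size alone is unlikely to explain a slow checkpoint, and the cause is more likely elsewhere.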

Also, I am using AWS client-side encryption to write encrypted data to
S3, so maybe that is also taking some time.
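
One rough way to check whether encryption is a plausible contributor is to time it in isolation, away from the pipeline. The sketch below uses only the JDK's javax.crypto classes and does not call the AWS SDK. The choice of AES-GCM with a 128-bit key and the 8 MB payload are assumptions made for illustration; the cipher and key size actually used by the AWS client may differ.

```java
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

/** Illustrative only: times local AES-GCM encryption, not the AWS client. */
class EncryptionTimingSketch {

    /** Encrypts the payload with a freshly generated key and random IV. */
    static byte[] encrypt(byte[] plaintext) {
        try {
            KeyGenerator keyGen = KeyGenerator.getInstance("AES");
            keyGen.init(128); // assumed key size, for illustration only
            SecretKey key = keyGen.generateKey();

            byte[] iv = new byte[12]; // 96-bit IV, the usual size for GCM
            new SecureRandom().nextBytes(iv);

            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(128, iv));
            // Output is the ciphertext followed by a 16-byte authentication tag.
            return cipher.doFinal(plaintext);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("AES-GCM is not available", e);
        }
    }

    public static void main(String[] args) {
        byte[] payload = new byte[8 * 1024 * 1024]; // hypothetical 8 MB batch
        long start = System.nanoTime();
        byte[] ciphertext = encrypt(payload);
        // The measured time also includes key and IV generation.
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("encrypted " + payload.length + " bytes into "
                + ciphertext.length + " bytes in " + elapsedMs + " ms");
    }
}
```

If the time measured this way is tiny compared with the checkpoint durations, the encryption step itself is probably not the main cost, although the upload to S3 could still be.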

What do you think?

Regards,
Vinay Patil

On Tue, Sep 27, 2016 at 3:51 AM, Stephan Ewen [via Apache Flink User
Mailing List archive.] <ml-node+s2336050n9189h11@n4.nabble.com> wrote:

> @vinay - Window operators store everything in the state backend.
>
> On Mon, Sep 26, 2016 at 7:34 PM, vinay patil <[hidden email]> wrote:
>
>> I am not sure about that; I will run the pipeline on a cluster and share
>> the details.
>> Since the window is a stateful operator, it will store only the key part in
>> the state backend and not the value, right?
>>
>> Regards,
>> Vinay Patil
>>
>> On Mon, Sep 26, 2016 at 12:13 PM, Stephan Ewen [via Apache Flink User
>> Mailing List archive.] <[hidden email]> wrote:
>>
>>> @vinay - Is it in your case large state that causes slower checkpoints?
>>>
>>> On Mon, Sep 26, 2016 at 6:17 PM, vinay patil <[hidden email]> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am also facing this issue. In my case the data flows continuously
>>>> from the Kafka source; when I increase the checkpoint interval to 60000,
>>>> the data gets written to the S3 sink.
>>>>
>>>> Is it because some operator is taking more time for processing? In my
>>>> case I am using a time window of 1 sec.
>>>>
>>>> Regards,
>>>> Vinay Patil
>>>>
>>>> On Mon, Sep 26, 2016 at 10:08 AM, Chakravarthy varaga [via Apache Flink
>>>> User Mailing List archive.] <[hidden email]> wrote:
>>>>
>>>>> Hi Stephan,
>>>>>
>>>>>     Please find my responses below.
>>>>>
>>>>>   - What source are you using for the slow input?
>>>>>     [CVP] - Both streams, as pointed out in my first mail, are Kafka
>>>>> streams.
>>>>>   - How large is the state that you are checkpointing?
>>>>>     [CVP] - I have enabled checkpointing on the StreamEnvironment as
>>>>> below.
>>>>>
>>>>>         final StreamExecutionEnvironment streamEnv =
>>>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>         streamEnv.setStateBackend(
>>>>>             new FsStateBackend("file:///tmp/flink/checkpoints"));
>>>>>         // interval is in milliseconds, i.e. one checkpoint every 10 s
>>>>>         streamEnv.enableCheckpointing(10000);
>>>>>
>>>>>     In terms of the state stored, the KS1 stream has a payload of
>>>>> 100K events/second, while KS2 has about 1 event / 10 minutes. Basically,
>>>>> the operators perform flatmaps on 8 fields of a tuple (all fields are
>>>>> primitives). If you look at the state sizes in the dashboard, they are in
>>>>> KB.
>>>>>   - Can you try to see in the log if actually the state snapshot takes
>>>>> that long, or if it simply takes long for the checkpoint barriers to
>>>>> travel through the stream due to a lot of backpressure?
>>>>>     [CVP] - There is no backpressure, at least from the sample
>>>>> computation in the Flink dashboard. 100K/second is a low load by Flink's
>>>>> benchmarks. I could not quite follow the distinction between barriers
>>>>> and snapshot state. I have attached the Task Manager log (DEBUG) in case
>>>>> that is of interest to you.
>>>>>
>>>>>      I have attached the checkpoint times as a .png from the
>>>>> dashboard. If you look at checkpoint IDs 28, 29 and 30, you'd see that
>>>>> the checkpoints take more than a minute in each case. Before these
>>>>> checkpoints, the KS2 stream did not have any events. As soon as an
>>>>> event (which should be only bytes in size) was generated, the checkpoints
>>>>> went slow, and subsequently took a minute more for every checkpoint
>>>>> thereafter.
>>>>>
>>>>>    This log was collected from the standalone Flink cluster with 1 job
>>>>> manager & 2 TMs. 1 TM was running this application with checkpointing
>>>>> (parallelism=1).
>>>>>
>>>>>     Please let me know if you need further info.
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen <[hidden email]> wrote:
>>>>>
>>>>>> Hi!
>>>>>>
>>>>>> Let's try to figure that one out. Can you give us a bit more
>>>>>> information?
>>>>>>
>>>>>>   - What source are you using for the slow input?
>>>>>>   - How large is the state that you are checkpointing?
>>>>>>   - Can you try to see in the log if actually the state snapshot
>>>>>> takes that long, or if it simply takes long for the checkpoint barriers
>>>>>> to travel through the stream due to a lot of backpressure?
>>>>>>
>>>>>> Greetings,
>>>>>> Stephan
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske <[hidden email]> wrote:
>>>>>>
>>>>>>> Hi CVP,
>>>>>>>
>>>>>>> I'm not that familiar with the internals of the checkpointing
>>>>>>> system, but maybe Stephan (in CC) has an idea what's going on here.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> 2016-09-23 11:33 GMT+02:00 Chakravarthy varaga <[hidden email]>:
>>>>>>>
>>>>>>>> Hi Aljoscha & Fabian,
>>>>>>>>
>>>>>>>>     I have a stream application that has 2 stream sources, as below.
>>>>>>>>
>>>>>>>>      KeyedStream<String, String> ks1 = ds1.keyBy("*");
>>>>>>>>      KeyedStream<Tuple2<String, V>, String> ks2 =
>>>>>>>>          ds2.flatMap(split T into k-v pairs).keyBy(0);
>>>>>>>>
>>>>>>>>      ks1.connect(ks2).flatMap(X);
>>>>>>>>      // X is a CoFlatMapFunction that inserts and removes elements
>>>>>>>>      // from ks2 into a key-value state member. Elements from ks1 are
>>>>>>>>      // matched against that state. The CoFlatMapFunction operator
>>>>>>>>      // maintains ValueState<Tuple2<Long, Long>>.
>>>>>>>>
>>>>>>>>      // ks1 is streaming about 100K events/sec from a Kafka topic.
>>>>>>>>      // ks2 is streaming about 1 event every 10 minutes. Precisely
>>>>>>>>      // when the 1st event is consumed from this stream, the checkpoint
>>>>>>>>      // takes 2 minutes straight away.
>>>>>>>>
>>>>>>>>     The version of Flink is 1.1.2.
>>>>>>>>
>>>>>>>> I tried to checkpoint every 10 secs using a FsStateBackend.
>>>>>>>> What I notice is that the checkpoint duration is almost 2 minutes in
>>>>>>>> many cases, while in the other cases it frequently varies from 100 ms
>>>>>>>> to 1.5 minutes. I'm attaching a snapshot of the dashboard for your
>>>>>>>> reference.
>>>>>>>>
>>>>>>>>      Is this an issue with Flink checkpointing?
>>>>>>>>
>>>>>>>>  Best Regards
>>>>>>>> CVP
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>> *flink_job_Plan.png* (42K) Download Attachment
>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/attachment/9176/0/flink_job_Plan.png>
>>>>> *Flink-Checkpoint-Times.png* (65K) Download Attachment
>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/attachment/9176/1/Flink-Checkpoint-Times.png>
>>>>> *flink-qchavar-taskmanager-1-elxa1h67k32.log* (442K) Download
>>>>> Attachment
>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/attachment/9176/2/flink-qchavar-taskmanager-1-elxa1h67k32.log>
>>>>>
>>>>
>>>
>>
>




--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Checkpoint-runs-slow-for-low-load-stream-tp9147p9211.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.