flink-user mailing list archives

From Chakravarthy varaga <chakravarth...@gmail.com>
Subject Re: Flink Checkpoint runs slow for low load stream
Date Wed, 04 Jan 2017 15:51:18 GMT
Hi Stephan, All,

     I just got a chance to check whether 1.1.3 fixes the slow
checkpointing on the FS backend. It appears to be fixed. Thanks for the
fix.

     While testing this with varying checkpoint intervals, there seem to
be spikes of slow checkpoints every 30-40 seconds at an interval of 15
secs. Those checkpoints take about 300 ms as opposed to 10-20 ms.
     Basically 15 secs seems to be the nominal value so far; anything
below this interval triggers the spikes too often. For us, living with a
15-sec recovery window is doable, and we eventually catch up after
recovery!

Best Regards
CVP

On Tue, Oct 4, 2016 at 6:20 PM, Chakravarthy varaga <
chakravarthyvp@gmail.com> wrote:

> Thanks for your prompt response Stephan.
>
>     I'd wait for Flink 1.1.3 !!!
>
> Best Regards
> Varaga
>
> On Tue, Oct 4, 2016 at 5:36 PM, Stephan Ewen <sewen@apache.org> wrote:
>
>> The plan to release 1.1.3 is asap ;-)
>>
>> Waiting for the last backported patches to get in, then release testing
>> and release.
>>
>> If you want to test it today, you would need to manually build the
>> release-1.1 branch.
>>
>> Best,
>> Stephan
>>
>>
>> On Tue, Oct 4, 2016 at 5:46 PM, Chakravarthy varaga <
>> chakravarthyvp@gmail.com> wrote:
>>
>>> Hi Gordon,
>>>
>>>      Do I need to clone and build release-1.1 branch to test this?
>>> I currently use the flink 1.1.2 runtime. When is the plan to release
>>> it in 1.1.3?
>>>
>>> Best Regards
>>> Varaga
>>>
>>> On Tue, Oct 4, 2016 at 9:25 AM, Tzu-Li (Gordon) Tai
>>> <tzulitai@apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> Helping out here: this is the PR for async Kafka offset committing -
>>>> https://github.com/apache/flink/pull/2574.
>>>> It has already been merged into the master and release-1.1 branches, so
>>>> you can try out the changes now if you’d like.
>>>> The change should also be included in the 1.1.3 release, which the
>>>> Flink community is planning to release soon.
>>>>
>>>> Will definitely be helpful if you can provide feedback afterwards!
>>>>
>>>> Best Regards,
>>>> Gordon
>>>>
>>>>
>>>> On October 3, 2016 at 9:40:14 PM, Chakravarthy varaga (
>>>> chakravarthyvp@gmail.com) wrote:
>>>>
>>>> Hi Stephan,
>>>>
>>>>     Is the async Kafka offset commit released in 1.1.3?
>>>>
>>>> Varaga
>>>>
>>>> On Wed, Sep 28, 2016 at 9:49 AM, Chakravarthy varaga <
>>>> chakravarthyvp@gmail.com> wrote:
>>>>
>>>>> Hi Stephan,
>>>>>
>>>>>      That should be great. Let me know once the fix is done and the
>>>>> snapshot version to use, I'll check and revert then.
>>>>>      Can you also share the JIRA that tracks the issue?
>>>>>
>>>>>      With regards to offset commit issue, I'm not sure as to how to
>>>>> proceed here. Probably I'll use your fix first and see if the problem
>>>>> reoccurs.
>>>>>
>>>>> Thanks much
>>>>> Varaga
>>>>>
>>>>> On Tue, Sep 27, 2016 at 7:46 PM, Stephan Ewen <sewen@apache.org>
>>>>> wrote:
>>>>>
>>>>>> @CVP
>>>>>>
>>>>>> In your case, Flink stores in checkpoints only the Kafka offsets (a
>>>>>> few bytes) and the custom state (e).
>>>>>>
>>>>>> Here is an illustration of the checkpoint and what is stored (from
>>>>>> the Flink docs).
>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
>>>>>>
>>>>>>
>>>>>> I am quite puzzled why the offset committing problem occurs only
>>>>>> for one input, and not for the other.
>>>>>> I am preparing a fix for 1.2, possibly going into 1.1.3 as well.
>>>>>> Could you try out a snapshot version to see if that fixes your
>>>>>> problem?
>>>>>>
>>>>>> Greetings,
>>>>>> Stephan
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Sep 27, 2016 at 2:24 PM, Chakravarthy varaga <
>>>>>> chakravarthyvp@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Stefan,
>>>>>>>
>>>>>>>      Thanks a million for your detailed explanation. I appreciate
>>>>>>> it.
>>>>>>>
>>>>>>>      -  The *zookeeper bundled with kafka 0.9.0.1* was used to
>>>>>>> start zookeeper. There is only 1 instance (standalone) of zookeeper
>>>>>>> running on my localhost (ubuntu 14.04)
>>>>>>>      -  There is only 1 Kafka broker (*version: 0.9.0.1*)
>>>>>>>
>>>>>>>      With regards to the Flink cluster, there's only 1 JM & 2 TMs
>>>>>>> started with no HA. I presume this does not use zookeeper anyway as
>>>>>>> it runs as a standalone cluster.
>>>>>>>
>>>>>>>
>>>>>>>      BTW, the kafka connector version that I use is as suggested
>>>>>>> in the flink connectors page:
>>>>>>>
>>>>>>>         <dependency>
>>>>>>>             <groupId>org.apache.flink</groupId>
>>>>>>>             <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
>>>>>>>             <version>1.1.1</version>
>>>>>>>         </dependency>
>>>>>>>
>>>>>>>      Do you see any issues with these versions?
>>>>>>>
>>>>>>>      1) Do you have benchmarks w.r.t. checkpointing in flink?
>>>>>>>
>>>>>>>      2) There isn't a detailed explanation of what states are stored
>>>>>>> as part of the checkpointing process. For example, if I have a
>>>>>>> pipeline like *source -> map -> keyBy -> map -> sink*, my assumption
>>>>>>> on what's stored is:
>>>>>>>
>>>>>>>          a) The source stream's custom watermarked records
>>>>>>>
>>>>>>>          b) Intermediate states of each of the transformations in
>>>>>>> the pipeline
>>>>>>>
>>>>>>>          c) Delta of records stored from the previous sink
>>>>>>>
>>>>>>>          d) Custom states (say, ValueState as in my case) -
>>>>>>> essentially this is what I care about storing
>>>>>>>
>>>>>>>          e) All of my operators
>>>>>>>
>>>>>>>       Is my understanding right?
>>>>>>>
>>>>>>>      3) Is there a way in Flink to checkpoint only d) as stated
>>>>>>> above?
>>>>>>>
>>>>>>>      4) Can you apply checkpointing to only certain streams and
>>>>>>> operators (say I wish to store aggregated values that are part of
>>>>>>> the transformation)?
>>>>>>>
>>>>>>> Best Regards
>>>>>>> CVP
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Sep 26, 2016 at 6:18 PM, Stephan Ewen <sewen@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks, the logs were very helpful!
>>>>>>>>
>>>>>>>> TL;DR - The offset committing to ZooKeeper is very slow and
>>>>>>>> prevents proper starting of checkpoints.
>>>>>>>>
>>>>>>>> Here is what is happening in detail:
>>>>>>>>
>>>>>>>>   - Between the point when the TaskManager receives the "trigger
>>>>>>>> checkpoint" message and the point when the KafkaSource actually
>>>>>>>> starts the checkpoint is a long time (many seconds) - for one of the
>>>>>>>> Kafka inputs (the other is fine).
>>>>>>>>   - The only way this delay can be introduced is if another
>>>>>>>> checkpoint-related operation (such as trigger() or notifyComplete())
>>>>>>>> is still in progress when the checkpoint is started. Flink does not
>>>>>>>> perform concurrent checkpoint operations on a single operator, to
>>>>>>>> ease the concurrency model for users.
>>>>>>>>   - The operation that is still in progress must be the committing
>>>>>>>> of the offsets (to ZooKeeper or Kafka). That also explains why this
>>>>>>>> only happens once one side receives the first record. Before that,
>>>>>>>> there is nothing to commit.
>>>>>>>>
>>>>>>>>
>>>>>>>> What Flink should fix:
>>>>>>>>   - The KafkaConsumer should run the commit operations
>>>>>>>> asynchronously, to not block the "notifyCheckpointComplete()" method.
>>>>>>>>
>>>>>>>> What you can fix:
>>>>>>>>   - Have a look at your Kafka/ZooKeeper setup. One Kafka input
>>>>>>>> works well, the other does not. Do they go against different sets of
>>>>>>>> brokers, or different ZooKeepers? Is the metadata for one input bad?
>>>>>>>>   - In the next Flink version, you may opt out of committing
>>>>>>>> offsets to Kafka/ZooKeeper altogether. It is not important for
>>>>>>>> Flink's checkpoints anyway.
>>>>>>>>
>>>>>>>> Greetings,
>>>>>>>> Stephan
>>>>>>>>
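The blocking-commit problem described above can be illustrated outside
Flink with a small, self-contained Java sketch (all class and method
names here are hypothetical, not Flink's actual code): a synchronous
commit inside the checkpoint-complete callback holds up the caller for
the full commit duration, while handing the commit to an executor does
not.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch (not Flink code): compare a blocking offset commit inside a
// checkpoint callback with an asynchronous one.
public class AsyncCommitSketch {
    // Single background thread that performs commits; daemon so the
    // JVM can exit without an explicit shutdown in this sketch.
    static final ExecutorService committer =
        Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "offset-committer");
            t.setDaemon(true);
            return t;
        });

    // Stand-in for a slow ZooKeeper offset write (~100 ms).
    static void commitOffsets() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Blocking style: the callback holds up the caller for the whole commit.
    static long syncNotify() {
        long start = System.nanoTime();
        commitOffsets();
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Async style: the callback only schedules the commit and returns.
    static long asyncNotify() {
        long start = System.nanoTime();
        committer.submit(AsyncCommitSketch::commitOffsets);
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("sync callback blocked for ~" + syncNotify() + " ms");
        System.out.println("async callback blocked for ~" + asyncNotify() + " ms");
    }
}
```

Since Flink runs checkpoint operations on an operator one at a time, the
blocked callback is exactly what delays the next checkpoint trigger.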
>>>>>>>>
>>>>>>>> On Mon, Sep 26, 2016 at 5:13 PM, Chakravarthy varaga <
>>>>>>>> chakravarthyvp@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Stefan,
>>>>>>>>>
>>>>>>>>>     Please find my responses below.
>>>>>>>>>
>>>>>>>>>     - What source are you using for the slow input?
>>>>>>>>>      [CVP] - Both streams, as pointed out in my first mail, are
>>>>>>>>> Kafka streams.
>>>>>>>>>
>>>>>>>>>     - How large is the state that you are checkpointing?
>>>>>>>>>
>>>>>>>>>      [CVP] - I have enabled checkpointing on the StreamEnvironment
>>>>>>>>> as below:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>          final StreamExecutionEnvironment streamEnv =
>>>>>>>>>              StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>          streamEnv.setStateBackend(new
>>>>>>>>>              FsStateBackend("file:///tmp/flink/checkpoints"));
>>>>>>>>>          streamEnv.enableCheckpointing(10000);
>>>>>>>>>
>>>>>>>>>       In terms of the state stored, the KS1 stream has a payload of
>>>>>>>>> 100K events/second, while KS2 has about 1 event / 10 minutes...
>>>>>>>>> basically the operators perform flatmaps on 8 fields of a tuple
>>>>>>>>> (all fields are primitives). If you look at the states' sizes in
>>>>>>>>> the dashboard, they are in Kb...
>>>>>>>>>     - Can you try to see in the log if actually the state snapshot
>>>>>>>>> takes that long, or if it simply takes long for the checkpoint
>>>>>>>>> barriers to travel through the stream due to a lot of backpressure?
>>>>>>>>>      [CVP] - There is no back pressure, at least from the sample
>>>>>>>>> computation in the flink dashboard. 100K/second is a low load for
>>>>>>>>> flink's benchmarks. I could not quite get the barriers vs snapshot
>>>>>>>>> state. I have attached the Task Manager log (DEBUG) info if that
>>>>>>>>> will interest you.
>>>>>>>>>
>>>>>>>>>      I have attached the checkpoint times as .png from the
>>>>>>>>> dashboard. Basically if you look at checkpoint IDs 28, 29 & 30,
>>>>>>>>> you'd see that the checkpoints take more than a minute in each
>>>>>>>>> case. Before these checkpoints, the KS2 stream did not have any
>>>>>>>>> events. As soon as an event (should be in bytes) was generated,
>>>>>>>>> the checkpoints went slow, and subsequently a minute more for
>>>>>>>>> every checkpoint thereafter.
>>>>>>>>>
>>>>>>>>>    This log was collected from the standalone flink cluster with 1
>>>>>>>>> job manager & 2 TMs. 1 TM was running this application with
>>>>>>>>> checkpointing (parallelism=1).
>>>>>>>>>
>>>>>>>>>     Please let me know if you need further info.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen <sewen@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi!
>>>>>>>>>>
>>>>>>>>>> Let's try to figure that one out. Can you give us a bit more
>>>>>>>>>> information?
>>>>>>>>>>
>>>>>>>>>>   - What source are you using for the slow input?
>>>>>>>>>>   - How large is the state that you are checkpointing?
>>>>>>>>>>   - Can you try to see in the log if actually the state snapshot
>>>>>>>>>> takes that long, or if it simply takes long for the checkpoint
>>>>>>>>>> barriers to travel through the stream due to a lot of backpressure?
>>>>>>>>>>
>>>>>>>>>> Greetings,
>>>>>>>>>> Stephan
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske
>>>>>>>>>> <fhueske@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi CVP,
>>>>>>>>>>>
>>>>>>>>>>> I'm not so familiar with the internals of the checkpointing
>>>>>>>>>>> system, but maybe Stephan (in CC) has an idea what's going on
>>>>>>>>>>> here.
>>>>>>>>>>>
>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>
>>>>>>>>>>> 2016-09-23 11:33 GMT+02:00 Chakravarthy varaga
>>>>>>>>>>> <chakravarthyvp@gmail.com>:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Aljoscha & Fabian,
>>>>>>>>>>>>
>>>>>>>>>>>>     I have a stream application that has 2 stream sources as
>>>>>>>>>>>> below:
>>>>>>>>>>>>
>>>>>>>>>>>>      KeyedStream<String, String> ks1 = ds1.keyBy("*");
>>>>>>>>>>>>      KeyedStream<Tuple2<String, V>, String> ks2 =
>>>>>>>>>>>>          ds2.flatMap(split T into k-v pairs).keyBy(0);
>>>>>>>>>>>>
>>>>>>>>>>>>      ks1.connect(ks2).flatMap(X);
>>>>>>>>>>>>      // X is a CoFlatMapFunction that inserts and removes
>>>>>>>>>>>> elements from ks2 into a key-value state member. Elements from
>>>>>>>>>>>> ks1 are matched against that state. The CoFlatMapFunction
>>>>>>>>>>>> operator maintains ValueState<Tuple2<Long, Long>>.
>>>>>>>>>>>>
>>>>>>>>>>>>      // ks1 is streaming about 100K events/sec from a kafka topic
>>>>>>>>>>>>      // ks2 is streaming about 1 event every 10 minutes...
>>>>>>>>>>>> precisely when the 1st event is consumed from this stream, the
>>>>>>>>>>>> checkpoint takes 2 minutes straight away.
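The CoFlatMapFunction logic described above can be sketched independently
of the Flink API as a keyed map that one stream populates and the other
matches against (class and method names here are illustrative, not the
actual job's code; in the real job each parallel instance holds only its
own key partition, which this sketch collapses into one map):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for the connected-streams operator X:
// ks2 elements insert/remove keyed state; ks1 elements are matched
// against that state.
public class CoFlatMapSketch {
    // Stand-in for the per-key ValueState<Tuple2<Long, Long>>.
    private final Map<String, long[]> state = new HashMap<>();

    // flatMap2: an element from ks2 inserts keyed state.
    public void onKs2(String key, long v1, long v2) {
        state.put(key, new long[]{v1, v2});
    }

    // flatMap2: an element from ks2 may also remove keyed state.
    public void removeKs2(String key) {
        state.remove(key);
    }

    // flatMap1: an element from ks1 is matched against the state.
    public boolean onKs1(String key) {
        return state.containsKey(key);
    }

    public static void main(String[] args) {
        CoFlatMapSketch op = new CoFlatMapSketch();
        op.onKs2("k1", 10L, 20L);
        System.out.println(op.onKs1("k1")); // matched against state
        System.out.println(op.onKs1("k2")); // no state for this key yet
    }
}
```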
>>>>>>>>>>>>
>>>>>>>>>>>>     The version of flink is 1.1.2.
>>>>>>>>>>>>
>>>>>>>>>>>> I tried to checkpoint every 10 secs using a
>>>>>>>>>>>> FsStateBackend... What I notice is that the checkpoint duration
>>>>>>>>>>>> is almost 2 minutes in many cases, while in the other cases it
>>>>>>>>>>>> varies from 100 ms to 1.5 minutes frequently. I'm attaching a
>>>>>>>>>>>> snapshot of the dashboard for your reference.
>>>>>>>>>>>>
>>>>>>>>>>>>      Is this an issue with flink checkpointing?
>>>>>>>>>>>>
>>>>>>>>>>>>  Best Regards
>>>>>>>>>>>> CVP
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
