flink-user mailing list archives

From Chakravarthy varaga <chakravarth...@gmail.com>
Subject Re: Flink Checkpoint runs slow for low load stream
Date Tue, 04 Oct 2016 17:20:12 GMT
Thanks for your prompt response, Stephan.

    I'll wait for Flink 1.1.3!!!

Best Regards
Varaga

On Tue, Oct 4, 2016 at 5:36 PM, Stephan Ewen <sewen@apache.org> wrote:

> The plan to release 1.1.3 is asap ;-)
>
> We are waiting for the last backported patches to get in, then comes release
> testing and the release.
>
> If you want to test it today, you would need to manually build the
> release-1.1 branch.
>
> Best,
> Stephan
>
>
> On Tue, Oct 4, 2016 at 5:46 PM, Chakravarthy varaga <
> chakravarthyvp@gmail.com> wrote:
>
>> Hi Gordon,
>>
>>      Do I need to clone and build the release-1.1 branch to test this?
>>      I currently use the Flink 1.1.2 runtime. When is 1.1.3 planned to be
>> released?
>>
>> Best Regards
>> Varaga
>>
>> On Tue, Oct 4, 2016 at 9:25 AM, Tzu-Li (Gordon) Tai <tzulitai@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> Helping out here: this is the PR for async Kafka offset committing -
>>> https://github.com/apache/flink/pull/2574.
>>> It has already been merged into the master and release-1.1 branches, so
>>> you can try out the changes now if you’d like.
>>> The change should also be included in the 1.1.3 release, which the Flink
>>> community plans to release soon.
>>>
>>> It would definitely be helpful if you could provide feedback afterwards!
>>>
>>> Best Regards,
>>> Gordon
>>>
>>>
>>> On October 3, 2016 at 9:40:14 PM, Chakravarthy varaga (
>>> chakravarthyvp@gmail.com) wrote:
>>>
>>> Hi Stephan,
>>>
>>>     Is the async Kafka offset commit included in 1.1.3?
>>>
>>> Varaga
>>>
>>> On Wed, Sep 28, 2016 at 9:49 AM, Chakravarthy varaga <
>>> chakravarthyvp@gmail.com> wrote:
>>>
>>>> Hi Stephan,
>>>>
>>>>      That would be great. Let me know once the fix is done and which
>>>> snapshot version to use; I'll check it and get back to you.
>>>>      Can you also share the JIRA issue that tracks this?
>>>>
>>>>      With regard to the offset commit issue, I'm not sure how to
>>>> proceed here. I'll probably try your fix first and see if the problem
>>>> recurs.
>>>>
>>>> Thanks much
>>>> Varaga
>>>>
>>>> On Tue, Sep 27, 2016 at 7:46 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>>
>>>>> @CVP
>>>>>
>>>>> In your case, Flink stores only the Kafka offsets (a few bytes) and the
>>>>> custom state (e) in checkpoints.
>>>>>
>>>>> Here is an illustration of the checkpoint and what is stored (from the
>>>>> Flink docs):
>>>>> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
>>>>>
>>>>>
>>>>> I am quite puzzled why the offset committing problem occurs only for
>>>>> one input, and not for the other.
>>>>> I am preparing a fix for 1.2, possibly going into 1.1.3 as well.
>>>>> Could you try out a snapshot version to see if that fixes your problem?
>>>>>
>>>>> Greetings,
>>>>> Stephan
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Sep 27, 2016 at 2:24 PM, Chakravarthy varaga <
>>>>> chakravarthyvp@gmail.com> wrote:
>>>>>
>>>>>> Hi Stephan,
>>>>>>
>>>>>>      Thanks a million for your detailed explanation. I appreciate it.
>>>>>>
>>>>>>      -  The ZooKeeper bundled with Kafka 0.9.0.1 was used to start
>>>>>> ZooKeeper. Only one (standalone) ZooKeeper instance is running on my
>>>>>> localhost (Ubuntu 14.04).
>>>>>>      -  There is only one Kafka broker (version 0.9.0.1).
>>>>>>
>>>>>>      As for the Flink cluster, only 1 JM and 2 TMs are started, with
>>>>>> no HA. I presume it does not use ZooKeeper anyway, as it runs as a
>>>>>> standalone cluster.
>>>>>>
>>>>>>
>>>>>>      BTW, the Kafka connector version that I use is the one suggested on
>>>>>> the Flink connectors page:
>>>>>>
>>>>>>         <dependency>
>>>>>>           <groupId>org.apache.flink</groupId>
>>>>>>           <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
>>>>>>           <version>1.1.1</version>
>>>>>>         </dependency>
>>>>>>
>>>>>>      Do you see any issues with the versions?
>>>>>>
>>>>>>      1) Do you have benchmarks for checkpointing in Flink?
>>>>>>
>>>>>>      2) There isn't a detailed explanation of what state is stored as
>>>>>> part of the checkpointing process. For example, if I have a pipeline
>>>>>> like source -> map -> keyBy -> map -> sink, my assumption of what's
>>>>>> stored is:
>>>>>>
>>>>>>          a) The source stream's custom watermarked records
>>>>>>
>>>>>>          b) Intermediate states of each of the transformations in
>>>>>> the pipeline
>>>>>>
>>>>>>          c) The delta of records stored since the previous sink
>>>>>>
>>>>>>          d) Custom states (say, ValueState as in my case) -
>>>>>> essentially this is what I care about storing.
>>>>>>          e) All of my operators
>>>>>>       Is my understanding right?
>>>>>>
>>>>>>      3) Is there a way in Flink to checkpoint only d) as stated above?
>>>>>>
>>>>>>      4) Can checkpointing be applied only to certain streams and
>>>>>> operators (say, if I wish to store aggregated values as part of the
>>>>>> transformation)?
>>>>>>
>>>>>> Best Regards
>>>>>> CVP
>>>>>>
>>>>>>
>>>>>> On Mon, Sep 26, 2016 at 6:18 PM, Stephan Ewen <sewen@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks, the logs were very helpful!
>>>>>>>
>>>>>>> TL;DR - The offset committing to ZooKeeper is very slow and prevents
>>>>>>> checkpoints from starting properly.
>>>>>>>
>>>>>>> Here is what is happening in detail:
>>>>>>>
>>>>>>>   - There is a long gap (many seconds) between the point when the
>>>>>>> TaskManager receives the "trigger checkpoint" message and the point
>>>>>>> when the KafkaSource actually starts the checkpoint, for one of the
>>>>>>> Kafka inputs (the other is fine).
>>>>>>>   - The only way this delay can be introduced is if another
>>>>>>> checkpoint-related operation (such as trigger() or notifyComplete()) is
>>>>>>> still in progress when the checkpoint is started. Flink does not perform
>>>>>>> concurrent checkpoint operations on a single operator, to ease the
>>>>>>> concurrency model for users.
>>>>>>>   - The operation that is still in progress must be the committing
>>>>>>> of the offsets (to ZooKeeper or Kafka). That also explains why this
>>>>>>> only happens once one side receives the first record. Before that,
>>>>>>> there is nothing to commit.
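The blocking chain in the bullets above can be illustrated with a small, hypothetical model. This is not Flink's code. `CheckpointLockDemo`, `notifyCheckpointComplete(long, CountDownLatch)` and `triggerCheckpoint()` are names invented for this sketch, and a plain `synchronized` lock is an assumed stand-in for however Flink serializes checkpoint operations on one operator. The model shows that a trigger arriving while a slow commit holds the lock waits for roughly the length of the commit.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical model: checkpoint callbacks of one operator share a single lock. */
public class CheckpointLockDemo {
    // One lock shared by all checkpoint callbacks of this (model) operator.
    private final Object checkpointLock = new Object();

    /** Simulates a blocking offset commit performed while holding the lock. */
    public void notifyCheckpointComplete(long commitMillis, CountDownLatch lockHeld)
            throws InterruptedException {
        synchronized (checkpointLock) {
            lockHeld.countDown();       // signal: the commit now holds the lock
            Thread.sleep(commitMillis); // stands in for a slow ZooKeeper/Kafka commit
        }
    }

    /** Returns how many ms the trigger waited before it could take the lock. */
    public long triggerCheckpoint() {
        long start = System.nanoTime();
        synchronized (checkpointLock) {
            // A real operator would snapshot its state here; the model does nothing.
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    /** Starts a slow commit on another thread, then triggers a checkpoint. */
    public static long measureTriggerDelay(long commitMillis) throws InterruptedException {
        CheckpointLockDemo op = new CheckpointLockDemo();
        CountDownLatch lockHeld = new CountDownLatch(1);
        Thread committer = new Thread(() -> {
            try {
                op.notifyCheckpointComplete(commitMillis, lockHeld);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        committer.start();
        lockHeld.await();                    // make sure the commit is in progress
        long waited = op.triggerCheckpoint(); // blocks until the commit releases the lock
        committer.join();
        return waited;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("trigger waited about " + measureTriggerDelay(500) + " ms");
    }
}
```

With a 500 ms simulated commit, the trigger waits close to 500 ms even though the "snapshot" itself does no work. This matches the symptom of a slow checkpoint with tiny state.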
>>>>>>>
>>>>>>>
>>>>>>> What Flink should fix:
>>>>>>>   - The KafkaConsumer should run the commit operations
>>>>>>> asynchronously, so as not to block the "notifyCheckpointComplete()" method.
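Here is a minimal sketch of the asynchronous commit suggested above, assuming a single background thread is acceptable for commits. `AsyncOffsetCommitter` and its `blockingCommit` callback are hypothetical stand-ins, not the Kafka connector's API; the actual change is the one in the pull request linked earlier in the thread. The only point is that `notifyCheckpointComplete()` returns immediately while the slow write runs elsewhere.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sketch: offset commits are handed to a background thread. */
public class AsyncOffsetCommitter implements AutoCloseable {
    // A single thread keeps commits in the order the checkpoints completed.
    private final ExecutorService commitExecutor = Executors.newSingleThreadExecutor();
    // Hypothetical blocking write of partition -> offset (e.g. to ZooKeeper).
    private final Consumer<Map<Integer, Long>> blockingCommit;

    public AsyncOffsetCommitter(Consumer<Map<Integer, Long>> blockingCommit) {
        this.blockingCommit = blockingCommit;
    }

    /** Called on checkpoint completion: queues the commit and returns at once. */
    public void notifyCheckpointComplete(Map<Integer, Long> offsets) {
        Map<Integer, Long> snapshot = Map.copyOf(offsets); // immutable copy for the worker
        commitExecutor.submit(() -> blockingCommit.accept(snapshot));
    }

    /** Stops accepting commits and waits (bounded) for queued ones to finish. */
    @Override
    public void close() throws InterruptedException {
        commitExecutor.shutdown();
        commitExecutor.awaitTermination(10, TimeUnit.SECONDS);
    }

    public static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Map<Integer, Long> committed = new ConcurrentHashMap<>();
        try (AsyncOffsetCommitter committer = new AsyncOffsetCommitter(offsets -> {
            sleepQuietly(500); // simulate a slow ZooKeeper write
            committed.putAll(offsets);
        })) {
            long start = System.nanoTime();
            committer.notifyCheckpointComplete(Map.of(0, 42L));
            long ms = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            System.out.println("notifyCheckpointComplete returned after " + ms + " ms");
        }
        System.out.println("committed after close(): " + committed);
    }
}
```

The single-thread executor keeps commits in checkpoint order. A production version would also need to handle commit failures and might skip commits that a newer checkpoint has already superseded. This sketch leaves both out.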
>>>>>>>
>>>>>>> What you can fix:
>>>>>>>   - Have a look at your Kafka/ZooKeeper setup. One Kafka input works
>>>>>>> well, the other does not. Do they go against different sets of
>>>>>>> brokers, or different ZooKeepers? Is the metadata for one input bad?
>>>>>>>   - In the next Flink version, you may opt out of committing offsets
>>>>>>> to Kafka/ZooKeeper altogether. It is not important for Flink's
>>>>>>> checkpoints anyway.
>>>>>>>
>>>>>>> Greetings,
>>>>>>> Stephan
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Sep 26, 2016 at 5:13 PM, Chakravarthy varaga <
>>>>>>> chakravarthyvp@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Stephan,
>>>>>>>>
>>>>>>>>     Please find my responses below.
>>>>>>>>
>>>>>>>>     - What source are you using for the slow input?
>>>>>>>>      [CVP] - Both streams, as pointed out in my first mail, are
>>>>>>>> Kafka streams.
>>>>>>>>   - How large is the state that you are checkpointing?
>>>>>>>>
>>>>>>>> [CVP] - I have enabled checkpointing on the StreamEnvironment as
>>>>>>>> below:
>>>>>>>>
>>>>>>>>          final StreamExecutionEnvironment streamEnv =
>>>>>>>>              StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>          streamEnv.setStateBackend(
>>>>>>>>              new FsStateBackend("file:///tmp/flink/checkpoints"));
>>>>>>>>          streamEnv.enableCheckpointing(10000); // interval in ms (10 s)
>>>>>>>>
>>>>>>>>
>>>>>>>>       In terms of the state stored, the KS1 stream carries a payload of
>>>>>>>> 100K events/second, while KS2 has about 1 event every 10 minutes.
>>>>>>>> Basically, the operators perform flatMaps on 8-field tuples (all
>>>>>>>> fields are primitives). If you look at the state sizes in the
>>>>>>>> dashboard, they are in the KB range.
>>>>>>>>   - Can you try to see in the log if actually the state snapshot
>>>>>>>> takes that long, or if it simply takes long for the checkpoint
>>>>>>>> barriers to travel through the stream due to a lot of backpressure?
>>>>>>>>     [CVP] - There is no backpressure, at least according to the
>>>>>>>> sampling in the Flink dashboard. 100K/second is a low load by Flink's
>>>>>>>> benchmarks. I could not quite tell the barrier time apart from the
>>>>>>>> snapshot time. I have attached the TaskManager log (DEBUG level) in
>>>>>>>> case it is of interest.
>>>>>>>>
>>>>>>>>      I have attached the checkpoint times as a .png from the
>>>>>>>> dashboard. If you look at checkpoint IDs 28, 29 and 30, you'll see
>>>>>>>> that each of them takes more than a minute. Before these checkpoints,
>>>>>>>> the KS2 stream did not have any events. As soon as an event (only a
>>>>>>>> few bytes in size) was generated, the checkpoints became slow, and
>>>>>>>> every checkpoint thereafter took more than a minute.
>>>>>>>>
>>>>>>>>    This log was collected from the standalone Flink cluster with 1
>>>>>>>> JobManager and 2 TMs. 1 TM was running this application with
>>>>>>>> checkpointing (parallelism=1).
>>>>>>>>
>>>>>>>>     Please let me know if you need further info.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen <sewen@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi!
>>>>>>>>>
>>>>>>>>> Let's try to figure that one out. Can you give us a bit more
>>>>>>>>> information?
>>>>>>>>>
>>>>>>>>>   - What source are you using for the slow input?
>>>>>>>>>   - How large is the state that you are checkpointing?
>>>>>>>>>   - Can you try to see in the log if actually the state snapshot
>>>>>>>>> takes that long, or if it simply takes long for the checkpoint
>>>>>>>>> barriers to travel through the stream due to a lot of backpressure?
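One way to separate the two cases in the last question is to split an operator's checkpoint time into two parts. The first is the delay before its snapshot starts, caused by barrier travel or by an operator busy with something else. The second is the snapshot itself. The sketch below assumes you can read three timestamps per checkpoint from the logs: trigger received, snapshot started, and snapshot finished. Which log lines carry these timestamps is an assumption, and `CheckpointTiming` is a hypothetical helper (it requires Java 16+ for the record).

```java
/** Hypothetical helper: splits one operator's checkpoint time into wait vs. snapshot. */
public final class CheckpointTiming {
    /** Time spent before the snapshot started, in the snapshot, and in total (ms). */
    public record Breakdown(long delayBeforeSnapshotMs, long snapshotMs, long totalMs) {}

    private CheckpointTiming() {}

    public static Breakdown breakdown(long triggerReceivedMs, long snapshotStartMs, long snapshotEndMs) {
        if (snapshotStartMs < triggerReceivedMs || snapshotEndMs < snapshotStartMs) {
            throw new IllegalArgumentException("timestamps must be non-decreasing");
        }
        return new Breakdown(
                snapshotStartMs - triggerReceivedMs, // waiting: barriers or a busy operator
                snapshotEndMs - snapshotStartMs,     // actually writing the state
                snapshotEndMs - triggerReceivedMs);  // end to end for this operator
    }

    public static void main(String[] args) {
        // Illustrative numbers only, not taken from the logs in this thread.
        Breakdown b = breakdown(0L, 65_000L, 65_040L);
        System.out.println(b);
        System.out.println(b.delayBeforeSnapshotMs() > b.snapshotMs()
                ? "time goes into waiting before the snapshot"
                : "time goes into the snapshot itself");
    }
}
```

If the wait before the snapshot dominates, writing state is not the cost, and the question becomes what the operator or the barriers are waiting on.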
>>>>>>>>>
>>>>>>>>> Greetings,
>>>>>>>>> Stephan
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske <fhueske@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi CVP,
>>>>>>>>>>
>>>>>>>>>> I'm not very familiar with the internals of the checkpointing
>>>>>>>>>> system, but maybe Stephan (in CC) has an idea what's going on here.
>>>>>>>>>>
>>>>>>>>>> Best, Fabian
>>>>>>>>>>
>>>>>>>>>> 2016-09-23 11:33 GMT+02:00 Chakravarthy varaga <
>>>>>>>>>> chakravarthyvp@gmail.com>:
>>>>>>>>>>
>>>>>>>>>>> Hi Aljoscha & Fabian,
>>>>>>>>>>>
>>>>>>>>>>>     I have a stream application that has two stream sources, as
>>>>>>>>>>> below.
>>>>>>>>>>>
>>>>>>>>>>>      KeyedStream<String, String> ks1 = ds1.keyBy("*");
>>>>>>>>>>>      KeyedStream<Tuple2<String, V>, String> ks2 =
>>>>>>>>>>>          ds2.flatMap(split T into k-v pairs).keyBy(0);
>>>>>>>>>>>
>>>>>>>>>>>      ks1.connect(ks2).flatMap(X);
>>>>>>>>>>>      // X is a CoFlatMapFunction that inserts and removes
>>>>>>>>>>>      // elements from ks2 into a key-value state member. Elements
>>>>>>>>>>>      // from ks1 are matched against that state. The
>>>>>>>>>>>      // CoFlatMapFunction operator maintains
>>>>>>>>>>>      // ValueState<Tuple2<Long, Long>>.
>>>>>>>>>>>
>>>>>>>>>>>      // ks1 is streaming about 100K events/sec from a Kafka topic.
>>>>>>>>>>>      // ks2 is streaming about 1 event every 10 minutes.
>>>>>>>>>>>      // Precisely when the 1st event is consumed from this stream,
>>>>>>>>>>>      // checkpoints take 2 minutes straight away.
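The matching logic of X described above can be modelled without Flink. The model shows why its state stays small: one `Tuple2<Long, Long>` per key. In this sketch, a `HashMap` stands in for keyed `ValueState`, and the hypothetical `LongPair` record stands in for `Tuple2<Long, Long>`. The rule that a `null` value from ks2 removes the key is an assumption, because the message does not say how removals are signalled. Java 16+ is assumed for the record.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of X; a HashMap stands in for keyed ValueState<Tuple2<Long, Long>>. */
public class KeyedMatchModel {
    /** Hypothetical stand-in for Tuple2<Long, Long>. */
    public record LongPair(long f0, long f1) {}

    // One small value per key, like keyed ValueState.
    private final Map<String, LongPair> stateByKey = new HashMap<>();

    /** flatMap1: a ks1 event is matched against the stored state for its key. */
    public List<String> onKs1(String key, String event) {
        List<String> out = new ArrayList<>();
        LongPair state = stateByKey.get(key);
        if (state != null) {
            out.add(event + " matched " + state);
        }
        return out;
    }

    /** flatMap2: a ks2 event inserts state (value != null) or removes it (assumed: value == null). */
    public void onKs2(String key, LongPair value) {
        if (value == null) {
            stateByKey.remove(key);
        } else {
            stateByKey.put(key, value);
        }
    }

    public int stateEntries() {
        return stateByKey.size();
    }

    public static void main(String[] args) {
        KeyedMatchModel x = new KeyedMatchModel();
        System.out.println(x.onKs1("k1", "e1")); // no state yet, prints []
        x.onKs2("k1", new LongPair(1L, 2L));
        System.out.println(x.onKs1("k1", "e2"));
        x.onKs2("k1", null);
        System.out.println(x.stateEntries());
    }
}
```

Because each key holds only two longs, the snapshot itself should be small. This is consistent with the KB-sized state reported in the thread, and it points away from state size as the cause of the slow checkpoints.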
>>>>>>>>>>>
>>>>>>>>>>>     The Flink version is 1.1.2.
>>>>>>>>>>>
>>>>>>>>>>> I tried checkpointing every 10 seconds using an FsStateBackend.
>>>>>>>>>>> What I notice is that the checkpoint duration is almost 2 minutes in
>>>>>>>>>>> many cases, while in the other cases it frequently varies from 100 ms
>>>>>>>>>>> to 1.5 minutes. I'm attaching a screenshot of the dashboard for your
>>>>>>>>>>> reference.
>>>>>>>>>>>
>>>>>>>>>>>      Is this an issue with Flink checkpointing?
>>>>>>>>>>>
>>>>>>>>>>>  Best Regards
>>>>>>>>>>> CVP
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
