flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stephan Ewen <se...@apache.org>
Subject Re: Flink Checkpoint runs slow for low load stream
Date Tue, 04 Oct 2016 16:36:10 GMT
The plan to release 1.1.3 is asap ;-)

Waiting for last backported patched to get in, then release testing and
release.

If you want to test it today, you would need to manually build the
release-1.1 branch.

Best,
Stephan


On Tue, Oct 4, 2016 at 5:46 PM, Chakravarthy varaga <
chakravarthyvp@gmail.com> wrote:

> Hi Gordon,
>
>      Do I need to clone and build release-1.1 branch to test this?
>      I currently use flinlk 1.1.2 runtime. When is the plan to release it
> in 1.1.3?
>
> Best Regards
> Varaga
>
> On Tue, Oct 4, 2016 at 9:25 AM, Tzu-Li (Gordon) Tai <tzulitai@apache.org>
> wrote:
>
>> Hi,
>>
>> Helping out here: this is the PR for async Kafka offset committing -
>> https://github.com/apache/flink/pull/2574.
>> It has already been merged into the master and release-1.1 branches, so
>> you can try out the changes now if you’d like.
>> The change should also be included in the 1.1.3 release, which the Flink
>> community is discussing to release soon.
>>
>> Will definitely be helpful if you can provide feedback afterwards!
>>
>> Best Regards,
>> Gordon
>>
>>
>> On October 3, 2016 at 9:40:14 PM, Chakravarthy varaga (
>> chakravarthyvp@gmail.com) wrote:
>>
>> Hi Stephan,
>>
>>     Is the Async kafka offset commit released in 1.3.1?
>>
>> Varaga
>>
>> On Wed, Sep 28, 2016 at 9:49 AM, Chakravarthy varaga <
>> chakravarthyvp@gmail.com> wrote:
>>
>>> Hi Stephan,
>>>
>>>      That should be great. Let me know once the fix is done and the
>>> snapshot version to use, I'll check and revert then.
>>>      Can you also share the JIRA that tracks the issue?
>>>
>>>      With regards to offset commit issue, I'm not sure as to how to
>>> proceed here. Probably I'll use your fix first and see if the problem
>>> reoccurs.
>>>
>>> Thanks much
>>> Varaga
>>>
>>> On Tue, Sep 27, 2016 at 7:46 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>
>>>> @CVP
>>>>
>>>> Flink stores in checkpoints in your case only the Kafka offsets (few
>>>> bytes) and the custom state (e).
>>>>
>>>> Here is an illustration of the checkpoint and what is stored (from the
>>>> Flink docs).
>>>> https://ci.apache.org/projects/flink/flink-docs-master/inter
>>>> nals/stream_checkpointing.html
>>>>
>>>>
>>>> I am quite puzzled why the offset committing problem occurs only for
>>>> one input, and not for the other.
>>>> I am preparing a fix for 1.2, possibly going into 1.1.3 as well.
>>>> Could you try out a snapshot version to see if that fixes your problem?
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>>
>>>>
>>>> On Tue, Sep 27, 2016 at 2:24 PM, Chakravarthy varaga <
>>>> chakravarthyvp@gmail.com> wrote:
>>>>
>>>>> Hi Stefan,
>>>>>
>>>>>      Thanks a million for your detailed explanation. I appreciate it.
>>>>>
>>>>>      -  The *zookeeper bundled with kafka 0.9.0.1* was used to start
>>>>> zookeeper. There is only 1 instance (standalone) of zookeeper running
on my
>>>>> localhost (ubuntu 14.04)
>>>>>      -  There is only 1 Kafka broker (*version: 0.9.0.1* )
>>>>>
>>>>>      With regards to Flink cluster there's only 1 JM & 2 TMs started
>>>>> with no HA. I presume this does not use zookeeper anyways as it runs
as
>>>>> standalone cluster.
>>>>>
>>>>>
>>>>>      BTW., The kafka connector version that I use is as suggested in
>>>>> the flink connectors page
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> *.        <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
>>>>> <version>1.1.1</version>         </dependency>*
>>>>>
>>>>>      Do you see any issues with versions?
>>>>>
>>>>>      1) Do you have benchmarks wrt., to checkpointing in flink?
>>>>>
>>>>>      2) There isn't detailed explanation on what states are stored as
>>>>> part of the checkpointing process. For ex.,  If I have pipeline like
>>>>> *source -> map -> keyBy -> map -> sink, my assumption on
what's stored
>>>>> is:*
>>>>>
>>>>> *         a) The source stream's custom watermarked records*
>>>>>
>>>>> *         b) Intermediate states of each of the transformations in the
>>>>> pipeline*
>>>>>
>>>>> *         c) Delta of Records stored from the previous sink*
>>>>>
>>>>> *         d) Custom States (SayValueState as in my case) - Essentially
>>>>> this is what I bother about storing.*
>>>>> *         e) All of my operators*
>>>>>
>>>>>       Is my understanding right?
>>>>>
>>>>>      3) Is there a way in Flink to checkpoint only d) as stated above
>>>>>
>>>>>      4) Can you apply checkpointing to only streams and certain
>>>>> operators (say I wish to store aggregated values part of the transformation)
>>>>>
>>>>> Best Regards
>>>>> CVP
>>>>>
>>>>>
>>>>> On Mon, Sep 26, 2016 at 6:18 PM, Stephan Ewen <sewen@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Thanks, the logs were very helpful!
>>>>>>
>>>>>> TL:DR - The offset committing to ZooKeeper is very slow and prevents
>>>>>> proper starting of checkpoints.
>>>>>>
>>>>>> Here is what is happening in detail:
>>>>>>
>>>>>>   - Between the point when the TaskManager receives the "trigger
>>>>>> checkpoint" message and when the point when the KafkaSource actually
starts
>>>>>> the checkpoint is a long time (many seconds) - for one of the Kafka
Inputs
>>>>>> (the other is fine).
>>>>>>   - The only way this delayed can be introduced is if another
>>>>>> checkpoint related operation (such as trigger() or notifyComplete()
) is
>>>>>> still in progress when the checkpoint is started. Flink does not
perform
>>>>>> concurrent checkpoint operations on a single operator, to ease the
>>>>>> concurrency model for users.
>>>>>>   - The operation that is still in progress must be the committing
of
>>>>>> the offsets (to ZooKeeper or Kafka). That also explains why this
only
>>>>>> happens once one side receives the first record. Before that, there
is
>>>>>> nothing to commit.
>>>>>>
>>>>>>
>>>>>> What Flink should fix:
>>>>>>   - The KafkaConsumer should run the commit operations
>>>>>> asynchronously, to not block the "notifyCheckpointComplete()" method.
>>>>>>
>>>>>> What you can fix:
>>>>>>   - Have a look at your Kafka/ZooKeeper setup. One Kafka Input works
>>>>>> well, the other does not. Do they go against different sets of brokers,
or
>>>>>> different ZooKeepers? Is the metadata for one input bad?
>>>>>>   - In the next Flink version, you may opt-out of committing offsets
>>>>>> to Kafka/ZooKeeper all together. It is not important for Flink's
>>>>>> checkpoints anyways.
>>>>>>
>>>>>> Greetings,
>>>>>> Stephan
>>>>>>
>>>>>>
>>>>>> On Mon, Sep 26, 2016 at 5:13 PM, Chakravarthy varaga <
>>>>>> chakravarthyvp@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Stefan,
>>>>>>>
>>>>>>>     Please find my responses below.
>>>>>>>
>>>>>>>     - What source are you using for the slow input?
>>>>>>> *     [CVP] - Both stream as pointed out in my first mail, are
Kafka
>>>>>>> Streams*
>>>>>>>   - How large is the state that you are checkpointing?
>>>>>>>
>>>>>>> *[CVP] - I have enabled checkpointing on the StreamEnvironment
as
>>>>>>> below.*
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *         final StreamExecutionEnvironment streamEnv =
>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>> streamEnv.setStateBackend(new
>>>>>>> FsStateBackend("file:///tmp/flink/checkpoints"));
>>>>>>> streamEnv.enableCheckpointing(10000); *
>>>>>>>
>>>>>>>
>>>>>>> *      In terms of the state stored, the KS1 stream has payload
of
>>>>>>> 100K events/second, while KS2 have about 1 event / 10 minutes...
basically
>>>>>>> the operators perform flatmaps on 8 fields of tuple (all fields
are
>>>>>>> primitives). If you look at the states' sizes in dashboard they
are in
>>>>>>> Kb... *
>>>>>>>   - Can you try to see in the log if actually the state snapshot
>>>>>>> takes that long, or if it simply takes long for the checkpoint
>>>>>>> barriers to travel through the stream due to a lot of backpressure?
>>>>>>>     [CVP] -There are no back pressure atleast from the sample
>>>>>>> computation in the flink dashboard. 100K/second is low load for
flink's
>>>>>>> benchmarks. I could not quite get the barriers vs snapshot state.
I have
>>>>>>> attached the Task Manager log (DEBUG) info if that will interest
you.
>>>>>>>
>>>>>>>      I have attached the checkpoints times' as .png from the
>>>>>>> dashboard. Basically if you look at checkpoint IDs 28 & 29
&30- you'd
>>>>>>> see that the checkpoints take more than a minute in each case.
Before these
>>>>>>> checkpoints, the KS2 stream did not have any events. As soon
as an
>>>>>>> event(should be in bytes) was generated, the checkpoints went
slow and
>>>>>>> subsequently a minute more for every checkpoint thereafter.
>>>>>>>
>>>>>>>    This log was collected from the standalone flink cluster with
1
>>>>>>> job manager & 2 TMs. 1 TM was running this application with
checkpointing
>>>>>>> (parallelism=1)
>>>>>>>
>>>>>>>     Please let me know if you need further info.,
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen <sewen@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi!
>>>>>>>>
>>>>>>>> Let's try to figure that one out. Can you give us a bit more
>>>>>>>> information?
>>>>>>>>
>>>>>>>>   - What source are you using for the slow input?
>>>>>>>>   - How large is the state that you are checkpointing?
>>>>>>>>   - Can you try to see in the log if actually the state snapshot
>>>>>>>> takes that long, or if it simply takes long for the checkpoint
barriers to
>>>>>>>> travel through the stream due to a lot of backpressure?
>>>>>>>>
>>>>>>>> Greetings,
>>>>>>>> Stephan
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske <fhueske@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi CVP,
>>>>>>>>>
>>>>>>>>> I'm not so much familiar with the internals of the checkpointing
>>>>>>>>> system, but maybe Stephan (in CC) has an idea what's
going on here.
>>>>>>>>>
>>>>>>>>> Best, Fabian
>>>>>>>>>
>>>>>>>>> 2016-09-23 11:33 GMT+02:00 Chakravarthy varaga <
>>>>>>>>> chakravarthyvp@gmail.com>:
>>>>>>>>>
>>>>>>>>>> Hi Aljoscha & Fabian,
>>>>>>>>>>
>>>>>>>>>>     I have a stream application that has 2 stream
source as below.
>>>>>>>>>>
>>>>>>>>>>      KeyedStream<String, String> *ks1* = ds1.keyBy("*")
;
>>>>>>>>>>      KeyedStream<Tuple2<String, V>, String>
*ks2* =
>>>>>>>>>> ds2.flatMap(split T into k-v pairs).keyBy(0);
>>>>>>>>>>
>>>>>>>>>>      ks1.connect(ks2).flatMap(X);
>>>>>>>>>>      //X is a CoFlatMapFunction that inserts and
removes elements
>>>>>>>>>> from ks2 into a key-value state member. Elements
from ks1 are matched
>>>>>>>>>> against that state. the CoFlatMapFunction operator
maintains
>>>>>>>>>> ValueState<Tuple2<Long, Long>>;
>>>>>>>>>>
>>>>>>>>>>      //ks1 is streaming about 100K events/sec from
kafka topic
>>>>>>>>>>      //ks2 is streaming about 1 event every 10 minutes...
>>>>>>>>>> Precisely when the 1st event is consumed from this
stream, checkpoint takes
>>>>>>>>>> 2 minutes straight away.
>>>>>>>>>>
>>>>>>>>>>     The version of flink is 1.1.2.
>>>>>>>>>>
>>>>>>>>>> I tried to use checkpoint every 10 Secs using a FsStateBackend...
>>>>>>>>>> What I notice is that the checkpoint duration is
almost 2 minutes for many
>>>>>>>>>> cases, while for the other cases it varies from 100
ms to 1.5 minutes
>>>>>>>>>> frequently. I'm attaching the snapshot of the dashboard
for your reference.
>>>>>>>>>>
>>>>>>>>>>      Is this an issue with flink checkpointing?
>>>>>>>>>>
>>>>>>>>>>  Best Regards
>>>>>>>>>> CVP
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Mime
View raw message