flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Flink Checkpoint runs slow for low load stream
Date Wed, 04 Jan 2017 17:33:44 GMT
Hi CVP,

we recently released Flink 1.1.4, i.e., the next bugfix release of the 1.1.x
series with major robustness improvements [1].
You might want to give 1.1.4 a try as well.

Best, Fabian

[1] http://flink.apache.org/news/2016/12/21/release-1.1.4.html

2017-01-04 16:51 GMT+01:00 Chakravarthy varaga <chakravarthyvp@gmail.com>:

> Hi Stephan, All,
>
>      I just got a chance to try whether 1.1.3 fixes the slow checkpointing on the FS
> backend. It seems to have been fixed. Thanks for the fix.
>
>      While testing this with varying checkpoint intervals, there seem to
> be spikes of slow checkpoints every 30/40 seconds for an interval of 15
> secs. The checkpoint time lasts for about 300 ms as opposed to 10/20 ms.
>      Basically, 15 secs seems to be the nominal value so far; anything below
> this interval triggers the spikes too often. For us, living with a 15 sec
> recovery is doable, and we eventually catch up on recovery!
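>
>      A minimal sketch of that configuration, assuming the CheckpointConfig
> API is available in your Flink version (the pause value is an illustrative
> assumption, not something tested here):
>
>          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
>          StreamExecutionEnvironment env =
>                  StreamExecutionEnvironment.getExecutionEnvironment();
>          // 15 sec interval: the nominal value that avoided the spikes
>          env.enableCheckpointing(15000);
>          // keep some breathing room between consecutive checkpoints
>          env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);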
>
> Best Regards
> CVP
>
> On Tue, Oct 4, 2016 at 6:20 PM, Chakravarthy varaga <
> chakravarthyvp@gmail.com> wrote:
>
>> Thanks for your prompt response Stephan.
>>
>>     I'd wait for Flink 1.1.3 !!!
>>
>> Best Regards
>> Varaga
>>
>> On Tue, Oct 4, 2016 at 5:36 PM, Stephan Ewen <sewen@apache.org> wrote:
>>
>>> The plan is to release 1.1.3 ASAP ;-)
>>>
>>> Waiting for the last backported patches to get in, then release testing and
>>> release.
>>>
>>> If you want to test it today, you would need to manually build the
>>> release-1.1 branch.
>>>
>>> Best,
>>> Stephan
>>>
>>>
>>> On Tue, Oct 4, 2016 at 5:46 PM, Chakravarthy varaga <
>>> chakravarthyvp@gmail.com> wrote:
>>>
>>>> Hi Gordon,
>>>>
>>>>      Do I need to clone and build release-1.1 branch to test this?
>>>>      I currently use the Flink 1.1.2 runtime. When is 1.1.3 planned to be
>>>> released?
>>>>
>>>> Best Regards
>>>> Varaga
>>>>
>>>> On Tue, Oct 4, 2016 at 9:25 AM, Tzu-Li (Gordon) Tai <
>>>> tzulitai@apache.org> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Helping out here: this is the PR for async Kafka offset committing -
>>>>> https://github.com/apache/flink/pull/2574.
>>>>> It has already been merged into the master and release-1.1 branches,
>>>>> so you can try out the changes now if you’d like.
>>>>> The change should also be included in the 1.1.3 release, which the
>>>>> Flink community is planning to release soon.
>>>>>
>>>>> Will definitely be helpful if you can provide feedback afterwards!
>>>>>
>>>>> Best Regards,
>>>>> Gordon
>>>>>
>>>>>
>>>>> On October 3, 2016 at 9:40:14 PM, Chakravarthy varaga (
>>>>> chakravarthyvp@gmail.com) wrote:
>>>>>
>>>>> Hi Stephan,
>>>>>
>>>>>     Is the async Kafka offset commit released in 1.1.3?
>>>>>
>>>>> Varaga
>>>>>
>>>>> On Wed, Sep 28, 2016 at 9:49 AM, Chakravarthy varaga <
>>>>> chakravarthyvp@gmail.com> wrote:
>>>>>
>>>>>> Hi Stephan,
>>>>>>
>>>>>>      That would be great. Let me know once the fix is done and the
>>>>>> snapshot version to use; I'll check and revert then.
>>>>>>      Can you also share the JIRA that tracks the issue?
>>>>>>
>>>>>>      With regards to the offset commit issue, I'm not sure how to
>>>>>> proceed here. Probably I'll use your fix first and see if the problem
>>>>>> reoccurs.
>>>>>>
>>>>>> Thanks much
>>>>>> Varaga
>>>>>>
>>>>>> On Tue, Sep 27, 2016 at 7:46 PM, Stephan Ewen <sewen@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> @CVP
>>>>>>>
>>>>>>> In your case, Flink stores in checkpoints only the Kafka offsets (a few
>>>>>>> bytes) and the custom state (e).
>>>>>>>
>>>>>>> Here is an illustration of the checkpoint and what is stored (from
>>>>>>> the Flink docs):
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
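>>>>>>>
>>>>>>> To make "custom state (e)" concrete: only what you put into the keyed
>>>>>>> state is snapshotted, as in a sketch like this (class and field names
>>>>>>> are hypothetical, and state descriptor constructors differ slightly
>>>>>>> across Flink versions):
>>>>>>>
>>>>>>>     import org.apache.flink.api.common.state.ValueState;
>>>>>>>     import org.apache.flink.api.common.state.ValueStateDescriptor;
>>>>>>>     import org.apache.flink.api.common.typeinfo.TypeHint;
>>>>>>>     import org.apache.flink.api.common.typeinfo.TypeInformation;
>>>>>>>     import org.apache.flink.api.java.tuple.Tuple2;
>>>>>>>     import org.apache.flink.configuration.Configuration;
>>>>>>>     import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
>>>>>>>     import org.apache.flink.util.Collector;
>>>>>>>
>>>>>>>     // Only the contents of 'matchState' (plus the Kafka offsets tracked
>>>>>>>     // by the sources) end up in the checkpoint - not the operator code.
>>>>>>>     public class MatchFn
>>>>>>>             extends RichCoFlatMapFunction<String, Tuple2<String, Long>, String> {
>>>>>>>
>>>>>>>         private transient ValueState<Tuple2<Long, Long>> matchState;
>>>>>>>
>>>>>>>         @Override
>>>>>>>         public void open(Configuration parameters) {
>>>>>>>             matchState = getRuntimeContext().getState(new ValueStateDescriptor<>(
>>>>>>>                     "match-state",
>>>>>>>                     TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})));
>>>>>>>         }
>>>>>>>
>>>>>>>         @Override
>>>>>>>         public void flatMap1(String value, Collector<String> out) throws Exception {
>>>>>>>             Tuple2<Long, Long> current = matchState.value();  // read checkpointed state
>>>>>>>             if (current != null) {
>>>>>>>                 out.collect(value + " matched " + current.f0);
>>>>>>>             }
>>>>>>>         }
>>>>>>>
>>>>>>>         @Override
>>>>>>>         public void flatMap2(Tuple2<String, Long> kv, Collector<String> out) throws Exception {
>>>>>>>             matchState.update(Tuple2.of(kv.f1, System.currentTimeMillis()));  // write state
>>>>>>>         }
>>>>>>>     }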
>>>>>>>
>>>>>>>
>>>>>>> I am quite puzzled why the offset committing problem occurs only for
>>>>>>> one input, and not for the other.
>>>>>>> I am preparing a fix for 1.2, possibly going into 1.1.3 as well.
>>>>>>> Could you try out a snapshot version to see if that fixes your
>>>>>>> problem?
>>>>>>>
>>>>>>> Greetings,
>>>>>>> Stephan
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Sep 27, 2016 at 2:24 PM, Chakravarthy varaga <
>>>>>>> chakravarthyvp@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Stefan,
>>>>>>>>
>>>>>>>>      Thanks a million for your detailed explanation. I appreciate
>>>>>>>> it.
>>>>>>>>
>>>>>>>>      - The *zookeeper bundled with kafka 0.9.0.1* was used to
>>>>>>>> start zookeeper. There is only 1 instance (standalone) of zookeeper running
>>>>>>>> on my localhost (ubuntu 14.04).
>>>>>>>>      - There is only 1 Kafka broker (*version: 0.9.0.1*).
>>>>>>>>
>>>>>>>>      With regards to the Flink cluster, there's only 1 JM & 2 TMs
>>>>>>>> started with no HA. I presume this does not use zookeeper anyway, as it
>>>>>>>> runs as a standalone cluster.
>>>>>>>>
>>>>>>>>
>>>>>>>>      BTW, the kafka connector version that I use is as suggested
>>>>>>>> in the flink connectors page:
>>>>>>>>
>>>>>>>>          <dependency>
>>>>>>>>              <groupId>org.apache.flink</groupId>
>>>>>>>>              <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
>>>>>>>>              <version>1.1.1</version>
>>>>>>>>          </dependency>
>>>>>>>>
>>>>>>>>      Do you see any issues with versions?
>>>>>>>>
>>>>>>>>      1) Do you have benchmarks wrt. checkpointing in flink?
>>>>>>>>
>>>>>>>>      2) There isn't a detailed explanation of what states are stored
>>>>>>>> as part of the checkpointing process. For ex., if I have a pipeline like
>>>>>>>> *source -> map -> keyBy -> map -> sink*, my assumption on what's
>>>>>>>> stored is:
>>>>>>>>
>>>>>>>>          a) The source stream's custom watermarked records
>>>>>>>>          b) Intermediate states of each of the transformations in
>>>>>>>> the pipeline
>>>>>>>>          c) Delta of records stored from the previous sink
>>>>>>>>          d) Custom states (say ValueState as in my case) -
>>>>>>>> essentially this is what I bother about storing
>>>>>>>>          e) All of my operators
>>>>>>>>
>>>>>>>>       Is my understanding right?
>>>>>>>>
>>>>>>>>      3) Is there a way in Flink to checkpoint only d) as stated
>>>>>>>> above?
>>>>>>>>
>>>>>>>>      4) Can you apply checkpointing to only certain streams and
>>>>>>>> operators (say I wish to store aggregated values as part of the
>>>>>>>> transformation)?
>>>>>>>>
>>>>>>>> Best Regards
>>>>>>>> CVP
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Sep 26, 2016 at 6:18 PM, Stephan Ewen <sewen@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks, the logs were very helpful!
>>>>>>>>>
>>>>>>>>> TL;DR - The offset committing to ZooKeeper is very slow and
>>>>>>>>> prevents proper starting of checkpoints.
>>>>>>>>>
>>>>>>>>> Here is what is happening in detail:
>>>>>>>>>
>>>>>>>>>   - Between the point when the TaskManager receives the "trigger
>>>>>>>>> checkpoint" message and the point when the KafkaSource actually starts
>>>>>>>>> the checkpoint is a long time (many seconds) - for one of the Kafka inputs
>>>>>>>>> (the other is fine).
>>>>>>>>>   - The only way this delay can be introduced is if another
>>>>>>>>> checkpoint-related operation (such as trigger() or notifyComplete()) is
>>>>>>>>> still in progress when the checkpoint is started. Flink does not perform
>>>>>>>>> concurrent checkpoint operations on a single operator, to ease the
>>>>>>>>> concurrency model for users.
>>>>>>>>>   - The operation that is still in progress must be the committing
>>>>>>>>> of the offsets (to ZooKeeper or Kafka). That also explains why this only
>>>>>>>>> happens once one side receives the first record. Before that, there is
>>>>>>>>> nothing to commit.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> What Flink should fix:
>>>>>>>>>   - The KafkaConsumer should run the commit operations
>>>>>>>>> asynchronously, to not block the "notifyCheckpointComplete()" method
>>>>>>>>> (a sketch of the idea follows below).
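>>>>>>>>>
>>>>>>>>> Purely as a sketch of that idea (not the actual patch; the helper
>>>>>>>>> names are hypothetical):
>>>>>>>>>
>>>>>>>>>     import java.util.Map;
>>>>>>>>>     import java.util.concurrent.ExecutorService;
>>>>>>>>>     import java.util.concurrent.Executors;
>>>>>>>>>
>>>>>>>>>     // Sketch only: hand the blocking commit to a background thread
>>>>>>>>>     // so that notifyCheckpointComplete() returns immediately.
>>>>>>>>>     private final ExecutorService offsetCommitter =
>>>>>>>>>             Executors.newSingleThreadExecutor();
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public void notifyCheckpointComplete(long checkpointId) {
>>>>>>>>>         // snapshotOffsetsFor() / commitToZooKeeper() are hypothetical helpers
>>>>>>>>>         final Map<String, Long> offsets = snapshotOffsetsFor(checkpointId);
>>>>>>>>>         offsetCommitter.submit(new Runnable() {
>>>>>>>>>             @Override
>>>>>>>>>             public void run() {
>>>>>>>>>                 commitToZooKeeper(offsets);  // the formerly blocking call
>>>>>>>>>             }
>>>>>>>>>         });
>>>>>>>>>     }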
>>>>>>>>>
>>>>>>>>> What you can fix:
>>>>>>>>>   - Have a look at your Kafka/ZooKeeper setup. One Kafka input
>>>>>>>>> works well, the other does not. Do they go against different sets of
>>>>>>>>> brokers, or different ZooKeepers? Is the metadata for one input bad?
>>>>>>>>>   - In the next Flink version, you may opt out of committing
>>>>>>>>> offsets to Kafka/ZooKeeper altogether. It is not important for Flink's
>>>>>>>>> checkpoints anyway (a sketch of such an opt-out follows below).
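>>>>>>>>>
>>>>>>>>> For reference, a sketch of what that opt-out looks like on the
>>>>>>>>> consumer; setCommitOffsetsOnCheckpoints() is the setter that later
>>>>>>>>> shipped, while the topic, schema and properties here are assumptions:
>>>>>>>>>
>>>>>>>>>     import java.util.Properties;
>>>>>>>>>
>>>>>>>>>     import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
>>>>>>>>>     import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>>>>>>>>>
>>>>>>>>>     Properties props = new Properties();
>>>>>>>>>     props.setProperty("bootstrap.servers", "localhost:9092");  // assumption
>>>>>>>>>     props.setProperty("group.id", "cvp-app");                  // assumption
>>>>>>>>>
>>>>>>>>>     FlinkKafkaConsumer09<String> consumer =
>>>>>>>>>             new FlinkKafkaConsumer09<>("ks2-topic", new SimpleStringSchema(), props);
>>>>>>>>>     // keep offsets only in Flink's checkpoints; skip the external commit
>>>>>>>>>     consumer.setCommitOffsetsOnCheckpoints(false);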
>>>>>>>>>
>>>>>>>>> Greetings,
>>>>>>>>> Stephan
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Sep 26, 2016 at 5:13 PM, Chakravarthy varaga <
>>>>>>>>> chakravarthyvp@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Stefan,
>>>>>>>>>>
>>>>>>>>>>     Please find my responses below.
>>>>>>>>>>
>>>>>>>>>>     - What source are you using for the slow input?
>>>>>>>>>> *     [CVP] - Both streams, as pointed out in my first mail, are
>>>>>>>>>> Kafka streams.*
>>>>>>>>>>   - How large is the state that you are checkpointing?
>>>>>>>>>>
>>>>>>>>>> *[CVP] - I have enabled checkpointing on the StreamExecutionEnvironment
>>>>>>>>>> as below:*
>>>>>>>>>>
>>>>>>>>>>          final StreamExecutionEnvironment streamEnv =
>>>>>>>>>>                  StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>          streamEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));
>>>>>>>>>>          streamEnv.enableCheckpointing(10000);
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> *      In terms of the state stored, the KS1 stream has a payload
>>>>>>>>>> of 100K events/second, while KS2 has about 1 event / 10 minutes...
>>>>>>>>>> basically the operators perform flatmaps on 8 fields of a tuple (all fields
>>>>>>>>>> are primitives). If you look at the states' sizes in the dashboard, they are
>>>>>>>>>> in Kb...*
>>>>>>>>>>   - Can you try to see in the log if actually the state snapshot
>>>>>>>>>> takes that long, or if it simply takes long for the checkpoint
>>>>>>>>>> barriers to travel through the stream due to a lot of backpressure?
>>>>>>>>>>     [CVP] - There is no backpressure, at least judging from the sample
>>>>>>>>>> computation in the flink dashboard. 100K/second is low load for flink's
>>>>>>>>>> benchmarks. I could not quite get the barriers vs snapshot state. I have
>>>>>>>>>> attached the Task Manager log (DEBUG) info in case it interests you.
>>>>>>>>>>
>>>>>>>>>>      I have attached the checkpoint times as .png from the
>>>>>>>>>> dashboard. Basically, if you look at checkpoint IDs 28, 29 & 30, you'd
>>>>>>>>>> see that the checkpoints take more than a minute in each case. Before these
>>>>>>>>>> checkpoints, the KS2 stream did not have any events. As soon as an
>>>>>>>>>> event (should be in bytes) was generated, the checkpoints went slow and
>>>>>>>>>> subsequently took a minute more for every checkpoint thereafter.
>>>>>>>>>>
>>>>>>>>>>    This log was collected from the standalone flink cluster with
>>>>>>>>>> 1 job manager & 2 TMs. 1 TM was running this application with
>>>>>>>>>> checkpointing (parallelism=1).
>>>>>>>>>>
>>>>>>>>>>     Please let me know if you need further info.,
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen <sewen@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi!
>>>>>>>>>>>
>>>>>>>>>>> Let's try to figure that one out. Can you give us a bit more
>>>>>>>>>>> information?
>>>>>>>>>>>
>>>>>>>>>>>   - What source are you using for the slow input?
>>>>>>>>>>>   - How large is the state that you are checkpointing?
>>>>>>>>>>>   - Can you try to see in the log if actually the state snapshot
>>>>>>>>>>> takes that long, or if it simply takes long for the checkpoint barriers to
>>>>>>>>>>> travel through the stream due to a lot of backpressure?
>>>>>>>>>>>
>>>>>>>>>>> Greetings,
>>>>>>>>>>> Stephan
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske <
>>>>>>>>>>> fhueske@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi CVP,
>>>>>>>>>>>>
>>>>>>>>>>>> I'm not so familiar with the internals of the
>>>>>>>>>>>> checkpointing system, but maybe Stephan (in CC) has an idea of what's
>>>>>>>>>>>> going on here.
>>>>>>>>>>>>
>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>
>>>>>>>>>>>> 2016-09-23 11:33 GMT+02:00 Chakravarthy varaga <
>>>>>>>>>>>> chakravarthyvp@gmail.com>:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Aljoscha & Fabian,
>>>>>>>>>>>>>
>>>>>>>>>>>>>     I have a stream application that has 2 stream sources as
>>>>>>>>>>>>> below (a runnable sketch of this wiring follows below):
>>>>>>>>>>>>>
>>>>>>>>>>>>>      KeyedStream<String, String> *ks1* = ds1.keyBy("*");
>>>>>>>>>>>>>      KeyedStream<Tuple2<String, V>, String> *ks2* =
>>>>>>>>>>>>> ds2.flatMap(split T into k-v pairs).keyBy(0);
>>>>>>>>>>>>>
>>>>>>>>>>>>>      ks1.connect(ks2).flatMap(X);
>>>>>>>>>>>>>      // X is a CoFlatMapFunction that inserts and removes
>>>>>>>>>>>>> elements from ks2 into a key-value state member. Elements from ks1 are
>>>>>>>>>>>>> matched against that state. The CoFlatMapFunction operator maintains
>>>>>>>>>>>>> ValueState<Tuple2<Long, Long>>;
>>>>>>>>>>>>>
>>>>>>>>>>>>>      // ks1 is streaming about 100K events/sec from a kafka topic
>>>>>>>>>>>>>      // ks2 is streaming about 1 event every 10 minutes...
>>>>>>>>>>>>> Precisely when the 1st event is consumed from this stream, the checkpoint
>>>>>>>>>>>>> takes 2 minutes straight away.
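>>>>>>>>>>>>>
>>>>>>>>>>>>>      A runnable sketch of the wiring above (topic names, the record
>>>>>>>>>>>>> format and the properties are hypothetical placeholders; X stands for
>>>>>>>>>>>>> the CoFlatMapFunction holding the ValueState described above):
>>>>>>>>>>>>>
>>>>>>>>>>>>>     import java.util.Properties;
>>>>>>>>>>>>>
>>>>>>>>>>>>>     import org.apache.flink.api.common.functions.FlatMapFunction;
>>>>>>>>>>>>>     import org.apache.flink.api.java.functions.KeySelector;
>>>>>>>>>>>>>     import org.apache.flink.api.java.tuple.Tuple;
>>>>>>>>>>>>>     import org.apache.flink.api.java.tuple.Tuple2;
>>>>>>>>>>>>>     import org.apache.flink.runtime.state.filesystem.FsStateBackend;
>>>>>>>>>>>>>     import org.apache.flink.streaming.api.datastream.DataStream;
>>>>>>>>>>>>>     import org.apache.flink.streaming.api.datastream.KeyedStream;
>>>>>>>>>>>>>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>>>>>>>     import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
>>>>>>>>>>>>>     import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>>>>>>>>>>>>>     import org.apache.flink.util.Collector;
>>>>>>>>>>>>>
>>>>>>>>>>>>>     public class TwoStreamJob {  // hypothetical class name
>>>>>>>>>>>>>         public static void main(String[] args) throws Exception {
>>>>>>>>>>>>>             StreamExecutionEnvironment env =
>>>>>>>>>>>>>                     StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>             env.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));
>>>>>>>>>>>>>             env.enableCheckpointing(10000);
>>>>>>>>>>>>>
>>>>>>>>>>>>>             Properties props = new Properties();
>>>>>>>>>>>>>             props.setProperty("bootstrap.servers", "localhost:9092");  // assumption
>>>>>>>>>>>>>
>>>>>>>>>>>>>             // ks1: the high-volume stream, keyed by the whole record
>>>>>>>>>>>>>             DataStream<String> ds1 = env.addSource(
>>>>>>>>>>>>>                     new FlinkKafkaConsumer09<>("ks1-topic", new SimpleStringSchema(), props));
>>>>>>>>>>>>>             KeyedStream<String, String> ks1 =
>>>>>>>>>>>>>                     ds1.keyBy(new KeySelector<String, String>() {
>>>>>>>>>>>>>                         @Override
>>>>>>>>>>>>>                         public String getKey(String value) { return value; }
>>>>>>>>>>>>>                     });
>>>>>>>>>>>>>
>>>>>>>>>>>>>             // ks2: the slow stream, split into k-v pairs and keyed by field 0
>>>>>>>>>>>>>             DataStream<String> ds2 = env.addSource(
>>>>>>>>>>>>>                     new FlinkKafkaConsumer09<>("ks2-topic", new SimpleStringSchema(), props));
>>>>>>>>>>>>>             KeyedStream<Tuple2<String, Long>, Tuple> ks2 = ds2
>>>>>>>>>>>>>                     .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
>>>>>>>>>>>>>                         @Override
>>>>>>>>>>>>>                         public void flatMap(String t, Collector<Tuple2<String, Long>> out) {
>>>>>>>>>>>>>                             String[] kv = t.split(",");  // hypothetical record format
>>>>>>>>>>>>>                             out.collect(Tuple2.of(kv[0], Long.parseLong(kv[1])));
>>>>>>>>>>>>>                         }
>>>>>>>>>>>>>                     })
>>>>>>>>>>>>>                     .keyBy(0);
>>>>>>>>>>>>>
>>>>>>>>>>>>>             // X maintains the ValueState<Tuple2<Long, Long>> described above
>>>>>>>>>>>>>             ks1.connect(ks2).flatMap(new X()).print();
>>>>>>>>>>>>>
>>>>>>>>>>>>>             env.execute("cvp-two-stream-join");  // hypothetical job name
>>>>>>>>>>>>>         }
>>>>>>>>>>>>>     }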
>>>>>>>>>>>>>
>>>>>>>>>>>>>     The version of flink is 1.1.2.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I tried to checkpoint every 10 secs using a
>>>>>>>>>>>>> FsStateBackend... What I notice is that the checkpoint duration is almost 2
>>>>>>>>>>>>> minutes in many cases, while in other cases it varies from 100 ms to
>>>>>>>>>>>>> 1.5 minutes frequently. I'm attaching a snapshot of the dashboard for
>>>>>>>>>>>>> your reference.
>>>>>>>>>>>>>
>>>>>>>>>>>>>      Is this an issue with flink checkpointing?
>>>>>>>>>>>>>
>>>>>>>>>>>>>  Best Regards
>>>>>>>>>>>>> CVP
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
