flink-user mailing list archives

From Chakravarthy varaga <chakravarth...@gmail.com>
Subject Re: Flink Checkpoint runs slow for low load stream
Date Thu, 05 Jan 2017 11:25:19 GMT
BRILLIANT !!!

Checkpoint times are consistent with 1.1.4...

Thanks for your formidable support!

Best Regards
CVP

On Wed, Jan 4, 2017 at 5:33 PM, Fabian Hueske <fhueske@gmail.com> wrote:

> Hi CVP,
>
> we recently released Flink 1.1.4, the next bugfix release of the
> 1.1.x series, with major robustness improvements [1].
> You might want to give 1.1.4 a try as well.
>
> Best, Fabian
>
> [1] http://flink.apache.org/news/2016/12/21/release-1.1.4.html
>
> 2017-01-04 16:51 GMT+01:00 Chakravarthy varaga <chakravarthyvp@gmail.com>:
>
>> Hi Stephan, All,
>>
>>      I just got a chance to check whether 1.1.3 fixes the slow checkpointing
>> on the FS backend. It seems to be fixed. Thanks for the fix.
>>
>>      While testing this with varying checkpoint intervals, there seem
>> to be spikes of slow checkpoints every 30/40 seconds with an interval of 15
>> secs. Those checkpoints take about 300 ms, as opposed to 10/20 ms.
>>      So far, 15 secs seems to be the nominal value; any interval below
>> this produces the spikes too often. For us, living with 15 sec recovery
>> is doable, and the job eventually catches up on recovery!
>>
>> Best Regards
>> CVP
>>
>> On Tue, Oct 4, 2016 at 6:20 PM, Chakravarthy varaga <
>> chakravarthyvp@gmail.com> wrote:
>>
>>> Thanks for your prompt response Stephan.
>>>
>>>     I'll wait for Flink 1.1.3!!!
>>>
>>> Best Regards
>>> Varaga
>>>
>>> On Tue, Oct 4, 2016 at 5:36 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>
>>>> The plan is to release 1.1.3 asap ;-)
>>>>
>>>> We are waiting for the last backported patches to get in, then release
>>>> testing and the release.
>>>>
>>>> If you want to test it today, you would need to manually build the
>>>> release-1.1 branch.
>>>>
>>>> Best,
>>>> Stephan
>>>>
>>>>
>>>> On Tue, Oct 4, 2016 at 5:46 PM, Chakravarthy varaga <
>>>> chakravarthyvp@gmail.com> wrote:
>>>>
>>>>> Hi Gordon,
>>>>>
>>>>>      Do I need to clone and build the release-1.1 branch to test this?
>>>>>      I currently use the Flink 1.1.2 runtime. When do you plan to
>>>>> release it in 1.1.3?
>>>>>
>>>>> Best Regards
>>>>> Varaga
>>>>>
>>>>> On Tue, Oct 4, 2016 at 9:25 AM, Tzu-Li (Gordon) Tai <
>>>>> tzulitai@apache.org> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Helping out here: this is the PR for async Kafka offset committing:
>>>>>> https://github.com/apache/flink/pull/2574.
>>>>>> It has already been merged into the master and release-1.1 branches,
>>>>>> so you can try out the changes now if you’d like.
>>>>>> The change should also be included in the 1.1.3 release, which the
>>>>>> Flink community is discussing releasing soon.
>>>>>>
>>>>>> It will definitely be helpful if you can provide feedback afterwards!
>>>>>>
>>>>>> Best Regards,
>>>>>> Gordon
>>>>>>
>>>>>>
>>>>>> On October 3, 2016 at 9:40:14 PM, Chakravarthy varaga (
>>>>>> chakravarthyvp@gmail.com) wrote:
>>>>>>
>>>>>> Hi Stephan,
>>>>>>
>>>>>>     Is the async Kafka offset commit released in 1.1.3?
>>>>>>
>>>>>> Varaga
>>>>>>
>>>>>> On Wed, Sep 28, 2016 at 9:49 AM, Chakravarthy varaga <
>>>>>> chakravarthyvp@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Stephan,
>>>>>>>
>>>>>>>      That would be great. Let me know once the fix is done and
>>>>>>> which snapshot version to use; I'll check and get back to you.
>>>>>>>      Can you also share the JIRA issue that tracks this?
>>>>>>>
>>>>>>>      With regard to the offset commit issue, I'm not sure how
>>>>>>> to proceed here. I'll probably use your fix first and see if the
>>>>>>> problem recurs.
>>>>>>>
>>>>>>> Thanks much
>>>>>>> Varaga
>>>>>>>
>>>>>>> On Tue, Sep 27, 2016 at 7:46 PM, Stephan Ewen <sewen@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> @CVP
>>>>>>>>
>>>>>>>> In your case, Flink stores only the Kafka offsets (a few bytes) and
>>>>>>>> the custom state (d) in checkpoints.
>>>>>>>> Here is an illustration of the checkpoint and what is stored (from
>>>>>>>> the Flink docs):
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
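What a checkpoint contains for this job can be modelled in plain Java. This is a hypothetical sketch: `CheckpointModel` and its byte estimate are illustrative assumptions, not Flink classes. It copies the per-partition Kafka offsets and the keyed `ValueState<Tuple2<Long, Long>>` values, and no in-flight records, which is consistent with the KB-sized state reported elsewhere in this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model (not a Flink class) of the data one checkpoint holds for
// the job in this thread: source offsets plus keyed custom state, no records.
final class CheckpointModel {
    // Next offset to read, per Kafka partition.
    final Map<Integer, Long> kafkaOffsets = new HashMap<>();
    // Per-key custom state, standing in for ValueState<Tuple2<Long, Long>>.
    final Map<String, long[]> keyedState = new HashMap<>();

    // Takes a copy so later updates to the live state do not change the snapshot.
    static CheckpointModel snapshot(Map<Integer, Long> offsets, Map<String, long[]> state) {
        CheckpointModel cp = new CheckpointModel();
        cp.kafkaOffsets.putAll(offsets);
        state.forEach((key, pair) -> cp.keyedState.put(key, pair.clone()));
        return cp;
    }

    // Rough payload size: 8 bytes per offset and 16 bytes per (Long, Long) value,
    // ignoring keys and serialization overhead.
    long approxPayloadBytes() {
        return kafkaOffsets.size() * 8L + keyedState.size() * 16L;
    }
}
```

With two partitions and three keys, the estimate is 2 × 8 + 3 × 16 = 64 bytes, far from anything that would explain minute-long checkpoints.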
>>>>>>>>
>>>>>>>>
>>>>>>>> I am quite puzzled why the offset committing problem occurs only
>>>>>>>> for one input and not for the other.
>>>>>>>> I am preparing a fix for 1.2, possibly going into 1.1.3 as well.
>>>>>>>> Could you try out a snapshot version to see if that fixes your
>>>>>>>> problem?
>>>>>>>>
>>>>>>>> Greetings,
>>>>>>>> Stephan
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Sep 27, 2016 at 2:24 PM, Chakravarthy varaga <
>>>>>>>> chakravarthyvp@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Stephan,
>>>>>>>>>
>>>>>>>>>      Thanks a million for your detailed explanation. I appreciate it.
>>>>>>>>>
>>>>>>>>>      -  The ZooKeeper bundled with Kafka 0.9.0.1 was used to
>>>>>>>>> start ZooKeeper. There is only 1 standalone instance of ZooKeeper
>>>>>>>>> running on my localhost (Ubuntu 14.04).
>>>>>>>>>      -  There is only 1 Kafka broker (version 0.9.0.1).
>>>>>>>>>
>>>>>>>>>      As for the Flink cluster, there's only 1 JM and 2 TMs, started
>>>>>>>>> without HA. I presume it does not use ZooKeeper anyway, as it runs
>>>>>>>>> as a standalone cluster.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>      BTW, the Kafka connector version that I use is the one suggested
>>>>>>>>> on the Flink connectors page:
>>>>>>>>>
>>>>>>>>>         <dependency>
>>>>>>>>>             <groupId>org.apache.flink</groupId>
>>>>>>>>>             <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
>>>>>>>>>             <version>1.1.1</version>
>>>>>>>>>         </dependency>
>>>>>>>>>
>>>>>>>>>      Do you see any issues with the versions?
>>>>>>>>>
>>>>>>>>>      1) Do you have benchmarks with regard to checkpointing in Flink?
>>>>>>>>>
>>>>>>>>>      2) There isn't a detailed explanation of what state is stored
>>>>>>>>> as part of the checkpointing process. For example, if I have a
>>>>>>>>> pipeline like source -> map -> keyBy -> map -> sink, my assumption
>>>>>>>>> about what's stored is:
>>>>>>>>>
>>>>>>>>>          a) The source stream's custom watermarked records
>>>>>>>>>          b) Intermediate states of each of the transformations in
>>>>>>>>>             the pipeline
>>>>>>>>>          c) Delta of records stored from the previous sink
>>>>>>>>>          d) Custom states (say, ValueState, as in my case) -
>>>>>>>>>             essentially, this is what I care about storing
>>>>>>>>>          e) All of my operators
>>>>>>>>>
>>>>>>>>>       Is my understanding right?
>>>>>>>>>
>>>>>>>>>      3) Is there a way in Flink to checkpoint only (d) as stated
>>>>>>>>> above?
>>>>>>>>>
>>>>>>>>>      4) Can you apply checkpointing only to certain streams and
>>>>>>>>> operators (say, I wish to store aggregated values that are part of
>>>>>>>>> the transformation)?
>>>>>>>>>
>>>>>>>>> Best Regards
>>>>>>>>> CVP
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Sep 26, 2016 at 6:18 PM, Stephan Ewen <sewen@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks, the logs were very helpful!
>>>>>>>>>>
>>>>>>>>>> TL;DR: Committing the offsets to ZooKeeper is very slow and
>>>>>>>>>> prevents checkpoints from starting properly.
>>>>>>>>>>
>>>>>>>>>> Here is what is happening in detail:
>>>>>>>>>>
>>>>>>>>>>   - Between the point when the TaskManager receives the "trigger
>>>>>>>>>> checkpoint" message and the point when the KafkaSource actually
>>>>>>>>>> starts the checkpoint, a long time passes (many seconds) - for one
>>>>>>>>>> of the Kafka inputs (the other is fine).
>>>>>>>>>>   - The only way this delay can be introduced is if another
>>>>>>>>>> checkpoint-related operation (such as trigger() or notifyComplete())
>>>>>>>>>> is still in progress when the checkpoint is started. Flink does not
>>>>>>>>>> perform concurrent checkpoint operations on a single operator, to
>>>>>>>>>> ease the concurrency model for users.
>>>>>>>>>>   - The operation that is still in progress must be the
>>>>>>>>>> committing of the offsets (to ZooKeeper or Kafka). That also explains
>>>>>>>>>> why this only happens once one side receives its first record.
>>>>>>>>>> Before that, there is nothing to commit.
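The serialization described above can be reproduced with a small plain-Java model. Everything here is a hypothetical stand-in, not Flink's source code: `SerializedCheckpointSource`, its shared `checkpointLock`, and the 200 ms sleep that plays the part of a slow ZooKeeper commit. Because both callbacks synchronize on the same lock, the snapshot for checkpoint 2 can only run after the commit for checkpoint 1 has finished.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical model (not Flink code): both checkpoint callbacks of one source
// synchronize on the same lock, so they never run concurrently.
final class SerializedCheckpointSource {
    private final Object checkpointLock = new Object();
    final List<String> events = Collections.synchronizedList(new ArrayList<>());

    // Commits offsets synchronously while holding the lock (the slow path).
    void notifyCheckpointComplete(long id, CountDownLatch lockHeld) throws InterruptedException {
        synchronized (checkpointLock) {
            lockHeld.countDown();   // signal that the lock is now taken
            Thread.sleep(200);      // stands in for a slow ZooKeeper commit
            events.add("commit-" + id);
        }
    }

    // Cannot start until any in-progress callback has released the lock.
    void triggerCheckpoint(long id) {
        synchronized (checkpointLock) {
            events.add("snapshot-" + id);
        }
    }

    // Runs the commit for checkpoint 1 on another thread, then triggers checkpoint 2.
    static List<String> demo() {
        SerializedCheckpointSource src = new SerializedCheckpointSource();
        CountDownLatch lockHeld = new CountDownLatch(1);
        Thread committer = new Thread(() -> {
            try {
                src.notifyCheckpointComplete(1, lockHeld);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        committer.start();
        try {
            lockHeld.await();          // the commit now owns the lock
            src.triggerCheckpoint(2);  // blocks until the commit is done
            committer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(src.events);
    }
}
```

`SerializedCheckpointSource.demo()` returns `[commit-1, snapshot-2]`: the snapshot waits for the whole commit, which is the delay seen in the logs.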
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> What Flink should fix:
>>>>>>>>>>   - The KafkaConsumer should run the commit operations
>>>>>>>>>> asynchronously, so as not to block the "notifyCheckpointComplete()"
>>>>>>>>>> method.
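The suggested fix can be sketched in the same style. This is a hypothetical model, not the code of the PR mentioned in Gordon's reply: `notifyCheckpointComplete()` only submits the commit to a single-threaded executor and returns, so the next `triggerCheckpoint()` runs while the commit is still pending. A latch stands in for the slow remote commit.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model (not Flink code): the offset commit is handed to a
// background thread, so the checkpoint lock is released immediately.
final class AsyncCommitSource {
    private final Object checkpointLock = new Object();
    // A single thread keeps commits in checkpoint order.
    private final ExecutorService committer = Executors.newSingleThreadExecutor();
    final List<String> events = Collections.synchronizedList(new ArrayList<>());

    void notifyCheckpointComplete(long id, CountDownLatch commitMayFinish) {
        synchronized (checkpointLock) {
            committer.submit(() -> {
                try {
                    commitMayFinish.await();  // stands in for a slow remote commit
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                events.add("commit-" + id);
            });
        } // returns at once; the commit is still pending
    }

    void triggerCheckpoint(long id) {
        synchronized (checkpointLock) {
            events.add("snapshot-" + id);
        }
    }

    static List<String> demo() {
        AsyncCommitSource src = new AsyncCommitSource();
        CountDownLatch commitMayFinish = new CountDownLatch(1);
        src.notifyCheckpointComplete(1, commitMayFinish);
        src.triggerCheckpoint(2);      // proceeds although commit 1 is unfinished
        commitMayFinish.countDown();   // now let the slow commit complete
        src.committer.shutdown();
        try {
            src.committer.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(src.events);
    }
}
```

Here `AsyncCommitSource.demo()` returns `[snapshot-2, commit-1]`: the new checkpoint no longer waits for the previous commit.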
>>>>>>>>>>
>>>>>>>>>> What you can fix:
>>>>>>>>>>   - Have a look at your Kafka/ZooKeeper setup. One Kafka input
>>>>>>>>>> works well, the other does not. Do they go against different sets
>>>>>>>>>> of brokers, or different ZooKeepers? Is the metadata for one input
>>>>>>>>>> bad?
>>>>>>>>>>   - In the next Flink version, you may opt out of committing
>>>>>>>>>> offsets to Kafka/ZooKeeper altogether. It is not important for
>>>>>>>>>> Flink's checkpoints anyway.
>>>>>>>>>>
>>>>>>>>>> Greetings,
>>>>>>>>>> Stephan
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Sep 26, 2016 at 5:13 PM, Chakravarthy varaga <
>>>>>>>>>> chakravarthyvp@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Stephan,
>>>>>>>>>>>
>>>>>>>>>>>     Please find my responses below.
>>>>>>>>>>>
>>>>>>>>>>>     - What source are you using for the slow input?
>>>>>>>>>>>     [CVP] - As pointed out in my first mail, both streams are
>>>>>>>>>>> Kafka streams.
>>>>>>>>>>>   - How large is the state that you are checkpointing?
>>>>>>>>>>>
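>>>>>>>>>>>     [CVP] - I have enabled checkpointing on the
>>>>>>>>>>> StreamExecutionEnvironment as below.
>>>>>>>>>>>
>>>>>>>>>>>          import org.apache.flink.runtime.state.filesystem.FsStateBackend;
>>>>>>>>>>>          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>>>>>
>>>>>>>>>>>          final StreamExecutionEnvironment streamEnv =
>>>>>>>>>>>              StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>          // keep checkpoint data on the local file system
>>>>>>>>>>>          streamEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));
>>>>>>>>>>>          // trigger a checkpoint every 10000 ms (10 s)
>>>>>>>>>>>          streamEnv.enableCheckpointing(10000);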
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>      In terms of the state stored, the KS1 stream has a load of
>>>>>>>>>>> 100K events/second, while KS2 has about 1 event per 10 minutes.
>>>>>>>>>>> Basically, the operators perform flatMaps on tuples of 8 fields
>>>>>>>>>>> (all fields are primitives). If you look at the state sizes in the
>>>>>>>>>>> dashboard, they are in the KB range.
>>>>>>>>>>>   - Can you try to see in the log if actually the state snapshot
>>>>>>>>>>> takes that long, or if it simply takes long for the checkpoint
>>>>>>>>>>> barriers to travel through the stream due to a lot of backpressure?
>>>>>>>>>>>     [CVP] - There is no backpressure, at least judging from the
>>>>>>>>>>> sampling in the Flink dashboard. 100K/second is a low load by
>>>>>>>>>>> Flink's benchmarks. I could not quite work out barriers vs. snapshot
>>>>>>>>>>> state. I have attached the TaskManager log (DEBUG) in case it
>>>>>>>>>>> interests you.
>>>>>>>>>>>
>>>>>>>>>>>      I have attached the checkpoint times as a .png from the
>>>>>>>>>>> dashboard. If you look at checkpoint IDs 28, 29 and 30, you'll see
>>>>>>>>>>> that the checkpoints take more than a minute in each case. Before
>>>>>>>>>>> these checkpoints, the KS2 stream did not have any events. As soon
>>>>>>>>>>> as an event (which should be only a few bytes) was generated, the
>>>>>>>>>>> checkpoints went slow, and every checkpoint thereafter took more
>>>>>>>>>>> than a minute.
>>>>>>>>>>>
>>>>>>>>>>>    This log was collected from the standalone Flink cluster with
>>>>>>>>>>> 1 JobManager and 2 TMs. 1 TM was running this application with
>>>>>>>>>>> checkpointing (parallelism=1).
>>>>>>>>>>>
>>>>>>>>>>>     Please let me know if you need further info.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen <sewen@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi!
>>>>>>>>>>>>
>>>>>>>>>>>> Let's try to figure that one out. Can you give us a bit more
>>>>>>>>>>>> information?
>>>>>>>>>>>>
>>>>>>>>>>>>   - What source are you using for the slow input?
>>>>>>>>>>>>   - How large is the state that you are checkpointing?
>>>>>>>>>>>>   - Can you try to see in the log if actually the state
>>>>>>>>>>>> snapshot takes that long, or if it simply takes long for the
>>>>>>>>>>>> checkpoint barriers to travel through the stream due to a lot of
>>>>>>>>>>>> backpressure?
>>>>>>>>>>>>
>>>>>>>>>>>> Greetings,
>>>>>>>>>>>> Stephan
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske <
>>>>>>>>>>>> fhueske@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi CVP,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm not very familiar with the internals of the
>>>>>>>>>>>>> checkpointing system, but maybe Stephan (in CC) has an idea of
>>>>>>>>>>>>> what's going on here.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2016-09-23 11:33 GMT+02:00 Chakravarthy varaga <
>>>>>>>>>>>>> chakravarthyvp@gmail.com>:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Aljoscha & Fabian,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     I have a stream application that has 2 stream sources, as
>>>>>>>>>>>>>> below.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>      KeyedStream<String, Tuple> ks1 = ds1.keyBy("*");
>>>>>>>>>>>>>>      KeyedStream<Tuple2<String, V>, Tuple> ks2 =
>>>>>>>>>>>>>>          ds2.flatMap(/* split T into k-v pairs */).keyBy(0);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>      ks1.connect(ks2).flatMap(X);
>>>>>>>>>>>>>>      // X is a CoFlatMapFunction that inserts and removes elements
>>>>>>>>>>>>>>      // from ks2 into a key-value state member. Elements from ks1
>>>>>>>>>>>>>>      // are matched against that state. The CoFlatMapFunction
>>>>>>>>>>>>>>      // operator maintains a ValueState<Tuple2<Long, Long>>.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>      // ks1 streams about 100K events/sec from a Kafka topic
>>>>>>>>>>>>>>      // ks2 streams about 1 event every 10 minutes
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Precisely when the 1st event is consumed from ks2, checkpoints
>>>>>>>>>>>>>> take 2 minutes straight away.
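The matching logic of `X` can be modelled without Flink to make the data flow concrete. This is a hypothetical sketch: the mail does not say how insertions and removals are encoded, so a ks2 element is assumed to carry a key plus a pair of longs, with a null pair meaning "remove", and a ks1 element is just a key. The method names mirror `CoFlatMapFunction`'s `flatMap1`/`flatMap2`, but the signatures are simplified. A `HashMap` stands in for the keyed `ValueState<Tuple2<Long, Long>>`, which in Flink is scoped to the current key and is what ends up in each checkpoint.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of the CoFlatMapFunction X (no Flink types).
// Assumed encoding: ks2 sends (key, pair) and a null pair removes the key;
// ks1 sends only a key.
final class ConnectedMatcher {
    // One value per key, standing in for ValueState<Tuple2<Long, Long>>.
    private final Map<String, long[]> state = new HashMap<>();

    // Input 2 (ks2, ~1 event per 10 minutes): insert, update or remove a key.
    void flatMap2(String key, long[] pair) {
        if (pair == null) {
            state.remove(key);
        } else {
            state.put(key, pair.clone());
        }
    }

    // Input 1 (ks1, ~100K events/sec): emit a match only if the key is stored.
    void flatMap1(String key, List<String> out) {
        long[] pair = state.get(key);
        if (pair != null) {
            out.add(key + " -> " + pair[0] + "," + pair[1]);
        }
    }
}
```

Feeding `flatMap2("a", new long[]{1, 2})` and then `flatMap1("a", out)` adds `"a -> 1,2"` to `out`; after `flatMap2("a", null)`, further `flatMap1("a", out)` calls emit nothing.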
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     The Flink version is 1.1.2.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I tried checkpointing every 10 secs using an FsStateBackend.
>>>>>>>>>>>>>> What I notice is that the checkpoint duration is almost 2 minutes
>>>>>>>>>>>>>> in many cases, while in the other cases it frequently varies from
>>>>>>>>>>>>>> 100 ms to 1.5 minutes. I'm attaching a snapshot of the dashboard
>>>>>>>>>>>>>> for your reference.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>      Is this an issue with Flink checkpointing?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  Best Regards
>>>>>>>>>>>>>> CVP
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
