flink-user mailing list archives

From Yassine MARZOUGUI <y.marzou...@mindlytix.com>
Subject Re: Flink Checkpoint runs slow for low load stream
Date Mon, 24 Apr 2017 06:20:34 GMT
Hi all,

I am experiencing a similar problem but with HDFS as a source instead of
Kafka. I have a streaming pipeline as follows:
1 - read a folder continuously from HDFS
2 - filter duplicates (using keyBy(x -> x) and keeping a state per key
indicating whether it has been seen)
3 - schedule some future actions on the stream using ProcessFunction and
processing time timers (elements are kept in a MapState)
4 - write results back to HDFS using a BucketingSink.
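
Step 2 above can be sketched roughly as follows. This is a plain-Java model of the idea only, not Flink API: the class name SeenFilterSketch and its methods are hypothetical, and the HashSet stands in for the per-key "seen" flag that the keyed state backend would hold.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Plain-Java model of a keyed "seen" flag; hypothetical, not Flink API. */
final class SeenFilterSketch {
    // Stands in for the per-key state: one entry per distinct key seen so far.
    private final Set<String> seen = new HashSet<>();

    /** Returns true the first time a key is offered, false on every repeat. */
    boolean firstTime(String key) {
        return seen.add(key);
    }

    /** Keeps only the first occurrence of each element, preserving order. */
    static List<String> dedupe(List<String> input) {
        SeenFilterSketch filter = new SeenFilterSketch();
        List<String> out = new ArrayList<>();
        for (String element : input) {
            if (filter.firstTime(element)) {
                out.add(element);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [a, b, c]
        System.out.println(dedupe(Arrays.asList("a", "b", "a", "c", "b")));
    }
}
```

The state in this model grows with the number of distinct keys, which is why the size of the input bounds the state the job can hold.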

I am using RocksDBStateBackend, and Flink 1.3-SNAPSHOT (Commit: 9fb074c).

Currently the source contains just one file of 1 GB, so that's the maximum
state that the job might hold. I noticed that the backpressure on
operators #1 and #2 is High, and the split reader has only read 60 MB out
of the 1 GB source file. I suspect this is because the ProcessFunction is
slow (on purpose). However, it looks like this affected the checkpoints, which
are failing after the timeout (which is set to 2 hours).

In the job manager logs I keep getting warnings:

2017-04-23 19:32:38,827 WARN
org.apache.flink.runtime.checkpoint.CheckpointCoordinator
- Received late message for now expired checkpoint attempt 8 from
210769a077c67841d980776d8caece0a of job 6c7e44d205d738fc8a6cb4da181d2d86.

Is the high backpressure the cause of the checkpoints being too slow? If
yes, is there a way to disable the backpressure mechanism, since the records
will be buffered in the RocksDB state after all, which is backed by the disk?
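
For intuition on the first question: further down the thread Stephan mentions that checkpoint barriers have to travel through the stream, so a barrier can queue behind records that a slow operator has not yet processed. A minimal plain-Java model of that effect, assuming a single in-order channel, is sketched below. The class name, the record count, and the per-record cost are all hypothetical and are not measurements from this job.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Plain-Java model of a barrier queued behind buffered records; hypothetical, not Flink API. */
final class BarrierDelaySketch {
    static final String BARRIER = "BARRIER";

    /** Returns the simulated milliseconds that pass before the slow operator reaches the barrier. */
    static long millisUntilBarrier(int recordsAhead, long millisPerRecord) {
        // The channel delivers elements strictly in order; the barrier cannot overtake records.
        Deque<String> channel = new ArrayDeque<>();
        for (int i = 0; i < recordsAhead; i++) {
            channel.add("record-" + i);
        }
        channel.add(BARRIER);

        long elapsed = 0;
        while (!BARRIER.equals(channel.peek())) {
            channel.poll();              // the operator processes one buffered record
            elapsed += millisPerRecord;  // and pays its (deliberately slow) per-record cost
        }
        return elapsed;
    }

    public static void main(String[] args) {
        // 1000 buffered records at 50 ms each: prints 50000
        System.out.println(millisUntilBarrier(1000, 50));
    }
}
```

In this model the snapshot itself costs nothing; the whole delay comes from records ahead of the barrier, which is the distinction Stephan's question draws between snapshot time and barrier travel time.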

Any help is appreciated. Thank you.

Best,
Yassine

On Jan 5, 2017 12:25, "Chakravarthy varaga" <chakravarthyvp@gmail.com>
wrote:

> BRILLIANT !!!
>
> Checkpoint times are consistent with 1.1.4...
>
> Thanks for your formidable support !
>
> Best Regards
> CVP
>
> On Wed, Jan 4, 2017 at 5:33 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>
>> Hi CVP,
>>
>> we recently released Flink 1.1.4, i.e., the next bugfix release of the
>> 1.1.x series with major robustness improvements [1].
>> You might want to give 1.1.4 a try as well.
>>
>> Best, Fabian
>>
>> [1] http://flink.apache.org/news/2016/12/21/release-1.1.4.html
>>
>> 2017-01-04 16:51 GMT+01:00 Chakravarthy varaga <chakravarthyvp@gmail.com>:
>>
>>> Hi Stephan, All,
>>>
>>>      I just got a chance to check whether 1.1.3 fixes the slow checkpointing
>>> on the FS backend. It seems to have been fixed. Thanks for the fix.
>>>
>>>      While testing this with varying checkpoint intervals, there seem to be
>>> spikes of slow checkpoints every 30/40 seconds for an interval of 15
>>> secs. The checkpoint then lasts about 300 ms as opposed to 10/20 ms.
>>>      Basically 15 secs seems to be the nominal value so far; anything
>>> below this interval triggers the spikes too often. For us, living with a
>>> 15 sec recovery is doable, and we eventually catch up on recovery!
>>>
>>> Best Regards
>>> CVP
>>>
>>> On Tue, Oct 4, 2016 at 6:20 PM, Chakravarthy varaga <
>>> chakravarthyvp@gmail.com> wrote:
>>>
>>>> Thanks for your prompt response Stephan.
>>>>
>>>>     I'd wait for Flink 1.1.3 !!!
>>>>
>>>> Best Regards
>>>> Varaga
>>>>
>>>> On Tue, Oct 4, 2016 at 5:36 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>>
>>>>> The plan to release 1.1.3 is asap ;-)
>>>>>
>>>>> Waiting for the last backported patches to get in, then release testing
>>>>> and release.
>>>>>
>>>>> If you want to test it today, you would need to manually build the
>>>>> release-1.1 branch.
>>>>>
>>>>> Best,
>>>>> Stephan
>>>>>
>>>>>
>>>>> On Tue, Oct 4, 2016 at 5:46 PM, Chakravarthy varaga <
>>>>> chakravarthyvp@gmail.com> wrote:
>>>>>
>>>>>> Hi Gordon,
>>>>>>
>>>>>>      Do I need to clone and build release-1.1 branch to test this?
>>>>>>      I currently use Flink 1.1.2 runtime. When is the plan to
>>>>>> release it in 1.1.3?
>>>>>>
>>>>>> Best Regards
>>>>>> Varaga
>>>>>>
>>>>>> On Tue, Oct 4, 2016 at 9:25 AM, Tzu-Li (Gordon) Tai <
>>>>>> tzulitai@apache.org> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Helping out here: this is the PR for async Kafka offset committing -
>>>>>>> https://github.com/apache/flink/pull/2574.
>>>>>>> It has already been merged into the master and release-1.1 branches,
>>>>>>> so you can try out the changes now if you’d like.
>>>>>>> The change should also be included in the 1.1.3 release, which the
>>>>>>> Flink community is discussing to release soon.
>>>>>>>
>>>>>>> Will definitely be helpful if you can provide feedback afterwards!
>>>>>>>
>>>>>>> Best Regards,
>>>>>>> Gordon
>>>>>>>
>>>>>>>
>>>>>>> On October 3, 2016 at 9:40:14 PM, Chakravarthy varaga (
>>>>>>> chakravarthyvp@gmail.com) wrote:
>>>>>>>
>>>>>>> Hi Stephan,
>>>>>>>
>>>>>>>     Is the Async kafka offset commit released in 1.3.1?
>>>>>>>
>>>>>>> Varaga
>>>>>>>
>>>>>>> On Wed, Sep 28, 2016 at 9:49 AM, Chakravarthy varaga <
>>>>>>> chakravarthyvp@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Stephan,
>>>>>>>>
>>>>>>>>      That should be great. Let me know once the fix is done and the
>>>>>>>> snapshot version to use, I'll check and revert then.
>>>>>>>>      Can you also share the JIRA that tracks the issue?
>>>>>>>>
>>>>>>>>      With regards to offset commit issue, I'm not sure as to how to
>>>>>>>> proceed here. Probably I'll use your fix first and see if the problem
>>>>>>>> reoccurs.
>>>>>>>>
>>>>>>>> Thanks much
>>>>>>>> Varaga
>>>>>>>>
>>>>>>>> On Tue, Sep 27, 2016 at 7:46 PM, Stephan Ewen <sewen@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> @CVP
>>>>>>>>>
>>>>>>>>> Flink stores in checkpoints in your case only the Kafka offsets
>>>>>>>>> (few bytes) and the custom state (e).
>>>>>>>>>
>>>>>>>>> Here is an illustration of the checkpoint and what is stored (from
>>>>>>>>> the Flink docs).
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I am quite puzzled why the offset committing problem occurs only
>>>>>>>>> for one input, and not for the other.
>>>>>>>>> I am preparing a fix for 1.2, possibly going into 1.1.3 as well.
>>>>>>>>> Could you try out a snapshot version to see if that fixes your
>>>>>>>>> problem?
>>>>>>>>>
>>>>>>>>> Greetings,
>>>>>>>>> Stephan
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Sep 27, 2016 at 2:24 PM, Chakravarthy varaga <
>>>>>>>>> chakravarthyvp@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Stefan,
>>>>>>>>>>
>>>>>>>>>>      Thanks a million for your detailed explanation. I appreciate
>>>>>>>>>> it.
>>>>>>>>>>
>>>>>>>>>>      -  The *zookeeper bundled with kafka 0.9.0.1* was used to
>>>>>>>>>> start zookeeper. There is only 1 instance (standalone) of zookeeper
>>>>>>>>>> running on my localhost (ubuntu 14.04)
>>>>>>>>>>      -  There is only 1 Kafka broker (*version: 0.9.0.1*)
>>>>>>>>>>
>>>>>>>>>>      With regards to the Flink cluster, there's only 1 JM & 2 TMs
>>>>>>>>>> started with no HA. I presume this does not use zookeeper anyway as it
>>>>>>>>>> runs as a standalone cluster.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>      BTW, the Kafka connector version that I use is as suggested
>>>>>>>>>> in the Flink connectors page:
>>>>>>>>>>
>>>>>>>>>>      <dependency>
>>>>>>>>>>        <groupId>org.apache.flink</groupId>
>>>>>>>>>>        <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
>>>>>>>>>>        <version>1.1.1</version>
>>>>>>>>>>      </dependency>
>>>>>>>>>>
>>>>>>>>>>      Do you see any issues with versions?
>>>>>>>>>>
>>>>>>>>>>      1) Do you have benchmarks with respect to checkpointing in Flink?
>>>>>>>>>>
>>>>>>>>>>      2) There isn't a detailed explanation of what state is
>>>>>>>>>> stored as part of the checkpointing process. For example, if I have a
>>>>>>>>>> pipeline like
>>>>>>>>>> *source -> map -> keyBy -> map -> sink*, my assumption on what's
>>>>>>>>>> stored is:
>>>>>>>>>>
>>>>>>>>>>          a) The source stream's custom watermarked records
>>>>>>>>>>          b) Intermediate states of each of the transformations
>>>>>>>>>> in the pipeline
>>>>>>>>>>          c) Delta of records stored from the previous sink
>>>>>>>>>>          d) Custom states (say ValueState, as in my case) -
>>>>>>>>>> essentially this is what I bother about storing.
>>>>>>>>>>          e) All of my operators
>>>>>>>>>>
>>>>>>>>>>       Is my understanding right?
>>>>>>>>>>
>>>>>>>>>>      3) Is there a way in Flink to checkpoint only d) as stated
>>>>>>>>>> above?
>>>>>>>>>>
>>>>>>>>>>      4) Can you apply checkpointing to only streams and certain
>>>>>>>>>> operators (say I wish to store aggregated values as part of the
>>>>>>>>>> transformation)?
>>>>>>>>>>
>>>>>>>>>> Best Regards
>>>>>>>>>> CVP
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Sep 26, 2016 at 6:18 PM, Stephan Ewen <sewen@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks, the logs were very helpful!
>>>>>>>>>>>
>>>>>>>>>>> TL;DR - The offset committing to ZooKeeper is very slow and
>>>>>>>>>>> prevents proper starting of checkpoints.
>>>>>>>>>>>
>>>>>>>>>>> Here is what is happening in detail:
>>>>>>>>>>>
>>>>>>>>>>>   - Between the point when the TaskManager receives the "trigger
>>>>>>>>>>> checkpoint" message and the point when the KafkaSource actually
>>>>>>>>>>> starts the checkpoint, a long time passes (many seconds) - for one
>>>>>>>>>>> of the Kafka inputs (the other is fine).
>>>>>>>>>>>   - The only way this delay can be introduced is if another
>>>>>>>>>>> checkpoint-related operation (such as trigger() or notifyComplete())
>>>>>>>>>>> is still in progress when the checkpoint is started. Flink does not
>>>>>>>>>>> perform concurrent checkpoint operations on a single operator, to
>>>>>>>>>>> ease the concurrency model for users.
>>>>>>>>>>>   - The operation that is still in progress must be the
>>>>>>>>>>> committing of the offsets (to ZooKeeper or Kafka). That also explains
>>>>>>>>>>> why this only happens once one side receives the first record. Before
>>>>>>>>>>> that, there is nothing to commit.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> What Flink should fix:
>>>>>>>>>>>   - The KafkaConsumer should run the commit operations
>>>>>>>>>>> asynchronously, to not block the "notifyCheckpointComplete()" method.
>>>>>>>>>>>
>>>>>>>>>>> What you can fix:
>>>>>>>>>>>   - Have a look at your Kafka/ZooKeeper setup. One Kafka input
>>>>>>>>>>> works well, the other does not. Do they go against different sets of
>>>>>>>>>>> brokers, or different ZooKeepers? Is the metadata for one input bad?
>>>>>>>>>>>   - In the next Flink version, you may opt out of committing
>>>>>>>>>>> offsets to Kafka/ZooKeeper altogether. It is not important for Flink's
>>>>>>>>>>> checkpoints anyway.
>>>>>>>>>>>
>>>>>>>>>>> Greetings,
>>>>>>>>>>> Stephan
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Sep 26, 2016 at 5:13 PM, Chakravarthy varaga <
>>>>>>>>>>> chakravarthyvp@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Stefan,
>>>>>>>>>>>>
>>>>>>>>>>>>     Please find my responses below.
>>>>>>>>>>>>
>>>>>>>>>>>>     - What source are you using for the slow input?
>>>>>>>>>>>>     [CVP] - Both streams, as pointed out in my first mail, are
>>>>>>>>>>>> Kafka streams.
>>>>>>>>>>>>   - How large is the state that you are checkpointing?
>>>>>>>>>>>>
>>>>>>>>>>>>     [CVP] - I have enabled checkpointing on the StreamEnvironment
>>>>>>>>>>>> as below.
>>>>>>>>>>>>
>>>>>>>>>>>>         final StreamExecutionEnvironment streamEnv =
>>>>>>>>>>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>         streamEnv.setStateBackend(
>>>>>>>>>>>>             new FsStateBackend("file:///tmp/flink/checkpoints"));
>>>>>>>>>>>>         streamEnv.enableCheckpointing(10000);
>>>>>>>>>>>>
>>>>>>>>>>>>     In terms of the state stored, the KS1 stream has a payload
>>>>>>>>>>>> of 100K events/second, while KS2 has about 1 event / 10 minutes...
>>>>>>>>>>>> Basically the operators perform flatmaps on 8 fields of a tuple (all
>>>>>>>>>>>> fields are primitives). If you look at the states' sizes in the
>>>>>>>>>>>> dashboard, they are in KB...
>>>>>>>>>>>>   - Can you try to see in the log if actually the state
>>>>>>>>>>>> snapshot takes that long, or if it simply takes long for the
>>>>>>>>>>>> checkpoint barriers to travel through the stream due to a lot
>>>>>>>>>>>> of backpressure?
>>>>>>>>>>>>     [CVP] - There is no backpressure, at least from the sample
>>>>>>>>>>>> computation in the Flink dashboard. 100K/second is low load for Flink's
>>>>>>>>>>>> benchmarks. I could not quite get the barriers vs snapshot state. I have
>>>>>>>>>>>> attached the Task Manager log (DEBUG) info if that will interest you.
>>>>>>>>>>>>
>>>>>>>>>>>>      I have attached the checkpoint times as .png from the
>>>>>>>>>>>> dashboard. Basically, if you look at checkpoint IDs 28, 29 & 30, you'd
>>>>>>>>>>>> see that the checkpoints take more than a minute in each case. Before
>>>>>>>>>>>> these checkpoints, the KS2 stream did not have any events. As soon as
>>>>>>>>>>>> an event (should be in bytes) was generated, the checkpoints went slow
>>>>>>>>>>>> and subsequently a minute more for every checkpoint thereafter.
>>>>>>>>>>>>
>>>>>>>>>>>>    This log was collected from the standalone Flink cluster
>>>>>>>>>>>> with 1 job manager & 2 TMs. 1 TM was running this application with
>>>>>>>>>>>> checkpointing (parallelism=1)
>>>>>>>>>>>>
>>>>>>>>>>>>     Please let me know if you need further info.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen <sewen@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Let's try to figure that one out. Can you give us a bit more
>>>>>>>>>>>>> information?
>>>>>>>>>>>>>
>>>>>>>>>>>>>   - What source are you using for the slow input?
>>>>>>>>>>>>>   - How large is the state that you are checkpointing?
>>>>>>>>>>>>>   - Can you try to see in the log if actually the state
>>>>>>>>>>>>> snapshot takes that long, or if it simply takes long for the checkpoint
>>>>>>>>>>>>> barriers to travel through the stream due to a lot of backpressure?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Greetings,
>>>>>>>>>>>>> Stephan
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske <
>>>>>>>>>>>>> fhueske@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi CVP,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm not so familiar with the internals of the
>>>>>>>>>>>>>> checkpointing system, but maybe Stephan (in CC) has an idea what's
>>>>>>>>>>>>>> going on here.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2016-09-23 11:33 GMT+02:00 Chakravarthy varaga <
>>>>>>>>>>>>>> chakravarthyvp@gmail.com>:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Aljoscha & Fabian,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     I have a stream application that has 2 stream sources as
>>>>>>>>>>>>>>> below.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>      KeyedStream<String, String> ks1 = ds1.keyBy("*");
>>>>>>>>>>>>>>>      KeyedStream<Tuple2<String, V>, String> ks2 =
>>>>>>>>>>>>>>>          ds2.flatMap(split T into k-v pairs).keyBy(0);
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>      ks1.connect(ks2).flatMap(X);
>>>>>>>>>>>>>>>      // X is a CoFlatMapFunction that inserts and removes
>>>>>>>>>>>>>>>      // elements from ks2 into a key-value state member. Elements
>>>>>>>>>>>>>>>      // from ks1 are matched against that state. The
>>>>>>>>>>>>>>>      // CoFlatMapFunction operator maintains
>>>>>>>>>>>>>>>      // ValueState<Tuple2<Long, Long>>;
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>      // ks1 is streaming about 100K events/sec from a Kafka topic
>>>>>>>>>>>>>>>      // ks2 is streaming about 1 event every 10 minutes...
>>>>>>>>>>>>>>>      // Precisely when the 1st event is consumed from this stream,
>>>>>>>>>>>>>>>      // the checkpoint takes 2 minutes straight away.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     The version of Flink is 1.1.2.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I tried to checkpoint every 10 secs using a
>>>>>>>>>>>>>>> FsStateBackend... What I notice is that the checkpoint duration is
>>>>>>>>>>>>>>> almost 2 minutes for many cases, while for the other cases it varies
>>>>>>>>>>>>>>> from 100 ms to 1.5 minutes frequently. I'm attaching the snapshot of
>>>>>>>>>>>>>>> the dashboard for your reference.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>      Is this an issue with Flink checkpointing?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>  Best Regards
>>>>>>>>>>>>>>> CVP
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
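
Stephan's diagnosis in the quoted thread (a slow offset commit blocking notifyCheckpointComplete(), with the suggested fix being to commit asynchronously) can be illustrated with a small plain-Java model. This is only a sketch of the general pattern and not the code from the pull request linked above. The class AsyncCommitSketch, its fields, and the latch that simulates a slow ZooKeeper/Kafka round trip are all hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Plain-Java model of handing a slow commit to a background thread; hypothetical, not Flink API. */
final class AsyncCommitSketch {
    private final ExecutorService committer = Executors.newSingleThreadExecutor();
    private final AtomicBoolean committed = new AtomicBoolean(false);

    /** Returns immediately; the commit itself runs on the committer thread. */
    void notifyCheckpointComplete(CountDownLatch commitMayFinish) {
        committer.execute(() -> {
            try {
                commitMayFinish.await();  // simulated slow round trip to the offset store
                committed.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    /** Returns {commit still pending when notify returned, commit finished after release}. */
    static boolean[] demo() {
        AsyncCommitSketch source = new AsyncCommitSketch();
        CountDownLatch commitMayFinish = new CountDownLatch(1);

        source.notifyCheckpointComplete(commitMayFinish);
        // The caller is free again here, so the next checkpoint could be triggered.
        boolean pendingAfterReturn = !source.committed.get();

        commitMayFinish.countDown();  // let the simulated commit complete
        source.committer.shutdown();
        try {
            source.committer.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] {pendingAfterReturn, source.committed.get()};
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println(result[0] + " " + result[1]);
    }
}
```

Had the commit run inline in notifyCheckpointComplete(), the caller would have stayed blocked until the latch was released, which corresponds to the delayed checkpoint start described in the thread.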
