From: Stefan Richter <s.richter@data-artisans.com>
To: Tony Wei <tony19920430@gmail.com>
Cc: user <user@flink.apache.org>, Stephan Ewen
Subject: Re: Stream Task seems to be blocked after checkpoint timeout
Date: Thu, 28 Sep 2017 13:43:44 +0200

Hi,

the gap between the sync and the async part does not mean too much. What happens per task is that all operators go through their sync part, and then one thread executes all the async parts, one after the other. So if an async part starts late, this is just because it started only after another async part finished.

I have one more question about your job, because it involves communication with external systems like S3 and a database. Are you sure that they cannot sometimes become a bottleneck, block, and bring down your job? In particular: is the same S3 used to serve the operator and checkpointing, and what are your sustained read/write rate there and the maximum number of connections? You can try to use the backpressure metric and identify the first operator (counting from the sink) that indicates backpressure.
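As a concrete starting point, here is a minimal, hedged sketch of how that check could be scripted against the JobManager's monitoring REST API (Java, standard library only). The endpoint path is what the web UI's back pressure view queries as far as I remember, and the host/port and vertex ids are placeholders, so treat this as an illustration rather than an official tool:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

// Hedged sketch: walk the job's vertices from sink to source and print the raw
// back pressure JSON for each. The first vertex (counting from the sink) that
// reports a non-OK level is the most likely culprit. The very first request per
// vertex may only trigger sampling, so repeat the call after a few seconds.
public class BackPressureProbe {

    public static void main(String[] args) throws Exception {
        String jobManager = "http://localhost:8081";           // assumption: default web monitor address
        String jobId = "7c039572b13346f1b17dcc0ace2b72c2";     // job id taken from the checkpoint path in the logs
        String[] vertexIdsSinkToSource = args;                 // pass vertex ids as arguments, sink first

        for (String vertexId : vertexIdsSinkToSource) {
            URL url = new URL(jobManager + "/jobs/" + jobId + "/vertices/" + vertexId + "/backpressure");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("GET");
            try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
                StringBuilder body = new StringBuilder();
                String line;
                while ((line = in.readLine()) != null) {
                    body.append(line);
                }
                System.out.println(vertexId + " -> " + body);
            } finally {
                conn.disconnect();
            }
        }
    }
}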

Best,
Stefan

On 28.09.2017 at 12:59, Tony Wei <tony19920430@gmail.com> wrote:

Hi,

Sorry. This is the correct one.

Best Regards,
Tony Wei

2017-09-28 18:55 GMT+08:00 Tony Wei <tony19920430@gmail.com>:
Hi Stefan, 

Sorry for providing partial information. The attachment is the full logs for checkpoint #1577.

The reason I said the asynchronous part seems not to have been executed immediately is that all the synchronous parts had already finished at 2017-09-27 13:49.
Did that mean the checkpoint barrier event had already arrived at the operator and started as soon as the JM triggered the checkpoint?

Best Regards,
Tony Wei

2017-09-28 18:22 GMT+08:00 Stefan Richter <s.richter@data-artisans.com>:
Hi,

I agree that the memory consumption looks good. If there is only one TM, it will run inside one JVM. As for the 7 minutes, you mean the reported end-to-end time? This time measurement starts when the checkpoint is triggered on the job manager; the first contributor is then the time that it takes for the checkpoint barrier event to travel with the stream to the operators. If there is back pressure and a lot of events are buffered, this can introduce delay to this first part, because barriers must not overtake data for correctness. After the barrier arrives at the operator, next comes the synchronous part of the checkpoint, which is typically short running and takes a snapshot of the state (think of creating an immutable version, e.g. through copy on write). In the asynchronous part, this snapshot is persisted to DFS. After that the timing stops and is reported together with the acknowledgement to the job manager.

So, I would assume that if reporting took 7 minutes end-to-end, and the async part took 4 minutes, it is likely that it took around 3 minutes for the barrier event to travel with the stream. About the debugging, I think it is hard to figure out what is going on with the DFS if you don't have metrics on that. Maybe you could attach a sampler to the TM's JVM and monitor where time is spent for the snapshotting?

I am also looping in Stephan; he might have more suggestions.

Best,
Stefan

On 28.09.2017 at 11:25, Tony Wei <tony19920430@gmail.com> wrote:

Hi Stefan,

Here is some telemetry information, but I don't have historical information about GC.

<screenshot: 2017-09-28 4.51.26 PM.png>
<screenshot: 2017-09-28 4.51.11 PM.png>

1) Yes, my state is not large.
2) My DFS is S3, but my cluster is outside AWS. That might be a problem. Since this is a POC, we might move to AWS in the future or use HDFS in the same cluster. However, how can I recognize whether this is actually the problem? (See the sketch after this list.)
3) It seems memory usage is bounded. I'm not sure whether the status shown above is fine.
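For context, a minimal sketch of where the S3 checkpoint location from the TM log plugs in, assuming the FsStateBackend from the 1.3 DataStream API. If the async-I/O operator uploads to the same bucket or endpoint, checkpoint writes and operator writes would share the same S3 connection budget, which is one thing to rule out:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hedged sketch: point checkpointing at the S3 path that appears in the TM log.
public class CheckpointBackendSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStateBackend(new FsStateBackend("s3://tony-dev/flink-checkpoints"));
        // (some versions also offer a constructor overload with a boolean flag to
        //  force asynchronous snapshots; check the overloads of your Flink version)

        // ... build and execute the job ...
    }
}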

There is only one TM in my cluster for now, so all tasks are running on that machine. I think that means they are in the same JVM, right?
Besides the asynchronous part taking so long, another question is that the late message showed this task was delayed for almost 7 minutes, but the log showed it only took 4 minutes.
It seems that it was somehow waiting to be executed. Are there any pointers for finding out what happened?

Regarding the log information, what I mean is that it is hard to recognize which checkpoint id the asynchronous parts belong to when a checkpoint takes more time and more concurrent checkpoints are in flight.
Also, it seems that an asynchronous part might not be executed right away if there is no free resource in the thread pool. It would be better to measure the time between creation and the start of processing, and log it together with the checkpoint id alongside the existing log line that reports how long the asynchronous part took.
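As a purely illustrative sketch of the kind of logging meant here (the class and the scheduling are simplified; none of this is Flink's actual code), a wrapper like the following would record both the queue wait and the run time together with the checkpoint id:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical wrapper: records when an async snapshot task was created and how
// long it waited in the thread pool queue before it started running.
public class QueueWaitLoggingRunnable implements Runnable {

    private final long checkpointId;
    private final Runnable asyncSnapshotPart;
    private final long createdAtMillis = System.currentTimeMillis();

    public QueueWaitLoggingRunnable(long checkpointId, Runnable asyncSnapshotPart) {
        this.checkpointId = checkpointId;
        this.asyncSnapshotPart = asyncSnapshotPart;
    }

    @Override
    public void run() {
        long waited = System.currentTimeMillis() - createdAtMillis;
        long started = System.currentTimeMillis();
        asyncSnapshotPart.run();
        long took = System.currentTimeMillis() - started;
        // One line that carries both the checkpoint id and the queue wait time,
        // so a late async part can be attributed to the right checkpoint.
        System.out.printf("checkpoint %d: async part waited %d ms, ran %d ms%n",
                checkpointId, waited, took);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(new QueueWaitLoggingRunnable(1577, () -> { /* snapshot work */ }));
        pool.shutdown();
    }
}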

Best Regards,
Tony Wei

2017-09-28 16:25 GMT+08:00 Stefan Richter <s.richter@data-artisans.com>:
Hi,

when the async part takes that long I would have 3 things to look at:

1) Is your state so large? I don't think this applies in your case, right?
2) Is something wrong with writing to DFS (network, disks, etc)?
3) Are we running low on memory on that task manager?

Do you have telemetry information about used heap and GC pressure on the problematic task? However, what speaks against the memory problem hypothesis is that future checkpoints seem to go through again. What I find very strange is that within the reported 4 minutes of the async part the only thing that happens is: open the DFS output stream, iterate the in-memory state and write serialized state data to the DFS stream, then close the stream. There are no locks or waits in that section, so I would assume that for one of the three reasons I gave, writing the state is terribly slow.

Those snapshots should be able to run concurrently, for example so that users can also take savepoints even when a checkpoint was triggered and is still running. So there is no way to guarantee that the previous parts have finished; this is expected behaviour. Which waiting times are you missing in the log? I think the information about when a checkpoint is triggered, when it is received by the TM, the sync and async part durations, and the acknowledgement time should all be there.
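If the overlap itself turns out to be the problem (several async parts queuing behind each other and sharing one upload path), a hedged sketch of one knob to try, using the CheckpointConfig methods as exposed by the DataStream API:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hedged sketch: keep the 5 minute interval from the job, but allow only one
// checkpoint in flight so asynchronous parts cannot pile up behind each other.
public class CheckpointConcurrencySetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(5 * 60 * 1000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000);

        // ... build and execute the job ...
    }
}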

Best,
Stefan



On 28.09.2017 at 08:18, Tony Wei <tony19920430@gmail.com> wrote:

Hi Stefan,

The checkpoint on my job has been subsumed again. There are some questions that I don't understand.

Log in JM:
2017-09-27 13:45:15,686 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1576 (174693180 bytes in 21597 ms).
2017-09-27 13:49:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1577 @ 1506520182795
2017-09-27 13:54:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1578 @ 1506520482795
2017-09-27 13:55:13,105 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1578 (152621410 bytes in 19109 ms).
2017-09-27 13:56:37,103 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1577 from 2273da50f29b9dee731f7bd749e91c80 of job 7c039572b....
2017-09-27 13:59:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1579 @ 1506520782795

Log in TM:
2017-09-27 13:56:37,105 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend - DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2, asynchronous part) in thread Thread[pool-7-thread-322,5,Flink Task Threads] took 240248 ms.

I think the log in the TM might be the late message for #1577 in the JM, because #1576 and #1578 had already finished and #1579 hadn't been started at 13:56:37.
If I am not mistaken, I am wondering why the time it took was 240248 ms (4 min). It seems that it started later than the asynchronous tasks in #1578.
Is there any way to guarantee that the previous asynchronous parts of checkpoints will be executed before the following ones?

Moreover, I think it would be better to have more information in the INFO log, such as the waiting time and the checkpoint id, in order to trace the progress of a checkpoint conveniently.

What do you think? Do you have any suggestions for how I could deal with these problems? Thank you.

Best Regards,
Tony Wei

2017-09-27 17:11 GMT+08:00 Tony Wei <tony19920430@gmail.com>:
Hi Stefan,

Here is the summary of my streaming job's checkpoints after restarting last night.

<screenshot: 2017-09-27 4.56.30 PM.png>

This is the distribution of alignment buffered over the last 12 hours.

<screenshot: 2017-09-27 5.05.11 PM.png>

And here is the buffer out pool usage during chk #1140 ~ #1142. For chk #1245 and #1246, you can check the picture I sent before.

<screenshot: 2017-09-27 5.01.24 PM.png>

AFAIK, the back pressure rate is usually in LOW status, sometimes goes up to HIGH, and is always OK during the night.

Best Regards,
Tony Wei


2017-09-27 16:54 GMT+08:00 Stefan Richter <s.richter@data-artisans.com>:
Hi Tony,

are your checkpoints typically close to the timeout boundary? From what I see, writing the checkpoint is relatively fast, but the time from the checkpoint trigger to execution seems very long. This is typically the case if your job has a lot of backpressure and therefore the checkpoint barriers take a long time to travel to the operators, because a lot of events are piling up in the buffers. Do you also experience large alignments for your checkpoints?
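For completeness, a minimal sketch of the related timeout setting, again assuming the CheckpointConfig API of this Flink version. Raising the timeout does not fix back pressure, but it at least separates slow checkpoints from expired ones while the root cause is investigated:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hedged sketch: 5 minute interval as in the job, with a checkpoint timeout
// raised above the default 10 minutes so barriers delayed by back pressure do
// not immediately expire the attempt.
public class CheckpointTimeoutSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(5 * 60 * 1000);
        env.getCheckpointConfig().setCheckpointTimeout(15 * 60 * 1000);

        // ... build and execute the job ...
    }
}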

Best,
Stefan  

On 27.09.2017 at 10:43, Tony Wei <tony19920430@gmail.com> wrote:

Hi Stefan,

It seems that I found something strange in the JM's log.

This had happened more than once before, but all subtasks would finish their checkpoint attempts in the end.

2017-09-26 01:23:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1140 @ 1506389008690
2017-09-26 01:28:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1141 @ 1506389308690
2017-09-26 01:33:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1142 @ 1506389608690
2017-09-26 01:33:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1140 expired before completing.
2017-09-26 01:38:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1141 expired before completing.
2017-09-26 01:40:38,044 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1140 from c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
2017-09-26 01:40:53,743 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1141 from c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
2017-09-26 01:41:19,332 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1142 (136733704 bytes in 457413 ms).

For chk #1245 and #1246, there was no late message from the TM. You can refer to the TM log. A fully completed checkpoint attempt will have 12 (... asynchronous part) logs in general, but #1245 and #1246 only got 10 logs.

2017-09-26 10:08:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1245 @ 1506420508690
2017-09-26 10:13:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1246 @ 1506420808690
2017-09-26 10:18:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1245 expired before completing.
2017-09-26 10:23:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1246 expired before completing.

Moreover, I listed the directory for checkpoints on S3 and saw there were two states not discarded successfully. In general, there will be 16 parts for a completed checkpoint state.

2017-09-26 18:08:33 36919 tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1245/eedd7ca5-ee34-45a5-bf0b-11cc1fc67ab8
2017-09-26 18:13:34 = 37419 tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1246/9aa5c6c4-8c74-465d-8509-5fea4ed25af6

I hope this information is helpful. Thank you.

Best Regards,
Tony Wei

2017-09-27 16:14 GMT+08:00 Stefan Richter <s.richter@data-artisans.com>:
Hi,

thanks for the information. Unfortunately, I have no immediate idea what the reason is from the given information. I think most helpful could be a thread dump, but also metrics on the operator level to figure out which part of the pipeline is the culprit.
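If attaching jstack to the TaskManager JVM is inconvenient, a small, Flink-agnostic sketch using the standard java.lang.management API can produce a comparable dump (note that ThreadInfo.toString() truncates very deep stacks):

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hedged sketch: dump all live threads with their stack traces and lock info.
public class ThreadDumpUtil {

    public static String dumpAllThreads() {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        StringBuilder sb = new StringBuilder();
        for (ThreadInfo info : bean.dumpAllThreads(true, true)) {
            sb.append(info.toString());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(dumpAllThreads());
    }
}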

Best,
Stefan

On 26.09.2017 at 17:55, Tony Wei <tony19920430@gmail.com> wrote:

Hi Stefan,

There is no unknown exception in my full log. The Flink version is 1.3.2.
My job is roughly like this.

env.addSource(Kafka)
  .map(ParseKeyFromRecord)
  = .keyBy()
  .process(CountAndTimeoutWindow)
  .asyncIO(UploadToS3)
  .addSink(UpdateDatabase)

It seemed all tasks stopped like the picture I sent in the last email.

I will keep in mind to take a thread dump from that JVM if this happens again.

Best Regards,
Tony Wei

2017-09-26 23:46 GMT+08:00 Stefan Richter <s.richter@data-artisans.com>:
Hi,

that is very strange indeed. I had a look at the logs and there is no error or exception reported. I assume there is also no exception in your full logs? Which version of Flink are you using and what operators were running in the task that stopped? If this happens again, would it be possible to take a thread dump from that JVM?

Best,
Stefan

> On 26.09.2017 at 17:08, Tony Wei <tony19920430@gmail.com> wrote:
>
> Hi,
>
> Something weird happened on my streaming job.
>
> I found that my streaming job seemed to be blocked for a long time, and I saw the situation like the picture below. (chk #1245 and #1246 each had 7/8 tasks finished and were then marked as timed out by the JM. Other checkpoints failed in the same state, like #1247, until I restarted the TM.)
>
> <snapshot.png>
>
> I'm not sure what happened, but the consumer stopped fetching records, buffer usage was 100%, and the following task did not seem to fetch data anymore. It was as if the whole TM had stopped.
>
> However, after I restarted the TM and forced the job to restart from the latest completed checkpoint, everything worked again. And I don't know how to reproduce it.
>
> The attachment is my TM log. Because there are many user logs and sensitive information, I only kept the log lines from `org.apache.flink...`.
>
> My cluster setting is one JM and one TM with 4 available slots.
>
> The streaming job uses all slots, the checkpoint interval is 5 mins, and the max concurrent number is 3.
>
> Please let me know if you need more information to find out what happened to my streaming job. Thanks for your help.
>
> Best Regards,
> Tony Wei
> <flink-root-taskmanager-0-partial.log>











<chk_1577.log>