From: Fabian Hueske <fhueske@gmail.com>
Date: Tue, 23 Jan 2018 10:15:11 +0100
Subject: Re: Failing to recover once checkpoint fails
To: Vishal Santoshi
Cc: Aljoscha Krettek, user@flink.apache.org, Stefan Richter

Sorry for the late reply.

I created FLINK-8487 [1] to track this problem.

@Vishal, can you have a look and check if I forgot some details? I logged the issue for Flink 1.3.2, is that correct?
Please add more information if you think it is relevant.

Thanks,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-8487

2018-01-18 22:14 GMT+01:00 Vishal Santoshi <vishal.santoshi@gmail.com>:

Or this one

https://issues.apache.org/jira/browse/FLINK-4815

On Thu, Jan 18, 2018 at 4:13 PM, Vishal Santoshi <vishal.santoshi@gmail.com> wrote:

ping.

This happened again on production, and it seems reasonable to abort when a checkpoint is not found rather than behave as if it is a brand new pipeline.

On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi <vishal.santoshi@gmail.com> wrote:

Folks, sorry for being late on this. Can somebody with knowledge of this code base create a JIRA issue for the above? We have seen this more than once on production.

On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:

Hi Vishal,

Some relevant Jira issues for you are:

- https://issues.apache.org/jira/browse/FLINK-4808: Allow skipping failed checkpoints
- https://issues.apache.org/jira/browse/FLINK-4815: Automatic fallback to earlier checkpoint when checkpoint restore fails
- https://issues.apache.org/jira/browse/FLINK-7783: Don't always remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()

Best,
Aljoscha

On 9. Oct 2017, at 09:06, Fabian Hueske <fhueske@gmail.com> wrote:

Hi Vishal,

it would be great if you could create a JIRA ticket with Blocker priority.
Please add all relevant information from your detailed analysis, add a link to this email thread (see [1] for the web archive of the mailing list), and post the id of the JIRA issue here.

Thanks for looking into this!

Best regards,
Fabian

[1] https://lists.apache.org/list.html?user@flink.apache.org

2017-10-06 15:59 GMT+02:00 Vishal Santoshi <vishal.santoshi@gmail.com>:

Thank you for confirming.

I think this is a critical bug. In essence, any checkpoint store (HDFS/S3/file) will lose state if it is unavailable at resume. This becomes all the more painful with your confirming that "failed checkpoints kill the job", because essentially it means that if the remote store is unavailable during a checkpoint, then you have lost state (unless, of course, you have a retry of none or an unbounded retry delay, a delay that you *hope* the store revives in).
Remember: the first retry failure will cause brand-new state, according to the code as written, iff the remote store is down. We would rather have a configurable property that establishes our desire to abort, something like an "abort_retry_on_chkretrevalfailure".

In our case it is very important that we do not undercount a window (one reason we use Flink and its awesome failure guarantees), as various alarms sound (we do anomaly detection on the time series).

Please create a JIRA ticket for us to follow, or we could do it.

PS: Not aborting on checkpoint failure, up to a configurable limit, is very important too.

On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:

Hi Vishal,

I think you're right! And thanks for looking into this so deeply.

With your last mail you're basically saying that the checkpoint could not be restored because your HDFS was temporarily down. If Flink had not deleted that checkpoint, it might have been possible to restore it at a later point, right?

Regarding failed checkpoints killing the job: yes, this is currently the expected behaviour, but there are plans to change this.

Best,
Aljoscha

On 5. Oct 2017, at 17:40, Vishal Santoshi <vishal.santoshi@gmail.com> wrote:

I think this is the offending piece. There is a catch-all Exception, which IMHO should distinguish a recoverable exception from an unrecoverable one.

    try {
        completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
        if (completedCheckpoint != null) {
            completedCheckpoints.add(completedCheckpoint);
        }
    } catch (Exception e) {
        LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
            "checkpoint store.", e);
        // remove the checkpoint with broken state handle
        removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
    }
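For illustration, a minimal sketch of that loop rewritten so that a transient store outage aborts the recovery attempt instead of silently dropping the pointer. This is not the actual Flink code; it assumes the same surrounding method and helpers as the snippet above, and the exception types are only meant to separate "store unreachable" from "handle permanently unreadable":

    try {
        completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
        if (completedCheckpoint != null) {
            completedCheckpoints.add(completedCheckpoint);
        }
    } catch (IOException e) {
        // The backing store (e.g. HDFS in safe mode) is unreachable: fail this
        // recovery attempt and keep the ZooKeeper pointer intact, so a later
        // attempt can still restore the checkpoint once the store is back.
        throw new FlinkException("Checkpoint store unavailable, aborting recovery.", e);
    } catch (Exception e) {
        // Only a handle that can never be read again should be treated as broken
        // and removed from the completed checkpoint store.
        LOG.warn("Discarding permanently unreadable checkpoint state handle.", e);
        removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
    }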
On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <vishal.santoshi@gmail.com> wrote:

So this is the issue; tell us if this reading is wrong. ZK had some state (backed by HDFS) that referred to a checkpoint (the same, exact, last checkpoint that completed successfully before the NameNode screwed us). When the JM tried to recreate the state, it failed to retrieve the CHK handle from HDFS because the NameNode was down, and conveniently (and I think very wrongly) removed the CHK from consideration and cleaned the pointer (though the cleanup also failed, since the NameNode was down, as is obvious from the dangling file in recovery). The metadata itself was on HDFS, and a failure to retrieve it should have been a stop-all, rather than trying to do magic around the exception and starting from a blank state.

org.apache.flink.util.FlinkException: Could not retrieve checkpoint 44286 from state handle under /0000000000000044286. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.

On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <vishal.santoshi@gmail.com> wrote:

Also note that the ZooKeeper recovery data (sadly on the same HDFS cluster) showed the same behavior. It had the pointers to the checkpoint (I think that is what it does: keeps metadata of where the checkpoint is, etc.). It too decided to keep the recovery file from the failed state.

-rw-r--r--   3 root hadoop       7041 2017-10-04 13:55 /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
-rw-r--r--   3 root hadoop       7044 2017-10-05 10:07 /flink-recovery/prod/completedCheckpoint7c5a19300092

This is getting a little interesting. What say you :)

On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <vishal.santoshi@gmail.com> wrote:

Another thing I noted was this:

drwxr-xr-x   - root hadoop          0 2017-10-04 13:54 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
drwxr-xr-x   - root hadoop          0 2017-10-05 09:15 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428

Generally what Flink does, IMHO, is replace the checkpoint directory with a new one. I see it happening now: every minute it replaces the old directory. In this job's case, however, it did not delete the 2017-10-04 13:54 directory and hence the chk-44286 directory. That was the last checkpoint (I think) successfully created before the NameNode had issues, but unlike the usual behavior it did not delete this chk-44286. It looks as if the job started with a blank slate. Does this strike a chord?

On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <vishal.santoshi@gmail.com> wrote:

Hello Fabian,

First of all, congratulations on this fabulous framework. I have worked with GDF, and though GDF has some natural pluses, Flink's state management is far more advanced. With Kafka as a source it negates issues GDF has (GDF integration with Pub/Sub is organic and that is to be expected, but non-FIFO Pub/Sub is an issue with windows on event time, etc.).

Coming back to this issue: we have that same Kafka topic feeding a streaming Druid datasource and we do not see any issue there, so data loss at the source (Kafka) is not applicable. I am totally certain that the "retention" time was not an issue. It is 4 days of retention and we fixed this issue within 30 minutes. We could replay Kafka with a new consumer group.id and that worked fine.

Note these properties and see if they strike a chord.

* setCommitOffsetsOnCheckpoints(boolean) for the Kafka consumers is the default, true. I bring this up to see whether Flink will, in any circumstance, drive consumption from the Kafka-perceived offset rather than the one in the checkpoint (see the sketch after this message).

* state.backend.fs.memory-threshold: 0 has not been set. The state is big enough, though, so IMHO there is no way the state is stored along with the metadata in the JM (or ZK?). The reason I bring this up is to make sure that when you say the size has to be less than 1024 bytes, you are talking about the cumulative state of the pipeline.

* We have a good sense of SP (savepoint) and CP (checkpoint) and certainly understand that they actually are not dissimilar. However, in this case there were multiple attempts to restart the pipe before it finally succeeded.

* Other HDFS-related properties:

state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%= flink_hdfs_root %>
state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>
recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= flink_hdfs_root %>

Do these make sense? Is there anything else I should look at? Please also note that this is the second time this has happened. The first time I was vacationing and was not privy to the state of the Flink pipeline, but the net effect was similar: the counts for the first window after an internal restart dropped.

Thank you for your patience and regards,

Vishal
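To make the offset question concrete, here is a minimal sketch of the consumer setup being discussed; the connector class, topic and broker names are placeholders rather than the production configuration. The point it illustrates: when a job restores from a checkpoint, the offsets stored inside the checkpoint take precedence, and the offsets committed back to Kafka (the group offsets) only matter when there is no checkpoint or savepoint state to restore from.

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker:9092");   // placeholder
    props.setProperty("group.id", "prod-consumer-group");    // placeholder

    FlinkKafkaConsumer010<String> consumer =
            new FlinkKafkaConsumer010<>("events-topic", new SimpleStringSchema(), props);

    // Default is true: offsets are committed back to Kafka when a checkpoint
    // completes, but only for monitoring/bookkeeping purposes.
    consumer.setCommitOffsetsOnCheckpoints(true);

    // Only consulted when the job starts without any checkpoint/savepoint state.
    consumer.setStartFromGroupOffsets();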
On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhueske@gmail.com> wrote:

Hi Vishal,

window operators are always stateful because the operator needs to remember previously received events (WindowFunction) or intermediate results (ReduceFunction).
Given the program you described, a checkpoint should include the Kafka consumer offset and the state of the window operator. If the program eventually successfully (i.e., without an error) recovered from the last checkpoint, all its state should have been restored. Since the last checkpoint was before HDFS went into safe mode, the program would have been reset to that point. If the Kafka retention time is less than the time it took to fix HDFS, you would have lost data because it would have been removed from Kafka. If that's not the case, we need to investigate this further because a checkpoint recovery must not result in state loss.

Restoring from a savepoint is not so much different from automatic checkpoint recovery. Given that you have a completed savepoint, you can restart the job from that point. The main difference is that checkpoints are only used for internal recovery and usually discarded once the job is terminated, while savepoints are retained.

Regarding your question whether a failed checkpoint should cause the job to fail and recover: I'm not sure what the current status is.
Stefan (in CC) should know what happens if a checkpoint fails.

Best, Fabian

2017-10-05 2:20 GMT+02:00 Vishal Santoshi <vishal.santoshi@gmail.com>:

To add to it, my pipeline is a simple

keyBy(0)
    .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
    .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
    .reduce(new ReduceFunction(), new WindowFunction())
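For context, a self-contained sketch of that pipeline shape with checkpointing enabled. The source, key extraction, window sizes and sink are placeholders (the real job reads from Kafka and runs on event time, where allowedLateness actually takes effect), so this only illustrates the structure and the function signatures, not the production job:

    import java.util.concurrent.TimeUnit;

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class KeyedCountWindowsSketch {

        public static void main(String[] args) throws Exception {
            final long windowSize = 15;  // minutes, stand-in for window_size
            final long lateBy = 30;      // seconds, stand-in for late_by

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000);  // checkpoint every minute, as in the thread

            // Stand-in for the Kafka source: (key, count) pairs.
            DataStream<Tuple2<String, Long>> events =
                    env.fromElements(Tuple2.of("a", 1L), Tuple2.of("b", 1L), Tuple2.of("a", 1L));

            events
                .keyBy(0)
                .timeWindow(Time.of(windowSize, TimeUnit.MINUTES))
                .allowedLateness(Time.of(lateBy, TimeUnit.SECONDS))
                .reduce(
                    new ReduceFunction<Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                            return Tuple2.of(a.f0, a.f1 + b.f1);  // running count per key
                        }
                    },
                    new WindowFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>, Tuple, TimeWindow>() {
                        @Override
                        public void apply(Tuple key, TimeWindow window,
                                          Iterable<Tuple2<String, Long>> reduced,
                                          Collector<Tuple3<String, Long, Long>> out) {
                            // With a ReduceFunction attached, the iterable holds one pre-aggregated element.
                            String k = key.getField(0);
                            long count = reduced.iterator().next().f1;
                            out.collect(Tuple3.of(k, window.getEnd(), count));
                        }
                    })
                .print();

            env.execute("keyed-count-windows-sketch");
        }
    }

With a bounded stand-in source like this, the job simply finishes; the relevant point for the thread is that both the running reduce aggregate and the window contents are operator state that a checkpoint must capture and a recovery must restore.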
On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <vishal.santoshi@gmail.com> wrote:

Hello folks,

As far as I know, checkpoint failure should be ignored and retried with potentially larger state. I had this situation:

* HDFS went into safe mode because of NameNode issues
* an exception was thrown:

org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby. Visit https://s.apache.org/sbnn-error
..................
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)

* the pipeline came back after a few restarts and checkpoint failures, once the HDFS issues were resolved.

I would not have worried about the restart, but it was evident that I lost my operator state. Either it was my Kafka consumer that kept on advancing its offset between a start and the next checkpoint failure (a minute's worth), or the operator that held partial aggregates was lost. I have a 15-minute window of counts on a keyed operator.

I am using RocksDB and of course have checkpointing turned on.

The questions thus are:

* Should a pipeline be restarted if a checkpoint fails?
* Why, on restart, was the operator state not recreated?
* Does the nature of the exception thrown have anything to do with this, given that suspend and resume from a savepoint work as expected?
* And though I am pretty sure, are operators like the window operator stateful by default, so that if I have timeWindow(Time.of(window_size, TimeUnit.MINUTES)).reduce(new ReduceFunction(), new WindowFunction()), the state is managed by Flink?

Thanks.