From: Vishal Santoshi
Date: Thu, 5 Oct 2017 10:13:43 -0400
Subject: Re: Failing to recover once checkpoint fails
To: Fabian Hueske
Cc: user <user@flink.apache.org>, Stefan Richter

Also note that the ZooKeeper recovery data ( sadly on the same HDFS cluster ) showed the same behavior. It holds the pointers to the checkpoint ( I think that is what it does, keep metadata of where the checkpoint is, etc. ). It too decided to keep the recovery file from the failed state.

-rw-r--r--   3 root hadoop       7041 2017-10-04 13:55 /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
-rw-r--r--   3 root hadoop       7044 2017-10-05 10:07 /flink-recovery/prod/completedCheckpoint7c5a19300092

This is getting a little interesting. What say you :)

On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi wrote:
> Another thing I noted was this:
>
> drwxr-xr-x   - root hadoop          0 2017-10-04 13:54 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
>
> drwxr-xr-x   - root hadoop          0 2017-10-05 09:15 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428
>
> Generally, IMHO, what Flink does is replace the checkpoint directory with a new one; I see it happening now, every minute it replaces the old directory. In this job's case, however, it did not delete the 2017-10-04 13:54 directory, chk-44286. That was ( I think ) the last checkpoint successfully created before the NameNode had issues, but contrary to the usual behavior it was never deleted. It looks as if the job started with a blank slate. Does this strike a chord?
>
> On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi wrote:
>
>> Hello Fabian,
>>
>> First of all, congratulations on this fabulous framework. I have worked with GDF, and though GDF has some natural pluses, Flink's state management is far more advanced. With Kafka as a source it negates issues GDF has ( GDF integration with pub/sub is organic and that is to be expected, but non-FIFO pub/sub is an issue with windows on event time, etc. ).
>>
>> Coming back to this issue. We have the same Kafka topic feeding a streaming Druid datasource and we do not see any issue there, so data loss at the source ( Kafka ) is not applicable. I am totally certain that the "retention" time was not an issue: it is 4 days of retention and we fixed this issue within 30 minutes. We could replay Kafka with a new consumer group.id and that worked fine.
>>
>> Note these properties and see if they strike a chord.
>>
>> * The setCommitOffsetsOnCheckpoints(boolean) setting for Kafka consumers is at its default, true. I bring this up to see whether Flink would in any circumstance drive consumption from the Kafka-perceived offset rather than the one in the checkpoint. ( I have inlined a sketch of the consumer wiring right below the properties. )
>>
>> * state.backend.fs.memory-threshold: 0 has not been set. The state is big enough, though, so IMHO there is no way the state is stored along with the metadata in the JM ( or ZK ? ). The reason I bring this up is to make sure that when you say the size has to be less than 1024 bytes, you are talking about the cumulative state of the pipeline.
>>
>> * We have a good sense of SP ( savepoint ) and CP ( checkpoint ) and certainly understand that they are not dissimilar. However, in this case there were multiple attempts to restart the pipe before it finally succeeded.
>>
>> * Other HDFS-related properties:
>>
>> state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%= flink_hdfs_root %>
>>
>> state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>
>>
>> recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= flink_hdfs_root %>
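( Inline note on the first bullet above: this is roughly how the consumer is wired up. A sketch only, assuming the 0.10 connector, the usual flink-connector-kafka imports, and an existing StreamExecutionEnvironment env; the brokers, topic, and group.id below are placeholders, not the real values. )

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka-broker:9092");   // placeholder
    props.setProperty("group.id", "prod-consumer-group");          // placeholder

    FlinkKafkaConsumer010<String> consumer =
            new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), props);

    // true is the default: offsets are committed back to Kafka on completed
    // checkpoints, but on recovery the consumer is supposed to resume from the
    // offsets stored in the checkpoint itself, not from what Kafka has committed.
    consumer.setCommitOffsetsOnCheckpoints(true);

    DataStream<String> stream = env.addSource(consumer);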
>>
>> Do these make sense? Is there anything else I should look at? Please also note that this is the second time this has happened. The first time I was vacationing and was not privy to the state of the Flink pipeline, but the net effect was similar: the counts for the first window after an internal restart dropped.
>>
>> Thank you for your patience and regards,
>>
>> Vishal
>>
>> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske wrote:
>>
>>> Hi Vishal,
>>>
>>> window operators are always stateful because the operator needs to remember previously received events (WindowFunction) or intermediate results (ReduceFunction).
>>> Given the program you described, a checkpoint should include the Kafka consumer offset and the state of the window operator. If the program eventually successfully (i.e., without an error) recovered from the last checkpoint, all its state should have been restored. Since the last checkpoint was before HDFS went into safe mode, the program would have been reset to that point. If the Kafka retention time is less than the time it took to fix HDFS, you would have lost data because it would have been removed from Kafka. If that's not the case, we need to investigate this further, because a checkpoint recovery must not result in state loss.
>>>
>>> Restoring from a savepoint is not so much different from automatic checkpoint recovery. Given that you have a completed savepoint, you can restart the job from that point. The main difference is that checkpoints are only used for internal recovery and are usually discarded once the job is terminated, while savepoints are retained.
>>>
>>> Regarding your question whether a failed checkpoint should cause the job to fail and recover, I'm not sure what the current status is.
>>> Stefan (in CC) should know what happens if a checkpoint fails.
>>>
>>> Best, Fabian
>>>
>>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi :
>>>
>>>> To add to it, my pipeline is a simple
>>>>
>>>> keyBy(0)
>>>>     .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>>>     .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>>>     .reduce(new ReduceFunction(), new WindowFunction())
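( Inline, spelling that snippet out a bit more concretely. A sketch only, with the usual DataStream API imports assumed: the generics, the 15 minute / 30 second values, and the stand-in ReduceFunction / WindowFunction bodies are illustrative, and "events" stands for the keyed (key, count) tuples parsed off the Kafka source. )

    DataStream<Tuple2<String, Long>> counts = events
            .keyBy(0)
            .timeWindow(Time.of(15, TimeUnit.MINUTES))
            .allowedLateness(Time.of(30, TimeUnit.SECONDS))
            .reduce(
                    // incremental aggregation: a running sum per key, so the
                    // window state is a single record per key
                    new ReduceFunction<Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> reduce(Tuple2<String, Long> a,
                                                           Tuple2<String, Long> b) {
                            return Tuple2.of(a.f0, a.f1 + b.f1);
                        }
                    },
                    // emit the pre-aggregated value when the window fires
                    new WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {
                        @Override
                        public void apply(Tuple key, TimeWindow window,
                                          Iterable<Tuple2<String, Long>> input,
                                          Collector<Tuple2<String, Long>> out) {
                            out.collect(input.iterator().next());
                        }
                    });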
>>>>
>>>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <vishal.santoshi@gmail.com> wrote:
>>>>
>>>>> Hello folks,
>>>>>
>>>>> As far as I know, a checkpoint failure should be ignored and retried with potentially larger state. I had this situation:
>>>>>
>>>>> * HDFS went into safe mode b'coz of NameNode issues
>>>>> * this exception was thrown
>>>>>
>>>>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby. Visit https://s.apache.org/sbnn-error
>>>>> ..................
>>>>> at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>>>>> at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
>>>>> at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
>>>>>
>>>>> * The pipeline came back after a few restarts and checkpoint failures, once the HDFS issues were resolved.
>>>>>
>>>>> I would not have worried about the restart, but it was evident that I lost my operator state. Either my Kafka consumer kept on advancing its offset between a restart and the next checkpoint failure ( a minute's worth ), or the operator that held the partial aggregates was lost. I have a 15-minute window of counts on a keyed operator.
>>>>>
>>>>> I am using RocksDB and of course have checkpointing turned on.
>>>>>
>>>>> The questions thus are:
>>>>>
>>>>> * Should a pipeline be restarted if a checkpoint fails?
>>>>> * Why, on restart, was the operator state not recreated?
>>>>> * Does the nature of the exception thrown have anything to do with this, b'coz suspend and resume from a savepoint work as expected?
>>>>> * And though I am pretty sure, are operators like the window operator stateful by default, so that if I have timeWindow(Time.of(window_size, TimeUnit.MINUTES)).reduce(new ReduceFunction(), new WindowFunction()), the state is managed by Flink?
>>>>>
>>>>> Thanks.
>>>>
>>>
>>
>
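P.S. For completeness, checkpointing and the RocksDB backend are enabled in the job roughly as below. This is a sketch, not the literal code: the one minute interval matches the checkpoint cadence described above, the checkpoint URI mirrors the state.backend.fs.checkpointdir value from flink-conf.yaml, and the usual flink-statebackend-rocksdb imports are assumed.

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // checkpoint every minute, exactly-once ( EXACTLY_ONCE is also the default mode )
    env.enableCheckpointing(60 * 1000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

    // RocksDB state backend; checkpoint data goes to HDFS
    // ( the RocksDBStateBackend(String) constructor declares IOException )
    env.setStateBackend(new RocksDBStateBackend("hdfs:///flink-checkpoints/prod"));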