From: Fabian Hueske <fhueske@gmail.com>
Date: Tue, 5 Jun 2018 12:00:18 +0200
Subject: Re: NPE in flink sql over-window
To: "Yan Zhou [FDS Science]" <yzhou@coupang.com>
Cc: Dawid Wysakowicz <dwysakowicz@apache.org>, user <user@flink.apache.org>
Mailing-List: user@flink.apache.org
Hi Yan,

Thanks for providing the logs and opening the JIRA issue!
Let's continue the discussion there.

Best, Fabian

2018-06-05 1:26 GMT+02:00 Yan Zhou [FDS Science]:

> Hi Fabian,
>
> I added some trace logs in ProcTimeBoundedRangeOver and think it should be
> a bug. The logs below show how CLEANUP_TIME_1 bypasses the
> needToCleanupState check and causes the NPE. A JIRA ticket [1] has been
> created.
>
> Best
> Yan
>
> [ts:1528149296456] [label:state_ttl_update] register for cleanup at
> 1528150096456 (CLEANUP_TIME_1), because of Row:(orderId:001,userId:U123)
> [ts:1528149296456] [label:register_pt] register for process input at
> 1528149296457, because of Row:(orderId:001,userId:U123)
> [ts:1528149296458] [label:state_apply] ontimer at 1528149296457, apply
> Row:(orderId:001,userId:U123) to accumulator
>
> [ts:1528149885813] [label:state_ttl_update] register for cleanup at
> 1528150685813 (CLEANUP_TIME_2), because of Row:(orderId:002,userId:U123)
> [ts:1528149885813] [label:register_pt] register for process input at
> 1528149885814, because of Row:(orderId:002,userId:U123)
> [ts:1528149885814] [label:state_apply] ontimer at 1528149885814, apply
> Row:(orderId:002,userId:U123) to accumulator
>
> [ts:1528150096460] [label:NO_ELEMENTS_IN_STATE] ontimer at
> 1528150096456 (CLEANUP_TIME_1), bypasses the needToCleanupState check,
> however rowMapState is {key:1528150096455, value:[]}
>
> [ts:1528150685815] [label:state_timeout] ontimer at
> 1528150685813 (CLEANUP_TIME_2), clean/empty the rowMapState
> [{key:1528149885813, value:[Row:(orderId:002,userId:U123)]}]
>
> [1]: https://issues.apache.org/jira/browse/FLINK-9524
>
> ------------------------------
> From: Yan Zhou [FDS Science]
> Sent: Monday, June 4, 2018 4:05 PM
> To: Fabian Hueske
> Cc: Dawid Wysakowicz; user
> Subject: Re: NPE in flink sql over-window
>
> Hi Fabian,
>
> Yes, the NPE caused the job failure and recovery (instead of being the
> result of a recovery). And during the recovery, the same exception is
> thrown from the same line.
>
> Best
> Yan
>
> ------------------------------
> From: Fabian Hueske
> Sent: Thursday, May 31, 2018 12:09:03 AM
> To: Yan Zhou [FDS Science]
> Cc: Dawid Wysakowicz; user
> Subject: Re: NPE in flink sql over-window
>
> Hi Yan,
>
> Thanks for the details and for digging into the issue.
> If I got it right, the NPE caused the job failure and recovery (instead
> of being the result of a recovery), correct?
>
> Best, Fabian
>
> 2018-05-31 7:00 GMT+02:00 Yan Zhou [FDS Science]:
>
> Thanks for the reply.
>
> Yes, it only happens if I configure the idle state retention times. The
> error occurs the first time, before the first recovery. On Flink 1.4.x I
> ran with rowtime rather than proctime, so I am not sure whether proctime
> would cause the same problem there.
>
> I am adding some trace logs to ProcTimeBoundedRangeOver. I will report
> my test results and file a JIRA after that.
>
> Best
> Yan
>
> ------------------------------
> From: Fabian Hueske
> Sent: Wednesday, May 30, 2018 1:43:01 AM
> To: Dawid Wysakowicz
> Cc: user
> Subject: Re: NPE in flink sql over-window
>
> Hi,
>
> Dawid's analysis is certainly correct, but looking at the code this
> should not happen.
>
> I have a few questions:
> - You said this only happens if you configure idle state retention
>   times, right?
> - Does the error occur the first time, without a previous recovery?
> - Did you run the same query on Flink 1.4.x without any problems?
>
> Thanks, Fabian
>
> 2018-05-30 9:25 GMT+02:00 Dawid Wysakowicz:
>
> Hi Yan,
>
> I think it is a bug in ProcTimeBoundedRangeOver. It tries to access a
> list of elements that was already cleared and does not check against
> null. Could you please file a JIRA for that?
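Dawid's diagnosis above — the operator fetches a list from keyed state that was already cleared and iterates it without a guard — can be modeled with a small, Flink-free sketch. The names here (`row_map_state`, `on_timer_*`) are stand-ins for Flink's `MapState` and `onTimer`, not actual Flink API:

```python
# Minimal model of the failure mode: a processing-time timer fires after
# the state it refers to was cleaned up, and the timer callback iterates
# the fetched list without checking for a missing entry.

row_map_state = {}  # models MapState[Long, List[Row]]: timestamp -> rows

def on_timer_unsafe(timestamp):
    # dict.get on a missing key returns None, like MapState.get in Java
    rows = row_map_state.get(timestamp - 1)
    count = 0
    for row in rows:  # raises TypeError here, analogous to the NPE in onTimer
        count += 1
    return count

def on_timer_safe(timestamp):
    rows = row_map_state.get(timestamp - 1)
    if rows is None:  # the defensive check the unsafe version is missing
        return 0
    return len(rows)

# An element arrives, then idle-state cleanup wipes the map ...
row_map_state[1528149296456] = ["Row:(orderId:001,userId:U123)"]
row_map_state.clear()

# ... and a previously registered timer still fires for that element.
try:
    on_timer_unsafe(1528149296457)
    crashed = False
except TypeError:
    crashed = True  # the Python analogue of the NullPointerException

print(crashed, on_timer_safe(1528149296457))  # prints: True 0
```

The sketch only illustrates why a null check (or an early return when the entry is absent) would mask the symptom; the underlying question in the thread is why a cleanup timer fires for state that a later element should have kept alive.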
> Best,
> Dawid
>
> On 30/05/18 08:27, Yan Zhou [FDS Science] wrote:
>
> I also get a warning that the CodeCache is full around that time. It is
> printed by the JVM and has no timestamp. I suspect it is caused by the
> many failure recoveries from checkpoints, since the SQL queries are
> dynamically compiled too many times.
>
> Java HotSpot(TM) 64-Bit Server VM warning: CodeCache is full. Compiler has been disabled.
> Java HotSpot(TM) 64-Bit Server VM warning: Try increasing the code cache size using -XX:ReservedCodeCacheSize=
> CodeCache: size=245760Kb used=244114Kb max_used=244146Kb free=1645Kb
> bounds [0x00007fa4fd000000, 0x00007fa50c000000, 0x00007fa50c000000]
> total_blobs=54308 nmethods=53551 adapters=617
> compilation: disabled (not enough contiguous free space left)
>
> ------------------------------
> From: Yan Zhou [FDS Science]
> Sent: Tuesday, May 29, 2018 10:52:18 PM
> To: user@flink.apache.org
> Subject: NPE in flink sql over-window
>
> Hi,
>
> I am using Flink SQL 1.5.0. My application throws an NPE, and after it
> recovers from a checkpoint automatically, it immediately throws the NPE
> again from the same line of code.
>
> My application reads messages from Kafka, converts the DataStream into a
> table, issues an over-window aggregation, and writes the result into a
> sink. The NPE is thrown from the class ProcTimeBoundedRangeOver; please
> see the exception log at the bottom.
>
> The exception always happens maxIdleStateRetentionTime after the
> application starts. What could be the possible causes?
> Best
>
> Yan
>
> 2018-05-27 11:03:37,656 INFO  org.apache.flink.runtime.taskmanager.Task - over: (PARTITION BY: uid, ORDER BY: proctime, RANGEBETWEEN 86400000 PRECEDING AND CURRENT ROW, select: (id, uid, proctime, group_concat($7) AS w0$o0)) -> select: (id, uid, proctime, w0$o0 AS EXPR$3) -> to: Row -> Flat Map -> Filter -> Sink: Unnamed (3/15) (327efe96243bbfdf1f1e40a3372f64aa) switched from RUNNING to FAILED.
> TimerException{java.lang.NullPointerException}
>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.table.runtime.aggregate.ProcTimeBoundedRangeOverWithLog.onTimer(ProcTimeBoundedRangeOver.scala:181)
>     at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
>     at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onProcessingTime(LegacyKeyedProcessOperator.java:81)
>     at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:266)
>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
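On the CodeCache warning quoted earlier in the thread: one common mitigation is to reserve a larger code cache for the TaskManager JVMs, since every job (re)start compiles the generated SQL operators again. A possible flink-conf.yaml sketch — the 512m value is an illustrative guess, not a recommendation made anywhere in this thread:

```yaml
# flink-conf.yaml: pass a larger code cache to the Flink JVMs.
# Repeated checkpoint recoveries recompile the generated SQL operators,
# which can exhaust the JVM's default code cache (~240 MB on HotSpot 8).
env.java.opts: "-XX:ReservedCodeCacheSize=512m"
```

This only treats the symptom reported in the warning; the NPE itself is tracked in FLINK-9524.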