From: "Yan Zhou [FDS Science]"
To: "user@flink.apache.org"
Subject: NPE in flink sql over-window
Date: Wed, 30 May 2018 05:52:18 +0000

Hi,

I am using Flink SQL 1.5.0. My application throws an NPE, and after it automatically recovers from a checkpoint, it immediately throws the NPE again from the same line of code.


My application reads messages from Kafka, converts the DataStream into a table, runs an OVER window aggregation, and writes the result to a sink. The NPE is thrown from the class ProcTimeBoundedRangeOver. Please see the exception log at the bottom.
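For reference, here is a toy model of what this over-window aggregation computes, using the window from the operator name in the log below: for each row, concatenate the values of all rows with the same uid whose proctime lies within the preceding 86400000 ms (24 hours), including the current row. This is plain Python, not Flink's implementation. The tuple layout, the comma separator for group_concat (whose exact behavior is not shown in the post), and strictly increasing proctime per uid are all assumptions.

```python
from collections import defaultdict, deque

# RANGE BETWEEN 86400000 PRECEDING AND CURRENT ROW (24 hours in milliseconds)
WINDOW_MS = 86_400_000


def bounded_range_over(events, window_ms=WINDOW_MS):
    """Toy model of a per-uid bounded RANGE over-window with a concat aggregate.

    events: iterable of (id, uid, proctime, value) tuples (hypothetical layout).
    Assumes proctime is strictly increasing per uid, so rows with equal
    timestamps (RANGE "peers") never occur in this model.
    Returns (id, uid, proctime, concatenated_values) for every input row.
    """
    buffers = defaultdict(deque)  # uid -> deque of (proctime, value), oldest first
    out = []
    for event_id, uid, proctime, value in events:
        buf = buffers[uid]
        buf.append((proctime, value))
        # Evict rows older than the lower frame bound; the bound itself is inclusive.
        while buf and buf[0][0] < proctime - window_ms:
            buf.popleft()
        # Hypothetical group_concat: comma-separated values in arrival order.
        out.append((event_id, uid, proctime, ",".join(v for _, v in buf)))
    return out
```

Each key only keeps the rows still inside its 24-hour frame, which is why the operator's state is keyed by uid and needs time-based cleanup.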


The exception always occurs once the application has been running for maxIdleStateRetentionTime. What could be the possible causes?
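One hypothetical mechanism that would produce this exact pattern is a stale idle-state cleanup timer. Suppose the operator remembers only its most recent cleanup time, but older cleanup timers stay registered. An older timer then fails the "is this the cleanup timer?" check, falls through to the row-processing branch, and looks up rows that were never buffered for that timestamp. The first such timer would fire about maxIdleStateRetentionTime after the first row. Because timers are part of Flink's checkpointed keyed state, it would fire again right after recovery. The toy model below is not Flink's code. The class, its fields, and the min/max retention registration rule are assumptions for illustration, and whether ProcTimeBoundedRangeOver in 1.5.0 behaves this way should be checked against its source near ProcTimeBoundedRangeOver.scala:181.

```python
class ToyBoundedRangeOver:
    """Hypothetical single-key model of a processing-time over-window operator.

    - Rows are buffered per proctime and processed by a data timer at proctime + 1.
    - A cleanup timer at proctime + max_retention is registered only when
      proctime + min_retention exceeds the current cleanup time; only the
      latest cleanup time is remembered, and older cleanup timers are never deleted.
    """

    def __init__(self, min_retention_ms, max_retention_ms, guard_missing_rows):
        self.min_retention_ms = min_retention_ms
        self.max_retention_ms = max_retention_ms
        self.guard = guard_missing_rows
        self.rows = {}            # proctime -> list of buffered rows
        self.cleanup_time = None  # most recently registered cleanup timestamp
        self.timers = set()       # registered timers (deduplicated per timestamp)
        self.emitted = []

    def process_element(self, proctime, row):
        self.rows.setdefault(proctime, []).append(row)
        self.timers.add(proctime + 1)  # data timer
        if self.cleanup_time is None or proctime + self.min_retention_ms > self.cleanup_time:
            self.cleanup_time = proctime + self.max_retention_ms
            self.timers.add(self.cleanup_time)  # older cleanup timers remain registered

    def on_timer(self, timestamp):
        if timestamp == self.cleanup_time:
            # Recognized cleanup timer: drop all state for this key.
            self.rows.clear()
            self.cleanup_time = None
            return
        current = self.rows.get(timestamp - 1)
        if self.guard and current is None:
            return  # stale cleanup timer: nothing buffered, so skip it
        # Without the guard, extend(None) raises TypeError, the analogue of the NPE.
        self.emitted.extend(current)

    def advance_to(self, now):
        """Fire every registered timer with timestamp <= now, in timestamp order."""
        due = sorted(t for t in self.timers if t <= now)
        self.timers = {t for t in self.timers if t > now}
        for t in due:
            self.on_timer(t)
```

With min retention 50 and max retention 100, rows at times 0 and 60 register cleanup timers at 100 and 160. The timer at 100 is stale and, without the guard, fails. With a null check, it is skipped and the timer at 160 performs the actual cleanup.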


Best

Yan


2018-05-27 11:03:37,656 INFO  org.apache.flink.runtime.taskmanager.Task - over: (PARTITION BY: uid, ORDER BY: proctime, RANGEBETWEEN 86400000 PRECEDING AND CURRENT ROW, select: (id, uid, proctime, group_concat($7) AS w0$o0)) -> select: (id, uid, proctime, w0$o0 AS EXPR$3) -> to: Row -> Flat Map -> Filter -> Sink: Unnamed (3/15) (327efe96243bbfdf1f1e40a3372f64aa) switched from RUNNING to FAILED.
TimerException{java.lang.NullPointerException}
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at org.apache.flink.table.runtime.aggregate.ProcTimeBoundedRangeOverWithLog.onTimer(ProcTimeBoundedRangeOver.scala:181)
        at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
        at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onProcessingTime(LegacyKeyedProcessOperator.java:81)
        at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:266)
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)