From: Fritz Budiyanto
Subject: Re: Parallelism, registerEventTimeTimer and watermark problem
Date: Tue, 17 Oct 2017 20:23:25 -0700
References:
 <6B7E471D-6AF3-4DAF-8F54-557BBA754E5B@icloud.com>
To: user@flink.apache.org
In-reply-to: <6B7E471D-6AF3-4DAF-8F54-557BBA754E5B@icloud.com>
Message-id: <67047C59-4D8A-4E39-9345-1D63AD924A34@icloud.com>

Sorry, I forgot to paste the exception that was thrown:

10/17/2017 20:21:30 dropDetection -> (aggFlowDropDetectPrintln -> Sink: Unnamed, aggFlowDropDetectPrintln -> Sink: Unnamed, Sink: kafkaSink)(3/4) switched to CANCELED
20:21:30,244 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Aggregate flows (313a46d5fd23e4c2d0d00d0033950b6d) switched from state FAILING to FAILED.
java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:151)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:115)
    at FlowContractStitcherProcess.endState$lzycompute(FlowContractResolver.scala:30)
    at FlowContractStitcherProcess.endState(FlowContractResolver.scala:30)
    at FlowContractStitcherProcess.processElement(FlowContractResolver.scala:96)
    at FlowContractStitcherProcess.processElement(FlowContractResolver.scala:17)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:68)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)

--
Fritz

> On Oct 17, 2017, at 7:55 PM, Fritz Budiyanto wrote:
>
> Hi All,
>
> If I run with high parallelism and register timers with registerEventTimeTimer in a ProcessFunction, the timers never fire.
> After debugging, I found out the watermark isn't updated because I have keyBy right after assignTimestampsAndWatermarks.
> And if I call assignTimestampsAndWatermarks right after the keyBy, an exception is thrown:
>
> val contractFlow = enrichedFlow
>   .keyBy(f => f.fiveTupleKey)
>   .assignTimestampsAndWatermarks(new AggFlowTimestampAssigner) <<<<<
>   .process(new FlowContractStitcherProcess)
>   .name("contractStitcher")
>
>     at FlowContractStitcherProcess.endState(FlowContractResolver.scala:30)
>     at FlowContractStitcherProcess.processElement(FlowContractResolver.scala:96)
>     at FlowContractStitcherProcess.processElement(FlowContractResolver.scala:17)
>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>     at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:68)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>     at java.lang.Thread.run(Thread.java:745)
>
>
> Any idea how to solve my problem? How do I update the watermark after keyBy?
>
> Would I hit scaling issues with a large number of timers if I use registerProcessingTimeTimer instead? I'm using event time throughout the pipeline; would mixing processing-time timers with event time cause problems down the line?
>
> --
> Fritz
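
A note on the "watermark isn't updated after keyBy" symptom: an operator that reads from several upstream subtasks (as it does after a keyBy) advances its event-time clock to the minimum watermark across all of its input channels. One possible explanation for the stuck timers, which the thread itself does not confirm, is that at high parallelism some upstream subtasks receive no data and never emit a watermark. The sketch below is a plain-Scala model of that rule, not Flink code; `WatermarkModel`, `Operator`, `onWatermark` and `firedTimers` are hypothetical names chosen for illustration.

```scala
// Simplified model, NOT the Flink API: shows why one silent input channel
// can keep event-time timers from firing on a downstream operator.
object WatermarkModel {
  final class Operator(numChannels: Int) {
    // Each input channel starts with no watermark at all.
    private val channelWatermarks = Array.fill(numChannels)(Long.MinValue)
    private var timers = Set.empty[Long]
    private var fired = Vector.empty[Long]

    // The operator's event-time clock is the minimum over all channels.
    def currentWatermark: Long = channelWatermarks.min

    def registerEventTimeTimer(timestamp: Long): Unit = timers += timestamp

    // Record a watermark from one channel, then fire every timer that is due.
    def onWatermark(channel: Int, watermark: Long): Unit = {
      channelWatermarks(channel) = math.max(channelWatermarks(channel), watermark)
      val due = timers.filter(_ <= currentWatermark)
      fired = fired ++ due.toVector.sorted
      timers = timers -- due
    }

    def firedTimers: Vector[Long] = fired
  }

  def main(args: Array[String]): Unit = {
    val op = new Operator(numChannels = 4)
    op.registerEventTimeTimer(1000L)

    // Three of the four upstream subtasks make progress; channel 3 is silent.
    (0 to 2).foreach(ch => op.onWatermark(ch, 5000L))
    println(op.firedTimers) // Vector(): the timer is still pending

    // Once the last channel reports a watermark, the timer fires.
    op.onWatermark(3, 5000L)
    println(op.firedTimers) // Vector(1000)
  }
}
```

If this is what is happening in the job, the timers stay pending until every upstream subtask has emitted a watermark past the timer's timestamp, so checking whether each parallel instance of the timestamp assigner actually sees records would be a reasonable first step.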