From: Puneet Kinra
Date: Tue, 8 Jan 2019 20:58:40 +0530
Mailing-List: contact user-help@flink.apache.org; run by ezmlm
Delivered-To: mailing list user@flink.apache.org
MIME-Version: 1.0
References: <0e1ea675-6bd8-a9c5-7046-957b8d2788d9@apache.org>
Subject: Re: onTimer function is not getting executed and job is marked as finished.
To: Hequn Cheng Cc: Timo Walther , user Content-Type: multipart/alternative; boundary="0000000000000ff252057ef402d9" --0000000000000ff252057ef402d9 Content-Type: text/plain; charset="UTF-8" Sure, I will do that. On Tue, Jan 8, 2019 at 7:25 PM Hequn Cheng wrote: > Hi Puneet, > > Can you explain it in more detail? Do you mean the job is finished before > you call ctx.timeservice()? > Maybe you have to let your source running for a longer time. > > It's better to show us the whole pipeline of your job. For example, write > a sample code(or provide a git link) that can reproduce your problem easily. > > Best, Hequn > > > On Tue, Jan 8, 2019 at 11:44 AM Puneet Kinra < > puneet.kinra@customercentria.com> wrote: > >> Hi hequan >> >> Weird behaviour when i m calling ctx.timeservice() function is getting >> exited even not throwing error >> >> On Tuesday, January 8, 2019, Hequn Cheng wrote: >> >>> Hi puneet, >>> >>> Could you print `parseLong + 5000` and >>> `ctx.timerService().currentProcessingTime()` out and check the value? >>> I know it is a streaming program. What I mean is the timer you have >>> registered is not within the interval of your job, so the timer has not >>> been triggered. For example, parseLong + 5000 = 5000 or parseLong + 5000 = >>> 100000000000(very big). >>> >>> Best, Hequn >>> >>> >>> On Tue, Jan 8, 2019 at 1:38 AM Puneet Kinra < >>> puneet.kinra@customercentria.com> wrote: >>> >>>> I checked the same the function is getting exited when i am calling >>>> ctx.getTimeservice () function. >>>> >>>> On Mon, Jan 7, 2019 at 10:27 PM Timo Walther >>>> wrote: >>>> >>>>> Hi Puneet, >>>>> >>>>> maybe you can show or explain us a bit more about your pipeline. From >>>>> what I see your ProcessFunction looks correct. Are you sure the registering >>>>> takes place? >>>>> >>>>> Regards, >>>>> Timo >>>>> >>>>> Am 07.01.19 um 14:15 schrieb Puneet Kinra: >>>>> >>>>> Hi Hequn >>>>> >>>>> Its a streaming job . 
>>>>> >>>>> On Mon, Jan 7, 2019 at 5:51 PM Hequn Cheng >>>>> wrote: >>>>> >>>>>> Hi Puneet, >>>>>> >>>>>> The value of the registered timer should within startTime and endTime >>>>>> of your job. For example, job starts at processing time t1 and stops at >>>>>> processing time t2. You have to make sure t1< `parseLong + 5000` < t2. >>>>>> >>>>>> Best, Hequn >>>>>> >>>>>> On Mon, Jan 7, 2019 at 5:50 PM Puneet Kinra < >>>>>> puneet.kinra@customercentria.com> wrote: >>>>>> >>>>>>> Hi All >>>>>>> >>>>>>> Facing some issue with context to onTimer method in processfunction >>>>>>> >>>>>>> class TimerTest extends >>>>>>> ProcessFunction,String>{ >>>>>>> >>>>>>> /** >>>>>>> * >>>>>>> */ >>>>>>> private static final long serialVersionUID = 1L; >>>>>>> >>>>>>> @Override >>>>>>> public void processElement(Tuple2 arg0, >>>>>>> ProcessFunction, String>.Context ctx, >>>>>>> Collector arg2) throws Exception { >>>>>>> // TODO Auto-generated method stub >>>>>>> long parseLong = Long.parseLong(arg0.f1); >>>>>>> TimerService timerService = ctx.timerService(); >>>>>>> ctx.timerService().registerProcessingTimeTimer(parseLong + 5000); >>>>>>> } >>>>>>> >>>>>>> @Override >>>>>>> public void onTimer(long timestamp, ProcessFunction>>>>>> String>, String>.OnTimerContext ctx, >>>>>>> Collector out) throws Exception { >>>>>>> // TODO Auto-generated method stub >>>>>>> super.onTimer(timestamp, ctx, out); >>>>>>> System.out.println("Executing timmer"+timestamp); >>>>>>> out.collect("Timer Testing.."); >>>>>>> } >>>>>>> } >>>>>>> >>>>>>> -- >>>>>>> *Cheers * >>>>>>> >>>>>>> *Puneet Kinra* >>>>>>> >>>>>>> *Mobile:+918800167808 | Skype : puneet.kinra@customercentria.com >>>>>>> * >>>>>>> >>>>>>> *e-mail :puneet.kinra@customercentria.com >>>>>>> * >>>>>>> >>>>>>> >>>>>>> >>>>> >>>>> -- >>>>> *Cheers * >>>>> >>>>> *Puneet Kinra* >>>>> >>>>> *Mobile:+918800167808 | Skype : puneet.kinra@customercentria.com >>>>> * >>>>> >>>>> *e-mail :puneet.kinra@customercentria.com >>>>> * >>>>> >>>>> >>>>> >>>>> 
>>>> >>>> -- >>>> *Cheers * >>>> >>>> *Puneet Kinra* >>>> >>>> *Mobile:+918800167808 | Skype : puneet.kinra@customercentria.com >>>> * >>>> >>>> *e-mail :puneet.kinra@customercentria.com >>>> * >>>> >>>> >>>> >> >> -- >> *Cheers * >> >> *Puneet Kinra* >> >> *Mobile:+918800167808 | Skype : puneet.kinra@customercentria.com >> * >> >> *e-mail :puneet.kinra@customercentria.com >> * >> >> >> >> -- *Cheers * *Puneet Kinra* *Mobile:+918800167808 | Skype : puneet.kinra@customercentria.com * *e-mail :puneet.kinra@customercentria.com * --0000000000000ff252057ef402d9 Content-Type: text/html; charset="UTF-8" Content-Transfer-Encoding: quoted-printable
Sure, I will do that.

On Tue, Jan 8, 2019 at 7:25 PM Hequn Cheng <chenghequn@gmail.com> wrote:
Hi Puneet,

Can you explain it in more detail? Do you mean the job is finished before you call ctx.timerService()?
Maybe you have to let your source run for a longer time.

It would be better to show us the whole pipeline of your job. For example, write some sample code (or provide a git link) that can reproduce your problem easily.

Best, Hequn

On Tue, Jan 8, 2019 at 11:44 AM Puneet Kinra <puneet.kinra@customercentria.com> wrote:
Hi Hequn

Weird behaviour: when I call ctx.timerService(), the function exits without even throwing an error.

On Tuesday, January 8, 2019, Hequn Cheng <chenghequn@gmail.com> wrote:

Hi Puneet,

Could you print `parseLong + 5000` and `ctx.timerService().currentProcessingTime()` out and check the values?
I know it is a streaming program. What I mean is that the timer you have registered is not within the interval of your job, so the timer has not been triggered. For example, parseLong + 5000 = 5000 or parseLong + 5000 = 100000000000 (very big).

Best, Hequn

On Tue, Jan 8, 2019 at 1:38 AM Puneet Kinra <puneet.kinra@customercentria.com> wrote:
I checked this: the function exits when I call ctx.timerService().

On Mon, Jan 7, 2019 at 10:27 PM Timo Walther <twalthr@apache.org> wrote:
Hi Puneet,

Maybe you can show us or explain a bit more about your pipeline. From what I can see, your ProcessFunction looks correct. Are you sure the timer registration takes place?

Regards,
Timo

On 07.01.19 at 14:15, Puneet Kinra wrote:

Hi Hequn

It's a streaming job.

On Mon, Jan 7, 2019 at 5:51 PM Hequn Cheng <chenghequn@gmail.com> wrote:
Hi Puneet,

The value of the registered timer should be within the startTime and endTime of your job. For example, if the job starts at processing time t1 and stops at processing time t2, you have to make sure t1 < `parseLong + 5000` < t2.

Best, Hequn
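
The condition described above can be written down as a small plain-Java check. This is only a sketch of the comparison as stated in this thread, not Flink API: the class name TimerWindowCheck, the helper withinJobLifetime and the one-minute job duration are all hypothetical, and the two sample values stand in for whatever `arg0.f1` actually contains.

```java
public class TimerWindowCheck {

    /**
     * Hypothetical helper: returns true if timerTimestamp lies strictly between
     * jobStart (t1) and jobEnd (t2). All values are epoch milliseconds.
     */
    static boolean withinJobLifetime(long jobStart, long timerTimestamp, long jobEnd) {
        return jobStart < timerTimestamp && timerTimestamp < jobEnd;
    }

    public static void main(String[] args) {
        long t1 = System.currentTimeMillis(); // assumed job start
        long t2 = t1 + 60_000L;               // assumed job end, one minute later

        long parsedSmall = 0L;                // a field that is not an epoch timestamp
        long parsedNow = t1;                  // a field that carries the current time

        // parsedSmall + 5000 = 5000, which is far before t1
        System.out.println(withinJobLifetime(t1, parsedSmall + 5000, t2));
        // parsedNow + 5000 is five seconds after t1 and before t2
        System.out.println(withinJobLifetime(t1, parsedNow + 5000, t2));
    }
}
```

Printing `parseLong + 5000` next to `ctx.timerService().currentProcessingTime()` inside processElement gives the two numbers needed to run this comparison by hand.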

On Mon, Jan 7, 2019 at 5:50 PM Puneet Kinra <puneet.kinra@customercentria.com> wrote:
Hi All

I am facing an issue with the onTimer method in a ProcessFunction:

class TimerTest extends ProcessFunction<Tuple2<String,String>,String>{

    /**
     *
     */
    private static final long serialVersionUID = 1L;

    @Override
    public void processElement(Tuple2<String, String> arg0,
            ProcessFunction<Tuple2<String, String>, String>.Context ctx, Collector<String> arg2) throws Exception {
        // TODO Auto-generated method stub
        long parseLong = Long.parseLong(arg0.f1);
        TimerService timerService = ctx.timerService();
        ctx.timerService().registerProcessingTimeTimer(parseLong + 5000);
    }

    @Override
    public void onTimer(long timestamp, ProcessFunction<Tuple2<String, String>, String>.OnTimerContext ctx,
            Collector<String> out) throws Exception {
        // TODO Auto-generated method stub
        super.onTimer(timestamp, ctx, out);
        System.out.println("Executing timmer"+timestamp);
        out.collect("Timer Testing..");
    }
}
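
As a plain-Java analogy for the symptom in the subject line (a job that finishes without onTimer ever running), the sketch below schedules a callback and then shuts the scheduler down before the delay has elapsed. It uses only java.util.concurrent, not Flink. The class name PendingTimerDemo and the helper firesBeforeShutdown are hypothetical, and it is an assumption that the job in this thread behaves similarly; nothing in the thread confirms it.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class PendingTimerDemo {

    /**
     * Schedules a callback after timerDelayMs, lets the "job" run for runForMs,
     * then shuts the scheduler down. Returns whether the callback fired.
     */
    static boolean firesBeforeShutdown(long timerDelayMs, long runForMs) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicBoolean fired = new AtomicBoolean(false);

        scheduler.schedule(() -> fired.set(true), timerDelayMs, TimeUnit.MILLISECONDS);

        Thread.sleep(runForMs);   // the "job" is alive for this long
        scheduler.shutdownNow();  // the "job" finishes; callbacks still waiting are discarded
        scheduler.awaitTermination(1, TimeUnit.SECONDS);
        return fired.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Timer due after 2 s, scheduler stopped after 100 ms
        System.out.println(firesBeforeShutdown(2000, 100));
        // Timer due after 50 ms, scheduler stopped after 500 ms
        System.out.println(firesBeforeShutdown(50, 500));
    }
}
```

If the real pipeline reads from a bounded source, keeping the source running longer than the timer delay, as suggested earlier in the thread, is the corresponding experiment.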

--
Cheers

Puneet Kinra



--
Cheers

Puneet Kinra

--0000000000000ff252057ef402d9--