Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id F23AA200BDB for ; Mon, 12 Dec 2016 18:02:53 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id EBFD8160B22; Mon, 12 Dec 2016 17:02:53 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id EA3A5160B1A for ; Mon, 12 Dec 2016 18:02:52 +0100 (CET) Received: (qmail 57755 invoked by uid 500); 12 Dec 2016 17:02:52 -0000 Mailing-List: contact user-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: user@flink.apache.org Delivered-To: mailing list user@flink.apache.org Received: (qmail 57742 invoked by uid 99); 12 Dec 2016 17:02:52 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 12 Dec 2016 17:02:52 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 7E244C01AB for ; Mon, 12 Dec 2016 17:02:51 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 2.398 X-Spam-Level: ** X-Spam-Status: No, score=2.398 tagged_above=-999 required=6.31 tests=[DKIM_SIGNED=0.1, DKIM_VALID=-0.1, DKIM_VALID_AU=-0.1, HTML_MESSAGE=2, RCVD_IN_DNSWL_NONE=-0.0001, RCVD_IN_MSPIKE_H2=-0.001, RCVD_IN_SORBS_SPAM=0.5, SPF_PASS=-0.001] autolearn=disabled Authentication-Results: spamd4-us-west.apache.org (amavisd-new); dkim=pass (2048-bit key) header.d=gmail.com Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id 43rv1baXYbpc for ; Mon, 12 Dec 2016 17:02:50 +0000 (UTC) Received: from mail-vk0-f44.google.com (mail-vk0-f44.google.com [209.85.213.44]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTPS id 31FFF5F46F for ; Mon, 12 Dec 2016 17:02:50 +0000 (UTC) Received: by mail-vk0-f44.google.com with SMTP id w194so43769161vkw.2 for ; Mon, 12 Dec 2016 09:02:50 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20120113; h=mime-version:in-reply-to:references:from:date:message-id:subject:to; bh=YE+/hvFOZfYeBRpX9wfr4oQ6IdYo3x0ZyHnSYU7+awQ=; b=ujEij6IjOWXsg2Fb+2sS0fhSmo6kO+bhC5LkW0RKIsgGpGsr47+gPObELdMmxAB11w OXg9KRwDlMHMrwW/JBesbE9IALVwHHOsF4doFQlDNDGmTQJyf8h4KmDv9UVp/hF++9vI F6gw8CynNFG9uO/SAl0iIEAccJeUnUdg2v8MQKeYfdij6zdnZdNlj1t5MFEVFFU4l3n9 MYl5zAAClcJkEia538Y5Dd8JLoGmzcLgsTifhBdlIB+rfH4SuEmpqWa1RMbSlF8I7A+m 8sVYT7ard0MFiT39DDyIcZIpmglBWm074FNEHtXMSRZPIr4pgfndmS7eoJu+MWtkW352 yLVw== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20130820; h=x-gm-message-state:mime-version:in-reply-to:references:from:date :message-id:subject:to; bh=YE+/hvFOZfYeBRpX9wfr4oQ6IdYo3x0ZyHnSYU7+awQ=; b=SLBNUJO1lcqLRmfUvtuioUPR7pvJcx9d2/fl1pixvZHwSTqcDwhZBRokYfUCV+4rVx NBEzSuNq6GUeX/MRmywTf936ad6aHfqjaH5teCzUY5sxnPKGhvYrkRgKsBbLx8X8WC/X qzO+LxL2zXtfrd0/vBjuPj3BoD6hjmC+ox/kTupimjte4HFUgFVFo3gONILVRg5fhzMj Cvc7tLORDjUOq1MWaN/zfN5TgioICCcRo+uEHYDP2c7g+lsaGpKpAqcY8xX1kLKZaCe2 3qGI8yfSn+opZKlnypwaox4E7Jg6WrqAgWUWOQpQBjPFQo0Msaa0Ac/k4qqU8cP9P8ud ox8A== X-Gm-Message-State: AKaTC013RLZPeO4pig0JjFZl/RlnSng723/CNMFHrr6e0SQnxRR+VYR95jcGbpMKxOpngXvKd/ODXz5j0FVsAA== X-Received: by 10.31.86.132 with SMTP id k126mr36749837vkb.8.1481562162012; Mon, 12 Dec 2016 09:02:42 -0800 (PST) MIME-Version: 1.0 Received: by 10.103.116.203 with HTTP; Mon, 12 Dec 2016 09:02:41 -0800 (PST) In-Reply-To: References: <584EB640.9040402@apache.org> From: Matt Date: Mon, 12 Dec 2016 14:02:41 -0300 Message-ID: Subject: Re: Incremental aggregations - Example not working To: user@flink.apache.org Content-Type: multipart/alternative; boundary=001a114e63c2612997054379120e archived-at: Mon, 12 Dec 2016 17:02:54 -0000 --001a114e63c2612997054379120e Content-Type: text/plain; charset=UTF-8 I just checked with version 1.1.3 and it works fine, the problem is that in that version we can't use Kafka 0.10 if I'm not work. Thank you for the workaround! Best, Matt On Mon, Dec 12, 2016 at 1:52 PM, Yassine MARZOUGUI < y.marzougui@mindlytix.com> wrote: > Yes, it was suppoed to work. I looked into this, and as Chesnay said, > this is a bug in the fold function. I opened an issue in JIRA : > https://issues.apache.org/jira/browse/FLINK-5320, and will fix it very > soon, thank you for reporting it. > In the mean time you can workaround the problem by specifying the > TypeInformation along with the fold function as follows : fold(ACC, > FoldFunction, WindowFunction, foldAccumulatorType, resultType). In the > example, the foldAccumulatorType is new TupleTypeInfo Long, Integer>>(), and the resultType is also new > TupleTypeInfo>(). > > Best, > Yassine > > 2016-12-12 16:38 GMT+01:00 Matt : > >> I'm using 1.2-SNAPSHOT, should it work in that version? >> >> On Mon, Dec 12, 2016 at 12:18 PM, Yassine MARZOUGUI < >> y.marzougui@mindlytix.com> wrote: >> >>> Hi Matt, >>> >>> What version of Flink are you using? >>> The incremental agregation with fold(ACC, FoldFunction, WindowFunction) >>> in a new change that will be part of Flink 1.2, for Flink 1.1 the correct >>> way to perform incrementation aggregations is : apply(ACC, >>> FoldFunction, WindowFunction) (see the docs for 1.1 [1]) >>> >>> [1] : https://ci.apache.org/projects/flink/flink-docs-release-1. >>> 1/apis/streaming/windows.html#windowfunction-with-incrementa >>> l-aggregation >>> >>> Best, >>> Yassine >>> >>> 2016-12-12 15:37 GMT+01:00 Chesnay Schepler : >>> >>>> Hello Matt, >>>> >>>> This looks like a bug in the fold() function to me. >>>> >>>> I'm adding Timo to the discussion, he can probably shed some light on >>>> this. >>>> >>>> Regards, >>>> Chesnay >>>> >>>> >>>> On 12.12.2016 15:13, Matt wrote: >>>> >>>> In case this is important, if I remove the WindowFunction, and only use >>>> the FoldFunction it works fine. >>>> >>>> I don't see what is wrong... >>>> >>>> On Mon, Dec 12, 2016 at 10:53 AM, Matt wrote: >>>> >>>>> Hi, >>>>> >>>>> I'm following the documentation [1] of window functions with >>>>> incremental aggregations, but I'm getting an "input mismatch" error. >>>>> >>>>> The code [2] is almost identical to the one in the documentation, at >>>>> the bottom you can find the exact error. >>>>> >>>>> What am I missing? Can you provide a working example of a fold >>>>> function with both a FoldFunction and a WindowFunction? >>>>> >>>>> Regards, >>>>> Matt >>>>> >>>>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/w >>>>> indows.html#windowfunction-with-incremental-aggregation >>>>> >>>>> [2] https://gist.github.com/cc7ed5570e4ce30c3a482ab835e3983d >>>>> >>>> >>>> >>>> >>> >> > --001a114e63c2612997054379120e Content-Type: text/html; charset=UTF-8 Content-Transfer-Encoding: quoted-printable
I just checked with version 1.1.3 and it works fine, the p= roblem is that in that version we can't use Kafka 0.10 if I'm not w= ork. Thank you for the workaround!

Best,
Matt
=

On Mon, Dec 12, 2= 016 at 1:52 PM, Yassine MARZOUGUI <y.marzougui@mindlytix.com&g= t; wrote:
Yes, it was suppoed to work. I looked into this= , and as=C2=A0Chesnay said, this is a bug = in the fold function. I opened an issue in JIRA :=C2=A0https://issues.a= pache.org/jira/browse/FLINK-5320, and will fix it very soon, thank you for reporting it.
I= n the mean time you can workaround the problem by specifying the TypeInform= ation along with the fold function as follows : fold(ACC, FoldFunction, WindowFunction,=C2=A0foldAccum= ulatorType, resultType).=C2=A0= In the example, the=C2=A0foldAccumulatorType is new TupleTypeInfo<Tuple3<String, Long, Integer>>= ;(), and the resultType is also=C2=A0new TupleTypeInfo<Tuple3<String, Long, Integer>>()<= /font>.

=
Be= st,
Yassine

2016-12-12 16:38 GMT+01:00 Matt <dromitlabs@gmail.com>:=
I'm using 1.2-SNAPS= HOT, should it work in that version?

On Mon, Dec 12, 2016 at 12:18 PM, Yassine M= ARZOUGUI <y.marzougui@mindlytix.com> wrote:
Hi Matt,

=
What version of Flink are you using= ?
The incremental agreg= ation with fold(ACC, FoldFunctio= n, WindowFunction) in a new chang= e that will be part of Flink 1.2, for Flink 1.1 the correct way to perform = incrementation aggregations is=C2= =A0:=C2=A0apply(ACC,= FoldFunction, WindowFunction)=C2= =A0(see the docs for 1.1=C2=A0[1])


Best,
Yassine
=

2016-12-12 15:37 = GMT+01:00 Chesnay Schepler <chesnay@apache.org>:
=20 =20 =20
Hello Matt,

This looks like a bug in the fold() function to me.

I'm adding Timo to the discussion, he can probably shed some ligh= t on this.

Regards,
Chesnay


On 12.12.2016 15:13, Matt wrote:
In case this is important, if I remove the WindowFunction, and only use the FoldFunction it works fine.

I don't see what is wrong...

On Mon, Dec 12, 2016 at 10:53 AM, Matt <dromitlabs@gmail.com> wrote:
Hi,

I'm following the documentation [1] of window functions with incremental aggregations, but I'm gettin= g an "input mismatch" error.

The code=C2=A0[2]=C2=A0is almost identical to the one in= the documentation, at the bottom you can find the exact error.

What am I missing? Can you provide a working example of a fold function with both a FoldFunction and a WindowFunction?

Regards,
Matt


[2]
https://gist.github.com/cc7ed5570e4ce30c= 3a482ab835e3983d






--001a114e63c2612997054379120e--