Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3D7DA200BE3 for ; Thu, 22 Dec 2016 22:17:20 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 3C10A160B26; Thu, 22 Dec 2016 21:17:20 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8495F160B1B for ; Thu, 22 Dec 2016 22:17:19 +0100 (CET) Received: (qmail 75463 invoked by uid 500); 22 Dec 2016 21:17:18 -0000 Mailing-List: contact user-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: user@flink.apache.org Delivered-To: mailing list user@flink.apache.org Received: (qmail 75453 invoked by uid 99); 22 Dec 2016 21:17:18 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 22 Dec 2016 21:17:18 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id EDD78C0146 for ; Thu, 22 Dec 2016 21:17:17 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.898 X-Spam-Level: * X-Spam-Status: No, score=1.898 tagged_above=-999 required=6.31 tests=[DKIM_SIGNED=0.1, DKIM_VALID=-0.1, DKIM_VALID_AU=-0.1, HTML_MESSAGE=2, RCVD_IN_DNSWL_NONE=-0.0001, RCVD_IN_MSPIKE_H2=-0.001, SPF_PASS=-0.001] autolearn=disabled Authentication-Results: spamd4-us-west.apache.org (amavisd-new); dkim=pass (2048-bit key) header.d=gmail.com Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id vrzfz4pb98nr for ; Thu, 22 Dec 2016 21:17:17 +0000 (UTC) Received: from mail-vk0-f47.google.com (mail-vk0-f47.google.com [209.85.213.47]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTPS id C154E5F5F8 for ; Thu, 22 Dec 2016 21:17:16 +0000 (UTC) Received: by mail-vk0-f47.google.com with SMTP id y197so21571890vky.2 for ; Thu, 22 Dec 2016 13:17:16 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=mime-version:in-reply-to:references:from:date:message-id:subject:to; bh=aR2/jyoC1n1gyVY3sSvpdWfIfSpQE3162fiXv8sM64c=; b=G6CcBme5P4wdBIuOalqMHveJPn3SNEt6lzJrKoHQK1Bs1UEiXh2fMjiCT/3TW5QU4x +o4s/Y4Nk88dWlo7ug99sNPHtnWlxTPA0iqKx4YJF3wczBmqqs4tJYjdYqNS54IJXYUu ap+I6jpCsojTj317+WQ31eKF5IkPdgAFb6cR45xpzq/XdmDtt9g+u9vwKTHSpZqWUHyV 7h0izxnRXQgcvE68ixt0qySfqVTt1e2LCddKlQWM1vBwyBfMpgnkejNHOVbOLVBmkhJ9 uM0vEibC1v6ggOJ4T2szWzIRaQRkCum277oY0hs74poBr646Chj59HMuRFyjvLcsKIKf mfSg== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:mime-version:in-reply-to:references:from:date :message-id:subject:to; bh=aR2/jyoC1n1gyVY3sSvpdWfIfSpQE3162fiXv8sM64c=; b=ofjL8/+lJOTgOfxXo4PLzC2VjeOwwPkVPeZppkL35vFo6yi828POW371F6ljfToEe7 IUBxy/yxWi3z43Y8IJ2xrFKO9+NuAllNicfQ6lr2GXYa9jKA2pV3PIXhwoUjdgGP0pk+ E8jIEp4jb524N0oaRcNh2QteXdK2rGfXGogdoeYQgeGbPEsD+VQehGgAFji/YwA1OpvI KFhBDIqrGEuN0Ij1jc+2pm1ElaYatdisNhLeuQ7BRP+bSFxzwQI2rGdd+yDr8d45p5uG +8/+oopzSVOFixlTvTPvJwAVmwgnEhRxv0fWiGrok68C0sejQ0lbibYUwPJoaIdaZyE+ KZSQ== X-Gm-Message-State: AIkVDXK6WgneNU7/o3Qt4s6DwE7wUu3IBBOQ/wNFgsTDwQ/8Lrw5NBoGzuIYnn9HEitswthY0yT+vPTR4ozm8Q== X-Received: by 10.31.86.132 with SMTP id k126mr4634900vkb.8.1482441429099; Thu, 22 Dec 2016 13:17:09 -0800 (PST) MIME-Version: 1.0 Received: by 10.103.116.203 with HTTP; Thu, 22 Dec 2016 13:17:08 -0800 (PST) In-Reply-To: References: From: Matt Date: Thu, 22 Dec 2016 18:17:08 -0300 Message-ID: Subject: Re: Caching collected objects in .apply() To: user@flink.apache.org Content-Type: multipart/alternative; boundary=001a114e63c2c831ac054445cabe archived-at: Thu, 22 Dec 2016 21:17:20 -0000 --001a114e63c2c831ac054445cabe Content-Type: text/plain; charset=UTF-8 Just to be clear, the stream is of String elements. The first part of the pipeline (up to the first .apply) receives those strings, and returns objects of another class ("A" let's say). On Thu, Dec 22, 2016 at 6:04 PM, Matt wrote: > Hello, > > I have a window processing 10 objects at a time, and creating 1 as a > result. The problem is in order to create that object I need the object > from the previous window. > > I'm doing this: > > stream > .keyBy(...some key...) > .countWindow(10, 1) > .apply(...creates an element A...) > .keyBy(...same key as above...) > .countWindow(2, 1) > .apply(...updates A with the value of the previous element A...) > .addSink(...) > > Probably there is a way to retrieve the last collected object inside the > first .apply(), or to cache it somehow. > > Is there a better way to achieve the same? How inefficient is this? > > Regards, > Matt > --001a114e63c2c831ac054445cabe Content-Type: text/html; charset=UTF-8 Content-Transfer-Encoding: quoted-printable
Just to be clear, the stream is of String elements. The fi= rst part of the pipeline (up to the first .apply) receives those strings, a= nd returns objects of another class ("A" let's say).

On Thu, Dec 22, 2016= at 6:04 PM, Matt <dromitlabs@gmail.com> wrote:
Hello,

I have = a window processing 10 objects at a time, and creating 1 as a result. The p= roblem is in order to create that object I need the object from the previou= s window.

I'm doing this:

=
stream
=C2=A0 .keyBy(...some key...)
=C2=A0 .c= ountWindow(10, 1)
=C2=A0=C2=A0.apply(...creates an element A...)<= /div>
=C2=A0=C2=A0.keyBy(...same key as above...)
=C2=A0=C2= =A0.countWindow(2, 1)
=C2=A0=C2=A0.apply(...updates A with the va= lue of the previous element A...)
=C2=A0 .addSink(...)
=
Probably there is a way to retrieve the last collected objec= t inside the first .apply(), or to cache it somehow.

Is there a better way to achieve the same? How inefficient is this?<= /div>

Regards,
Matt

--001a114e63c2c831ac054445cabe--