Date: Tue, 16 Feb 2016 15:15:47 -0500
Subject: Re: Read once input data?
From: Saliya Ekanayake
To: user@flink.apache.org

I looked at the samples and I think what you meant is clear, but I didn't
find a solution for my need. In my case, I want to use the result of the
first map operation before I apply the second map to the *same* data set.

For simplicity, let's say my data set is a bunch of short values. I need to
find their average, so I use a map and a reduce. Then I want to map these
short values with another function, but that function needs the average
computed in the first step to work correctly.

Is this possible without reading the input data multiple times to create
the same data set?

Thank you,
Saliya

On Tue, Feb 16, 2016 at 12:03 PM, Fabian Hueske <fhueske@gmail.com> wrote:

> Yes, if you implement both maps in a single job, data is read once.
>
> 2016-02-16 15:53 GMT+01:00 Saliya Ekanayake <esaliya@gmail.com>:
>
>> Fabian,
>>
>> I have a quick follow-up question on what you suggested. When streaming
>> the same data through different maps, were you implying that everything
>> runs as a single job in Flink, so the data is read only once?
>>
>> Thanks,
>> Saliya
>>
>> On Mon, Feb 15, 2016 at 3:58 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>>
>>> It is not possible to "pin" data sets in memory, yet.
>>> However, you can stream the same data set through two different mappers
>>> at the same time.
>>>
>>> For instance you can have a job like:
>>>
>>>            /---> Map 1 --> Sink1
>>> Source --<
>>>            \---> Map 2 --> Sink2
>>>
>>> and execute it at once.
>>> For that you define your data flow and call execute() once after all
>>> sinks have been created.
>>>
>>> Best, Fabian
>>>
>>> 2016-02-15 21:32 GMT+01:00 Saliya Ekanayake <esaliya@gmail.com>:
>>>
>>>> Fabian,
>>>>
>>>> count() was just an example. What I would like to do is, say, run two
>>>> map operations on the dataset (ds). Each map will have its own
>>>> reduction, so is there a way to avoid creating two jobs for such a
>>>> scenario?
>>>>
>>>> The reason is that reading these binary matrices is expensive. In our
>>>> current MPI implementation, I am using memory maps for faster loading
>>>> and reuse.
>>>>
>>>> Thank you,
>>>> Saliya
>>>>
>>>> On Mon, Feb 15, 2016 at 3:15 PM, Fabian Hueske <fhueske@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> It looks like you are executing two distinct Flink jobs.
>>>>> DataSet.count() triggers the execution of a new job. If you also have
>>>>> an execute() call in your program, this will lead to two Flink jobs
>>>>> being executed.
>>>>> It is not possible to share state among these jobs.
>>>>>
>>>>> Maybe you should add a custom count implementation (using a
>>>>> ReduceFunction) which is executed in the same program as the other
>>>>> ReduceFunction.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2016-02-15 21:05 GMT+01:00 Saliya Ekanayake <esaliya@gmail.com>:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I see that an InputFormat's open() and nextRecord() methods get
>>>>>> called for each terminal operation on a given dataset using that
>>>>>> particular InputFormat. Is it possible to avoid this - possibly
>>>>>> using some caching technique in Flink?
>>>>>>
>>>>>> For example, I have some code like the one below, and I see that for
>>>>>> both of the last two statements (reduce() and count()) the above
>>>>>> methods in the input format get called. By the way, this is a custom
>>>>>> input format I wrote to represent a binary matrix stored as Short
>>>>>> values.
>>>>>>
>>>>>> ShortMatrixInputFormat smif = new ShortMatrixInputFormat();
>>>>>> DataSet<Short[]> ds = env.createInput(smif, BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO);
>>>>>> MapOperator<Short[], DoubleStatistics> op = ds.map(...)
>>>>>> *op.reduce(...)*
>>>>>> *op.count(...)*
>>>>>>
>>>>>> Thank you,
>>>>>> Saliya

--
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
Cell 812-391-4914
http://saliya.org
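The "custom count" idea from the thread (computing the count with the same reduction that computes the other aggregate, instead of calling count() separately) can be sketched in plain Java. This is only an illustration of the pattern, not Flink code: the names SinglePassStats, SumCount, mapRow and reduceRows are hypothetical, and an in-memory list stands in for the records produced by the input format. In a Flink program the same logic would presumably sit in a map function and a ReduceFunction within one job.

```java
import java.util.Arrays;
import java.util.List;

public class SinglePassStats {

    /** Partial result of the reduction: running sum and number of values seen. */
    public static final class SumCount {
        public final long sum;
        public final long count;

        public SumCount(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }

        /** Combines two partial results; this is the "reduce" step. */
        public SumCount plus(SumCount other) {
            return new SumCount(sum + other.sum, count + other.count);
        }

        /** Average of all values seen so far, or NaN if there were none. */
        public double average() {
            return count == 0 ? Double.NaN : (double) sum / count;
        }
    }

    /** "Map" step: one row of shorts becomes its partial sum and count. */
    public static SumCount mapRow(short[] row) {
        long sum = 0;
        for (short v : row) {
            sum += v;
        }
        return new SumCount(sum, row.length);
    }

    /** Visits every row exactly once and folds the partial results together. */
    public static SumCount reduceRows(List<short[]> rows) {
        SumCount acc = new SumCount(0, 0);
        for (short[] row : rows) {
            acc = acc.plus(mapRow(row));
        }
        return acc;
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for the records read from the matrix file.
        List<short[]> rows = Arrays.asList(new short[] {1, 2, 3}, new short[] {4, 5});
        SumCount total = reduceRows(rows);
        System.out.println(total.count + " values, average " + total.average());
    }
}
```

Because the count travels with the sum through the same reduction, the input is traversed once and no second pass is needed just to count the values.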