From: Fabian Hueske
To: user@flink.apache.org
Date: Tue, 16 Feb 2016 18:03:52 +0100
Subject: Re: Read once input data?

Yes, if you implement both maps in a single job, data is read once.

2016-02-16 15:53 GMT+01:00 Saliya Ekanayake <esaliya@gmail.com>:
Fabian,

I've a quick follow-up question on what you suggested. When streaming the same data through different maps, were you implying that everything goes as a single job in Flink, so the data is read only once?

Thanks,
Saliya

On Mon, Feb 15, 2016 at 3:58 PM, Fabian Hueske <fhueske@gmail.com> wrote:
It is not possible to "pin" data sets in memory, yet.
However, you can stream the same data set through two different mappers at the same time.

For instance, you can have a job like:

          /---> Map 1 --> Sink1
Source --<
          \---> Map 2 --> Sink2

and execute it at once.
For that, you define your data flow and call execute() once, after all sinks have been created.
Best, Fabian
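A minimal sketch of the branching job above, assuming the Flink 1.x Java DataSet API with a local runtime on the classpath. The class TwoSinksOneJob, the two map functions (Square, Negate) and the collection-backed sinks are hypothetical stand-ins for the real mappers and sinks. What matters is that both sinks are declared on the same source before the single env.execute() call, so Flink builds one job in which the source feeds both branches.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;

/** Hypothetical example: one source, two mappers, two sinks, one job. */
public class TwoSinksOneJob {

    /** Map 1 (illustrative only): squares each value. */
    static final class Square implements MapFunction<Long, Long> {
        @Override
        public Long map(Long v) {
            return v * v;
        }
    }

    /** Map 2 (illustrative only): negates each value. */
    static final class Negate implements MapFunction<Long, Long> {
        @Override
        public Long map(Long v) {
            return -v;
        }
    }

    /** Builds the branching plan and runs it with a single execute() call. */
    public static List<List<Long>> run(List<Long> values) throws Exception {
        // Local environment with parallelism 1, so the sketch runs in one JVM.
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(1);
        // fromCollection derives the type from the first element: values must be non-empty.
        DataSet<Long> source = env.fromCollection(values);

        List<Long> sink1 = new ArrayList<>();
        List<Long> sink2 = new ArrayList<>();

        // Both branches consume the same DataSet; these calls only build the plan.
        source.map(new Square()).output(new LocalCollectionOutputFormat<>(sink1));
        source.map(new Negate()).output(new LocalCollectionOutputFormat<>(sink2));

        // One execute() after all sinks exist: one job, the source is read once.
        env.execute("one source, two sinks");

        // Sort so the result does not depend on record arrival order.
        Collections.sort(sink1);
        Collections.sort(sink2);
        List<List<Long>> results = new ArrayList<>();
        results.add(sink1);
        results.add(sink2);
        return results;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(Arrays.asList(1L, 2L, 3L)));
    }
}
```

LocalCollectionOutputFormat is used here only so the results can be inspected; it works when the job runs inside the same JVM (local execution). On a cluster, the two output(...) calls would be replaced by real sinks such as writeAsText(...).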

2016-02-15 21:32 GMT+01:00 Saliya Ekanayake <esaliya@gmail.com>:
Fabian,

count() was just an example. What I would like to do is, say, run two map operations on the dataset (ds). Each map will have its own reduction, so is there a way to avoid creating two jobs for such a scenario?

The reason is that reading these binary matrices is expensive. In our current MPI implementation, I am using memory maps for faster loading and reuse.

Thank you,
Saliya

On Mon, Feb 15, 2016 at 3:15 PM, Fabian Hueske <fhueske@gmail.com> wrote:
Hi,

It looks like you are executing two distinct Flink jobs.
DataSet.count() triggers the execution of a new job. If you also have an execute() call in your program, this will lead to two Flink jobs being executed.
It is not possible to share state among these jobs.

Maybe you should add a custom count implementation (using a ReduceFunction) which is executed in the same program as the other ReduceFunction.

Best, Fabian
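DataSet.count() is eager: it builds and runs a job on its own, which is why the input format is opened a second time. A minimal sketch of the alternative suggested here, assuming the Flink 1.x Java DataSet API: express the count as a map of every record to 1L followed by a summing ReduceFunction, declare it next to the other reduction, and call execute() once. CountInSameJob, Sum and One are hypothetical names, and a plain sum stands in for the program's real ReduceFunction.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;

/** Hypothetical example: a ReduceFunction-based count in the same job as another reduction. */
public class CountInSameJob {

    /** Adds two longs; used for the "other" reduction and for the count. */
    static final class Sum implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }

    /** Maps every record to 1L, so summing the results yields the record count. */
    static final class One implements MapFunction<Long, Long> {
        @Override
        public Long map(Long ignored) {
            return 1L;
        }
    }

    /** Returns {sum, count}, both computed by one job with one execute() call. */
    public static long[] run(List<Long> values) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(1);
        // fromCollection derives the type from the first element: values must be non-empty.
        DataSet<Long> data = env.fromCollection(values);

        List<Long> sum = new ArrayList<>();
        List<Long> count = new ArrayList<>();

        // The "other" ReduceFunction (a plain sum, for illustration).
        data.reduce(new Sum()).output(new LocalCollectionOutputFormat<>(sum));
        // Custom count: map to 1L, then reduce with the same Sum function.
        // Unlike data.count(), this is lazy and does not start a job by itself.
        data.map(new One()).reduce(new Sum()).output(new LocalCollectionOutputFormat<>(count));

        env.execute("reduction plus count in one job");
        return new long[] {sum.get(0), count.get(0)};
    }

    public static void main(String[] args) throws Exception {
        long[] r = run(Arrays.asList(4L, 5L, 6L));
        System.out.println("sum=" + r[0] + ", count=" + r[1]);
    }
}
```

collect() and print() are eager in the same way as count(), so results that should come from the same job belong in sinks declared before execute().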



2016-02-15 21:05 GMT+01:00 Saliya Ekanayake <esaliya@gmail.com>:
Hi,

I see that an InputFormat's open() and nextRecord() methods get called for each terminal operation on a given dataset that uses that particular InputFormat. Is it possible to avoid this, possibly using some caching technique in Flink?

For example, I have some code like the snippet below, and I see that for both of the last two statements (reduce() and count()) the above methods in the input format get called. Btw, this is a custom input format I wrote to represent a binary matrix stored as Short values.

ShortMatrixInputFormat smif = new ShortMatrixInputFormat();
DataSet<Short[]> ds = env.createInput(smif, BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO);
MapOperator<Short[], DoubleStatistics> op = ds.map(...);
op.reduce(...);
op.count();

Thank you,
Saliya
--
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
Cell 812-391-4914
http://saliya.org



