From: Fabian Hueske
Date: Tue, 16 Feb 2016 22:01:41 +0100
Subject: Re: Read once input data?
To: user@flink.apache.org

Broadcasted DataSets are stored on the JVM heap of each task manager (but
shared among multiple slots on the same TM), hence the size restriction.

There are two ways to retrieve a DataSet (such as the result of a reduce):

1) If you want to fetch the result into your client program, use
DataSet.collect(). This immediately triggers an execution and fetches the
result from the cluster.
2) If you want to use the result for a computation in the cluster, use
broadcast sets as described above.

2016-02-16 21:54 GMT+01:00 Saliya Ekanayake:

> Thank you, yes, this makes sense. The broadcasted data in my case would
> be a large array of 3D coordinates.
>
> On a side note, how can I take the output from a reduce function? I can
> see methods to write it to a given output, but is it possible to
> retrieve the reduced result back to the program - like a double value
> representing the average in the previous example?
>
> On Tue, Feb 16, 2016 at 3:47 PM, Fabian Hueske wrote:
>
>> You can use so-called BroadcastSets to send any sufficiently small
>> DataSet (such as a computed average) to any other function and use it
>> there.
>> However, in your case you'll end up with a data flow that branches (at
>> the source) and merges again (when the average is sent to the second
>> map). Such patterns can cause deadlocks and can therefore not be
>> pipelined, which means that the data before the branch is written to
>> disk and read again.
>> In your case it might be even better to read the data twice instead of
>> reading, writing, and reading it.
>>
>> Fabian
>>
>> 2016-02-16 21:15 GMT+01:00 Saliya Ekanayake:
>>
>>> I looked at the samples and I think what you meant is clear, but I
>>> didn't find a solution for my need. In my case, I want to use the
>>> result from the first map operation before I can apply the second map
>>> on the *same* data set. For simplicity, let's say I have a bunch of
>>> short values represented as my data set. Then I need to find their
>>> average, so I use a map and a reduce. Then I want to map these short
>>> values with another function, but it needs the average computed at
>>> the beginning to work correctly.
>>>
>>> Is this possible without doing multiple reads of the input data to
>>> create the same dataset?
>>>
>>> Thank you,
>>> Saliya
>>>
>>> On Tue, Feb 16, 2016 at 12:03 PM, Fabian Hueske wrote:
>>>
>>>> Yes, if you implement both maps in a single job, data is read once.
>>>>
>>>> 2016-02-16 15:53 GMT+01:00 Saliya Ekanayake:
>>>>
>>>>> Fabian,
>>>>>
>>>>> I have a quick follow-up question on what you suggested. When
>>>>> streaming the same data through different maps, were you implying
>>>>> that everything goes as a single job in Flink, so the data read
>>>>> happens only once?
>>>>>
>>>>> Thanks,
>>>>> Saliya
>>>>>
>>>>> On Mon, Feb 15, 2016 at 3:58 PM, Fabian Hueske wrote:
>>>>>
>>>>>> It is not possible to "pin" data sets in memory yet.
>>>>>> However, you can stream the same data set through two different
>>>>>> mappers at the same time.
>>>>>>
>>>>>> For instance, you can have a job like:
>>>>>>
>>>>>>            /---> Map 1 --> Sink 1
>>>>>> Source --<
>>>>>>            \---> Map 2 --> Sink 2
>>>>>>
>>>>>> and execute it at once. For that you define your data flow and
>>>>>> call execute() once after all sinks have been created.
>>>>>>
>>>>>> Best, Fabian
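A minimal sketch of the branching job described above, assuming the Java
DataSet API of the time. The element type, map bodies, and output paths are
placeholder assumptions, and env.fromElements(...) stands in for the
expensive custom input format discussed further down in the thread:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class TwoMapsOneSource {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Stand-in for the expensive source; in the thread this would be
            // a custom InputFormat read via env.createInput(...).
            DataSet<Short> source = env.fromElements((short) 1, (short) 2, (short) 3);

            // Branch 1: first map and its sink.
            source.map(new MapFunction<Short, Integer>() {
                @Override
                public Integer map(Short value) {
                    return value * 2;
                }
            }).writeAsText("/tmp/sink1");

            // Branch 2: second map and its sink.
            source.map(new MapFunction<Short, Integer>() {
                @Override
                public Integer map(Short value) {
                    return value + 1;
                }
            }).writeAsText("/tmp/sink2");

            // A single execute() call runs both branches as one job, so the
            // source is read only once.
            env.execute("Two maps over one source");
        }
    }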
>>>>>> 2016-02-15 21:32 GMT+01:00 Saliya Ekanayake:
>>>>>>
>>>>>>> Fabian,
>>>>>>>
>>>>>>> count() was just an example. What I would like to do is, say, run
>>>>>>> two map operations on the dataset (ds). Each map will have its own
>>>>>>> reduction, so is there a way to avoid creating two jobs for such a
>>>>>>> scenario?
>>>>>>>
>>>>>>> The reason is that reading these binary matrices is expensive. In
>>>>>>> our current MPI implementation, I am using memory maps for faster
>>>>>>> loading and reuse.
>>>>>>>
>>>>>>> Thank you,
>>>>>>> Saliya
>>>>>>>
>>>>>>> On Mon, Feb 15, 2016 at 3:15 PM, Fabian Hueske wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> it looks like you are executing two distinct Flink jobs.
>>>>>>>> DataSet.count() triggers the execution of a new job. If you have
>>>>>>>> an execute() call in your program, this will lead to two Flink
>>>>>>>> jobs being executed.
>>>>>>>> It is not possible to share state among these jobs.
>>>>>>>>
>>>>>>>> Maybe you should add a custom count implementation (using a
>>>>>>>> ReduceFunction) which is executed in the same program as the
>>>>>>>> other ReduceFunction.
>>>>>>>>
>>>>>>>> Best, Fabian
>>>>>>>>
>>>>>>>> 2016-02-15 21:05 GMT+01:00 Saliya Ekanayake:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I see that an InputFormat's open() and nextRecord() methods get
>>>>>>>>> called for each terminal operation on a given dataset using that
>>>>>>>>> particular InputFormat. Is it possible to avoid this - possibly
>>>>>>>>> using some caching technique in Flink?
>>>>>>>>>
>>>>>>>>> For example, I have some code like the snippet below, and I see
>>>>>>>>> that for both of the last two statements (reduce() and count())
>>>>>>>>> the above methods of the input format get called. Btw, this is a
>>>>>>>>> custom input format I wrote to represent a binary matrix stored
>>>>>>>>> as Short values.
>>>>>>>>>
>>>>>>>>> ShortMatrixInputFormat smif = new ShortMatrixInputFormat();
>>>>>>>>>
>>>>>>>>> DataSet<Short[]> ds = env.createInput(smif,
>>>>>>>>>     BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO);
>>>>>>>>>
>>>>>>>>> MapOperator<Short[], DoubleStatistics> op = ds.map(...)
>>>>>>>>>
>>>>>>>>> op.reduce(...)
>>>>>>>>>
>>>>>>>>> op.count()
>>>>>>>>>
>>>>>>>>> Thank you,
>>>>>>>>> Saliya
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Saliya Ekanayake
>>>>>>>>> Ph.D. Candidate | Research Assistant
>>>>>>>>> School of Informatics and Computing | Digital Science Center
>>>>>>>>> Indiana University, Bloomington
>>>>>>>>> Cell 812-391-4914
>>>>>>>>> http://saliya.org
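A hedged sketch of the broadcast-set approach Fabian recommends for the
average example: the average is computed once, broadcast into a second map
over the same source, and both branches run in one job with a single
execute() call. The broadcast name "avg", the output path, and the centering
logic are illustrative assumptions, not code from the thread:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;

    import java.util.List;

    public class BroadcastAverageJob {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Stand-in for the expensive matrix source from the thread.
            DataSet<Short> values = env.fromElements((short) 1, (short) 4, (short) 7);

            // Branch 1: compute (sum, count) and turn it into the average.
            DataSet<Double> avg = values
                .map(new MapFunction<Short, Tuple2<Double, Long>>() {
                    @Override
                    public Tuple2<Double, Long> map(Short v) {
                        return new Tuple2<>(v.doubleValue(), 1L);
                    }
                })
                .reduce(new ReduceFunction<Tuple2<Double, Long>>() {
                    @Override
                    public Tuple2<Double, Long> reduce(Tuple2<Double, Long> a,
                                                       Tuple2<Double, Long> b) {
                        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
                    }
                })
                .map(new MapFunction<Tuple2<Double, Long>, Double>() {
                    @Override
                    public Double map(Tuple2<Double, Long> t) {
                        return t.f0 / t.f1;
                    }
                });

            // Branch 2: map the same values, reading the average as a
            // broadcast set in open().
            values.map(new RichMapFunction<Short, Double>() {
                    private double average;

                    @Override
                    public void open(Configuration parameters) {
                        List<Double> bc = getRuntimeContext().getBroadcastVariable("avg");
                        average = bc.get(0);
                    }

                    @Override
                    public Double map(Short v) {
                        return v - average; // e.g. center each value on the average
                    }
                })
                .withBroadcastSet(avg, "avg")
                .writeAsText("/tmp/centered");

            // One execute() runs both branches as a single job.
            env.execute("Average via broadcast set");
        }
    }

As Fabian points out above, this flow branches at the source and merges at
the second map, so it may not be fully pipelined; reading the input twice can
end up cheaper than the disk round trip such a pattern causes.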
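Finally, a short sketch of option 1 from the reply at the top of this
message: fetching a reduce result back into the client program with
DataSet.collect(). The data and the summation are illustrative only:

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    import java.util.List;

    public class CollectReduceResult {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Long> sum = env.fromElements(1L, 2L, 3L)
                .reduce(new ReduceFunction<Long>() {
                    @Override
                    public Long reduce(Long a, Long b) {
                        return a + b;
                    }
                });

            // collect() triggers execution of the plan defined so far and
            // ships the result back to the client program.
            List<Long> result = sum.collect();
            System.out.println("Sum = " + result.get(0));
        }
    }

Note that, like count(), each collect() call triggers its own job, so a
result that other operators in the cluster need is better passed along as a
broadcast set instead.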