Subject: Re: Reading Binary Data (Matrix) with Flink
From: Saliya Ekanayake
To: user@flink.apache.org
Date: Mon, 25 Jan 2016 13:06:56 -0500

Hi Fabian,

Thank you, I think I have a better picture of this now. I think if I set the
number of DataSource tasks (a config option, I guess?) equal to the number of
input splits, that would do what I expected.

Yes, I will keep the file at the same place across nodes.

Thank you,
Saliya

On Mon, Jan 25, 2016 at 10:59 AM, Fabian Hueske wrote:

Hi Saliya,

The number of parallel splits is controlled by the number of input splits
returned by the InputFormat.createInputSplits() method. This method receives
a parameter minNumSplits, which is equal to the number of DataSource tasks.

Flink handles input splits a bit differently than Hadoop. In Hadoop, each
input split corresponds to one map task. In Flink you have a fixed number of
DataSource tasks, and input splits are lazily distributed to source tasks. If
you have more splits than tasks, a data source requests a new split when it
is done with its last split, until all splits are assigned. If your
createInputSplits method returns fewer splits than minNumSplits, some source
tasks won't receive a split.

If you read files from a local FS in a distributed (multi-node) setup, you
have to be careful. Each node must have an exact copy of the data at exactly
the same location. Otherwise, it won't work.

Best, Fabian

2016-01-25 16:46 GMT+01:00 Saliya Ekanayake:

Hi Fabian,

Thank you for the information.

So, is there a way I can get the task number within the InputFormat? That way
I can use it to offset the block of rows.

The file is too large to fit in a single process's memory, so the current
setups in MPI and Hadoop use the rank (task number) to memory-map the
corresponding block of rows. In our experiments, we found this approach to be
the fastest because of the memory mapping rather than buffered reads. Also,
the file is replicated across nodes, and the reading (mapping) happens only
once.

Thank you,
Saliya

On Mon, Jan 25, 2016 at 4:38 AM, Fabian Hueske wrote:

Hi Saliya,

Yes, that is possible. However, the requirements for reading a binary file
from the local FS are basically the same as for reading it from HDFS. In
order to start reading different sections of a file in parallel, you need to
know the different starting positions. This can be done either by having
fixed offsets for blocks or by adding some meta information for the block
start positions. InputFormats can divide the work of reading a file by
generating multiple input splits. Each input split defines the file, the
start offset, and the length to read.

However, are you sure that reading a file in parallel will be faster than
reading it sequentially? At least for HDDs, IO-bound workloads with "random"
reading patterns are usually much slower than sequential reads.

Cheers, Fabian
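A minimal sketch of the split-based approach Fabian describes, applied to the
NxN matrix of shorts from the original question below: a FileInputFormat
whose createInputSplits() carves the file into row-aligned blocks, so each
split carries its own offset and no task number is needed. The class name and
the hard-coded dimension are hypothetical, and it assumes the Flink 0.10-era
Java DataSet API rather than anything posted in this thread:

    import java.io.EOFException;
    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.flink.api.common.io.FileInputFormat;
    import org.apache.flink.core.fs.FileInputSplit;

    // Hypothetical sketch: emits one short[] row of an N x N matrix per record.
    public class ShortMatrixInputFormat extends FileInputFormat<short[]> {

        private static final int N = 10_000;           // matrix dimension (assumed)
        private static final long ROW_BYTES = N * 2L;  // one row of shorts

        private long bytesLeft;

        @Override
        public FileInputSplit[] createInputSplits(int minNumSplits) {
            // One split per row block; offsets are multiples of the row size,
            // so every split starts exactly at a row boundary.
            long rowsPerSplit = (N + minNumSplits - 1) / minNumSplits;
            FileInputSplit[] splits = new FileInputSplit[minNumSplits];
            for (int i = 0; i < minNumSplits; i++) {
                long firstRow = i * rowsPerSplit;
                long rows = Math.max(0, Math.min(rowsPerSplit, N - firstRow));
                splits[i] = new FileInputSplit(
                        i, filePath, firstRow * ROW_BYTES, rows * ROW_BYTES,
                        null); // no host information
            }
            return splits;
        }

        @Override
        public void open(FileInputSplit split) throws IOException {
            super.open(split); // opens the file and seeks to the split's start offset
            bytesLeft = split.getLength();
        }

        @Override
        public boolean reachedEnd() {
            return bytesLeft <= 0;
        }

        @Override
        public short[] nextRecord(short[] reuse) throws IOException {
            byte[] buf = new byte[(int) ROW_BYTES];
            int off = 0;
            while (off < buf.length) { // read exactly one full row
                int n = stream.read(buf, off, buf.length - off);
                if (n < 0) throw new EOFException("Unexpected end of matrix file");
                off += n;
            }
            short[] row = (reuse != null && reuse.length == N) ? reuse : new short[N];
            ByteBuffer.wrap(buf).asShortBuffer().get(row);
            bytesLeft -= ROW_BYTES;
            return row;
        }
    }

With something like this, env.createInput(new ShortMatrixInputFormat())
(after calling setFilePath(...)) gives a source whose tasks lazily pull
row-block splits, and setting the source parallelism equal to the number of
splits yields the one-block-per-task behavior discussed above.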
2016-01-24 19:10 GMT+01:00 Suneel Marthi:

There should be an env.readbinaryfile() IIRC, check that.

Sent from my iPhone

On Jan 24, 2016, at 12:44 PM, Saliya Ekanayake wrote:

Thank you for the response on this, but I still have some doubt. Simply put,
the file is not in HDFS, it's in local storage. In Flink, if I run the
program with, say, 5 parallel tasks, what I would like to do is read a block
of rows in each task, as shown below. I looked at the simple CSV reader and
was thinking of creating a custom one like that, but I would need to know the
task number to read the relevant block. Is this possible?

[attachment: image.png]

Thank you,
Saliya

On Wed, Jan 20, 2016 at 12:47 PM, Till Rohrmann wrote:

With readHadoopFile you can use all of Hadoop's FileInputFormats, and thus
you can do everything with Flink that you can do with Hadoop. Simply take the
same Hadoop FileInputFormat that you would use for your MapReduce job.

Cheers,
Till

On Wed, Jan 20, 2016 at 3:16 PM, Saliya Ekanayake wrote:

Thank you, I saw readHadoopFile, but I was not sure how it can be used for
the following, which is what I need. The logic of the code requires an entire
row to operate on, so in our current implementation with P tasks, each of
them reads a rectangular block of (N/P) x N from the matrix. Is this possible
with readHadoopFile? Also, the file may not be in HDFS, so is it possible to
refer to the local disk in doing this?

Thank you

On Wed, Jan 20, 2016 at 1:31 AM, Chiwan Park wrote:

Hi Saliya,

You can use an input format from Hadoop in Flink via the readHadoopFile
method. The method returns a DataSet of type Tuple2<Key, Value>. Note that
the MapReduce-equivalent transformation in Flink is composed of map, groupBy,
and reduceGroup.

Regards,
Chiwan Park
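A minimal sketch of the readHadoopFile route Till and Chiwan describe, again
against the 0.10-era Java DataSet API — the SequenceFile record layout, the
path, and the class name are illustrative assumptions, not something given in
the thread:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapred.SequenceFileInputFormat;

    public class ReadMatrixRows {
        public static void main(String[] args) throws Exception {
            final ExecutionEnvironment env =
                    ExecutionEnvironment.getExecutionEnvironment();

            // Hypothetical input: a SequenceFile holding one matrix row per
            // record. Any Hadoop FileInputFormat plugs in the same way.
            DataSet<Tuple2<LongWritable, BytesWritable>> rows = env.readHadoopFile(
                    new SequenceFileInputFormat<LongWritable, BytesWritable>(),
                    LongWritable.class, BytesWritable.class,
                    "file:///data/matrix-rows.seq");

            // "map" plays the role of the MapReduce map phase; groupBy(...)
            // plus reduceGroup(...) would stand in for shuffle and reduce.
            DataSet<Integer> rowLengths = rows.map(
                    new MapFunction<Tuple2<LongWritable, BytesWritable>, Integer>() {
                        @Override
                        public Integer map(Tuple2<LongWritable, BytesWritable> row) {
                            return row.f1.getLength(); // bytes in this row
                        }
                    });

            rowLengths.print(); // print() triggers execution
        }
    }

A file:// URI works as well as hdfs://, with the caveat Fabian notes above:
every node needs an identical copy of the file at the same path.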
On Jan 20, 2016, at 3:04 PM, Suneel Marthi wrote:

Guess you are looking for Flink's BinaryInputFormat to be able to read blocks
of data from HDFS:

https://ci.apache.org/projects/flink/flink-docs-release-0.10/api/java/org/apache/flink/api/common/io/BinaryInputFormat.html

On Wed, Jan 20, 2016 at 12:45 AM, Saliya Ekanayake wrote:

Hi,

I am trying to use Flink to perform a parallel batch operation on an NxN
matrix represented as a binary file. Each (i,j) element is stored as a Java
Short value. In typical MapReduce programming with Hadoop, each map task will
read a block of rows of this matrix, perform computation on that block, and
emit the result to the reducer.

How is this done in Flink? I am new to Flink and couldn't find a binary
reader so far. Any help is greatly appreciated.

Thank you,
Saliya

--
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
Cell 812-391-4914
http://saliya.org