From: Robert Metzger
Date: Fri, 10 Jun 2016 14:10:42 +0200
Subject: Re: Reading whole files (from S3)
To: user@flink.apache.org

Hi,

Setting the unsplittable attribute in the constructor is fine. The field's
value will be sent to the cluster. What happens is that you initialize the
input format in your client program; then it is serialized, sent over the
network to the machines, and deserialized again. So the value you set in the
constructor will end up in the cluster.
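For illustration, a minimal sketch of the client side (untested; the job
class name and the S3 path are just placeholders). The format instance you
construct here, including the flag set in its constructor, is exactly the
object that gets serialized and shipped to the workers:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class ReadWholeFilesJob {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Constructed on the client: unsplittable = true is set here and
            // travels with the serialized format instance to the TaskManagers.
            WholeFileInputFormat format = new WholeFileInputFormat();

            // readFile() assigns the path and turns the format into a DataSet.
            DataSet<String> fileContents = env.readFile(format, "s3://my-bucket/gpx/");
            fileContents.print();
        }
    }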
On Fri, Jun 10, 2016 at 10:53 AM, Andrea Cisternino wrote:

> Hi,
>
> I am replying to myself for the record and to provide an update on what I
> am trying to do.
>
> I have looked into Mahout's XmlInputFormat class but unfortunately it
> doesn't solve my problem.
>
> My exploratory work with Flink tries to reproduce the key steps that we
> already perform in a fairly large Apache Spark application that runs on
> Amazon EMR.
>
> For our use case the GPX files are not collections of independent records
> that could be split and analyzed in parallel. Instead, more than 95% of
> them are treated by our algorithms as a single record (a so-called
> "Track").
>
> In other words, we would not gain anything by splitting the files: in the
> vast majority of cases we would get only one slice out of one file,
> defeating the purpose of splitting them in the first place.
>
> GPX files have another nasty property: they come in two versions (1.0 and
> 1.1; see http://www.topografix.com/gpx.asp).
> Important attributes of a point (e.g. speed) are encoded very differently
> in the two versions, and therefore the parsing logic must be different, at
> least for some sections of the file.
>
> To recognize the file version, the parser must look at the entire file
> because this information is available only in the namespace declaration of
> the root element.
>
> On top of all of this, I think that, because of their small size and
> because we read all of them from S3, splitting within the file is not an
> issue. Can you confirm that?
>
> Going back to my WholeFileInputFormat class, I am worried about setting
> the unsplittable attribute to true in the constructor. Will the
> constructor also be invoked when running in a cluster?
>
> Well, I think I really need to set up a small Flink cluster and try it
> myself :)
>
> Thanks again.
> Andrea.
>
> On 8 June 2016 at 08:16, Andrea Cisternino wrote:
>
>> Jamie, Suneel, thanks a lot, your replies have been very helpful.
>>
>> I will definitely take a look at XmlInputFormat.
>>
>> In any case the files are not very big: on average 100-200 kB, up to a
>> maximum of a couple of MB.
>>
>> On 8 June 2016 at 04:23, Suneel Marthi wrote:
>>
>>> You can use Mahout's XmlInputFormat with Flink's HadoopInputFormat
>>> wrappers. See
>>>
>>> http://stackoverflow.com/questions/29429428/xmlinputformat-for-apache-flink
>>>
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Read-XML-from-HDFS-td7023.html
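>>>
>>> Roughly like this (untested sketch: readHadoopFile comes from Flink's
>>> Hadoop compatibility support, the "xmlinput.start"/"xmlinput.end" keys
>>> are XmlInputFormat's configuration, and the XmlInputFormat package, the
>>> tag names, and the S3 path are assumptions to adapt to your setup):
>>>
>>> import org.apache.flink.api.java.DataSet;
>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.hadoop.io.LongWritable;
>>> import org.apache.hadoop.io.Text;
>>> import org.apache.hadoop.mapreduce.Job;
>>> import org.apache.mahout.text.wikipedia.XmlInputFormat;
>>>
>>> public class ReadXmlChunks {
>>>     public static void main(String[] args) throws Exception {
>>>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>
>>>         Job job = Job.getInstance();
>>>         // Tell XmlInputFormat which start/end tags delimit one record.
>>>         job.getConfiguration().set("xmlinput.start", "<trk>");
>>>         job.getConfiguration().set("xmlinput.end", "</trk>");
>>>
>>>         // Each record is (byte offset, XML fragment between the tags).
>>>         DataSet<Tuple2<LongWritable, Text>> chunks =
>>>             env.readHadoopFile(new XmlInputFormat(), LongWritable.class,
>>>                                Text.class, "s3://my-bucket/gpx/", job);
>>>
>>>         chunks.print();
>>>     }
>>> }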
>>>
>>> On Tue, Jun 7, 2016 at 10:11 PM, Jamie Grier wrote:
>>>
>>>> Hi Andrea,
>>>>
>>>> How large are these data files? The implementation you've mentioned
>>>> here is only usable if they are very small. If so, you're fine. If
>>>> not, read on...
>>>>
>>>> Processing XML input files in parallel is tricky. It's not a great
>>>> format for this type of processing, as you've seen: the files are
>>>> tricky to split and more complex to iterate through than simpler
>>>> formats. However, others have implemented XmlInputFormat classes for
>>>> Hadoop. Have you looked at these? Mahout has an XmlInputFormat
>>>> implementation, for example, but I haven't used it directly.
>>>>
>>>> Anyway, you can reuse Hadoop InputFormat implementations in Flink
>>>> directly. This is likely a good route. See Flink's HadoopInputFormat
>>>> class.
>>>>
>>>> -Jamie
>>>>
>>>> On Tue, Jun 7, 2016 at 7:35 AM, Andrea Cisternino wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I am evaluating Apache Flink for processing large sets of geospatial
>>>>> data.
>>>>> The use case I am working on involves reading a certain number of
>>>>> GPX files stored on Amazon S3.
>>>>>
>>>>> GPX files are actually XML files and therefore cannot be read on a
>>>>> line-by-line basis.
>>>>> One GPX file will produce one or more Java objects that contain the
>>>>> geospatial data we need to process (mostly a list of geographical
>>>>> points).
>>>>>
>>>>> To cover this use case I tried to extend the FileInputFormat class:
>>>>>
>>>>> import java.io.IOException;
>>>>> import java.nio.charset.StandardCharsets;
>>>>>
>>>>> import org.apache.commons.io.IOUtils;
>>>>> import org.apache.flink.api.common.io.FileInputFormat;
>>>>> import org.apache.flink.core.fs.FileInputSplit;
>>>>>
>>>>> public class WholeFileInputFormat extends FileInputFormat<String>
>>>>> {
>>>>>   private boolean hasReachedEnd = false;
>>>>>
>>>>>   public WholeFileInputFormat() {
>>>>>     // Never split a file: each split covers one whole file.
>>>>>     unsplittable = true;
>>>>>   }
>>>>>
>>>>>   @Override
>>>>>   public void open(FileInputSplit fileSplit) throws IOException {
>>>>>     super.open(fileSplit);
>>>>>     hasReachedEnd = false;
>>>>>   }
>>>>>
>>>>>   @Override
>>>>>   public String nextRecord(String reuse) throws IOException {
>>>>>     // Read the entire file into one String record; the 'stream'
>>>>>     // field is opened by FileInputFormat. Uses Apache Commons IO.
>>>>>     String fileContent = IOUtils.toString(stream, StandardCharsets.UTF_8);
>>>>>     hasReachedEnd = true;
>>>>>     return fileContent;
>>>>>   }
>>>>>
>>>>>   @Override
>>>>>   public boolean reachedEnd() throws IOException {
>>>>>     return hasReachedEnd;
>>>>>   }
>>>>> }
>>>>>
>>>>> This class returns the content of the whole file as a string.
>>>>>
>>>>> Is this the right approach?
>>>>> It seems to work when run locally with local files, but I wonder if
>>>>> it would run into problems when tested in a cluster.
>>>>>
>>>>> Thanks in advance.
>>>>> Andrea.
>>>>>
>>>>> --
>>>>> Andrea Cisternino, Erlangen, Germany
>>>>> GitHub: http://github.com/acisternino
>>>>> GitLab: https://gitlab.com/u/acisternino
>>>>
>>>> --
>>>> Jamie Grier
>>>> data Artisans, Director of Applications Engineering
>>>> @jamiegrier
>>>> jamie@data-artisans.com
>>
>> --
>> Andrea Cisternino, Erlangen, Germany
>> LinkedIn: http://www.linkedin.com/in/andreacisternino
>> GitHub: http://github.com/acisternino
>
> --
> Andrea Cisternino, Erlangen, Germany
> LinkedIn: http://www.linkedin.com/in/andreacisternino
> GitHub: http://github.com/acisternino