flink-dev mailing list archives

From Gyula Fóra <gyula.f...@gmail.com>
Subject Re: how load/group with large csv files
Date Tue, 21 Oct 2014 20:37:24 GMT
Motivated both by Martin and our recent use-case, I updated
<https://github.com/mbalassi/incubator-flink/commit/a6cd742958dace6ead896af189983933470d8eb1>
the groupBys and aggregations for the Streaming API to work also on arrays
by default.

I think it would probably make sense to do something similar for the batch
API too:

DataStream<double[]> ds = ... ; // each element is an array of 500 doubles
ds.groupBy(100, 200).sum(10);

(Here we group by fields 100 and 200 and sum field 10.)
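
To make the intended semantics concrete, here is a rough plain-Java sketch of what such a grouped sum over arrays would compute. It is not Flink API: the class and method names are made up for illustration, field positions are assumed to be zero-based, and I assume the non-summed fields keep the values of the first row seen in each group.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, plain Java only; it illustrates the semantics, not the Flink implementation.
final class ArrayGroupSum {

    /**
     * Groups rows by the values found at keyFields and sums the value at sumField per group.
     * Assumption: all other fields keep the values of the first row seen in the group.
     */
    static Map<List<Double>, double[]> groupBySum(List<double[]> rows, int[] keyFields, int sumField) {
        Map<List<Double>, double[]> groups = new LinkedHashMap<>();
        for (double[] row : rows) {
            // Build the composite key from the selected array positions.
            List<Double> key = new ArrayList<>();
            for (int f : keyFields) {
                key.add(row[f]);
            }
            double[] acc = groups.get(key);
            if (acc == null) {
                // Copy the row so the caller's input is never modified.
                groups.put(key, row.clone());
            } else {
                acc[sumField] += row[sumField];
            }
        }
        return groups;
    }
}
```

With this helper, `ArrayGroupSum.groupBySum(rows, new int[] {100, 200}, 10)` would correspond to the `groupBy(100, 200).sum(10)` call above on arrays of length 500.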

What do you think?

Gyula

On Tue, Oct 21, 2014 at 7:04 PM, Martin Neumann <mneumann@spotify.com>
wrote:

> There was not enough time to clean it up and gold-plate it. He got
> semi-horrible Java code for now, with some explanation of how it would look
> in Scala. My colleague was asking for a quick (and dirty) job, so taking more
> time on it would have defeated the purpose of the whole thing a bit.
>
> In any case thanks for the advice, hopefully I found us another Flink
> supporter.
>
> On Tue, Oct 21, 2014 at 3:52 PM, Stephan Ewen <sewen@apache.org> wrote:
>
> > Hej,
> >
> > Do you want to use Scala? You can use simple case classes there and use
> > fields directly as keys, it will look very elegant...
> >
> > If you want to stick with Java, you can actually use POJOs (Robert just
> > corrected me, expression keys should be available there)
> >
> > Can you define a class
> >
> > public class MyClass {
> >     public String id;
> >     public int someValue;
> >     public double anotherValue;
> >     ...
> > }
> >
> > and then run a program like this:
> >
> > DataSet<MyClass> data = env.readTextFile(...).map(new Parser());
> >
> > data.groupBy("id").sortGroup("someValue", Order.ASCENDING)
> >     .reduceGroup(new GroupReduceFunction(...));
> >
> >
> > Feel free to post your program here so we can give you comments!
> >
> > Greetings,
> > Stephan
> >
> >
> >
> > On Tue, Oct 21, 2014 at 3:32 PM, Martin Neumann <mneumann@spotify.com>
> > wrote:
> >
> > > Nope,
> > >
> > > but I can't filter out the useless data since the program I'm comparing to
> > > does not either. The point is to prove to one of my colleagues that
> > > Flink > Spark.
> > > The Spark program runs out of memory and crashes when just doing a simple
> > > group and counting the number of items.
> > >
> > > This is also one of the reasons I ask what the best style of doing
> > > this is, so I can get it as clean as possible to compare it to Spark.
> > >
> > > cheers Martin
> > >
> > >
> > > On Tue, Oct 21, 2014 at 3:07 PM, Aljoscha Krettek <aljoscha@apache.org>
> > > wrote:
> > >
> > > > By the way, do you actually need all those 54 columns in your job?
> > > >
> > > > On Tue, Oct 21, 2014 at 3:02 PM, Martin Neumann <mneumann@spotify.com>
> > > > wrote:
> > > > > I will go with that workaround, however I would have preferred if I could
> > > > > have done that directly with the API instead of doing Map/Reduce like
> > > > > Key/Value tuples again :-)
> > > > >
> > > > > By the way is there a simple function to count the number of items in a
> > > > > reduce group? It feels stupid to write a GroupReduce that just iterates and
> > > > > increments a counter.
> > > > >
> > > > > cheers Martin
> > > > >
> > > > > On Tue, Oct 21, 2014 at 2:54 PM, Robert Metzger <rmetzger@apache.org>
> > > > > wrote:
> > > > >
> > > > >> Yes, for sorted groups, you need to use Pojos or Tuples.
> > > > >> I think you have to split the input lines manually, with a mapper.
> > > > >> How about using a TupleN<...> with only the fields you need? (returned by
> > > > >> the mapper)
> > > > >>
> > > > >> if you need all fields, you could also use a Tuple2<String, String[]> where
> > > > >> the first position is the sort key?
> > > > >>
> > > > >>
> > > > >>
> > > > >> On Tue, Oct 21, 2014 at 2:20 PM, Gyula Fora <gyfora@apache.org> wrote:
> > > > >>
> > > > >> > I am not sure how you should go about that, let’s wait for some feedback
> > > > >> > from the others.
> > > > >> >
> > > > >> > Until then you can always map the array to (array, keyfield) and use
> > > > >> > groupBy(1).
> > > > >> >
> > > > >> >
> > > > >> > > On 21 Oct 2014, at 14:17, Martin Neumann <mneumann@spotify.com> wrote:
> > > > >> > >
> > > > >> > > Hej,
> > > > >> > >
> > > > >> > > Unfortunately .sort() cannot take a key extractor, would I have to do the
> > > > >> > > sort myself then?
> > > > >> > >
> > > > >> > > cheers Martin
> > > > >> > >
> > > > >> > > On Tue, Oct 21, 2014 at 2:08 PM, Gyula Fora <gyfora@apache.org> wrote:
> > > > >> > >
> > > > >> > >> Hey,
> > > > >> > >>
> > > > >> > >> Using arrays is probably a convenient way to do so.
> > > > >> > >>
> > > > >> > >> I think the way you described the groupBy only works for tuples now. To do
> > > > >> > >> the grouping on the array field, you would need to create a key extractor
> > > > >> > >> for this and pass that to groupBy.
> > > > >> > >>
> > > > >> > >> Actually we have some use-cases like this for streaming so we are thinking
> > > > >> > >> of writing a wrapper for the array types that would behave as you described.
> > > > >> > >>
> > > > >> > >> Regards,
> > > > >> > >> Gyula
> > > > >> > >>
> > > > >> > >>> On 21 Oct 2014, at 14:03, Martin Neumann <mneumann@spotify.com> wrote:
> > > > >> > >>>
> > > > >> > >>> Hej,
> > > > >> > >>>
> > > > >> > >>> I have a csv file with 54 columns, each of them a string (for now). I need
> > > > >> > >>> to group and sort them on field 15.
> > > > >> > >>>
> > > > >> > >>> What's the best way to load the data into Flink?
> > > > >> > >>> There is no Tuple54 (and the <> would look awful anyway with 54 times
> > > > >> > >>> String in it).
> > > > >> > >>> My current idea is to write a Mapper and split the string into arrays of
> > > > >> > >>> Strings. Would grouping and sorting work on this?
> > > > >> > >>>
> > > > >> > >>> So can I do something like this or does that only work on tuples:
> > > > >> > >>> DataSet<String[]> ds;
> > > > >> > >>> ds.groupBy(15).sort(20, ANY)
> > > > >> > >>> cheers Martin
> > > > >> > >>
> > > > >> > >>
> > > > >> >
> > > > >> >
> > > > >>
> > > >
> > >
> >
>
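
For reference, the workaround discussed in the quoted thread (split each line in a mapper, pair the row with its key field, group on the key, sort within each group, then count the items per group) can be sketched in plain Java. This is only an in-memory illustration under simple assumptions: fields are split on bare commas with no quoting support, field positions are zero-based, and the names `CsvGroupSort`, `groupAndSort` and `countPerGroup` are hypothetical. It does not use Flink and so does not show how Flink handles inputs that do not fit in memory.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java illustration of the "extract key, group, sort, count" workaround.
final class CsvGroupSort {

    /** Splits each line on commas, groups rows by keyField, and sorts each group by sortField. */
    static Map<String, List<String[]>> groupAndSort(List<String> lines, int keyField, int sortField) {
        Map<String, List<String[]>> groups = new TreeMap<>();
        for (String line : lines) {
            // Limit -1 keeps trailing empty columns so every row has the same width.
            String[] fields = line.split(",", -1);
            groups.computeIfAbsent(fields[keyField], k -> new ArrayList<>()).add(fields);
        }
        for (List<String[]> group : groups.values()) {
            // Lexicographic order on the chosen column, since all columns are strings.
            group.sort(Comparator.comparing((String[] f) -> f[sortField]));
        }
        return groups;
    }

    /** Counts the rows in each group. */
    static Map<String, Integer> countPerGroup(Map<String, List<String[]>> groups) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, List<String[]>> e : groups.entrySet()) {
            counts.put(e.getKey(), e.getValue().size());
        }
        return counts;
    }
}
```

For the 54-column file from the original question, the call would be `CsvGroupSort.groupAndSort(lines, 15, 20)`, matching the `groupBy(15)` and sort on field 20 in Martin's sketch.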
