From: Martin Neumann <mneumann@spotify.com>
To: dev@flink.incubator.apache.org
Date: Tue, 21 Oct 2014 19:04:05 +0200
Subject: Re: how load/group with large csv files

There was not enough time to clean it up and gold-plate it. He now has
semi-horrible Java code with some explanation of how it would look in
Scala. My colleague was asking for a quick (and dirty) job, so taking more
time on it would have defeated the purpose of the whole thing a bit.

In any case, thanks for the advice; hopefully I have found us another Flink
supporter.

On Tue, Oct 21, 2014 at 3:52 PM, Stephan Ewen wrote:

> Hej,
>
> Do you want to use Scala? You can use simple case classes there and use
> fields directly as keys; it will look very elegant...
> If you want to stick with Java, you can actually use POJOs (Robert just
> corrected me: expression keys should be available there).
>
> Can you define a class
>
> public class MyClass {
>     public String id;
>     public int someValue;
>     public double anotherValue;
>     ...
> }
>
> and then run a program like this:
>
> DataSet<MyClass> data = env.readAsText(...).map(new Parser());
>
> data.groupBy("id").sort("someValue").reduceGroup(new
>     GroupReduceFunction(...));
>
> Feel free to post your program here so we can give you comments!
>
> Greetings,
> Stephan
>
> On Tue, Oct 21, 2014 at 3:32 PM, Martin Neumann wrote:
>
> > Nope,
> >
> > but I can't filter out the useless data, since the program I'm comparing
> > to does not either. The point is to prove to one of my colleagues that
> > Flink > Spark. The Spark program runs out of memory and crashes when
> > just doing a simple group and counting the number of items.
> >
> > This is also one of the reasons I'm asking for the best style of doing
> > this, so I can get it as clean as possible to compare it to Spark.
> >
> > cheers Martin
> >
> > On Tue, Oct 21, 2014 at 3:07 PM, Aljoscha Krettek wrote:
> >
> > > By the way, do you actually need all those 54 columns in your job?
> > >
> > > On Tue, Oct 21, 2014 at 3:02 PM, Martin Neumann wrote:
> > >
> > > > I will go with that workaround; however, I would have preferred to
> > > > do that directly with the API instead of doing Map/Reduce-like
> > > > Key/Value tuples again :-)
> > > >
> > > > By the way, is there a simple function to count the number of items
> > > > in a reduce group? It feels stupid to write a GroupReduce that just
> > > > iterates and increments a counter.
> > > >
> > > > cheers Martin
> > > >
> > > > On Tue, Oct 21, 2014 at 2:54 PM, Robert Metzger wrote:
> > > >
> > > > > Yes, for sorted groups, you need to use POJOs or Tuples.
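[The counting question above (a GroupReduce that just iterates and
increments) really does boil down to a bare counter. Below is a plain-JDK
sketch of that logic, assuming the CSV rows are already split into
String[]; all class and method names are illustrative, not Flink API — in
Flink this loop would be the body of a GroupReduceFunction emitting
(key, count).]

```java
import java.util.HashMap;
import java.util.Map;

// Plain-JDK sketch of "count the items per reduce group": group
// already-split CSV rows by one column and count each group's size.
// Illustrative names only; not Flink API.
public class GroupCountSketch {

    // rows: already-split CSV rows; keyField: index of the grouping column.
    public static Map<String, Integer> countPerGroup(String[][] rows, int keyField) {
        Map<String, Integer> counts = new HashMap<>();
        for (String[] row : rows) {
            // merge() starts a group at 1 and increments on each later row
            counts.merge(row[keyField], 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String[][] rows = { {"1", "a"}, {"2", "a"}, {"3", "b"} };
        System.out.println(countPerGroup(rows, 1)); // {a=2, b=1}
    }
}
```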
> > > > > I think you have to split the input lines manually, with a mapper.
> > > > > How about using a TupleN<...> with only the fields you need
> > > > > (returned by the mapper)?
> > > > >
> > > > > If you need all fields, you could also use a
> > > > > Tuple2<String, String[]> where the first position is the sort key?
> > > > >
> > > > > On Tue, Oct 21, 2014 at 2:20 PM, Gyula Fora wrote:
> > > > >
> > > > > > I am not sure how you should go about that; let's wait for some
> > > > > > feedback from the others.
> > > > > >
> > > > > > Until then you can always map the array to (array, keyfield)
> > > > > > and use groupBy(1).
> > > > > >
> > > > > > On 21 Oct 2014, at 14:17, Martin Neumann wrote:
> > > > > >
> > > > > > > Hej,
> > > > > > >
> > > > > > > Unfortunately .sort() cannot take a key extractor; would I
> > > > > > > have to do the sort myself then?
> > > > > > >
> > > > > > > cheers Martin
> > > > > > >
> > > > > > > On Tue, Oct 21, 2014 at 2:08 PM, Gyula Fora wrote:
> > > > > > >
> > > > > > > > Hey,
> > > > > > > >
> > > > > > > > Using arrays is probably a convenient way to do so.
> > > > > > > >
> > > > > > > > I think the way you described it, the groupBy only works
> > > > > > > > for tuples now. To do the grouping on the array field, you
> > > > > > > > would need to create a key extractor for this and pass that
> > > > > > > > to groupBy.
> > > > > > > >
> > > > > > > > Actually, we have some use cases like this for streaming,
> > > > > > > > so we are thinking of writing a wrapper for the array types
> > > > > > > > that would behave as you described.
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Gyula
> > > > > > > >
> > > > > > > > On 21 Oct 2014, at 14:03, Martin Neumann wrote:
> > > > > > > >
> > > > > > > > > Hej,
> > > > > > > > >
> > > > > > > > > I have a csv file with 54 columns; each of them is String
> > > > > > > > > (for now).
I > > > >> > >> need > > > >> > >>> to group and sort them on field 15. > > > >> > >>> > > > >> > >>> Whats the best way to load the data into Flink? > > > >> > >>> There is no Tuple54 (and the <> would look awful anyway with > 54 > > > times > > > >> > >>> String in it). > > > >> > >>> My current Idea is to write a Mapper and split the string to > > > Arrays > > > >> of > > > >> > >>> Strings would grouping and sorting work on this? > > > >> > >>> > > > >> > >>> So can I do something like this or does that only work on > > tuples: > > > >> > >>> Dataset ds; > > > >> > >>> ds.groupBy(15).sort(20. ANY) > > > >> > >>> > > > >> > >>> cheers Martin > > > >> > >> > > > >> > >> > > > >> > > > > >> > > > > >> > > > > > > --047d7bfd011ea02e6c0505f1d1e4--