Date: Sat, 4 Jan 2014 22:43:09 -0500
Subject: Re: crunch: correct way to think about tuple abstractions for aggregations?
From: Jay Vyas <jayunit100@gmail.com>
To: user@crunch.apache.org

BTW, thanks Josh! That worked!

Here is an example of how easy it is to do aggregations in Crunch :)

https://github.com/jayunit100/bigpetstore/commit/03a59fc88680d8926aba4c8d00760436c8cafb69

PS: Are you sure Pig/Hive is really better for this kind of stuff? I like the IDE-friendly, statically validated, strongly typed, functional API a lot more than the Russian roulette I always seem to play with my Pig/Hive code :)

On Sat, Jan 4, 2014 at 7:49 PM, Jay Vyas wrote:

> Thanks, Josh, that was very helpful! I like the Avro mapper
> intermediate solution; I'll try it out.
>
> Also: I'd be interested in contributing a new "section" of the
> bigpetstore workflow, a module that really shows where Crunch's
> differentiating factors are valuable.
>
> The idea is that bigpetstore should show the differences between
> ecosystem components so that people can pick for themselves which tool
> is best for which job. So I think it would be cool to have a phase in
> the bigpetstore workflow that uses some nested, strongly typed data
> and processes it with Crunch versus Pig, to demonstrate (in code) the
> comments you've made.
>
> Right now I only have Pig and Hive, but I want to add Cascading and
> (obviously) Crunch as well.
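[Editor's note: for readers without access to the linked commit, here is a plain-Java sketch of the group-and-count aggregation being discussed. It uses java.util.stream as a stand-in for the Crunch pipeline, and the CSV field layout is assumed from the sample records later in this thread; it is not the actual committed code.]

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class GroupCountSketch {

    // Count records per store code. Field index 1 of each CSV line is
    // assumed to hold the store code (e.g. "storeCode_AK").
    public static Map<String, Long> countByStoreCode(List<String> lines) {
        return lines.stream()
                .map(line -> line.split(",")[1])
                // TreeMap keeps the output deterministically sorted by key.
                .collect(Collectors.groupingBy(
                        code -> code, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "BigPetStore,storeCode_CA,1,brandon,ewing",
                "BigPetStore,storeCode_CA,2,angie,coleman",
                "BigPetStore,storeCode_AK,1,lindsay,franco");
        System.out.println(countByStoreCode(lines));
        // prints {storeCode_AK=1, storeCode_CA=2}
    }
}
```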
> On Jan 4, 2014, at 4:57 PM, Josh Wills wrote:
>
> Hey Jay,
>
> Crunch isn't big into tuples; it's mostly used to process some sort of
> structured, complex record data like Avro, protocol buffers, or Thrift. I
> certainly don't speak for everyone in the community, but I think that
> using one of these rich, evolvable formats is the best way to work with
> data on Hadoop. For the problem you gave, where the data is in CSV text,
> there are a couple of options.
>
> One option would be to use the TupleN type to represent a record and the
> Extractor API in crunch-contrib to parse the lines of strings into typed
> tokens, so you would do something like this to your PCollection<String>:
>
> PCollection<String> rawData = ...;
> TokenizerFactory tokenize = TokenizerFactory.builder().delim(",").build();
> PCollection<TupleN> tuples = Parse.parse(
>     "bigpetshop", // a name to use for the counters used in parsing
>     rawData,
>     xtupleN(tokenize,
>       xstring(),   // big pet store
>       xstring(),   // store code
>       xint(),      // line item
>       xstring(),   // first name
>       xstring(),   // last name
>       xstring(),   // timestamp
>       xdouble(),   // price
>       xstring())); // item description
>
> You could also create a POJO to represent a LineItem (which is what I
> assume this is) and then use Avro reflection-based serialization to
> serialize it with Crunch:
>
> public static class LineItem {
>   String appName;
>   String storeCode;
>   int lineId;
>   String firstName;
>   String lastName;
>   String timestamp;
>   double price;
>   String description;
>
>   public LineItem() {
>     // Avro reflection needs a zero-arg constructor
>   }
>
>   // other constructors, parsers, etc.
> }
>
> and then you would have something like this:
>
> PCollection<LineItem> lineItems = rawData.parallelDo(
>     new MapFn<String, LineItem>() {
>       @Override
>       public LineItem map(String input) {
>         // parse line to LineItem object
>       }
>     }, Avros.reflects(LineItem.class));
>
> I'm not quite sure what you're doing in the grouping clause you have
> here:
>
> groupBy(0).count();
>
> ...I assume you want to count the distinct values of the first field in
> your tuple, which you would do like this for line items:
>
> PTable<String, Long> counts = lineItems.parallelDo(
>     new MapFn<LineItem, String>() {
>       public String map(LineItem lineItem) { return lineItem.appName; }
>     }, Avros.strings()).count();
>
> and similarly for TupleN, although you would call get(0) on TupleN and
> have to cast the returned Object to a String because TupleN methods
> don't have type information.
>
> I hope that helps. In general, I don't really recommend Crunch for this
> sort of data processing; Hive, Pig, and Cascading are fine alternatives.
> But I think Crunch is superior to any of them if you were trying to,
> say, create an Order record that aggregated the results of multiple
> LineItems:
>
> Order {
>   List<LineItem> lineItems;
>   // global order attributes
> }
>
> or a Customer type that aggregated multiple Orders for a single
> customer:
>
> Customer {
>   List<Order> orders;
>   // other customer fields
> }
>
> ...especially if this was the sort of processing task you had to do
> regularly, because lots of other downstream processing tasks required
> these standard aggregations to exist so that they could do their own
> calculations. I would also recommend Crunch if you were building
> BigPetStore on top of HBase using custom schemas that you needed to
> periodically MapReduce over in order to calculate statistics, clean up
> stale data, or fix any consistency issues.
>
> Best,
> Josh
>
>
> On Sat, Jan 4, 2014 at 12:34 PM, Jay Vyas <jayunit100@gmail.com> wrote:
>
>> Hi Crunch!
>>
>> I want to process a list in Crunch.
>>
>> Something like this:
>>
>> PCollection<String> lines = MemPipeline.collectionOf(
>>     "BigPetStore,storeCode_AK,1  lindsay,franco,Sat Jan 10 00:11:10 EST 1970,10.5,dog-food",
>>     "BigPetStore,storeCode_AZ,1  tom,giles,Sun Dec 28 23:08:45 EST 1969,10.5,dog-food",
>>     "BigPetStore,storeCode_CA,1  brandon,ewing,Mon Dec 08 20:23:57 EST 1969,16.5,organic-dog-food",
>>     "BigPetStore,storeCode_CA,2  angie,coleman,Thu Dec 11 07:00:31 EST 1969,10.5,dog-food",
>>     "BigPetStore,storeCode_CA,3  angie,coleman,Tue Jan 20 06:24:23 EST 1970,7.5,cat-food",
>>     "BigPetStore,storeCode_CO,1  sharon,trevino,Mon Jan 12 07:52:10 EST 1970,30.1,antelope snacks",
>>     "BigPetStore,storeCode_CT,1  kevin,fitzpatrick,Wed Dec 10 05:24:13 EST 1969,10.5,dog-food",
>>     "BigPetStore,storeCode_NY,1  dale,holden,Mon Jan 12 23:02:13 EST 1970,19.75,fish-food",
>>     "BigPetStore,storeCode_NY,2  dale,holden,Tue Dec 30 12:29:52 EST 1969,10.5,dog-food",
>>     "BigPetStore,storeCode_OK,1  donnie,tucker,Sun Jan 18 04:50:26 EST 1970,7.5,cat-food");
>>
>> PCollection coll = lines.parallelDo(
>>     "split lines into words",
>>     new DoFn<String, String>() {
>>       @Override
>>       public void process(String line, Emitter emitter) {
>>         // not sure this regex will work but you get the idea...
>>         // split by tabs and commas
>>         emitter.emit(Arrays.asList(line.split("\t,")));
>>       }
>>     },
>>     Writables.lists()
>> ).groupBy(0).count();
>>
>> What is the correct abstraction in Crunch to convert raw text into
>> tuples and access them by an index, which you then use to group and
>> count on?
>>
>> Thanks!
>>
>> ** FYI ** This is for the bigpetstore project; I'd like to show Crunch
>> examples in it if I can get them working, as the API is a nice example
>> of a lower-level MapReduce paradigm that is more Java-friendly.
>>
>> See https://issues.apache.org/jira/browse/BIGTOP-1089 and
>> https://github.com/jayunit100/bigpetstore for details.
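[Editor's note: as the quoted comment suspects, `line.split("\t,")` does not split "by tabs and commas"; it splits only where a tab is immediately followed by a comma, so most records come back as a single token. A plain-Java sketch of a corrected tokenizer follows; the sample record layout is assumed from the thread, and this stands outside any Crunch pipeline.]

```java
import java.util.Arrays;
import java.util.List;

public class TokenizeSketch {

    // Split on a comma OR a run of tabs, so "storeCode_AK,1\tlindsay"
    // yields three tokens. By contrast, split("\t,") only splits on the
    // literal two-character sequence tab-then-comma.
    public static List<String> tokens(String line) {
        return Arrays.asList(line.split(",|\t+"));
    }

    public static void main(String[] args) {
        String line = "BigPetStore,storeCode_AK,1\tlindsay,franco,10.5,dog-food";
        System.out.println(tokens(line));
        // prints [BigPetStore, storeCode_AK, 1, lindsay, franco, 10.5, dog-food]
    }
}
```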
>
> --
> Director of Data Science
> Cloudera
> Twitter: @josh_wills

--
Jay Vyas
http://jayunit100.blogspot.com