crunch-user mailing list archives

From Jay Vyas <jayunit...@gmail.com>
Subject Re: crunch : correct way to think about tuple abstractions for aggregations?
Date Sun, 05 Jan 2014 00:49:55 GMT
Thanks Josh, that was very helpful! I like the Avro mapper intermediate solution; I'll try
it out.

Also: would you be interested in contributing a new "section" of the bigpetstore workflow,
i.e. a module which really shows where Crunch's differentiating factors are valuable?

The idea is that bigpetstore should show the differences between different ecosystem components
so that people can pick for themselves which tool is best for which job. So I think it
would be cool to have a phase in the bigpetstore workflow which used some nested, strongly
typed data and processed it with Crunch versus Pig, to demonstrate (in code) the comments
you've made.

Right now I only have Pig and Hive, but I want to add in Cascading and (obviously) Crunch as
well.

> On Jan 4, 2014, at 4:57 PM, Josh Wills <jwills@cloudera.com> wrote:
> 
> Hey Jay,
> 
> Crunch isn't big into tuples; it's mostly used to process some sort of structured, complex
> record data like Avro, protocol buffers, or Thrift. I certainly don't speak for everyone in
> the community, but I think that using one of these rich, evolvable formats is the best way
> to work with data on Hadoop. For the problem you gave, where the data is in CSV text, there
> are a couple of options.
> 
> One option would be to use the TupleN type to represent a record and the Extractor API
> in crunch-contrib to parse the lines of strings into typed tokens, so you would do something
> like this to your PCollection<String>:
> 
> PCollection<String> rawData = ...;
> TokenizerFactory tokenize = TokenizerFactory.builder().delim(",").build();
> PCollection<TupleN> tuples = Parse.parse("bigpetshop", // a name to use for the counters used in parsing
>     rawData,
>     xtupleN(tokenize,
>       xstring(),  // big pet store
>       xstring(),  // store code
>       xint(),     // line item
>       xstring(),  // first name
>       xstring(),  // last name
>       xstring(),  // timestamp
>       xdouble(),  // price
>       xstring()));  // item description
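> 
> (For reference, the imports that snippet assumes: Parse and TokenizerFactory live in the
> crunch-contrib text package, and the x* extractor factories are static methods on its
> Extractors class.)
> 
> import org.apache.crunch.PCollection;
> import org.apache.crunch.TupleN;
> import org.apache.crunch.contrib.text.Parse;
> import org.apache.crunch.contrib.text.TokenizerFactory;
> import static org.apache.crunch.contrib.text.Extractors.*;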
> 
> You could also create a POJO to represent a LineItem (which is what I assume this is)
> and then use Avro reflection-based serialization to serialize it with Crunch:
> 
> public static class LineItem {
>   String appName;
>   String storeCode;
>   int lineId;
>   String firstName;
>   String lastName;
>   String timestamp;
>   double price;
>   String description;
> 
>   public LineItem() {
>      // Avro reflection needs a zero-arg constructor
>   }
> 
>   // other constructors, parsers, etc.
> }
> 
> and then you would have something like this:
> 
> PCollection<LineItem> lineItems = rawData.parallelDo(new MapFn<String, LineItem>() {
>   @Override
>   public LineItem map(String input) {
>     // parse line to LineItem object
>   }
> }, Avros.reflects(LineItem.class));
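> 
> Purely as a sketch, that map body could look like this for your sample lines (assuming the
> fields are delimited by tabs and commas, in the same order as the LineItem fields above;
> the split regex is a guess, so adjust it to the real format):
> 
> @Override
> public LineItem map(String input) {
>   // split on tabs and commas; the timestamp contains spaces, so it survives intact
>   String[] fields = input.split("[,\t]+");
>   LineItem item = new LineItem();
>   item.appName = fields[0];
>   item.storeCode = fields[1];
>   item.lineId = Integer.parseInt(fields[2]);
>   item.firstName = fields[3];
>   item.lastName = fields[4];
>   item.timestamp = fields[5];
>   item.price = Double.parseDouble(fields[6]);
>   item.description = fields[7];
>   return item;
> }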
> 
> I'm not quite sure what you're doing in the grouping clause you have here:
> 
> groupBy(0).count();
> 
> ...I assume you want to count the distinct values of the first field in your tuple, which
> you would do like this for line items:
> 
> PTable<String, Long> counts = lineItems.parallelDo(new MapFn<LineItem, String>() {
>   public String map(LineItem lineItem) { return lineItem.appName; }
> }, Avros.strings()).count();
> 
> and similarly for TupleN, although you would call get(0) on TupleN and have to cast the
> returned Object to a String because TupleN methods don't have type information.
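> 
> Spelled out, the TupleN version of that count would look something like:
> 
> PTable<String, Long> tupleCounts = tuples.parallelDo(new MapFn<TupleN, String>() {
>   public String map(TupleN tuple) {
>     return (String) tuple.get(0); // get(0) returns Object, hence the cast
>   }
> }, Avros.strings()).count();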
> 
> I hope that helps. In general, I don't really recommend Crunch for this sort of data
> processing; Hive, Pig, and Cascading are fine alternatives. But I think Crunch is superior
> to any of them if you were trying to, say, create an Order record that aggregated the result
> of multiple LineItems:
> 
> Order {
>   List<LineItem> lineItems;
>   // global order attributes
> }
> 
> or a customer type that aggregated multiple Orders for a single customer:
> 
> Customer {
>   List<Order> orders;
>   // other customer fields
> }
> 
> ...especially if this was the sort of processing task you had to do regularly, because
> lots of other downstream processing tasks required these standard aggregations to exist so
> that they could do their own calculations. I would also recommend Crunch if you were building
> BigPetStore on top of HBase using custom schemas that you needed to periodically MapReduce
> over in order to calculate statistics, clean up stale data, or fix any consistency issues.
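> 
> For concreteness, here is a rough sketch of that Order aggregation (assuming a hypothetical
> orderId field on LineItem to key on, and that Order has a zero-arg constructor so
> Avros.reflects works for it too):
> 
> PTable<String, LineItem> byOrder = lineItems.by(new MapFn<LineItem, String>() {
>   public String map(LineItem item) { return item.orderId; } // hypothetical orderId field
> }, Avros.strings());
> 
> PCollection<Order> orders = byOrder.groupByKey().parallelDo(
>     new MapFn<Pair<String, Iterable<LineItem>>, Order>() {
>       public Order map(Pair<String, Iterable<LineItem>> group) {
>         Order order = new Order();
>         order.lineItems = new ArrayList<LineItem>(); // java.util.ArrayList
>         for (LineItem item : group.second()) {
>           order.lineItems.add(item);
>         }
>         return order;
>       }
>     }, Avros.reflects(Order.class));
> 
> (Depending on the serialization, you may need to copy values out of the grouped Iterable
> before collecting them into the list, since the runtime can reuse objects during iteration.)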
> 
> Best,
> Josh
> 
> 
> 
>> On Sat, Jan 4, 2014 at 12:34 PM, Jay Vyas <jayunit100@gmail.com> wrote:
>> Hi crunch ! 
>> 
>> I want to process a list in crunch:
>> 
>> Something like this: 
>> 
>>         PCollection<String> lines = MemPipeline.collectionOf(
>>                 "BigPetStore,storeCode_AK,1  lindsay,franco,Sat Jan 10 00:11:10 EST 1970,10.5,dog-food",
>>                 "BigPetStore,storeCode_AZ,1  tom,giles,Sun Dec 28 23:08:45 EST 1969,10.5,dog-food",
>>                 "BigPetStore,storeCode_CA,1  brandon,ewing,Mon Dec 08 20:23:57 EST 1969,16.5,organic-dog-food",
>>                 "BigPetStore,storeCode_CA,2  angie,coleman,Thu Dec 11 07:00:31 EST 1969,10.5,dog-food",
>>                 "BigPetStore,storeCode_CA,3  angie,coleman,Tue Jan 20 06:24:23 EST 1970,7.5,cat-food",
>>                 "BigPetStore,storeCode_CO,1  sharon,trevino,Mon Jan 12 07:52:10 EST 1970,30.1,antelope snacks",
>>                 "BigPetStore,storeCode_CT,1  kevin,fitzpatrick,Wed Dec 10 05:24:13 EST 1969,10.5,dog-food",
>>                 "BigPetStore,storeCode_NY,1  dale,holden,Mon Jan 12 23:02:13 EST 1970,19.75,fish-food",
>>                 "BigPetStore,storeCode_NY,2  dale,holden,Tue Dec 30 12:29:52 EST 1969,10.5,dog-food",
>>                 "BigPetStore,storeCode_OK,1  donnie,tucker,Sun Jan 18 04:50:26 EST 1970,7.5,cat-food");
>>         
>>         PCollection coll = lines.parallelDo(
>>               "split lines into words",
>>               new DoFn<String, String>() {
>>                   @Override
>>                   public void process(String line, Emitter emitter) {
>>                     // not sure this regex will work but you get the idea.. split by tabs and commas
>>                     emitter.emit(Arrays.asList(line.split("\t,")));
>>                   }
>>               },
>>               Writables.lists()
>>         ).groupBy(0).count();
>> 
>>         }
>> 
>> What is the correct abstraction in crunch to convert raw text into tuples, 
>> and access them by an index - which you then use to group and count on? 
>> 
>> thanks !
>> 
>> ** FYI ** this is for the bigpetstore project; I'd like to show Crunch examples in
>> it if I can get them working, as the API is a nice example of a lower-level MapReduce
>> paradigm which is more Java-friendly.
>> 
>> See https://issues.apache.org/jira/browse/BIGTOP-1089 and https://github.com/jayunit100/bigpetstore
>> for details..
> 
> 
> 
> -- 
> Director of Data Science
> Cloudera
> Twitter: @josh_wills
