crunch-user mailing list archives

From Josh Wills <>
Subject Re: crunch : correct way to think about tuple abstractions for aggregations?
Date Sat, 04 Jan 2014 21:57:41 GMT
Hey Jay,

Crunch isn't big into tuples; it's mostly used to process some sort of
structured, complex record data like Avro, protocol buffers, or Thrift. I
certainly don't speak for everyone in the community, but I think that using
one of these rich, evolvable formats is the best way to work with data on
Hadoop. For the problem you gave, where the data is in CSV text, there are
a couple of options.

One option would be to use the TupleN type to represent a record and the
Extractor API in crunch-contrib to parse the lines of strings into typed
tokens, so you would do something like this to your PCollection<String>:

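import static org.apache.crunch.contrib.text.Extractors.*;

import org.apache.crunch.PCollection;
import org.apache.crunch.TupleN;
import org.apache.crunch.contrib.text.Parse;
import org.apache.crunch.contrib.text.TokenizerFactory;

PCollection<String> rawData = ...;
TokenizerFactory tokenize = TokenizerFactory.builder().delimiter(",").build();
PCollection<TupleN> tuples = Parse.parse(
    "bigpetshop",   // a name to use for the counters used in parsing
    rawData,
    xtupleN(tokenize,
      xstring(),    // big pet store
      xstring(),    // store code
      xint(),       // line item
      xstring(),    // first name
      xstring(),    // last name
      xstring(),    // timestamp
      xdouble(),    // price
      xstring()));  // item description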
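To make the tuple idea concrete without depending on crunch-contrib, here is a plain-Java sketch (not the Extractor API) of what the parse step has to do for each line: split it, then convert each position to the expected type, ending up with an untyped Object[] much like a TupleN. One thing worth checking first: in the sample lines quoted below, the line-item number and the first name look like they are separated by a tab rather than a comma, so a comma-only delimiter would leave them glued together in one token and the int conversion would fail. The sketch assumes that and splits on either character. The class name LineTokenizer is hypothetical, and it simply throws on a malformed line, whereas the Crunch version reports parse problems through the named counters.

```java
import java.util.Arrays;

/** Hypothetical helper: split one BigPetStore line into typed, positional fields. */
final class LineTokenizer {

  // Assumption: commas separate fields, except that a tab separates the
  // line-item number from the first name in the sample data.
  private static final String DELIMITERS = "[\t,]";

  /** Returns the eight fields with line item as Integer and price as Double. */
  static Object[] tokenize(String line) {
    String[] t = line.split(DELIMITERS);
    if (t.length != 8) {
      throw new IllegalArgumentException("expected 8 fields, got " + t.length + ": " + line);
    }
    return new Object[] {
        t[0],                          // big pet store
        t[1],                          // store code
        Integer.parseInt(t[2].trim()), // line item
        t[3],                          // first name
        t[4],                          // last name
        t[5],                          // timestamp (kept as a string)
        Double.parseDouble(t[6]),      // price
        t[7]                           // item description
    };
  }

  public static void main(String[] args) {
    String line = "BigPetStore,storeCode_AK,1\tlindsay,franco,Sat Jan 10 00:11:10 EST 1970,10.5,dog-food";
    System.out.println(Arrays.toString(tokenize(line)));
  }
}
```

Like TupleN, the result is positional and untyped, so callers still have to cast each field when they read it back.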

You could also create a POJO to represent a LineItem (which is what I
assume this is) and then use Avro reflection-based serialization to
serialize it with Crunch:

public static class LineItem {
  String appName;
  String storeCode;
  int lineId;
  String firstName;
  String lastName;
  String timestamp;
  double price;
  String description;

  public LineItem() {
    // Avro reflection needs a zero-arg constructor
  }

  // other constructors, parsers, etc.
}

and then you would have something like this:

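import org.apache.crunch.MapFn;
import org.apache.crunch.types.avro.Avros;

PCollection<LineItem> lineItems = rawData.parallelDo(new MapFn<String, LineItem>() {
  @Override
  public LineItem map(String input) {
    // parse line to LineItem object, e.g. via a parser defined on LineItem
    return LineItem.parse(input); // hypothetical parser method
  }
}, Avros.reflects(LineItem.class));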
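For reference, a self-contained plain-Java version of the LineItem POJO with one possible parser. The static parse method is a hypothetical helper (the kind of "parser" the class comment alludes to), it assumes the same tab-or-comma layout of the sample lines, and Avro serialization itself is not exercised here.

```java
/** Sketch of the LineItem POJO plus a hypothetical parse helper. */
final class LineItem {
  String appName;
  String storeCode;
  int lineId;
  String firstName;
  String lastName;
  String timestamp;
  double price;
  String description;

  // Avro reflection needs a zero-arg constructor.
  public LineItem() {}

  // Hypothetical parser; assumes comma-separated fields with a tab after the line item.
  static LineItem parse(String line) {
    String[] t = line.split("[\t,]");
    if (t.length != 8) {
      throw new IllegalArgumentException("expected 8 fields, got " + t.length + ": " + line);
    }
    LineItem item = new LineItem();
    item.appName = t[0];
    item.storeCode = t[1];
    item.lineId = Integer.parseInt(t[2].trim());
    item.firstName = t[3];
    item.lastName = t[4];
    item.timestamp = t[5];
    item.price = Double.parseDouble(t[6]);
    item.description = t[7];
    return item;
  }

  public static void main(String[] args) {
    LineItem item = parse(
        "BigPetStore,storeCode_CO,1\tsharon,trevino,Mon Jan 12 07:52:10 EST 1970,30.1,antelope snacks");
    System.out.println(item.storeCode + " " + item.description + " " + item.price);
  }
}
```

Compared with the TupleN route, the fields now have names and types, so downstream code reads lineItem.price instead of casting get(6).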

I'm not quite sure what you're doing in the grouping clause you have here:


...I assume you want to count the occurrences of each distinct value of the
first field in your tuple, which you would do like this for line items:

PTable<String, Long> counts = lineItems.parallelDo(new MapFn<LineItem,
String>() {
  public String map(LineItem lineItem) { return lineItem.appName; }
}, Avros.strings()).count();
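Semantically, count() on a PCollection gives one (value, number of occurrences) pair per distinct value. The in-memory analogue below is a sketch using java.util.stream, not Crunch: the key function plays the role of the MapFn, and pulling a field out of a split line by index is the "group and count by index" from the question. CountByKey and countBy are hypothetical names, and the tab-or-comma split is the same assumption about the sample data as above.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

/** In-memory analogue of parallelDo(keyFn).count(): occurrences per distinct key. */
final class CountByKey {

  static <T, K extends Comparable<K>> Map<K, Long> countBy(List<T> records, Function<T, K> keyFn) {
    // groupingBy + counting yields one (key, occurrences) entry per distinct key;
    // TreeMap just keeps the keys sorted for readable output.
    return records.stream()
        .collect(Collectors.groupingBy(keyFn, TreeMap::new, Collectors.counting()));
  }

  public static void main(String[] args) {
    List<String> lines = List.of(
        "BigPetStore,storeCode_CA,2\tangie,coleman,Thu Dec 11 07:00:31 EST 1969,10.5,dog-food",
        "BigPetStore,storeCode_CA,3\tangie,coleman,Tue Jan 20 06:24:23 EST 1970,7.5,cat-food",
        "BigPetStore,storeCode_NY,1\tdale,holden,Mon Jan 12 23:02:13 EST 1970,19.75,fish-food");
    // Key on field 1 (store code) of each split line.
    System.out.println(countBy(lines, line -> line.split("[\t,]")[1]));
  }
}
```

Run over all ten sample lines from the question, the same key function would give storeCode_CA=3 and storeCode_NY=2, with every other store at 1. Keying on field 0 (the app name) instead would give a single entry, BigPetStore=10.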

and similarly for TupleN, although you would call get(0) on TupleN and have
to cast the returned Object to a String, because TupleN.get returns an
untyped Object.

I hope that helps. In general, I don't really recommend Crunch for this
sort of data processing; Hive, Pig, and Cascading are fine alternatives.
But I think Crunch is superior to any of them if you were trying to, say,
create an Order record that aggregated the result of multiple LineItems:

Order {
  List<LineItem> lineItems;
  // global order attributes
}

or a customer type that aggregated multiple Orders for a single customer:

Customer {
  List<Order> orders;
  // other customer fields
}

...especially if this was the sort of processing task you had to do
regularly because lots of other downstream processing tasks required these
standard aggregations to exist so that they could do their own
calculations. I would also recommend Crunch if you were building
BigPetStore on top of HBase using custom schemas that you needed to
periodically MapReduce over in order to calculate statistics, clean up stale
data, or fix any consistency issues.
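A rough in-memory picture of that kind of aggregation, in plain Java rather than Crunch: line items are grouped by customer and each group becomes one aggregate record. In a Crunch pipeline the same shape would come from keying the items into a PTable, calling groupByKey(), and mapping each group to the aggregate type. The Item and Customer records and the choice of first + last name as the customer key are assumptions for illustration; the sample data carries no order id, so this rolls up to customers rather than orders.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory sketch: roll individual line items up into one record per customer. */
final class CustomerRollup {

  // Hypothetical minimal line item: only the fields this sketch needs.
  record Item(String firstName, String lastName, String description, double price) {}

  // Hypothetical aggregate, analogous to the Customer type above.
  record Customer(String name, List<Item> items) {
    double totalSpend() {
      return items.stream().mapToDouble(Item::price).sum();
    }
  }

  static List<Customer> rollUp(List<Item> items) {
    // Assumption: first + last name identifies a customer.
    // LinkedHashMap keeps customers in first-seen order.
    Map<String, List<Item>> byCustomer = new LinkedHashMap<>();
    for (Item item : items) {
      String key = item.firstName() + " " + item.lastName();
      byCustomer.computeIfAbsent(key, k -> new ArrayList<>()).add(item);
    }
    List<Customer> customers = new ArrayList<>();
    byCustomer.forEach((name, group) -> customers.add(new Customer(name, List.copyOf(group))));
    return customers;
  }

  public static void main(String[] args) {
    List<Item> items = List.of(
        new Item("angie", "coleman", "dog-food", 10.5),
        new Item("angie", "coleman", "cat-food", 7.5),
        new Item("dale", "holden", "fish-food", 19.75),
        new Item("dale", "holden", "dog-food", 10.5));
    for (Customer c : rollUp(items)) {
      System.out.println(c.name() + ": " + c.items().size() + " items, total " + c.totalSpend());
    }
  }
}
```

The point of materializing Customer (or Order) records once is that downstream jobs can read the nested structure directly instead of re-grouping flat rows every time.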


On Sat, Jan 4, 2014 at 12:34 PM, Jay Vyas <> wrote:

> Hi crunch !
> I want to process a list in crunch:
> Something like this:
>         PCollection<String> lines = MemPipeline.collectionOf(
>                 "BigPetStore,storeCode_AK,1  lindsay,franco,Sat Jan 10 00:11:10 EST 1970,10.5,dog-food",
>                 "BigPetStore,storeCode_AZ,1  tom,giles,Sun Dec 28 23:08:45 EST 1969,10.5,dog-food",
>                 "BigPetStore,storeCode_CA,1  brandon,ewing,Mon Dec 08 20:23:57 EST 1969,16.5,organic-dog-food",
>                 "BigPetStore,storeCode_CA,2  angie,coleman,Thu Dec 11 07:00:31 EST 1969,10.5,dog-food",
>                 "BigPetStore,storeCode_CA,3  angie,coleman,Tue Jan 20 06:24:23 EST 1970,7.5,cat-food",
>                 "BigPetStore,storeCode_CO,1  sharon,trevino,Mon Jan 12 07:52:10 EST 1970,30.1,antelope snacks",
>                 "BigPetStore,storeCode_CT,1  kevin,fitzpatrick,Wed Dec 10 05:24:13 EST 1969,10.5,dog-food",
>                 "BigPetStore,storeCode_NY,1  dale,holden,Mon Jan 12 23:02:13 EST 1970,19.75,fish-food",
>                 "BigPetStore,storeCode_NY,2  dale,holden,Tue Dec 30 12:29:52 EST 1969,10.5,dog-food",
>                 "BigPetStore,storeCode_OK,1  donnie,tucker,Sun Jan 18 04:50:26 EST 1970,7.5,cat-food");
>         PCollection coll = lines.parallelDo(
>               "split lines into words",
>               new DoFn<String, String>() {
>                   @Override
>                   public void process(String line, Emitter emitter) {
>                     // not sure this regex will work but you get the idea: split by tabs and commas
>                     emitter.emit(Arrays.asList(line.split("\t,")));
>                   }
>               },
>               Writables.lists()
>         ).groupBy(0).count();
>         }
> What is the correct abstraction in crunch to convert raw text into tuples,
> and access them by an index - which you then use to group and count on?
> thanks !
> ** FYI ** this is for the bigpetstore project; I'd like to show crunch
> examples in it if I can get them working, as the API is a nice example of
> a lower-level mapreduce paradigm which is more Java friendly.
> See and
> for details..

Director of Data Science
Cloudera <>
Twitter: @josh_wills <>
