incubator-crunch-user mailing list archives

From Josh Wills <jwi...@cloudera.com>
Subject Re: Looking for some guidance in building a basic Avro pipeline
Date Tue, 11 Dec 2012 21:39:08 GMT
Thanks. I think the core issue here is that MapReduce (and Crunch) assumes
that once the Iterable<V> has been processed in a reduce step, it is gone and
cannot be processed again. That is why passing along the PType for the grouped
table type didn't make sense as part of the processing pipeline: you couldn't
serialize the Iterable<V> to disk, so the output of a groupByKey operation
could only be processed once. Changing that so the Iterable<V> can be
processed multiple times in a single job would require adding spillable
collections.
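
As a rough illustration of that single-pass constraint, here is a plain-Java
sketch (not Crunch code). OneShotIterable and materialize are hypothetical
names made up for this example, and the in-memory list only stands in for a
spillable collection, which would presumably overflow to disk once the values
no longer fit in memory:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class SinglePassDemo {

  /** Hypothetical stand-in for reducer values: may be iterated only once. */
  static final class OneShotIterable<T> implements Iterable<T> {
    private final Iterator<T> source;
    private boolean consumed = false;

    OneShotIterable(Iterator<T> source) {
      this.source = source;
    }

    @Override
    public Iterator<T> iterator() {
      if (consumed) {
        throw new IllegalStateException("values were already consumed");
      }
      consumed = true;
      return source;
    }
  }

  /** Copies the values into memory during the single permitted pass. */
  static <T> List<T> materialize(Iterable<T> values) {
    List<T> copy = new ArrayList<>();
    for (T value : values) {
      copy.add(value);
    }
    return copy;
  }

  public static void main(String[] args) {
    Iterable<Integer> values =
        new OneShotIterable<>(Arrays.asList(1, 2, 3).iterator());

    // Buffer once; the buffered copy can then be traversed repeatedly.
    List<Integer> buffered = materialize(values);

    int sum = 0;
    for (int v : buffered) {
      sum += v; // first pass over the buffered copy
    }
    int max = Integer.MIN_VALUE;
    for (int v : buffered) {
      max = Math.max(max, v); // second pass over the buffered copy
    }
    System.out.println("sum=" + sum + " max=" + max);

    // A second pass over the original values is rejected.
    try {
      values.iterator();
    } catch (IllegalStateException e) {
      System.out.println("second pass failed: " + e.getMessage());
    }
  }
}
```

Buffering in memory like this only works while a group's values fit in the
heap, which is the gap spillable collections are meant to close.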

J


On Tue, Dec 11, 2012 at 1:26 PM, Jonathan Natkins <natty@cloudera.com> wrote:

> Cool, JIRA filed: https://issues.apache.org/jira/browse/CRUNCH-129
>
>
>
> On Tue, Dec 11, 2012 at 1:21 PM, Josh Wills <jwills@cloudera.com> wrote:
>
>> No, you're not-- I think that's a bug. Switching to "parallelDo" instead
>> of "by" on the groupedData object will work fine, but we should make sure
>> the by() operation works on grouped tables.
>>
>>
>> On Tue, Dec 11, 2012 at 1:18 PM, Jonathan Natkins <natty@cloudera.com> wrote:
>>
>>> Alright, I'm back for more. This time, I'm trying to perform a group by
>>> with Avro data. What I've currently got is this:
>>>
>>>     PGroupedTable<String, MyAvroObject> processedData =
>>>         data.parallelDo(new DoFn<String, Pair<String, MyAvroObject>>() {
>>>           public void process(String line,
>>>               Emitter<Pair<String, MyAvroObject>> emitter) {
>>>             String key = getKey(line);
>>>             MyAvroObject value = convertToAvroObject(line);
>>>             emitter.emit(Pair.of(key, value));
>>>           }
>>>         }, Avros.tableOf(Avros.strings(), Avros.specifics(MyAvroObject.class)))
>>>         .groupByKey(3);
>>>
>>>     PTable<MyAvroGroup, Pair<String, Iterable<MyAvroObject>>> groupedData =
>>>         processedData.by(
>>>             new MapFn<Pair<String, Iterable<MyAvroObject>>, MyAvroGroup>() {
>>>               @Override
>>>               public MyAvroGroup map(Pair<String, Iterable<MyAvroObject>> input) {
>>>                 MyAvroGroup group = new MyAvroGroup();
>>>                 group.objects = Lists.<MyAvroObject>newArrayList();
>>>
>>>                 for (MyAvroObject obj : input.second()) {
>>>                   group.objects.add(obj);
>>>                 }
>>>
>>>                 return group;
>>>               }
>>>             },
>>>             Avros.specifics(MyAvroGroup.class));
>>>
>>> I think this is all pretty sane, but I'm getting an exception when the
>>> pipeline attempts to run the by():
>>>
>>> 12/12/10 14:11:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
>>> Exception in thread "main" java.lang.ClassCastException: org.apache.crunch.types.avro.AvroGroupedTableType cannot be cast to org.apache.crunch.types.avro.AvroType
>>>     at org.apache.crunch.types.avro.Avros.tableOf(Avros.java:608)
>>>     at org.apache.crunch.types.avro.AvroTypeFamily.tableOf(AvroTypeFamily.java:135)
>>>     at org.apache.crunch.impl.mem.collect.MemCollection.by(MemCollection.java:222)
>>>
>>> Am I doing something obviously wrong?
>>>
>>> Thanks,
>>> Natty
>>>
>>>
>>>
>>>
>>> On Fri, Dec 7, 2012 at 10:58 AM, Jonathan Natkins <natty@cloudera.com> wrote:
>>>
>>>> To bring things full circle, the core issue I was having was caused by
>>>> the fact that I was writing the data in the wrong way. Instead of
>>>>
>>>> pipeline.writeTextFile(words, args[1]);
>>>>
>>>> I should have been using
>>>>
>>>> pipeline.write(words, To.avroFile(args[1]));
>>>>
>>>> As Josh noted, writeTextFile was attempting to write my data out as a
>>>> String, but I wasn't giving it an object that was easy to turn into a
>>>> String, which resulted in an exception. Changing it to write to an avro
>>>> file solved those issues.
>>>>
>>>> Thanks, Josh!
>>>>
>>>>
>>>>
>>>> On Fri, Dec 7, 2012 at 10:26 AM, Josh Wills <jwills@cloudera.com> wrote:
>>>>
>>>>> Hey Natty,
>>>>>
>>>>> Reply inlined.
>>>>>
>>>>>
>>>>> On Fri, Dec 7, 2012 at 10:06 AM, Jonathan Natkins <natty@cloudera.com> wrote:
>>>>>
>>>>>> Hey Josh,
>>>>>>
>>>>>> That really doesn't solve the problem I'm facing. Avros.specifics
>>>>>> assumes that I've got a Java file that Avro generated for me, which I
>>>>>> don't have. I can certainly go through the trouble of getting that file,
>>>>>> but what I've got currently is a POJO that I'm associating with a JSON
>>>>>> Avro schema. It's a perfectly valid use case, and as far as I can tell,
>>>>>> from what's provided by the Avros utility class, it should be supported.
>>>>>> So here's my question:
>>>>>>
>>>>>
>>>>> Interesting-- I had not hit that use case for Avro before. For a POJO,
>>>>> I would just use the reflection APIs, which are available via
>>>>> Avros.reflects.
>>>>>
>>>>>
>>>>>>
>>>>>> Is the Avros.generics issue a bug? It seems to me that the T of
>>>>>> PType<T> has to implement Writable, and in the case of the return type
>>>>>> of Avros.generics, this is not the case.
>>>>>>
>>>>>
>>>>> There's no requirement for the PType<T> to be a Writable, or even an
>>>>> Avro instance. There's stuff like o.a.c.types.PTypes.derived that lets
>>>>> you create PType<T> that depend on other PTypes, which is how Crunch
>>>>> handles things like protocol buffers/thrift/jackson-style object
>>>>> serializations.
>>>>>
>>>>> I'm just taking a closer look at the Exception that was thrown, and it
>>>>> looks to me like the problem is occurring at the end of the pipeline,
>>>>> where you're calling pipeline.writeTextFile (not included in the code
>>>>> snippet posted). Crunch has to convert the PType to something that can
>>>>> be converted to a Writable impl-- if you try to write an Avro object to
>>>>> the TextOutputFormat, it gets written as AvroWrapper@feedbeef. It looks
>>>>> to me that in this case, Crunch can't figure out how to turn MyAvroObject
>>>>> into a Writable instance for writing to the TextOutputFormat.
>>>>>
>>>>>
>>>>>> If it's a bug, then fine, I'll file a JIRA and jump through whatever
>>>>>> necessary hoops exist.
>>>>>>
>>>>>
>>>>> One way to fix this would be to update writeTextFile to force
>>>>> conversion of any non-string that was passed into it into a String via
>>>>> an auxiliary MapFn-- I'm not sure why I didn't do that in the first
>>>>> place. What do you think?
>>>>>
>>>>>
>>>>>> Thanks,
>>>>>> Natty
>>>>>>
>>>>>>
>>>>>> On Thu, Dec 6, 2012 at 6:08 PM, Josh Wills <josh.wills@gmail.com> wrote:
>>>>>>
>>>>>>> Did you look at Avros.specifics?
>>>>>>> On Dec 6, 2012 5:57 PM, "Jonathan Natkins" <natty@cloudera.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Ok, I'm still a little confused. Let's say I use Avros.generics(),
>>>>>>>> and then I modify my code to use GenericData.Records. Those Records
>>>>>>>> still don't implement the Writable interface, so I'm still getting a
>>>>>>>> class cast exception. Did I do something totally wrong?
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Dec 6, 2012 at 5:19 PM, Jonathan Natkins <
>>>>>>>> natty@cloudera.com> wrote:
>>>>>>>>
>>>>>>>>> Well, the problem with that is that I really want to work with my
>>>>>>>>> objects, rather than use Avros.generics, because then I'm forced to
>>>>>>>>> treat everything as a GenericData.Record. It's just a pain in the
>>>>>>>>> butt.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Dec 6, 2012 at 5:17 PM, Josh Wills <josh.wills@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> You don't want to create an AvroType yourself, you want to call
>>>>>>>>>> o.a.c.types.avro.Avros.records or one of its friends and pass it a
>>>>>>>>>> Class object.
>>>>>>>>>>
>>>>>>>>>> Interesting though, I would still want that case to work
>>>>>>>>>> correctly.
>>>>>>>>>>
>>>>>>>>>> Josh
>>>>>>>>>> On Dec 6, 2012 5:14 PM, "Jonathan Natkins" <natty@cloudera.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> So I've been futzing with Crunch a bit, and trying to understand
>>>>>>>>>>> how to build a pipeline that outputs Avro data files. Roughly, I'm
>>>>>>>>>>> doing something along these lines:
>>>>>>>>>>>
>>>>>>>>>>>     Schema.Parser schemaParser = new Schema.Parser();
>>>>>>>>>>>     final Schema avroObjSchema = schemaParser.parse(schemaJsonString);
>>>>>>>>>>>
>>>>>>>>>>>     AvroType avroType = new AvroType<MyAvroObject>(MyAvroObject.class,
>>>>>>>>>>>         avroObjSchema,
>>>>>>>>>>>         new AvroDeepCopier.AvroReflectDeepCopier<MyAvroObject>(
>>>>>>>>>>>             MyAvroObject.class, avroObjSchema));
>>>>>>>>>>>
>>>>>>>>>>>     PCollection<MyAvroObject> words = logs.parallelDo(
>>>>>>>>>>>         new DoFn<String, MyAvroObject>() {
>>>>>>>>>>>           public void process(String line, Emitter<MyAvroObject> emitter) {
>>>>>>>>>>>             emitter.emit(convertStringToAvroObj(line));
>>>>>>>>>>>           }
>>>>>>>>>>>         }, avroType);
>>>>>>>>>>>
>>>>>>>>>>> However, this results in a class cast exception:
>>>>>>>>>>>
>>>>>>>>>>> Exception in thread "main" java.lang.ClassCastException: class com.company.MyAvroObject
>>>>>>>>>>>     at java.lang.Class.asSubclass(Class.java:3039)
>>>>>>>>>>>     at org.apache.crunch.types.writable.Writables.records(Writables.java:250)
>>>>>>>>>>>     at org.apache.crunch.types.writable.WritableTypeFamily.records(WritableTypeFamily.java:86)
>>>>>>>>>>>     at org.apache.crunch.types.PTypeUtils.convert(PTypeUtils.java:61)
>>>>>>>>>>>     at org.apache.crunch.types.writable.WritableTypeFamily.as(WritableTypeFamily.java:135)
>>>>>>>>>>>     at org.apache.crunch.impl.mr.MRPipeline.writeTextFile(MRPipeline.java:319)
>>>>>>>>>>>
>>>>>>>>>>> Anybody have any thoughts? There's got to be a magical
>>>>>>>>>>> incantation that I have slightly off.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Director of Data Science
>>>>> Cloudera <http://www.cloudera.com>
>>>>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>>>>
>>>>>
>>>>
>>>
>>
>>
>>
>>
>


-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
