incubator-crunch-user mailing list archives

From Jonathan Natkins <na...@cloudera.com>
Subject Re: Looking for some guidance in building a basic Avro pipeline
Date Tue, 11 Dec 2012 21:26:24 GMT
Cool, JIRA filed: https://issues.apache.org/jira/browse/CRUNCH-129


On Tue, Dec 11, 2012 at 1:21 PM, Josh Wills <jwills@cloudera.com> wrote:

> No, you're not-- I think that's a bug. Switching to "parallelDo" instead
> of "by" on the groupedData object will work fine, but we should make sure
> the by() operation works on grouped tables.
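A stdlib-only Java model of the workaround Josh describes: each (key, values) group is turned into exactly one output record by a map function, which is what a MapFn passed to parallelDo (instead of by()) would do. No Crunch or Avro classes are used. `GroupSketch` is a hypothetical stand-in for `MyAvroGroup`, and a `Map<String, List<String>>` stands in for the grouped table.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the Avro class MyAvroGroup.
class GroupSketch {
    final String key;
    final List<String> objects = new ArrayList<>();

    GroupSketch(String key) {
        this.key = key;
    }
}

class GroupedMapSketch {
    // Models a MapFn applied with parallelDo over a grouped table:
    // each (key, values) group becomes exactly one GroupSketch whose
    // list holds a copy of that group's values.
    static List<GroupSketch> toGroups(Map<String, List<String>> grouped) {
        List<GroupSketch> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : grouped.entrySet()) {
            GroupSketch group = new GroupSketch(entry.getKey());
            for (String value : entry.getValue()) {
                group.objects.add(value);
            }
            out.add(group);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        grouped.put("a", List.of("x", "y"));
        grouped.put("b", List.of("z"));
        for (GroupSketch group : toGroups(grouped)) {
            System.out.println(group.key + " -> " + group.objects);
        }
    }
}
```

In the thread's own code, the corresponding change would presumably be passing the same MapFn and `Avros.specifics(MyAvroGroup.class)` to `parallelDo`, which yields a `PCollection<MyAvroGroup>` rather than a table keyed by the group.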
>
>
> On Tue, Dec 11, 2012 at 1:18 PM, Jonathan Natkins <natty@cloudera.com> wrote:
>
>> Alright, I'm back for more. This time, I'm trying to perform a group by
>> with Avro data. What I've currently got is this:
>>
>>     PGroupedTable<String, MyAvroObject> processedData =
>>         data.parallelDo(new DoFn<String, Pair<String, MyAvroObject>>() {
>>           public void process(String line, Emitter<Pair<String, MyAvroObject>> emitter) {
>>             String key = getKey(line);
>>             MyAvroObject value = convertToAvroObject(line);
>>             emitter.emit(Pair.of(key, value));
>>           }
>>         }, Avros.tableOf(Avros.strings(), Avros.specifics(MyAvroObject.class)))
>>         .groupByKey(3);
>>
>>     PTable<MyAvroGroup, Pair<String, Iterable<MyAvroObject>>> groupedData =
>>         processedData.by(new MapFn<Pair<String, Iterable<MyAvroObject>>, MyAvroGroup>() {
>>             @Override
>>             public MyAvroGroup map(Pair<String, Iterable<MyAvroObject>> input) {
>>               MyAvroGroup group = new MyAvroGroup();
>>               group.objects = Lists.<MyAvroObject>newArrayList();
>>
>>               for (MyAvroObject obj : input.second()) {
>>                 group.objects.add(obj);
>>               }
>>
>>               return group;
>>             }
>>           },
>>           Avros.specifics(MyAvroGroup.class));
>>
>> I think this is all pretty sane, but I'm getting an exception when the
>> pipeline attempts to run the by():
>>
>> 12/12/10 14:11:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
>> Exception in thread "main" java.lang.ClassCastException: org.apache.crunch.types.avro.AvroGroupedTableType cannot be cast to org.apache.crunch.types.avro.AvroType
>>     at org.apache.crunch.types.avro.Avros.tableOf(Avros.java:608)
>>     at org.apache.crunch.types.avro.AvroTypeFamily.tableOf(AvroTypeFamily.java:135)
>>     at org.apache.crunch.impl.mem.collect.MemCollection.by(MemCollection.java:222)
>>
>> Am I doing something obviously wrong?
>>
>> Thanks,
>> Natty
>>
>>
>>
>>
>> On Fri, Dec 7, 2012 at 10:58 AM, Jonathan Natkins <natty@cloudera.com> wrote:
>>
>>> To bring things full circle, the core issue I was having was caused by
>>> the fact that I was writing the data in the wrong way. Instead of
>>>
>>> pipeline.writeTextFile(words, args[1]);
>>>
>>> I should have been using
>>>
>>> pipeline.write(words, To.avroFile(args[1]));
>>>
>>> As Josh noted, writeTextFile was attempting to write my data out as a
>>> String, but I wasn't giving it an object that was easy to turn into a
>>> String, which resulted in an exception. Changing it to write to an avro
>>> file solved those issues.
>>>
>>> Thanks, Josh!
>>>
>>>
>>>
>>> On Fri, Dec 7, 2012 at 10:26 AM, Josh Wills <jwills@cloudera.com> wrote:
>>>
>>>> Hey Natty,
>>>>
>>>> Reply inlined.
>>>>
>>>>
>>>> On Fri, Dec 7, 2012 at 10:06 AM, Jonathan Natkins <natty@cloudera.com> wrote:
>>>>
>>>>> Hey Josh,
>>>>>
>>>>> That really doesn't solve the problem I'm facing. Avros.specifics
>>>>> assumes that I've got a Java file that Avro generated for me, which I
>>>>> don't have. I can certainly go through the trouble of getting that file,
>>>>> but what I've got currently is a POJO that I'm associating with a JSON
>>>>> Avro schema. It's a perfectly valid use case, and as far as I can tell
>>>>> from what's provided by the Avros utility class, it should be supported.
>>>>> So here's my question:
>>>>>
>>>>
>>>> Interesting-- I had not hit that use case for Avro before. For a POJO,
>>>> I would just use the reflection APIs, which are available via
>>>> Avros.reflects.
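Avros.reflects builds on Avro's reflection support, which, as far as we know, derives a record schema from a class's instance fields and skips static and transient ones. The stdlib-only sketch below illustrates only that field-selection idea with `java.lang.reflect`; `LogLine` is a hypothetical POJO, and this is an approximation, not Avro's actual ReflectData logic (which, for example, also walks superclasses).

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical POJO standing in for MyAvroObject.
class LogLine {
    String host;
    long count;
    transient String cachedLine; // transient: excluded below
    static int instances;        // static: excluded below
}

class ReflectFieldsSketch {
    // Returns, in sorted order, the declared instance fields that are neither
    // static, transient, nor compiler-generated (synthetic).
    static Set<String> serializableFields(Class<?> clazz) {
        Set<String> names = new TreeSet<>();
        for (Field field : clazz.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || field.isSynthetic()) {
                continue;
            }
            names.add(field.getName());
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(serializableFields(LogLine.class)); // [count, host]
    }
}
```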
>>>>
>>>>
>>>>>
>>>>> Is the Avros.generics issue a bug? It seems to me that the T of
>>>>> PType<T> has to implement Writable, and in the case of the return type of
>>>>> Avros.generics, this is not the case.
>>>>>
>>>>
>>>> There's no requirement for the PType<T> to be a Writable, or even an
>>>> Avro instance. There's stuff like o.a.c.types.PTypes.derived that lets you
>>>> create PType<T> that depend on other PTypes, which is how Crunch handles
>>>> things like protocol buffers/thrift/jackson-style object serializations.
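PTypes.derived, as we understand it, pairs an existing base PType with an input MapFn and an output MapFn. The plain-Java sketch below models that shape with a hypothetical `Codec` interface: `DerivedCodec` encodes a T by converting it to S and delegating to the base codec, and decodes in the reverse order. None of these class names are Crunch APIs.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical minimal codec: converts values of type T to bytes and back.
interface Codec<T> {
    byte[] encode(T value);

    T decode(byte[] bytes);
}

// Base codec for Strings (UTF-8); plays the role of an existing base PType.
class StringCodec implements Codec<String> {
    @Override
    public byte[] encode(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

// Codec for T built from a base codec for S plus two conversion functions,
// the same shape as a derived type: (inputFn, outputFn, base).
class DerivedCodec<T, S> implements Codec<T> {
    private final Function<S, T> inputFn;  // applied after the base decodes
    private final Function<T, S> outputFn; // applied before the base encodes
    private final Codec<S> base;

    DerivedCodec(Function<S, T> inputFn, Function<T, S> outputFn, Codec<S> base) {
        this.inputFn = inputFn;
        this.outputFn = outputFn;
        this.base = base;
    }

    @Override
    public byte[] encode(T value) {
        return base.encode(outputFn.apply(value));
    }

    @Override
    public T decode(byte[] bytes) {
        return inputFn.apply(base.decode(bytes));
    }
}

class DerivedCodecDemo {
    public static void main(String[] args) {
        // An Integer codec derived from the String codec.
        Codec<Integer> ints = new DerivedCodec<Integer, String>(
            s -> Integer.valueOf(s), i -> i.toString(), new StringCodec());
        System.out.println(ints.decode(ints.encode(42))); // 42
    }
}
```

The point of the design is that the derived type never touches bytes itself; all serialization stays in the base type, so only the two conversion functions need to be written per new type.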
>>>>
>>>> I'm just taking a closer look at the Exception that was thrown, and it
>>>> looks to me like the problem is occurring at the end of the pipeline, where
>>>> you're calling pipeline.writeTextFile (not included in the code snippet
>>>> posted). Crunch has to convert the PType to something that can be converted
>>>> to a Writable impl-- if you try to write an Avro object to the
>>>> TextOutputFormat, it gets written as AvroWrapper@feedbeef. It looks to
>>>> me like in this case, Crunch can't figure out how to turn MyAvroObject into
>>>> a Writable instance for writing to the TextOutputFormat.
>>>>
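The `AvroWrapper@feedbeef` output Josh mentions is the pattern `Object.toString()` produces for a class that does not override it: the class name, an `@`, and the hash code in hex. This stdlib Java sketch reproduces it with a hypothetical `WrapperSketch` whose hash code is pinned to `0xfeedbeef` so the result is predictable.

```java
// Hypothetical wrapper that, like many serialization wrappers, does not
// override toString(). hashCode() is pinned so the output is predictable.
class WrapperSketch {
    @Override
    public int hashCode() {
        return 0xfeedbeef;
    }
}

class DefaultToStringDemo {
    public static void main(String[] args) {
        // Object.toString() returns getClass().getName() + "@" +
        // Integer.toHexString(hashCode()), so a text writer that falls back
        // on toString() emits a line like "WrapperSketch@feedbeef".
        System.out.println(String.valueOf(new WrapperSketch()));
    }
}
```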
>>>>
>>>>> If it's a bug, then fine, I'll file a JIRA and jump through whatever
>>>>> necessary hoops exist.
>>>>>
>>>>
>>>> One way to fix this would be to update writeTextFile to force
>>>> conversion of any non-string that was passed into it into a String via an
>>>> auxiliary MapFn-- I'm not sure why I didn't do that in the first place.
>>>> What do you think?
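Josh's proposed fix, converting every non-String element to a String with an auxiliary function before the text write, can be modeled in plain Java as below. `TextLinesSketch.toLines` is hypothetical; in Crunch the conversion would presumably be a MapFn applied with parallelDo ahead of writeTextFile. The formatter is left to the caller so records can be rendered in something more useful than the default `Class@hash` form.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class TextLinesSketch {
    // Forces every element into a String before it is written as a text line.
    // Strings pass through unchanged; anything else goes through the formatter
    // (the role an auxiliary MapFn would play ahead of writeTextFile).
    static <T> List<String> toLines(Iterable<T> values, Function<? super T, String> formatter) {
        List<String> lines = new ArrayList<>();
        for (T value : values) {
            lines.add(value instanceof String ? (String) value : formatter.apply(value));
        }
        return lines;
    }

    public static void main(String[] args) {
        List<Object> mixed = Arrays.asList("already text", 7, 2.5);
        // String.valueOf is the simplest formatter; a real pipeline might
        // render records in a more readable format instead.
        System.out.println(toLines(mixed, o -> String.valueOf(o)));
    }
}
```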
>>>>
>>>>
>>>>> Thanks,
>>>>> Natty
>>>>>
>>>>>
>>>>> On Thu, Dec 6, 2012 at 6:08 PM, Josh Wills <josh.wills@gmail.com> wrote:
>>>>>
>>>>>> Did you look at Avros.specifics?
>>>>>> On Dec 6, 2012 5:57 PM, "Jonathan Natkins" <natty@cloudera.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Ok, I'm still a little confused. Let's say I use Avros.generics(),
>>>>>>> and then I modify my code to use GenericData.Records. Those Records still
>>>>>>> don't implement the Writable interface, so I'm still getting a class cast
>>>>>>> exception. Did I do something totally wrong?
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Dec 6, 2012 at 5:19 PM, Jonathan Natkins <natty@cloudera.com> wrote:
>>>>>>>
>>>>>>>> Well, the problem with that is that I really want to work with my
>>>>>>>> objects, rather than use Avros.generics, because then I'm forced to treat
>>>>>>>> everything as a GenericData.Record. It's just a pain in the butt.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Dec 6, 2012 at 5:17 PM, Josh Wills <josh.wills@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> You don't want to create an AvroType yourself; you want to call
>>>>>>>>> o.a.c.types.avro.Avros.records or one of its friends and pass it a Class
>>>>>>>>> object.
>>>>>>>>>
>>>>>>>>> Interesting though, I would still want that case to work correctly.
>>>>>>>>>
>>>>>>>>> Josh
>>>>>>>>> On Dec 6, 2012 5:14 PM, "Jonathan Natkins" <natty@cloudera.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> So I've been futzing with Crunch a bit, and trying to understand
>>>>>>>>>> how to build a pipeline that outputs Avro data files. Roughly, I'm doing
>>>>>>>>>> something along these lines:
>>>>>>>>>>
>>>>>>>>>>     Schema.Parser schemaParser = new Schema.Parser();
>>>>>>>>>>     final Schema avroObjSchema = schemaParser.parse(schemaJsonString);
>>>>>>>>>>
>>>>>>>>>>     AvroType avroType = new AvroType<MyAvroObject>(MyAvroObject.class,
>>>>>>>>>>         avroObjSchema, new AvroDeepCopier.AvroReflectDeepCopier<MyAvroObject>(
>>>>>>>>>>             MyAvroObject.class, avroObjSchema));
>>>>>>>>>>
>>>>>>>>>>     PCollection<MyAvroObject> words = logs.parallelDo(new DoFn<String, MyAvroObject>() {
>>>>>>>>>>       public void process(String line, Emitter<MyAvroObject> emitter) {
>>>>>>>>>>         emitter.emit(convertStringToAvroObj(line));
>>>>>>>>>>       }
>>>>>>>>>>     }, avroType);
>>>>>>>>>>
>>>>>>>>>> However, this results in a class cast exception:
>>>>>>>>>>
>>>>>>>>>> Exception in thread "main" java.lang.ClassCastException: class com.company.MyAvroObject
>>>>>>>>>>     at java.lang.Class.asSubclass(Class.java:3039)
>>>>>>>>>>     at org.apache.crunch.types.writable.Writables.records(Writables.java:250)
>>>>>>>>>>     at org.apache.crunch.types.writable.WritableTypeFamily.records(WritableTypeFamily.java:86)
>>>>>>>>>>     at org.apache.crunch.types.PTypeUtils.convert(PTypeUtils.java:61)
>>>>>>>>>>     at org.apache.crunch.types.writable.WritableTypeFamily.as(WritableTypeFamily.java:135)
>>>>>>>>>>     at org.apache.crunch.impl.mr.MRPipeline.writeTextFile(MRPipeline.java:319)
>>>>>>>>>>
>>>>>>>>>> Anybody have any thoughts? There's got to be a magical
>>>>>>>>>> incantation that I have slightly off.
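The top two frames of this trace show Writables.records calling Class.asSubclass, which throws ClassCastException when the class does not extend or implement the requested type. That suggests the Writable type family expected a Writable subclass here, consistent with Josh's diagnosis elsewhere in this thread that writeTextFile was converting the PType to the Writable family. The stdlib-only sketch below reproduces the failure mode; `WritableLike`, `PlainRecord`, and `WritableRecord` are hypothetical stand-ins, not Hadoop or Crunch types.

```java
// Hypothetical marker interface standing in for Hadoop's Writable.
interface WritableLike {}

// Hypothetical POJO that does not implement WritableLike (like MyAvroObject).
class PlainRecord {}

// Hypothetical class that does implement WritableLike.
class WritableRecord implements WritableLike {}

class AsSubclassDemo {
    // Narrows clazz to a WritableLike subtype; Class.asSubclass throws
    // ClassCastException if clazz does not implement WritableLike.
    static Class<? extends WritableLike> requireWritable(Class<?> clazz) {
        return clazz.asSubclass(WritableLike.class);
    }

    public static void main(String[] args) {
        System.out.println(requireWritable(WritableRecord.class).getSimpleName());
        try {
            requireWritable(PlainRecord.class);
        } catch (ClassCastException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```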
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Director of Data Science
>>>> Cloudera <http://www.cloudera.com>
>>>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>>>
>>>>
>>>
>>
>
