incubator-crunch-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Josh Wills <jwi...@cloudera.com>
Subject Re: Looking for some guidance in building a basic Avro pipeline
Date Fri, 07 Dec 2012 18:26:25 GMT
Hey Natty,

Reply inlined.


On Fri, Dec 7, 2012 at 10:06 AM, Jonathan Natkins <natty@cloudera.com>wrote:

> Hey Josh,
>
> That really doesn't solve the problem I'm facing. Avros.specifics assumes
> that I've got a Java file that Avro generated for me, which I don't have. I
> can certainly go through the trouble of getting that file, but what I've
> got currently is a POJO that I'm associating with a JSON Avro schema. It's
> a perfectly valid use case, and as far as I can tell, from what's provided
> by the Avros utility class, it should be supported. So here's my question:
>

Interesting-- I had not hit that use case for Avro before. For a POJO, I
would just use the reflection APIs, which are available via Avros.reflects.


>
> Is the Avros.generics issue a bug? It seems to me that the T of PType<T>
> has to implement Writable, and in the case of the return type of
> Avros.generics, this is not the case.
>

There's no requirement for the PType<T> to be a Writable, or even an Avro
instance. There's stuff like o.a.c.types.PTypes.derived that lets you
create PType<T> that depend on other PTypes, which is how Crunch handles
things like protocol buffers/thrift/jackson-style object serializations.

I'm just taking a closer look at the Exception that was thrown, and it
looks to me like the problem is occurring at the end of the pipeline, where
you're calling pipeline.writeTextFile (not included in the code snippet
posted). Crunch has to convert the PType to something that can be converted
to a Writable impl-- if you try to write an Avro object to the
TextOutputFormat, it gets written as AvroWrapper@feedbeef. It looks to me
that in this case, Crunch can't figure out how to turn MyAvroObject into a
Writable instance for writing to the TextOuputFormat.


> If it's a bug, then fine, I'll file a JIRA and jump through whatever
> necessary hoops exist.
>

One way to fix this would be to update writeTextFile to force conversion of
any non-string that was passed into it into a String via an auxiliary
MapFn-- I'm not sure why I didn't do that in the first place. What do you
think?


> Thanks,
> Natty
>
>
> On Thu, Dec 6, 2012 at 6:08 PM, Josh Wills <josh.wills@gmail.com> wrote:
>
>> Did you look at Avros.specifics?
>> On Dec 6, 2012 5:57 PM, "Jonathan Natkins" <natty@cloudera.com> wrote:
>>
>>> Ok, I'm still a little confused. Let's say I use Avros.generics(), and
>>> then I modify my code to use GenericData.Records. Those Records still don't
>>> implement the Writable interface, so I'm still getting a class cast
>>> exception. Did I do something totally wrong?
>>>
>>>
>>> On Thu, Dec 6, 2012 at 5:19 PM, Jonathan Natkins <natty@cloudera.com>wrote:
>>>
>>>> Well, the problem with that is that I really want to work with my
>>>> objects, rather than use Avros.generics, because then I'm forced to treat
>>>> everything as a GenericData.Record. It's just a pain in the butt.
>>>>
>>>>
>>>> On Thu, Dec 6, 2012 at 5:17 PM, Josh Wills <josh.wills@gmail.com>wrote:
>>>>
>>>>> You don't want to create an AvroType yourself, you want to call
>>>>> o.a.c.types.avro.Avros.records or one of its friends and pass it a Class
>>>>> object.
>>>>>
>>>>> Interesting though, I would still want that case to work correctly.
>>>>>
>>>>> Josh
>>>>> On Dec 6, 2012 5:14 PM, "Jonathan Natkins" <natty@cloudera.com>
wrote:
>>>>>
>>>>>> So I've been futzing with Crunch a bit, and trying to understand
how
>>>>>> to build a pipeline that outputs Avro data files. Roughly, I'm doing
>>>>>> something along these lines:
>>>>>>
>>>>>>     Schema.Parser schemaParser = new Schema.Parser();
>>>>>>     final Schema avroObjSchema = schemaParser.parse(
>>>>>> schemaJsonString);
>>>>>>
>>>>>>     AvroType avroType = new AvroType<MyAvroObject>(MyAvroObject.class,
>>>>>>         avroObjSchema, new
>>>>>> AvroDeepCopier.AvroReflectDeepCopier<MyAvroObject>(
>>>>>>         MyAvroObject.class, avroObjSchema));
>>>>>>
>>>>>>     PCollection<MyAvroObject> words = logs.parallelDo(new
>>>>>> DoFn<String, MyAvroObject>() {
>>>>>>       public void process(String line, Emitter<MyAvroObject>
emitter)
>>>>>> {
>>>>>>         emitter.emit(convertStringToAvroObj(line));
>>>>>>       }
>>>>>>     }, avroType);
>>>>>>
>>>>>> However, this results in a class cast exception:
>>>>>>
>>>>>> Exception in thread "main" java.lang.ClassCastException: class
>>>>>> com.company.MyAvroObject
>>>>>>     at java.lang.Class.asSubclass(Class.java:3039)
>>>>>>     at
>>>>>> org.apache.crunch.types.writable.Writables.records(Writables.java:250)
>>>>>>     at
>>>>>> org.apache.crunch.types.writable.WritableTypeFamily.records(WritableTypeFamily.java:86)
>>>>>>     at org.apache.crunch.types.PTypeUtils.convert(PTypeUtils.java:61)
>>>>>>     at org.apache.crunch.types.writable.WritableTypeFamily.as
>>>>>> (WritableTypeFamily.java:135)
>>>>>>     at
>>>>>> org.apache.crunch.impl.mr.MRPipeline.writeTextFile(MRPipeline.java:319)
>>>>>>
>>>>>> Anybody have any thoughts? There's got to be a magical incantation
>>>>>> that I have slightly off.
>>>>>>
>>>>>
>>>>
>>>
>


-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>

Mime
View raw message