incubator-crunch-user mailing list archives

From Jonathan Natkins <na...@cloudera.com>
Subject Re: Looking for some guidance in building a basic Avro pipeline
Date Tue, 11 Dec 2012 21:18:12 GMT
Alright, I'm back for more. This time, I'm trying to perform a group by
with Avro data. What I've currently got is this:

    PGroupedTable<String, MyAvroObject> processedData = data.parallelDo(
        new DoFn<String, Pair<String, MyAvroObject>>() {
          public void process(String line, Emitter<Pair<String, MyAvroObject>> emitter) {
            String key = getKey(line);
            MyAvroObject value = convertToAvroObject(line);
            emitter.emit(Pair.of(key, value));
          }
        }, Avros.tableOf(Avros.strings(), Avros.specifics(MyAvroObject.class)))
        .groupByKey(3);

    PTable<MyAvroGroup, Pair<String, Iterable<MyAvroObject>>> groupedData =
        processedData.by(new MapFn<Pair<String, Iterable<MyAvroObject>>, MyAvroGroup>() {
          @Override
          public MyAvroGroup map(Pair<String, Iterable<MyAvroObject>> input) {
            MyAvroGroup group = new MyAvroGroup();
            group.objects = Lists.<MyAvroObject>newArrayList();

            for (MyAvroObject obj : input.second()) {
              group.objects.add(obj);
            }

            return group;
          }
        }, Avros.specifics(MyAvroGroup.class));
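For anyone new to these two calls: `groupByKey(3)` shuffles the `(String, MyAvroObject)` pairs so each key arrives with all of its values (the `3` asks for three reduce partitions), and `by()` then pairs each element of the grouped collection with the key its `MapFn` computes. The plain-Java sketch models only those semantics, in memory; `GroupBySketch`, its methods, and the string stand-ins for `MyAvroObject` are hypothetical and say nothing about how Crunch implements either call.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class GroupBySketch {

  // In-memory model of groupByKey(): gathers every value that shares a key into
  // one list. Keys keep first-seen order here; a real shuffle promises no order.
  static <K, V> Map<K, List<V>> groupByKey(List<Map.Entry<K, V>> pairs) {
    Map<K, List<V>> groups = new LinkedHashMap<>();
    for (Map.Entry<K, V> pair : pairs) {
      groups.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
    }
    return groups;
  }

  // In-memory model of by(): pairs each element with a key computed from it
  // and keeps the whole element as the value.
  static <S, K> List<Map.Entry<K, S>> by(List<S> elements, Function<S, K> keyFn) {
    List<Map.Entry<K, S>> keyed = new ArrayList<>();
    for (S element : elements) {
      keyed.add(Map.entry(keyFn.apply(element), element));
    }
    return keyed;
  }

  public static void main(String[] args) {
    // Stand-ins for (getKey(line), convertToAvroObject(line)) pairs.
    List<Map.Entry<String, String>> data = List.of(
        Map.entry("a", "x1"), Map.entry("b", "y1"), Map.entry("a", "x2"));

    Map<String, List<String>> grouped = groupByKey(data);
    List<Map.Entry<String, List<String>>> groupList = new ArrayList<>(grouped.entrySet());

    // Key each (key, values) group by a copy of its values, the way the MapFn
    // above copies input.second() into a MyAvroGroup.
    List<Map.Entry<List<String>, Map.Entry<String, List<String>>>> rekeyed =
        by(groupList, group -> new ArrayList<String>(group.getValue()));

    System.out.println(rekeyed);
  }
}
```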

I think this is all pretty sane, but I'm getting an exception when the
pipeline attempts to run the by():

12/12/10 14:11:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" java.lang.ClassCastException: org.apache.crunch.types.avro.AvroGroupedTableType cannot be cast to org.apache.crunch.types.avro.AvroType
    at org.apache.crunch.types.avro.Avros.tableOf(Avros.java:608)
    at org.apache.crunch.types.avro.AvroTypeFamily.tableOf(AvroTypeFamily.java:135)
    at org.apache.crunch.impl.mem.collect.MemCollection.by(MemCollection.java:222)

Am I doing something obviously wrong?
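Reading the trace from the bottom up: `MemCollection.by` asks the Avro type family for a table type, and `Avros.tableOf` casts a type it receives to `AvroType`. Presumably that argument is the grouped collection's own PType, and the message says it is an `AvroGroupedTableType`, which is not an `AvroType`. The toy classes below reuse the names from the trace only to model that relationship; they are hypothetical stand-ins, not Crunch's definitions.

```java
public class CastSketch {
  // Toy stand-ins named after the classes in the trace. They only model the
  // relationship the exception message reports; they are not Crunch's classes.
  interface PType {}

  static class AvroType implements PType {}

  // Another PType that is NOT a subtype of AvroType.
  static class AvroGroupedTableType implements PType {}

  // Like a helper that accepts any PType but assumes an AvroType underneath.
  static AvroType asAvroType(PType ptype) {
    return (AvroType) ptype; // ClassCastException for any other PType
  }

  static boolean castFails(PType ptype) {
    try {
      asAvroType(ptype);
      return false;
    } catch (ClassCastException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(castFails(new AvroType()));             // false
    System.out.println(castFails(new AvroGroupedTableType())); // true
  }
}
```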

Thanks,
Natty



On Fri, Dec 7, 2012 at 10:58 AM, Jonathan Natkins <natty@cloudera.com> wrote:

> To bring things full circle, the core issue I was having was caused by the
> fact that I was writing the data in the wrong way. Instead of
>
> pipeline.writeTextFile(words, args[1]);
>
> I should have been using
>
> pipeline.write(words, To.avroFile(args[1]));
>
> As Josh noted, writeTextFile was attempting to write my data out as a
> String, but I wasn't giving it an object that was easy to turn into a
> String, which resulted in an exception. Changing it to write to an Avro
> file solved those issues.
>
> Thanks, Josh!
>
>
>
> On Fri, Dec 7, 2012 at 10:26 AM, Josh Wills <jwills@cloudera.com> wrote:
>
>> Hey Natty,
>>
>> Reply inlined.
>>
>>
>> On Fri, Dec 7, 2012 at 10:06 AM, Jonathan Natkins <natty@cloudera.com> wrote:
>>
>>> Hey Josh,
>>>
>>> That really doesn't solve the problem I'm facing. Avros.specifics
>>> assumes that I've got a Java file that Avro generated for me, which I don't
>>> have. I can certainly go through the trouble of getting that file, but what
>>> I've got currently is a POJO that I'm associating with a JSON Avro schema.
>>> It's a perfectly valid use case, and as far as I can tell, from what's
>>> provided by the Avros utility class, it should be supported. So here's my
>>> question:
>>>
>>
>> Interesting-- I had not hit that use case for Avro before. For a POJO, I
>> would just use the reflection APIs, which are available via Avros.reflects.
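`Avros.reflects` builds on Avro's reflection support, which derives a schema by inspecting a class's fields instead of requiring generated code. As a rough illustration of that first step only, this sketch enumerates a POJO's instance fields with `java.lang.reflect`; `LogLine` and `describeFields` are hypothetical, and Avro's own reflection handling (nested types, nullability, annotations) is far more involved.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class ReflectSketch {
  // Hypothetical POJO standing in for MyAvroObject; no generated code involved.
  static class LogLine {
    String host;
    long timestamp;
    int status;
  }

  // Returns "name:type" for each instance field, the information a
  // reflection-based serializer starts from. Field order from
  // getDeclaredFields() is not guaranteed by the JDK.
  static List<String> describeFields(Class<?> cls) {
    List<String> fields = new ArrayList<>();
    for (Field field : cls.getDeclaredFields()) {
      if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
        continue; // skip class-level and compiler-generated fields
      }
      fields.add(field.getName() + ":" + field.getType().getSimpleName());
    }
    return fields;
  }

  public static void main(String[] args) {
    System.out.println(describeFields(LogLine.class));
  }
}
```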
>>
>>
>>>
>>> Is the Avros.generics issue a bug? It seems to me that the T of PType<T>
>>> has to implement Writable, and in the case of the return type of
>>> Avros.generics, this is not the case.
>>>
>>
>> There's no requirement for the T in PType<T> to be a Writable, or even an
>> Avro instance. There's stuff like o.a.c.types.PTypes.derived that lets you
>> create a PType<T> that depends on another PType, which is how Crunch handles
>> things like protocol buffers/thrift/jackson-style object serializations.
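The idea behind a derived type is that a `PType<T>` can be built from a pair of conversion functions plus a base type that already knows how to serialize itself. This sketch models that idea with plain `java.util.function.Function`s; `DerivedType`, `Point`, and the `"x,y"` string encoding are all hypothetical. As far as I understand it, the real `derived` API works with Crunch `MapFn`s and a base `PType` rather than these classes.

```java
import java.util.function.Function;

public class DerivedTypeSketch {
  // Toy model of a derived type: a T is written by converting it to a base type
  // B that can already be serialized, and read by converting back. The names
  // here are hypothetical and only model the idea, not Crunch's PType API.
  static final class DerivedType<T, B> {
    private final Function<B, T> inputFn;  // base -> T, used when reading
    private final Function<T, B> outputFn; // T -> base, used when writing

    DerivedType(Function<B, T> inputFn, Function<T, B> outputFn) {
      this.inputFn = inputFn;
      this.outputFn = outputFn;
    }

    B toBase(T value) {
      return outputFn.apply(value);
    }

    T fromBase(B base) {
      return inputFn.apply(base);
    }
  }

  // Hypothetical domain object with no serialization support of its own.
  static final class Point {
    final int x;
    final int y;

    Point(int x, int y) {
      this.x = x;
      this.y = y;
    }
  }

  // A Point derived from the string "x,y".
  static final DerivedType<Point, String> POINTS = new DerivedType<>(
      s -> {
        String[] parts = s.split(",");
        return new Point(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]));
      },
      p -> p.x + "," + p.y);

  public static void main(String[] args) {
    String stored = POINTS.toBase(new Point(3, 4));
    Point restored = POINTS.fromBase(stored);
    System.out.println(stored + " -> (" + restored.x + ", " + restored.y + ")");
  }
}
```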
>>
>> I'm just taking a closer look at the Exception that was thrown, and it
>> looks to me like the problem is occurring at the end of the pipeline, where
>> you're calling pipeline.writeTextFile (not included in the code snippet
>> posted). Crunch has to convert the PType to something that can be converted
>> to a Writable impl-- if you try to write an Avro object to the
>> TextOutputFormat, it gets written as AvroWrapper@feedbeef. It looks to
>> me like, in this case, Crunch can't figure out how to turn MyAvroObject into
>> a Writable instance for writing to the TextOutputFormat.
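The `AvroWrapper@feedbeef` output Josh describes is Java's default `Object.toString()`: when a class does not override `toString()`, the text is the class name, an `@`, and the hash code in hex, so the wrapped datum never shows up. A minimal illustration with a hypothetical `Wrapper` class:

```java
public class ToStringSketch {
  // Hypothetical wrapper that, like the AvroWrapper described above,
  // does not override toString().
  static final class Wrapper {
    final Object datum;

    Wrapper(Object datum) {
      this.datum = datum;
    }
  }

  public static void main(String[] args) {
    Wrapper w = new Wrapper("payload");
    // Object.toString() yields getClass().getName() + "@" + hex hash code,
    // so the wrapped datum never appears in the text.
    System.out.println(w);
    // Converting the datum explicitly is what produces readable text.
    System.out.println(String.valueOf(w.datum));
  }
}
```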
>>
>>
>>> If it's a bug, then fine, I'll file a JIRA and jump through whatever
>>> necessary hoops exist.
>>>
>>
>> One way to fix this would be to update writeTextFile to force conversion
>> of any non-string that was passed into it into a String via an auxiliary
>> MapFn-- I'm not sure why I didn't do that in the first place. What do you
>> think?
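A plain-Java sketch of the proposed fix, assuming the conversion amounts to "apply a formatter to every element before writing it as text": `toTextLines` is a hypothetical helper standing in for the auxiliary `MapFn` Josh describes, with `String.valueOf` as the default formatter and an optional caller-supplied one.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;

public class TextConversionSketch {
  // Converts every element to a String with an explicit formatter before it
  // reaches a text sink, mirroring the "auxiliary MapFn" idea in plain Java.
  static <T> List<String> toTextLines(Collection<T> items, Function<? super T, String> format) {
    List<String> lines = new ArrayList<>(items.size());
    for (T item : items) {
      lines.add(format.apply(item));
    }
    return lines;
  }

  // Default conversion: the element's own toString(), or "null".
  static <T> List<String> toTextLines(Collection<T> items) {
    return toTextLines(items, item -> String.valueOf(item));
  }

  public static void main(String[] args) {
    List<Object> mixed = List.of(1, 2.5, "three");
    System.out.println(toTextLines(mixed));
    System.out.println(toTextLines(List.of(1, 2), n -> "n=" + n));
  }
}
```

A default of `String.valueOf` keeps the call site unchanged for types with a meaningful `toString()`, while the two-argument form covers types like Avro records whose default text is not useful.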
>>
>>
>>> Thanks,
>>> Natty
>>>
>>>
>>> On Thu, Dec 6, 2012 at 6:08 PM, Josh Wills <josh.wills@gmail.com> wrote:
>>>
>>>> Did you look at Avros.specifics?
>>>> On Dec 6, 2012 5:57 PM, "Jonathan Natkins" <natty@cloudera.com> wrote:
>>>>
>>>>> Ok, I'm still a little confused. Let's say I use Avros.generics(), and
>>>>> then I modify my code to use GenericData.Records. Those Records still
>>>>> don't implement the Writable interface, so I'm still getting a class cast
>>>>> exception. Did I do something totally wrong?
>>>>>
>>>>>
>>>>> On Thu, Dec 6, 2012 at 5:19 PM, Jonathan Natkins <natty@cloudera.com> wrote:
>>>>>
>>>>>> Well, the problem with that is that I really want to work with my
>>>>>> objects, rather than use Avros.generics, because then I'm forced to
>>>>>> treat everything as a GenericData.Record. It's just a pain in the butt.
>>>>>>
>>>>>>
>>>>>> On Thu, Dec 6, 2012 at 5:17 PM, Josh Wills <josh.wills@gmail.com> wrote:
>>>>>>
>>>>>>> You don't want to create an AvroType yourself, you want to call
>>>>>>> o.a.c.types.avro.Avros.records or one of its friends and pass it a
>>>>>>> Class object.
>>>>>>>
>>>>>>> Interesting though, I would still want that case to work correctly.
>>>>>>>
>>>>>>> Josh
>>>>>>> On Dec 6, 2012 5:14 PM, "Jonathan Natkins" <natty@cloudera.com> wrote:
>>>>>>>
>>>>>>>> So I've been futzing with Crunch a bit, and trying to understand
>>>>>>>> how to build a pipeline that outputs Avro data files. Roughly, I'm
>>>>>>>> doing something along these lines:
>>>>>>>>
>>>>>>>>     Schema.Parser schemaParser = new Schema.Parser();
>>>>>>>>     final Schema avroObjSchema = schemaParser.parse(schemaJsonString);
>>>>>>>>
>>>>>>>>     AvroType avroType = new AvroType<MyAvroObject>(MyAvroObject.class,
>>>>>>>>         avroObjSchema,
>>>>>>>>         new AvroDeepCopier.AvroReflectDeepCopier<MyAvroObject>(
>>>>>>>>             MyAvroObject.class, avroObjSchema));
>>>>>>>>
>>>>>>>>     PCollection<MyAvroObject> words = logs.parallelDo(new DoFn<String, MyAvroObject>() {
>>>>>>>>       public void process(String line, Emitter<MyAvroObject> emitter) {
>>>>>>>>         emitter.emit(convertStringToAvroObj(line));
>>>>>>>>       }
>>>>>>>>     }, avroType);
>>>>>>>>
>>>>>>>> However, this results in a class cast exception:
>>>>>>>>
>>>>>>>> Exception in thread "main" java.lang.ClassCastException: class com.company.MyAvroObject
>>>>>>>>     at java.lang.Class.asSubclass(Class.java:3039)
>>>>>>>>     at org.apache.crunch.types.writable.Writables.records(Writables.java:250)
>>>>>>>>     at org.apache.crunch.types.writable.WritableTypeFamily.records(WritableTypeFamily.java:86)
>>>>>>>>     at org.apache.crunch.types.PTypeUtils.convert(PTypeUtils.java:61)
>>>>>>>>     at org.apache.crunch.types.writable.WritableTypeFamily.as(WritableTypeFamily.java:135)
>>>>>>>>     at org.apache.crunch.impl.mr.MRPipeline.writeTextFile(MRPipeline.java:319)
>>>>>>>>
>>>>>>>> Anybody have any thoughts? There's got to be a magical incantation
>>>>>>>> that I have slightly off.
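The top frame of this trace is `Class.asSubclass`, reached from `Writables.records` while `writeTextFile` converts the PType into the Writable type family. `asSubclass` throws `ClassCastException` whenever the class is not assignable to the requested type, presumably Hadoop's `Writable` here. The sketch shows that JDK behavior with `Comparable` standing in for `Writable` (Hadoop is assumed not to be on the classpath); `PlainRecord` and `isViewableAs` are hypothetical.

```java
public class AsSubclassSketch {
  // Hypothetical record class that implements no interfaces, like a POJO
  // that is not a Writable.
  static class PlainRecord {}

  // True if cls is assignable to target; Class.asSubclass throws
  // ClassCastException otherwise, which is the top frame of the trace above.
  static boolean isViewableAs(Class<?> cls, Class<?> target) {
    try {
      cls.asSubclass(target);
      return true;
    } catch (ClassCastException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    // Comparable stands in for org.apache.hadoop.io.Writable here.
    System.out.println(isViewableAs(String.class, Comparable.class));      // true
    System.out.println(isViewableAs(PlainRecord.class, Comparable.class)); // false
  }
}
```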
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>>
>>
>> --
>> Director of Data Science
>> Cloudera <http://www.cloudera.com>
>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>
>>
>
