crunch-user mailing list archives

From Josh Wills <jwi...@cloudera.com>
Subject Re: Trouble with Avro records
Date Fri, 29 Aug 2014 20:52:35 GMT
Hey Danny,

So I suspect the problem is that the AvroMode info isn't getting propagated
to the AvroParquetFileSourceTarget. Verifying that isn't as simple as it
should be, but I'd like you to try something like this:

SourceTarget<Log> output = new AvroParquetFileSourceTarget<Log>(new
Path(path), Avros.specifics(Log.class));
output.conf("crunch.avro.mode", "SPECIFIC");

...and let me know if that fixes the problem. If so, I can file a JIRA to
fix it properly.
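
For reference, here's roughly what the full write path looks like with that
workaround wired in (an untested sketch that just combines the snippet above
with the pipeline code from your earlier mail):

Pipeline p1 = new MRPipeline(MyLogs.class, getConf());
PCollection<Log> logs = ....;
SourceTarget<Log> output = new AvroParquetFileSourceTarget<Log>(new
Path(path), Avros.specifics(Log.class));
// Force the specific AvroMode for the Parquet target's configuration.
output.conf("crunch.avro.mode", "SPECIFIC");
p1.write(logs, output, WriteMode.OVERWRITE);
p1.done();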

J


On Fri, Aug 29, 2014 at 11:15 AM, Josh Wills <jwills@cloudera.com> wrote:

> Hey Danny,
>
> I'll take a look at it later today; kind of a crazy AM for me.
>
> J
>
>
> On Fri, Aug 29, 2014 at 9:11 AM, Danny Morgan <unluckyboy@hotmail.com>
> wrote:
>
>> Okay, it looks like the AvroMode.setSpecificClassLoader() fix works for
>> Avro files but doesn't work for Parquet Avro files.
>>
>> // This works great
>> Pipeline p1 = new MRPipeline(MyLogs.class, getConf());
>> PCollection<Log> logs = ....;
>> SourceTarget<Log> output = At.avroFile(path, Avros.specifics(Log.class));
>> p1.write(logs, output, WriteMode.OVERWRITE);
>> p1.done();
>>
>> Pipeline p2 = new MRPipeline(MyLogs.class, getConf());
>> AvroMode.setSpecificClassLoader(Log.class.getClassLoader());
>> PCollection<Log> logs2 = p2.read(output);
>> p2.done();
>>
>> // This fails with java.lang.ClassCastException:
>> // org.apache.avro.generic.GenericData$Record cannot be cast to com.test.Log
>> Pipeline p1 = new MRPipeline(MyLogs.class, getConf());
>> PCollection<Log> logs = ....;
>> SourceTarget<Log> output = new AvroParquetFileSourceTarget<Log>(new
>> Path(path), Avros.specifics(Log.class));
>> p1.write(logs, output, WriteMode.OVERWRITE);
>> p1.done();
>>
>> Pipeline p2 = new MRPipeline(MyLogs.class, getConf());
>> AvroMode.setSpecificClassLoader(Log.class.getClassLoader());
>> PCollection<Log> logs2 = p2.read(output);
>> p2.done();
>>
>> Any idea?
>>
>> Thanks!
>>
>>
>>
>> ------------------------------
>> From: josh.wills@gmail.com
>> Date: Thu, 21 Aug 2014 13:54:56 -0700
>>
>> Subject: Re: Trouble with Avro records
>> To: user@crunch.apache.org
>>
>>
>> On Thu, Aug 21, 2014 at 1:46 PM, Danny Morgan <unluckyboy@hotmail.com>
>> wrote:
>>
>> Thanks Josh. If it's any consolation, Crunch has saved me countless hours
>> and CPU cycles over how we used to do things, so I'm glad to be benefiting
>> from your ill-advised decisions :-)
>>
>>
>> That is always nice to hear-- thanks!
>>
>>
>>
>> Danny
>>
>> ------------------------------
>> From: josh.wills@gmail.com
>> Date: Thu, 21 Aug 2014 13:33:40 -0700
>>
>> Subject: Re: Trouble with Avro records
>> To: user@crunch.apache.org
>>
>>
>> On Thu, Aug 21, 2014 at 1:28 PM, Danny Morgan <unluckyboy@hotmail.com>
>> wrote:
>>
>> Awesome, adding:
>>
>> AvroMode.setSpecificClassLoader(Visit.class.getClassLoader());
>>
>> Did the trick, thanks Josh!
>>
>> Do you mind clarifying when to use Avros.records() versus
>> Avros.specifics() or Avros.reflects()?
>>
>>
>> Oops, yes-- sorry about that. The records() method is defined on both the
>> WritableTypeFamily and the AvroTypeFamily as a method that is supposed to
>> handle "arbitrary" data types that aren't supported by the built-in POJO
>> and Crunch Tuple methods. Fortunately/unfortunately, what counts as an
>> "arbitrary" data type depends on the backing implementation: for Writables,
>> records() only really supports classes that implement Writable, and for
>> Avros, records() checks whether the class implements SpecificRecord (in
>> which case it handles it with Avros.specifics) and, if it does not, passes
>> it on to Avros.reflects.
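>>
>> For example (an illustrative sketch; VisitPojo is just a hypothetical
>> plain Java class, while Visit is the class generated from your schema):
>>
>> // Generated Avro class (implements SpecificRecord):
>> PType<Visit> specificType = Avros.specifics(Visit.class);
>> // Plain POJO, serialized via Avro reflection:
>> PType<VisitPojo> reflectType = Avros.reflects(VisitPojo.class);
>> // records() picks for you: specifics() if the class implements
>> // SpecificRecord, reflects() otherwise.
>> PType<Visit> recordType = Avros.records(Visit.class);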
>>
>> I think that, like many things in Crunch, records() reflects an early and
>> somewhat ill-advised decision I made when I was first creating the API,
>> one that I would likely do away with if I had it to do over again. :)
>>
>> J
>>
>>
>> Thanks Again!
>>
>>
>> ------------------------------
>> From: josh.wills@gmail.com
>> Date: Thu, 21 Aug 2014 13:20:20 -0700
>>
>> Subject: Re: Trouble with Avro records
>> To: user@crunch.apache.org
>>
>> Okay. I suspect this is the problem:
>>
>> https://issues.apache.org/jira/browse/CRUNCH-442
>>
>> Gabriel fixed this for the upcoming 0.11 release, but there are a couple
>> of workarounds in the comments. One is to put the Avro jar into your job
>> jar file, instead of relying on the one that is in the Hadoop lib. The
>> other is to configure AvroMode.setSpecificClassLoader with the class
>> loader for your Visit class before kicking off the job.
>>
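>> A minimal sketch of the second workaround (assuming a driver class like
>> the MyLogs class used elsewhere in this thread):
>>
>> // Point AvroMode at a class loader that can see the generated classes,
>> // before the pipeline is constructed and the job is kicked off.
>> AvroMode.setSpecificClassLoader(Visit.class.getClassLoader());
>> Pipeline p = new MRPipeline(MyLogs.class, getConf());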
>>
>> Josh
>>
>>
>> On Thu, Aug 21, 2014 at 1:10 PM, Danny Morgan <unluckyboy@hotmail.com>
>> wrote:
>>
>> Hi Josh,
>>
>> crunch-0.10.0-hadoop2
>>
>> Thanks.
>>
>> ------------------------------
>> From: josh.wills@gmail.com
>> Date: Thu, 21 Aug 2014 13:04:29 -0700
>> Subject: Re: Trouble with Avro records
>> To: user@crunch.apache.org
>>
>>
>> That feels like an AvroMode-related exception; which version of Crunch
>> are you using?
>>
>> J
>>
>>
>> On Thu, Aug 21, 2014 at 12:45 PM, Danny Morgan <unluckyboy@hotmail.com>
>> wrote:
>>
>> Hi Guys,
>>
>> Love Crunch, but I've been having some trouble recently using Avro
>> records. I think someone needs to write a Crunch book.
>>
>> I'm trying to aggregate hourly visits to a page by each user. I do one
>> pass to parse the records, and then I try to "group by" unique user and
>> hour and count the number of times they visited, as well as their first
>> visit time in the hour. Here's the Avro schema:
>>
>> {"namespace": "com.test",
>>  "type": "record",
>>  "name": "Visit",
>>  "fields": [
>>      {"name": "dateid", "type": "int"},            // Year Month Day Hour
>> in PST as an integer e.g. 2014081103
>>      {"name": "userid", "type": "string"},
>>      {"name": "vcount",  "type": ["long", "null"]},
>>      {"name": "firsttimestamp",  "type": ["long", "null"]} // Unixtime
>> stamp of first visit
>>  ]
>> }
>>
>> Here's where I do the parsing; at this point vcount and firsttimestamp
>> aren't set yet.
>>
>> PTable<Visit, Pair<Long, Long>> visits =
>> parsed.parallelDo("visits-parsing", new VisitsExtractor(),
>>           Avros.tableOf(Avros.specifics(Visit.class),
>> Avros.pairs(Avros.longs(), Avros.longs())));
>>
>> The relevant line from VisitsExtractor:
>> emitter.emit(Pair.of(visit, Pair.of(1L, log.timestamp())));
>>
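>> For context, a rough, purely illustrative version of that DoFn (RawLog,
>> toDateid, and the accessors below are placeholders for whatever the
>> parsed input type actually provides):
>>
>> public class VisitsExtractor extends DoFn<RawLog, Pair<Visit, Pair<Long, Long>>> {
>>   @Override
>>   public void process(RawLog log, Emitter<Pair<Visit, Pair<Long, Long>>> emitter) {
>>     // Build the key; vcount and firsttimestamp stay null at this stage.
>>     Visit visit = new Visit();
>>     visit.setDateid(toDateid(log.timestamp()));  // hypothetical helper
>>     visit.setUserid(log.userid());               // hypothetical accessor
>>     emitter.emit(Pair.of(visit, Pair.of(1L, log.timestamp())));
>>   }
>> }
>>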
>> Everything up to this point works fine. Now I want to count up the visits
>> for each unique visitor and take the minimum timestamp.
>>
>>         PTable<Visit, Pair<Long, Long>> agg =
>> visits.groupByKey().combineValues(Aggregators.pairAggregator(Aggregators.SUM_LONGS(),
>> Aggregators.MIN_LONGS()));
>>
>> The above seems to work fine too. Now I want to create new Visit records
>> and fill in the count and minimum timestamp fields.
>>
>>     PCollection<Visit> count_visits = agg.parallelDo("visits-count",
>>         new DoFn<Pair<Visit, Pair<Long, Long>>, Visit>() {
>>           @Override
>>           public void process(Pair<Visit, Pair<Long, Long>> p,
>>               Emitter<Visit> emitter) {
>>             Visit v = Visit.newBuilder(p.first()).build();
>>             v.setVcount(p.second().first());
>>             v.setFirsttimestamp(p.second().second());
>>             emitter.emit(v);
>>           }
>>         }, Avros.specifics(Visit.class));
>>
>>     count_visits.write(To.textFile(outputPath), WriteMode.OVERWRITE);
>>
>> Here's the error:
>> 2014-08-21 15:09:26,245 ERROR run.CrunchReducer
>> (CrunchReducer.java:reduce(54)) - Reducer exception
>> java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record
>> cannot be cast to com.test.Visit
>>         at com.test.Logs$1.process(Logs.java:49)
>>         at com.test.Logs$1.process(Logs.java:1)
>>         at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>         at
>> org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>
>> So I'm pretty sure the problem is this line:
>>
>> Visit v = Visit.newBuilder(p.first()).build();
>>
>> Specifically, p.first() should be a Visit, but I guess it isn't. I assume
>> the groupBy operation is serializing the key into the reducers but not
>> reading it back with the correct Avro type?
>>
>> Also, I don't think I understand when I should be using Avros.records()
>> versus Avros.specifics() when I have a class generated from an Avro schema.
>>
>> Thanks!
>>
>> Danny
>>
>>
>>
>>
>>
>>
>
>
> --
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>
>



-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
