crunch-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Josh Wills <jwi...@cloudera.com>
Subject Re: Trouble with Avro records
Date Mon, 29 Sep 2014 20:49:01 GMT
Hrm, that's surprising. We upgraded the Avro version to deal w/some Java 7
stuff-- maybe exclude it and give it another go?

On the Parquet stuff, this is apparently a long standing issue w/Parquet
that requires some major surgery to fix. I have some book stuff to get done
over the next couple of weeks so I don't have the time to do it myself, but
I'll keep checking in with the parquet folks.

J

On Mon, Sep 29, 2014 at 12:11 PM, Danny Morgan <unluckyboy@hotmail.com>
wrote:

> Hi Josh,
>
> This work around with:
>
> AvroMode.setSpecificClassLoader(CLASS.class.getClassLoader());
>
> Doesn't seem to work after upgrading to crunch 0.11. If I don't set a
> specific class loader at all it doesn't work either.
>
> :(
>
> -Shant
>
> ------------------------------
> From: josh.wills@gmail.com
> Date: Thu, 21 Aug 2014 13:33:40 -0700
>
> Subject: Re: Trouble with Avro records
> To: user@crunch.apache.org
>
>
> On Thu, Aug 21, 2014 at 1:28 PM, Danny Morgan <unluckyboy@hotmail.com>
> wrote:
>
> Awesome adding:
>
> AvroMode.setSpecificClassLoader(Visit.class.getClassLoader());
>
> Did the trick, thanks Josh!
>
> Do you mind clarifying when to use Avros.records() versus
> Avros.specifics() or Avros.reflects()?
>
>
> Oops, yes-- sorry about that. The records() method is defined on both the
> WritableTypeFamily and the AvroTypeFamily as a method that is supposed to
> handle "arbitrary" data types that aren't supported by the built-in POJO
> and Crunch Tuple methods. Fortunately/unfortunately, the arbitrary data
> types depends on the backing implementation: for Writables, records() only
> really supports classes that implement Writable, and for Avros, records()
> checks to see if the class implements SpecificRecord (at which point it
> handles it using Avros.specifics) and if it does not, passes it on to
> Avros.reflects.
>
> I think that, like many thinks in Crunch, records() reflects an early and
> somewhat ill-advised decision I made when I was first creating the API that
> I would likely do away with if I had it to do over again. :)
>
> J
>
>
> Thanks Again!
>
>
> ------------------------------
> From: josh.wills@gmail.com
> Date: Thu, 21 Aug 2014 13:20:20 -0700
>
> Subject: Re: Trouble with Avro records
> To: user@crunch.apache.org
>
> Okay. I suspect this is the problem:
>
> https://issues.apache.org/jira/browse/CRUNCH-442
>
> Gabriel fixed this for the upcoming 0.11 release, but there are a couple
> of workarounds in the comments. One is to put the avro jar into your job
> jar file, instead of relying on the one that is in the hadoop lib. The
> other is to configure AvroMode.setSpecificClassLoader w/the class loader
> for your Visit class before kicking off the job.
>
>
> Josh
>
>
> On Thu, Aug 21, 2014 at 1:10 PM, Danny Morgan <unluckyboy@hotmail.com>
> wrote:
>
> Hi Josh,
>
> crunch-0.10.0-hadoop2
>
> Thanks.
>
> ------------------------------
> From: josh.wills@gmail.com
> Date: Thu, 21 Aug 2014 13:04:29 -0700
> Subject: Re: Trouble with Avro records
> To: user@crunch.apache.org
>
>
> That feels like an AvroMode-related exception; which version of Crunch are
> you using?
>
> J
>
>
> On Thu, Aug 21, 2014 at 12:45 PM, Danny Morgan <unluckyboy@hotmail.com>
> wrote:
>
> Hi Guys,
>
> Love crunch but having some trouble recently using Avro records. I think
> someone needs to write a Crunch book.
>
> I'm trying to aggregate hourly visits to a page by each user. I do one
> pass to parse the records and then I try to "group by" unique users and
> hour and count the number of times they visited as well as their first
> visit time in the hour. Here's the Avro schema
>
> {"namespace": "com.test",
>  "type": "record",
>  "name": "Visit",
>  "fields": [
>      {"name": "dateid", "type": "int"},            // Year Month Day Hour
> in PST as an integer e.g. 2014081103
>      {"name": "userid", "type": "string"},
>      {"name": "vcount",  "type": ["long", "null"]},
>      {"name": "firsttimestamp",  "type": ["long", "null"]} // Unixtime
> stamp of first visit
>  ]
> }
>
> Here I do the parsing, at first vcount and firstvisit aren't set.
>
> PTable<Visit, Pair<Long, Long>> visits =
> parsed.parallelDo("visits-parsing", new VisitsExtractor(),
>           Avros.tableOf(Avros.specifics(Visit.class),
> Avros.pairs(Avros.longs(), Avros.longs())));
>
> The relevant line from VisitsExtractor:
> emitter.emit(Pair.of(visit, Pair.of(1L, log.timestamp())));
>
> Everything up to this point works fine, now I want to count up the unique
> visitors and the minimum timestamp.
>
>         PTable<Visit, Pair<Long, Long>> agg =
> visits.groupByKey().combineValues(Aggregators.pairAggregator(Aggregators.SUM_LONGS(),
> Aggregators.MIN_LONGS()));
>
> The above seems to work fine too, now I want to create new Visit classes
> and fill in the count and minimum timestamp fields.
>
>         PCollection<Visit> count_visits = agg.parallelDo("visits-count",
> new DoFn<Pair<Visit, Pair<Long, Long>>, Visit>() {
>           @Override
>           public void process(Pair<Visit, Pair<Long, Long>> p,
> Emitter<Visit> emitter) {
>             Visit v = Visit.newBuilder(p.first()).build();
>             v.setVcount(p.second().first());
>             v.setFirsttimestamp(p.second().second());
>             emitter.emit(v);
>           }
>        }, Avros.specifics(Visit.class));
>     }
>
>      count_visits.write(To.textFile(outputPath), WriteMode.OVERWRITE);
>
> Here's the error:
> 2014-08-21 15:09:26,245 ERROR run.CrunchReducer
> (CrunchReducer.java:reduce(54)) - Reducer exception
> java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record
> cannot be cast to com.test.Visit
>         at com.test.Logs$1.process(Logs.java:49)
>         at com.test.Logs$1.process(Logs.java:1)
>         at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>         at
> org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>
> So I'm pretty sure the problem is this line:
>
> Visit v = Visit.newBuilder(p.first()).build();
>
> specifically p.first() should be a Visit type but I guess it isn't. I
> assume the output of the groupBy operation in the reducers is serializing
> the key but not using the correct Avro type to do it?
>
> Also I don't think I understand when I should be using Avros.records()
> versus Avros.specifics() when I have a generated avro file.
>
> Thanks!
>
> Danny
>
>
>
>
>


-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>

Mime
View raw message