crunch-user mailing list archives

From Danny Morgan <unlucky...@hotmail.com>
Subject RE: Trouble with Avro records
Date Mon, 29 Sep 2014 18:57:29 GMT
Hi Josh,

Any luck?

-Danny

From: unluckyboy@hotmail.com
To: user@crunch.apache.org
CC: tomwhite@apache.org
Subject: RE: Trouble with Avro records
Date: Fri, 19 Sep 2014 14:49:55 +0000

Gave it a shot but with no luck.
From: jwills@cloudera.com
Date: Thu, 18 Sep 2014 20:52:17 -0700
Subject: Re: Trouble with Avro records
To: user@crunch.apache.org
CC: tomwhite@apache.org

Let me see how much work it would be to get a Parquet patch in; they're usually pretty good
about these things. There's no way that a bump to Avro 1.7.7 fixes this, right?
J
On Thu, Sep 18, 2014 at 10:44 AM, Danny Morgan <unluckyboy@hotmail.com> wrote:

Hi Guys,
Wondering if this is fixable in a future release of the parquet jars?
Thanks!
From: unluckyboy@hotmail.com
To: user@crunch.apache.org; tomwhite@apache.org
Subject: RE: Trouble with Avro records
Date: Fri, 29 Aug 2014 22:29:23 +0000

Thanks Josh, it does seem to work if I treat the records as generics instead of a specific
class.

From: jwills@cloudera.com
Date: Fri, 29 Aug 2014 14:49:59 -0700
Subject: Re: Trouble with Avro records
To: user@crunch.apache.org; tomwhite@apache.org

+tom
Ack, okay. Dug into this a bit more, and I think the source of the issue is in Parquet: the
code that is looking for the specific class type inside of Parquet's AvroRecordMaterializer
is calling SpecificData.get() instead of the (new SpecificData(classLoader)) impl that we
use inside of AvroMode. Not sure that we have a way of getting around this w/o copying and
rewriting a lot of the Parquet code, so I added Tom explicitly to see if he has any ideas.


J
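[Editor's note: Josh's diagnosis above can be illustrated without any of the Avro or Parquet machinery. The following is a minimal plain-Java sketch, with hypothetical stand-in names, of the failure mode: a materializer that resolves the specific class through the wrong ClassLoader cannot find it, silently falls back to a generic record, and a downstream cast then fails.]

```java
// Plain-Java sketch (no Avro on the classpath; all names are hypothetical
// stand-ins) of the classloader failure mode: when the specific class is not
// visible to the loader doing the lookup, the reader falls back to a generic
// record, and casting that to the specific type throws ClassCastException.
public class GenericFallbackDemo {
    // Stand-in for org.apache.avro.generic.GenericData.Record.
    static class GenericRecord {}
    // Stand-in for a generated specific class such as com.test.Visit.
    static class Visit extends GenericRecord {}

    // Resolve className through the given loader; fall back to a generic
    // record when the class is not visible -- roughly what happens when the
    // lookup uses a loader that cannot see the job jar.
    static GenericRecord materialize(ClassLoader loader, String className) {
        try {
            return (GenericRecord) loader.loadClass(className)
                    .getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            return new GenericRecord();  // specific class not visible: generic fallback
        }
    }

    public static void main(String[] args) {
        String name = Visit.class.getName();
        // A loader that can see Visit yields the specific type.
        System.out.println(materialize(Visit.class.getClassLoader(), name) instanceof Visit);
        // The platform loader cannot see it, so we get the generic fallback,
        // and a cast to Visit at this point would throw ClassCastException.
        System.out.println(materialize(ClassLoader.getPlatformClassLoader(), name) instanceof Visit);
    }
}
```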

On Fri, Aug 29, 2014 at 2:23 PM, Danny Morgan <unluckyboy@hotmail.com> wrote:

Didn't seem to work. I tried

output.inputConf("crunch.avro.mode", "SPECIFIC");
output.outputConf("crunch.avro.mode", "SPECIFIC");

as well, to no avail.

From: jwills@cloudera.com
Date: Fri, 29 Aug 2014 13:52:35 -0700
Subject: Re: Trouble with Avro records
To: user@crunch.apache.org

Hey Danny,

So I suspect the problem is that the AvroMode info isn't getting propagated to the ParquetFileSourceTarget. The simplest way to verify the problem is not as simple as it should be, but I'd like you to try something like this:

SourceTarget<Log> output = new AvroParquetFileSourceTarget<Log>(new Path(path), Avros.specifics(Log.class));
output.conf("crunch.avro.mode", "SPECIFIC");

...and let me know if that fixes the problem. If so, I can file a JIRA to fix it properly.

J

On Fri, Aug 29, 2014 at 11:15 AM, Josh Wills <jwills@cloudera.com> wrote:

Hey Danny,
I'll take a look at it later today, kind of a crazy AM for me.

J

On Fri, Aug 29, 2014 at 9:11 AM, Danny Morgan <unluckyboy@hotmail.com> wrote:

Okay, looks like the AvroMode.setSpecificClassLoader() fix works for Avro files but doesn't work for Parquet Avro files.

// This works great
Pipeline p1 = new MRPipeline(MyLogs.class, getConf());
PCollection<Log> logs = ...;
SourceTarget<Log> output = At.avroFile(path, Avros.specifics(Log.class));
p1.write(logs, output, WriteMode.OVERWRITE);
p1.done();

Pipeline p2 = new MRPipeline(MyLogs.class, getConf());
AvroMode.setSpecificClassLoader(Log.class.getClassLoader());
PCollection<Log> logs2 = p2.read(output);
p2.done();

// This fails with java.lang.ClassCastException:
// org.apache.avro.generic.GenericData$Record cannot be cast to com.test.Log
Pipeline p1 = new MRPipeline(MyLogs.class, getConf());
PCollection<Log> logs = ...;
SourceTarget<Log> output = new AvroParquetFileSourceTarget<Log>(new Path(path), Avros.specifics(Log.class));
p1.write(logs, output, WriteMode.OVERWRITE);
p1.done();

Pipeline p2 = new MRPipeline(MyLogs.class, getConf());
AvroMode.setSpecificClassLoader(Log.class.getClassLoader());
PCollection<Log> logs2 = p2.read(output);
p2.done();

Any ideas?
Thanks!



From: josh.wills@gmail.com
Date: Thu, 21 Aug 2014 13:54:56 -0700
Subject: Re: Trouble with Avro records
To: user@crunch.apache.org

On Thu, Aug 21, 2014 at 1:46 PM, Danny Morgan <unluckyboy@hotmail.com> wrote:

Thanks Josh, if it is any consolation, Crunch has saved me countless hours and CPU cycles over how we used to do things, so I'm glad to be benefiting from your ill-advised decisions :-)

That is always nice to hear-- thanks!

Danny

From: josh.wills@gmail.com
Date: Thu, 21 Aug 2014 13:33:40 -0700
Subject: Re: Trouble with Avro records
To: user@crunch.apache.org

On Thu, Aug 21, 2014 at 1:28 PM, Danny Morgan <unluckyboy@hotmail.com> wrote:

Awesome, adding:

AvroMode.setSpecificClassLoader(Visit.class.getClassLoader());

did the trick, thanks Josh!

Do you mind clarifying when to use Avros.records() versus Avros.specifics() or Avros.reflects()?

Oops, yes-- sorry about that. The records() method is defined on both the WritableTypeFamily and the AvroTypeFamily as a method that is supposed to handle "arbitrary" data types that aren't supported by the built-in POJO and Crunch Tuple methods. Fortunately/unfortunately, what counts as an arbitrary data type depends on the backing implementation: for Writables, records() only really supports classes that implement Writable, and for Avros, records() checks to see if the class implements SpecificRecord (at which point it handles it using Avros.specifics) and, if it does not, passes it on to Avros.reflects.

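[Editor's note: the dispatch just described can be sketched in plain Java. The names below are hypothetical stand-ins, not the real Crunch internals.]

```java
// Plain-Java sketch of the records() dispatch described above: classes that
// implement the SpecificRecord marker interface go to the specifics() path,
// everything else falls through to reflects(). All names are stand-ins.
public class RecordsDispatchDemo {
    // Stand-in for org.apache.avro.specific.SpecificRecord.
    interface SpecificRecord {}
    // Like a class generated from an Avro schema.
    static class GeneratedVisit implements SpecificRecord {}
    // An ordinary POJO with no Avro marker interface.
    static class PlainPojo {}

    // Choose the handler the way records() is described to.
    static String records(Class<?> clazz) {
        return SpecificRecord.class.isAssignableFrom(clazz) ? "specifics" : "reflects";
    }

    public static void main(String[] args) {
        System.out.println(records(GeneratedVisit.class)); // specifics
        System.out.println(records(PlainPojo.class));      // reflects
    }
}
```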
I think that, like many things in Crunch, records() reflects an early and somewhat ill-advised decision I made when I was first creating the API that I would likely do away with if I had it to do over again. :)

J

Thanks Again!

From: josh.wills@gmail.com
Date: Thu, 21 Aug 2014 13:20:20 -0700
Subject: Re: Trouble with Avro records
To: user@crunch.apache.org

Okay. I suspect this is the problem:

https://issues.apache.org/jira/browse/CRUNCH-442

Gabriel fixed this for the upcoming 0.11 release, but there are a couple of workarounds in the comments. One is to put the Avro jar into your job jar file, instead of relying on the one that is in the hadoop lib. The other is to call AvroMode.setSpecificClassLoader with the class loader for your Visit class before kicking off the job.
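[Editor's note: one common way to realize the first workaround is to shade the Avro jar into the job jar. The fragment below is a hedged sketch only; the plugin version and the decision to shade rather than relocate are assumptions to adapt to your build.]

```xml
<!-- Hedged sketch: bundle the Avro jar into the job jar with the Maven shade
     plugin so the task classpath does not fall back to the avro jar shipped
     in the Hadoop lib directory. Version and scope are assumptions. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.3</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <artifactSet>
          <includes>
            <include>org.apache.avro:avro</include>
          </includes>
        </artifactSet>
      </configuration>
    </execution>
  </executions>
</plugin>
```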
Josh

On Thu, Aug 21, 2014 at 1:10 PM, Danny Morgan <unluckyboy@hotmail.com> wrote:

Hi Josh,

crunch-0.10.0-hadoop2

Thanks.

From: josh.wills@gmail.com
Date: Thu, 21 Aug 2014 13:04:29 -0700
Subject: Re: Trouble with Avro records
To: user@crunch.apache.org

That feels like an AvroMode-related exception; which version of Crunch are you using?

J

On Thu, Aug 21, 2014 at 12:45 PM, Danny Morgan <unluckyboy@hotmail.com> wrote:

Hi Guys,

Love Crunch, but I've been having some trouble recently using Avro records. I think someone needs to write a Crunch book.

I'm trying to aggregate hourly visits to a page by each user. I do one pass to parse the records, and then I try to "group by" unique user and hour and count the number of times they visited, as well as their first visit time in the hour. Here's the Avro schema:

{"namespace": "com.test",
 "type": "record",
 "name": "Visit",
 "fields": [
     {"name": "dateid", "type": "int"},                    // Year Month Day Hour in PST as an integer, e.g. 2014081103
     {"name": "userid", "type": "string"},
     {"name": "vcount", "type": ["long", "null"]},
     {"name": "firsttimestamp", "type": ["long", "null"]}  // Unix timestamp of first visit
 ]
}

Here I do the parsing; at first vcount and firsttimestamp aren't set.

PTable<Visit, Pair<Long, Long>> visits = parsed.parallelDo("visits-parsing", new VisitsExtractor(),
    Avros.tableOf(Avros.specifics(Visit.class), Avros.pairs(Avros.longs(), Avros.longs())));

The relevant line from VisitsExtractor:

emitter.emit(Pair.of(visit, Pair.of(1L, log.timestamp())));

Everything up to this point works fine; now I want to count up the unique visitors and the minimum timestamp.

PTable<Visit, Pair<Long, Long>> agg = visits.groupByKey().combineValues(
    Aggregators.pairAggregator(Aggregators.SUM_LONGS(), Aggregators.MIN_LONGS()));

The above seems to work fine too; now I want to create new Visit objects and fill in the count and minimum timestamp fields.

PCollection<Visit> count_visits = agg.parallelDo("visits-count",
    new DoFn<Pair<Visit, Pair<Long, Long>>, Visit>() {
      @Override
      public void process(Pair<Visit, Pair<Long, Long>> p, Emitter<Visit> emitter) {
        Visit v = Visit.newBuilder(p.first()).build();
        v.setVcount(p.second().first());
        v.setFirsttimestamp(p.second().second());
        emitter.emit(v);
      }
    }, Avros.specifics(Visit.class));

count_visits.write(To.textFile(outputPath), WriteMode.OVERWRITE);

Here's the error:

2014-08-21 15:09:26,245 ERROR run.CrunchReducer (CrunchReducer.java:reduce(54)) - Reducer exception
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.test.Visit
        at com.test.Logs$1.process(Logs.java:49)
        at com.test.Logs$1.process(Logs.java:1)
        at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
        at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)

So I'm pretty sure the problem is this line:

Visit v = Visit.newBuilder(p.first()).build();

Specifically, p.first() should be a Visit, but apparently it isn't. I assume the groupByKey operation is serializing the key in the reducers, but not using the correct Avro type to deserialize it?

Also, I don't think I understand when I should be using Avros.records() versus Avros.specifics() when I have a generated Avro class.

Thanks!

Danny

-- 
Director of Data Science
Cloudera

Twitter: @josh_wills