crunch-user mailing list archives

From Gabriel Reid <gabriel.r...@gmail.com>
Subject Re: groupByKey with Avro
Date Thu, 24 Sep 2015 14:56:23 GMT
Hi Tahir,

I think that the issue here is that Avro reflection isn't up to the
task of encoding/decoding an ImmutableBytesWritable instance.

The PTableType that you supply
(Avros.tableOf(Avros.reflects(ImmutableBytesWritable.class),
Avros.reflects(ABC.class))) specifies how data is to be serialized
between the map and reduce phases, or between multiple MapReduce jobs.
However, in order to use Avros.reflects, the class being serialized
and deserialized needs to have all of its fields accessible via
reflection (typically via getters and setters).
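As a rough illustration (ABC's real fields aren't shown in this
thread, so the field names below are made up), a class that
Avros.reflects can handle is essentially a plain POJO with a no-arg
constructor whose instance fields are reachable via reflection:

```java
import java.lang.reflect.Field;

// Hypothetical stand-in for the ABC class from this thread: a plain
// POJO with a no-arg constructor and ordinary instance fields, which
// is the shape Avro reflection can derive a schema from.
class ABC {
    private String name;
    private int count;

    public ABC() {}  // Avro reflection needs a no-arg constructor
}

public class ReflectDemo {
    public static void main(String[] args) {
        // Avro's reflect support walks the declared fields, roughly
        // like this, to build a schema:
        for (Field f : ABC.class.getDeclaredFields()) {
            System.out.println(f.getName() + " : " + f.getType().getSimpleName());
        }
    }
}
```

ImmutableBytesWritable doesn't fit this shape, which is why the
reflect-derived schema breaks down at serialization time.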

I'm guessing that when you're running this in a unit test, you're
using the MemPipeline. The MemPipeline doesn't do any actual
serialization of values, which is why this wouldn't be an issue there.

Probably the easiest way to get around this is to use the
Avros.writables [1] method to define the PType for the
ImmutableBytesWritable class. This would mean that your PTableType
would be defined as follows:

    Avros.tableOf(
        Avros.writables(ImmutableBytesWritable.class),
        Avros.reflects(ABC.class))

Depending on what ABC is (whether or not it's really serializable via
reflection) you could also consider using the WritableTypeFamily (via
the Writables [2] class) instead of the AvroTypeFamily (via Avros).
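For the WritableTypeFamily route, ABC would need to implement Hadoop's
Writable interface, i.e. a write/readFields pair over
DataOutput/DataInput. A minimal self-contained sketch of that pattern
(using only java.io, with made-up fields, since ABC's real definition
isn't shown in this thread; the real interface lives in
org.apache.hadoop.io):

```java
import java.io.*;

// Hypothetical ABC following the Writable write/readFields pattern.
class ABC {
    String name = "";
    int count;

    void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(count);
    }

    void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        count = in.readInt();
    }
}

public class WritableDemo {
    public static void main(String[] args) throws IOException {
        ABC a = new ABC();
        a.name = "offer-1";
        a.count = 3;

        // Round-trip through a byte buffer, much as the shuffle would.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        a.write(new DataOutputStream(bos));
        ABC b = new ABC();
        b.readFields(new DataInputStream(
                new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(b.name + " " + b.count);  // prints "offer-1 3"
    }
}
```

If ABC already looks like this, Writables.writables(ABC.class) would
avoid Avro reflection entirely.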

- Gabriel


1. https://crunch.apache.org/apidocs/0.12.0/org/apache/crunch/types/avro/Avros.html#writables(java.lang.Class)
2. https://crunch.apache.org/apidocs/0.12.0/org/apache/crunch/types/writable/Writables.html

On Thu, Sep 24, 2015 at 3:56 PM, Tahir Hameed <tahirh@gmail.com> wrote:
> To add to the above: my unit tests seem to run fine without any
> exception, but when I run the above lines of code on the cluster, I
> receive the error below.
>
>
> On Thu, Sep 24, 2015 at 3:46 PM, Tahir Hameed <tahirh@gmail.com> wrote:
>>
>>
>> Hi,
>>
>> I have the following case:
>>
>> PGroupedTable<ImmutableBytesWritable, ABC> o = abcTable
>>         .parallelDo(new ABCDoFN(),
>>             Avros.tableOf(
>>                 Avros.reflects(ImmutableBytesWritable.class),
>>                 Avros.reflects(ABC.class)))
>>         .groupByKey();
>>
>> This raises the following error:
>>
>> 2015-09-24 15:43:08,625 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : org.apache.avro.UnresolvedUnionException: Not in union ["null",{"type":"bytes","java-class":"[B"}]: [B@3195e45d
>> 	at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:604)
>> 	at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:151)
>> 	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
>> 	at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
>> 	at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
>> 	at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
>> 	at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
>> 	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
>> 	at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
>> 	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
>> 	at org.apache.crunch.types.avro.SafeAvroSerialization$AvroWrapperSerializer.serialize(SafeAvroSerialization.java:128)
>> 	at org.apache.crunch.types.avro.SafeAvroSerialization$AvroWrapperSerializer.serialize(SafeAvroSerialization.java:113)
>> 	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1146)
>> 	at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:712)
>> 	at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
>> 	at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
>> 	at org.apache.crunch.impl.mr.emit.OutputEmitter.emit(OutputEmitter.java:41)
>> 	at org.apache.crunch.MapFn.process(MapFn.java:34)
>> 	at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>> 	at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>> 	at com.bol.step.enrichmentdashboard.fn.OffersDoFN.process(OffersDoFN.java:36)
>> 	at com.bol.step.enrichmentdashboard.fn.OffersDoFN.process(OffersDoFN.java:17)
>> 	at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>> 	at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>> 	at org.apache.crunch.MapFn.process(MapFn.java:34)
>> 	at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>> 	at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109)
>> 	at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>> 	at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>> 	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
>> 	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
>> 	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
>> 	at java.security.AccessController.doPrivileged(Native Method)
>> 	at javax.security.auth.Subject.doAs(Subject.java:415)
>> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
>> 	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
>>
>>
>>
>> The error above is only raised when the groupByKey() method is
>> present. I am not sure why this is happening. Can someone point me
>> in the right direction?
>>
>>
>> Tahir
>
>
