crunch-dev mailing list archives

From "Brandon Vargo (JIRA)" <>
Subject [jira] [Commented] (CRUNCH-486) Join with custom Writable PType registered using Writables.registerComparable NPEs during shuffle
Date Thu, 08 Jan 2015 21:41:34 GMT


Brandon Vargo commented on CRUNCH-486:

This is about what I had for testing, except that I wasn't checking for the writable type
family. This fixes the issue for joins, but I think there will still be an issue for grouping
by a key that is a TupleWritable containing a custom writable outside of a join on Hadoop
versions before 2.5.0, since the comparator is not being set in that case. It does fix everything
when running on 2.5.0, though, since the comparator gets configured now that it implements
Configurable.

I don't know the Crunch codebase well enough, though, to know whether there's a place in the
MapReduce implementation that would allow the configuration property to be set for all groupBy
operations that use TupleWritable as a key. That might be a better place to set it, if it
exists. Perhaps WritableGroupedTableType's configureShuffle method, if the key is an instance
of TupleWritable? I don't know how to limit it to just the MapReduce pipeline there, but
perhaps it doesn't matter if a MapReduce property is set in the configuration when running
under a non-MapReduce pipeline. The only reason I used DefaultJoinStrategy is that it was the
easiest place I found to inject the parameter for a single job in the pipeline for testing.
Also, it looks like there are two other comparators for TupleWritables with different logic:
one under lib.sort and another in lib.join.JoinUtils, which appear to be used for secondary
sorts. I don't know whether setting this property more broadly would break those classes.

Also, the constant "mapreduce.job.output.key.comparator.class" is available as {{MRJobConfig.KEY_COMPARATOR}}.
Under Hadoop1, it looks like the property name was "mapred.output.key.comparator.class" instead.
{{optionsBuilder.conf(MRJobConfig.KEY_COMPARATOR, TupleWritable.Comparator.class.getName());}}
compiles under Hadoop1, but that is as much testing as I did.
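For illustration, here is a tiny standalone sketch of the version-dependent property-name
choice described above. The helper and class names are mine, not Crunch's; the Hadoop2 string
is the value of {{MRJobConfig.KEY_COMPARATOR}}:

{noformat}
public class KeyComparatorProperty {
    // Hypothetical helper: pick the output-key-comparator property name for
    // the Hadoop major version in use. Hadoop2 uses the mapreduce.* name
    // (MRJobConfig.KEY_COMPARATOR); Hadoop1 used the legacy mapred.* name.
    static String keyComparatorProperty(int hadoopMajorVersion) {
        return hadoopMajorVersion >= 2
                ? "mapreduce.job.output.key.comparator.class"
                : "mapred.output.key.comparator.class";
    }

    public static void main(String[] args) {
        System.out.println(keyComparatorProperty(1));
        System.out.println(keyComparatorProperty(2));
    }
}
{noformat}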

I tried to write a test for the patch, but I couldn't find a good way to do it; all of the
tests appear to run in the same JVM, so the global writable codes map would be affected by
the other phases and the other tests running alongside. The test would end up testing itself
more than the patch.

So your patch fixes the join issue that I am seeing and looks good to me, unless you know
of a better place to insert the configuration option so that all groupBy operations work on
pre-2.5.0 Hadoop2.


> Join with custom Writable PType registered using Writables.registerComparable NPEs during shuffle
> -------------------------------------------------------------------------------------------------
>                 Key: CRUNCH-486
>                 URL:
>             Project: Crunch
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 0.11.0
>            Reporter: Brandon Vargo
>            Assignee: Josh Wills
>            Priority: Minor
>         Attachments: CRUNCH-486.patch
> When joining two PTables on a key that is a custom writable PType, the shuffler will
> fail with the following NullPointerException under Hadoop2 if the custom type has been
> registered using Writables.registerComparable. This happens regardless of whether a
> specific integer code is provided or the default hashCode()-based value is used.
> {noformat}
> org.apache.hadoop.mapred.YarnChild: Exception running child : org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError:
Error while doing final merge 
> 	at
> 	at
> 	at org.apache.hadoop.mapred.YarnChild$
> 	at Method)
> 	at
> 	at
> 	at org.apache.hadoop.mapred.YarnChild.main(
> Caused by: java.lang.NullPointerException
> 	at java.lang.Class.isAssignableFrom(Native Method)
> 	at org.apache.crunch.types.writable.TupleWritable$Comparator.compareField(
> 	at org.apache.crunch.types.writable.TupleWritable$
> 	at org.apache.hadoop.mapred.Merger$MergeQueue.lessThan(
> 	at org.apache.hadoop.util.PriorityQueue.upHeap(
> 	at org.apache.hadoop.util.PriorityQueue.put(
> 	at org.apache.hadoop.mapred.Merger$MergeQueue.merge(
> 	at org.apache.hadoop.mapred.Merger.merge(
> 	at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.finalMerge(
> 	at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.close(
> 	at
> 	... 6 more
> {noformat}
> It appears that the Writables.WRITABLE_CODES entries are not deserialized from the
> configuration during the shuffle phase of a join until TupleWritable.setConf() is called.
> However, because TupleWritable.Comparator is registered as a raw comparator for
> TupleWritable, the shuffler uses the comparator without instantiating or configuring a
> TupleWritable instance. As a result, the type codes for the custom types are not available
> when the comparator starts to run.
> HADOOP-10686 made WritableComparator implement Configurable, but this was not released
> until Hadoop 2.5. If I build Crunch against Hadoop 2.5 and copy TupleWritable's setConf()
> function to TupleWritable.Comparator, then the shuffle works as expected. However, since
> Crunch currently targets Hadoop 2.2, this does not work for the current version of Crunch.
> As a workaround, it appears that if the {{mapreduce.job.output.key.comparator.class}}
> property is set in the configuration, then the instance is created in
> JobConf.getOutputKeyComparator() using ReflectionUtils instead of through the
> WritableComparator registration. ReflectionUtils will pass the configuration to anything
> that implements Configurable, so setting {{mapreduce.job.output.key.comparator.class}} to
> TupleWritable.Comparator and implementing Configurable might work for Hadoop versions
> older than 2.5. I have yet to try this, though, and I have not looked into Hadoop1 to see
> if this would also work there.
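As a side note on the mechanism described in that paragraph: ReflectionUtils injects the
configuration into any newly created object that implements Configurable. A minimal
standalone sketch of that behavior, where the Configurable interface and comparator are
simplified stand-ins rather than the actual Hadoop classes:

{noformat}
import java.util.Map;

public class ConfigurableDemo {
    // Simplified stand-in for org.apache.hadoop.conf.Configurable.
    interface Configurable {
        void setConf(Map<String, String> conf);
    }

    // Stand-in for a raw comparator that needs configuration before use,
    // analogous to TupleWritable.Comparator after the proposed change.
    public static class DemoComparator implements Configurable {
        Map<String, String> conf;
        public void setConf(Map<String, String> conf) { this.conf = conf; }
    }

    // Mimics the relevant part of ReflectionUtils.newInstance(): create the
    // instance reflectively, then hand it the configuration if it accepts one.
    static <T> T newInstance(Class<T> cls, Map<String, String> conf)
            throws ReflectiveOperationException {
        T instance = cls.getDeclaredConstructor().newInstance();
        if (instance instanceof Configurable) {
            ((Configurable) instance).setConf(conf);
        }
        return instance;
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        Map<String, String> conf = Map.of("crunch.writable.codes", "...");
        DemoComparator cmp = newInstance(DemoComparator.class, conf);
        System.out.println(cmp.conf != null);  // configuration was injected
    }
}
{noformat}

This is why creating the comparator through the property, rather than through the raw
comparator registry, gives it a chance to see the configuration before the first compare.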
> If the shuffle is able to register the type codes via either method above, then there is
> one small secondary issue that I hit: Writables.registerComparable checks if the type code
> is already present in the map; if the type code is already in use, then it throws an
> exception, even if the class being registered is the same as the existing class. With the
> type codes being initialized during the shuffle phase, any later call to
> registerComparable for the same type code and class will fail. I currently have my
> registerComparable call in a static initialization block for my PType, so it is called
> whenever my writable type is first used under Crunch; in this case, that happens when the
> reduce phase starts. Checking whether the class being registered and the existing class
> are equal inside registerComparable before throwing an error, similar to the check in
> Guava's AbstractBiMap, prevents this exception from being thrown.
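The duplicate-registration check described there could look something like this standalone
sketch, with a plain map standing in for Writables.WRITABLE_CODES (names and class layout
are illustrative, not Crunch's actual code):

{noformat}
import java.util.HashMap;
import java.util.Map;

public class RegisterComparableSketch {
    // Stand-in for the global code-to-class map (Writables.WRITABLE_CODES).
    static final Map<Integer, Class<?>> WRITABLE_CODES = new HashMap<>();

    // Register a class under a code. Re-registering the same class under the
    // same code is a no-op instead of an error, similar to the equality check
    // Guava's AbstractBiMap performs before rejecting a duplicate value.
    static void registerComparable(Class<?> clazz, int code) {
        Class<?> existing = WRITABLE_CODES.get(code);
        if (existing != null && !existing.equals(clazz)) {
            throw new IllegalStateException(
                "Code " + code + " already registered for " + existing.getName());
        }
        WRITABLE_CODES.put(code, clazz);
    }
}
{noformat}

With a check like this, the static-initializer registration that fires again when the
reduce phase starts simply becomes a no-op instead of failing the task.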
> The above was happening using 0.11.0-hadoop2 on Hadoop 2.5.0 (CDH 5.2). The modifications
> I mention above were made on top of {{d4f23c4}} and also tested on CDH 5.2.

This message was sent by Atlassian JIRA
