crunch-user mailing list archives

From Josh Wills <jwi...@cloudera.com>
Subject Re: Create custom Crunch source (CustomDataSource) for CustomInputFormat<Void, CustomNonWritableClass>
Date Tue, 10 Jun 2014 15:26:01 GMT
Hey Christian,

Good to see you again, I hope all is well. This is a complex setup, but the
good news is that we had to do it before for HBase 0.96, which also returns
non-Writable values in an InputFormat. The code you're going to want to use
as your reference is the HFileSource in crunch-hbase:

https://github.com/apache/crunch/blob/master/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java

A few comments:
1) First, ignore the Void key; instead of trying to return a PTable, you're
going to return a PCollection<CustomNonWritableClass> as the result of this
source. The key to doing that is in the Converter implementation associated
with the Source; look at HBaseValueConverter (a simple pass-through value
converter) to see how we do it for HFileSource. The important thing to note
in that converter is that applyPTypeTransforms() returns false; this means
that when reading/writing data with that Converter, we don't apply the map
functions from the PType associated with the source (which is the right
thing to do here as well).
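For reference, a pass-through converter modeled on HBaseValueConverter looks
roughly like this. This is a sketch from memory, not a drop-in: the exact
generic signature of the Converter interface may differ slightly across
Crunch versions, so check it against the HBaseValueConverter source.

```java
// Sketch of a pass-through Converter for a value-only source, modeled on
// crunch-hbase's HBaseValueConverter. Method signatures may need adjusting
// to match your Crunch version.
public class CustomValueConverter implements Converter<Object, CustomNonWritableClass,
    CustomNonWritableClass, Iterable<CustomNonWritableClass>> {

  @Override
  public CustomNonWritableClass convertInput(Object key, CustomNonWritableClass value) {
    return value; // ignore the Void/NullWritable key; pass the value through
  }

  @Override
  public Iterable<CustomNonWritableClass> convertIterableInput(
      Object key, Iterable<CustomNonWritableClass> value) {
    return value;
  }

  @Override
  public Object outputKey(CustomNonWritableClass value) {
    return NullWritable.get();
  }

  @Override
  public CustomNonWritableClass outputValue(CustomNonWritableClass value) {
    return value;
  }

  @SuppressWarnings("unchecked")
  @Override
  public Class<Object> getKeyClass() {
    return (Class<Object>) (Class<?>) NullWritable.class;
  }

  @Override
  public Class<CustomNonWritableClass> getValueClass() {
    return CustomNonWritableClass.class;
  }

  @Override
  public boolean applyPTypeTransforms() {
    return false; // don't run the PType's map fns when reading from this source
  }
}
```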
2) I'm assuming there is some Hadoop Serialization implementation for the
non-Writable value type you're supporting, and that Hadoop has to be
configured to read it; be sure to override the configureSource() method in
your Source to add those serializations to the job configuration (again,
see how it's done in HFileSource.configureSource)
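Concretely, that override might look something like the following sketch.
CustomSerialization here is a placeholder for whatever Serialization
implementation your type actually uses:

```java
// Sketch: register a custom Hadoop Serialization in the Source's
// configureSource(). CustomSerialization is a placeholder name.
@Override
public void configureSource(Job job, int inputId) throws IOException {
  super.configureSource(job, inputId);
  Configuration conf = job.getConfiguration();
  // Append to (rather than replace) the existing io.serializations list,
  // so the built-in Writable/Avro serializations keep working.
  String[] existing = conf.getStrings("io.serializations", new String[0]);
  String[] updated = Arrays.copyOf(existing, existing.length + 1);
  updated[existing.length] = CustomSerialization.class.getName();
  conf.setStrings("io.serializations", updated);
}
```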
3) Given all that, the PType for your non-writable class that is associated
with the source should primarily be concerned with how to serialize it
during a shuffle or a read/write from another input format (like Avro, or
SequenceFile, or whatever), as we do in HBaseTypes. It won't actually be
used for reading/writing from the custom input format.
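For that shuffle-side PType, something along these lines mirrors the
HBaseTypes pattern of deriving from a bytes-backed base type. The
toBytes/fromBytes calls are placeholders for whatever serialization hooks
your class exposes; treat this as a sketch, not a tested implementation:

```java
// Sketch: a derived PType that serializes CustomNonWritableClass to bytes
// for shuffles and for writes to other formats (Avro, SequenceFile, etc.).
// toBytes()/fromBytes() are placeholder serialization hooks.
public static PType<CustomNonWritableClass> customType() {
  return Writables.derived(
      CustomNonWritableClass.class,
      new MapFn<BytesWritable, CustomNonWritableClass>() {
        @Override
        public CustomNonWritableClass map(BytesWritable input) {
          return CustomNonWritableClass.fromBytes(input.copyBytes());
        }
      },
      new MapFn<CustomNonWritableClass, BytesWritable>() {
        @Override
        public BytesWritable map(CustomNonWritableClass input) {
          return new BytesWritable(input.toBytes());
        }
      },
      Writables.writables(BytesWritable.class));
}
```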

Hope that helps.

J


On Tue, Jun 10, 2014 at 2:42 AM, Christian Tzolov <
christian.tzolov@gmail.com> wrote:

> Hi all,
>
> I am trying to create a Crunch source for a custom InputFormat that has
> structure like this: CustomInputFormat<Void, CustomNonWritableClass>
>
> I've tried two implementations with no success. I must be missing
> something, but I'm not sure what.
>
> Implementation 1: Derive PType<Pair<Void, CustomNonWritableClass>> using
> MapWritable as base type
> --------------------------------------------------------------------------------------------------------------------
>
> PType<Pair<Void, CustomNonWritableClass>> derivedType = typeFamily.derived(
>     (Class<Pair<Void, CustomNonWritableClass>>) Pair.of(null, null).getClass(),
>     new MapFn<MapWritable, Pair<Void, CustomNonWritableClass>>() {
>         public Pair<Void, CustomNonWritableClass> map(MapWritable input) {...}
>     },
>     new MapFn<Pair<Void, CustomNonWritableClass>, MapWritable>() {
>         public MapWritable map(Pair<Void, CustomNonWritableClass> input) {...}
>     },
>     typeFamily.records(MapWritable.class)
> );
>
> public class CustomDataSource extends FileTableSourceImpl<Void, CustomNonWritableClass> {
>
>     public CustomDataSource() {
>         super(new Path("xsource"),
>               (PTableType<Void, CustomNonWritableClass>) derivedType,
>               FormatBundle.forInput(CustomInputFormat.class));
>     }
>     ...
> }
>
> This implementation fails before submitting the job with the following
> error:
>
> Exception in thread "main" java.lang.ClassCastException:
> org.apache.crunch.types.writable.WritableType cannot be cast to
> org.apache.crunch.types.PTableType
>       at com.xxx.xxx.CustomDataSource.<init>(CustomDataSource.java:...)
>
>
>
> Implementation 2: Derive PType<CustomNonWritableClass> using MapWritable
> as base type
> --------------------------------------------------------------------------------------------------------------------
>
> public static class MapWritableToCustomNonWritableClass
>         extends MapFn<MapWritable, CustomNonWritableClass> {
>     public CustomNonWritableClass map(MapWritable input) {...}
> }
>
> public static class CustomNonWritableClassToMapWritable
>         extends MapFn<CustomNonWritableClass, MapWritable> {
>     public MapWritable map(CustomNonWritableClass input) {...}
> }
>
> PType<CustomNonWritableClass> derivedType = typeFamily.derived(
>    CustomNonWritableClass.class,
>    new MapWritableToCustomNonWritableClass(),
>    new CustomNonWritableClassToMapWritable(),
>    typeFamily.records(MapWritable.class)
> );
>
> public class CustomDataSource extends FileSourceImpl<CustomNonWritableClass> {
>
>     public CustomDataSource() {
>         super(new Path("xsource"),
>               (PTableType<Void, CustomNonWritableClass>) derivedType,
>               FormatBundle.forInput(CustomInputFormat.class));
>     }
>     ...
> }
>
> When run, this gets submitted to the cluster, but the MR job fails with:
>
> 2014-06-10 10:31:23,653 FATAL [IPC Server handler 2 on 9290]
> org.apache.hadoop.mapred.TaskAttemptListenerImpl: Task:
> attempt_1401786307497_0078_m_000000_0 - exited :
> java.lang.ClassCastException: com.xxx.xxx..CustomNonWritableClass cannot be
> cast to org.apache.hadoop.io.MapWritable
>      at
> com.xxx.xxx.MapWritabToCustomNonWritableClass.map(MapWritabToCustomNonWritableClass.java:1)
>      at org.apache.crunch.fn.CompositeMapFn.map(CompositeMapFn.java:63)
>      at org.apache.crunch.MapFn.process(MapFn.java:34)
>      at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
>      at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:110)
>      at
> org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>      at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>
> Thanks,
> Christian
>



-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
