crunch-user mailing list archives

From Christian Tzolov <christian.tzo...@gmail.com>
Subject Re: Create custom Crunch source (CustomDataSource) for CustomInputFormat<Void, CustomNonWritableClass>
Date Thu, 12 Jun 2014 17:58:37 GMT
Hi Josh,

Good to see you too. Thanks for the HFileSource reference. The Converter
did the trick, and I can now read the data using the custom InputFormat.
Now I am struggling with the NonWritableType itself, as I have no control
over its implementation and it provides no means of serialization. There
are a couple of ideas to explore, but those are not really Crunch-related.
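
One of those ideas, sketched here purely as an untested draft, is a thin
Hadoop Serialization for the type. CustomCodec.toBytes/fromBytes below are
hypothetical helpers that would still have to be written for the class:

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.io.serializer.Serializer;

public class CustomSerialization
    implements Serialization<CustomNonWritableClass> {

  public boolean accept(Class<?> c) {
    return CustomNonWritableClass.class.isAssignableFrom(c);
  }

  public Serializer<CustomNonWritableClass> getSerializer(
      Class<CustomNonWritableClass> c) {
    return new Serializer<CustomNonWritableClass>() {
      private DataOutputStream out;

      public void open(OutputStream os) {
        out = new DataOutputStream(os);
      }

      public void serialize(CustomNonWritableClass t) throws IOException {
        // Length-prefixed byte encoding; CustomCodec is a hypothetical
        // external encoder for the class.
        byte[] bytes = CustomCodec.toBytes(t);
        out.writeInt(bytes.length);
        out.write(bytes);
      }

      public void close() throws IOException {
        out.close();
      }
    };
  }

  public Deserializer<CustomNonWritableClass> getDeserializer(
      Class<CustomNonWritableClass> c) {
    return new Deserializer<CustomNonWritableClass>() {
      private DataInputStream in;

      public void open(InputStream is) {
        in = new DataInputStream(is);
      }

      public CustomNonWritableClass deserialize(CustomNonWritableClass t)
          throws IOException {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        return CustomCodec.fromBytes(bytes);  // hypothetical helper
      }

      public void close() throws IOException {
        in.close();
      }
    };
  }
}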

Thanks again for the helpful information.

Cheers, Chris

P.S. Between your post and today, my son was born! Have a drink or two on
our behalf ;)


On Tue, Jun 10, 2014 at 5:26 PM, Josh Wills <jwills@cloudera.com> wrote:

> Hey Christian,
>
> Good to see you again, I hope all is well. This is a complex setup, but
> the good news is that we had to do it before for HBase 0.96, which also
> returns non-Writable values in an InputFormat. The code you're going to
> want to use as your reference is the HFileSource in crunch-hbase:
>
>
> https://github.com/apache/crunch/blob/master/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
>
> A few comments:
> 1) First, ignore the Void key; instead of trying to return a PTable,
> you're going to return a PCollection<NonWritableType> as the result of
> this source. The key to doing that is the Converter implementation
> associated with the Source; look at HBaseValueConverter (which is just a
> simple pass-through value converter) to see how we do it for HFileSource.
> The important thing to note in that converter is that
> applyPTypeTransforms() returns false; this means that when reading/writing
> data with that Converter, we don't apply the map functions from the PType
> associated with the source (which is the right thing to do here as well).
> There's a rough sketch of this and of point 2 just after this list.
> 2) I'm assuming that there is some Hadoop Serialization for the
> non-Writable value type that Hadoop has to be configured to use; be sure
> to override the configureSource() method in your Source to add that
> serialization to the Job configuration (again, see how it's done in
> HFileSource.configureSource()).
> 3) Given all that, the PType for your non-Writable class that is
> associated with the source should primarily be concerned with how to
> serialize it during a shuffle or a read/write involving another format
> (like Avro, or SequenceFile, or whatever), as we do in HBaseTypes. It
> won't actually be used for reading/writing with the custom input format.
>
> Hope that helps.
>
> J
>
>
> On Tue, Jun 10, 2014 at 2:42 AM, Christian Tzolov <
> christian.tzolov@gmail.com> wrote:
>
>> Hi all,
>>
>> I am trying to create a Crunch source for a custom InputFormat with a
>> signature like this: CustomInputFormat<Void, CustomNonWritableClass>
>>
>> I've tried two implementations with no success. I must be missing
>> something, but I'm not sure what.
>>
>> Implementation 1: Derive PType<Pair<Void, CustomNonWritableClass>> using
>> MapWritable as base type
>> --------------------------------------------------------------------------------------------------------------------
>>
>> PType<Pair<Void, CustomNonWritableClass>> derivedType = typeFamily.derived(
>>     (Class<Pair<Void, CustomNonWritableClass>>) Pair.of(null, null).getClass(),
>>     new MapFn<MapWritable, Pair<Void, CustomNonWritableClass>>() {
>>       public Pair<Void, CustomNonWritableClass> map(MapWritable input) {...}
>>     },
>>     new MapFn<Pair<Void, CustomNonWritableClass>, MapWritable>() {
>>       public MapWritable map(Pair<Void, CustomNonWritableClass> input) {...}
>>     },
>>     typeFamily.records(MapWritable.class)
>> );
>>
>> public class CustomDataSource
>>     extends FileTableSourceImpl<Void, CustomNonWritableClass> {
>>
>>    public CustomDataSource() {
>>        super(new Path("xsource"),
>>            (PTableType<Void, CustomNonWritableClass>) derivedType,
>>            FormatBundle.forInput(CustomInputFormat.class));
>>    }
>>    ...
>> }
>>
>> This implementation fails before submitting the job with the following
>> error:
>>
>> Exception in thread "main" java.lang.ClassCastException:
>> org.apache.crunch.types.writable.WritableType cannot be cast to
>> org.apache.crunch.types.PTableType
>>       at com.xxx.xxx.CustomDataSource.<init>(CustomDataSource.java:...)
>>
>>
>>
>> Implementation 2: Derive PType<CustomNonWritableClass> using MapWritable
>> as base type
>>
>> --------------------------------------------------------------------------------------------------------------------
>>
>> public static class MapWritableToCustomNonWritableClass
>>     extends MapFn<MapWritable, CustomNonWritableClass> {
>>    public CustomNonWritableClass map(MapWritable input) {...}
>> }
>>
>> public static class CustomNonWritableClassToMapWritable
>>     extends MapFn<CustomNonWritableClass, MapWritable> {
>>    public MapWritable map(CustomNonWritableClass input) {...}
>> }
>>
>> PType<CustomNonWritableClass> derivedType = typeFamily.derived(
>>    CustomNonWritableClass.class,
>>    new MapWritableToCustomNonWritableClass(),
>>    new CustomNonWritableClassToMapWritable(),
>>    typeFamily.records(MapWritable.class)
>> );
>>
>> public class CustomDataSource
>>     extends FileSourceImpl<CustomNonWritableClass> {
>>
>>    public CustomDataSource() {
>>        super(new Path("xsource"),
>>            derivedType,
>>            FormatBundle.forInput(CustomInputFormat.class));
>>    }
>>    ...
>> }
>>
>> When run, this gets submitted to the cluster, but the MR job fails with:
>>
>> 2014-06-10 10:31:23,653 FATAL [IPC Server handler 2 on 9290]
>> org.apache.hadoop.mapred.TaskAttemptListenerImpl: Task:
>> attempt_1401786307497_0078_m_000000_0 - exited :
>> java.lang.ClassCastException: com.xxx.xxx..CustomNonWritableClass cannot be
>> cast to org.apache.hadoop.io.MapWritable
>>      at
>> com.xxx.xxx.MapWritabToCustomNonWritableClass.map(MapWritabToCustomNonWritableClass.java:1)
>>      at org.apache.crunch.fn.CompositeMapFn.map(CompositeMapFn.java:63)
>>      at org.apache.crunch.MapFn.process(MapFn.java:34)
>>      at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
>>      at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:110)
>>      at
>> org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>>      at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>>
>> Thanks,
>> Christian
>>
>
>
>
> --
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>
>
