crunch-user mailing list archives

From Josh Wills <jwi...@cloudera.com>
Subject Re: Create custom Crunch source (CustomDataSource) for CustomInputFormat<Void, CustomNonWritableClass>
Date Thu, 12 Jun 2014 18:01:38 GMT
On Thu, Jun 12, 2014 at 10:58 AM, Christian Tzolov <
christian.tzolov@gmail.com> wrote:

> Hi Josh,
>
> Good to see you too. Thanks for the HFileSource reference. The converter
> did the trick and I can read the data using the custom InputFormat.
> Now I am struggling with the NonWritableType, as I have no control over its
> implementation and it provides no means of serialization. There are
> a couple of ideas to explore, but those are not Crunch related.
>
> Thanks again for the helpful information.
>
> Cheers, Chris
>
> P.S. Between your post and today my son was born! Have a drink or two on
> our behalf ;)
>

Hey, that's awesome! Congratulations!!


>
>
> On Tue, Jun 10, 2014 at 5:26 PM, Josh Wills <jwills@cloudera.com> wrote:
>
>> Hey Christian,
>>
>> Good to see you again, I hope all is well. This is a complex setup, but
>> the good news is that we had to do it before for HBase 0.96, which also
>> returns non-Writable values in an InputFormat. The code you're going to
>> want to use as your reference is the HFileSource in crunch-hbase:
>>
>>
>> https://github.com/apache/crunch/blob/master/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
>>
>> A few comments:
>> 1) First, ignore the Void key; instead of trying to return a PTable,
>> you're going to return a PCollection<NonWritableType> as the result of this
>> source. The key to doing that is in the Converter implementation that is
>> associated with the Source; look at the HBaseValueConverter (which is just
>> a simple pass-through value converter) to see how we do it for HFileSource.
>> The key thing to note in that converter is that the function
>> applyPTypeTransforms() returns false; this means that when reading/writing
>> data using that Converter, we don't apply the map functions from the PType
>> associated with the source (which is the right thing to do here as well.)
>> 2) I'm assuming that there is some Hadoop Serialization that supports your
>> non-Writable value type, and that Hadoop has to be configured to use it;
>> be sure to override the configureSource() method in your Source to
>> add those serializations to the Job configuration (again, see how it's done
>> in HFileSource.configureSource).
>> 3) Given all that, the PType for your non-writable class that is
>> associated with the source should primarily be concerned with how to
>> serialize it during a shuffle or a read/write from another input format
>> (like Avro, or SequenceFile, or whatever), as we do in HBaseTypes. It won't
>> actually be used for reading/writing from the custom input format.
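Points 1 and 2 above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the actual crunch-hbase code: `SketchConverter` is a simplified local stand-in for Crunch's Converter interface (the real one lives in org.apache.crunch.types and has more methods), and `PassThroughValueConverter` and `SerializationList` are hypothetical names introduced only for this example. The one detail taken from Hadoop itself is the `io.serializations` configuration key.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Simplified local stand-in for Crunch's Converter interface (an assumption;
// check org.apache.crunch.types.Converter for the real method set).
interface SketchConverter<K, V, S> {
  S convertInput(K key, V value);
  K outputKey(S value);
  V outputValue(S value);
  Class<V> getValueClass();
  boolean applyPTypeTransforms();
}

// Pass-through converter: ignores the key and hands the value back untouched.
final class PassThroughValueConverter<V> implements SketchConverter<Object, V, V> {
  private final Class<V> valueClass;

  PassThroughValueConverter(Class<V> valueClass) {
    this.valueClass = valueClass;
  }

  @Override public V convertInput(Object key, V value) { return value; }
  @Override public Object outputKey(V value) { return null; }
  @Override public V outputValue(V value) { return value; }
  @Override public Class<V> getValueClass() { return valueClass; }

  // false: do not run the PType's map functions on records read through
  // this converter, because the InputFormat already yields the final type.
  @Override public boolean applyPTypeTransforms() { return false; }
}

// Hypothetical helper for building the "io.serializations" value.
final class SerializationList {
  private SerializationList() {}

  // Returns a comma-separated list with the existing entries first and the
  // extra class names appended, skipping blanks and duplicates.
  static String append(String existing, String... extraClassNames) {
    Set<String> names = new LinkedHashSet<>();
    if (existing != null) {
      for (String s : existing.split(",")) {
        if (!s.trim().isEmpty()) {
          names.add(s.trim());
        }
      }
    }
    for (String s : extraClassNames) {
      names.add(s);
    }
    return String.join(",", names);
  }
}
```

In a real Source, configureSource() would presumably read the current `io.serializations` value from the job's Configuration, pass it through something like `SerializationList.append(current, "com.example.CustomSerialization")` (a made-up class name), and set the result back on the Configuration; HFileSource.configureSource is the authoritative reference for how Crunch does this.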
>>
>> Hope that helps.
>>
>> J
>>
>>
>> On Tue, Jun 10, 2014 at 2:42 AM, Christian Tzolov <
>> christian.tzolov@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I am trying to create a Crunch source for a custom InputFormat that has
>>> structure like this: CustomInputFormat<Void, CustomNonWritableClass>
>>>
>>> I've tried two implementations with no success. I must be missing
>>> something but not sure what?
>>>
>>> Implementation 1: Derive PType<Pair<Void, CustomNonWritableClass>> using
>>> MapWritable as base type
>>> --------------------------------------------------------------------------------------------------------------------
>>>
>>> PType<Pair<Void, CustomNonWritableClass>> derivedType =
>>> typeFamily.derived(
>>>    (Class<Pair<Void, CustomNonWritableClass>>) Pair.of(null,
>>> null).getClass(),
>>>    new MapFn<MapWritable, Pair<Void, CustomNonWritableClass>>() {public
>>> Pair<Void, CustomNonWritableClass> map(MapWritable input) {...}},
>>>    new MapFn<Pair<Void, CustomNonWritableClass>, MapWritable>() {public
>>> MapWritable map(Pair<Void, CustomNonWritableClass> input) {...}},
>>>    typeFamily.records(MapWritable.class)
>>> );
>>>
>>> public class CustomDataSource extends FileTableSourceImpl<Void,
>>> CustomNonWritableClass > {
>>>
>>>    public CustomDataSource() {
>>>           super(new Path("xsource"),
>>>           (PTableType<Void, CustomNonWritableClass>) derivedType,
>>>           FormatBundle.forInput(CustomInputFormat.class));
>>>    }
>>>    ...
>>> }
>>>
>>> This implementation fails before submitting the job with the following
>>> error:
>>>
>>> Exception in thread "main" java.lang.ClassCastException:
>>> org.apache.crunch.types.writable.WritableType cannot be cast to
>>> org.apache.crunch.types.PTableType
>>>       at com.xxx.xxx.CustomDataSource.<init>(CustomDataSource.java:...)
>>>
>>>
>>>
>>> Implementation 2: Derive PType<CustomNonWritableClass> using MapWritable
>>> as base type
>>>
>>> --------------------------------------------------------------------------------------------------------------------
>>>
>>> public static class MapWritableToCustomNonWritableClass extends
>>> MapFn<MapWritable, CustomNonWritableClass> {
>>>       public CustomNonWritableClass map(MapWritable input) {...}
>>> }
>>> public static class CustomNonWritableClassToMapWritable
>>> extends MapFn<CustomNonWritableClass, MapWritable> {
>>>      public MapWritable map(CustomNonWritableClass input) {...}
>>> }
>>>
>>> PType<CustomNonWritableClass> derivedType = typeFamily.derived(
>>>    CustomNonWritableClass.class,
>>>    new MapWritableToCustomNonWritableClass(),
>>>    new CustomNonWritableClassToMapWritable(),
>>>    typeFamily.records(MapWritable.class)
>>> );
>>>
>>> public class CustomDataSource extends
>>>  FileSourceImpl<CustomNonWritableClass> {
>>>
>>>    public CustomDataSource() {
>>>        super(new Path("xsource"),
>>>        (PTableType<Void, CustomNonWritableClass>) derivedType,
>>>        FormatBundle.forInput(CustomInputFormat.class));
>>>    }
>>>    ...
>>> }
>>>
>>> When run, this gets submitted to the cluster, but the MR job fails with:
>>>
>>> 2014-06-10 10:31:23,653 FATAL [IPC Server handler 2 on 9290]
>>> org.apache.hadoop.mapred.TaskAttemptListenerImpl: Task:
>>> attempt_1401786307497_0078_m_000000_0 - exited :
>>> java.lang.ClassCastException: com.xxx.xxx..CustomNonWritableClass cannot be
>>> cast to org.apache.hadoop.io.MapWritable
>>>      at
>>> com.xxx.xxx.MapWritabToCustomNonWritableClass.map(MapWritabToCustomNonWritableClass.java:1)
>>>      at org.apache.crunch.fn.CompositeMapFn.map(CompositeMapFn.java:63)
>>>      at org.apache.crunch.MapFn.process(MapFn.java:34)
>>>      at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
>>>      at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:110)
>>>      at
>>> org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>>>      at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>>>
>>> Thanks,
>>> Christian
>>>
>>
>>
>>
>> --
>> Director of Data Science
>> Cloudera <http://www.cloudera.com>
>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>
>
>


-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
