crunch-user mailing list archives

From Christian Tzolov <christian.tzo...@gmail.com>
Subject Create custom Crunch source (CustomDataSource) for CustomInputFormat<Void, CustomNonWritableClass>
Date Tue, 10 Jun 2014 09:42:51 GMT
Hi all,

I am trying to create a Crunch source for a custom InputFormat with the
following structure: CustomInputFormat<Void, CustomNonWritableClass>

I've tried two implementations without success. I must be missing something,
but I'm not sure what.

Implementation 1: Derive PType<Pair<Void, CustomNonWritableClass>> using MapWritable as base type
-------------------------------------------------------------------------------------------------

PType<Pair<Void, CustomNonWritableClass>> derivedType = typeFamily.derived(
   (Class<Pair<Void, CustomNonWritableClass>>) Pair.of(null, null).getClass(),
   new MapFn<MapWritable, Pair<Void, CustomNonWritableClass>>() {
      public Pair<Void, CustomNonWritableClass> map(MapWritable input) {...}
   },
   new MapFn<Pair<Void, CustomNonWritableClass>, MapWritable>() {
      public MapWritable map(Pair<Void, CustomNonWritableClass> input) {...}
   },
   typeFamily.records(MapWritable.class)
);

public class CustomDataSource extends FileTableSourceImpl<Void, CustomNonWritableClass> {

   public CustomDataSource() {
      super(new Path("xsource"),
            (PTableType<Void, CustomNonWritableClass>) derivedType,
            FormatBundle.forInput(CustomInputFormat.class));
   }
   ...
}

This implementation fails before submitting the job with the following
error:

Exception in thread "main" java.lang.ClassCastException:
org.apache.crunch.types.writable.WritableType cannot be cast to
org.apache.crunch.types.PTableType
      at com.xxx.xxx.CustomDataSource.<init>(CustomDataSource.java:...)
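As the message says, the object returned by typeFamily.derived(...) is a WritableType, which does not implement PTableType. The compiler accepts the cast because PTableType<K, V> is a subtype of PType<Pair<K, V>>, so the mismatch only surfaces at runtime, in the constructor. The sketch below is a simplified, self-contained model of that situation, not Crunch code: PType, PTableType, DerivedType, TableType and castToTableFails are hypothetical stand-ins defined locally, with Map.Entry playing the role of Crunch's Pair.

```java
import java.util.Map;

// Simplified model, NOT the Crunch API: hypothetical stand-ins that mirror the
// relationship "PTableType<K, V> extends PType<Pair<K, V>>".
public class DerivedTypeCastDemo {

    // Stand-in for a type descriptor over elements of type T.
    interface PType<T> {}

    // Stand-in for a table type; Map.Entry plays the role of Crunch's Pair.
    interface PTableType<K, V> extends PType<Map.Entry<K, V>> {}

    // Stand-in for what a "derived" factory returns: it implements PType only,
    // even when T happens to be a key/value pair type.
    static final class DerivedType<T> implements PType<T> {}

    // Stand-in for a descriptor that really is a table type.
    static final class TableType<K, V> implements PTableType<K, V> {}

    // Performs the same kind of downcast as Implementation 1. It compiles
    // because PTableType<Void, String> is a PType<Map.Entry<Void, String>>;
    // whether it succeeds is only decided at runtime.
    static boolean castToTableFails(PType<Map.Entry<Void, String>> type) {
        try {
            PTableType<Void, String> table = (PTableType<Void, String>) type;
            return false; // cast succeeded
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Mirrors derivedType in Implementation 1: pair-typed, but not a table type.
        System.out.println(castToTableFails(new DerivedType<Map.Entry<Void, String>>())); // prints true
        // A genuine table type casts without error.
        System.out.println(castToTableFails(new TableType<Void, String>())); // prints false
    }
}
```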



Implementation 2: Derive PType<CustomNonWritableClass> using MapWritable as base type
-------------------------------------------------------------------------------------

public static class MapWritableToCustomNonWritableClass
      extends MapFn<MapWritable, CustomNonWritableClass> {
   public CustomNonWritableClass map(MapWritable input) {...}
}

public static class CustomNonWritableClassToMapWritable
      extends MapFn<CustomNonWritableClass, MapWritable> {
   public MapWritable map(CustomNonWritableClass input) {...}
}

PType<CustomNonWritableClass> derivedType = typeFamily.derived(
   CustomNonWritableClass.class,
   new MapWritableToCustomNonWritableClass(),
   new CustomNonWritableClassToMapWritable(),
   typeFamily.records(MapWritable.class)
);

public class CustomDataSource extends FileSourceImpl<CustomNonWritableClass> {

   public CustomDataSource() {
      super(new Path("xsource"),
            derivedType,
            FormatBundle.forInput(CustomInputFormat.class));
   }
   ...
}

When run, this gets submitted to the cluster, but the MR job fails with:

2014-06-10 10:31:23,653 FATAL [IPC Server handler 2 on 9290]
org.apache.hadoop.mapred.TaskAttemptListenerImpl: Task:
attempt_1401786307497_0078_m_000000_0 - exited :
java.lang.ClassCastException: com.xxx.xxx..CustomNonWritableClass cannot be
cast to org.apache.hadoop.io.MapWritable
     at com.xxx.xxx.MapWritabToCustomNonWritableClass.map(MapWritabToCustomNonWritableClass.java:1)
     at org.apache.crunch.fn.CompositeMapFn.map(CompositeMapFn.java:63)
     at org.apache.crunch.MapFn.process(MapFn.java:34)
     at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
     at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:110)
     at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
     at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
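The stack trace shows the failure inside CompositeMapFn, i.e. while the derived type's input function runs: the value handed to MapWritableToCustomNonWritableClass is already a CustomNonWritableClass (what CustomInputFormat produces), not the MapWritable base type the derived PType was declared with. The following is a simplified, self-contained model of that read path, not Crunch's implementation: CustomRecord, FakeMapWritable, FROM_BASE and read are hypothetical names, and the base type's input step is assumed to be a pass-through.

```java
import java.util.HashMap;
import java.util.function.Function;

// Simplified model, NOT Crunch internals: a derived type reads a value by
// applying the base type's input function and then the user's conversion.
public class CompositeInputDemo {

    // Hypothetical stand-in for CustomNonWritableClass.
    static final class CustomRecord {
        final String payload;
        CustomRecord(String payload) { this.payload = payload; }
    }

    // Hypothetical stand-in for MapWritable, the declared base type.
    static final class FakeMapWritable extends HashMap<String, String> {}

    // Stand-in for MapWritableToCustomNonWritableClass: it assumes every value
    // arrives as the base type, so it casts before converting.
    static final Function<Object, CustomRecord> FROM_BASE =
        raw -> new CustomRecord(((FakeMapWritable) raw).get("payload"));

    // Composite read path: base input step (assumed pass-through), then FROM_BASE.
    static CustomRecord read(Object valueFromInputFormat) {
        Function<Object, Object> baseInput = Function.identity();
        return baseInput.andThen(FROM_BASE).apply(valueFromInputFormat);
    }

    public static void main(String[] args) {
        // Works when the input format really yields the base type.
        FakeMapWritable stored = new FakeMapWritable();
        stored.put("payload", "ok");
        System.out.println(read(stored).payload); // prints ok

        // Fails when the input format already yields the custom class,
        // which matches what the task log reports.
        try {
            read(new CustomRecord("already converted"));
        } catch (ClassCastException e) {
            System.out.println("ClassCastException");
        }
    }
}
```

In this model the conversion only succeeds when the values produced by the input format match the declared base type.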

Thanks,
Christian
