crunch-dev mailing list archives

From Christian Tzolov <christian.tzo...@gmail.com>
Subject New data source XDataSource for CustomInputFormat<Void, CustomNonWritableClass>
Date Tue, 10 Jun 2014 08:45:37 GMT
Hi all,

I am trying to create a Crunch data source for a custom InputFormat that
has this structure: CustomInputFormat<Void, CustomNonWritableClass>

Sorry for the long mail. I've tried two implementations without success. I
must be missing something, but I am not sure what.



1. Implementation: Derive PType<Pair<Void, CustomNonWritableClass>> using
MapWritable as base type
----------------------------------------------------------------------------------------------------------------------------------------
PType<Pair<Void, CustomNonWritableClass>> derivedType = typeFamily.derived(

   (Class<Pair<Void, CustomNonWritableClass>>) Pair.of(null, null).getClass(),

   new MapFn<MapWritable, Pair<Void, CustomNonWritableClass>>() {
      public Pair<Void, CustomNonWritableClass> map(MapWritable input) {...}
   },

   new MapFn<Pair<Void, CustomNonWritableClass>, MapWritable>() {
      public MapWritable map(Pair<Void, CustomNonWritableClass> input) {...}
   },

   typeFamily.records(MapWritable.class)
);


public class CustomDataSource extends FileTableSourceImpl<Void, CustomNonWritableClass> {

   public CustomDataSource() {
      super(new Path("xsource"),

          (PTableType<Void, CustomNonWritableClass>) derivedType,

          FormatBundle.forInput(CustomInputFormat.class));
   }
   ...
}


When I try this implementation it fails before submitting the job with the
following error:

   Exception in thread "main" java.lang.ClassCastException:
org.apache.crunch.types.writable.WritableType cannot be cast to
org.apache.crunch.types.PTableType
      at com.xxx.xxx.CustomDataSource.<init>(CustomDataSource.java:...)
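
One possible reading of this error, sketched in plain Java: the exception message says the object returned by typeFamily.derived(...) is a WritableType, and that class is not a PTableType, so the cast in the constructor can only fail at runtime. The types below (TypeLike, TableTypeLike, DerivedTypeLike, TableOfLike, PairLike) are hypothetical stand-ins invented for illustration and are not the Crunch classes.

```java
// Hypothetical stand-ins for the type hierarchy; none of these are Crunch classes.
final class PairLike<K, V> {}

interface TypeLike<T> {}

// A "table type" is a more specific kind of type over pairs.
interface TableTypeLike<K, V> extends TypeLike<PairLike<K, V>> {}

// What a derived(...) style factory hands back: a plain type, not a table type.
final class DerivedTypeLike<T> implements TypeLike<T> {}

// What a tableOf(...) style factory would hand back.
final class TableOfLike<K, V> implements TableTypeLike<K, V> {}

class CastSketch {
    // Returns true when the runtime cast to the table type throws.
    static boolean castFails(TypeLike<?> type) {
        try {
            TableTypeLike<?, ?> table = (TableTypeLike<?, ?>) type;
            return table == null; // always false here; the cast succeeded
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // The generic parameter says "pair", but the runtime class decides the cast.
        TypeLike<PairLike<Void, String>> derived = new DerivedTypeLike<>();
        System.out.println(castFails(derived));
        System.out.println(castFails(new TableOfLike<Void, String>()));
    }
}
```

In Crunch itself, a PTableType is normally obtained from typeFamily.tableOf(keyType, valueType) rather than by casting a derived PType. Whether that fits a Void key and this particular InputFormat is an assumption that would need checking.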


2. Implementation: Derive PType<CustomNonWritableClass> using MapWritable
as base type
----------------------------------------------------------------------------------------------------------------------------------------

public static class MapWritableToCustomNonWritableClass
      extends MapFn<MapWritable, CustomNonWritableClass> {
      public CustomNonWritableClass map(MapWritable input) {...}
}

public static class CustomNonWritableClassToMapWritable
      extends MapFn<CustomNonWritableClass, MapWritable> {
      public MapWritable map(CustomNonWritableClass input) {...}
}

PType<CustomNonWritableClass> derivedType = typeFamily.derived(

   CustomNonWritableClass.class,

   new MapWritableToCustomNonWritableClass(),

   new CustomNonWritableClassToMapWritable(),

   typeFamily.records(MapWritable.class)
);

public class CustomDataSource extends FileSourceImpl<CustomNonWritableClass> {

   public CustomDataSource() {
      super(new Path("xsource"),

          derivedType,

          FormatBundle.forInput(CustomInputFormat.class));
   }
   ...
}

When run, this is submitted to the cluster and the MR job starts, but it
eventually fails with:

2014-06-10 10:31:23,653 FATAL [IPC Server handler 2 on 9290]
org.apache.hadoop.mapred.TaskAttemptListenerImpl: Task:
attempt_1401786307497_0078_m_000000_0 - exited :
java.lang.ClassCastException: com.xxx.xxx..CustomNonWritableClass
cannot be cast to org.apache.hadoop.io.MapWritable
	at com.xxx.xxx.MapWritabToCustomNonWritableClass.map(MapWritabToCustomNonWritableClass.java:1)
	at org.apache.crunch.fn.CompositeMapFn.map(CompositeMapFn.java:63)
	at org.apache.crunch.MapFn.process(MapFn.java:34)
	at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
	at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:110)
	at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
	at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
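
The trace suggests that the record reaching the input MapFn is already a CustomNonWritableClass, while the function is declared to take a MapWritable. Because generics are erased, a mismatch like this is only detected when map is actually invoked. A minimal plain-Java sketch of that mechanism follows; MapFnLike, CustomRecord and MapToCustomRecord are hypothetical stand-ins, and a java.util.Map plays the role of MapWritable.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a generic mapping function.
abstract class MapFnLike<S, T> {
    abstract T map(S input);
}

// Hypothetical stand-in for the non-Writable value class.
final class CustomRecord {
    final String name;
    CustomRecord(String name) { this.name = name; }
}

// Declared to consume the serialized form (a Map standing in for MapWritable).
final class MapToCustomRecord extends MapFnLike<Map<String, String>, CustomRecord> {
    @Override
    CustomRecord map(Map<String, String> input) {
        return new CustomRecord(input.get("name"));
    }
}

class ErasureSketch {
    // A framework holds the function untyped, so nothing is checked at compile time.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static Object applyUnchecked(MapFnLike fn, Object value) {
        return fn.map(value); // the cast to Map happens inside the erased call
    }

    // Returns true when applying the function to the value throws ClassCastException.
    static boolean failsOn(Object value) {
        try {
            applyUnchecked(new MapToCustomRecord(), value);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, String> serialized = new HashMap<>();
        serialized.put("name", "a");
        System.out.println(failsOn(serialized));            // serialized form is accepted
        System.out.println(failsOn(new CustomRecord("a"))); // already-converted value is rejected
    }
}
```

If this reading is right, the base type passed to derived(...) would need to match what CustomInputFormat actually emits, but that is an assumption drawn from the trace rather than something confirmed here.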



Thanks,
Christian
