crunch-user mailing list archives

From Josh Wills <jwi...@cloudera.com>
Subject Re: Weird error writing a collection of Strings
Date Thu, 19 Feb 2015 17:54:55 GMT
The likely cause of that exception is trying to write a PGroupedTable out
to disk directly without ungrouping it first, but I don't see any GBK
(groupByKey) operations in the snippet you sent -- I assume there's more
pipeline logic around?
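
For example, a minimal sketch of that failure mode (the table name,
value type, and output path here are hypothetical, not from your
pipeline -- this just illustrates the grouped-write pattern, assuming
the standard Crunch API):

    import org.apache.crunch.PGroupedTable;
    import org.apache.crunch.PTable;
    import org.apache.crunch.Target.WriteMode;
    import org.apache.crunch.io.To;

    // counts is some PTable<String, Long> built earlier in the pipeline
    PGroupedTable<String, Long> grouped = counts.groupByKey();

    // Writing the grouped table directly is what trips the planner's
    // cast to PGroupedTableType:
    // grouped.write(To.textFile("/tmp/out"), WriteMode.OVERWRITE);

    // Ungroup back to a plain PTable first, then write:
    grouped.ungroup().write(To.textFile("/tmp/out"), WriteMode.OVERWRITE);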

J

On Thu, Feb 19, 2015 at 9:49 AM, David Ortiz <dortiz@videologygroup.com>
wrote:

>  Anyone have any idea why the following:
>
>     RecordDeserializer<FteRecordRaw> fteSplit = new RecordDeserializer<>(
>             FteRecordRaw.class, new FteEntry().getSchema());
>
>     PCollection<String> fteStrings = pipe.read(From.textFile(fteIn, strings()));
>
>     PCollection<FteRecordRaw> fte = fteStrings.parallelDo(fteSplit,
>             records(FteRecordRaw.class));
>
>     fteStrings.write(To.textFile("/tmp/fte/fteRaw"), WriteMode.OVERWRITE);
>
>     fte.parallelDo(new RecordToDelimitedString<FteRecordRaw>(),
>             strings()).write(To.textFile("/tmp/fte/fteIn"), WriteMode.OVERWRITE);
>
> Would yield
>
>     Exception in thread "main" java.lang.ClassCastException:
>     org.apache.crunch.types.avro.AvroType cannot be cast to
>     org.apache.crunch.types.PGroupedTableType
>         at org.apache.crunch.impl.mr.plan.DoNode.toRTNode(DoNode.java:144)
>         at org.apache.crunch.impl.mr.plan.JobPrototype.serialize(JobPrototype.java:243)
>         at org.apache.crunch.impl.mr.plan.JobPrototype.build(JobPrototype.java:188)
>         at org.apache.crunch.impl.mr.plan.JobPrototype.getCrunchJob(JobPrototype.java:134)
>         at org.apache.crunch.impl.mr.plan.MSCRPlanner.plan(MSCRPlanner.java:200)
>         at org.apache.crunch.impl.mr.MRPipeline.plan(MRPipeline.java:111)
>         at org.apache.crunch.impl.mr.MRPipeline.runAsync(MRPipeline.java:132)
>         at org.apache.crunch.impl.mr.MRPipeline.run(MRPipeline.java:120)
>         at org.apache.crunch.impl.dist.DistributedPipeline.done(DistributedPipeline.java:119)
>         at com.videologygroup.crunch.FteWarehouse.run(FteWarehouse.java:212)
>         at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
>         at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
>         at com.videologygroup.crunch.FteWarehouse.main(FteWarehouse.java:339)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
>
> But when I change it to
>
>     RecordDeserializer<FteRecordRaw> fteSplit = new RecordDeserializer<>(
>             FteRecordRaw.class, new FteEntry().getSchema());
>
>     PCollection<String> fteStrings = pipe.read(From.textFile(fteIn, strings()));
>
>     PCollection<FteRecordRaw> fte = fteStrings.parallelDo(fteSplit,
>             records(FteRecordRaw.class));
>
>     // fteStrings.write(To.textFile("/tmp/fte/fteRaw"), WriteMode.OVERWRITE);
>
>     fte.parallelDo(new RecordToDelimitedString<FteRecordRaw>(),
>             strings()).write(To.textFile("/tmp/fte/fteIn"), WriteMode.OVERWRITE);
>
> It runs (minus writing out that collection for debugging purposes)?
>



-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
