crunch-user mailing list archives

From David Ortiz <dor...@videologygroup.com>
Subject RE: Weird error writing a collection of Strings
Date Thu, 19 Feb 2015 18:16:42 GMT
Yes.  Changed it to

fteStrings.filter(FilterFns.<String>ACCEPT_ALL()).write(To.textFile("/tmp/fte/fteRaw"), WriteMode.OVERWRITE);

and it is running now.

Thanks!

From: Josh Wills [mailto:jwills@cloudera.com]
Sent: Thursday, February 19, 2015 1:07 PM
To: user@crunch.apache.org
Subject: Re: Weird error writing a collection of Strings

Yeah, what I mean is that some other part of the pipeline logic is getting conflated
with the trivial output of this PCollection and causing some sort of problem. My first
guess is that there is a bug in handling PCollections that are simultaneously inputs and outputs
of a pipeline, like fteStrings. If you add a trivial identity transform, e.g.,

fteStrings.filter(FilterFns.ACCEPT_ALL()).write(To.textFile("/tmp/fte/fteRaw"), WriteMode.OVERWRITE);

Does it work?

On Thu, Feb 19, 2015 at 9:57 AM, David Ortiz <dortiz@videologygroup.com<mailto:dortiz@videologygroup.com>>
wrote:
There is plenty more pipeline logic around.  That snippet is the entirety of that PCollection’s
usage though.

From: Josh Wills [mailto:jwills@cloudera.com<mailto:jwills@cloudera.com>]
Sent: Thursday, February 19, 2015 12:55 PM
To: user@crunch.apache.org<mailto:user@crunch.apache.org>
Subject: Re: Weird error writing a collection of Strings

The likely cause of that exception is trying to write a PGroupedTable out to disk directly
without ungrouping it first, but I don't see any GBK operations in the snippet you sent, so I assume
there's more pipeline logic around?
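[For reference, a minimal sketch of the pattern Josh describes, assuming Apache Crunch on the classpath; the collection names here are hypothetical, not from the original pipeline. A PGroupedTable has no serializable output type of its own (hence the cast to PGroupedTableType), so it must be aggregated or ungrouped before a write:]

```java
// Hypothetical pipeline fragment: "events" is a PCollection<String>.
// events.count() yields a PTable<String, Long> of value -> occurrence count.
PTable<String, Long> counts = events.count();

// groupByKey() produces a PGroupedTable, which cannot be written directly.
PGroupedTable<String, Long> grouped = counts.groupByKey();

// Either aggregate the grouped values back into a PTable...
grouped.combineValues(Aggregators.SUM_LONGS())
       .write(To.textFile("/tmp/out/summed"), WriteMode.OVERWRITE);

// ...or ungroup back into a plain PTable before writing.
grouped.ungroup()
       .write(To.textFile("/tmp/out/raw"), WriteMode.OVERWRITE);
```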

J

On Thu, Feb 19, 2015 at 9:49 AM, David Ortiz <dortiz@videologygroup.com<mailto:dortiz@videologygroup.com>>
wrote:
Anyone have any idea why the following:

RecordDeserializer<FteRecordRaw> fteSplit = new RecordDeserializer<>(
        FteRecordRaw.class, new FteEntry().getSchema());
PCollection<String> fteStrings = pipe.read(From.textFile(fteIn, strings()));
PCollection<FteRecordRaw> fte = fteStrings.parallelDo(fteSplit,
        records(FteRecordRaw.class));
fteStrings.write(To.textFile("/tmp/fte/fteRaw"), WriteMode.OVERWRITE);
fte.parallelDo(new RecordToDelimitedString<FteRecordRaw>(),
        strings()).write(To.textFile("/tmp/fte/fteIn"), WriteMode.OVERWRITE);

Would yield

Exception in thread "main" java.lang.ClassCastException: org.apache.crunch.types.avro.AvroType
cannot be cast to org.apache.crunch.types.PGroupedTableType
        at org.apache.crunch.impl.mr.plan.DoNode.toRTNode(DoNode.java:144)
        at org.apache.crunch.impl.mr.plan.JobPrototype.serialize(JobPrototype.java:243)
        at org.apache.crunch.impl.mr.plan.JobPrototype.build(JobPrototype.java:188)
        at org.apache.crunch.impl.mr.plan.JobPrototype.getCrunchJob(JobPrototype.java:134)
        at org.apache.crunch.impl.mr.plan.MSCRPlanner.plan(MSCRPlanner.java:200)
        at org.apache.crunch.impl.mr.MRPipeline.plan(MRPipeline.java:111)
        at org.apache.crunch.impl.mr.MRPipeline.runAsync(MRPipeline.java:132)
        at org.apache.crunch.impl.mr.MRPipeline.run(MRPipeline.java:120)
        at org.apache.crunch.impl.dist.DistributedPipeline.done(DistributedPipeline.java:119)
        at com.videologygroup.crunch.FteWarehouse.run(FteWarehouse.java:212)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
        at com.videologygroup.crunch.FteWarehouse.main(FteWarehouse.java:339)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:212)

But when I change it to

RecordDeserializer<FteRecordRaw> fteSplit = new RecordDeserializer<>(
        FteRecordRaw.class, new FteEntry().getSchema());
PCollection<String> fteStrings = pipe.read(From.textFile(fteIn, strings()));
PCollection<FteRecordRaw> fte = fteStrings.parallelDo(fteSplit,
        records(FteRecordRaw.class));
//fteStrings.write(To.textFile("/tmp/fte/fteRaw"), WriteMode.OVERWRITE);
fte.parallelDo(new RecordToDelimitedString<FteRecordRaw>(),
        strings()).write(To.textFile("/tmp/fte/fteIn"), WriteMode.OVERWRITE);

It runs (minus writing out that collection for debugging purposes)?
This email is intended only for the use of the individual(s) to whom it is addressed. If you
have received this communication in error, please immediately notify the sender and delete
the original email.



--
Director of Data Science
Cloudera<http://www.cloudera.com>
Twitter: @josh_wills<http://twitter.com/josh_wills>