crunch-user mailing list archives

From Jay Vyas <jayunit...@gmail.com>
Subject Re: MemPipeline works, but running from files w/ MRPipeline fails.
Date Mon, 06 Jan 2014 13:19:37 GMT
Ah yes! I suspected this :) ... I suspected some object-related funniness was going on, which
is why I wrote the 2nd bullet (run the whole job from a static main) and tested it :) ... However,
it gets a different error.


Seems to do essentially the same thing as you are suggesting, but results in a "java.lang.NoSuchMethodError:
org.apache.avro.mapred.AvroKey: method <init>()V not found" when trying to process the
lines:

https://github.com/jayunit100/bigpetstore/blob/d23ce6c7de2fcf94da76be4266a471c1b5de70ec/src/main/java/org/bigtop/bigpetstore/etl/CrunchETL.java

If you have any thoughts on this, let me know. I've never seen this error before... It looks like it's
related to reflection, but I'm not sure.
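
For anyone hitting the same message: `<init>()V` is the JVM's descriptor for a no-argument constructor, and a `NoSuchMethodError` in general means the class found at runtime differs from the one the calling code was compiled against. Below is a minimal diagnostic sketch, not a fix, that reports which location a class was loaded from and whether it declares a no-arg constructor. The class name `ClassOriginCheck` and its helper methods are hypothetical names made up for illustration; only standard JDK reflection calls are used.

```java
import java.security.CodeSource;

public class ClassOriginCheck {
    // True if the class declares a constructor with no parameters,
    // i.e. the method the JVM refers to as <init>()V.
    static boolean hasNoArgConstructor(Class<?> cls) {
        try {
            cls.getDeclaredConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Location (usually a jar) the class was loaded from, if the JVM exposes it.
    static String loadedFrom(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        return src == null ? "(bootstrap or unknown)" : String.valueOf(src.getLocation());
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Pass the class to inspect as the first argument; the default is only a placeholder.
        String name = args.length > 0 ? args[0] : "java.util.ArrayList";
        Class<?> cls = Class.forName(name);
        System.out.println(name + " loaded from " + loadedFrom(cls));
        System.out.println("no-arg constructor present: " + hasNoArgConstructor(cls));
    }
}
```

Running it with `org.apache.avro.mapred.AvroKey` as the argument, on the same classpath as the failing job, would show which jar actually supplies that class at runtime.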

However, one thing is clear: it definitely works on the MemPipeline implementation, so it is probably
related to the DoFn?

> On Jan 5, 2014, at 11:28 PM, Josh Wills <jwills@cloudera.com> wrote:
> 
> Yes! I'm in the midst of re-writing the user guide and am adding a whole section on this
issue. Mind if I cut and paste the content so far and get some feedback?
> 
> In contrast to MapReduce, Crunch uses Java serialization to serialize the contents of
all of the DoFns in a pipeline definition into the job.xml file used to configure the MapReduce
jobs that it executes. The abstract DoFn base class implements the `java.io.Serializable`
interface, which means that all of the member variables of a DoFn must be either serializable
or marked as `transient`. There is an [excellent overview of Java serializability](http://docs.oracle.com/javase/tutorial/jndi/objects/serial.html)
that is worth reviewing if you aren't familiar with it already.
> 
> If your DoFn needs to work with a class that does not implement Serializable and cannot
be modified (for example, because it is defined in a third-party library), you should use
the `transient` keyword on that member variable so that serializing the DoFn won't fail if
that object happens to be defined. You can create an instance of the object during runtime
using the `initialize` method described in the following section.
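
The transient-member pattern above can be sketched with plain `java.io` serialization, so that it runs without Crunch on the classpath. Everything here is an illustrative stand-in: `ThirdPartyParser` plays the role of a non-serializable library class, `UpperCaseFn` plays the role of a DoFn, and its `initialize()` method only mimics the role of the DoFn method of the same name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldSketch {
    // Hypothetical third-party class that does not implement Serializable.
    static class ThirdPartyParser {
        String parse(String s) { return s.trim().toUpperCase(); }
    }

    // Stand-in for a DoFn: serializable, with a transient non-serializable member.
    static class UpperCaseFn implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient ThirdPartyParser parser; // skipped by Java serialization

        // Mimics the role of initialize(): create the member at runtime.
        void initialize() { parser = new ThirdPartyParser(); }

        String apply(String in) { return parser.parse(in); }
    }

    // Serialize and deserialize the function, then initialize and use the copy.
    static String roundTrip(String input) {
        try {
            UpperCaseFn fn = new UpperCaseFn();
            fn.initialize(); // even with the member set, serialization succeeds
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(fn);
            }
            UpperCaseFn copy;
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                copy = (UpperCaseFn) in.readObject();
            }
            copy.initialize(); // the transient field is null after deserialization
            return copy.apply(input);
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("  hello ")); // prints HELLO
    }
}
```

Removing the `transient` keyword from `parser` makes `writeObject` fail with a `NotSerializableException`, which is the failure the keyword is there to avoid.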
> 
> One place where the serializable DoFns can trip up new Crunch developers is when they
specify in-line DoFns inside of methods of non-serializable outer classes. Although their
pipelines compile fine and will execute locally with Crunch's MemPipeline, the MRPipeline
or SparkPipeline versions will fail with Java's `NotSerializableException`. Let's look at
the following example:
> 
> <pre>
> public class MyPipeline extends Configured implements Tool {
>   public static void main(String[] args) throws Exception {
>     ToolRunner.run(new Configuration(), new MyPipeline(), args);
>   }
> 
>   public int run(String[] args) throws Exception {
>     Pipeline pipeline = new MRPipeline(MyPipeline.class, getConf());
>     PCollection<String> lines = pipeline.readTextFile(args[0]);
> 
>     // This will throw a NotSerializableException!
>     PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
>       @Override
>       public void process(String line, Emitter<String> emitter) {
>         for (String word : line.split("\\s+")) {
>           emitter.emit(word);
>         }
>       }
>     }, Writables.strings());
> 
>     words.count().write(To.textFile(args[1]));
> 
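>     // run() must return an int: report success or failure back to ToolRunner.
>     return pipeline.done().succeeded() ? 0 : 1;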
>   }
> }
> </pre>
> 
> Here, the inline DoFn that splits a line up into words is an anonymous inner class of `MyPipeline`.
Inner classes contain references to their parent outer classes, so unless MyPipeline implements
the Serializable interface, the NotSerializableException will be thrown when Crunch tries
to serialize the inner DoFn. Besides making MyPipeline serializable, another way to solve this problem
while still using inner classes is to define the operations they perform inside of `static` methods
on the class, e.g.:
> 
> <pre>
>   public static PCollection<String> splitLines(PCollection<String> lines) {
>     // This will work fine because the DoFn is defined inside of a static method.
>     return lines.parallelDo(new DoFn<String, String>() {
>       @Override
>       public void process(String line, Emitter<String> emitter) {
>         for (String word : line.split("\\s+")) {
>           emitter.emit(word);
>         }
>       }
>     }, Writables.strings());
>   }
> </pre>
> 
> DoFn instances that are defined inline inside of static methods do not contain references
to their outer classes, and so can be serialized regardless of whether the outer class is
serializable or not. Using static methods to define your business logic in terms of a series
of DoFns can also make your code easier to test by using in-memory PCollection implementations
in your unit tests.
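
The difference between the two cases can be reproduced without Crunch. The sketch below uses only `java.io`; `SplitFn` is a hypothetical stand-in for a DoFn, and `OuterReferenceSketch` stands in for a non-serializable pipeline class. It assumes nothing about how Crunch itself performs serialization beyond the use of standard Java serialization described above.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class OuterReferenceSketch { // deliberately NOT Serializable
    // Hypothetical stand-in for a DoFn: a serializable function object.
    interface SplitFn extends Serializable {
        String[] split(String line);
    }

    private final String delimiter;

    OuterReferenceSketch(String delimiter) { this.delimiter = delimiter; }

    // Defined in an instance method: the anonymous class holds a reference
    // to the enclosing OuterReferenceSketch instance.
    SplitFn fromInstanceMethod() {
        return new SplitFn() {
            @Override
            public String[] split(String line) { return line.split(delimiter); }
        };
    }

    // Defined in a static method: there is no enclosing instance to capture.
    static SplitFn fromStaticMethod() {
        return new SplitFn() {
            @Override
            public String[] split(String line) { return line.split("\\s+"); }
        };
    }

    // Attempts Java serialization and reports whether it succeeded.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(new OuterReferenceSketch("\\s+").fromInstanceMethod())); // false
        System.out.println(canSerialize(fromStaticMethod())); // true
    }
}
```

Both function objects behave identically when called in memory, which is consistent with a pipeline that works under MemPipeline and only fails once the functions have to be serialized.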
> 
> FIN
> 
> Hope that isn't too long and is helpful.
> 
> 
> 
>> On Sun, Jan 5, 2014 at 7:48 PM, Jay Vyas <jayunit100@gmail.com> wrote:
>> Hi again crunch! 
>> 
>> Well, my MemPipeline implementations worked great, but my CrunchETL.java
class is failing when I run it on a "real" file in my local filesystem. Here are the two
ways that it breaks (link to the source is also at the end of this email).
>> 
>> 
>> CrunchETL e = new CrunchETL(input,output).numberOfTransactionsByState() results in
this error:
>> 
>> 1) Caused by: java.io.NotSerializableException: org.bigtop.bigpetstore.etl.CrunchETL
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1164)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
>> 
>> To make this easy to debug, I added a main method to the CrunchETL.java class;
however, that method fails with a different exception:
>> 
>> 2) java.lang.NoSuchMethodError: org.apache.avro.mapred.AvroKey: method <init>()V
not found
>> 
>> ** The link to the class is below ** 
>> 
>> https://github.com/jayunit100/bigpetstore/blob/d23ce6c7de2fcf94da76be4266a471c1b5de70ec/src/main/java/org/bigtop/bigpetstore/etl/CrunchETL.java
>> 
>> Any thoughts on why I get these failures?
> 
> 
> 
> -- 
> Director of Data Science
> Cloudera
> Twitter: @josh_wills
