crunch-user mailing list archives

From Josh Wills <jwi...@cloudera.com>
Subject Re: MemPipeline works, but running from files w/ MRPipeline fails.
Date Mon, 06 Jan 2014 04:28:05 GMT
Yes! I'm in the midst of re-writing the user guide and am adding a whole
section on this issue. Mind if I cut and paste the content so far and get
some feedback?

In contrast to MapReduce, Crunch uses Java serialization to serialize the
contents of all of the DoFns in a pipeline definition into the job.xml file
used to configure the MapReduce jobs that it executes. The abstract DoFn
base class implements the `java.io.Serializable` interface, which means
that all of the member variables of a DoFn must be either serializable or
marked as `transient`. There is an [excellent overview of Java
serializability](
http://docs.oracle.com/javase/tutorial/jndi/objects/serial.html) that is
worth reviewing if you aren't familiar with it already.

If your DoFn needs to work with a class that does not implement
Serializable and cannot be modified (for example, because it is defined in
a third-party library), you should mark that member variable with the
`transient` keyword so that serializing the DoFn won't fail if the field
happens to be set. You can then create an instance of the object at
runtime using the `initialize` method described in the following section.
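To make the mechanism concrete, here is a plain-Java sketch (not Crunch API code; `TransientDemo`, `LegacyClient`, `BadFn`, and `GoodFn` are made-up illustrative names, with `LegacyClient` standing in for an unmodifiable non-serializable third-party class). The only difference between the two function classes is the `transient` keyword:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientDemo {
  // Stand-in for a third-party class that does not implement Serializable.
  static class LegacyClient {}

  static class BadFn implements Serializable {
    LegacyClient client = new LegacyClient(); // non-serializable field -> fails
  }

  static class GoodFn implements Serializable {
    transient LegacyClient client; // skipped during serialization

    // In Crunch you would create the instance in DoFn's initialize() instead.
    void initialize() {
      client = new LegacyClient();
    }
  }

  // Returns true if the object survives Java serialization.
  static boolean serializes(Object o) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(o);
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(serializes(new BadFn()));  // false
    System.out.println(serializes(new GoodFn())); // true
  }
}
```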

One place where serializable DoFns can trip up new Crunch developers is
when they define in-line DoFns inside methods of non-serializable outer
classes. Although such pipelines compile fine and will execute locally
with Crunch's MemPipeline, the MRPipeline or SparkPipeline versions will
fail with Java's `NotSerializableException`. Consider the following
example:

<pre>
public class MyPipeline extends Configured implements Tool {
  public static void main(String[] args) throws Exception {
    ToolRunner.run(new Configuration(), new MyPipeline(), args);
  }

  public int run(String[] args) throws Exception {
    Pipeline pipeline = new MRPipeline(MyPipeline.class, getConf());
    PCollection<String> lines = pipeline.readTextFile(args[0]);

    // This will throw a NotSerializableException!
    PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
      @Override
      public void process(String line, Emitter<String> emitter) {
        for (String word : line.split("\\s+")) {
          emitter.emit(word);
        }
      }
    }, Writables.strings());

    words.count().write(To.textFile(args[1]));

    pipeline.done();
    return 0;
  }
}
</pre>

Here, the inline DoFn that splits a line up into words is an inner class of
`MyPipeline`. Inner classes hold references to their enclosing outer
classes, so unless `MyPipeline` implements the Serializable interface, a
NotSerializableException will be thrown when Crunch tries to serialize the
inner DoFn. One fix is to make `MyPipeline` implement Serializable; the
other way to solve the problem while still using inner classes is to
define the operations they perform inside of `static` methods on the
class, e.g.:

<pre>
  public static PCollection<String> splitLines(PCollection<String> lines) {
    // This will work fine because the DoFn is defined inside of a static method.
    return lines.parallelDo(new DoFn<String, String>() {
      @Override
      public void process(String line, Emitter<String> emitter) {
        for (String word : line.split("\\s+")) {
          emitter.emit(word);
        }
      }
    }, Writables.strings());
  }
</pre>

DoFn instances that are defined inline inside of static methods do not
contain references to their outer classes, and so can be serialized
regardless of whether the outer class is serializable or not. Using static
methods to define your business logic in terms of a series of DoFns can
also make your code easier to test by using in-memory PCollection
implementations in your unit tests.
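The hidden outer-class reference can be observed directly with reflection. The sketch below is standalone plain Java, not Crunch code (`OuterRefDemo` is a made-up name); the compiler-generated field on an inner class is named `this$0`, and here the instance-method anonymous class reads an outer field so the capture is guaranteed:

```java
import java.util.Arrays;

public class OuterRefDemo {
  String name = "outer";

  // Anonymous class created in an instance method: it uses the enclosing
  // instance's field, so it must hold a hidden reference (this$0) to it.
  Runnable fromInstanceMethod() {
    return new Runnable() {
      public void run() {
        System.out.println(name);
      }
    };
  }

  // Anonymous class created in a static method: no enclosing instance,
  // so no outer reference is captured.
  static Runnable fromStaticMethod() {
    return new Runnable() {
      public void run() {}
    };
  }

  // Checks for the compiler-generated this$0 field.
  static boolean hasOuterRef(Object o) {
    return Arrays.stream(o.getClass().getDeclaredFields())
        .anyMatch(f -> f.getName().startsWith("this$"));
  }

  public static void main(String[] args) {
    System.out.println(hasOuterRef(new OuterRefDemo().fromInstanceMethod())); // true
    System.out.println(hasOuterRef(fromStaticMethod()));                      // false
  }
}
```

Serializing the first Runnable would drag the whole `OuterRefDemo` instance along with it, which is exactly why the non-serializable outer class causes the failure.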

FIN

Hope that isn't too long and is helpful.



On Sun, Jan 5, 2014 at 7:48 PM, Jay Vyas <jayunit100@gmail.com> wrote:

> Hi again crunch!
>
> Well, my MemPipeline implementations worked great, but my
> CrunchETL.java class is failing when I run it on a "real" file in my local
> filesystem. Here are the two ways that it breaks (a link to the source is
> also at the end of this email).
>
>
> CrunchETL e = new CrunchETL(input,output).numberOfTransactionsByState()
> results in this error:
>
> 1) Caused by: java.io.NotSerializableException: org.bigtop.bigpetstore.etl.CrunchETL
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1164)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
>
> To make this easy to debug, I added a main method to the CrunchETL.java
> class; however, that method fails with a different exception:
>
> 2) java.lang.NoSuchMethodError: org.apache.avro.mapred.AvroKey: method <init>()V not found
>
> ** The link to the class is below **
>
>
> https://github.com/jayunit100/bigpetstore/blob/d23ce6c7de2fcf94da76be4266a471c1b5de70ec/src/main/java/org/bigtop/bigpetstore/etl/CrunchETL.java
>
> Any thoughts on why I get these failures?
>



-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
