incubator-crunch-dev mailing list archives

From "Kiyan Ahmadizadeh (JIRA)" <>
Subject [jira] [Commented] (CRUNCH-73) Scrunch applications using PipelineApp do not properly serialize closures to MapReduce tasks.
Date Fri, 21 Sep 2012 20:59:08 GMT


Kiyan Ahmadizadeh commented on CRUNCH-73:

The problem is that PipelineApp extends the Scala trait DelayedInit, which uses final static
references in its implementation.  
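
To illustrate the mechanism, here is a minimal sketch, assuming Scala 2 (where the compiler still rewrites the body of anything extending DelayedInit). DeferredBody and DivideSketch are hypothetical stand-ins, not Scrunch classes. The sketch shows only that a val declared in such an object keeps its JVM default until the deferred body runs; my reading is that a task JVM, which never runs that body, sees the same default when a closure reaches the field through the singleton.

```scala
// Hypothetical stand-in for a DelayedInit-based application trait (not the Scrunch API).
// Assumes Scala 2; DelayedInit is deprecated in 2.13 and not honored by Scala 3.
trait DeferredBody extends DelayedInit {
  private var body: () => Unit = () => ()

  // The compiler hands the object's initialization code to this method
  // instead of running it in the constructor.
  override def delayedInit(b: => Unit): Unit = { body = () => b }

  // Runs the deferred initialization code, much as a main method would.
  def start(): Unit = body()
}

object DivideSketch extends DeferredBody {
  // This assignment is part of the deferred body, not of the constructor.
  val divisor = 5
}

object DelayedInitDemo {
  def main(args: Array[String]): Unit = {
    // The deferred body has not run yet, so the field holds its JVM default.
    println(DivideSketch.divisor)
    DivideSketch.start()
    // Now the deferred body has assigned the field.
    println(DivideSketch.divisor)
  }
}
```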

Scoobi has hit this problem as well.  See this note in their user guide:

Seeing as the ability to transfer side data in function closures is a big usability win, I
propose changing PipelineApp to not use the DelayedInit trait.  Instead, clients should override
a method 

run(args: Array[String]): Unit

to implement their pipeline logic.  While this creates a need for a touch more boilerplate
code when authoring a PipelineApp, I think the value of sending side data through closures
outweighs this.  
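
A rough sketch of the proposed shape, under stated assumptions: RunStyleApp, SideData, and DivideRunSketch are hypothetical names made up for illustration, and the Java serialization round trip only stands in for shipping a closure to a task. It is not the attached patch. The point is that a value declared locally in the pipeline logic is captured by the closure itself, so it travels with the serialized function.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for the proposed PipelineApp shape (not the Scrunch class).
abstract class RunStyleApp {
  def run(args: Array[String]): Unit
  def main(args: Array[String]): Unit = run(args)
}

object SideData {
  // Serializes and deserializes a value, mimicking the trip to a MapReduce task.
  def roundTrip[A <: AnyRef](value: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }
}

object DivideRunSketch extends RunStyleApp {
  // Helper holding the logic so it can be exercised without a pipeline.
  def dividedBy(nums: Seq[Int], args: Array[String]): Seq[Int] = {
    val divisor = args(0).toInt                 // local value, captured by the closure
    val divide: Int => Int = n => n / divisor   // the closure carries divisor with it
    nums.map(SideData.roundTrip(divide))        // apply the deserialized copy
  }

  override def run(args: Array[String]): Unit =
    println(dividedBy(Seq(10, 20, 30), args))
}
```

The extra boilerplate is the explicit run override; in exchange, divisor is an ordinary local value rather than a field whose initialization is deferred.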

I've written a patch and verified that it works with a PipelineApp I've been developing.  I'm
attaching the first version for review.  I'll attach another version shortly that includes
a test.
> Scrunch applications using PipelineApp do not properly serialize closures to MapReduce tasks.
> ---------------------------------------------------------------------------------------------
>                 Key: CRUNCH-73
>                 URL:
>             Project: Crunch
>          Issue Type: Bug
>          Components: Scrunch
>    Affects Versions: 0.4.0
>            Reporter: Kiyan Ahmadizadeh
>            Assignee: Kiyan Ahmadizadeh
> One of the great potential advantages of using Scala for writing MapReduce pipelines
> is the ability to send side data as part of function closures, rather than through Hadoop
> Configurations or the Distributed Cache.  As an absurdly simple example, consider the following
> Scala PipelineApp that divides all elements of a numeric PCollection by an arbitrary argument:
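> object DivideApp extends PipelineApp {
>   // Side data: the divisor comes from the first command-line argument.
>   val divisor = Integer.valueOf(args(0))
>   val nums = read(From.textFile("numbers.txt"))
>   // The closure passed to map references divisor and is shipped to the MapReduce tasks.
>   val dividedNums = nums.map { n => n.toInt / divisor }
>   dividedNums.write(To.textFile("dividedNums"))
>   run()
> }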
> Executing this PipelineApp fails.  MapReduce tasks get a value of "null" for divisor
> (or 0 if divisor is forced to be a primitive numeric type).  This indicates that an error
> is occurring in the serialization of Scala function closures that causes unbound variables
> in the closure to take on their default JVM values.
