spark-user mailing list archives

From Tobias Pfeiffer <...@preferred.jp>
Subject Re: Keep state inside map function
Date Thu, 31 Jul 2014 01:11:28 GMT
Hi,

On Thu, Jul 31, 2014 at 2:23 AM, Sean Owen <sowen@cloudera.com> wrote:
>
> ... you can run setup code before mapping a bunch of records, and
> after, like so:
>
> rdd.mapPartitions { partition =>
>    // Some setup code here
>    partition.map(yourfunction)
>    // Some cleanup code here
> }
>

Please be careful with that; it will not work as expected. First, it would
have to be

rdd.mapPartitions { partition =>
   // Some setup code here
   val result = partition.map(yourfunction)
   // Some cleanup code here
   result
}

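because the function passed to mapPartitions() must return an Iterator.
But even written like this, the cleanup code will run *before* the
processing takes place, because partition.map() is executed lazily: it
returns a new Iterator right away, and yourfunction is only applied to
each item when Spark later consumes that Iterator.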
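The ordering problem can be reproduced with plain Scala iterators, no cluster needed. The sketch below mirrors the corrected mapPartitions body and records when each step happens. The object name and the log buffer are illustrative only, and it assumes (as the lazy map implies) that Spark consumes the returned Iterator only after the function has returned.

```scala
import scala.collection.mutable.ArrayBuffer

object LazyCleanupDemo {
  // Records the order in which setup, processing and cleanup happen.
  def naive(): List[String] = {
    val log = ArrayBuffer.empty[String]
    val partition: Iterator[Int] = Iterator(1, 2, 3)

    // Same shape as "val result = partition.map(yourfunction); ...; result".
    val result: Iterator[Int] = {
      log += "setup"
      val mapped = partition.map { x => log += s"process $x"; x * 2 }
      log += "cleanup" // runs now, before any element has been processed
      mapped
    }

    // Stand-in for Spark consuming the Iterator after the function returns.
    result.toList
    log.toList
  }
}
```

Calling LazyCleanupDemo.naive() gives List("setup", "cleanup", "process 1", "process 2", "process 3"): the cleanup entry lands before every processing entry.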

One example of what actually works is:

rdd.mapPartitions { partition =>
   if (!partition.isEmpty) {
     // Some setup code here
     partition.map(item => {
       val output = yourfunction(item)
       if (!partition.hasNext) {
         // Some cleanup code here
       }
       output
     })
   } else {
      Iterator.empty  // empty partition: an empty Iterator of your return type
   }
}

That is not very pretty, but it is the only way I have found to actually get
teardown code to run after map() has processed the last item.
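The hasNext trick can also be checked outside Spark. A minimal sketch, assuming Spark hands mapPartitions an ordinary scala.collection.Iterator: it factors the pattern into a hypothetical helper withSetupAndCleanup (an illustrative name, not a Spark API) and runs it on a plain List to record the order of events.

```scala
import scala.collection.mutable.ArrayBuffer

object HasNextCleanupDemo {
  // Hypothetical helper (not a Spark API): the pattern above, factored out
  // so it could be called from inside rdd.mapPartitions { ... }.
  def withSetupAndCleanup[T, U](partition: Iterator[T])(setup: () => Unit)(
      f: T => U)(cleanup: () => Unit): Iterator[U] =
    if (!partition.isEmpty) { // isEmpty only calls hasNext; nothing is consumed
      setup()
      partition.map { item =>
        val output = f(item)
        // map has already pulled `item`, so hasNext == false means it was the last one
        if (!partition.hasNext) cleanup()
        output
      }
    } else {
      Iterator.empty // empty partition: neither setup nor cleanup runs
    }

  // Runs the helper on plain Scala data and records the order of events.
  def run(input: List[Int]): (List[Int], List[String]) = {
    val log = ArrayBuffer.empty[String]
    val out = withSetupAndCleanup(input.iterator)(() => log += "setup")(
      x => { log += s"process $x"; x * 2 })(() => log += "cleanup").toList
    (out, log.toList)
  }
}
```

HasNextCleanupDemo.run(List(1, 2, 3)) yields the outputs List(2, 4, 6) with the log List("setup", "process 1", "process 2", "process 3", "cleanup"). Note that cleanup only fires if the Iterator is consumed to the end: a consumer that stops early (for example rdd.take(n)) or an exception thrown by f skips it.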

Tobias
