spark-issues mailing list archives

From "Maciej Szymkiewicz (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-20347) Provide AsyncRDDActions in Python
Date Mon, 17 Apr 2017 10:22:41 GMT

    [ https://issues.apache.org/jira/browse/SPARK-20347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15970933#comment-15970933 ]

Maciej Szymkiewicz edited comment on SPARK-20347 at 4/17/17 10:22 AM:
----------------------------------------------------------------------

This is a nice idea, but I wonder what the benefit would be over using {{concurrent.futures}}?
These work just fine with PySpark (at least in simple cases) and the only overhead is creating
the executor. It is not like we have to worry about the GIL here.
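
For example, something like this already works with stock PySpark, no patching required (a minimal sketch; the {{local[*]}} master and the sample data are assumptions for illustration only):

{code}
from concurrent.futures import ThreadPoolExecutor

from pyspark import SparkContext

sc = SparkContext(master="local[*]")
rdd = sc.parallelize(range(1000))

# Submitting a blocking action to an executor makes it non-blocking for the
# caller; while the job runs, the driver thread just waits on Py4J socket I/O.
with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(rdd.count)
    # ... do other driver-side work here ...
    print(future.result())  # blocks only when the value is actually needed
{code}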

For my own internal usage I went with a monkey patch like this (plus some {{SparkContext.stop}}
patching):

{code}
from concurrent.futures import ThreadPoolExecutor

from pyspark import SparkConf, SparkContext
from pyspark.rdd import RDD


def _get_executor():
    # Lazily attach a shared executor to the active SparkContext,
    # sized to match the number of driver cores (default 1).
    sc = SparkContext.getOrCreate()
    if not hasattr(sc, "_thread_pool_executor"):
        max_workers = int(sc.getConf().get("spark.driver.cores", "1"))
        sc._thread_pool_executor = ThreadPoolExecutor(max_workers=max_workers)
    return sc._thread_pool_executor


def asyncCount(self):
    # Non-blocking count: returns a concurrent.futures.Future.
    return _get_executor().submit(self.count)


def foreachAsync(self, f):
    # Non-blocking foreach: returns a concurrent.futures.Future.
    return _get_executor().submit(self.foreach, f)


RDD.asyncCount = asyncCount
RDD.foreachAsync = foreachAsync

sc = SparkContext(master="local[*]", conf=SparkConf().set("spark.driver.cores", "3"))
rdd = sc.parallelize(range(1000))
f = rdd.asyncCount()  # a Future; call f.result() to block for the value
{code}
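
What we get back is a plain {{concurrent.futures.Future}}, so all the standard machinery (timeouts, callbacks, {{wait}} / {{as_completed}}) applies. A hypothetical usage sketch, assuming the patch above has been applied:

{code}
from concurrent.futures import as_completed

rdd = sc.parallelize(range(1000))

# Fire off several non-blocking counts and consume them as they finish.
futures = [rdd.asyncCount() for _ in range(3)]
for done in as_completed(futures, timeout=60):
    print(done.result())


# Or attach a callback instead of blocking:
def _log_result(fut):
    print(fut.result())

rdd.asyncCount().add_done_callback(_log_result)
{code}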

One possible caveat is the lack of {{concurrent.futures}} in legacy Python (2.x), but there is
a 3rd party backport ({{futures}} on PyPI), and I would argue this still beats implementing it
from scratch.
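
For reference, the backport is a drop-in replacement on 2.x (a sketch, assuming the {{futures}} package from PyPI is installed):

{code}
# pip install futures   # 3rd party backport of concurrent.futures for Python 2.x
from concurrent.futures import ThreadPoolExecutor  # identical import on 2.x and 3.x
{code}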

Furthermore, we get a solution which integrates with existing libraries and should require
minimal maintenance.



> Provide AsyncRDDActions in Python
> ---------------------------------
>
>                 Key: SPARK-20347
>                 URL: https://issues.apache.org/jira/browse/SPARK-20347
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark
>    Affects Versions: 2.2.0
>            Reporter: holdenk
>            Priority: Minor
>
> In core Spark, AsyncRDDActions allows people to perform non-blocking RDD actions. In Python,
> where threading is a bit more involved, there could be value in exposing this; the easiest
> way might involve using the Py4J callback server on the driver.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

