spark-issues mailing list archives

From "Leif Walsh (JIRA)" <>
Subject [jira] [Commented] (SPARK-22340) pyspark setJobGroup doesn't match java threads
Date Tue, 24 Oct 2017 22:47:00 GMT


Leif Walsh commented on SPARK-22340:

Ok, this is fairly straightforward.  The problem is that from the Python side, {{setJobGroup}}
isn't thread-local; it's effectively global.  Here is a tight reproducer:

import concurrent.futures
import threading
import time

executor = concurrent.futures.ThreadPoolExecutor()

# Latches to force the interleaving between the two threads.
latch_1 = threading.Event()
latch_2 = threading.Event()

def wait(x):
    time.sleep(0.1)
    return x

def get_job_group():
    # The job group is stored as a JVM thread-local property.
    return sc.getLocalProperty('spark.jobGroup.id')

def multiple_job_groups():
    sc.setJobGroup('imajobgroup', 'helloitme')
    groups = [get_job_group()]
    sc.parallelize([1, 1]).map(wait).collect()
    # Let the other thread set its job group and run before our second job.
    latch_1.set()
    latch_2.wait()
    groups.append(get_job_group())
    sc.parallelize([1, 1]).map(wait).collect()
    groups.append(get_job_group())
    return groups

def another_job_group():
    latch_1.wait()
    sc.setJobGroup('another', 'itnotme')
    sc.parallelize([1, 1]).map(wait).collect()
    latch_2.set()

future_1 = executor.submit(multiple_job_groups)
future_2 = executor.submit(another_job_group)
future_2.result()
print(future_1.result())

The result is that {{another_job_group}} modifies the local property in between the first
and second executions of {{multiple_job_groups}}'s jobs, and we get this result:

['imajobgroup', 'another', 'another']

I think I can "solve" this by wrapping {{SparkContext}} with a lock (to sequence the execution
of {{setJobGroup}} and the action that follows it), plus something in py4j that will release
the lock during JVM execution, which feels Very Dangerous.

I would greatly appreciate it if we could do something to really solve this inside pyspark,
but I will attempt the Dangerous approach on my side for now.
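
For reference, a minimal sketch of that lock-based workaround.  This is hypothetical: {{JobGroupLock}} and {{run_in_group}} are invented names, {{ctx}} stands in for the SparkContext, and this naive version holds the lock for the entire job, fully serializing concurrent jobs rather than releasing the lock during JVM execution as described above:

```python
import threading

class JobGroupLock:
    """Serialize setJobGroup + action pairs across Python threads.

    Hypothetical sketch only: 'ctx' stands in for a SparkContext; the
    only API assumed is ctx.setJobGroup(group_id, description).
    """
    def __init__(self, ctx):
        self._ctx = ctx
        self._lock = threading.Lock()

    def run_in_group(self, group_id, description, action):
        # Hold the lock from setJobGroup through the action, so no other
        # Python thread can overwrite the shared job group in between.
        with self._lock:
            self._ctx.setJobGroup(group_id, description)
            return action()
```

Each thread would then call something like {{run_in_group('imajobgroup', 'helloitme', lambda: sc.parallelize([1, 1]).map(wait).collect())}}, at the cost of losing all job parallelism.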

> pyspark setJobGroup doesn't match java threads
> ----------------------------------------------
>                 Key: SPARK-22340
>                 URL:
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.0.2
>            Reporter: Leif Walsh
> With pyspark, {{sc.setJobGroup}}'s documentation says
> {quote}
> Assigns a group ID to all the jobs started by this thread until the group ID is set to a different value or cleared.
> {quote}
> However, this doesn't appear to be associated with Python threads, only with Java threads.  As such, a Python thread which calls this and then submits multiple jobs doesn't necessarily get its jobs associated with any particular spark job group.  For example:
> {code}
> def run_jobs():
>     sc.setJobGroup('hello', 'hello jobs')
>     x = sc.range(100).sum()
>     y = sc.range(1000).sum()
>     return x, y
> import concurrent.futures
> with concurrent.futures.ThreadPoolExecutor() as executor:
>     future = executor.submit(run_jobs)
>     sc.cancelJobGroup('hello')
>     future.result()
> {code}
> In this example, depending on how the action calls on the Python side are allocated to Java threads, the jobs for {{x}} and {{y}} won't necessarily be assigned the job group {{hello}}.
> First, we should clarify the docs if this truly is the case.
> Second, it would be really helpful if we could make the job group assignment reliable for a Python thread, though I'm not sure of the best way to do this.  As it stands, job groups are pretty useless from the pyspark side if we can't rely on this fact.
> My only idea so far is to mimic the TLS behavior on the Python side and then patch every point where job submission may take place to pass that in, but this feels pretty brittle.  In my experience with py4j, controlling threading there is a challenge.
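
A rough sketch of the TLS-mimicking idea from the description above.  Hedged heavily: {{set_job_group}} and {{apply_job_group}} are invented names, {{ctx}} stands in for the SparkContext, and {{apply_job_group}} would have to be patched into every job-submission point for this to work:

```python
import threading

# Per-Python-thread storage for the current job group, mimicking the
# JVM-side thread-local behavior on the Python side.
_local = threading.local()

def set_job_group(group_id, description):
    # Remember the group for this Python thread instead of relying on
    # whichever JVM thread happens to service the py4j call.
    _local.group = (group_id, description)

def apply_job_group(ctx):
    # Re-apply this Python thread's group to the JVM right before a job
    # is submitted; a no-op if this thread never set a group.
    group = getattr(_local, 'group', None)
    if group is not None:
        ctx.setJobGroup(*group)
```

Even this remains racy if two Python threads submit jobs simultaneously, since they may still be served by the same JVM thread, which is exactly why it feels brittle.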

This message was sent by Atlassian JIRA
