spark-issues mailing list archives

From "Imran Rashid (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-18890) Do all task serialization in CoarseGrainedExecutorBackend thread (rather than TaskSchedulerImpl)
Date Fri, 06 Jan 2017 17:40:59 GMT

    [ https://issues.apache.org/jira/browse/SPARK-18890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15805087#comment-15805087 ]

Imran Rashid commented on SPARK-18890:
--------------------------------------

[~gq] I think you misunderstood my suggestion about using a broadcast for the task.  I'm not
suggesting using a broadcast to contain *all* the task information, only the information which
is shared across all tasks in a taskset.  E.g., the preferred location is ignored on the executor,
so we wouldn't bother serializing it at all.  Conceptually, this means we'd have new classes
specifically for sending the minimal necessary data to the executor, like:

{code}
import java.util.Properties

import org.apache.spark.broadcast.Broadcast

/**
 * Metadata about the taskset needed by the executor for all tasks in this taskset.  A subset of
 * the full data kept on the driver, to make it faster to serialize and send to executors.
 */
class ExecutorTaskSetMeta(
  val stageId: Int,
  val stageAttemptId: Int,
  val properties: Properties,
  val addedFiles: Map[String, String],
  val addedJars: Map[String, String]
  // maybe task metrics here?
)

class ExecutorTaskData(
  val partitionId: Int,
  val attemptNumber: Int,
  val taskId: Long,
  val taskBinary: Broadcast[Array[Byte]],
  val taskSetMeta: Broadcast[ExecutorTaskSetMeta]
)
{code}

Then all the info you'd need to send to the executors would be a serialized version of
ExecutorTaskData.  Furthermore, given the simplicity of that class, you could serialize it
manually, and then for each task you could just modify the first two ints & one long directly
in the byte buffer.  (You could do the same serialization trick even if ExecutorTaskSetMeta
were not a broadcast, but the broadcast keeps the messages small as well.)
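
To make the byte-buffer trick concrete, here's a rough sketch of what I mean (names like
{{buildTemplate}}, {{patchForTask}} and {{sharedSuffix}} are made up for illustration, nothing
in Spark today): serialize the shared part once per taskset, then stamp the three per-task
fields into a fixed-offset header for each task.

{code}
import java.nio.ByteBuffer

// Hypothetical hand-rolled layout: per-task fields up front, followed by a shared
// suffix (the serialized broadcast references), which is produced once per taskset.
def buildTemplate(sharedSuffix: Array[Byte]): Array[Byte] = {
  val buf = ByteBuffer.allocate(4 + 4 + 8 + sharedSuffix.length)
  buf.putInt(0)    // partitionId placeholder
  buf.putInt(0)    // attemptNumber placeholder
  buf.putLong(0L)  // taskId placeholder
  buf.put(sharedSuffix)
  buf.array()
}

// Per task: copy the template and overwrite only the first two ints and the long.
def patchForTask(
    template: Array[Byte],
    partitionId: Int,
    attemptNumber: Int,
    taskId: Long): Array[Byte] = {
  val bytes = template.clone()
  val buf = ByteBuffer.wrap(bytes)
  buf.putInt(0, partitionId)
  buf.putInt(4, attemptNumber)
  buf.putLong(8, taskId)
  bytes
}
{code}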

There are a bunch of details I'm skipping here: you'd also need to do some special handling
for the TaskMetrics; the way tasks get started in the executor would change; you'd need to
refactor {{Task}} so it can be reconstructed from this information (or add more to
ExecutorTaskSetMeta); and there are probably other details I'm overlooking now.

But if we really see task serialization as an issue, this seems like the right approach.

> Do all task serialization in CoarseGrainedExecutorBackend thread (rather than TaskSchedulerImpl)
> ------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-18890
>                 URL: https://issues.apache.org/jira/browse/SPARK-18890
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 2.1.0
>            Reporter: Kay Ousterhout
>            Priority: Minor
>
> As part of benchmarking this change: https://github.com/apache/spark/pull/15505 and
> alternatives, [~shivaram] and I found that moving task serialization from TaskSetManager (which
> happens as part of the TaskSchedulerImpl's thread) to CoarseGrainedSchedulerBackend leads to
> approximately a 10% reduction in job runtime for a job that counted 10,000 partitions (each
> containing 1 int) using 20 machines.  Similar performance improvements were reported in the
> pull request linked above.  This appears to be because the TaskSchedulerImpl thread is the
> bottleneck, so moving serialization to CGSB reduces runtime.  This change may *not* improve
> runtime (and could potentially worsen runtime) in scenarios where the CGSB thread is the
> bottleneck (e.g., if tasks are very large, so calling launch to send the tasks to the executor
> blocks on the network).
> One benefit of implementing this change is that it makes it easier to parallelize the
> serialization of tasks (different tasks could be serialized by different threads).  Another
> benefit is that all of the serialization occurs in the same place (currently, the Task is
> serialized in TaskSetManager, and the TaskDescription is serialized in CGSB).
> I'm not totally convinced we should fix this because it seems like there are better ways
> of reducing the serialization time (e.g., by re-using a single serialized object with the
> Task/jars/files and broadcasting it for each stage), but I wanted to open this JIRA to
> document the discussion.
> cc [~witgo]





