spark-issues mailing list archives

From "Kevin Conaway (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-16087) Spark Hangs When Using Union With Persisted Hadoop RDD
Date Fri, 16 Mar 2018 23:11:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-16087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16403098#comment-16403098 ]

Kevin Conaway commented on SPARK-16087:
---------------------------------------

[~srowen] I finally got around to digging into this after two of my co-workers ran into the issue.

I believe that this issue is isolated to the _LocalBackend_ scheduler.  It [hardcodes|https://github.com/apache/spark/blob/v1.6.3/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala#L56] the executor hostname to _localhost_, which is then used in [reviveOffers|https://github.com/apache/spark/blob/v1.6.3/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala#L83].
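
To make that concrete, here is a stripped-down sketch of what those lines amount to (my own names; _WorkerOffer_ below is a stand-in case class mirroring Spark's, not the real internals):

{code:scala}
// Simplified model of the 1.6.3 local-mode backend described above.
case class WorkerOffer(executorId: String, host: String, cores: Int)

object LocalBackendSketch {
  val localExecutorId = "driver"
  val localExecutorHostname = "localhost"   // hardcoded; never the resolved IP
  var freeCores = 8

  // Every offer that local mode hands to the TaskScheduler claims the host is "localhost".
  def reviveOffers(): Seq[WorkerOffer] =
    Seq(WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
}
{code}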

The count action on the unioned RDD gets split into two types of tasks.  One group contains tasks that compute the count of rdd2; the other group contains tasks that compute rdd1's portion of the count from its cached partitions.  The tasks are _ResultTasks_, and for the rdd2 group the [preferred locations|https://github.com/apache/spark/blob/v1.6.3/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L303] are generated from the Hadoop input split.  In my case, the input split location host was _127.0.0.1_, as that is what is reported by the name node.
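
What the HadoopRDD side boils down to (a paraphrase, assuming the Hadoop mapred classes on the classpath; not the real method body):

{code:scala}
import org.apache.hadoop.mapred.InputSplit

// The preferred locations for a Hadoop partition are just the host strings the
// input split reports -- for a single-node HDFS here, that is "127.0.0.1".
def preferredLocations(split: InputSplit): Seq[String] =
  split.getLocations.toSeq   // e.g. Seq("127.0.0.1")
{code}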

When such a task gets added as a [pending task|https://github.com/apache/spark/blob/v1.6.3/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L207] in the _TaskSetManager_, it is added to the _pendingTasksForHost_ map, which maps host name -> list of tasks.  So here, the key of the map would be _127.0.0.1_ and the value would be all of the pending tasks for that host.
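
A tiny model of that bookkeeping (my own names, script-style Scala, with the host string coming straight off the input split):

{code:scala}
import scala.collection.mutable

// Simplified stand-in for TaskSetManager's pending-task map: the key is whatever
// host string the task's preferred location reports.
val pendingTasksForHost = mutable.HashMap.empty[String, mutable.ArrayBuffer[Int]]

def addPendingTask(taskIndex: Int, preferredHost: String): Unit =
  pendingTasksForHost.getOrElseUpdate(preferredHost, mutable.ArrayBuffer.empty) += taskIndex

// Tasks reading the HDFS splits of rdd2 land under the name node's "127.0.0.1";
// tasks reading rdd1's cached blocks land under "localhost" (see below).
addPendingTask(0, "127.0.0.1")
addPendingTask(1, "localhost")
{code}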

On the task scheduling side, _reviveOffers_ is called on the _LocalBackend_.  This passes the list of _WorkerOffers_ (with the executor host hardcoded to _localhost_) to [TaskSchedulerImpl#resourceOffers|https://github.com/apache/spark/blob/v1.6.3/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L317], which calculates the available locality levels for each task set.  In this case, _NODE_LOCAL_ is added because there is an executor alive on _localhost_ and there are tasks pending for _localhost_ (as well as for _127.0.0.1_).  For each available locality level, the call stack wends down to [TaskSetManager#resourceOffer|https://github.com/apache/spark/blob/v1.6.3/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L432] with the host and locality.  Even though the _maxLocality_ may be _ANY_, [TaskSetManager#getAllowedLocalityLevel|https://github.com/apache/spark/blob/v1.6.3/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L542] will force it to _NODE_LOCAL_ because it sees that there are tasks waiting in the _pendingTasksForHost_ map.
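
The locality computation boils down to something like this (names simplified; not Spark's code):

{code:scala}
// NODE_LOCAL is treated as valid for the whole task set as soon as *any* pending
// host matches a host with a live executor -- here the cached-block tasks keyed
// under "localhost" -- even though the HDFS-split tasks sit under "127.0.0.1".
val pendingHosts = Set("127.0.0.1", "localhost")               // keys of pendingTasksForHost
def hasExecutorsAliveOnHost(host: String): Boolean = host == "localhost"

val nodeLocalIsValid = pendingHosts.exists(hasExecutorsAliveOnHost)   // true
// getAllowedLocalityLevel then keeps clamping offers to NODE_LOCAL while any
// host-keyed task is still waiting, regardless of the offer's maxLocality.
{code}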

_TaskSetManager#resourceOffer_ then calls _dequeueTask_ with the host/locality set to _localhost_/_NODE_LOCAL_.  In _dequeueTask_, it tries to [find pending tasks|https://github.com/apache/spark/blob/v1.6.3/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L376] for _localhost_, but there aren't any since they are actually keyed under _127.0.0.1_.  And thus, nothing happens.
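
Put together, the mismatch is as simple as this (self-contained illustration, my own names):

{code:scala}
import scala.collection.mutable

// The HDFS-split tasks are keyed by the host the input split reported,
// but the offer always says "localhost".
val pendingTasksForHost = mutable.HashMap(
  "127.0.0.1" -> mutable.ArrayBuffer(0, 1, 2)   // rdd2's pending task indices
)

def dequeueTask(offerHost: String): Option[Int] =
  pendingTasksForHost.get(offerHost).flatMap(_.headOption)

dequeueTask("localhost")   // => None: the offer never matches "127.0.0.1", so the count hangs
{code}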

The _localhost_ task location for the second group of tasks comes from the [BlockManager|https://github.com/apache/spark/blob/v1.6.3/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L187], which gets the hostname from the [NettyBlockTransferService|https://github.com/apache/spark/blob/v1.6.3/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala#L110].  The _NettyBlockTransferService_ calls _Utils.localHostName_ to determine the current host.  This is normally resolved to the IP of whatever _localhost_ is, unless something calls _Utils.setCustomHostname_.  It won't surprise you to learn that the [Executor|https://github.com/apache/spark/blob/v1.6.3/core/src/main/scala/org/apache/spark/executor/Executor.scala#L71] constructor body does indeed call this method with the hostname passed to it.  As mentioned above, when using the _LocalBackend_, the executor hostname is [hardcoded to localhost|https://github.com/apache/spark/blob/v1.6.3/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala#L59].
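
A tiny model of that interplay (my own object mimicking the behaviour of _Utils.localHostName_ / _Utils.setCustomHostname_, not the real code):

{code:scala}
object HostnameSketch {
  private var customHostname: Option[String] = None

  def setCustomHostname(hostname: String): Unit = customHostname = Some(hostname)

  // Falls back to the resolved local address only if no custom hostname was set.
  def localHostName(): String =
    customHostname.getOrElse(java.net.InetAddress.getLocalHost.getHostAddress)
}

// Local mode: the Executor constructor is handed the hardcoded "localhost" ...
HostnameSketch.setCustomHostname("localhost")
// ... so the block transfer service, and therefore the BlockManager, registers as "localhost",
// and every cached-block task location ends up as "localhost" too.
val blockManagerHost = HostnameSketch.localHostName()   // "localhost"
{code}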

That's what's happening in 1.6.3.

As of 2.3.0, SPARK-14437 changed the [NettyBlockTransferService|https://github.com/apache/spark/blob/v2.3.0/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala#L49] to use the [canonical IP address of localhost|https://github.com/apache/spark/blob/v2.3.0/core/src/main/scala/org/apache/spark/internal/config/package.scala#L282].  This binding happens before the _Executor_ overwrites the host name.  Therefore, the tasks in the task set will get a different address than _localhost_, and _TaskSetManager#computeValidLocalityLevels_ will not add _NODE_LOCAL_ to the list of valid locality levels because none of the pending task locations match _localhost_.
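
Re-running the same locality check as above, with a placeholder for the canonical address, shows why the clamp no longer kicks in:

{code:scala}
// "192.168.1.23" is a placeholder for whatever canonical address 2.3.0 resolves;
// the executor host offered by local mode is still "localhost".
val pendingHosts = Set("127.0.0.1", "192.168.1.23")
def hasExecutorsAliveOnHost(host: String): Boolean = host == "localhost"

pendingHosts.exists(hasExecutorsAliveOnHost)   // false => NODE_LOCAL is never added, no hang
{code}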

However, it's trivial to force the issue by setting _spark.driver.host=localhost_, because the underlying problem, as described above, still exists.  This could be one refactor away from occurring again.
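
For example, the Scala equivalent of the conf in the reproduction below, plus that one extra setting, brings the hang back on 2.3.0:

{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("SparkBug")
  .set("spark.serializer", classOf[KryoSerializer].getName)
  .set("spark.driver.host", "localhost")   // block locations report "localhost" again
{code}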

Thoughts on this?  I'm not sure of the best way forward.  My initial thought was to modify _LocalBackend_/_LocalSchedulerBackend_ to add _WorkerOffer_ entries for all possible bindings of _localhost_, but I don't know if that's the correct path.
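
Roughly along these lines (a sketch of the idea only, not a patch; _localBindings_ is a hypothetical helper, not an existing Spark utility, and sharing the core count across offers is glossed over):

{code:scala}
import java.net.InetAddress

case class WorkerOffer(executorId: String, host: String, cores: Int)

// Enumerate every name/address this host is reachable as, so that host-keyed
// pending tasks match regardless of whether the split said "localhost",
// "127.0.0.1", or the machine's hostname.
def localBindings(): Seq[String] = {
  val loopback = InetAddress.getLoopbackAddress
  val local = InetAddress.getLocalHost
  Seq("localhost", loopback.getHostAddress, local.getHostName, local.getHostAddress).distinct
}

def reviveOffers(freeCores: Int): Seq[WorkerOffer] =
  localBindings().map(host => WorkerOffer("driver", host, freeCores))
{code}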

[~kayousterhout], any thoughts?  I see that you do a lot of work on the scheduler.

> Spark Hangs When Using Union With Persisted Hadoop RDD
> ------------------------------------------------------
>
>                 Key: SPARK-16087
>                 URL: https://issues.apache.org/jira/browse/SPARK-16087
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.4.1, 1.6.1, 2.0.1
>            Reporter: Kevin Conaway
>            Priority: Critical
>         Attachments: SPARK-16087.dump.log, SPARK-16087.log, Screen Shot 2016-06-21 at 4.27.26 PM.png, Screen Shot 2016-06-21 at 4.27.35 PM.png, part-00000, part-00001, spark-16087.tar.gz
>
>
> Spark hangs when materializing a persisted RDD that was built from a Hadoop sequence file and then union-ed with a similar RDD.
> Below is a small file that exhibits the issue:
> {code:java}
> import org.apache.hadoop.io.BytesWritable;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.api.java.function.PairFunction;
> import org.apache.spark.serializer.KryoSerializer;
> import org.apache.spark.storage.StorageLevel;
> import scala.Tuple2;
> public class SparkBug {
>     public static void main(String [] args) throws Exception {
>         JavaSparkContext sc = new JavaSparkContext(
>             new SparkConf()
>                 .set("spark.serializer", KryoSerializer.class.getName())
>                 .set("spark.master", "local[*]")
>                 .setAppName(SparkBug.class.getName())
>         );
>         JavaPairRDD<LongWritable, BytesWritable> rdd1 = sc.sequenceFile(
>             "hdfs://localhost:9000/part-00000",
>             LongWritable.class,
>             BytesWritable.class
>         ).mapToPair(new PairFunction<Tuple2<LongWritable, BytesWritable>, LongWritable, BytesWritable>() {
>             @Override
>             public Tuple2<LongWritable, BytesWritable> call(Tuple2<LongWritable, BytesWritable> tuple) throws Exception {
>                 return new Tuple2<>(
>                     new LongWritable(tuple._1.get()),
>                     new BytesWritable(tuple._2.copyBytes())
>                 );
>             }
>         }).persist(
>             StorageLevel.MEMORY_ONLY()
>         );
>         System.out.println("Before union: " + rdd1.count());
>         JavaPairRDD<LongWritable, BytesWritable> rdd2 = sc.sequenceFile(
>             "hdfs://localhost:9000/part-00001",
>             LongWritable.class,
>             BytesWritable.class
>         );
>         JavaPairRDD<LongWritable, BytesWritable> joined = rdd1.union(rdd2);
>         System.out.println("After union: " + joined.count());
>     }
> }
> {code}
> You'll need to upload the attached part-00000 and part-00001 to a local hdfs instance (I'm just using a dummy [Single Node Cluster|http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html] locally).
> Some things to note:
> - It does not hang if rdd1 is not persisted
> - It does not hang if rdd1 is not materialized (by calling rdd1.count()) before the union-ed RDD is materialized
> - It does not hang if the mapToPair() transformation is removed.


