flink-dev mailing list archives

From: Fabian Hueske <fhue...@apache.org>
Subject: Fwd: Hadoop's Configuration object isn't threadsafe
Date: Tue, 15 Jul 2014 08:11:53 GMT
Hi folks,

I forward this mail from the Spark dev list as it might be an issue for
Flink as well.
Flink also runs multiple parallel tasks in one JVM and allows the use of Hadoop
InputFormats, OutputFormats, and soon also Map and Reduce tasks.


---------- Forwarded message ----------
From: Andrew Ash <andrew@andrewash.com>
Date: 2014-07-15 7:22 GMT+02:00
Subject: Hadoop's Configuration object isn't threadsafe
To: dev@spark.apache.org


Hi Spark devs,

We discovered a very interesting bug at work last week in Spark 0.9.1: the way
Spark uses the Hadoop Configuration object is prone to thread-safety issues.
I believe it still applies in Spark 1.0.1 as well.  Let me explain:


*Observations*

   - Was running a relatively simple job (read from Avro files, do a map,
   do another map, write back to Avro files)
   - 412 of 413 tasks completed, but the last task hung in the RUNNING state
   - The 412 successful tasks completed with a median time of 3.4s
   - The last, hung task didn't finish even after 20 hours
   - The executor running the hung task was pinning one CPU core at 100%
   - Jstack of the executor attached (relevant thread pasted below)


*Diagnosis*

After doing some code spelunking, we determined the issue was concurrent
use of a single Configuration object by all of the tasks on an executor.  In
Hadoop each task runs in its own JVM, but in Spark multiple tasks can run in
the same JVM, so the single-threaded access assumptions of the Configuration
object no longer hold in Spark.

The specific issue is that the AvroRecordReader actually _modifies_ the
JobConf it's given when it's instantiated!  It adds a key for the RPC
protocol engine in the process of connecting to the Hadoop FileSystem.
When many tasks start at the same time (like at the start of a job), many
tasks are adding this configuration item to the one Configuration object at
once.  Internally Configuration uses a java.util.HashMap, which isn't
threadsafe… The post below is an excellent explanation of what happens when
multiple threads insert into a HashMap at the same time.

http://mailinator.blogspot.com/2009/06/beautiful-race-condition.html

The gist is that you have a thread following a cycle of linked list nodes
indefinitely.  This exactly matches our observations of the 100% CPU core
and also the final location in the stack trace.
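
To make the failure mode concrete, here is a minimal, self-contained sketch
(illustration only, not Spark code) that hammers one unsynchronized HashMap
from several threads — the same access pattern the shared Configuration ends
up with on an executor.  On the JDK 7 HashMap seen in the stack trace below,
a concurrent resize can splice a bucket's entries into a cycle:

import java.util.HashMap;
import java.util.Map;

// Illustration only: many threads inserting into one unsynchronized HashMap.
// If a concurrent resize corrupts a bucket's linked list, any later get() or
// put() that walks that bucket spins at 100% CPU, like the hung task above.
public class HashMapRaceDemo {
    public static void main(String[] args) {
        final Map<String, String> shared = new HashMap<String, String>();
        for (int t = 0; t < 8; t++) {
            final int id = t;
            new Thread(new Runnable() {
                public void run() {
                    for (int i = 0; i < 1000000; i++) {
                        shared.put(id + "-" + i, "x");  // unsynchronized writes
                    }
                    System.out.println("writer " + id + " done");
                }
            }).start();
        }
        // On an unlucky run some writers never print "done" and one core
        // stays pinned indefinitely.
    }
}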

So it seems the way Spark shares a Configuration object between task
threads in an executor is incorrect.  We need some way to prevent
concurrent access to a single Configuration object.


*Proposed fix*

We can clone the JobConf object in HadoopRDD.getJobConf() so each task gets
its own JobConf object (and thus its own Configuration object).  The
optimization of broadcasting the Configuration object across the cluster can
remain, but on the receiving side I think it needs to be cloned for each task
to allow for concurrent access.  I'm not sure of the performance implications,
but the comments suggest that the Configuration object is ~10 KB, so I would
expect cloning it to be relatively speedy.
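
For illustration, a minimal sketch of that per-task clone (a hypothetical
helper, not the actual HadoopRDD code): the broadcast Configuration stays
shared across the executor, but each task wraps it in its own JobConf copy
before handing it to the InputFormat, so mutations like AvroRecordReader's
stay task-local.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;

// Hypothetical helper sketching the proposed fix (not the actual Spark patch):
// keep broadcasting one Configuration per executor, but give every task its
// own copy so a task that mutates its JobConf cannot corrupt its neighbours'.
public final class PerTaskJobConf {
    private PerTaskJobConf() {}

    public static JobConf cloneForTask(Configuration broadcastConf) {
        // JobConf's copy constructor copies all key/value pairs into a fresh
        // backing map, so concurrent tasks no longer share mutable state.
        return new JobConf(broadcastConf);
    }
}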

Has this been observed before?  Does my suggested fix make sense?  I'd be
happy to file a Jira ticket and continue the discussion there about the right
way to fix this.


Thanks!
Andrew


P.S.  For others seeing this issue, our temporary workaround is to enable
spark.speculation, which retries failed (or hung) tasks on other machines.
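
For reference, a sketch of how to switch that workaround on from application
code (the app name is a made-up placeholder; the property can equally go in
spark-defaults.conf or on the spark-submit command line):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Sketch: enable speculative execution so a task that hangs gets relaunched
// on another executor while the stuck copy keeps spinning.
public class SpeculationWorkaround {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            .setAppName("avro-job")               // placeholder name
            .set("spark.speculation", "true");    // the workaround
        JavaSparkContext sc = new JavaSparkContext(conf);
        // ... build and run the job as usual ...
        sc.stop();
    }
}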



"Executor task launch worker-6" daemon prio=10 tid=0x00007f91f01fe000
nid=0x54b1 runnable [0x00007f92d74f1000]
   java.lang.Thread.State: RUNNABLE
    at java.util.HashMap.transfer(HashMap.java:601)
    at java.util.HashMap.resize(HashMap.java:581)
    at java.util.HashMap.addEntry(HashMap.java:879)
    at java.util.HashMap.put(HashMap.java:505)
    at org.apache.hadoop.conf.Configuration.set(Configuration.java:803)
    at org.apache.hadoop.conf.Configuration.set(Configuration.java:783)
    at
org.apache.hadoop.conf.Configuration.setClass(Configuration.java:1662)
    at org.apache.hadoop.ipc.RPC.setProtocolEngine(RPC.java:193)
    at
org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:343)
    at
org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:168)
    at
org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:129)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:436)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:403)
    at
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:125)
    at
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2262)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:86)
    at
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2296)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2278)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:316)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:194)
    at org.apache.avro.mapred.FsInput.<init>(FsInput.java:37)
    at
org.apache.avro.mapred.AvroRecordReader.<init>(AvroRecordReader.java:43)
    at
org.apache.avro.mapred.AvroInputFormat.getRecordReader(AvroInputFormat.java:52)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:156)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
    at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
    at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
    at
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
    at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
