flink-user mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: Issue with running Flink Python jobs on cluster
Date Wed, 13 Jul 2016 10:11:58 GMT
Hello Geoffrey,

How often does this occur?

Flink distributes the user-code and the python library using the 
Distributed Cache.

Either the file is deleted right after being created for some reason, or 
the DC returns a file name before the file was created (which shouldn't 
happen; it should block until the file is available).

If you are up for debugging this, I would suggest looking into the 
FileCache class and verifying whether the file in question is in fact 
created.
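
As a rough way to check this from outside Flink, a small standalone 
script run on the affected worker could poll for the path named in the 
exception and report whether it ever appears. This is only a sketch: 
the helper name wait_for_file and its parameters are made up for 
illustration and are not part of Flink, and the cache directory name 
changes per job, so the path has to be taken from the current error 
message.

```python
import os
import time


def wait_for_file(path, timeout_s=10.0, poll_s=0.1):
    """Poll until `path` exists as a regular file or the timeout expires.

    Returns the number of seconds waited if the file appeared,
    or None if it never showed up within `timeout_s`.
    (Hypothetical helper for debugging; not a Flink API.)
    """
    start = time.monotonic()
    while True:
        if os.path.isfile(path):
            return time.monotonic() - start
        if time.monotonic() - start >= timeout_s:
            return None
        time.sleep(poll_s)
```

Calling wait_for_file() with the plan.py path while the job is starting 
would show whether the file is never written, or whether it is written 
only after the Python process has already been launched.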

The logs of the TaskManager on which the exception occurs could be of 
interest too; could you send them to me?


On 13.07.2016 04:11, Geoffrey Mon wrote:
> Hello all,
> I've set up Flink on a very small cluster of one master node and five 
> worker nodes, following the instructions in the documentation 
> (https://ci.apache.org/projects/flink/flink-docs-master/setup/cluster_setup.html). 
> I can run the included examples like WordCount and PageRank across the 
> entire cluster, but when I try to run simple Python examples, I 
> sometimes get a strange error on the first PythonMapPartition about 
> the temporary folders that contain the streams of data between Python 
> and Java.
> If I run jobs on only the taskmanager on the master node, Python 
> examples run fine. However, if the jobs use the worker nodes, then I 
> get the following error:
> org.apache.flink.client.program.ProgramInvocationException: The 
> program execution failed: Job execution failed.
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)
> <snip>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:806)
> <snip>
> Caused by: java.lang.Exception: The user defined 'open()' method 
> caused an exception: External process for task MapPartition 
> (PythonMap) terminated prematurely.
> python: can't open file 
> '/home/bluedata/flink/tmp/flink-dist-cache-d1958549-3d58-4554-b286-cb4b1cdb9060/52feefa8892e61ba9a187f7684c84ada/flink/plan.py': 
> [Errno 2] No such file or directory
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
> <snip>
> Caused by: java.lang.RuntimeException: External process for task 
> MapPartition (PythonMap) terminated prematurely.
> python: can't open file 
> '/home/bluedata/flink/tmp/flink-dist-cache-d1958549-3d58-4554-b286-cb4b1cdb9060/52feefa8892e61ba9a187f7684c84ada/flink/plan.py': 
> [Errno 2] No such file or directory
> at 
> org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:144)
> at 
> org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:92)
> at 
> org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:48)
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:477)
> ... 5 more
> I'm suspecting this issue has something to do with the data sending 
> between the master and the workers, but I haven't been able to find 
> any solutions. Presumably the temporary files weren't received 
> properly and thus were not created properly?
> Thanks in advance.
> Cheers,
> Geoffrey
