flink-user mailing list archives

From Geoffrey Mon <geof...@gmail.com>
Subject Re: Issue with running Flink Python jobs on cluster
Date Sun, 17 Jul 2016 06:43:33 GMT
The Java program I used to test DistributedCache was faulty: it created the
cache from files on the machine where the program was running (i.e. the
worker node), so it never actually exercised distribution from the jobmanager.

I tried setting up a cluster again, this time using two physical machines
instead of virtual machines, and hit the same error: the Python libraries
and plan file are not found in the temporary directory. Has anyone else
been able to successfully set up a Flink cluster to run Python jobs? I'm
beginning to suspect that Flink itself may have an issue with running
Python jobs on clusters.

Cheers,
Geoffrey

On Fri, Jul 15, 2016 at 11:28 AM Geoffrey Mon <geofbot@gmail.com> wrote:

> I wrote a simple Java plan that reads a file in the distributed cache and
> uses the first line from that file in a map operation. Sure enough, it
> works locally but fails when the job is sent to a taskmanager on a worker
> node. Since DistributedCache seems to work for everyone else, I'm thinking
> that some file permissions may be set incorrectly, preventing Flink from
> writing the distributed cache files.
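A quick way to test the permissions theory is to run a small probe as the
TaskManager's user on each worker. A rough sketch (the path is the one from
the error messages; substitute your own taskmanager.tmp.dirs value):

```python
import os
import tempfile

def probe_writable(path):
    """Return True if the current user can create and write a file under path."""
    if not os.path.isdir(path):
        return False
    try:
        # Create, write, and remove a throwaway file to exercise real permissions.
        fd, name = tempfile.mkstemp(dir=path)
        with os.fdopen(fd, "w") as f:
            f.write("flink permission probe\n")
        os.remove(name)
        return True
    except OSError:
        return False

if __name__ == "__main__":
    # Example path from the error; substitute your taskmanager.tmp.dirs value.
    flink_tmp = "/home/bluedata/flink/tmp"
    print("writable" if probe_writable(flink_tmp) else "NOT writable")
```

If this prints "writable" on every worker, plain file permissions are
probably not the cause.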
>
> I used inotify-tools to watch the temporary files directory on both the
> master node and a worker node. While the plan was being prepared, the
> jobmanager node wrote the Python modules and plan file to its temporary
> files directory. On the worker node, however, the directory tree was
> created, but the job failed before any of the module or plan files were
> ever written. Interestingly enough, there were no error messages or
> warnings about the cache.
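For anyone without inotify-tools on the workers, the same observation can be
approximated with a stdlib-only polling script. A rough sketch (path and
intervals are arbitrary; this does not match inotify's event granularity):

```python
import os
import time

def snapshot(root):
    """Set of all file paths currently under root."""
    paths = set()
    for dirpath, _, filenames in os.walk(root):
        for name in filenames:
            paths.add(os.path.join(dirpath, name))
    return paths

def watch(root, duration=30.0, interval=0.2):
    """Poll root for `duration` seconds; print and return files that appear."""
    seen = snapshot(root)
    appeared = set()
    deadline = time.monotonic() + duration
    while time.monotonic() < deadline:
        current = snapshot(root)
        for path in current - seen:
            print("created:", path)
            appeared.add(path)
        seen = current
        time.sleep(interval)
    return appeared

# Example (hypothetical path): watch("/home/bluedata/flink/tmp", duration=60.0)
```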
>
> Cheers,
> Geoffrey
>
> On Fri, Jul 15, 2016 at 4:15 AM Chesnay Schepler <chesnay@apache.org>
> wrote:
>
>> Could you write a Java job that uses the DistributedCache to distribute
>> files?
>>
>> If this fails, then the DC is faulty; if it doesn't, something in the
>> Python API is wrong.
>>
>>
>> On 15.07.2016 08:06, Geoffrey Mon wrote:
>>
>> I've come across similar issues when trying to set up Flink on Amazon EC2
>> instances. Presumably there is something wrong with my setup? Here is the
>> flink-conf.yaml I am using:
>>
>> https://gist.githubusercontent.com/GEOFBOT/3ffc9b21214174ae750cc3fdb2625b71/raw/flink-conf.yaml
>>
>> Thanks,
>> Geoffrey
>>
>> On Wed, Jul 13, 2016 at 1:15 PM Geoffrey Mon <geofbot@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Here is the TaskManager log on pastebin:
>>> http://pastebin.com/XAJ56gn4
>>>
>>> I will look into whether the files were created.
>>>
>>> By the way, the cluster is made with virtual machines running on
>>> BlueData EPIC. I don't know if that might be related to the problem.
>>>
>>> Thanks,
>>> Geoffrey
>>>
>>>
>>> On Wed, Jul 13, 2016 at 6:12 AM Chesnay Schepler <chesnay@apache.org>
>>> wrote:
>>>
>>>> Hello Geoffrey,
>>>>
>>>> How often does this occur?
>>>>
>>>> Flink distributes the user-code and the python library using the
>>>> Distributed Cache.
>>>>
>>>> Either the file is deleted right after being created for some reason,
>>>> or the DC returns a file name before the file was created (which
>>>> shouldn't happen; it should block until the file is available).
>>>>
>>>> If you are up to debugging this, I would suggest looking into the
>>>> FileCache class and verifying whether the file in question is in fact
>>>> created.
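That check can also be made from outside the JVM: poll for the expected
cache path while the job runs. If the poll times out, the file was never
written at all, rather than written too late. A rough sketch (paths are
illustrative):

```python
import os
import time

def wait_for_file(path, timeout=10.0, interval=0.1):
    """Poll until `path` exists with nonzero size, or until timeout.

    Returns True if the file appeared, False if it never did -- the
    latter would mean the FileCache never materialized it on this node.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if os.path.isfile(path) and os.path.getsize(path) > 0:
            return True
        time.sleep(interval)
    return False

# Example (hypothetical path; the flink-dist-cache UUID changes per job):
# wait_for_file("/home/bluedata/flink/tmp/flink-dist-cache-<uuid>/<job>/flink/plan.py",
#               timeout=60.0)
```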
>>>>
>>>> The logs of the TaskManager on which the exception occurs could be of
>>>> interest too; could you send them to me?
>>>>
>>>> Regards,
>>>> Chesnay
>>>>
>>>>
>>>> On 13.07.2016 04:11, Geoffrey Mon wrote:
>>>>
>>>> Hello all,
>>>>
>>>> I've set up Flink on a very small cluster of one master node and five
>>>> worker nodes, following the instructions in the documentation (
>>>> https://ci.apache.org/projects/flink/flink-docs-master/setup/cluster_setup.html).
>>>> I can run the included examples like WordCount and PageRank across the
>>>> entire cluster, but when I try to run simple Python examples, I sometimes
>>>> get a strange error on the first PythonMapPartition about the temporary
>>>> folders that contain the streams of data between Python and Java.
>>>>
>>>> If I run jobs on only the taskmanager on the master node, Python
>>>> examples run fine. However, if the jobs use the worker nodes, then I get
>>>> the following error:
>>>>
>>>> org.apache.flink.client.program.ProgramInvocationException: The program
>>>> execution failed: Job execution failed.
>>>> at
>>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)
>>>> <snip>
>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>>>> execution failed.
>>>> at
>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:806)
>>>> <snip>
>>>> Caused by: java.lang.Exception: The user defined 'open()' method caused
>>>> an exception: External process for task MapPartition (PythonMap) terminated
>>>> prematurely.
>>>> python: can't open file
>>>> '/home/bluedata/flink/tmp/flink-dist-cache-d1958549-3d58-4554-b286-cb4b1cdb9060/52feefa8892e61ba9a187f7684c84ada/flink/plan.py':
>>>> [Errno 2] No such file or directory
>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
>>>> <snip>
>>>> Caused by: java.lang.RuntimeException: External process for task
>>>> MapPartition (PythonMap) terminated prematurely.
>>>> python: can't open file
>>>> '/home/bluedata/flink/tmp/flink-dist-cache-d1958549-3d58-4554-b286-cb4b1cdb9060/52feefa8892e61ba9a187f7684c84ada/flink/plan.py':
>>>> [Errno 2] No such file or directory
>>>> at
>>>> org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:144)
>>>> at
>>>> org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:92)
>>>> at
>>>> org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:48)
>>>> at
>>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:477)
>>>> ... 5 more
>>>>
>>>> I suspect this issue has something to do with the data transfer
>>>> between the master and the workers, but I haven't been able to find any
>>>> solutions. Presumably the temporary files weren't received properly and
>>>> thus were never created?
>>>>
>>>> Thanks in advance.
>>>>
>>>> Cheers,
>>>> Geoffrey
>>>>
>>>>
>>>>
>>
>>
