flink-issues mailing list archives

From "Chesnay Schepler (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3203) Python API crashing when run in OGS
Date Tue, 05 Jan 2016 17:21:39 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15083394#comment-15083394 ]

Chesnay Schepler commented on FLINK-3203:
-----------------------------------------

The following test contains a program that uses the distributed cache:

https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java

The relevant code parts:

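{code:java}
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// The wrapper class name is illustrative; only main() and WordChecker come from the test.
public class DistributedCacheProgram {

	public static void main(String[] args) throws Exception {
		String textPath = args[0];
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		// Register the input file under the name "cache_test" so every task can fetch a local copy.
		env.registerCachedFile(textPath, "cache_test");

		// collect() triggers execution and returns the matching words to the client.
		List<Tuple1<String>> result = env
			.readTextFile(textPath)
			.flatMap(new WordChecker())
			.collect();
	}

	public static class WordChecker extends RichFlatMapFunction<String, Tuple1<String>> {
		private static final long serialVersionUID = 1L;
		private final Set<String> wordList = new HashSet<>();

		@Override
		public void open(Configuration conf) throws IOException {
			// Fetch the local copy of the file registered as "cache_test".
			File file = getRuntimeContext().getDistributedCache().getFile("cache_test");
			// try-with-resources closes the reader even if readLine() throws.
			try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
				String line;
				while ((line = reader.readLine()) != null) {
					wordList.add(line);
				}
			}
		}

		@Override
		public void flatMap(String word, Collector<Tuple1<String>> out) throws Exception {
			// Emit only words that also appear in the cached file.
			if (wordList.contains(word)) {
				out.collect(new Tuple1<>(word));
			}
		}
	}
}
{code}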

The Python API is built on top of the Java API, and thus makes use of several of its features.
One of these is the Distributed Cache, which is used to distribute the Flink Python library
and the user's Python plan file across the cluster (note that this is also done when running
locally). Right now I'm trying to see whether the Distributed Cache itself is the problem
(which would be the case if this program fails) or the way the Python API uses it.
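The FileNotFoundException in the report below names two possible causes: the path does not exist on the machine running the task, or the user running Flink cannot read it. Since the job only fails when submitted through qsub, one way to separate the two might be to run a small check on a compute node, once via qsub and once on an interactive node, and compare the output. This is a hypothetical diagnostic sketch, not part of Flink and not something Flink's FileCache does itself; the class name CachePathCheck and the default path /tmp/flink (taken from the stack trace) are assumptions for illustration.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/**
 * Hypothetical diagnostic, not part of Flink: checks the two conditions named in the
 * FileNotFoundException (path exists, path readable by the current user) on whatever
 * machine this program is run.
 */
class CachePathCheck {

	/** Returns null if the path exists and is readable, otherwise a short description of the problem. */
	static String check(Path path) {
		if (!Files.exists(path)) {
			return path + " does not exist on this node";
		}
		if (!Files.isReadable(path)) {
			return path + " is not readable by user '" + System.getProperty("user.name") + "'";
		}
		return null;
	}

	public static void main(String[] args) {
		// Default to the path from the stack trace; another path can be passed as the first argument.
		Path path = Paths.get(args.length > 0 ? args[0] : "/tmp/flink");
		String host;
		try {
			host = InetAddress.getLocalHost().getHostName();
		} catch (UnknownHostException e) {
			host = "unknown-host";
		}
		String problem = check(path);
		// One line per run, so outputs from several nodes can be compared side by side.
		System.out.println(host + ": " + (problem == null ? path + " OK" : problem));
	}
}
```

The code only uses Java 7 APIs, so it should work on the Java 8 setup from the report: compile it with `javac CachePathCheck.java` and submit `java CachePathCheck /tmp/flink` as a qsub job. A "does not exist on this node" result from the batch node but "OK" from an interactive node would point at the path not being visible where the task runs, rather than at permissions.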

> Python API crashing when run in OGS
> -----------------------------------
>
>                 Key: FLINK-3203
>                 URL: https://issues.apache.org/jira/browse/FLINK-3203
>             Project: Flink
>          Issue Type: Bug
>          Components: Python API
>    Affects Versions: 0.10.0
>         Environment: Rocks 6.1 SP1, CentOS release 6.7 (2.6.32-573.7.1.el6.x86_64), java/oraclejdk/1.8.0_45, Python 2.6.6
>            Reporter: Omar Alvarez
>
> When trying to execute the Python example without HDFS, the FlatMap fails with the following error:
> {code:title=PyExample|borderStyle=solid}
> 01/05/2016 13:09:38     Job execution switched to status RUNNING.
> 01/05/2016 13:09:38     DataSource (ValueSource)(1/1) switched to SCHEDULED
> 01/05/2016 13:09:38     DataSource (ValueSource)(1/1) switched to DEPLOYING
> 01/05/2016 13:09:38     DataSource (ValueSource)(1/1) switched to RUNNING
> 01/05/2016 13:09:38     MapPartition (PythonFlatMap -> PythonCombine)(1/1) switched to SCHEDULED
> 01/05/2016 13:09:38     MapPartition (PythonFlatMap -> PythonCombine)(1/1) switched to DEPLOYING
> 01/05/2016 13:09:38     DataSource (ValueSource)(1/1) switched to FINISHED
> 01/05/2016 13:09:38     MapPartition (PythonFlatMap -> PythonCombine)(1/1) switched to RUNNING
> 01/05/2016 13:09:38     MapPartition (PythonFlatMap -> PythonCombine)(1/1) switched to FAILED
> java.lang.Exception: The user defined 'open()' method caused an exception: An error occurred while copying the file.
>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:484)
>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: An error occurred while copying the file.
>         at org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78)
>         at org.apache.flink.languagebinding.api.java.python.streaming.PythonStreamer.startPython(PythonStreamer.java:68)
>         at org.apache.flink.languagebinding.api.java.python.streaming.PythonStreamer.setupProcess(PythonStreamer.java:58)
>         at org.apache.flink.languagebinding.api.java.common.streaming.Streamer.open(Streamer.java:67)
>         at org.apache.flink.languagebinding.api.java.python.functions.PythonMapPartition.open(PythonMapPartition.java:47)
>         at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>         ... 3 more
> Caused by: java.io.FileNotFoundException: File file:/tmp/flink does not exist or the user running Flink ('omar.alvarez') has insufficient permissions to access it.
>         at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:107)
>         at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:242)
>         at org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:322)
>         at org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:306)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         ... 1 more
> {code}
> It is important to mention that I am using modified Flink cluster launch scripts to use the OGS engine. The modified scripts and a usage example can be found in https://github.com/omaralvarez/flink-OGS-GE.
> The same example in the Java API works correctly, and the user has sufficient permissions to write the file. If I use interactive nodes instead of the qsub command to run the example, it does not fail.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
