flink-issues mailing list archives

From "Till Rohrmann (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-10370) DistributedCache does not work in job cluster mode.
Date Thu, 20 Sep 2018 07:14:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16621599#comment-16621599 ]

Till Rohrmann commented on FLINK-10370:
---------------------------------------

This problem should only arise in the detached job mode. In the attached job mode we still
use the session mode underneath, so everything should work.

In job mode the main method is still executed on the client side. What is no longer possible,
however, is uploading a local file to the {{BlobServer}}, because the server is not yet running
at that point. What we would have to do in this case is include the local files in the set of
files which we upload to HDFS in order to start the Yarn application. That would put the files
on the classpath of every instance, making them easily accessible.
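
To make the gap concrete: below is a minimal sketch of how a registered file ends up in the
job {{Configuration}} on the standard submission path, assuming the public helper
{{DistributedCache#writeFileInfoToConfig}} behaves as in the session-mode path. The class name
is made up for illustration; the path and the name "test_data" mirror the reproduction program
below.

{code}
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
import org.apache.flink.configuration.Configuration;

public class CacheEntryConfigSketch {
	public static void main(String[] args) {
		// Sketch only: in session mode the client performs an equivalent step
		// for every file registered via registerCachedFile(...). In job cluster
		// mode this step is currently skipped, which is this very bug.
		final Configuration jobConfiguration = new Configuration();

		final DistributedCacheEntry entry = new DistributedCacheEntry(
			"hdfs://172.17.0.2:8020/home/hadoop-user/in", false);

		// Serialize the entry into the configuration under the name that
		// getRuntimeContext().getDistributedCache().getFile(...) later looks up.
		DistributedCache.writeFileInfoToConfig("test_data", entry, jobConfiguration);
	}
}
{code}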

For the container entrypoint, one needs to make sure that all required local files are already
included in the image.
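
For illustration, registering such a file would then use a container-local path instead of a
client-local one; the path below is hypothetical and only works if the file was baked into the
image:

{code}
// Hypothetical path: the file must already exist inside the image on every
// container, since nothing ships it at submission time.
env.registerCachedFile("file:///opt/flink/usrlib/test_data", "test_data", false);
{code}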

> DistributedCache does not work in job cluster mode.
> ---------------------------------------------------
>
>                 Key: FLINK-10370
>                 URL: https://issues.apache.org/jira/browse/FLINK-10370
>             Project: Flink
>          Issue Type: Bug
>          Components: Cluster Management, Job-Submission
>    Affects Versions: 1.6.0
>            Reporter: Dawid Wysakowicz
>            Priority: Major
>
> When using job cluster mode the client does not follow the standard submission path, during
> which {{DistributedCacheEntries}} are written into the {{Configuration}}. Therefore the files
> cannot be accessed in the job.
> How to reproduce:
> Simple job that uses {{DistributedCache}}:
> {code}
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.java.utils.ParameterTool;
> import org.apache.flink.core.fs.FileSystem;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> import java.nio.file.Files;
> import java.nio.file.Path;
> import java.util.stream.Collectors;
>
> public class DistributedCacheViaDfsTestProgram {
> 	public static void main(String[] args) throws Exception {
> 		final ParameterTool params = ParameterTool.fromArgs(args);
> 		final String inputFile = "hdfs://172.17.0.2:8020/home/hadoop-user/in";
> 		final String outputFile = "/tmp/out";
> 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 		env.setParallelism(1);
> 		// Register the HDFS file in the distributed cache under the name "test_data".
> 		env.registerCachedFile(inputFile, "test_data", false);
> 		env.fromElements(1)
> 			.map(new TestMapFunction())
> 			.writeAsText(outputFile, FileSystem.WriteMode.OVERWRITE);
> 		env.execute("Distributed Cache Via Blob Test Program");
> 	}
>
> 	static class TestMapFunction extends RichMapFunction<Integer, String> {
> 		@Override
> 		public String map(Integer value) throws Exception {
> 			// Resolving the cached file on the task manager is what fails here.
> 			final Path testFile = getRuntimeContext().getDistributedCache().getFile("test_data").toPath();
> 			return Files.readAllLines(testFile)
> 				.stream()
> 				.collect(Collectors.joining("\n"));
> 		}
> 	}
> }
> {code}
> If one runs this program, e.g. in yarn job cluster mode, it will produce:
> {code}
> java.lang.IllegalArgumentException: File with name 'test_data' is not available. Did you forget to register the file?
> 	at org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:110)
> 	at org.apache.flink.streaming.tests.DistributedCacheViaDfsTestProgram$TestMapFunction.map(DistributedCacheViaDfsTestProgram.java:59)
> 	at org.apache.flink.streaming.tests.DistributedCacheViaDfsTestProgram$TestMapFunction.map(DistributedCacheViaDfsTestProgram.java:55)
> 	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
> 	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> 	at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:164)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
> 	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> 	at java.lang.Thread.run(Thread.java:748)
> {code}
> This job will run fine, though, if it is submitted to a yarn-session cluster.



