flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Mikhail Pryakhin <m.prya...@gmail.com>
Subject Re: Add custom configuration files to TMs classpath on YARN
Date Wed, 27 Sep 2017 16:36:01 GMT
Hi Robert,

Thanks for your reply!

>I believe you should be able to load non-class files through the classloader as well.
Could you please clarify what you mean by this?

>Did you see any code that excludes non-class files? 
No I didn't, but I did see the following code here [1]:

if (shipFile.isDirectory()) {
	// add directories to the classpath
	java.nio.file.Path shipPath = shipFile.toPath();
	final java.nio.file.Path parentPath = shipPath.getParent();

	Files.walkFileTree(shipPath, new SimpleFileVisitor<java.nio.file.Path>() {
		public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs)
			throws IOException {
			java.nio.file.Path relativePath = parentPath.relativize(file);


			return FileVisitResult.CONTINUE;
} else {
	// add files to the classpath

the code above traverses the folder's content I passed via --yarnship option and appends non
class files to the classpath in case a shipfile is a directory. That eventually gives no results
as we all know only the following files can be set as jvm classpath: .class files, .jar files,
.zip files or folders.

I believe that in case the code above doesn't traverse directories contents then everything
will work as expected.

For instance if I pass a file then it appends to the classpath as is, if I specify a folder
then it goes to the classpath as folder.
By the meantime it is not possible to pass multiple yarnship options, but I also created another
jira ticket [2] that proposes to add the ability to specify multiple yarnship folders.

What do you think about that?

[1] https://github.com/apache/flink/blob/2eaf92b1f0a0d965c14b65755d25f1a8167de023/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L1003:
[2] https://issues.apache.org/jira/browse/FLINK-6950 <https://issues.apache.org/jira/browse/FLINK-6950>

Thanks in advance

Kind Regards,
Mike Pryakhin

> On 27 Sep 2017, at 18:30, Robert Metzger <rmetzger@apache.org> wrote:
> Hi Mike,
> For using the DistributedCache approach, you need to have HDFS or another distributed
FS available to distribute the files.
> I would actually like to understand why you said " then this file is copied to the yarn
cluster and added to JVM class  [...] but is ignored by TM JVM as it is neither jar(zip) file
nor directory..."
> I believe you should be able to load non-class files through the classloader as well.
> Did you see any code that excludes non-class files? Afaik the Taskmanagers have access
to all files (of any type) that are passed using the --ship command (or in the lib/ folder).
> On Wed, Sep 27, 2017 at 3:42 PM, Mikhail Pryakhin <m.pryahin@gmail.com <mailto:m.pryahin@gmail.com>>
> Hi Nico,
> Thanks a lot for you help, but unfortunately, the workaround you suggested doesn't work
for me.
> I tried to leverage the StreamExecutionEnvironment#registerCachedFile method but failed
because this instance is created when the application master has already been started therefore
the classpath to run the application somewhere on YARN cluster has already been created by
means of org.apache.flink.yarn.YarnClusterClient. In my case, I need to be able to pass a
local folder at the moment I submit the application so that it is included in the application
YARN classpath.
> The option you suggested works well if I need to cache a file that is available for me
at the moment I want to register it (for example a file on HDFS).
> Is there any way we can extend org.apache.flink.yarn.YarnClusterClient to pass user-specified
folders to the YARN application classpath?
> Kind Regards,
> Mike Pryakhin
>> On 21 Jun 2017, at 16:55, Mikhail Pryakhin <m.pryahin@gmail.com <mailto:m.pryahin@gmail.com>>
>> Hi Nico!
>> Sounds great, will give it a try and return back with results soon.
>> Thank you so much for your help!!
>> Kind Regards,
>> Mike Pryakhin
>>> On 21 Jun 2017, at 16:36, Nico Kruber <nico@data-artisans.com <mailto:nico@data-artisans.com>>
>>> A workaround may be to use the DistributedCache. It apparently is not 
>>> documented much but the JavaDoc mentions roughly how to use it:
>>> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/
>>> flink/api/java/ExecutionEnvironment.java#L954
>>> /**
>>> * Registers a file at the distributed cache under the given name. The file will

>>> be accessible
>>> * from any user-defined function in the (distributed) runtime under a local 
>>> path. Files
>>> * may be local files (as long as all relevant workers have access to it), or

>>> files in a distributed file system.
>>> * The runtime will copy the files temporarily to a local cache, if needed.
>>> * <p>
>>> * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be 
>>> obtained inside UDFs via
>>> * {@link 
>>> org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and 
>>> provides access
>>> * {@link org.apache.flink.api.common.cache.DistributedCache} via 
>>> * {@link 
>>> org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
>>> * 
>>> * @param filePath The path of the file, as a URI (e.g. "file:///some/path <>"
>>> "hdfs://host:port/and/path <>")
>>> * @param name The name under which the file is registered.
>>> */
>>> public void registerCachedFile(String filePath, String name){
>>> 	registerCachedFile(filePath, name, false);
>>> }
>>> You could pass the actual file URL to use for each instance of your job that

>>> requires a different file via a simple job parameter:
>>> public static void main(String[] args) throws Exception {
>>> 	ParameterTool params = ParameterTool.fromArgs(args);
>>> 	...
>>> 	env.registerCachedFile(params.get("config_file", <default/path>), 
>>> "extConfig");
>>> 	...
>>> }
>>> Flink's DistributedCache will then cache the file locally and you can use it
>>> a RichFunction like in
>>> https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/ <https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/>
>>> apache/flink/test/distributedCache/DistributedCacheTest.java#L99
>>> public class MyFunction extends AbstractRichFunction {
>>> 	private static final long serialVersionUID = 1L;
>>> 	@Override
>>> 	public void open(Configuration conf) throws IOException {
>>> 		File file = 
>>> getRuntimeContext().getDistributedCache().getFile("extConfig");
>>> ...
>>> 	}
>>> }
>>> Nico
>>> On Monday, 19 June 2017 14:42:12 CEST Mikhail Pryakhin wrote:
>>>> Hi guys,
>>>> any news?
>>>> I’ve created a jira-ticket https://issues.apache.org/jira/browse/FLINK-6949
>>>> <https://issues.apache.org/jira/browse/FLINK-6949 <https://issues.apache.org/jira/browse/FLINK-6949>>.
>>>> Kind Regards,
>>>> Mike Pryakhin
>>>>> On 16 Jun 2017, at 16:35, Mikhail Pryakhin <m.pryahin@gmail.com <mailto:m.pryahin@gmail.com>>
>>>>> Hi all,
>>>>> I run my flink job on yarn cluster and need to supply job configuration
>>>>> parameters via configuration file alongside with the job jar.
>>>>> (configuration file can't be packaged into jobs jar file). I tried to
>>>>> the configuration file into the folder that is passed via --yarnship
>>>>> option to the flink run command, then this file is copied to the yarn
>>>>> cluster and added to JVM class path like 'path/application.conf' but
>>>>> ignored by TM JVM as it is neither jar(zip) file nor directory...
>>>>> A looked through the YarnClusterDescriptor class where the
>>>>> ENV_FLINK_CLASSPATH is built and haven't found any option to to tell
>>>>> flink (YarnClusterDescriptor especially) to add my configuration file
>>>>> the TM JVM classpath... Is there any way to do so? If not do you consider
>>>>> to have such an ability to add files? (like in spark I just can pass
>>>>> files via --files option)
>>>>> Thanks in advance.
>>>>> Kind Regards,
>>>>> Mike Pryakhin

View raw message