flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files
Date Wed, 13 Jun 2018 11:47:01 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510982#comment-16510982 ]

ASF GitHub Bot commented on FLINK-9280:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6147#discussion_r195035751
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
    @@ -342,4 +320,54 @@ public void run() {
     			}
     		}
     	}
    +
    +	public static Path compressDirectory(Path directory) throws IOException {
    +		FileSystem fs = directory.getFileSystem();
    +		java.nio.file.Path tmp = Files.createTempFile("flink-distributed-cache", ".zip");
    +		try (ZipOutputStream out = new ZipOutputStream(new FileOutputStream(tmp.toFile()))) {
    +			addToZip(directory, fs, directory.getParent(), out);
    +		}
    +		return new Path(tmp.toUri());
    +	}
    +
    +	private static void addToZip(Path fileOrDirectory, FileSystem fs, Path rootDir, ZipOutputStream out) throws IOException {
    +		String relativePath = fileOrDirectory.getPath().replace(rootDir.getPath() + '/', "");
    +		if (fs.getFileStatus(fileOrDirectory).isDir()) {
    +			out.putNextEntry(new ZipEntry(relativePath + '/'));
    +			for (FileStatus containedFile : fs.listStatus(fileOrDirectory)) {
    +				addToZip(containedFile.getPath(), fs, rootDir, out);
    +			}
    +		} else {
    +			ZipEntry entry = new ZipEntry(relativePath);
    +			out.putNextEntry(entry);
    +
    +			try (FSDataInputStream in = fs.open(fileOrDirectory)) {
    +				IOUtils.copyBytes(in, out, false);
    +			}
    +			out.closeEntry();
    +		}
    +	}
    +
    +	@VisibleForTesting
    +	static Path expandDirectory(File file, File targetDirectory, boolean isExecutable) throws IOException {
    +		java.nio.file.Path rootDir = null;
    +		try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) {
    +			ZipEntry entry;
    +			while ((entry = zis.getNextEntry()) != null) {
    +				java.nio.file.Path relativePath = Paths.get(entry.getName());
    +				rootDir = relativePath.getName(0);
    +
    +				java.nio.file.Path newFile = targetDirectory.toPath().resolve(relativePath);
    +				if (entry.isDirectory()) {
    +					Files.createDirectories(newFile);
    +				} else {
    +					Files.copy(zis, newFile);
    +					//noinspection ResultOfMethodCallIgnored
    +					newFile.toFile().setExecutable(isExecutable);
    --- End diff --
    
    This method violates the single responsibility principle (SRP): it both expands the zip
    and sets file permissions. It might be easier to separate these steps (especially if we
    move these methods to `FileUtils`).
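
    One way to read the suggestion above is sketched here, using only the JDK. The class
    `ZipExpansionSketch` and its methods `expand` and `setExecutable` are hypothetical names
    chosen for illustration; they are not part of Flink, and this is not the change that was
    eventually made in the pull request. The check that an entry stays inside the target
    directory is an extra assumption that does not appear in the quoted diff.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

/** Hypothetical helper, not part of Flink: keeps unzipping and permission handling apart. */
final class ZipExpansionSketch {

    /** Step 1: expand the archive and report which regular files were written. */
    static List<Path> expand(Path zipFile, Path targetDirectory) throws IOException {
        List<Path> extractedFiles = new ArrayList<>();
        Path root = targetDirectory.toAbsolutePath().normalize();
        try (ZipInputStream zis = new ZipInputStream(Files.newInputStream(zipFile))) {
            ZipEntry entry;
            while ((entry = zis.getNextEntry()) != null) {
                Path newFile = root.resolve(entry.getName()).normalize();
                // Extra safety check that is not in the quoted diff:
                // refuse entries that would land outside the target directory.
                if (!newFile.startsWith(root)) {
                    throw new IOException("Entry escapes target directory: " + entry.getName());
                }
                if (entry.isDirectory()) {
                    Files.createDirectories(newFile);
                } else {
                    // Archives do not always list a directory before the files inside it.
                    Files.createDirectories(newFile.getParent());
                    Files.copy(zis, newFile);
                    extractedFiles.add(newFile);
                }
            }
        }
        return extractedFiles;
    }

    /** Step 2: apply the executable flag; returns false if any file rejected the change. */
    static boolean setExecutable(Iterable<Path> files, boolean executable) {
        boolean allSucceeded = true;
        for (Path file : files) {
            allSucceeded &= file.toFile().setExecutable(executable);
        }
        return allSucceeded;
    }

    public static void main(String[] args) throws IOException {
        // Build a tiny archive so the two steps can be exercised end to end.
        Path zip = Files.createTempFile("sketch", ".zip");
        try (ZipOutputStream out = new ZipOutputStream(Files.newOutputStream(zip))) {
            out.putNextEntry(new ZipEntry("dir/"));
            out.closeEntry();
            out.putNextEntry(new ZipEntry("dir/run.sh"));
            out.write("echo hello\n".getBytes(StandardCharsets.UTF_8));
            out.closeEntry();
        }
        Path target = Files.createTempDirectory("sketch-target");
        List<Path> files = expand(zip, target);
        setExecutable(files, true);
        System.out.println("extracted " + files.size() + " file(s)");
    }
}
```

    With this split, the caller decides whether permissions are touched at all, and each step
    can be tested on its own. Returning the list of extracted files is one possible way to
    connect the two steps; walking the target directory afterwards would work as well.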


> Extend JobSubmitHandler to accept jar files
> -------------------------------------------
>
>                 Key: FLINK-9280
>                 URL: https://issues.apache.org/jira/browse/FLINK-9280
>             Project: Flink
>          Issue Type: New Feature
>          Components: Job-Submission, REST
>    Affects Versions: 1.5.0
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>            Priority: Critical
>             Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first uploads all required jars to the blob server,
> sets the blob keys in the job graph, and then uploads this graph to the {{JobSubmitHandler}},
> which submits it to the Dispatcher.
> This process has the downside that it requires jars to be uploaded to the blob server
> before submitting the job graph, which does not happen via REST.
> I propose an extension to the {{JobSubmitHandler}} to also accept an optional list
> of jar files that were previously uploaded through the {{JarUploadHandler}}. If present,
> the handler would upload these jars to the blob server and set the blob keys.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
