flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...
Date Fri, 22 Jun 2018 13:06:04 GMT
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6199#discussion_r197435093
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
---
    @@ -552,96 +550,33 @@ public boolean hasUsercodeJarFiles() {
     		return this.userJarBlobKeys;
     	}
     
    -	/**
    -	 * Uploads the previously added user JAR files to the job manager through
    -	 * the job manager's BLOB server. The BLOB servers' address is given as a
    -	 * parameter. This function issues a blocking call.
    -	 *
    -	 * @param blobServerAddress of the blob server to upload the jars to
    -	 * @param blobClientConfig the blob client configuration
    -	 * @throws IOException Thrown, if the file upload to the JobManager failed.
    -	 */
    -	public void uploadUserJars(
    -			InetSocketAddress blobServerAddress,
    -			Configuration blobClientConfig) throws IOException {
    -		if (!userJars.isEmpty()) {
    -			List<PermanentBlobKey> blobKeys = BlobClient.uploadFiles(
    -				blobServerAddress, blobClientConfig, jobID, userJars);
    -
    -			for (PermanentBlobKey blobKey : blobKeys) {
    -				if (!userJarBlobKeys.contains(blobKey)) {
    -					userJarBlobKeys.add(blobKey);
    -				}
    -			}
    -		}
    -	}
    -
     	@Override
     	public String toString() {
     		return "JobGraph(jobId: " + jobID + ")";
     	}
     
    -	/**
    -	 * Configures JobGraph with user specified artifacts. If the files are in local system
it uploads them
    -	 * to the BLOB server, otherwise it just puts metadata for future remote access from
within task executor.
    -	 *
    -	 * @param blobServerAddress of the blob server to upload the files to
    -	 * @param blobClientConfig the blob client configuration
    -	 * @throws IOException Thrown, if the file upload to the Blob server failed.
    -	 */
    -	public void uploadUserArtifacts(
    -			InetSocketAddress blobServerAddress,
    -			Configuration blobClientConfig) throws IOException {
    -
    -		Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> uploadToBlobServer
= new HashSet<>();
    -		Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> distributeViaDFS
= new HashSet<>();
    -
    -		for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact :
userArtifacts.entrySet()) {
    -			Path filePath = new Path(userArtifact.getValue().filePath);
    -
    -			try {
    -				if (filePath.getFileSystem().isDistributedFS()) {
    -					distributeViaDFS.add(userArtifact);
    -				} else {
    -					uploadToBlobServer.add(userArtifact);
    -				}
    -
    -			} catch (IOException ex) {
    -				distributeViaDFS.add(userArtifact);
    -			}
    +	public void setUserArtifactBlobKey(String entryName, PermanentBlobKey blobKey) {
    +		byte[] serializedBlobKey;
    +		try {
    +			serializedBlobKey = InstantiationUtil.serializeObject(blobKey);
    +		} catch (IOException e) {
    +			throw new FlinkRuntimeException("Could not serialize blobkey " + blobKey + ".", e);
     		}
    +		userArtifacts.computeIfPresent(entryName, (key, originalEntry) -> new DistributedCache.DistributedCacheEntry(
    +			originalEntry.filePath,
    +			originalEntry.isExecutable,
    +			serializedBlobKey,
    +			originalEntry.isZipped
    +		));
    +	}
     
    -		uploadViaBlob(blobServerAddress, blobClientConfig, uploadToBlobServer);
    -
    -		for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact :
distributeViaDFS) {
    +	public void finalizeUserArtifactEntries() {
    --- End diff --
    
    Maybe rename to `writeUserArtifactEntriesToConfiguration`


---

Mime
View raw message