flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9289) Parallelism of generated operators should have max parallelism of input
Date Fri, 22 Jun 2018 13:58:01 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520381#comment-16520381 ]

ASF GitHub Bot commented on FLINK-9289:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6203#discussion_r197451641
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
    @@ -54,18 +69,89 @@ public JobSubmitHandler(
     
     	@Override
     	protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
    -		JobGraph jobGraph;
    -		try {
    -			ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
    -			jobGraph = (JobGraph) objectIn.readObject();
    -		} catch (Exception e) {
    -			throw new RestHandlerException(
    -				"Failed to deserialize JobGraph.",
    -				HttpResponseStatus.BAD_REQUEST,
    -				e);
    +		Collection<Path> uploadedFiles = request.getUploadedFiles();
    +		Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
    +			path -> path.getFileName().toString(),
    +			entry -> entry
    +		));
    +
    +		JobSubmitRequestBody requestBody = request.getRequestBody();
    +
    +		Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile);
    +
    +		Collection<org.apache.flink.core.fs.Path> jarFiles = new ArrayList<>(requestBody.jarFileNames.size());
    +		for (String jarFileName : requestBody.jarFileNames) {
    +			Path jarFile = getPathAndAssertUpload(jarFileName, "Jar", nameToFile);
    +			jarFiles.add(new org.apache.flink.core.fs.Path(jarFile.toString()));
    +		}
    +
    +		Collection<Tuple2<String, org.apache.flink.core.fs.Path>> artifacts = new ArrayList<>(requestBody.artifactFileNames.size());
    +		for (JobSubmitRequestBody.DistributedCacheFile artifactFileName : requestBody.artifactFileNames) {
    +			Path artifactFile = getPathAndAssertUpload(artifactFileName.fileName, "Artifact", nameToFile);
    +			artifacts.add(Tuple2.of(artifactFileName.entryName, new org.apache.flink.core.fs.Path(artifactFile.toString())));
     		}
     
    -		return gateway.submitJob(jobGraph, timeout)
    -			.thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
    +		Map<String, DistributedCache.DistributedCacheEntry> temporaryHack = artifacts.stream()
    +			.collect(Collectors.toMap(
    +				tuple -> tuple.f0,
    +				// the actual entry definition is mostly irrelevant as only the blobkey is accessed
    +				// blame whoever wrote the ClientUtils API
    +				tuple -> new DistributedCache.DistributedCacheEntry(tuple.f1.toString(), false)
    +			));
    +
    +		// TODO: use executor
    +		CompletableFuture<JobGraph> jobGraphFuture = CompletableFuture.supplyAsync(() -> {
    +			JobGraph jobGraph;
    +			try (ObjectInputStream objectIn = new ObjectInputStream(Files.newInputStream(jobGraphFile))) {
    +				jobGraph = (JobGraph) objectIn.readObject();
    +			} catch (Exception e) {
    +				throw new CompletionException(new RestHandlerException(
    +					"Failed to deserialize JobGraph.",
    +					HttpResponseStatus.BAD_REQUEST,
    +					e));
    +			}
    +			return jobGraph;
    +		});
    +
    +		CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);
    +
    +		CompletableFuture<JobGraph> finalizedJobGraphFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
    +			final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort);
    +			try (BlobClient blobClient = new BlobClient(address, new Configuration())) {
    +				Collection<PermanentBlobKey> jarBlobKeys = ClientUtils.uploadUserJars(jobGraph.getJobID(), jarFiles, blobClient);
    +				ClientUtils.setUserJarBlobKeys(jarBlobKeys, jobGraph);
    +
    +				Collection<Tuple2<String, PermanentBlobKey>> artifactBlobKeys = ClientUtils.uploadUserArtifacts(jobGraph.getJobID(), temporaryHack, blobClient);
    +				ClientUtils.setUserArtifactBlobKeys(jobGraph, artifactBlobKeys);
    +			} catch (IOException e) {
    +				throw new CompletionException(new RestHandlerException(
    +					"Could not upload job files.",
    +					HttpResponseStatus.INTERNAL_SERVER_ERROR,
    +					e));
    +			}
    +			return jobGraph;
    +		});
    +
    +		CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));
    +
    +		return jobSubmissionFuture.thenCombine(jobGraphFuture,
    +			(ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
    +	}
    --- End diff ---
    
    This method is quite long. We could think about moving the generation of the `jarFiles` and `temporaryHack` structures into individual methods.
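
    For illustration, here is one way the `jarFiles` and `temporaryHack` structures could be built by dedicated methods. This is a sketch only: the method names are hypothetical, and it assumes the methods sit in `JobSubmitHandler` next to the PR's existing `getPathAndAssertUpload` helper.

    ```java
    // Hypothetical extraction of the jarFiles construction.
    private Collection<org.apache.flink.core.fs.Path> getJarFilesToUpload(
    		Collection<String> jarFileNames,
    		Map<String, Path> nameToFile) throws RestHandlerException {
    	Collection<org.apache.flink.core.fs.Path> jarFiles = new ArrayList<>(jarFileNames.size());
    	for (String jarFileName : jarFileNames) {
    		Path jarFile = getPathAndAssertUpload(jarFileName, "Jar", nameToFile);
    		jarFiles.add(new org.apache.flink.core.fs.Path(jarFile.toString()));
    	}
    	return jarFiles;
    }

    // Hypothetical extraction of the temporaryHack map construction.
    private static Map<String, DistributedCache.DistributedCacheEntry> toDistributedCacheEntries(
    		Collection<Tuple2<String, org.apache.flink.core.fs.Path>> artifacts) {
    	return artifacts.stream().collect(Collectors.toMap(
    		tuple -> tuple.f0,
    		// only the blob key of each entry is read downstream
    		tuple -> new DistributedCache.DistributedCacheEntry(tuple.f1.toString(), false)));
    }
    ```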


> Parallelism of generated operators should have max parallelism of input
> ------------------------------------------------------------------------
>
>                 Key: FLINK-9289
>                 URL: https://issues.apache.org/jira/browse/FLINK-9289
>             Project: Flink
>          Issue Type: Bug
>          Components: DataSet API
>    Affects Versions: 1.5.0, 1.4.2, 1.6.0
>            Reporter: Fabian Hueske
>            Assignee: Xingcan Cui
>            Priority: Major
>              Labels: pull-request-available
>
> The DataSet API aims to chain generated operators such as key extraction mappers to their predecessor. This is done by assigning the same parallelism as the input operator.
> If a generated operator has more than two inputs, it can no longer be chained and is created with the default parallelism. This can lead to a {code}NoResourceAvailableException: Not enough free slots available to run the job.{code} as reported by a user on the mailing list: https://lists.apache.org/thread.html/60a8bffcce54717b6273bf3de0f43f1940fbb711590f4b90cd666c9a@%3Cuser.flink.apache.org%3E
> I suggest setting the parallelism of a generated operator to the max parallelism of all of its inputs to fix this problem (a sketch of this idea follows after the quoted description).
> Until the problem is fixed, a workaround is to set the default parallelism at the {{ExecutionEnvironment}}:
> {code}
> ExecutionEnvironment env = ...
> env.setParallelism(2);
> {code}
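
To make the quoted proposal concrete, here is a minimal sketch of the fix idea. The operator model below is illustrative only, not the actual DataSet API internals:

{code}
import java.util.Collection;

// Illustrative only; not the actual DataSet API internals.
final class GeneratedOperatorParallelism {

	interface OperatorInfo {
		int getParallelism();
	}

	// A generated operator takes the maximum parallelism of its inputs
	// instead of the default parallelism.
	static int forInputs(Collection<? extends OperatorInfo> inputs, int defaultParallelism) {
		return inputs.stream()
			.mapToInt(OperatorInfo::getParallelism)
			.max()
			// no inputs: fall back to the default parallelism
			.orElse(defaultParallelism);
	}
}
{code}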



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
