flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request #6147: [FLINK-9280][rest] Rework JobSubmitHandler to acce...
Date Wed, 13 Jun 2018 14:38:21 GMT
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6147#discussion_r195107887
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
---
    @@ -71,20 +108,192 @@ public void testSerializationFailureHandling() throws Exception
{
     
     	@Test
     	public void testSuccessfulJobSubmission() throws Exception {
    -		DispatcherGateway mockGateway = mock(DispatcherGateway.class);
    -		when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
    +		DispatcherGateway mockGateway = new JobGraphCapturingMockGateway();
     		GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
     
     		JobSubmitHandler handler = new JobSubmitHandler(
     			CompletableFuture.completedFuture("http://localhost:1234"),
     			mockGatewayRetriever,
     			RpcUtils.INF_TIMEOUT,
    -			Collections.emptyMap());
    +			Collections.emptyMap(),
    +			new Configuration());
     
     		JobGraph job = new JobGraph("testjob");
     		JobSubmitRequestBody request = new JobSubmitRequestBody(job);
     
     		handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()),
mockGateway)
     			.get();
     	}
    +
    +	@Test
    +	public void testJarHandling() throws Exception {
    +		final String jarName = "jar";
    +
    +		JobGraphCapturingMockGateway jobGraphCapturingMockGateway = new JobGraphCapturingMockGateway();
    +		GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
    +
    +		JobSubmitHandler handler = new JobSubmitHandler(
    +			CompletableFuture.completedFuture("http://localhost:1234"),
    +			mockGatewayRetriever,
    +			RpcUtils.INF_TIMEOUT,
    +			Collections.emptyMap(),
    +			new Configuration());
    +
    +		Path tmp = TMP_FOLDER.newFolder().toPath();
    +		Path clientStorageDirectory = Files.createDirectory(tmp.resolve("client-storage-directory"));
    +		Path serverStorageDirectory = Files.createDirectory(tmp.resolve("server-storage-directory"));
    +
    +		Path jar = Paths.get(jarName);
    +		Files.createFile(clientStorageDirectory.resolve(jar));
    +		Files.createFile(serverStorageDirectory.resolve(jar));
    +
    +		JobGraph job = new JobGraph("testjob");
    +		job.addJar(new org.apache.flink.core.fs.Path(jar.toUri()));
    +		JobSubmitRequestBody serializedJobGraphBody = new JobSubmitRequestBody(job);
    +		JobSubmitRequestBody request = new JobSubmitRequestBody(serializedJobGraphBody.serializedJobGraph,
Collections.singletonList(serverStorageDirectory.resolve(jar)), Collections.emptyList(), serverStorageDirectory);
    +
    +		handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()),
jobGraphCapturingMockGateway)
    +			.get();
    +
    +		JobGraph submittedJobGraph = jobGraphCapturingMockGateway.jobGraph;
    +		List<org.apache.flink.core.fs.Path> userJars = submittedJobGraph.getUserJars();
    +
    +		// ensure we haven't changed the total number of jars
    +		Assert.assertEquals(1, userJars.size());
    +
    +		// this entry should be changed, a replacement jar exists in the server storage directory
    +		Assert.assertEquals(new org.apache.flink.core.fs.Path(serverStorageDirectory.resolve(jar).toUri()),
userJars.get(0));
    --- End diff --
    
    I think updating `JobGraph#userJars` and `JobGraph#userArtifacts` is not really necessary.
Maybe we should even mark them `transient` in order to emphasize that they won't be transmitted.
Given that, I think we don't have to do these tests.


---

Mime
View raw message