flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GJL <...@git.apache.org>
Subject [GitHub] flink pull request #5107: [FLINK-8176][flip6] Start SubmittedJobGraphStore i...
Date Fri, 01 Dec 2017 11:07:11 GMT
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5107#discussion_r154320903
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
---
    @@ -117,84 +135,78 @@ public void testJobSubmission() throws Exception {
     			heartbeatServices,
     			mock(MetricRegistryImpl.class),
     			fatalErrorHandler,
    -			jobManagerRunner,
    -			jobId);
    +			mockJobManagerRunner,
    +			TEST_JOB_ID);
     
    -		try {
    -			dispatcher.start();
    +		dispatcher.start();
    +	}
     
    -			CompletableFuture<UUID> leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
    +	@After
    +	public void tearDown() throws Exception {
    +		try {
    +			fatalErrorHandler.rethrowError();
    +		} finally {
    +			RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
    +		}
    +	}
     
    -			// wait for the leader to be elected
    -			leaderFuture.get();
    +	/**
    +	 * Tests that we can submit a job to the Dispatcher which then spawns a
    +	 * new JobManagerRunner.
    +	 */
    +	@Test
    +	public void testJobSubmission() throws Exception {
    +		CompletableFuture<UUID> leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
     
    -			DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
    +		// wait for the leader to be elected
    +		leaderFuture.get();
     
    -			CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph,
timeout);
    +		DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
     
    -			acknowledgeFuture.get();
    +		CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph,
timeout);
     
    -			verify(jobManagerRunner, Mockito.timeout(timeout.toMilliseconds())).start();
    +		acknowledgeFuture.get();
     
    -			// check that no error has occurred
    -			fatalErrorHandler.rethrowError();
    -		} finally {
    -			RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
    -		}
    +		verify(mockJobManagerRunner, Mockito.timeout(timeout.toMilliseconds())).start();
     	}
     
     	/**
     	 * Tests that the dispatcher takes part in the leader election.
     	 */
     	@Test
     	public void testLeaderElection() throws Exception {
    -		TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
    -		TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
    -
     		UUID expectedLeaderSessionId = UUID.randomUUID();
    -		CompletableFuture<UUID> leaderSessionIdFuture = new CompletableFuture<>();
    -		SubmittedJobGraphStore mockSubmittedJobGraphStore = mock(SubmittedJobGraphStore.class);
    -		TestingLeaderElectionService testingLeaderElectionService = new TestingLeaderElectionService()
{
    -			@Override
    -			public void confirmLeaderSessionID(UUID leaderSessionId) {
    -				super.confirmLeaderSessionID(leaderSessionId);
    -				leaderSessionIdFuture.complete(leaderSessionId);
    -			}
    -		};
    -
    -		haServices.setSubmittedJobGraphStore(mockSubmittedJobGraphStore);
    -		haServices.setDispatcherLeaderElectionService(testingLeaderElectionService);
    -		HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
    -		final JobID jobId = new JobID();
    -
    -		final TestingDispatcher dispatcher = new TestingDispatcher(
    -			rpcService,
    -			Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
    -			new Configuration(),
    -			haServices,
    -			mock(ResourceManagerGateway.class),
    -			mock(BlobServer.class),
    -			heartbeatServices,
    -			mock(MetricRegistryImpl.class),
    -			fatalErrorHandler,
    -			mock(JobManagerRunner.class),
    -			jobId);
     
    -		try {
    -			dispatcher.start();
    +		assertNull(dispatcherLeaderElectionService.getConfirmationFuture());
     
    -			assertFalse(leaderSessionIdFuture.isDone());
    +		dispatcherLeaderElectionService.isLeader(expectedLeaderSessionId);
     
    -			testingLeaderElectionService.isLeader(expectedLeaderSessionId);
    +		UUID actualLeaderSessionId = dispatcherLeaderElectionService.getConfirmationFuture()
    +			.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
     
    -			UUID actualLeaderSessionId = leaderSessionIdFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
    +		assertEquals(expectedLeaderSessionId, actualLeaderSessionId);
     
    -			assertEquals(expectedLeaderSessionId, actualLeaderSessionId);
    +		verify(submittedJobGraphStore, Mockito.timeout(timeout.toMilliseconds()).atLeast(1)).getJobIds();
    +	}
     
    -			verify(mockSubmittedJobGraphStore, Mockito.timeout(timeout.toMilliseconds()).atLeast(1)).getJobIds();
    -		} finally {
    -			RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
    -		}
    +	/**
    +	 * Test callbacks from
    +	 * {@link org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener}.
    +	 */
    +	@Test
    +	public void testSubmittedJobGraphListener() throws Exception {
    +		CompletableFuture<UUID> leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
    +		leaderFuture.get();
    +
    +		dispatcher.submitJob(jobGraph, timeout);
    +
    +		// pretend that other Dispatcher has removed job from submittedJobGraphStore
    +		dispatcher.onRemovedJobGraph(TEST_JOB_ID);
    +		assertThat(dispatcher.listJobs(timeout).get(), hasSize(0));
    --- End diff --
    
    Test is not thread-safe.


---

Mime
View raw message