flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8797) Port AbstractOperatorRestoreTestBase to MiniClusterResource
Date Tue, 27 Feb 2018 13:14:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16378553#comment-16378553 ]

ASF GitHub Bot commented on FLINK-8797:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5589#discussion_r170915918
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
---
    @@ -104,175 +91,97 @@ public static void beforeClass() {
     		SavepointSerializers.setFailWhenLegacyStateDetected(false);
     	}
     
    -	@BeforeClass
    -	public static void setupCluster() throws Exception {
    -		final Configuration configuration = new Configuration();
    -
    -		FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
    -
    -		actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
    -
    -		highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
    -			configuration,
    -			TestingUtils.defaultExecutor());
    -
    -		Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
    -			configuration,
    -			actorSystem,
    -			TestingUtils.defaultExecutor(),
    -			TestingUtils.defaultExecutor(),
    -			highAvailabilityServices,
    -			NoOpMetricRegistry.INSTANCE,
    -			Option.empty(),
    -			Option.apply("jm"),
    -			Option.apply("arch"),
    -			TestingJobManager.class,
    -			TestingMemoryArchivist.class);
    -
    -		jobManager = LeaderRetrievalUtils.retrieveLeaderGateway(
    -			highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
    -			actorSystem,
    -			timeout);
    -
    -		archiver = new AkkaActorGateway(master._2(), jobManager.leaderSessionID());
    -
    -		Configuration tmConfig = new Configuration();
    -		tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
    -
    -		ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
    -			tmConfig,
    -			ResourceID.generate(),
    -			actorSystem,
    -			highAvailabilityServices,
    -			NoOpMetricRegistry.INSTANCE,
    -			"localhost",
    -			Option.apply("tm"),
    -			true,
    -			TestingTaskManager.class);
    -
    -		taskManager = new AkkaActorGateway(taskManagerRef, jobManager.leaderSessionID());
    -
    -		// Wait until connected
    -		Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
    -		Await.ready(taskManager.ask(msg, timeout), timeout);
    -	}
    -
    -	@AfterClass
    -	public static void tearDownCluster() throws Exception {
    -		if (highAvailabilityServices != null) {
    -			highAvailabilityServices.closeAndCleanupAllData();
    -		}
    -
    -		if (actorSystem != null) {
    -			actorSystem.shutdown();
    -		}
    -
    -		if (archiver != null) {
    -			archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
    -		}
    -
    -		if (jobManager != null) {
    -			jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
    -		}
    -
    -		if (taskManager != null) {
    -			taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
    -		}
    -	}
    -
     	@Test
     	public void testMigrationAndRestore() throws Throwable {
    +		ClassLoader classLoader = this.getClass().getClassLoader();
    +		ClusterClient<?> clusterClient = miniClusterResource.getClusterClient();
    +		clusterClient.setDetached(true);
    +		final Deadline deadline = TEST_TIMEOUT.fromNow();
    +
     		// submit job with old version savepoint and create a migrated savepoint in the new
version
    -		String savepointPath = migrateJob();
    +		String savepointPath = migrateJob(classLoader, clusterClient, deadline);
     		// restore from migrated new version savepoint
    -		restoreJob(savepointPath);
    +		restoreJob(classLoader, clusterClient, deadline, savepointPath);
     	}
     
    -	private String migrateJob() throws Throwable {
    +	private String migrateJob(ClassLoader classLoader, ClusterClient<?> clusterClient,
Deadline deadline) throws Throwable {
    +
     		URL savepointResource = AbstractOperatorRestoreTestBase.class.getClassLoader().getResource("operatorstate/" + getMigrationSavepointName());
     		if (savepointResource == null) {
     			throw new IllegalArgumentException("Savepoint file does not exist.");
     		}
     		JobGraph jobToMigrate = createJobGraph(ExecutionMode.MIGRATE);
     		jobToMigrate.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointResource.getFile()));
     
    -		Object msg;
    -		Object result;
    +		assertNotNull(jobToMigrate.getJobID());
     
    -		// Submit job graph
    -		msg = new JobManagerMessages.SubmitJob(jobToMigrate, ListeningBehaviour.DETACHED);
    -		result = Await.result(jobManager.ask(msg, timeout), timeout);
    +		clusterClient.submitJob(jobToMigrate, classLoader);
     
    -		if (result instanceof JobManagerMessages.JobResultFailure) {
    -			JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure)
result;
    -			throw new Exception(failure.cause());
    -		}
    -		Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
    +		CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
    +			() -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
    +			deadline.timeLeft().toMillis() / 50,
    +			Time.milliseconds(50),
    +			(jobStatus) -> jobStatus.equals(JobStatus.RUNNING),
    +			TestingUtils.defaultScheduledExecutor());
     
    -		// Wait for all tasks to be running
    -		msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobToMigrate.getJobID());
    -		Await.result(jobManager.ask(msg, timeout), timeout);
    +		assertEquals(JobStatus.RUNNING, jobStatusFuture.get(deadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS));
     
     		// Trigger savepoint
     		File targetDirectory = tmpFolder.newFolder();
    -		msg = new JobManagerMessages.CancelJobWithSavepoint(jobToMigrate.getJobID(), targetDirectory.getAbsolutePath());
    +		String savepointPath = null;
     
     		// FLINK-6918: Retry cancel with savepoint message in case that StreamTasks were not
running
     		// TODO: The retry logic should be removed once the StreamTask lifecycle has been fixed
(see FLINK-4714)
    -		boolean retry = true;
    -		for (int i = 0; retry && i < 10; i++) {
    -			Future<Object> future = jobManager.ask(msg, timeout);
    -			result = Await.result(future, timeout);
    -
    -			if (result instanceof JobManagerMessages.CancellationFailure) {
    -				Thread.sleep(50L);
    -			} else {
    -				retry = false;
    +		while (deadline.hasTimeLeft() && savepointPath == null) {
    +			try {
    +				savepointPath = clusterClient.cancelWithSavepoint(
    +					jobToMigrate.getJobID(),
    +					targetDirectory.getAbsolutePath());
    +			} catch (Exception e) {
    +				if (!e.toString().matches(".* savepoint for the job .* failed.*")) {
    +					throw e;
    +				}
     			}
     		}
     
    -		if (result instanceof JobManagerMessages.CancellationFailure) {
    -			JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure)
result;
    -			throw new Exception(failure.cause());
    -		}
    +		assertNotNull(savepointPath);
     
    -		String savepointPath = ((JobManagerMessages.CancellationSuccess) result).savepointPath();
    +		jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
    +			() -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
    +			deadline.timeLeft().toMillis() / 50,
    +			Time.milliseconds(50),
    +			(jobStatus) -> jobStatus.equals(JobStatus.CANCELED),
    +			TestingUtils.defaultScheduledExecutor());
     
    -		// Wait until canceled
    -		msg = new TestingJobManagerMessages.NotifyWhenJobStatus(jobToMigrate.getJobID(), JobStatus.CANCELED);
    -		Await.ready(jobManager.ask(msg, timeout), timeout);
    +		assertEquals(JobStatus.CANCELED, jobStatusFuture.get(deadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS));
     
     		return savepointPath;
     	}
     
    -	private void restoreJob(String savepointPath) throws Exception {
    +	private void restoreJob(ClassLoader classLoader, ClusterClient<?> clusterClient,
Deadline deadline, String savepointPath) throws Exception {
     		JobGraph jobToRestore = createJobGraph(ExecutionMode.RESTORE);
     		jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath,
allowNonRestoredState));
     
    -		Object msg;
    -		Object result;
    +		assertNotNull(jobToRestore.getJobID());
    --- End diff --
    
    This assertion cannot fail, as far as I know: the job graph's ID is never null.


> Port AbstractOperatorRestoreTestBase to MiniClusterResource
> -----------------------------------------------------------
>
>                 Key: FLINK-8797
>                 URL: https://issues.apache.org/jira/browse/FLINK-8797
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Tests
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>             Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message