flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [4/4] flink git commit: [FLINK-8797] Port AbstractOperatorRestoreTestBase to MiniClusterResource
Date Sun, 11 Mar 2018 15:27:45 GMT
[FLINK-8797] Port AbstractOperatorRestoreTestBase to MiniClusterResource


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6732669a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6732669a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6732669a

Branch: refs/heads/master
Commit: 6732669a684de0b230046b8f4291e367e35d9477
Parents: ccb78b0
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Tue Feb 27 13:42:09 2018 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Sun Mar 11 08:17:21 2018 -0700

----------------------------------------------------------------------
 .../AbstractOperatorRestoreTestBase.java        | 248 ++++++-------------
 1 file changed, 81 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6732669a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index 9710c20..72f700a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -19,55 +19,40 @@
 package org.apache.flink.test.state.operator.restore;
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.testingUtils.TestingJobManager;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
-import org.apache.flink.runtime.testingUtils.TestingTaskManager;
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.net.URL;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
-import scala.Option;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 /**
  * Abstract class to verify that it is possible to migrate a savepoint across upgraded Flink versions and that the
@@ -79,16 +64,21 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
 
+	private static final int NUM_TMS = 1;
+	private static final int NUM_SLOTS_PER_TM = 4;
+	private static final Duration TEST_TIMEOUT = Duration.ofSeconds(10000L);
+
 	@Rule
 	public final TemporaryFolder tmpFolder = new TemporaryFolder();
 
-	private static ActorSystem actorSystem = null;
-	private static HighAvailabilityServices highAvailabilityServices = null;
-	private static ActorGateway jobManager = null;
-	private static ActorGateway archiver = null;
-	private static ActorGateway taskManager = null;
+	@ClassRule
+	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+		new MiniClusterResource.MiniClusterResourceConfiguration(
+			new Configuration(),
+			NUM_TMS,
+			NUM_SLOTS_PER_TM),
+		true);
 
-	private static final FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
 	private final boolean allowNonRestoredState;
 
 	protected AbstractOperatorRestoreTestBase() {
@@ -104,91 +94,21 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
 		SavepointSerializers.setFailWhenLegacyStateDetected(false);
 	}
 
-	@BeforeClass
-	public static void setupCluster() throws Exception {
-		final Configuration configuration = new Configuration();
-
-		FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
-
-		actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
-
-		highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
-			configuration,
-			TestingUtils.defaultExecutor());
-
-		Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
-			configuration,
-			actorSystem,
-			TestingUtils.defaultExecutor(),
-			TestingUtils.defaultExecutor(),
-			highAvailabilityServices,
-			NoOpMetricRegistry.INSTANCE,
-			Option.empty(),
-			Option.apply("jm"),
-			Option.apply("arch"),
-			TestingJobManager.class,
-			TestingMemoryArchivist.class);
-
-		jobManager = LeaderRetrievalUtils.retrieveLeaderGateway(
-			highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
-			actorSystem,
-			timeout);
-
-		archiver = new AkkaActorGateway(master._2(), jobManager.leaderSessionID());
-
-		Configuration tmConfig = new Configuration();
-		tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
-
-		ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
-			tmConfig,
-			ResourceID.generate(),
-			actorSystem,
-			highAvailabilityServices,
-			NoOpMetricRegistry.INSTANCE,
-			"localhost",
-			Option.apply("tm"),
-			true,
-			TestingTaskManager.class);
-
-		taskManager = new AkkaActorGateway(taskManagerRef, jobManager.leaderSessionID());
-
-		// Wait until connected
-		Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
-		Await.ready(taskManager.ask(msg, timeout), timeout);
-	}
-
-	@AfterClass
-	public static void tearDownCluster() throws Exception {
-		if (highAvailabilityServices != null) {
-			highAvailabilityServices.closeAndCleanupAllData();
-		}
-
-		if (actorSystem != null) {
-			actorSystem.shutdown();
-		}
-
-		if (archiver != null) {
-			archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
-		}
-
-		if (jobManager != null) {
-			jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
-		}
-
-		if (taskManager != null) {
-			taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
-		}
-	}
-
 	@Test
 	public void testMigrationAndRestore() throws Throwable {
+		ClassLoader classLoader = this.getClass().getClassLoader();
+		ClusterClient<?> clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient();
+		clusterClient.setDetached(true);
+		final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
+
 		// submit job with old version savepoint and create a migrated savepoint in the new version
-		String savepointPath = migrateJob();
+		String savepointPath = migrateJob(classLoader, clusterClient, deadline);
 		// restore from migrated new version savepoint
-		restoreJob(savepointPath);
+		restoreJob(classLoader, clusterClient, deadline, savepointPath);
 	}
 
-	private String migrateJob() throws Throwable {
+	private String migrateJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline) throws Throwable {
+
 		URL savepointResource = AbstractOperatorRestoreTestBase.class.getClassLoader().getResource("operatorstate/" + getMigrationSavepointName());
 		if (savepointResource == null) {
 			throw new IllegalArgumentException("Savepoint file does not exist.");
@@ -196,86 +116,80 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
 		JobGraph jobToMigrate = createJobGraph(ExecutionMode.MIGRATE);
 		jobToMigrate.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointResource.getFile()));
 
-		Object msg;
-		Object result;
-
-		// Submit job graph
-		msg = new JobManagerMessages.SubmitJob(jobToMigrate, ListeningBehaviour.DETACHED);
-		result = Await.result(jobManager.ask(msg, timeout), timeout);
+		assertNotNull(jobToMigrate.getJobID());
 
-		if (result instanceof JobManagerMessages.JobResultFailure) {
-			JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
-			throw new Exception(failure.cause());
-		}
-		Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
+		clusterClient.submitJob(jobToMigrate, classLoader);
 
-		// Wait for all tasks to be running
-		msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobToMigrate.getJobID());
-		Await.result(jobManager.ask(msg, timeout), timeout);
+		CompletableFuture<JobStatus> jobRunningFuture = FutureUtils.retrySuccesfulWithDelay(
+			() -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
+			Time.milliseconds(50),
+			deadline,
+			(jobStatus) -> jobStatus == JobStatus.RUNNING,
+			TestingUtils.defaultScheduledExecutor());
+		assertEquals(
+			JobStatus.RUNNING,
+			jobRunningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
 
 		// Trigger savepoint
 		File targetDirectory = tmpFolder.newFolder();
-		msg = new JobManagerMessages.CancelJobWithSavepoint(jobToMigrate.getJobID(), targetDirectory.getAbsolutePath());
+		String savepointPath = null;
 
 		// FLINK-6918: Retry cancel with savepoint message in case that StreamTasks were not running
 		// TODO: The retry logic should be removed once the StreamTask lifecycle has been fixed (see FLINK-4714)
-		boolean retry = true;
-		for (int i = 0; retry && i < 10; i++) {
-			Future<Object> future = jobManager.ask(msg, timeout);
-			result = Await.result(future, timeout);
-
-			if (result instanceof JobManagerMessages.CancellationFailure) {
-				Thread.sleep(50L);
-			} else {
-				retry = false;
+		while (deadline.hasTimeLeft() && savepointPath == null) {
+			try {
+				savepointPath = clusterClient.cancelWithSavepoint(
+					jobToMigrate.getJobID(),
+					targetDirectory.getAbsolutePath());
+			} catch (Exception e) {
+				String exceptionString = ExceptionUtils.stringifyException(e);
+				if (!(exceptionString.matches("(.*\n)*.*savepoint for the job .* failed(.*\n)*") // legacy
+						|| exceptionString.matches("(.*\n)*.*Not all required tasks are currently running(.*\n)*") // flip6
+						|| exceptionString.matches("(.*\n)*.*Checkpoint was declined \\(tasks not ready\\)(.*\n)*"))) { // flip6
+					throw e;
+				}
 			}
 		}
 
-		if (result instanceof JobManagerMessages.CancellationFailure) {
-			JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure) result;
-			throw new Exception(failure.cause());
-		}
-
-		String savepointPath = ((JobManagerMessages.CancellationSuccess) result).savepointPath();
+		assertNotNull("Could not take savepoint.", savepointPath);
 
-		// Wait until canceled
-		msg = new TestingJobManagerMessages.NotifyWhenJobStatus(jobToMigrate.getJobID(), JobStatus.CANCELED);
-		Await.ready(jobManager.ask(msg, timeout), timeout);
+		CompletableFuture<JobStatus> jobCanceledFuture = FutureUtils.retrySuccesfulWithDelay(
+			() -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
+			Time.milliseconds(50),
+			deadline,
+			(jobStatus) -> jobStatus == JobStatus.CANCELED,
+			TestingUtils.defaultScheduledExecutor());
+		assertEquals(
+			JobStatus.CANCELED,
+			jobCanceledFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
 
 		return savepointPath;
 	}
 
-	private void restoreJob(String savepointPath) throws Exception {
+	private void restoreJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline, String savepointPath) throws Exception {
 		JobGraph jobToRestore = createJobGraph(ExecutionMode.RESTORE);
 		jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState));
 
-		Object msg;
-		Object result;
-
-		// Submit job graph
-		msg = new JobManagerMessages.SubmitJob(jobToRestore, ListeningBehaviour.DETACHED);
-		result = Await.result(jobManager.ask(msg, timeout), timeout);
+		assertNotNull("Job doesn't have a JobID.", jobToRestore.getJobID());
 
-		if (result instanceof JobManagerMessages.JobResultFailure) {
-			JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
-			throw new Exception(failure.cause());
-		}
-		Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
-
-		msg = new JobManagerMessages.RequestJobStatus(jobToRestore.getJobID());
-		JobStatus status = ((JobManagerMessages.CurrentJobStatus) Await.result(jobManager.ask(msg, timeout), timeout)).status();
-		while (!status.isTerminalState()) {
-			status = ((JobManagerMessages.CurrentJobStatus) Await.result(jobManager.ask(msg, timeout), timeout)).status();
-		}
+		clusterClient.submitJob(jobToRestore, classLoader);
 
-		Assert.assertEquals(JobStatus.FINISHED, status);
+		CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
+			() -> clusterClient.getJobStatus(jobToRestore.getJobID()),
+			Time.milliseconds(50),
+			deadline,
+			(jobStatus) -> jobStatus == JobStatus.FINISHED,
+			TestingUtils.defaultScheduledExecutor());
+		assertEquals(
+			JobStatus.FINISHED,
+			jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
 	}
 
 	private JobGraph createJobGraph(ExecutionMode mode) {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
 		env.setRestartStrategy(RestartStrategies.noRestart());
-		env.setStateBackend(new MemoryStateBackend());
+		env.setStateBackend((StateBackend) new MemoryStateBackend());
 
 		switch (mode) {
 			case MIGRATE:


Mime
View raw message