flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From uce <...@git.apache.org>
Subject [GitHub] flink pull request #2256: [FLINK-4150] [runtime] Don't clean up BlobStore on...
Date Thu, 21 Jul 2016 14:10:34 GMT
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2256#discussion_r71711908
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
---
    @@ -243,6 +256,268 @@ public void testJobRecoveryWhenLosingLeadership() throws Exception
{
     		}
     	}
     
    +	/**
    +	 * Tests that the persisted job is not removed from the job graph store
    +	 * after the postStop method of the JobManager. Furthermore, it checks
    +	 * that BLOBs of the JobGraph are recovered properly and cleaned up after
    +	 * the job finishes.
    +	 */
    +	@Test
    +	public void testBlobRecoveryAfterLostJobManager() throws Exception {
    +		FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
    +		FiniteDuration jobRecoveryTimeout = new FiniteDuration(3, TimeUnit.SECONDS);
    +		Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
    +		Configuration flinkConfiguration = new Configuration();
    +		UUID leaderSessionID = UUID.randomUUID();
    +		UUID newLeaderSessionID = UUID.randomUUID();
    +		int slots = 2;
    +		ActorRef archiveRef = null;
    +		ActorRef jobManagerRef = null;
    +		ActorRef taskManagerRef = null;
    +
    +		String haStoragePath = temporaryFolder.newFolder().toString();
    +
    +		flinkConfiguration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
    +		flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, haStoragePath);
    +		flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
    +
    +		try {
    +			MySubmittedJobGraphStore mySubmittedJobGraphStore = new MySubmittedJobGraphStore();
    +			TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService();
    +			TestingLeaderRetrievalService myLeaderRetrievalService = new TestingLeaderRetrievalService();
    +
    +			archiveRef = system.actorOf(Props.create(
    +					MemoryArchivist.class,
    +					10), "archive");
    +
    +			jobManagerRef = createJobManagerActor(
    +					"jobmanager-0",
    +					flinkConfiguration,
    +					myLeaderElectionService,
    +					mySubmittedJobGraphStore,
    +					3600000,
    +					timeout,
    +					jobRecoveryTimeout, archiveRef);
    +
    +			ActorGateway jobManager = new AkkaActorGateway(jobManagerRef, leaderSessionID);
    +
    +			taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
    +					flinkConfiguration,
    +					ResourceID.generate(),
    +					system,
    +					"localhost",
    +					Option.apply("taskmanager"),
    +					Option.apply((LeaderRetrievalService) myLeaderRetrievalService),
    +					true,
    +					TestingTaskManager.class);
    +
    +			ActorGateway tmGateway = new AkkaActorGateway(taskManagerRef, leaderSessionID);
    +
    +			Future<Object> tmAlive = tmGateway.ask(TestingMessages.getAlive(), deadline.timeLeft());
    +
    +			Await.ready(tmAlive, deadline.timeLeft());
    +
    +			JobVertex sourceJobVertex = new JobVertex("Source");
    +			sourceJobVertex.setInvokableClass(BlockingInvokable.class);
    +			sourceJobVertex.setParallelism(slots);
    +
    +			JobGraph jobGraph = new JobGraph("TestingJob", sourceJobVertex);
    +
    +			// Upload fake JAR file to first JobManager
    +			File jarFile = temporaryFolder.newFile();
    +			ZipOutputStream out = new ZipOutputStream(new FileOutputStream(jarFile));
    +			out.close();
    +
    +			jobGraph.addJar(new Path(jarFile.toURI()));
    +			JobClient.uploadJarFiles(jobGraph, jobManager, deadline.timeLeft());
    +
    +			Future<Object> isLeader = jobManager.ask(
    +					TestingJobManagerMessages.getNotifyWhenLeader(),
    +					deadline.timeLeft());
    +
    +			Future<Object> isConnectedToJobManager = tmGateway.ask(
    +					new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManagerRef),
    +					deadline.timeLeft());
    +
    +			// tell jobManager that he's the leader
    +			myLeaderElectionService.isLeader(leaderSessionID);
    +			// tell taskManager who's the leader
    +			myLeaderRetrievalService.notifyListener(jobManager.path(), leaderSessionID);
    +
    +			Await.ready(isLeader, deadline.timeLeft());
    +			Await.ready(isConnectedToJobManager, deadline.timeLeft());
    +
    +			// submit blocking job
    +			Future<Object> jobSubmitted = jobManager.ask(
    +					new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED),
    +					deadline.timeLeft());
    +
    +			Await.ready(jobSubmitted, deadline.timeLeft());
    +
    +			// Wait for running
    +			Future<Object> jobRunning = jobManager.ask(
    +					new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING),
    +					deadline.timeLeft());
    +
    +			Await.ready(jobRunning, deadline.timeLeft());
    +
    +			// terminate the job manager
    +			jobManagerRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
    +
    +			Future<Boolean> terminatedFuture = Patterns.gracefulStop(jobManagerRef, deadline.timeLeft());
    +			Boolean terminated = Await.result(terminatedFuture, deadline.timeLeft());
    +			assertTrue("Failed to stop job manager", terminated);
    +
    +			// job stays in the submitted job graph store
    +			assertTrue(mySubmittedJobGraphStore.contains(jobGraph.getJobID()));
    +
    +			// start new job manager
    +			myLeaderElectionService.reset();
    +
    +			jobManagerRef = createJobManagerActor(
    +					"jobmanager-1",
    +					flinkConfiguration,
    +					myLeaderElectionService,
    +					mySubmittedJobGraphStore,
    +					500,
    +					timeout,
    +					jobRecoveryTimeout,
    +					archiveRef);
    +
    +			jobManager = new AkkaActorGateway(jobManagerRef, newLeaderSessionID);
    +
    +			Future<Object> isAlive = jobManager.ask(TestingMessages.getAlive(), deadline.timeLeft());
    +
    +			isLeader = jobManager.ask(
    +					TestingJobManagerMessages.getNotifyWhenLeader(),
    +					deadline.timeLeft());
    +
    +			isConnectedToJobManager = tmGateway.ask(
    +					new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManagerRef),
    +					deadline.timeLeft());
    +
    +			Await.ready(isAlive, deadline.timeLeft());
    +
    +			// tell new jobManager that he's the leader
    +			myLeaderElectionService.isLeader(newLeaderSessionID);
    +			// tell taskManager who's the leader
    +			myLeaderRetrievalService.notifyListener(jobManager.path(), newLeaderSessionID);
    +
    +			Await.ready(isLeader, deadline.timeLeft());
    +			Await.ready(isConnectedToJobManager, deadline.timeLeft());
    +
    +			jobRunning = jobManager.ask(
    +					new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING),
    +					deadline.timeLeft());
    +
    +			// wait that the job is recovered and reaches state RUNNING
    +			Await.ready(jobRunning, deadline.timeLeft());
    +
    +			Future<Object> jobFinished = jobManager.ask(
    +					new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()),
    +					deadline.timeLeft());
    +
    +			BlockingInvokable.unblock();
    +
    +			// wait til the job has finished
    +			Await.ready(jobFinished, deadline.timeLeft());
    +
    +			// check that the job has been removed from the submitted job graph store
    +			assertFalse(mySubmittedJobGraphStore.contains(jobGraph.getJobID()));
    +
    +			// Check that the BLOB store files are removed
    +			File rootPath = new File(haStoragePath);
    +
    +			boolean cleanedUpFiles = false;
    +			while (deadline.hasTimeLeft()) {
    +				if (listFiles(rootPath).isEmpty()) {
    --- End diff --
    
    Yes, that is true. In this test, for example, we will have empty folders `<root>/blob/cache`.
I've added a method that tries to delete the parent directory when deleting a BLOB
(the same as what we are currently doing in `AbstractFileStateHandle`). I will adjust this check
to verify that the directory is empty.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message