flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/8] flink git commit: [FLINK-5962] [checkpoints] Remove scheduled cancel-task from timer queue to prevent memory leaks
Date Thu, 16 Mar 2017 14:05:42 GMT
Repository: flink
Updated Branches:
  refs/heads/master d160b5e56 -> 24408e190


[FLINK-5962] [checkpoints] Remove scheduled cancel-task from timer queue to prevent memory
leaks

This closes #3548


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/70252f34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/70252f34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/70252f34

Branch: refs/heads/master
Commit: 70252f3468916758e8bc456bbf482549c38ad7ff
Parents: afd36f9
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Mar 15 16:44:41 2017 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Mar 16 14:43:26 2017 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       | 79 +++++++++++++-------
 .../runtime/checkpoint/PendingCheckpoint.java   | 38 ++++++++++
 .../checkpoint/CheckpointCoordinatorTest.java   | 16 +++-
 .../checkpoint/PendingCheckpointTest.java       | 18 +++++
 4 files changed, 123 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/70252f34/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 0592e3d..cc60837 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.TaskStateHandles;
 
+import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,9 +50,10 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -134,7 +136,7 @@ public class CheckpointCoordinator {
 	private final int maxConcurrentCheckpointAttempts;
 
 	/** The timer that handles the checkpoint timeouts and triggers periodic checkpoints */
-	private final Timer timer;
+	private final ScheduledThreadPoolExecutor timer;
 
 	/** Actor that receives status updates from the execution graph this coordinator works for
*/
 	private JobStatusListener jobStatusListener;
@@ -142,7 +144,8 @@ public class CheckpointCoordinator {
 	/** The number of consecutive failed trigger attempts */
 	private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0);
 
-	private ScheduledTrigger currentPeriodicTrigger;
+	/** A handle to the current periodic trigger, to cancel it when necessary */
+	private ScheduledFuture<?> currentPeriodicTrigger;
 
 	/** The timestamp (via {@link System#nanoTime()}) when the last checkpoint completed */
 	private long lastCheckpointCompletionNanos;
@@ -218,7 +221,13 @@ public class CheckpointCoordinator {
 
 		this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
 
-		this.timer = new Timer("Checkpoint Timer", true);
+		this.timer = new ScheduledThreadPoolExecutor(1,
+				new DispatcherThreadFactory(Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));
+
+		// make sure the timer internally cleans up and does not hold onto stale scheduled tasks
+		this.timer.setRemoveOnCancelPolicy(true);
+		this.timer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+		this.timer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
 
 		if (externalizeSettings.externalizeCheckpoints()) {
 			LOG.info("Persisting periodic checkpoints externally at {}.", checkpointDirectory);
@@ -265,7 +274,7 @@ public class CheckpointCoordinator {
 				triggerRequestQueued = false;
 
 				// shut down the thread that handles the timeouts and pending triggers
-				timer.cancel();
+				timer.shutdownNow();
 
 				// clear and discard all pending checkpoints
 				for (PendingCheckpoint pending : pendingCheckpoints.values()) {
@@ -392,7 +401,7 @@ public class CheckpointCoordinator {
 				if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
 					triggerRequestQueued = true;
 					if (currentPeriodicTrigger != null) {
-						currentPeriodicTrigger.cancel();
+						currentPeriodicTrigger.cancel(false);
 						currentPeriodicTrigger = null;
 					}
 					return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
@@ -404,13 +413,14 @@ public class CheckpointCoordinator {
 
 				if (durationTillNextMillis > 0) {
 					if (currentPeriodicTrigger != null) {
-						currentPeriodicTrigger.cancel();
+						currentPeriodicTrigger.cancel(false);
 						currentPeriodicTrigger = null;
 					}
-					ScheduledTrigger trigger = new ScheduledTrigger();
 					// Reassign the new trigger to the currentPeriodicTrigger
-					currentPeriodicTrigger = trigger;
-					timer.scheduleAtFixedRate(trigger, durationTillNextMillis, baseInterval);
+					currentPeriodicTrigger = timer.scheduleAtFixedRate(
+							new ScheduledTrigger(),
+							durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
+
 					return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
 				}
 			}
@@ -483,7 +493,7 @@ public class CheckpointCoordinator {
 			}
 
 			// schedule the timer that will clean up the expired checkpoints
-			TimerTask canceller = new TimerTask() {
+			final Runnable canceller = new Runnable() {
 				@Override
 				public void run() {
 					synchronized (lock) {
@@ -519,7 +529,7 @@ public class CheckpointCoordinator {
 						if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
 							triggerRequestQueued = true;
 							if (currentPeriodicTrigger != null) {
-								currentPeriodicTrigger.cancel();
+								currentPeriodicTrigger.cancel(false);
 								currentPeriodicTrigger = null;
 							}
 							return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
@@ -531,14 +541,15 @@ public class CheckpointCoordinator {
 
 						if (durationTillNextMillis > 0) {
 							if (currentPeriodicTrigger != null) {
-								currentPeriodicTrigger.cancel();
+								currentPeriodicTrigger.cancel(false);
 								currentPeriodicTrigger = null;
 							}
 
-							ScheduledTrigger trigger = new ScheduledTrigger();
 							// Reassign the new trigger to the currentPeriodicTrigger
-							currentPeriodicTrigger = trigger;
-							timer.scheduleAtFixedRate(trigger, durationTillNextMillis, baseInterval);
+							currentPeriodicTrigger = timer.scheduleAtFixedRate(
+									new ScheduledTrigger(),
+									durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
+
 							return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
 						}
 					}
@@ -546,7 +557,15 @@ public class CheckpointCoordinator {
 					LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);
 
 					pendingCheckpoints.put(checkpointID, checkpoint);
-					timer.schedule(canceller, checkpointTimeout);
+
+					ScheduledFuture<?> cancellerHandle = timer.schedule(
+							canceller,
+							checkpointTimeout, TimeUnit.MILLISECONDS);
+
+					if (!checkpoint.setCancellerHandle(cancellerHandle)) {
+						// checkpoint is already disposed!
+						cancellerHandle.cancel(false);
+					}
 				}
 				// end of lock scope
 
@@ -866,20 +885,25 @@ public class CheckpointCoordinator {
 
 			// trigger the checkpoint from the trigger timer, to finish the work of this thread before
 			// starting with the next checkpoint
-			ScheduledTrigger trigger = new ScheduledTrigger();
 			if (periodicScheduling) {
 				if (currentPeriodicTrigger != null) {
-					currentPeriodicTrigger.cancel();
+					currentPeriodicTrigger.cancel(false);
 				}
-				currentPeriodicTrigger = trigger;
-				timer.scheduleAtFixedRate(trigger, 0L, baseInterval);
+				currentPeriodicTrigger = timer.scheduleAtFixedRate(
+						new ScheduledTrigger(),
+						0L, baseInterval, TimeUnit.MILLISECONDS);
 			}
 			else {
-				timer.schedule(trigger, 0L);
+				timer.execute(new ScheduledTrigger());
 			}
 		}
 	}
 
+	@VisibleForTesting
+	int getNumScheduledTasks() {
+		return timer.getQueue().size();
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Checkpoint State Restoring
 	// --------------------------------------------------------------------------------------------
@@ -1006,8 +1030,9 @@ public class CheckpointCoordinator {
 			stopCheckpointScheduler();
 
 			periodicScheduling = true;
-			currentPeriodicTrigger = new ScheduledTrigger();
-			timer.scheduleAtFixedRate(currentPeriodicTrigger, baseInterval, baseInterval);
+			currentPeriodicTrigger = timer.scheduleAtFixedRate(
+					new ScheduledTrigger(), 
+					baseInterval, baseInterval, TimeUnit.MILLISECONDS);
 		}
 	}
 
@@ -1017,7 +1042,7 @@ public class CheckpointCoordinator {
 			periodicScheduling = false;
 
 			if (currentPeriodicTrigger != null) {
-				currentPeriodicTrigger.cancel();
+				currentPeriodicTrigger.cancel(false);
 				currentPeriodicTrigger = null;
 			}
 
@@ -1050,7 +1075,7 @@ public class CheckpointCoordinator {
 
 	// ------------------------------------------------------------------------
 
-	private class ScheduledTrigger extends TimerTask {
+	private final class ScheduledTrigger implements Runnable {
 
 		@Override
 		public void run() {

http://git-wip-us.apache.org/repos/asf/flink/blob/70252f34/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 5ca6040..b7eb037 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -46,6 +46,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -99,6 +100,8 @@ public class PendingCheckpoint {
 	@Nullable
 	private PendingCheckpointStats statsCallback;
 
+	private volatile ScheduledFuture<?> cancellerHandle;
+
 	// --------------------------------------------------------------------------------------------
 
 	public PendingCheckpoint(
@@ -197,6 +200,27 @@ public class PendingCheckpoint {
 		this.statsCallback = trackerCallback;
 	}
 
+	/**
+	 * Sets the handle for the canceller to this pending checkoint.
+	 * 
+	 * @return true, if the handle was set, false, if the checkpoint is already disposed;
+	 */
+	public boolean setCancellerHandle(ScheduledFuture<?> cancellerHandle) {
+		synchronized (lock) {
+			if (this.cancellerHandle == null) {
+				if (!discarded) {
+					this.cancellerHandle = cancellerHandle;
+					return true;
+				} else {
+					return false;
+				}
+			}
+			else {
+				throw new IllegalStateException("A canceller handle was already set");
+			}
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Progress and Completion
 	// ------------------------------------------------------------------------
@@ -490,10 +514,24 @@ public class PendingCheckpoint {
 				discarded = true;
 				notYetAcknowledgedTasks.clear();
 				acknowledgedTasks.clear();
+				cancelCanceller();
 			}
 		}
 	}
 
+	private void cancelCanceller() {
+		try {
+			final ScheduledFuture<?> canceller = this.cancellerHandle;
+			if (canceller != null) {
+				canceller.cancel(false);
+			}
+		}
+		catch (Exception e) {
+			// this code should not throw exceptions
+			LOG.warn("Error while cancelling checkpoint timeout task", e);
+		}
+	}
+
 	/**
 	 * Reports a failed checkpoint with the given optional cause.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/70252f34/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 1691370..d8bba59 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -299,6 +299,9 @@ public class CheckpointCoordinatorTest {
 			assertEquals(1, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
+			// we have one task scheduled that will cancel after timeout
+			assertEquals(1, coord.getNumScheduledTasks());
+
 			long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
 			PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);
 
@@ -336,6 +339,9 @@ public class CheckpointCoordinatorTest {
 			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId));
 			assertTrue(checkpoint.isDiscarded());
 
+			// the canceler is also removed
+			assertEquals(0, coord.getNumScheduledTasks());
+
 			// validate that we have no new pending checkpoint
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -389,6 +395,7 @@ public class CheckpointCoordinatorTest {
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, coord.getNumScheduledTasks());
 
 			// trigger the first checkpoint. this should succeed
 			assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -399,6 +406,7 @@ public class CheckpointCoordinatorTest {
 			// validate that we have a pending checkpoint
 			assertEquals(2, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(2, coord.getNumScheduledTasks());
 
 			Iterator<Map.Entry<Long, PendingCheckpoint>> it = coord.getPendingCheckpoints().entrySet().iterator();
 			long checkpoint1Id = it.next().getKey();
@@ -439,13 +447,13 @@ public class CheckpointCoordinatorTest {
 			}
 
 			// decline checkpoint from one of the tasks, this should cancel the checkpoint
-			// and trigger a new one
 			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id));
 			assertTrue(checkpoint1.isDiscarded());
 
 			// validate that we have only one pending checkpoint left
 			assertEquals(1, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(1, coord.getNumScheduledTasks());
 
 			// validate that it is the same second checkpoint from earlier
 			long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
@@ -506,6 +514,7 @@ public class CheckpointCoordinatorTest {
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, coord.getNumScheduledTasks());
 
 			// trigger the first checkpoint. this should succeed
 			assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -513,6 +522,7 @@ public class CheckpointCoordinatorTest {
 			// validate that we have a pending checkpoint
 			assertEquals(1, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(1, coord.getNumScheduledTasks());
 
 			long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
 			PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);
@@ -556,6 +566,9 @@ public class CheckpointCoordinatorTest {
 			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 
+			// the canceler should be removed now
+			assertEquals(0, coord.getNumScheduledTasks());
+
 			// validate that the relevant tasks got a confirmation message
 			{
 				verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId),
eq(timestamp), any(CheckpointOptions.class));
@@ -580,6 +593,7 @@ public class CheckpointCoordinatorTest {
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, coord.getNumScheduledTasks());
 
 			CompletedCheckpoint successNew = coord.getSuccessfulCheckpoints().get(0);
 			assertEquals(jid, successNew.getJobId());

http://git-wip-us.apache.org/repos/asf/flink/blob/70252f34/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 6f04f39..55b5fe0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -38,6 +38,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -287,6 +288,23 @@ public class PendingCheckpointTest {
 		}
 	}
 
+	@Test
+	public void testSetCanceller() {
+		final CheckpointProperties props = new CheckpointProperties(false, false, true, true, true,
true, true);
+
+		PendingCheckpoint aborted = createPendingCheckpoint(props, null);
+		aborted.abortDeclined();
+		assertTrue(aborted.isDiscarded());
+		assertFalse(aborted.setCancellerHandle(mock(ScheduledFuture.class)));
+
+		PendingCheckpoint pending = createPendingCheckpoint(props, null);
+		ScheduledFuture<?> canceller = mock(ScheduledFuture.class);
+
+		assertTrue(pending.setCancellerHandle(canceller));
+		pending.abortDeclined();
+		verify(canceller).cancel(false);
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, String
targetDirectory) {


Mime
View raw message