flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From senorcarb...@apache.org
Subject flink git commit: [FLINK-3256] Fix colocation group re-instantiation
Date Mon, 25 Jan 2016 15:47:58 GMT
Repository: flink
Updated Branches:
  refs/heads/master ed3810b12 -> 440618823


[FLINK-3256] Fix colocation group re-instantiation

This closes #1526


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44061882
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44061882
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44061882

Branch: refs/heads/master
Commit: 440618823f6f2d4f1d9f8c62c63895f83b32c374
Parents: ed3810b
Author: Paris Carbone <parisc@kth.se>
Authored: Wed Jan 20 03:03:41 2016 +0100
Committer: Paris Carbone <parisc@kth.se>
Committed: Mon Jan 25 15:49:00 2016 +0100

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  |  16 +-
 .../executiongraph/ExecutionJobVertex.java      |   3 -
 .../ExecutionGraphRestartTest.java              | 166 ++++++++++++++-----
 3 files changed, 135 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/44061882/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 9085483..a03f0bf 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -51,6 +51,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -76,6 +77,8 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.NoSuchElementException;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -147,7 +150,7 @@ public class ExecutionGraph implements Serializable {
 
 	/** All vertices, in the order in which they were created **/
 	private final List<ExecutionJobVertex> verticesInCreationOrder;
-
+	
 	/** All intermediate results that are part of this graph */
 	private final ConcurrentHashMap<IntermediateDataSetID, IntermediateResult> intermediateResults;
 
@@ -719,7 +722,7 @@ public class ExecutionGraph implements Serializable {
 							res.getId(), res, previousDataSet));
 				}
 			}
-
+			
 			this.verticesInCreationOrder.add(ejv);
 		}
 	}
@@ -849,7 +852,16 @@ public class ExecutionGraph implements Serializable {
 
 				this.currentExecutions.clear();
 
+				Collection<CoLocationGroup> colGroups = new HashSet<>();
+				
 				for (ExecutionJobVertex jv : this.verticesInCreationOrder) {
+					
+					CoLocationGroup cgroup = jv.getCoLocationGroup();
+					if(cgroup != null && !colGroups.contains(cgroup)){
+						cgroup.resetConstraints();
+						colGroups.add(cgroup);
+					}
+					
 					jv.resetForNewExecution();
 				}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/44061882/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 93ae7c1..bc368ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -362,9 +362,6 @@ public class ExecutionJobVertex implements Serializable {
 			if (slotSharingGroup != null) {
 				slotSharingGroup.clearTaskAssignment();
 			}
-			if (coLocationGroup != null) {
-				coLocationGroup.resetConstraints();
-			}
 			
 			// reset vertices one by one. if one reset fails, the "vertices in final state"
 			// fields will be consistent to handle triggered cancel calls

http://git-wip-us.apache.org/repos/asf/flink/blob/44061882/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 127ae33..0c3af8f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -28,7 +28,9 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.Tasks;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
 import scala.concurrent.duration.Deadline;
@@ -38,7 +40,8 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.doNothing;
@@ -94,6 +97,72 @@ public class ExecutionGraphRestartTest {
 	}
 
 	@Test
+	public void testConstraintsAfterRestart() throws Exception {
+		
+		//setting up
+		Instance instance = ExecutionGraphTestUtils.getInstance(
+			new SimpleActorGateway(TestingUtils.directExecutionContext()),
+			NUM_TASKS);
+		
+		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+		scheduler.newInstanceAvailable(instance);
+		
+		JobVertex groupVertex = new JobVertex("Task1");
+		groupVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+		groupVertex.setParallelism(NUM_TASKS);
+
+		JobVertex groupVertex2 = new JobVertex("Task2");
+		groupVertex2.setInvokableClass(Tasks.NoOpInvokable.class);
+		groupVertex2.setParallelism(NUM_TASKS);
+
+		SlotSharingGroup sharingGroup = new SlotSharingGroup();
+		groupVertex.setSlotSharingGroup(sharingGroup);
+		groupVertex2.setSlotSharingGroup(sharingGroup);
+		groupVertex.setStrictlyCoLocatedWith(groupVertex2);
+		
+		//initiate and schedule job
+		JobGraph jobGraph = new JobGraph("Pointwise job", groupVertex, groupVertex2);
+		ExecutionGraph eg = new ExecutionGraph(
+			TestingUtils.defaultExecutionContext(),
+			new JobID(),
+			"test job",
+			new Configuration(),
+			AkkaUtils.getDefaultTimeout());
+		eg.setNumberOfRetriesLeft(1);
+		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+		assertEquals(JobStatus.CREATED, eg.getState());
+		
+		eg.scheduleForExecution(scheduler);
+		assertEquals(JobStatus.RUNNING, eg.getState());
+		
+		//sanity checks
+		validateConstraints(eg);
+
+		//restart automatically
+		restartAfterFailure(eg, new FiniteDuration(2, TimeUnit.MINUTES), false);
+		
+		//checking execution vertex properties
+		validateConstraints(eg);
+
+		haltExecution(eg);
+	}
+
+	private void validateConstraints(ExecutionGraph eg) {
+		
+		ExecutionJobVertex[] tasks = eg.getAllVertices().values().toArray(new ExecutionJobVertex[2]);
+		
+		for(int i=0; i<NUM_TASKS; i++){
+			CoLocationConstraint constr1 = tasks[0].getTaskVertices()[i].getLocationConstraint();
+			CoLocationConstraint constr2 = tasks[1].getTaskVertices()[i].getLocationConstraint();
+			assertNotNull(constr1.getSharedSlot());
+			assertTrue(constr1.isAssigned());
+			assertEquals(constr1, constr2);
+		}
+		
+	}
+
+	@Test
 	public void testRestartAutomatically() throws Exception {
 		Instance instance = ExecutionGraphTestUtils.getInstance(
 				new SimpleActorGateway(TestingUtils.directExecutionContext()),
@@ -122,50 +191,7 @@ public class ExecutionGraphRestartTest {
 		eg.scheduleForExecution(scheduler);
 
 		assertEquals(JobStatus.RUNNING, eg.getState());
-
-		eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));
-		assertEquals(JobStatus.FAILING, eg.getState());
-
-		for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
-			vertex.getCurrentExecutionAttempt().cancelingComplete();
-		}
-
-		FiniteDuration timeout = new FiniteDuration(2, TimeUnit.MINUTES);
-
-		// Wait for async restart
-		Deadline deadline = timeout.fromNow();
-		while (deadline.hasTimeLeft() && eg.getState() != JobStatus.RUNNING) {
-			Thread.sleep(100);
-		}
-
-		assertEquals(JobStatus.RUNNING, eg.getState());
-
-		// Wait for deploying after async restart
-		deadline = timeout.fromNow();
-		boolean success = false;
-
-		while (deadline.hasTimeLeft() && !success) {
-			success = true;
-
-			for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
-				if (vertex.getCurrentExecutionAttempt().getAssignedResource() == null) {
-					success = false;
-					Thread.sleep(100);
-					break;
-				}
-			}
-		}
-
-		if (deadline.hasTimeLeft()) {
-			for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
-				vertex.getCurrentExecutionAttempt().markFinished();
-			}
-
-			assertEquals(JobStatus.FINISHED, eg.getState());
-		}
-		else {
-			fail("Failed to wait until all execution attempts left the state DEPLOYING.");
-		}
+		restartAfterFailure(eg, new FiniteDuration(2, TimeUnit.MINUTES), true);
 	}
 
 	@Test
@@ -360,4 +386,54 @@ public class ExecutionGraphRestartTest {
 		verify(eg, never()).restart();
 		assertEquals(1, eg.getNumberOfRetriesLeft());
 	}
+
+	private static void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException {
+
+		eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));
+		assertEquals(JobStatus.FAILING, eg.getState());
+
+		for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+			vertex.getCurrentExecutionAttempt().cancelingComplete();
+		}
+
+		// Wait for async restart
+		Deadline deadline = timeout.fromNow();
+		while (deadline.hasTimeLeft() && eg.getState() != JobStatus.RUNNING) {
+			Thread.sleep(100);
+		}
+
+		assertEquals(JobStatus.RUNNING, eg.getState());
+
+		// Wait for deploying after async restart
+		deadline = timeout.fromNow();
+		boolean success = false;
+
+		while (deadline.hasTimeLeft() && !success) {
+			success = true;
+
+			for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+				if (vertex.getCurrentExecutionAttempt().getAssignedResource() == null) {
+					success = false;
+					Thread.sleep(100);
+					break;
+				}
+			}
+		}
+
+		if (haltAfterRestart) {
+			if (deadline.hasTimeLeft()) {
+				haltExecution(eg);
+			} else {
+				fail("Failed to wait until all execution attempts left the state DEPLOYING.");
+			}
+		}
+	}
+
+	private static void haltExecution(ExecutionGraph eg) {
+		for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+			vertex.getCurrentExecutionAttempt().markFinished();
+		}
+
+		assertEquals(JobStatus.FINISHED, eg.getState());
+	}
 }


Mime
View raw message