flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject incubator-flink git commit: Fixes race condition in ExecutionGraph which allowed a job to go into the finished state without all job vertices having properly processed the finalizeOnMaster method.
Date Thu, 18 Dec 2014 10:56:44 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master 63ef8e86a -> 94cfe1487


Fixes race condition in ExecutionGraph which allowed a job to go into the finished state without
all job vertices having properly processed the finalizeOnMaster method.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/94cfe148
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/94cfe148
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/94cfe148

Branch: refs/heads/master
Commit: 94cfe14879bf6d399d816d267b00c8a99eee8165
Parents: 63ef8e8
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Dec 18 11:37:23 2014 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Dec 18 11:47:58 2014 +0100

----------------------------------------------------------------------
 .../executiongraph/ExecutionJobVertex.java      |  7 +-
 .../runtime/jobmanager/JobManagerITCase.java    | 75 ++++++++++++++++++++
 2 files changed, 80 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/94cfe148/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 9f8b56a..3ac0962 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -323,9 +323,8 @@ public class ExecutionJobVertex {
 		synchronized (stateMonitor) {
 			if (!finishedSubtasks[subtask]) {
 				finishedSubtasks[subtask] = true;
-				numSubtasksInFinalState++;
 				
-				if (numSubtasksInFinalState == parallelism) {
+				if (numSubtasksInFinalState+1 == parallelism) {
 					
 					// call finalizeOnMaster hook
 					try {
@@ -334,12 +333,16 @@ public class ExecutionJobVertex {
 					catch (Throwable t) {
 						getGraph().fail(t);
 					}
+
+					numSubtasksInFinalState++;
 					
 					// we are in our final state
 					stateMonitor.notifyAll();
 					
 					// tell the graph
 					graph.jobVertexInFinalState(this);
+				}else{
+					numSubtasksInFinalState++;
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/94cfe148/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
index ae7857f..3f1c3b6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
@@ -793,6 +793,51 @@ public class JobManagerITCase {
 			fail(e.getMessage());
 		}
 	}
+
+	@Test
+	public void testSubtaskInFinalStateRaceCondition() {
+		final int NUM_TASKS = 1;
+
+		try{
+			final AbstractJobVertex source = new AbstractJobVertex("Source");
+			final WaitingOnFinalizeJobVertex sink = new WaitingOnFinalizeJobVertex("Sink", 500);
+			source.setInvokableClass(WaitingNoOpInvokable.class);
+			sink.setInvokableClass(NoOpInvokable.class);
+
+			source.setParallelism(NUM_TASKS);
+			sink.setParallelism(NUM_TASKS);
+
+			final JobGraph jobGraph = new JobGraph("SubtaskInFinalStateRaceCondition", source,
+					sink);
+
+			final JobManager jm = startJobManager(2*NUM_TASKS);
+
+			try{
+				JobSubmissionResult result = jm.submitJob(jobGraph);
+
+				if(result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS){
+					fail(result.getDescription());
+				}
+
+				ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+
+				if(eg != null){
+					eg.waitForJobEnd();
+					assertEquals(JobStatus.FINISHED, eg.getState());
+				}
+
+				assertTrue("Sink has to have called finalizeOnMaster before job can finish.", sink
+						.finished);
+
+				waitForTaskThreadsToBeTerminated();
+			}finally{
+				jm.shutdown();
+			}
+		}catch(Exception e){
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	//  Simple test tasks
@@ -886,4 +931,34 @@ public class JobManagerITCase {
 			}
 		}
 	}
+
+	public static final class WaitingOnFinalizeJobVertex extends AbstractJobVertex {
+		private final long waitingTime;
+		public boolean finished = false;
+
+		public WaitingOnFinalizeJobVertex(String name, long waitingTime) {
+			super(name);
+			this.waitingTime = waitingTime;
+		}
+
+		@Override
+		public void finalizeOnMaster(ClassLoader loader) throws Exception {
+			Thread.sleep(waitingTime);
+			finished = true;
+		}
+	}
+
+	public static final class WaitingNoOpInvokable extends AbstractInvokable{
+		private static long waitingTime = 100;
+
+		@Override
+		public void registerInputOutput() {
+
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			Thread.sleep(waitingTime);
+		}
+	}
 }


Mime
View raw message