flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [1/3] flink git commit: [FLINK-1808] [streaming] Send barrier requests only when the execution graph is running
Date Sun, 05 Apr 2015 08:53:06 GMT
Repository: flink
Updated Branches:
  refs/heads/master f36eb54ee -> 1da4b6437


[FLINK-1808] [streaming] Send barrier requests only when the execution graph is running

Closes #551


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1da4b643
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1da4b643
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1da4b643

Branch: refs/heads/master
Commit: 1da4b6437e33f794dd3603f029010f0cc70607d1
Parents: 1cf49e9
Author: Paris Carbone <seniorcarbone@gmail.com>
Authored: Tue Mar 31 13:51:07 2015 +0200
Committer: mbalassi <mbalassi@apache.org>
Committed: Sun Apr 5 10:51:20 2015 +0200

----------------------------------------------------------------------
 .../runtime/jobmanager/StreamCheckpointCoordinator.scala     | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1da4b643/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
index d9a3421..f42d08ab 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
@@ -23,7 +23,6 @@ import java.lang.Long
 import akka.actor._
 import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.ActorLogMessages
-import org.apache.flink.runtime.execution.ExecutionState.RUNNING
 import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph, ExecutionVertex}
 import org.apache.flink.runtime.jobgraph.JobStatus._
 import org.apache.flink.runtime.jobgraph.JobVertexID
@@ -84,13 +83,16 @@ class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph,
         case FAILED | CANCELED | FINISHED =>
           log.info("Stopping monitor for terminated job {}", executionGraph.getJobID)
           self ! PoisonPill
-        case _ =>
+        case RUNNING =>
           curId += 1
           log.debug("Sending Barrier to vertices of Job " + executionGraph.getJobName)
           vertices.filter(v => v.getJobVertex.getJobVertex.isInputVertex &&
                   v.getExecutionState == RUNNING).foreach(vertex
           => vertex.getCurrentAssignedResource.getInstance.getTaskManager
                     ! BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId,curId))
+        case _ =>
+          log.debug("Omitting sending barrier since graph is in {} state for job {}",
+            executionGraph.getState, executionGraph.getJobID)
       }
       
     case StateBarrierAck(jobID, jobVertexID, instanceID, checkpointID, opState) =>
@@ -112,7 +114,7 @@ class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph,
       ackId = if(!keysToKeep.isEmpty) keysToKeep.max else ackId
       acks.keys.foreach(x => acks = acks.updated(x,acks(x).filter(_ >= ackId)))
       states = states.filterKeys(_._3 >= ackId)
-      log.debug("[FT-MONITOR] Last global barrier is " + ackId)
+      log.debug("Last global barrier is " + ackId)
       executionGraph.loadOperatorStates(states)
       
   }


Mime
View raw message