flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [2/2] flink git commit: [FLINK-3800] [runtime] Introduce SUSPENDED job status
Date Thu, 23 Jun 2016 16:04:14 GMT
[FLINK-3800] [runtime] Introduce SUSPENDED job status

The SUSPENDED job status is a new ExecutionGraph state which can be reached from all
non-terminal states when calling suspend on the ExecutionGraph. Unlike the FAILED,
FINISHED and CANCELED states, the SUSPENDED state does not trigger the deletion of the
job from the HA storage. Therefore, this state can be used to handle the loss of
leadership or the shutdown of a JobManager so that the ExecutionGraph is stopped but
can still be recovered. SUSPENDED is also a terminal state, but it is only a locally
terminal state, which distinguishes it from FAILED, CANCELED and FINISHED, which are
globally terminal states.

Add test case for suspend signal

Add test case for suspending restarting job

Add test case for HA job recovery when losing leadership

Add online documentation for the job status

Add ASF license header to job_status.svg

Do not throw an exception when ExecutionGraph.restart is called and the job is in state SUSPENDED

This closes #2096.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6420c1c2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6420c1c2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6420c1c2

Branch: refs/heads/master
Commit: 6420c1c264ed3ce0c32ba164c2cdb85ccdccf265
Parents: cfe6293
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Jun 9 11:37:14 2016 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Jun 23 15:12:10 2016 +0200

----------------------------------------------------------------------
 docs/internals/fig/job_status.svg               | 973 +++++++++++++++++++
 docs/internals/job_scheduling.md                |  23 +-
 .../webmonitor/BackPressureStatsTracker.java    |   4 +-
 .../webmonitor/handlers/JobDetailsHandler.java  |   2 +-
 .../runtime/checkpoint/HeapStateStore.java      |   2 +-
 .../runtime/executiongraph/ExecutionGraph.java  |  80 +-
 .../restart/FixedDelayRestartStrategy.java      |   8 +-
 .../restart/NoRestartStrategy.java              |   3 -
 .../executiongraph/restart/RestartStrategy.java |   5 -
 .../flink/runtime/jobgraph/JobStatus.java       |  38 +-
 .../runtime/webmonitor/WebMonitorUtils.java     |   2 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  27 +-
 .../ExecutionGraphRestartTest.java              | 121 +++
 .../ExecutionGraphSignalsTest.java              |  66 ++
 .../jobmanager/JobManagerHARecoveryTest.java    | 310 ++++++
 .../LeaderChangeJobRecoveryTest.java            |  10 +-
 .../LeaderChangeStateCleanupTest.java           |  15 +-
 .../LeaderElectionRetrievalTestingCluster.java  |  15 +-
 .../runtime/taskmanager/TaskCancelTest.java     |   2 +-
 .../testutils/JobManagerActorTestUtils.java     |   2 +-
 .../runtime/testingUtils/TestingCluster.scala   |   4 -
 .../testutils/JobManagerCommunicationUtils.java |   2 +-
 .../JobManagerHAJobGraphRecoveryITCase.java     |  51 +-
 .../org/apache/flink/yarn/YarnJobManager.scala  |   2 +-
 24 files changed, 1660 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/docs/internals/fig/job_status.svg
----------------------------------------------------------------------
diff --git a/docs/internals/fig/job_status.svg b/docs/internals/fig/job_status.svg
new file mode 100644
index 0000000..a1093bc
--- /dev/null
+++ b/docs/internals/fig/job_status.svg
@@ -0,0 +1,973 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<!-- Created with Inkscape (http://www.inkscape.org/) -->
+
+<svg
+   xmlns:dc="http://purl.org/dc/elements/1.1/"
+   xmlns:cc="http://creativecommons.org/ns#"
+   xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
+   xmlns:svg="http://www.w3.org/2000/svg"
+   xmlns="http://www.w3.org/2000/svg"
+   xmlns:sodipodi="http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd"
+   xmlns:inkscape="http://www.inkscape.org/namespaces/inkscape"
+   width="240mm"
+   height="220mm"
+   viewBox="0 0 850.3937 779.52755"
+   id="svg2"
+   version="1.1"
+   inkscape:version="0.91 r13725"
+   sodipodi:docname="job_status.svg">
+  <defs
+     id="defs4">
+    <marker
+       inkscape:stockid="Arrow2Mend"
+       orient="auto"
+       refY="0"
+       refX="0"
+       id="marker4534"
+       style="overflow:visible"
+       inkscape:isstock="true">
+      <path
+         inkscape:connector-curvature="0"
+         id="path4536"
+         style="fill:#000000;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.625;stroke-linejoin:round;stroke-opacity:1"
+         d="M 8.7185878,4.0337352 -2.2072895,0.01601326 8.7185884,-4.0017078 c -1.7454984,2.3720609 -1.7354408,5.6174519 -6e-7,8.035443 z"
+         transform="scale(-0.6,-0.6)" />
+    </marker>
+    <marker
+       inkscape:isstock="true"
+       style="overflow:visible"
+       id="marker7718"
+       refX="0"
+       refY="0"
+       orient="auto"
+       inkscape:stockid="Arrow2Mend"
+       inkscape:collect="always">
+      <path
+         transform="scale(-0.6,-0.6)"
+         d="M 8.7185878,4.0337352 -2.2072895,0.01601326 8.7185884,-4.0017078 c -1.7454984,2.3720609 -1.7354408,5.6174519 -6e-7,8.035443 z"
+         style="fill:#000000;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.625;stroke-linejoin:round;stroke-opacity:1"
+         id="path7720"
+         inkscape:connector-curvature="0" />
+    </marker>
+    <marker
+       inkscape:stockid="Arrow2Mend"
+       orient="auto"
+       refY="0"
+       refX="0"
+       id="marker6764"
+       style="overflow:visible"
+       inkscape:isstock="true"
+       inkscape:collect="always">
+      <path
+         id="path6766"
+         style="fill:#000000;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.625;stroke-linejoin:round;stroke-opacity:1"
+         d="M 8.7185878,4.0337352 -2.2072895,0.01601326 8.7185884,-4.0017078 c -1.7454984,2.3720609 -1.7354408,5.6174519 -6e-7,8.035443 z"
+         transform="scale(-0.6,-0.6)"
+         inkscape:connector-curvature="0" />
+    </marker>
+    <marker
+       inkscape:isstock="true"
+       style="overflow:visible"
+       id="marker6634"
+       refX="0"
+       refY="0"
+       orient="auto"
+       inkscape:stockid="Arrow2Mend"
+       inkscape:collect="always">
+      <path
+         transform="scale(-0.6,-0.6)"
+         d="M 8.7185878,4.0337352 -2.2072895,0.01601326 8.7185884,-4.0017078 c -1.7454984,2.3720609 -1.7354408,5.6174519 -6e-7,8.035443 z"
+         style="fill:#000000;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.625;stroke-linejoin:round;stroke-opacity:1"
+         id="path6636"
+         inkscape:connector-curvature="0" />
+    </marker>
+    <marker
+       inkscape:stockid="Arrow2Mend"
+       orient="auto"
+       refY="0"
+       refX="0"
+       id="marker6510"
+       style="overflow:visible"
+       inkscape:isstock="true"
+       inkscape:collect="always">
+      <path
+         id="path6512"
+         style="fill:#000000;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.625;stroke-linejoin:round;stroke-opacity:1"
+         d="M 8.7185878,4.0337352 -2.2072895,0.01601326 8.7185884,-4.0017078 c -1.7454984,2.3720609 -1.7354408,5.6174519 -6e-7,8.035443 z"
+         transform="scale(-0.6,-0.6)"
+         inkscape:connector-curvature="0" />
+    </marker>
+    <marker
+       inkscape:isstock="true"
+       style="overflow:visible"
+       id="marker6392"
+       refX="0"
+       refY="0"
+       orient="auto"
+       inkscape:stockid="Arrow2Mend"
+       inkscape:collect="always">
+      <path
+         transform="scale(-0.6,-0.6)"
+         d="M 8.7185878,4.0337352 -2.2072895,0.01601326 8.7185884,-4.0017078 c -1.7454984,2.3720609 -1.7354408,5.6174519 -6e-7,8.035443 z"
+         style="fill:#000000;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.625;stroke-linejoin:round;stroke-opacity:1"
+         id="path6394"
+         inkscape:connector-curvature="0" />
+    </marker>
+    <marker
+       inkscape:stockid="Arrow2Mend"
+       orient="auto"
+       refY="0"
+       refX="0"
+       id="marker6280"
+       style="overflow:visible"
+       inkscape:isstock="true"
+       inkscape:collect="always">
+      <path
+         id="path6282"
+         style="fill:#000000;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.625;stroke-linejoin:round;stroke-opacity:1"
+         d="M 8.7185878,4.0337352 -2.2072895,0.01601326 8.7185884,-4.0017078 c -1.7454984,2.3720609 -1.7354408,5.6174519 -6e-7,8.035443 z"
+         transform="scale(-0.6,-0.6)"
+         inkscape:connector-curvature="0" />
+    </marker>
+    <marker
+       inkscape:isstock="true"
+       style="overflow:visible"
+       id="marker6174"
+       refX="0"
+       refY="0"
+       orient="auto"
+       inkscape:stockid="Arrow2Mend"
+       inkscape:collect="always">
+      <path
+         transform="scale(-0.6,-0.6)"
+         d="M 8.7185878,4.0337352 -2.2072895,0.01601326 8.7185884,-4.0017078 c -1.7454984,2.3720609 -1.7354408,5.6174519 -6e-7,8.035443 z"
+         style="fill:#000000;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.625;stroke-linejoin:round;stroke-opacity:1"
+         id="path6176"
+         inkscape:connector-curvature="0" />
+    </marker>
+    <marker
+       inkscape:stockid="Arrow2Mend"
+       orient="auto"
+       refY="0"
+       refX="0"
+       id="marker6074"
+       style="overflow:visible"
+       inkscape:isstock="true"
+       inkscape:collect="always">
+      <path
+         id="path6076"
+         style="fill:#000000;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.625;stroke-linejoin:round;stroke-opacity:1"
+         d="M 8.7185878,4.0337352 -2.2072895,0.01601326 8.7185884,-4.0017078 c -1.7454984,2.3720609 -1.7354408,5.6174519 -6e-7,8.035443 z"
+         transform="scale(-0.6,-0.6)"
+         inkscape:connector-curvature="0" />
+    </marker>
+    <marker
+       inkscape:stockid="Arrow2Mend"
+       orient="auto"
+       refY="0"
+       refX="0"
+       id="marker5892"
+       style="overflow:visible"
+       inkscape:isstock="true"
+       inkscape:collect="always">
+      <path
+         id="path5894"
+         style="fill:#000000;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.625;stroke-linejoin:round;stroke-opacity:1"
+         d="M 8.7185878,4.0337352 -2.2072895,0.01601326 8.7185884,-4.0017078 c -1.7454984,2.3720609 -1.7354408,5.6174519 -6e-7,8.035443 z"
+         transform="scale(-0.6,-0.6)"
+         inkscape:connector-curvature="0" />
+    </marker>
+    <marker
+       inkscape:isstock="true"
+       style="overflow:visible"
+       id="marker5237"
+       refX="0"
+       refY="0"
+       orient="auto"
+       inkscape:stockid="Arrow2Mend"
+       inkscape:collect="always">
+      <path
+         transform="scale(-0.6,-0.6)"
+         d="M 8.7185878,4.0337352 -2.2072895,0.01601326 8.7185884,-4.0017078 c -1.7454984,2.3720609 -1.7354408,5.6174519 -6e-7,8.035443 z"
+         style="fill:#000000;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.625;stroke-linejoin:round;stroke-opacity:1"
+         id="path5239"
+         inkscape:connector-curvature="0" />
+    </marker>
+    <marker
+       inkscape:stockid="Arrow2Mend"
+       orient="auto"
+       refY="0"
+       refX="0"
+       id="marker5157"
+       style="overflow:visible"
+       inkscape:isstock="true"
+       inkscape:collect="always">
+      <path
+         id="path5159"
+         style="fill:#000000;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.625;stroke-linejoin:round;stroke-opacity:1"
+         d="M 8.7185878,4.0337352 -2.2072895,0.01601326 8.7185884,-4.0017078 c -1.7454984,2.3720609 -1.7354408,5.6174519 -6e-7,8.035443 z"
+         transform="scale(-0.6,-0.6)"
+         inkscape:connector-curvature="0" />
+    </marker>
+    <marker
+       inkscape:isstock="true"
+       style="overflow:visible"
+       id="marker5075"
+       refX="0"
+       refY="0"
+       orient="auto"
+       inkscape:stockid="Arrow2Mend"
+       inkscape:collect="always">
+      <path
+         transform="scale(-0.6,-0.6)"
+         d="M 8.7185878,4.0337352 -2.2072895,0.01601326 8.7185884,-4.0017078 c -1.7454984,2.3720609 -1.7354408,5.6174519 -6e-7,8.035443 z"
+         style="fill:#000000;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.625;stroke-linejoin:round;stroke-opacity:1"
+         id="path5077"
+         inkscape:connector-curvature="0" />
+    </marker>
+    <marker
+       inkscape:stockid="Arrow2Mend"
+       orient="auto"
+       refY="0"
+       refX="0"
+       id="marker5005"
+       style="overflow:visible"
+       inkscape:isstock="true"
+       inkscape:collect="always">
+      <path
+         id="path5007"
+         style="fill:#000000;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.625;stroke-linejoin:round;stroke-opacity:1"
+         d="M 8.7185878,4.0337352 -2.2072895,0.01601326 8.7185884,-4.0017078 c -1.7454984,2.3720609 -1.7354408,5.6174519 -6e-7,8.035443 z"
+         transform="scale(-0.6,-0.6)"
+         inkscape:connector-curvature="0" />
+    </marker>
+    <marker
+       inkscape:isstock="true"
+       style="overflow:visible"
+       id="marker4947"
+       refX="0"
+       refY="0"
+       orient="auto"
+       inkscape:stockid="Arrow2Mend"
+       inkscape:collect="always">
+      <path
+         transform="scale(-0.6,-0.6)"
+         d="M 8.7185878,4.0337352 -2.2072895,0.01601326 8.7185884,-4.0017078 c -1.7454984,2.3720609 -1.7354408,5.6174519 -6e-7,8.035443 z"
+         style="fill:#000000;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.625;stroke-linejoin:round;stroke-opacity:1"
+         id="path4949"
+         inkscape:connector-curvature="0" />
+    </marker>
+    <marker
+       inkscape:stockid="Arrow2Mend"
+       orient="auto"
+       refY="0"
+       refX="0"
+       id="marker4831"
+       style="overflow:visible"
+       inkscape:isstock="true"
+       inkscape:collect="always">
+      <path
+         id="path4833"
+         style="fill:#000000;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.625;stroke-linejoin:round;stroke-opacity:1"
+         d="M 8.7185878,4.0337352 -2.2072895,0.01601326 8.7185884,-4.0017078 c -1.7454984,2.3720609 -1.7354408,5.6174519 -6e-7,8.035443 z"
+         transform="scale(-0.6,-0.6)"
+         inkscape:connector-curvature="0" />
+    </marker>
+    <marker
+       inkscape:stockid="Arrow2Mend"
+       orient="auto"
+       refY="0"
+       refX="0"
+       id="Arrow2Mend"
+       style="overflow:visible"
+       inkscape:isstock="true">
+      <path
+         id="path4486"
+         style="fill:#000000;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.625;stroke-linejoin:round;stroke-opacity:1"
+         d="M 8.7185878,4.0337352 -2.2072895,0.01601326 8.7185884,-4.0017078 c -1.7454984,2.3720609 -1.7354408,5.6174519 -6e-7,8.035443 z"
+         transform="scale(-0.6,-0.6)"
+         inkscape:connector-curvature="0" />
+    </marker>
+    <marker
+       inkscape:stockid="Arrow2Lend"
+       orient="auto"
+       refY="0"
+       refX="0"
+       id="Arrow2Lend"
+       style="overflow:visible"
+       inkscape:isstock="true">
+      <path
+         id="path4480"
+         style="fill:#000000;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.625;stroke-linejoin:round;stroke-opacity:1"
+         d="M 8.7185878,4.0337352 -2.2072895,0.01601326 8.7185884,-4.0017078 c -1.7454984,2.3720609 -1.7354408,5.6174519 -6e-7,8.035443 z"
+         transform="matrix(-1.1,0,0,-1.1,-1.1,0)"
+         inkscape:connector-curvature="0" />
+    </marker>
+    <marker
+       inkscape:stockid="Arrow1Lend"
+       orient="auto"
+       refY="0"
+       refX="0"
+       id="Arrow1Lend"
+       style="overflow:visible"
+       inkscape:isstock="true">
+      <path
+         id="path4462"
+         d="M 0,0 5,-5 -12.5,0 5,5 0,0 Z"
+         style="fill:#000000;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:1pt;stroke-opacity:1"
+         transform="matrix(-0.8,0,0,-0.8,-10,0)"
+         inkscape:connector-curvature="0" />
+    </marker>
+    <marker
+       inkscape:stockid="Arrow1Mend"
+       orient="auto"
+       refY="0"
+       refX="0"
+       id="Arrow1Mend"
+       style="overflow:visible"
+       inkscape:isstock="true">
+      <path
+         id="path4468"
+         d="M 0,0 5,-5 -12.5,0 5,5 0,0 Z"
+         style="fill:#000000;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:1pt;stroke-opacity:1"
+         transform="matrix(-0.4,0,0,-0.4,-4,0)"
+         inkscape:connector-curvature="0" />
+    </marker>
+  </defs>
+  <sodipodi:namedview
+     id="base"
+     pagecolor="#ffffff"
+     bordercolor="#666666"
+     borderopacity="1.0"
+     inkscape:pageopacity="0.0"
+     inkscape:pageshadow="2"
+     inkscape:zoom="1.4"
+     inkscape:cx="584.30425"
+     inkscape:cy="378.45547"
+     inkscape:document-units="px"
+     inkscape:current-layer="layer1"
+     showgrid="true"
+     inkscape:window-width="2134"
+     inkscape:window-height="1377"
+     inkscape:window-x="1533"
+     inkscape:window-y="16"
+     inkscape:window-maximized="0">
+    <inkscape:grid
+       type="xygrid"
+       id="grid4136" />
+  </sodipodi:namedview>
+  <metadata
+     id="metadata7">
+    <rdf:RDF>
+      <cc:Work
+         rdf:about="">
+        <dc:format>image/svg+xml</dc:format>
+        <dc:type
+           rdf:resource="http://purl.org/dc/dcmitype/StillImage" />
+        <dc:title></dc:title>
+      </cc:Work>
+    </rdf:RDF>
+  </metadata>
+  <g
+     inkscape:label="Layer 1"
+     inkscape:groupmode="layer"
+     id="layer1"
+     transform="translate(0,-272.83465)">
+    <g
+       id="g4324"
+       transform="translate(-30.285714,162.34191)">
+      <rect
+         ry="22.587013"
+         rx="21.337021"
+         y="462.81091"
+         x="79.020126"
+         height="45.174026"
+         width="126.60261"
+         id="rect4140"
+         style="opacity:1;fill:#5599ff;fill-opacity:1;stroke:#000000;stroke-width:1.5;stroke-linejoin:round;stroke-miterlimit:4;stroke-dasharray:none;stroke-dashoffset:0;stroke-opacity:1" />
+      <text
+         sodipodi:linespacing="125%"
+         id="text4142"
+         y="494.71799"
+         x="93.243065"
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+         xml:space="preserve"><tspan
+           y="494.71799"
+           x="93.243065"
+           id="tspan4144"
+           sodipodi:role="line">Created</tspan></text>
+    </g>
+    <g
+       id="g4286"
+       transform="translate(-39.560883,231.66354)">
+      <rect
+         style="opacity:1;fill:#5599ff;fill-opacity:1;stroke:#000000;stroke-width:1.5;stroke-linejoin:round;stroke-miterlimit:4;stroke-dasharray:none;stroke-dashoffset:0;stroke-opacity:1"
+         id="rect4254"
+         width="126.60261"
+         height="45.174026"
+         x="336.96799"
+         y="393.48929"
+         rx="21.337021"
+         ry="22.587013" />
+      <text
+         xml:space="preserve"
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+         x="348.88379"
+         y="422.97327"
+         id="text4182"
+         sodipodi:linespacing="125%"><tspan
+           sodipodi:role="line"
+           id="tspan4184"
+           x="348.88379"
+           y="422.97327">Running</tspan></text>
+    </g>
+    <g
+       id="g4426"
+       transform="translate(38,166)">
+      <rect
+         style="opacity:1;fill:#5599ff;fill-opacity:1;stroke:#000000;stroke-width:4;stroke-linejoin:round;stroke-miterlimit:4;stroke-dasharray:24, 4;stroke-dashoffset:0;stroke-opacity:1"
+         id="rect4260"
+         width="126.60261"
+         height="45.174026"
+         x="532.07977"
+         y="459.15283"
+         rx="21.337021"
+         ry="22.587013" />
+      <text
+         sodipodi:linespacing="125%"
+         id="text4199"
+         y="491.05991"
+         x="544.06885"
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+         xml:space="preserve"><tspan
+           y="491.05991"
+           x="544.06885"
+           id="tspan4201"
+           sodipodi:role="line">Finished</tspan></text>
+    </g>
+    <g
+       id="g4276"
+       transform="translate(-8.802002,175.91335)">
+      <rect
+         style="opacity:1;fill:#5599ff;fill-opacity:1;stroke:#000000;stroke-width:1.5;stroke-linejoin:round;stroke-miterlimit:4;stroke-dasharray:none;stroke-dashoffset:0;stroke-opacity:1"
+         id="rect4256"
+         width="126.60261"
+         height="45.174026"
+         x="319.79538"
+         y="264.69485"
+         rx="21.337021"
+         ry="22.587013" />
+      <text
+         sodipodi:linespacing="125%"
+         id="text4216"
+         y="294.17883"
+         x="342.99048"
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+         xml:space="preserve"><tspan
+           y="294.17883"
+           x="342.99048"
+           id="tspan4218"
+           sodipodi:role="line">Failing</tspan></text>
+    </g>
+    <g
+       id="g4421"
+       transform="translate(40,166)">
+      <rect
+         ry="22.587013"
+         rx="21.337021"
+         y="274.60822"
+         x="529.78455"
+         height="45.174026"
+         width="126.60261"
+         id="rect4258"
+         style="opacity:1;fill:#5599ff;fill-opacity:1;stroke:#000000;stroke-width:4;stroke-linejoin:round;stroke-miterlimit:4;stroke-dasharray:16, 4;stroke-dashoffset:0;stroke-opacity:1" />
+      <text
+         xml:space="preserve"
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+         x="556.71503"
+         y="306.51529"
+         id="text4222"
+         sodipodi:linespacing="125%"><tspan
+           sodipodi:role="line"
+           id="tspan4224"
+           x="556.71503"
+           y="306.51529">Failed</tspan></text>
+    </g>
+    <g
+       id="g4416"
+       transform="translate(14,166)">
+      <rect
+         ry="22.500114"
+         rx="26.670492"
+         y="639.99316"
+         x="238.02916"
+         height="45.000229"
+         width="158.24863"
+         id="rect4252"
+         style="opacity:1;fill:#5599ff;fill-opacity:1;stroke:#000000;stroke-width:1.5;stroke-linejoin:round;stroke-miterlimit:4;stroke-dasharray:none;stroke-dashoffset:0;stroke-opacity:1" />
+      <text
+         sodipodi:linespacing="125%"
+         id="text4228"
+         y="669.39026"
+         x="252.4257"
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+         xml:space="preserve"><tspan
+           y="669.39026"
+           x="252.4257"
+           id="tspan4230"
+           sodipodi:role="line">Cancelling</tspan></text>
+    </g>
+    <g
+       id="g4431"
+       transform="translate(38,166)">
+      <rect
+         ry="22.551325"
+         rx="23.453072"
+         y="639.94202"
+         x="519.86011"
+         height="45.10265"
+         width="139.15814"
+         id="rect4262"
+         style="opacity:1;fill:#5599ff;fill-opacity:1;stroke:#000000;stroke-width:4;stroke-linejoin:round;stroke-miterlimit:4;stroke-dasharray:24, 4;stroke-dashoffset:0;stroke-opacity:1" />
+      <text
+         xml:space="preserve"
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+         x="531.91351"
+         y="671.81342"
+         id="text4234"
+         sodipodi:linespacing="125%"><tspan
+           sodipodi:role="line"
+           id="tspan4236"
+           x="531.91351"
+           y="671.81342">Canceled</tspan></text>
+    </g>
+    <g
+       id="g4411"
+       transform="translate(14,166)">
+      <rect
+         style="opacity:1;fill:#5599ff;fill-opacity:1;stroke:#000000;stroke-width:1.5;stroke-linejoin:round;stroke-miterlimit:4;stroke-dasharray:none;stroke-dashoffset:0;stroke-opacity:1"
+         id="rect4250"
+         width="158.24863"
+         height="45.000229"
+         x="32.46925"
+         y="274.6951"
+         rx="26.670492"
+         ry="22.500114" />
+      <text
+         sodipodi:linespacing="125%"
+         id="text4240"
+         y="304.09219"
+         x="47.170963"
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+         xml:space="preserve"><tspan
+           y="304.09219"
+           x="47.170963"
+           id="tspan4242"
+           sodipodi:role="line">Restarting</tspan></text>
+    </g>
+    <g
+       id="g4436"
+       transform="translate(11.142857,169.57143)">
+      <rect
+         style="opacity:1;fill:#5599ff;fill-opacity:1;stroke:#000000;stroke-width:4;stroke-linejoin:round;stroke-miterlimit:4;stroke-dasharray:24, 4;stroke-dashoffset:0;stroke-opacity:1"
+         id="rect4264"
+         width="167.29372"
+         height="44.953941"
+         x="492.55664"
+         y="790.35205"
+         rx="28.194908"
+         ry="22.476971" />
+      <text
+         xml:space="preserve"
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+         x="507.03235"
+         y="819.72601"
+         id="text4246"
+         sodipodi:linespacing="125%"><tspan
+           sodipodi:role="line"
+           id="tspan4248"
+           x="507.03235"
+           y="819.72601">Suspended</tspan></text>
+    </g>
+    <path
+       style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#Arrow2Mend)"
+       d="m 175.31595,646.72195 122.27415,0.17603"
+       id="path4441"
+       inkscape:connector-type="polyline"
+       inkscape:connector-curvature="0"
+       inkscape:connection-start="#g4324"
+       inkscape:connection-end="#g4286"
+       sodipodi:nodetypes="cc" />
+    <path
+       style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker4831)"
+       d="m 423.67902,643.73984 146.73144,0"
+       id="path4443"
+       inkscape:connector-type="polyline"
+       inkscape:connector-curvature="0"
+       inkscape:connection-start="#g4286"
+       inkscape:connection-end="#g4426" />
+    <path
+       style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker5157)"
+       d="M 143.45346,625.15282 C 240.11678,592.2528 289.05237,539.16028 337.31308,485.78223"
+       id="path4445"
+       inkscape:connector-type="polyline"
+       inkscape:connector-curvature="0"
+       inkscape:connection-start="#g4324"
+       inkscape:connection-end="#g4276"
+       sodipodi:nodetypes="cc" />
+    <path
+       style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker5075)"
+       d="m 437.2653,459.19522 132.84994,10e-6"
+       id="path4447"
+       inkscape:connector-type="polyline"
+       inkscape:connector-curvature="0"
+       inkscape:connection-start="#g4276"
+       inkscape:connection-end="#g4421" />
+    <path
+       style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker4947)"
+       d="M 140.03639,670.32685 308.21925,805.99316"
+       id="path4449"
+       inkscape:connector-type="polyline"
+       inkscape:connector-curvature="0"
+       inkscape:connection-start="#g4324"
+       inkscape:connection-end="#g4416" />
+    <path
+       style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker5005)"
+       d="m 409.8612,824.4933 148.36356,3e-5"
+       id="path4451"
+       inkscape:connector-type="polyline"
+       inkscape:connector-curvature="0"
+       inkscape:connection-start="#g4416"
+       inkscape:connection-end="#g4431" />
+    <path
+       style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker5237)"
+       d="M 119.50359,668.89828 C 120,902.36221 317.44733,915.62541 504.35792,974.20919"
+       id="path4453"
+       inkscape:connector-type="polyline"
+       inkscape:connector-curvature="0"
+       inkscape:connection-start="#g4324"
+       inkscape:connection-end="#g4436"
+       sodipodi:nodetypes="cc" />
+    <text
+       xml:space="preserve"
+       style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+       x="182.57143"
+       y="643.07654"
+       id="text4913"
+       sodipodi:linespacing="125%"><tspan
+         sodipodi:role="line"
+         id="tspan4915"
+         x="182.57143"
+         y="643.07654"
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">Schedule job</tspan></text>
+    <text
+       xml:space="preserve"
+       style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+       x="432"
+       y="638.36218"
+       id="text4929"
+       sodipodi:linespacing="125%"><tspan
+         sodipodi:role="line"
+         id="tspan4931"
+         x="432"
+         y="638.36218"
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">All job vertices </tspan><tspan
+         sodipodi:role="line"
+         x="432"
+         y="657.11218"
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
+         id="tspan4933">in final state</tspan></text>
+    <text
+       xml:space="preserve"
+       style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+       x="424"
+       y="820.36218"
+       id="text5063"
+       sodipodi:linespacing="125%"><tspan
+         sodipodi:role="line"
+         id="tspan5065"
+         x="424"
+         y="820.36218"
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">All job vertices </tspan><tspan
+         sodipodi:role="line"
+         x="424"
+         y="839.11218"
+         id="tspan5067"
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">in final state</tspan></text>
+    <text
+       xml:space="preserve"
+       style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+       x="442"
+       y="456.36221"
+       id="text5139"
+       sodipodi:linespacing="125%"><tspan
+         sodipodi:role="line"
+         id="tspan5141"
+         x="442"
+         y="456.36221"
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">All job vertices </tspan><tspan
+         sodipodi:role="line"
+         x="442"
+         y="475.11221"
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
+         id="tspan5143">in final state &amp; </tspan><tspan
+         sodipodi:role="line"
+         x="442"
+         y="493.86221"
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
+         id="tspan5145">not restartable</tspan></text>
+    <text
+       xml:space="preserve"
+       style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+       x="264"
+       y="578.36218"
+       id="text5227"
+       sodipodi:linespacing="125%"><tspan
+         sodipodi:role="line"
+         id="tspan5229"
+         x="264"
+         y="578.36218"
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">Fail job</tspan></text>
+    <text
+       xml:space="preserve"
+       style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+       x="218"
+       y="728.36218"
+       id="text5565"
+       sodipodi:linespacing="125%"><tspan
+         sodipodi:role="line"
+         id="tspan5567"
+         x="218"
+         y="728.36218"
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">Cancel job</tspan></text>
+    <path
+       style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker5892)"
+       d="m 314.18121,477.05236 c -47.69818,20.03987 -84.94599,9.32911 -116.30849,2.14285"
+       id="path5569"
+       inkscape:connector-type="polyline"
+       inkscape:connector-curvature="0"
+       inkscape:connection-start="#g4276"
+       inkscape:connection-end="#g4411"
+       sodipodi:nodetypes="cc" />
+    <path
+       style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker6174)"
+       d="M 385.20445,485.78223 C 940,652.36221 835.65722,822.42397 665.47877,968.49491"
+       id="path5571"
+       inkscape:connector-type="polyline"
+       inkscape:connector-curvature="0"
+       inkscape:connection-start="#g4276"
+       inkscape:connection-end="#g4436"
+       sodipodi:nodetypes="cc" />
+    <path
+       style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker6280)"
+       d="m 378.71359,670.32685 c 466.9675,42.03536 386.85357,182.13109 263.2627,288.16806"
+       id="path5573"
+       inkscape:connector-type="polyline"
+       inkscape:connector-curvature="0"
+       inkscape:connection-start="#g4286"
+       inkscape:connection-end="#g4436"
+       sodipodi:nodetypes="cc" />
+    <path
+       style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker6392)"
+       d="M 375.26539,850.99339 556.58971,959.92348"
+       id="path5575"
+       inkscape:connector-type="polyline"
+       inkscape:connector-curvature="0"
+       inkscape:connection-start="#g4416"
+       inkscape:connection-end="#g4436" />
+    <path
+       style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker6074)"
+       d="M 173.42792,441.40962 C 790,62.362207 855,633.79078 686.95342,807.37059"
+       id="path5579"
+       inkscape:connector-type="polyline"
+       inkscape:connector-curvature="0"
+       inkscape:connection-start="#g4411"
+       inkscape:connection-end="#g4431"
+       sodipodi:nodetypes="cc" />
+    <path
+       style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker6764)"
+       d="M 123.688,485.69533 113.6599,625.15282"
+       id="path5581"
+       inkscape:connector-type="polyline"
+       inkscape:connector-curvature="0"
+       inkscape:connection-start="#g4411"
+       inkscape:connection-end="#g4324" />
+    <path
+       style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker6510)"
+       d="m 362.07679,625.15283 10.26054,-139.3706"
+       id="path5585"
+       inkscape:connector-type="polyline"
+       inkscape:connector-curvature="0"
+       inkscape:connection-start="#g4286"
+       inkscape:connection-end="#g4276" />
+    <path
+       style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker6634)"
+       d="M 356.36117,670.32685 334.17842,805.99316"
+       id="path5587"
+       inkscape:connector-type="polyline"
+       inkscape:connector-curvature="0"
+       inkscape:connection-start="#g4286"
+       inkscape:connection-end="#g4416" />
+    <path
+       style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker7718)"
+       d="M 194.71429,444.79077 C 295.26058,393.86555 426.46327,380.03465 584,438.3622"
+       id="path7710"
+       inkscape:connector-curvature="0"
+       sodipodi:nodetypes="cc" />
+    <text
+       xml:space="preserve"
+       style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+       x="10"
+       y="556.36218"
+       id="text8166"
+       sodipodi:linespacing="125%"><tspan
+         sodipodi:role="line"
+         id="tspan8168"
+         x="10"
+         y="556.36218"
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">Restarted job</tspan></text>
+    <text
+       xml:space="preserve"
+       style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+       x="156"
+       y="906.36218"
+       id="text8170"
+       sodipodi:linespacing="125%"><tspan
+         sodipodi:role="line"
+         id="tspan8172"
+         x="156"
+         y="906.36218"
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">Suspend job</tspan></text>
+    <text
+       sodipodi:linespacing="125%"
+       id="text8174"
+       y="906.93359"
+       x="468.57144"
+       style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+       xml:space="preserve"><tspan
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
+         y="906.93359"
+         x="468.57144"
+         id="tspan8176"
+         sodipodi:role="line">Suspend job</tspan></text>
+    <text
+       xml:space="preserve"
+       style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+       x="746.42859"
+       y="906.2193"
+       id="text8178"
+       sodipodi:linespacing="125%"><tspan
+         sodipodi:role="line"
+         id="tspan8180"
+         x="746.42859"
+         y="906.2193"
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">Suspend job</tspan></text>
+    <text
+       sodipodi:linespacing="125%"
+       id="text8182"
+       y="717.64789"
+       x="482.14288"
+       style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+       xml:space="preserve"><tspan
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
+         y="717.64789"
+         x="482.14288"
+         id="tspan8184"
+         sodipodi:role="line">Suspend job</tspan></text>
+    <text
+       sodipodi:linespacing="125%"
+       id="text8186"
+       y="741.93359"
+       x="353"
+       style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+       xml:space="preserve"><tspan
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
+         y="741.93359"
+         x="353"
+         id="tspan8188"
+         sodipodi:role="line">Cancel job</tspan></text>
+    <text
+       sodipodi:linespacing="125%"
+       id="text8190"
+       y="390.50507"
+       x="361.14285"
+       style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+       xml:space="preserve"><tspan
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
+         y="390.50507"
+         x="361.14285"
+         id="tspan8192"
+         sodipodi:role="line">Fail job</tspan></text>
+    <text
+       sodipodi:linespacing="125%"
+       id="text8194"
+       y="306.21933"
+       x="487.28571"
+       style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+       xml:space="preserve"><tspan
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
+         y="306.21933"
+         x="487.28571"
+         id="tspan8196"
+         sodipodi:role="line">Cancel job</tspan></text>
+    <text
+       xml:space="preserve"
+       style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+       x="174"
+       y="510.36221"
+       id="text8198"
+       sodipodi:linespacing="125%"><tspan
+         sodipodi:role="line"
+         id="tspan8200"
+         x="174"
+         y="510.36221"
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">All job vertices</tspan><tspan
+         sodipodi:role="line"
+         x="174"
+         y="529.11218"
+         id="tspan8202"
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">in final state &amp;</tspan><tspan
+         sodipodi:role="line"
+         x="174"
+         y="547.86218"
+         id="tspan8204"
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">restartable</tspan></text>
+    <text
+       sodipodi:linespacing="125%"
+       id="text8206"
+       y="574.07654"
+       x="374"
+       style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+       xml:space="preserve"><tspan
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
+         y="574.07654"
+         x="374"
+         id="tspan8208"
+         sodipodi:role="line">Fail job</tspan></text>
+    <path
+       style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker4534)"
+       d="M 9.5714286,648.36221 46,648.36221"
+       id="path3470"
+       inkscape:connector-curvature="0"
+       sodipodi:nodetypes="cc" />
+  </g>
+</svg>

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/docs/internals/job_scheduling.md
----------------------------------------------------------------------
diff --git a/docs/internals/job_scheduling.md b/docs/internals/job_scheduling.md
index 1e1da97..9163678 100644
--- a/docs/internals/job_scheduling.md
+++ b/docs/internals/job_scheduling.md
@@ -74,7 +74,28 @@ Besides the vertices, the ExecutionGraph also contains the {% gh_link /flink-run
 <img src="fig/job_and_execution_graph.svg" alt="JobGraph and ExecutionGraph" height="400px" style="text-align: center;"/>
 </div>
 
-During its execution, each parallel task goes through multiple stages, from *created* to *finished* or *failed*. The diagram below illustrates the 
+Each ExecutionGraph has a job status associated with it.
+This job status indicates the current state of the job execution.
+
+A Flink job is first in the *created* state, then switches to *running* and upon completion of all work it switches to *finished*.
+In case of failures, a job switches first to *failing* where it cancels all running tasks.
+If all job vertices have reached a final state and the job is not restartable, then the job transitions to *failed*.
+If the job can be restarted, then it will enter the *restarting* state.
+Once the job has been completely restarted, it will reach the *created* state.
+
+If the user cancels the job, it will go into the *cancelling* state.
+This also entails the cancellation of all currently running tasks.
+Once all running tasks have reached a final state, the job transitions to the state *canceled*.
+
+Unlike the states *finished*, *canceled* and *failed*, which denote globally terminal states and thus trigger the cleanup of the job, the *suspended* state is only locally terminal.
+Locally terminal means that the execution of the job has been terminated on the respective JobManager, but another JobManager of the Flink cluster can retrieve the job from the persistent HA store and restart it.
+Consequently, a job which reaches the *suspended* state won't be completely cleaned up.
+
+<div style="text-align: center;">
+<img src="fig/job_status.svg" alt="States and Transitions of Flink job" height="500px" style="text-align: center;"/>
+</div>
+
+During the execution of the ExecutionGraph, each parallel task goes through multiple stages, from *created* to *finished* or *failed*. The diagram below illustrates the 
 states and possible transitions between them. A task may be executed multiple times (for example in the course of failure recovery).
 For that reason, the execution of an ExecutionVertex is tracked in an {% gh_link /flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java "Execution" %}. Each ExecutionVertex has a current Execution, and prior Executions.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
index 34d8069..f890106 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
@@ -163,7 +163,7 @@ public class BackPressureStatsTracker {
 			}
 
 			if (!pendingStats.contains(vertex) &&
-					!vertex.getGraph().getState().isTerminalState()) {
+					!vertex.getGraph().getState().isGloballyTerminalState()) {
 
 				ExecutionContext executionContext = vertex.getGraph().getExecutionContext();
 
@@ -245,7 +245,7 @@ public class BackPressureStatsTracker {
 
 					// Job finished, ignore.
 					JobStatus jobState = vertex.getGraph().getState();
-					if (jobState.isTerminalState()) {
+					if (jobState.isGloballyTerminalState()) {
 						LOG.debug("Ignoring sample, because job is in state " + jobState + ".");
 					} else if (success != null) {
 						OperatorBackPressureStats stats = createStatsFromSample(success);

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
index 4f31128..884b859 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
@@ -66,7 +66,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
 		
 		// times and duration
 		final long jobStartTime = graph.getStatusTimestamp(JobStatus.CREATED);
-		final long jobEndTime = graph.getState().isTerminalState() ?
+		final long jobEndTime = graph.getState().isGloballyTerminalState() ?
 				graph.getStatusTimestamp(graph.getState()) : -1L;
 		gen.writeNumberField("start-time", jobStartTime);
 		gen.writeNumberField("end-time", jobEndTime);

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/HeapStateStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/HeapStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/HeapStateStore.java
index a0b3804..c679d50 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/HeapStateStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/HeapStateStore.java
@@ -31,7 +31,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * @param <T> Type of state
  */
-class HeapStateStore<T extends Serializable> implements StateStore<T> {
+public class HeapStateStore<T extends Serializable> implements StateStore<T> {
 
 	private final ConcurrentMap<String, T> stateMap = new ConcurrentHashMap<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index b11f51d..e521888 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -797,18 +797,50 @@ public class ExecutionGraph implements Serializable {
 		}
 	}
 
-	public void fail(Throwable t) {
-		if (t instanceof SuppressRestartsException) {
-			if (restartStrategy != null) {
-				// disable the restart strategy in case that we have seen a SuppressRestartsException
-				// it basically overrides the restart behaviour of a the root cause
-				restartStrategy.disable();
+	/**
+	 * Suspends the current ExecutionGraph.
+	 *
+	 * The JobStatus will be directly set to SUSPENDED iff the current state is not a globally
+	 * terminal state. All ExecutionJobVertices will be canceled and the postRunCleanup is executed.
+	 *
+	 * The SUSPENDED state is a locally terminal state which stops the execution of the job but does
+	 * not remove the job from the HA job store so that it can be recovered by another JobManager.
+	 *
+	 * @param suspensionCause Cause of the suspension
+	 */
+	public void suspend(Throwable suspensionCause) {
+		while (true) {
+			JobStatus currentState = state;
+
+			if (currentState.isGloballyTerminalState()) {
+				// stay in a terminal state
+				return;
+			} else if (transitionState(currentState, JobStatus.SUSPENDED, suspensionCause)) {
+				this.failureCause = suspensionCause;
+
+				for (ExecutionJobVertex ejv: verticesInCreationOrder) {
+					ejv.cancel();
+				}
+
+				synchronized (progressLock) {
+						postRunCleanup();
+						progressLock.notifyAll();
+
+						LOG.info("Job {} has been suspended.", getJobID());
+				}
+
+				return;
 			}
 		}
+	}
 
+	public void fail(Throwable t) {
 		while (true) {
 			JobStatus current = state;
-			if (current == JobStatus.FAILING || current.isTerminalState()) {
+			// stay in these states
+			if (current == JobStatus.FAILING ||
+				current == JobStatus.SUSPENDED ||
+				current.isGloballyTerminalState()) {
 				return;
 			} else if (current == JobStatus.RESTARTING && transitionState(current, JobStatus.FAILED, t)) {
 				synchronized (progressLock) {
@@ -849,6 +881,9 @@ public class ExecutionGraph implements Serializable {
 				} else if (current == JobStatus.FAILED) {
 					LOG.info("Failed job during restart. Aborting restart.");
 					return;
+				} else if (current == JobStatus.SUSPENDED) {
+					LOG.info("Suspended job during restart. Aborting restart.");
+					return;
 				} else if (current != JobStatus.RESTARTING) {
 					throw new IllegalStateException("Can only restart job from state restarting.");
 				}
@@ -947,7 +982,7 @@ public class ExecutionGraph implements Serializable {
 	 * This method cleans fields that are irrelevant for the archived execution attempt.
 	 */
 	public void prepareForArchiving() {
-		if (!state.isTerminalState()) {
+		if (!state.isGloballyTerminalState()) {
 			throw new IllegalStateException("Can only archive the job from a terminal state");
 		}
 
@@ -984,7 +1019,7 @@ public class ExecutionGraph implements Serializable {
 	 */
 	public void waitUntilFinished() throws InterruptedException {
 		synchronized (progressLock) {
-			while (!state.isTerminalState()) {
+			while (!state.isGloballyTerminalState()) {
 				progressLock.wait();
 			}
 		}
@@ -1037,23 +1072,28 @@ public class ExecutionGraph implements Serializable {
 						}
 					}
 					else if (current == JobStatus.FAILING) {
-						if (restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) {
-							// double check in case that in the meantime a SuppressRestartsException was thrown
-							if (restartStrategy.canRestart()) {
-								restartStrategy.restart(this);
-								break;
-							} else {
-								fail(new Exception("ExecutionGraph went into RESTARTING state but " +
-									"then the restart strategy was disabled."));
-							}
-
-						} else if (!restartStrategy.canRestart() && transitionState(current, JobStatus.FAILED, failureCause)) {
+						boolean allowRestart = !(failureCause instanceof SuppressRestartsException);
+
+						if (allowRestart && restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) {
+							restartStrategy.restart(this);
+							break;
+						} else if ((!allowRestart || !restartStrategy.canRestart()) && transitionState(current, JobStatus.FAILED, failureCause)) {
 							postRunCleanup();
 							break;
 						}
 					}
+					else if (current == JobStatus.SUSPENDED) {
+						// we've already cleaned up when entering the SUSPENDED state
+						break;
+					}
+					else if (current.isGloballyTerminalState()) {
+						LOG.warn("Job has entered globally terminal state without waiting for all " +
+							"job vertices to reach final state.");
+						break;
+					}
 					else {
 						fail(new Exception("ExecutionGraph went into final state from state " + current));
+						break;
 					}
 				}
 				// done transitioning the state

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
index 3406f4e..ac06379 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
@@ -41,7 +41,6 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
 	private final int maxNumberRestartAttempts;
 	private final long delayBetweenRestartAttempts;
 	private int currentRestartAttempt;
-	private boolean disabled = false;
 
 	public FixedDelayRestartStrategy(
 		int maxNumberRestartAttempts,
@@ -61,7 +60,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
 
 	@Override
 	public boolean canRestart() {
-		return !disabled && currentRestartAttempt < maxNumberRestartAttempts;
+		return currentRestartAttempt < maxNumberRestartAttempts;
 	}
 
 	@Override
@@ -84,11 +83,6 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
 		}, executionGraph.getExecutionContext());
 	}
 
-	@Override
-	public void disable() {
-		disabled = true;
-	}
-
 	/**
 	 * Creates a FixedDelayRestartStrategy from the given Configuration.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
index 6cc5ee4..958d9ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
@@ -36,9 +36,6 @@ public class NoRestartStrategy implements RestartStrategy {
 		throw new RuntimeException("NoRestartStrategy does not support restart.");
 	}
 
-	@Override
-	public void disable() {}
-
 	/**
 	 * Creates a NoRestartStrategy instance.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
index c9e6277..2880c01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
@@ -38,9 +38,4 @@ public interface RestartStrategy {
 	 * @param executionGraph The ExecutionGraph to be restarted
 	 */
 	void restart(ExecutionGraph executionGraph);
-
-	/**
-	 * Disables the restart strategy.
-	 */
-	void disable();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
index eb7d017..52a2abe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
@@ -24,38 +24,52 @@ package org.apache.flink.runtime.jobgraph;
 public enum JobStatus {
 
 	/** Job is newly created, no task has started to run. */
-	CREATED(false),
+	CREATED(TerminalState.NON_TERMINAL),
 
 	/** Some tasks are scheduled or running, some may be pending, some may be finished. */
-	RUNNING(false),
+	RUNNING(TerminalState.NON_TERMINAL),
 
 	/** The job has failed and is currently waiting for the cleanup to complete */
-	FAILING(false),
+	FAILING(TerminalState.NON_TERMINAL),
 	
 	/** The job has failed with a non-recoverable task failure */
-	FAILED(true),
+	FAILED(TerminalState.GLOBALLY),
 
 	/** Job is being cancelled */
-	CANCELLING(false),
+	CANCELLING(TerminalState.NON_TERMINAL),
 	
 	/** Job has been cancelled */
-	CANCELED(true),
+	CANCELED(TerminalState.GLOBALLY),
 
 	/** All of the job's tasks have successfully finished. */
-	FINISHED(true),
+	FINISHED(TerminalState.GLOBALLY),
 	
 	/** The job is currently undergoing a reset and total restart */
-	RESTARTING(false);
+	RESTARTING(TerminalState.NON_TERMINAL),
+
+	/**
+	 * The job has been suspended, which means that it has been stopped but has not been removed
+	 * from a potential HA job store.
+	 */
+	SUSPENDED(TerminalState.LOCALLY);
 	
 	// --------------------------------------------------------------------------------------------
+
+	enum TerminalState {
+		NON_TERMINAL,
+		LOCALLY,
+		GLOBALLY
+	}
 	
-	private final boolean terminalState;
+	private final TerminalState terminalState;
 	
-	JobStatus(boolean terminalState) {
+	JobStatus(TerminalState terminalState) {
 		this.terminalState = terminalState;
 	}
 	
-	public boolean isTerminalState() {
-		return terminalState;
+	public boolean isGloballyTerminalState() {
+		return terminalState == TerminalState.GLOBALLY;
 	}
 }
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index 6d89de0..37a91b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -168,7 +168,7 @@ public final class WebMonitorUtils {
 		JobStatus status = job.getState();
 
 		long started = job.getStatusTimestamp(JobStatus.CREATED);
-		long finished = status.isTerminalState() ? job.getStatusTimestamp(status) : -1L;
+		long finished = status.isGloballyTerminalState() ? job.getStatusTimestamp(status) : -1L;
 
 		int[] countsPerStatus = new int[ExecutionState.values().length];
 		long lastChanged = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 8ab887a..46f7ed2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -210,8 +210,7 @@ class JobManager(
     log.info(s"Stopping JobManager $getAddress.")
 
     val newFuturesToComplete = cancelAndClearEverything(
-      new Exception("The JobManager is shutting down."),
-      removeJobFromStateBackend = true)
+      new Exception("The JobManager is shutting down."))
 
     implicit val executionContext = context.dispatcher
 
@@ -307,8 +306,7 @@ class JobManager(
       log.info(s"JobManager ${self.path.toSerializationFormat} was revoked leadership.")
 
       val newFuturesToComplete = cancelAndClearEverything(
-        new Exception("JobManager is no longer the leader."),
-        removeJobFromStateBackend = false)
+        new Exception("JobManager is no longer the leader."))
 
       futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) ++ newFuturesToComplete)
 
@@ -746,7 +744,7 @@ class JobManager(
             s"Status of job $jobID (${executionGraph.getJobName}) changed to $newJobStatus.",
             error)
 
-          if (newJobStatus.isTerminalState()) {
+          if (newJobStatus.isGloballyTerminalState()) {
             jobInfo.end = timeStamp
 
             future{
@@ -951,7 +949,7 @@ class JobManager(
     case RemoveCachedJob(jobID) =>
       currentJobs.get(jobID) match {
         case Some((graph, info)) =>
-          if (graph.getState.isTerminalState) {
+          if (graph.getState.isGloballyTerminalState) {
             removeJob(graph.getJobID, removeJobFromStateBackend = true) match {
               case Some(futureToComplete) =>
                 futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) :+ futureToComplete)
@@ -1632,23 +1630,11 @@ class JobManager(
     *
     * @param cause Cause for the cancelling.
     */
-  private def cancelAndClearEverything(
-      cause: Throwable,
-      removeJobFromStateBackend: Boolean)
+  private def cancelAndClearEverything(cause: Throwable)
     : Seq[Future[Unit]] = {
     val futures = for ((jobID, (eg, jobInfo)) <- currentJobs) yield {
       future {
-        if (removeJobFromStateBackend) {
-          try {
-            submittedJobGraphs.removeJobGraph(jobID)
-          }
-          catch {
-            case t: Throwable =>
-              log.error("Error during submitted job graph clean up.", t)
-          }
-        }
-
-        eg.fail(cause)
+        eg.suspend(cause)
 
         if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) {
           jobInfo.client ! decorateMessage(
@@ -1667,7 +1653,6 @@ class JobManager(
   }
 
   override def revokeLeadership(): Unit = {
-    leaderSessionID = None
     self ! decorateMessage(RevokeLeadership)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 687a46a..26ba04f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import akka.dispatch.Futures;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.api.common.JobID;
@@ -40,10 +41,14 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
+import scala.concurrent.impl.Promise;
 
 import java.util.Iterator;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
@@ -667,6 +672,122 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		assertEquals(JobStatus.RESTARTING, eg.getState());
 	}
 
+	/**
+	 * Tests that a suspend call while a job is restarting will abort the restart.
+	 *
+	 * @throws Exception
+	 */
+	@Test
+	public void testSuspendWhileRestarting() throws Exception {
+		FiniteDuration timeout = new FiniteDuration(1, TimeUnit.MINUTES);
+		Deadline deadline = timeout.fromNow();
+
+		Instance instance = ExecutionGraphTestUtils.getInstance(
+			new SimpleActorGateway(TestingUtils.directExecutionContext()),
+			NUM_TASKS);
+
+		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+		scheduler.newInstanceAvailable(instance);
+
+		JobVertex sender = new JobVertex("Task");
+		sender.setInvokableClass(Tasks.NoOpInvokable.class);
+		sender.setParallelism(NUM_TASKS);
+
+		JobGraph jobGraph = new JobGraph("Pointwise job", sender);
+
+		ControllableRestartStrategy controllableRestartStrategy = new ControllableRestartStrategy(timeout);
+
+		ExecutionGraph eg = new ExecutionGraph(
+			TestingUtils.defaultExecutionContext(),
+			new JobID(),
+			"Test job",
+			new Configuration(),
+			ExecutionConfigTest.getSerializedConfig(),
+			AkkaUtils.getDefaultTimeout(),
+			controllableRestartStrategy);
+
+		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+		assertEquals(JobStatus.CREATED, eg.getState());
+
+		eg.scheduleForExecution(scheduler);
+
+		assertEquals(JobStatus.RUNNING, eg.getState());
+
+		instance.markDead();
+
+		Await.ready(controllableRestartStrategy.getReachedCanRestart(), deadline.timeLeft());
+
+		assertEquals(JobStatus.RESTARTING, eg.getState());
+
+		eg.suspend(new Exception("Test exception"));
+
+		assertEquals(JobStatus.SUSPENDED, eg.getState());
+
+		controllableRestartStrategy.unlockRestart();
+
+		Await.ready(controllableRestartStrategy.getRestartDone(), deadline.timeLeft());
+
+		assertEquals(JobStatus.SUSPENDED, eg.getState());
+	}
+
+	private static class ControllableRestartStrategy implements RestartStrategy {
+
+		private Promise<Boolean> reachedCanRestart = new Promise.DefaultPromise<>();
+		private Promise<Boolean> doRestart = new Promise.DefaultPromise<>();
+		private Promise<Boolean> restartDone = new Promise.DefaultPromise<>();
+
+		private volatile Exception exception = null;
+
+		private FiniteDuration timeout;
+
+		public ControllableRestartStrategy(FiniteDuration timeout) {
+			this.timeout = timeout;
+		}
+
+		public void unlockRestart() {
+			doRestart.success(true);
+		}
+
+		public Exception getException() {
+			return exception;
+		}
+
+		public Future<Boolean> getReachedCanRestart() {
+			return reachedCanRestart.future();
+		}
+
+		public Future<Boolean> getRestartDone() {
+			return restartDone.future();
+		}
+
+		@Override
+		public boolean canRestart() {
+			reachedCanRestart.success(true);
+			return true;
+		}
+
+		@Override
+		public void restart(final ExecutionGraph executionGraph) {
+			Futures.future(new Callable<Object>() {
+				@Override
+				public Object call() throws Exception {
+					try {
+
+						Await.ready(doRestart.future(), timeout);
+						executionGraph.restart();
+					} catch (Exception e) {
+						exception = e;
+					}
+
+					restartDone.success(true);
+
+					return null;
+				}
+			}, TestingUtils.defaultExecutionContext());
+		}
+	}
+
 	private static void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException {
 
 		eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));

http://git-wip-us.apache.org/repos/asf/flink/blob/6420c1c2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
index 8b04fa3..bcee5a1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
@@ -196,7 +196,73 @@ public class ExecutionGraphSignalsTest {
 		for (int i = 0; i < mockEJV.length; ++i) {
 			verify(mockEJV[i], times(times)).cancel();
 		}
+	}
+
+	/**
+	 * Tests that suspend cancels the ExecutionJobVertices and transitions to SUSPENDED state.
+	 * Also tests that one cannot leave the SUSPENDED state to enter a terminal state.
+	 */
+	@Test
+	public void testSuspend() throws Exception {
+		Assert.assertEquals(JobStatus.CREATED, eg.getState());
+		Exception testException = new Exception("Test exception");
+
+		eg.suspend(testException);
+
+		verifyCancel(1);
+		Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
+
+		f.set(eg, JobStatus.RUNNING);
+
+		eg.suspend(testException);
+
+		verifyCancel(2);
+		Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
+
+		f.set(eg, JobStatus.FAILING);
+
+		eg.suspend(testException);
+
+		verifyCancel(3);
+		Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
+
+		f.set(eg, JobStatus.CANCELLING);
+
+		eg.suspend(testException);
+
+		verifyCancel(4);
+		Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
+
+		f.set(eg, JobStatus.FAILED);
+
+		eg.suspend(testException);
+
+		verifyCancel(4);
+		Assert.assertEquals(JobStatus.FAILED, eg.getState());
+
+		f.set(eg, JobStatus.FINISHED);
+
+		eg.suspend(testException);
+
+		verifyCancel(4);
+		Assert.assertEquals(JobStatus.FINISHED, eg.getState());
+
+		f.set(eg, JobStatus.CANCELED);
+
+		eg.suspend(testException);
+
+		verifyCancel(4);
+		Assert.assertEquals(JobStatus.CANCELED, eg.getState());
+
+		f.set(eg, JobStatus.SUSPENDED);
+
+		eg.fail(testException);
+
+		Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
+
+		eg.cancel();
 
+		Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
 	}
 
 	// test that all source tasks receive STOP signal


Mime
View raw message