flink-commits mailing list archives

From u..@apache.org
Subject [05/11] flink git commit: [FLINK-2976] [docs] Add docs about savepoints
Date Mon, 11 Jan 2016 15:31:26 GMT
[FLINK-2976] [docs] Add docs about savepoints


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3c01858
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3c01858
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3c01858

Branch: refs/heads/master
Commit: a3c01858ddc596e4b6825ae51801cd2ac9060f76
Parents: ebbc85d
Author: Ufuk Celebi <uce@apache.org>
Authored: Wed Dec 2 16:21:06 2015 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Mon Jan 11 16:30:42 2016 +0100

----------------------------------------------------------------------
 docs/_includes/navbar.html               |   1 +
 docs/apis/cli.md                         |  99 +++++++++++++++++------
 docs/apis/fig/savepoints-overview.png    | Bin 0 -> 62824 bytes
 docs/apis/fig/savepoints-program_ids.png | Bin 0 -> 55492 bytes
 docs/apis/savepoints.md                  | 108 ++++++++++++++++++++++++++
 5 files changed, 186 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a3c01858/docs/_includes/navbar.html
----------------------------------------------------------------------
diff --git a/docs/_includes/navbar.html b/docs/_includes/navbar.html
index c565feb..91ba62a 100644
--- a/docs/_includes/navbar.html
+++ b/docs/_includes/navbar.html
@@ -83,6 +83,7 @@ under the License.
                 <li class="divider"></li>
                 <li><a href="{{ apis }}/fault_tolerance.html">Fault Tolerance</a></li>
                 <li><a href="{{ apis }}/state_backends.html">State in Streaming
Programs</a></li>
+                <li><a href="{{ apis }}/savepoints.html">Savepoints</a></li>
                 <li><a href="{{ apis }}/scala_shell.html">Interactive Scala Shell</a></li>
                 <li><a href="{{ apis }}/dataset_transformations.html">DataSet
Transformations</a></li>
                 <li><a href="{{ apis }}/best_practices.html">Best Practices</a></li>

http://git-wip-us.apache.org/repos/asf/flink/blob/a3c01858/docs/apis/cli.md
----------------------------------------------------------------------
diff --git a/docs/apis/cli.md b/docs/apis/cli.md
index dfa1baf..78ea4b6 100644
--- a/docs/apis/cli.md
+++ b/docs/apis/cli.md
@@ -105,6 +105,34 @@ The command line can be used to
 
         ./bin/flink cancel <jobID>
 
+### Savepoints
+
+[Savepoints]({{site.baseurl}}/apis/savepoints.html) are controlled via the command line client:
+
+#### Trigger a savepoint
+
+{% highlight bash %}
+./bin/flink savepoint <jobID>
+{% endhighlight %}
+
+Returns the path of the created savepoint. You need this path to restore and dispose savepoints.
+
+#### Restore a savepoint
+
+{% highlight bash %}
+./bin/flink run -s <savepointPath> ...
+{% endhighlight %}
+
+The run command has a savepoint flag to submit a job that restores its state from the given savepoint. The savepoint path is returned by the savepoint trigger command.
+
+#### Dispose a savepoint
+
+{% highlight bash %}
+./bin/flink savepoint -d <savepointPath>
+{% endhighlight %}
+
+Disposes the savepoint at the given path. The savepoint path is returned by the savepoint trigger command.
+
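+A typical cycle with these three commands might look as follows. This is an illustrative sketch: the job ID and JAR name are placeholders, and the savepoint path reuses the example from the options reference below:
+
+{% highlight bash %}
+# Trigger a savepoint for the running job; the command returns the savepoint path
+./bin/flink savepoint <jobID>
+
+# Resubmit the (possibly updated) job, restoring its state from the savepoint
+./bin/flink run -s file:///flink/savepoint-1537 myJob.jar
+
+# Dispose the savepoint once it is no longer needed
+./bin/flink savepoint -d file:///flink/savepoint-1537
+{% endhighlight %}
+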
 ## Usage
 
 The command line syntax is as follows:
@@ -118,39 +146,53 @@ Action "run" compiles and runs a program.
 
   Syntax: run [OPTIONS] <jar-file> <arguments>
   "run" action options:
-     -c,--class <classname>           Class with the program entry point ("main"
-                                      method or "getPlan()" method. Only needed
-                                      if the JAR file does not specify the class
-                                      in its manifest.
-     -m,--jobmanager <host:port>      Address of the JobManager (master) to
-                                      which to connect. Specify 'yarn-cluster'
-                                      as the JobManager to deploy a YARN cluster
-                                      for the job. Use this flag to connect to a
-                                      different JobManager than the one
-                                      specified in the configuration.
-     -p,--parallelism <parallelism>   The parallelism with which to run the
-                                      program. Optional flag to override the
-                                      default value specified in the
-                                      configuration.
-     -q --sysoutLogging               Specifying this flag will disable log messages
-                                      being reported on the console. All messages
-                                      however will still be logged by SLF4J loggers,
-                                      regardless of this setting.
-     -d --detached                    Specifying this option will run the job in
-                                      detached mode.
-
+     -c,--class <classname>               Class with the program entry point
+                                          ("main" method or "getPlan()" method).
+                                          Only needed if the JAR file does not
+                                          specify the class in its manifest.
+     -C,--classpath <url>                 Adds a URL to each user code
+                                          classloader on all nodes in the
+                                          cluster. The paths must specify a
+                                          protocol (e.g. file://) and be
+                                          accessible on all nodes (e.g. by means
+                                          of an NFS share). You can use this
+                                          option multiple times for specifying
+                                          more than one URL. The protocol must
+                                          be supported by java.net.URLClassLoader.
+     -d,--detached                        If present, runs the job in detached
+                                          mode
+     -m,--jobmanager <host:port>          Address of the JobManager (master) to
+                                          which to connect. Specify
+                                          'yarn-cluster' as the JobManager to
+                                          deploy a YARN cluster for the job. Use
+                                          this flag to connect to a different
+                                          JobManager than the one specified in
+                                          the configuration.
+     -p,--parallelism <parallelism>       The parallelism with which to run the
+                                          program. Optional flag to override the
+                                          default value specified in the
+                                          configuration.
+     -q,--sysoutLogging                   If present, suppresses logging output
+                                          to standard out.
+     -s,--fromSavepoint <savepointPath>   Path to a savepoint to reset the job
+                                          back to (for example
+                                          file:///flink/savepoint-1537).
   Additional arguments if -m yarn-cluster is set:
      -yD <arg>                            Dynamic properties
-     -yd,--yarndetached                   Start detached [consider using -d flag above]
+     -yd,--yarndetached                   Start detached
      -yj,--yarnjar <arg>                  Path to Flink jar file
      -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container [in
                                           MB]
      -yn,--yarncontainer <arg>            Number of YARN container to allocate
                                           (=Number of Task Managers)
+     -ynm,--yarnname <arg>                Set a custom name for the application
+                                          on YARN
      -yq,--yarnquery                      Display available YARN resources
                                           (memory, cores)
      -yqu,--yarnqueue <arg>               Specify YARN queue.
      -ys,--yarnslots <arg>                Number of slots per TaskManager
+     -yst,--yarnstreaming                 Start Flink in streaming mode
      -yt,--yarnship <arg>                 Ship files in the specified directory
                                           (t for transfer)
      -ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container [in
@@ -201,4 +243,17 @@ Action "cancel" cancels a running program.
                                    job. Use this flag to connect to a different
                                    JobManager than the one specified in the
                                    configuration.
+
+
+Action "savepoint" triggers savepoints for a running job or disposes existing ones.
+
+  Syntax: savepoint [OPTIONS] <Job ID>
+  "savepoint" action options:
+     -d,--dispose <savepointPath>   Disposes an existing savepoint.
+     -m,--jobmanager <host:port>    Address of the JobManager (master) to which
+                                    to connect. Specify 'yarn-cluster' as the
+                                    JobManager to deploy a YARN cluster for the
+                                    job. Use this flag to connect to a different
+                                    JobManager than the one specified in the
+                                    configuration.
 ~~~

http://git-wip-us.apache.org/repos/asf/flink/blob/a3c01858/docs/apis/fig/savepoints-overview.png
----------------------------------------------------------------------
diff --git a/docs/apis/fig/savepoints-overview.png b/docs/apis/fig/savepoints-overview.png
new file mode 100644
index 0000000..c0e7563
Binary files /dev/null and b/docs/apis/fig/savepoints-overview.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/a3c01858/docs/apis/fig/savepoints-program_ids.png
----------------------------------------------------------------------
diff --git a/docs/apis/fig/savepoints-program_ids.png b/docs/apis/fig/savepoints-program_ids.png
new file mode 100644
index 0000000..cc161ef
Binary files /dev/null and b/docs/apis/fig/savepoints-program_ids.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/a3c01858/docs/apis/savepoints.md
----------------------------------------------------------------------
diff --git a/docs/apis/savepoints.md b/docs/apis/savepoints.md
new file mode 100644
index 0000000..2f4dd82
--- /dev/null
+++ b/docs/apis/savepoints.md
@@ -0,0 +1,108 @@
+---
+title: "Savepoints"
+is_beta: false
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Programs written in the [DataStream API]({{ site.baseurl }}/apis/streaming_guide.html) can resume execution from a **savepoint**. Savepoints allow you to update both your programs and your Flink cluster without losing any state. This page covers all steps to trigger, restore, and dispose savepoints. For more details on how Flink handles state and failures, check out the [State in Streaming Programs]({{ site.baseurl }}/apis/state_backends.html) and [Fault Tolerance]({{ site.baseurl }}/apis/fault_tolerance.html) pages.
+
+* toc
+{:toc}
+
+## Overview
+
+Savepoints are **manually triggered checkpoints**, which take a snapshot of the program and write it out to a state backend. They rely on the regular checkpointing mechanism for this: during execution, programs are periodically snapshotted on the worker nodes, producing checkpoints. For recovery, only the latest completed checkpoint is needed, and older checkpoints can be safely discarded as soon as a new one is completed.
+
+Savepoints are similar to these periodic checkpoints except that they are **triggered by the user** and **don't automatically expire** when newer checkpoints are completed.
+
+<img src="fig/savepoints-overview.png" class="center" />
+
+In the above example the workers produce checkpoints **c<sub>1</sub>**, **c<sub>2</sub>**, **c<sub>3</sub>**, and **c<sub>4</sub>** for job *0xA312Bc*. Periodic checkpoints **c<sub>1</sub>** and **c<sub>3</sub>** have already been *discarded* and **c<sub>4</sub>** is the *latest checkpoint*. **c<sub>2</sub> is special**: it is the state associated with the savepoint **s<sub>1</sub>**, was triggered by the user, and doesn't expire automatically (as c<sub>1</sub> and c<sub>3</sub> did after the completion of newer checkpoints).
+
+Note that **s<sub>1</sub>** is only a **pointer to the actual checkpoint data c<sub>2</sub>**. This means that the actual state is *not copied* for the savepoint; instead, the periodic checkpoint data it points to is kept around.
+
+## Configuration
+
+Savepoints point to regular checkpoints and store their state in a configured [state backend]({{ site.baseurl }}/apis/state_backends.html). Currently, the supported state backends are **jobmanager** and **filesystem**. The state backend configuration for the regular periodic checkpoints is **independent** of the savepoint state backend configuration. Checkpoint data is **not copied** for savepoints; a savepoint merely points into the configured checkpoint state backend.
+
+### JobManager
+
+This is the **default backend** for savepoints.
+
+Savepoints are stored on the heap of the job manager. They are *lost* after the job manager is shut down. This mode is only useful if you want to *stop* and *resume* your program while the **same cluster** keeps running. It is *not recommended* for production use. Savepoints are *not* part of the [job manager's highly available]({{ site.baseurl }}/setup/jobmanager_high_availability.html) state.
+
+<pre>
+state.backend.savepoints: jobmanager
+</pre>
+
+**Note**: If you don't configure a specific state backend for the savepoints, the default state backend (config key `state.backend`) will be used.
+
+### File system
+
+Savepoints are stored in the configured **file system directory**. They are available between cluster instances and allow you to move your program to another cluster.
+
+<pre>
+state.backend.savepoints: filesystem
+state.backend.savepoints.fs.dir: hdfs:///flink/savepoints
+</pre>
+
+**Note**: If you don't configure a specific directory, the checkpoint directory (config key `state.backend.fs.checkpointdir`) will be used.
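+
+For example, a full configuration that keeps periodic checkpoints and savepoints in separate directories might look as follows; this is a sketch, and the HDFS paths are illustrative:
+
+<pre>
+state.backend: filesystem
+state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
+state.backend.savepoints: filesystem
+state.backend.savepoints.fs.dir: hdfs:///flink/savepoints
+</pre>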
+
+**Important**: A savepoint is a pointer to a completed checkpoint. That means that the state of a savepoint is not only found in the savepoint file itself, but also needs the actual checkpoint data (e.g. in a set of further files).
+
+## Changes to your program
+
+Savepoints **work out of the box**, but it is **highly recommended** that you slightly adjust your programs so that savepoints keep working with future versions of your program.
+
+<img src="fig/savepoints-program_ids.png" class="center" />'
+
+For savepoints **only stateful tasks matter**. In the above example, the source and map tasks are stateful whereas the sink is not. Therefore, only the state of the source and map tasks is part of the savepoint.
+
+Each task is identified by its **generated task ID** and **subtask index**. In the above example the state of the source (**s<sub>1</sub>**, **s<sub>2</sub>**) and map tasks (**m<sub>1</sub>**, **m<sub>2</sub>**) is identified by their respective task ID (*0xC322EC* for the source tasks and *0x27B3EF* for the map tasks) and subtask index. There is no state for the sinks (**t<sub>1</sub>**, **t<sub>2</sub>**). Their IDs therefore do not matter.
+
+The IDs are generated **deterministically** from your program structure. This means that if your program does not change, the IDs do not change. In this case, it is straightforward to restore the state from a savepoint by mapping it back to the same task IDs and subtask indexes. This allows you to work with savepoints out of the box, but it gets problematic as soon as you make changes to your program, because they result in changed IDs, and the savepoint state can no longer be mapped to your program.
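+
+As a concrete illustration of the problem, consider adding a new operator to a program whose savepoint was taken without manually assigned IDs. This is a hypothetical sketch; `StatefulSource`, `StatefulMapper`, and `MyFilter` are placeholder classes:
+
+{% highlight java %}
+// Original program: task IDs are generated from this exact structure
+DataStream<String> stream = env
+  .addSource(new StatefulSource())
+  .shuffle()
+  .map(new StatefulMapper());
+
+// Changed program: the inserted (stateless) filter changes the program
+// structure and hence the generated IDs, so the savepoint state of the
+// original program can no longer be mapped back to source and mapper.
+DataStream<String> changed = env
+  .addSource(new StatefulSource())
+  .filter(new MyFilter())
+  .shuffle()
+  .map(new StatefulMapper());
+{% endhighlight %}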
+
+In order to be able to change your program and **have fixed IDs**, the *DataStream* API provides a method to manually specify the task IDs. Each operator provides a **`uid(String)`** method to override the generated ID. The ID is a String, which will be deterministically hashed to a 16-byte hash value. It is **important** that the specified IDs are **unique per transformation and job**. If this is not the case, job submission will fail.
+
+{% highlight java %}
+DataStream<String> stream = env
+  // Stateful source (e.g. Kafka) with ID
+  .addSource(new StatefulSource())
+  .uid("source-id")
+  .shuffle()
+  // The stateful mapper with ID
+  .map(new StatefulMapper())
+  .uid("mapper-id");
+
+// Stateless sink (no specific ID required)
+stream.print();
+{% endhighlight %}
+
+## Command-line client
+
+You control savepoints via the [command line client]({{site.baseurl}}/apis/cli.html#savepoints).
+
+## Current limitations
+
+- **Parallelism**: When restoring a savepoint, the parallelism of the program has to match the parallelism of the original program from which the savepoint was drawn (see the example after this list). There is no mechanism to re-partition the savepoint's state yet.
+
+- **Chaining**: Chained operators are identified by the ID of the first task. It's not possible to manually assign an ID to an intermediate chained task, e.g. in the chain `[ a -> b -> c ]` only **a** can have its ID assigned manually, but not **b** or **c**.
+
+- **Disposing custom state**: Disposing an old savepoint does not work with custom state, because the user code class loader is not available during disposal.
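+
+As an example of the parallelism limitation above: if the job that took the savepoint ran with parallelism 4, the restore has to use parallelism 4 as well. A hypothetical invocation using the run flags documented for the [command line client]({{site.baseurl}}/apis/cli.html):
+
+{% highlight bash %}
+# -p must match the parallelism of the job from which the savepoint was taken
+./bin/flink run -s file:///flink/savepoint-1537 -p 4 myJob.jar
+{% endhighlight %}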

