flink-commits mailing list archives

From u..@apache.org
Subject flink git commit: [docs] Adjust network buffer config for slots and add tl;dr
Date Mon, 09 May 2016 09:00:46 GMT
Repository: flink
Updated Branches:
  refs/heads/master afd0f16b8 -> a522c7aae


[docs] Adjust network buffer config for slots and add tl;dr


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a522c7aa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a522c7aa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a522c7aa

Branch: refs/heads/master
Commit: a522c7aae025daa9447cf587a4c8f02e1fa1a1f8
Parents: afd0f16
Author: Ufuk Celebi <uce@apache.org>
Authored: Mon May 9 10:59:31 2016 +0200
Committer: Ufuk Celebi <uce@apache.org>
Committed: Mon May 9 11:00:35 2016 +0200

----------------------------------------------------------------------
 docs/setup/config.md | 44 +++++++++++++++++++++++++++-----------------
 1 file changed, 27 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a522c7aa/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 6e6e0a6..db189a0 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -54,11 +54,11 @@ The configuration files for the TaskManagers can be different, Flink does
not as
 - `parallelism.default`: The default parallelism to use for programs that have no parallelism
specified. (DEFAULT: 1). For setups that have no concurrent jobs running, setting this value
to NumTaskManagers * NumSlotsPerTaskManager will cause the system to use all available execution
resources for the program's execution. **Note**: The default parallelism can be overwriten
for an entire job by calling `setParallelism(int parallelism)` on the `ExecutionEnvironment`
or by passing `-p <parallelism>` to the Flink Command-line frontend. It can be overwritten
for single transformations by calling `setParallelism(int
 parallelism)` on an operator. See the [programming guide]({{site.baseurl}}/apis/programming_guide.html#parallel-execution)
for more information about the parallelism.
 
-- `fs.default-scheme`: The default filesystem scheme to be used, with the necessary authority
to contact, e.g. the host:port of the NameNode in the case of HDFS (if needed). 
-By default, this is set to `file:///` which points to the local filesystem. This means that
the local 
-filesystem is going to be used to search for user-specified files **without** an explicit
scheme 
-definition. As another example, if this is set to `hdfs://localhost:9000/`, then a user-specified
file path 
-without explicit scheme definition, such as `/user/USERNAME/in.txt`, is going to be transformed
into 
+- `fs.default-scheme`: The default filesystem scheme to be used, with the necessary authority
to contact, e.g. the host:port of the NameNode in the case of HDFS (if needed).
+By default, this is set to `file:///` which points to the local filesystem. This means that
the local
+filesystem is going to be used to search for user-specified files **without** an explicit
scheme
+definition. As another example, if this is set to `hdfs://localhost:9000/`, then a user-specified
file path
+without explicit scheme definition, such as `/user/USERNAME/in.txt`, is going to be transformed
into
 `hdfs://localhost:9000/user/USERNAME/in.txt`. This scheme is used **ONLY** if no other scheme
is specified (explicitly) in the user-provided `URI`.
 
 - `fs.hdfs.hadoopconf`: The absolute path to the Hadoop File System's (HDFS) configuration
**directory** (OPTIONAL VALUE). Specifying this value allows programs to reference HDFS files
using short URIs (`hdfs:///path/to/files`, without including the address and port of the NameNode
in the file URI). Without this option, HDFS files can be accessed, but require fully qualified
URIs like `hdfs://address:port/path/to/files`. This option also causes file writers to pick
up the HDFS's default values for block sizes and replication factors. Flink will look for
the "core-site.xml" and "hdfs-site.xml" files in the specified directory.
@@ -133,11 +133,11 @@ To use the fixed delay strategy you have to specify "fixed-delay".
 To turn the restart behaviour off you have to specify "none".
 Default value "none".
 
-- `restart-strategy.fixed-delay.attempts`: Number of restart attempts, used if the default
restart strategy is set to "fixed-delay". 
+- `restart-strategy.fixed-delay.attempts`: Number of restart attempts, used if the default
restart strategy is set to "fixed-delay".
 Default value is 1.
- 
-- `restart-strategy.fixed-delay.delay`: Delay between restart attempts, used if the default
restart strategy is set to "fixed-delay". 
-Default value is the `akka.ask.timeout`. 
+
+- `restart-strategy.fixed-delay.delay`: Delay between restart attempts, used if the default
restart strategy is set to "fixed-delay".
+Default value is the `akka.ask.timeout`.
 
 ## Full Reference
 
@@ -204,9 +204,9 @@ The following parameters configure Flink's JobManager and TaskManagers.
 
 The parameters define the behavior of tasks that create result files.
 
-- `fs.default-scheme`: The default filesystem scheme to be used, with the necessary authority
to contact, e.g. the host:port of the NameNode in the case of HDFS (if needed). 
-By default, this is set to `file:///` which points to the local filesystem. This means that
the local 
-filesystem is going to be used to search for user-specified files **without** an explicit
scheme 
+- `fs.default-scheme`: The default filesystem scheme to be used, with the necessary authority
to contact, e.g. the host:port of the NameNode in the case of HDFS (if needed).
+By default, this is set to `file:///` which points to the local filesystem. This means that
the local
+filesystem is going to be used to search for user-specified files **without** an explicit
scheme
 definition. This scheme is used **ONLY** if no other scheme is specified (explicitly) in
the user-provided `URI`.
 
 - `fs.overwrite-files`: Specifies whether file output writers should overwrite existing files
by default. Set to *true* to overwrite by default, *false* otherwise. (DEFAULT: false)
@@ -240,10 +240,10 @@ definition. This scheme is used **ONLY** if no other scheme is specified
(explic
 - `yarn.properties-file.location` (Default: temp directory). When a Flink job is submitted
to YARN, the JobManager's host and the number of available processing slots is written into
a properties file, so that the Flink clientis able to pick those details up. This configuration
parameter allows changing the default location of that file (for example for environments
sharing a Flink installation between users)
 
 - `yarn.application-master.env.`*ENV_VAR1=value* Configuration values prefixed with `yarn.application-master.env.`
will be passed as environment variables to the ApplicationMaster/JobManager process. For example
for passing `LD_LIBRARY_PATH` as an env variable to the ApplicationMaster, set:
-	
+
 	yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"
 
-- `yarn.containers.vcores` The number of virtual cores (vcores) per YARN container. By default,
the number of `vcores` is set to the number of slots per TaskManager, if set, or to 1, otherwise.

+- `yarn.containers.vcores` The number of virtual cores (vcores) per YARN container. By default,
the number of `vcores` is set to the number of slots per TaskManager, if set, or to 1, otherwise.
 
 - `yarn.taskmanager.env.` Similar to the configuration prefix about, this prefix allows setting
custom environment variables for the TaskManager processes.
 
@@ -274,7 +274,7 @@ For example when running Flink on YARN on an environment with a restrictive
fire
 
 - `recovery.zookeeper.client.max-retry-attempts`: (Default '3') Defines the number of connection
retries before the client gives up.
 
-- `recovery.job.delay`: (Default 'akka.ask.timeout') Defines the delay before persisted jobs
are recovered in case of a recovery situation. 
+- `recovery.job.delay`: (Default 'akka.ask.timeout') Defines the delay before persisted jobs
are recovered in case of a recovery situation.
 
 ## Environment
 
@@ -284,12 +284,22 @@ For example when running Flink on YARN on an environment with a restrictive
fire
 
 ### Configuring the Network Buffers
 
+If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`,
please use the following formula to adjust the number of network buffers:
+
+```
+#slots-per-TM^2 * #TMs * 4
+```
+
+Where `#slots per TM` are the [number of slots per TaskManager](#configuring-taskmanager-processing-slots)
and `#TMs` are the total number of task managers.
+
 Network buffers are a critical resource for the communication layers. They are used to buffer
records before transmission over a network, and to buffer incoming data before dissecting
it into records and handing them to the
 application. A sufficient number of network buffers is critical to achieve a good throughput.
 
-In general, configure the task manager to have enough buffers that each logical network connection
on you expect to be open at the same time has a dedicated buffer. A logical network connection
exists for each point-to-point exchange of data over the network, which typically happens
at repartitioning- or broadcasting steps. In those, each parallel task inside the TaskManager
has to be able to talk to all other parallel tasks. Hence, the required number of buffers
on a task manager is *total-degree-of-parallelism* (number of targets) \* *intra-node-parallelism*
(number of sources in one task manager) \* *n*. Here, *n* is a constant that defines how many
repartitioning-/broadcasting steps you expect to be active at the same time.
+In general, configure the task manager to have enough buffers that each logical network connection
you expect to be open at the same time has a dedicated buffer. A logical network connection
exists for each point-to-point exchange of data over the network, which typically happens
at repartitioning- or broadcasting steps (shuffle phase). In those, each parallel task inside
the TaskManager has to be able to talk to all other parallel tasks. Hence, the required number
of buffers on a task manager is *total-degree-of-parallelism* (number of targets) \* *intra-node-parallelism*
(number of sources in one task manager) \* *n*. Here, *n* is a constant that defines how many
repartitioning-/broadcasting steps you expect to be active at the same time.
+
+Since the *intra-node-parallelism* is typically the number of cores, and more than 4 repartitioning
or broadcasting channels are rarely active in parallel, it frequently boils down to `#slots-per-TM^2
* #TMs * 4`.
 
-Since the *intra-node-parallelism* is typically the number of cores, and more than 4 repartitioning
or broadcasting channels are rarely active in parallel, it frequently boils down to *\#cores\^2\^*
\* *\#machines* \* 4. To support for example a cluster of 20 8-core machines, you should use
roughly 5000 network buffers for optimal throughput.
+To support for example a cluster of 20 8-slot machines, you should use roughly 5000 network
buffers for optimal throughput.
 
 Each network buffer has by default a size of 32 KiBytes. In the above example, the system
would allocate roughly 300 MiBytes for network buffers.
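
The buffer formula introduced by this change can be checked with a small calculation. The following is a minimal sketch, not part of the commit: the function names and the `concurrent_shuffles` parameter are hypothetical, and the defaults (4 concurrent repartitioning/broadcasting steps, 32 KiB per buffer) are taken from the documentation text above.

```python
KIB = 1024
MIB = 1024 * KIB


def required_network_buffers(slots_per_tm, num_tms, concurrent_shuffles=4):
    """Estimate buffers per the docs formula: #slots-per-TM^2 * #TMs * n.

    total parallelism (targets)   = slots_per_tm * num_tms
    intra-node parallelism (srcs) = slots_per_tm
    n                             = concurrent_shuffles (hypothetical name)
    """
    if slots_per_tm < 1 or num_tms < 1 or concurrent_shuffles < 1:
        raise ValueError("all arguments must be positive integers")
    total_parallelism = slots_per_tm * num_tms
    return total_parallelism * slots_per_tm * concurrent_shuffles


def buffer_memory_bytes(num_buffers, buffer_size_bytes=32 * KIB):
    """Memory taken by the buffers, assuming the 32 KiB default size."""
    return num_buffers * buffer_size_bytes


if __name__ == "__main__":
    # The example from the docs: 20 TaskManagers with 8 slots each.
    buffers = required_network_buffers(slots_per_tm=8, num_tms=20)
    memory = buffer_memory_bytes(buffers)
    print(f"buffers: {buffers}")
    print(f"memory:  {memory / MIB:.0f} MiB")
```

For 20 TaskManagers with 8 slots each this gives 5120 buffers, matching the "roughly 5000" quoted above. Multiplying 5120 buffers by 32 KiB gives 160 MiB, which is lower than the "roughly 300 MiBytes" stated in the text; the sketch does not attempt to explain that difference. The resulting count would presumably be set through `taskmanager.network.numberOfBuffers` in `flink-conf.yaml`, a key this diff does not show.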
 

