flink-issues mailing list archives

From "Greg Hogan (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3163) Configure Flink for NUMA systems
Date Thu, 14 Jul 2016 19:33:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15378192#comment-15378192 ]

Greg Hogan commented on FLINK-3163:

I think we can achieve "good enough" without changing the format of {{masters}} and {{slaves}}.
Mesos and YARN provide cluster management, and it might be best to keep the Flink configuration simple.

What if we added:
* a configuration parameter to enable NUMA, which would result in a TaskManager started on each NUMA node for each IP in {{slaves}}
* a configuration parameter (one or two?) for the JobManager and ResourceManager to run in their own NUMA node, not shared with a TaskManager (would the JM and RM share a NUMA node if on the same IP?)

These could be {{taskmanager.compute.numa}}, {{jobmanager.compute.numa}}, and {{resourcemanager.compute.numa}}.
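As a sketch, the proposed keys might look like this in {{flink-conf.yaml}} (these names are only proposals from this comment and do not exist in Flink yet):

```yaml
# Hypothetical flink-conf.yaml fragment -- proposed keys, not existing options.
taskmanager.compute.numa: true       # one TaskManager per NUMA node per slave IP
jobmanager.compute.numa: true        # reserve a NUMA node for the JobManager
resourcemanager.compute.numa: true   # reserve a NUMA node for the ResourceManager
```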

We could also add, as a related idea, {{taskmanager.compute.fraction}}. This would operate relative to {{taskmanager.numberOfTaskSlots}} as {{taskmanager.memory.fraction}} operates relative to {{taskmanager.memory.size}}. If set to {{1.0}} you would get one slot per (hyper-threaded) core.
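To make the fraction concrete, here is a hedged sketch of how a startup script could derive the slot count from the visible hyper-threads ({{FRACTION}} stands in for the proposed {{taskmanager.compute.fraction}}, which is not an existing Flink option):

```shell
#!/usr/bin/env bash
# Sketch only: derive a slot count from a proposed compute fraction.
FRACTION=${FRACTION:-1.0}
CORES=$(nproc)  # hyper-threads visible to this process
# awk handles the fractional multiply; the result is truncated to an integer
SLOTS=$(awk -v f="$FRACTION" -v c="$CORES" 'BEGIN { printf "%d", f * c }')
echo "slots=${SLOTS}"
```

With {{1.0}} on a 36-hyperthread c4.8xlarge this yields 36 slots; {{0.5}} would yield 18.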

As [~saliya] noted, binding processes is quite easy. Since I have only dealt with single-socket systems, I have temporarily hard-coded the following in my build:

diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index e579c0c..5f076d5 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -96,4 +96,10 @@ if [[ $STARTSTOP == "start" ]]; then
     args=("--configDir" "${FLINK_CONF_DIR}")
-    "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP taskmanager "${args[@]}"
+    command -v numactl >/dev/null 2>&1
+    if [[ $? -ne 0 ]]; then
+        "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP taskmanager "${args[@]}"
+    else
+        numactl --membind=0 --cpunodebind=0 -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP taskmanager "${args[@]}"
+        numactl --membind=1 --cpunodebind=1 -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP taskmanager "${args[@]}"
+    fi
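Rather than hard-coding nodes 0 and 1, the node count could be discovered at startup. A hedged sketch, assuming {{numactl --hardware}} reports a first line such as {{available: 2 nodes (0-1)}}, and with {{start-taskmanager}} as a placeholder for the real {{flink-daemon.sh}} invocation:

```shell
#!/usr/bin/env bash
# Sketch: start one TaskManager per NUMA node discovered via numactl.
# "start-taskmanager" is a placeholder for the flink-daemon.sh invocation.
if command -v numactl >/dev/null 2>&1; then
    # "available: 2 nodes (0-1)" -> 2
    nodes=$(numactl --hardware | awk '/^available:/ { print $2 }')
    for (( node = 0; node < nodes; node++ )); do
        echo numactl --membind="$node" --cpunodebind="$node" -- start-taskmanager
    done
else
    echo start-taskmanager
fi
```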

> Configure Flink for NUMA systems
> --------------------------------
>                 Key: FLINK-3163
>                 URL: https://issues.apache.org/jira/browse/FLINK-3163
>             Project: Flink
>          Issue Type: Improvement
>          Components: Startup Shell Scripts
>    Affects Versions: 1.0.0
>            Reporter: Greg Hogan
>            Assignee: Greg Hogan
> On NUMA systems Flink can be pinned to a single physical processor ("node") using {{numactl --membind=$node --cpunodebind=$node <command>}}. Commonly available NUMA systems include the largest AWS and Google Compute instances.
> For example, on an AWS c4.8xlarge system with 36 hyperthreads the user could configure a single TaskManager with 36 slots or have Flink create two TaskManagers bound to each of the NUMA nodes, each with 18 slots.
> There may be some extra overhead in transferring network buffers between TaskManagers on the same system, though the fraction of data shuffled in this manner decreases with the size of the cluster. The performance improvement from only accessing local memory looks to be significant though difficult to benchmark.
> The JobManagers may fit into NUMA nodes rather than requiring full systems.

This message was sent by Atlassian JIRA
