spark-dev mailing list archives

From Anirudh Ramanathan <fox...@google.com.INVALID>
Subject SPIP: Spark on Kubernetes
Date Tue, 15 Aug 2017 15:32:58 GMT
The Spark on Kubernetes effort has been developed separately in a fork, and is
linked back from the Apache Spark project as an experimental backend
<http://spark.apache.org/docs/latest/cluster-overview.html#cluster-manager-types>.
We're ~6 months in and have had 5 releases
<https://github.com/apache-spark-on-k8s/spark/releases>.

   - 2 Spark versions maintained (2.1 and 2.2)
   - Extensive integration testing and refactoring efforts to maintain code
   quality
   - Developer
   <https://github.com/apache-spark-on-k8s/spark#getting-started> and
   user-facing <https://apache-spark-on-k8s.github.io/userdocs/>
   documentation
   - 10+ consistent code contributors from different organizations
   <https://apache-spark-on-k8s.github.io/userdocs/contribute.html#project-contributions>
   involved in actively maintaining and using the project, with several more
   members involved in testing and providing feedback.
   - The community has delivered several talks on Spark-on-Kubernetes
   generating lots of feedback from users.
   - In addition to these, we've seen efforts spawn off such as:
      - HDFS on Kubernetes
      <https://github.com/apache-spark-on-k8s/kubernetes-HDFS> with
      Locality and Performance Experiments
      - Kerberized access
      <https://docs.google.com/document/d/1RBnXD9jMDjGonOdKJ2bA1lN4AAV_1RwpU_ewFuCNWKg/edit>
      to HDFS from Spark running on Kubernetes

*Following the SPIP process, I'm putting this SPIP up for a vote.*

   - +1: Yeah, let's go forward and implement the SPIP.
   - +0: Don't really care.
   - -1: I don't think this is a good idea because of the following
   technical reasons.

If there is any further clarification desired, on the design or the
implementation, please feel free to ask questions or provide feedback.


SPIP: Kubernetes as A Native Cluster Manager

Full Design Doc: link
<https://issues.apache.org/jira/secure/attachment/12881586/SPARK-18278%20Spark%20on%20Kubernetes%20Design%20Proposal%20Revision%202%20%281%29.pdf>

JIRA: https://issues.apache.org/jira/browse/SPARK-18278

Kubernetes Issue: https://github.com/kubernetes/kubernetes/issues/34377

Authors: Yinan Li, Anirudh Ramanathan, Erik Erlandson, Andrew Ash, Matt
Cheah, Ilan Filonenko, Sean Suchter, Kimoon Kim

Background and Motivation

Containerization and cluster management technologies are constantly
evolving in the cluster computing world. Apache Spark currently implements
support for Apache Hadoop YARN and Apache Mesos, in addition to providing
its own standalone cluster manager. In 2014, Google announced development
of Kubernetes <https://kubernetes.io/>, which has its own unique feature set
and differentiates itself from YARN and Mesos. Since its debut, it has
received over 50000 commits from over 1300 contributors.
Kubernetes has cemented itself as a core player in the cluster computing
world, and cloud-computing providers such as Google Container Engine,
Google Compute Engine, Amazon Web Services, and Microsoft Azure support
running Kubernetes clusters.

This document outlines a proposal for integrating Apache Spark with
Kubernetes in a first class way, adding Kubernetes to the list of cluster
managers that Spark can be used with. Doing so would allow users to share
their computing resources and containerization framework between their
existing applications on Kubernetes and their computational Spark
applications. Although there is existing support for running a Spark
standalone cluster on Kubernetes
<https://github.com/kubernetes/examples/blob/master/staging/spark/README.md>,
there are still major advantages and significant interest in having native
execution support. For example, this integration provides better support
for multi-tenancy and dynamic resource allocation. It also allows users to
run applications built on different Spark versions of their choice in the
same cluster.

The feature is being developed in a separate fork
<https://github.com/apache-spark-on-k8s/spark> in order to minimize risk to
the main project during development. Since development started in
November 2016, it has received over 100 commits from over 20
contributors and supports two releases based on Spark 2.1 and 2.2
respectively. Documentation is also being actively worked on, both in the
main project repository and in the repository
https://github.com/apache-spark-on-k8s/userdocs. Regarding real-world use
cases, we have seen cluster setups that use 1000+ cores. We are also seeing
growing interest in this project from more and more organizations.

While it is easy to bootstrap the project in a forked repository, it is
hard to maintain it in the long run because of the tricky process of
rebasing onto the upstream and the lack of awareness in the larger Spark
community. Merging this feature upstream would benefit both the Spark and
Kubernetes communities. On one hand, it gives
Spark users the option of running their Spark workloads along with other
workloads that may already be running on Kubernetes, enabling better
resource sharing and isolation, and better cluster administration. On the
other hand, it gives Kubernetes a leap forward in the area of large-scale
data processing by being an officially supported cluster manager for Spark.
The risk of merging into upstream is low because most of the changes are
purely incremental, i.e., new Kubernetes-aware implementations of existing
interfaces/classes in Spark core are introduced. The development is also
concentrated in a single place at resource-managers/kubernetes
<https://github.com/apache-spark-on-k8s/spark/tree/branch-2.2-kubernetes/resource-managers/kubernetes>.
The risk is further reduced by a comprehensive integration test framework,
and an active and responsive community of future maintainers.

Target Personas

DevOps, data scientists, data engineers, application developers, and anyone
else who can benefit from having Kubernetes
<https://kubernetes.io/docs/concepts/overview/what-is-kubernetes/> as a
native cluster manager for Spark.

Goals

   - Make Kubernetes a first-class cluster manager for Spark, alongside Spark
   Standalone, YARN, and Mesos.
   - Support both client and cluster deployment modes.
   - Support dynamic resource allocation
   <http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation>.
   - Support Spark Java/Scala, PySpark, and SparkR applications.
   - Support secure HDFS access.
   - Allow running applications of different Spark versions in the same
   cluster through the ability to specify the driver and executor Docker
   images on a per-application basis.
   - Support specification and enforcement of limits on both CPU cores and
   memory.

Non-Goals

   - Support cluster resource scheduling and sharing beyond capabilities
   offered natively by the Kubernetes per-namespace resource quota model.

Proposed API Changes

Most API changes are purely incremental, i.e., new Kubernetes-aware
implementations of existing interfaces/classes in Spark core are
introduced. Detailed changes are as follows.

   - A new cluster manager option KUBERNETES is introduced and some changes
   are made to SparkSubmit to make it aware of this option.
   - A new implementation of CoarseGrainedSchedulerBackend, namely
   KubernetesClusterSchedulerBackend, is responsible for managing the
   creation and deletion of executor Pods through the Kubernetes API.
   - A new implementation of TaskSchedulerImpl, namely
   KubernetesTaskSchedulerImpl, and a new implementation of TaskSetManager,
   namely KubernetesTaskSetManager, are introduced for Kubernetes-aware
   task scheduling.
   - When dynamic resource allocation is enabled, a new implementation of
   ExternalShuffleService, namely KubernetesExternalShuffleService, is
   introduced.

Design Sketch

Below we briefly describe the design. For more details on the design and
architecture, please refer to the architecture documentation
<https://github.com/apache-spark-on-k8s/spark/tree/branch-2.2-kubernetes/resource-managers/kubernetes/architecture-docs>.
The main idea of this design is to run Spark driver and executors inside
Kubernetes Pods <https://kubernetes.io/docs/concepts/workloads/pods/pod/>.
A Pod is a co-located and co-scheduled group of one or more containers run
in a shared context. The driver is responsible for creating and destroying
executor Pods through the Kubernetes API, while Kubernetes is fully
responsible for scheduling the Pods to run on available nodes in the
cluster. In the cluster mode, the driver also runs in a Pod in the cluster,
created through the Kubernetes API by a Kubernetes-aware submission client
called by the spark-submit script. Because the driver runs in a Pod, it is
reachable by the executors in the cluster using its Pod IP. In the client
mode, the driver runs outside the cluster and calls the Kubernetes API to
create and destroy executor Pods. The driver must be routable from within
the cluster for the executors to communicate with it.

The main component running in the driver is the
KubernetesClusterSchedulerBackend, an implementation of
CoarseGrainedSchedulerBackend, which manages allocating and destroying
executors via the Kubernetes API, as instructed by Spark core via calls to
methods doRequestTotalExecutors and doKillExecutors, respectively. Within
the KubernetesClusterSchedulerBackend, a separate kubernetes-pod-allocator
thread handles the creation of new executor Pods with appropriate
throttling and monitoring. Throttling is achieved using a feedback loop
that decides whether to submit new requests for executors based on
whether previous executor Pod creation requests have completed. This
indirection is necessary because the Kubernetes API server accepts requests
for new Pods optimistically, with the anticipation of being able to
eventually schedule them to run. However, it is undesirable to have a very
large number of Pods that cannot be scheduled and stay pending within the
cluster. The throttling mechanism gives us control over how fast an
application scales up (which can be configured), and helps prevent Spark
applications from DoS-ing the Kubernetes API server with too many Pod
creation requests. The executor Pods simply run the
CoarseGrainedExecutorBackend class from a pre-built Docker image that
contains a Spark distribution.
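
The feedback loop described above can be sketched roughly as follows. This
is a minimal Python simulation under stated assumptions, not the code in the
fork: FakeCluster, ThrottledPodAllocator, and the batch_size knob are
hypothetical names standing in for the Kubernetes API server and the
kubernetes-pod-allocator thread.

```python
class FakeCluster:
    """In-memory stand-in for the Kubernetes API (hypothetical, illustration only)."""

    def __init__(self):
        self.pods = {}  # pod name -> phase ("Pending" or "Running")

    def create_pod(self, name):
        # The API server accepts the request optimistically; the Pod starts Pending.
        self.pods[name] = "Pending"

    def delete_pod(self, name):
        self.pods.pop(name, None)

    def schedule_all(self):
        # Simulate the Kubernetes scheduler placing every pending Pod.
        for name in self.pods:
            self.pods[name] = "Running"


class ThrottledPodAllocator:
    """Requests executor Pods in batches, waiting for each batch to start."""

    def __init__(self, cluster, batch_size=2):
        self.cluster = cluster
        self.batch_size = batch_size  # assumed knob for how fast the app scales up
        self.target_total = 0
        self.next_id = 1

    def request_total_executors(self, total):
        # Similar in spirit to doRequestTotalExecutors: only record the target.
        self.target_total = total

    def kill_executors(self, names):
        # Similar in spirit to doKillExecutors: delete the named executor Pods.
        for name in names:
            self.cluster.delete_pod(name)

    def allocate_once(self):
        """One tick of the allocator loop; returns the names of Pods requested."""
        if any(phase == "Pending" for phase in self.cluster.pods.values()):
            return []  # earlier requests have not completed, so hold off
        missing = self.target_total - len(self.cluster.pods)
        created = []
        for _ in range(max(0, min(self.batch_size, missing))):
            name = f"executor-{self.next_id}"
            self.next_id += 1
            self.cluster.create_pod(name)
            created.append(name)
        return created
```

In this sketch a tick that finds any Pod still pending requests nothing, so
the number of unschedulable Pods an application can pile up is bounded by
the batch size.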

There are auxiliary and optional components: ResourceStagingServer and
KubernetesExternalShuffleService, which serve specific purposes described
below. The ResourceStagingServer serves as a file store (in the absence of
a persistent storage layer in Kubernetes) for application dependencies
uploaded from the submission client machine, which then get downloaded from
the server by the init-containers in the driver and executor Pods. It is a
Jetty server with JAX-RS and has two endpoints for uploading and
downloading files, respectively. Security tokens are returned in the
responses for file uploading and must be carried in the requests for
downloading the files. The ResourceStagingServer is deployed as a
Kubernetes Service
<https://kubernetes.io/docs/concepts/services-networking/service/> backed
by a Deployment
<https://kubernetes.io/docs/concepts/workloads/controllers/deployment/> in
the cluster and multiple instances may be deployed in the same cluster.
Spark applications specify which ResourceStagingServer instance to use
through a configuration property.
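
The upload/download token exchange can be illustrated with a small
in-memory model. This is only a sketch of the idea in Python, assuming one
token per uploaded resource; the real server is a Jetty/JAX-RS service, and
the class and method names below are hypothetical.

```python
import secrets


class InMemoryStagingServer:
    """Toy model of the staging flow: upload returns a token, download needs it."""

    def __init__(self):
        self._store = {}  # resource id -> (token, payload)

    def upload(self, payload: bytes):
        # Called by the submission client; the response carries a security token.
        resource_id = secrets.token_hex(8)
        token = secrets.token_urlsafe(32)
        self._store[resource_id] = (token, payload)
        return resource_id, token

    def download(self, resource_id: str, token: str) -> bytes:
        # Called by an init-container; the request must carry the token.
        entry = self._store.get(resource_id)
        if entry is None:
            raise KeyError(resource_id)
        expected, payload = entry
        # Constant-time comparison to avoid leaking token contents via timing.
        if not secrets.compare_digest(expected.encode(), token.encode()):
            raise PermissionError("invalid token for resource")
        return payload
```

A download with the token from the upload response returns the stored
bytes, while any other token raises PermissionError.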

The KubernetesExternalShuffleService is used to support dynamic resource
allocation, with which the number of executors of a Spark application can
change at runtime based on the resource needs. It provides an additional
endpoint for drivers that allows the shuffle service to detect driver
termination and clean up the shuffle files associated with the corresponding
application. There are two ways of deploying the
KubernetesExternalShuffleService: running a shuffle service Pod on each
node in the cluster or a subset of the nodes using a DaemonSet
<https://kubernetes.io/docs/concepts/workloads/controllers/daemonset/>, or
running a shuffle service container in each of the executor Pods. In the
first option, each shuffle service container mounts a hostPath
<https://kubernetes.io/docs/concepts/storage/volumes/#hostpath> volume. The
same hostPath volume is also mounted by each of the executor containers,
which must also have the environment variable SPARK_LOCAL_DIRS point to the
hostPath. In the second option, a shuffle service container is co-located
with an executor container in each of the executor Pods. The two containers
share an emptyDir
<https://kubernetes.io/docs/concepts/storage/volumes/#emptydir> volume
where the shuffle data gets written to. There may be multiple instances of
the shuffle service deployed in a cluster that may be used for different
versions of Spark, or for different priority levels with different resource
quotas.
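
To make the two deployment options concrete, here is a rough sketch of what
the executor Pod specs might look like, written as Python dictionaries that
mirror the Kubernetes Pod schema. The image names, the volume name, and the
shuffle directory are hypothetical placeholders, and setting
SPARK_LOCAL_DIRS in the sidecar variant is our assumption rather than
something stated above.

```python
SHUFFLE_DIR = "/tmp/spark-shuffle"  # hypothetical path, for illustration only
EXECUTOR_IMAGE = "example/spark-executor:latest"  # hypothetical image name
SHUFFLE_IMAGE = "example/spark-shuffle:latest"  # hypothetical image name


def _executor_container(shuffle_dir):
    # The executor writes shuffle data under SPARK_LOCAL_DIRS, which points
    # at the mounted shuffle volume.
    return {
        "name": "executor",
        "image": EXECUTOR_IMAGE,
        "env": [{"name": "SPARK_LOCAL_DIRS", "value": shuffle_dir}],
        "volumeMounts": [{"name": "shuffle", "mountPath": shuffle_dir}],
    }


def executor_pod_host_path(name, shuffle_dir=SHUFFLE_DIR):
    """Option 1: share a hostPath directory with a per-node shuffle service Pod."""
    return {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {"name": name},
        "spec": {
            "volumes": [{"name": "shuffle", "hostPath": {"path": shuffle_dir}}],
            "containers": [_executor_container(shuffle_dir)],
        },
    }


def executor_pod_sidecar(name, shuffle_dir=SHUFFLE_DIR):
    """Option 2: co-locate a shuffle service container, sharing an emptyDir."""
    sidecar = {
        "name": "shuffle-service",
        "image": SHUFFLE_IMAGE,
        "volumeMounts": [{"name": "shuffle", "mountPath": shuffle_dir}],
    }
    return {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {"name": name},
        "spec": {
            "volumes": [{"name": "shuffle", "emptyDir": {}}],
            "containers": [_executor_container(shuffle_dir), sidecar],
        },
    }
```

The only structural difference between the two is the volume source: with
hostPath the data lives on the node and is visible to the DaemonSet Pod,
while with emptyDir it is scoped to the executor Pod itself.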

New Kubernetes-specific configuration options are also introduced to
facilitate specification and customization of driver and executor Pods and
related Kubernetes resources. For example, driver and executor Pods can be
created in a particular Kubernetes namespace and on a particular set of the
nodes in the cluster. Users are allowed to apply labels and annotations to
the driver and executor Pods.
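
As an illustration of where such options end up, the sketch below applies a
namespace, node selector, labels, and annotations to a Pod spec expressed
as a Python dictionary. The function name and its parameters are
hypothetical and do not correspond to actual Spark configuration property
names; only the Pod fields (metadata.namespace, metadata.labels,
metadata.annotations, spec.nodeSelector) come from the Kubernetes Pod
schema.

```python
import copy


def customize_pod(pod, namespace=None, node_selector=None,
                  labels=None, annotations=None):
    """Return a copy of `pod` with user-specified customizations applied."""
    result = copy.deepcopy(pod)  # leave the caller's base spec untouched
    metadata = result.setdefault("metadata", {})
    spec = result.setdefault("spec", {})
    if namespace:
        metadata["namespace"] = namespace
    if labels:
        metadata.setdefault("labels", {}).update(labels)
    if annotations:
        metadata.setdefault("annotations", {}).update(annotations)
    if node_selector:
        # Restricts scheduling to nodes carrying all of these labels.
        spec.setdefault("nodeSelector", {}).update(node_selector)
    return result
```

For example, customize_pod(base, namespace="analytics",
node_selector={"disktype": "ssd"}) would place the Pod in the analytics
namespace and restrict it to nodes labeled disktype=ssd (both values are
made up for the example).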

Additionally, secure HDFS support is being actively worked on following the
design here
<https://docs.google.com/document/d/1RBnXD9jMDjGonOdKJ2bA1lN4AAV_1RwpU_ewFuCNWKg/edit>.
Both short-running jobs and long-running jobs that need periodic delegation
token refresh are supported, leveraging built-in Kubernetes constructs like
Secrets. Please refer to the design doc for details.

Rejected Designs

Resource Staging by the Driver

A first implementation effectively included the ResourceStagingServer in
the driver container itself. The driver container ran a custom command that
opened an HTTP endpoint and waited for the submission client to send
resources to it. The server would then run the driver code after it had
received the resources from the submission client machine. The problem with
this approach is that the submission client needs to deploy the driver in
such a way that the driver itself would be reachable from outside of the
cluster, but it is difficult for an automated framework which is not aware
of the cluster's configuration to expose an arbitrary pod in a generic way.
The Service-based design chosen allows a cluster administrator to expose
the ResourceStagingServer in a manner that makes sense for their cluster,
such as with an Ingress or with a NodePort service.

Kubernetes External Shuffle Service

Several alternatives were considered for the design of the shuffle service.
The first design postulated the use of long-lived executor pods and sidecar
containers in them running the shuffle service. The advantage of this model
was that it would let us use emptyDir for sharing instead of node-local
storage; emptyDir guarantees better lifecycle management of storage by
Kubernetes. The apparent disadvantage was that it would be a departure from
the traditional Spark methodology of keeping executors for only as long as
required in dynamic allocation mode. It would additionally use up more
resources than strictly necessary during the course of long-running jobs,
partially losing the advantage of dynamic scaling.

Another alternative considered was to use a separate shuffle service
manager as a nameserver. This design has a few drawbacks. First, this means
another component that needs authentication/authorization management and
maintenance. Second, this separate component needs to be kept in sync with
the Kubernetes cluster. Last but not least, most of the functionality of this
separate component can be performed by a combination of the in-cluster
shuffle service and the Kubernetes API server.

Pluggable Scheduler Backends

Fully pluggable scheduler backends were considered as a more generalized
solution, and remain interesting as a possible avenue for future-proofing
against new scheduling targets. For the purposes of this project, adding a
new specialized scheduler backend for Kubernetes was chosen as the approach
due to its very low impact on the core Spark code; making the scheduler fully
pluggable would be a high-impact, high-risk modification to Spark’s core
libraries. The pluggable scheduler backends effort is being tracked in
SPARK-19700 <https://issues.apache.org/jira/browse/SPARK-19700>.
