flink-user mailing list archives

From James Bucher <jbuc...@expedia.com>
Subject Re: Flink HA with Kubernetes, without Zookeeper
Date Tue, 22 Aug 2017 19:41:39 GMT
Just wanted to throw in a couple more details here from what I have learned from working with
Kubernetes.

All processes restart (a lost JobManager restarts eventually). Should be given in Kubernetes:

  *   This works very well; we run multiple jobs with a single JobManager, and Flink/Kubernetes
recovers quite reliably.

A way for TaskManagers to discover the restarted JobManager. Should work via Kubernetes as
well (restarted containers retain the external hostname):

  *   We use StatefulSets, which provide a DNS-based discovery mechanism; provided DNS is set
up correctly with appropriate TTLs, this works well. You could also leverage the built-in
Kubernetes Services if you are only running a single Job Manager: Kubernetes will simply route
the traffic to the single pod, and I have tested that this works fine. However, multiple Job
Managers won’t work this way, because Kubernetes would round-robin the traffic across them.
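For reference, the DNS-based discovery described above can be sketched with a headless Service plus a StatefulSet. This is only an illustrative fragment (names and image are made up; 6123 is Flink's default JobManager RPC port), not a production manifest:

```yaml
# Headless Service: gives each pod a stable DNS name of the form
# flink-jobmanager-0.flink-jobmanager.<namespace>.svc.cluster.local
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  clusterIP: None          # headless: DNS resolves to individual pod IPs
  selector:
    app: flink-jobmanager
  ports:
    - name: rpc
      port: 6123
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: flink-jobmanager
spec:
  serviceName: flink-jobmanager   # ties pod DNS names to the headless Service
  replicas: 1
  selector:
    matchLabels:
      app: flink-jobmanager
  template:
    metadata:
      labels:
        app: flink-jobmanager
    spec:
      containers:
        - name: jobmanager
          image: flink:latest
          args: ["jobmanager"]
```

Because the pod name (and hence its DNS name) is stable across restarts, TaskManagers can keep pointing at the same JobManager address.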

A way to isolate different "leader sessions" against each other. Flink currently uses ZooKeeper
to attach a "leader session ID" to leader election; this acts as a fencing token that prevents
processes from talking to each other when they have different views on who the leader is, or
on whether the leader lost and re-gained leadership:

  *   This is probably the most difficult part. You could leverage the built-in etcd cluster,
but connecting directly to the Kubernetes etcd database is probably a bad idea. Instead, you
should be able to build a counter on top of the PATCH API that Kubernetes exposes, which
follows JSON Patch (https://tools.ietf.org/html/rfc6902); its "test" operation
(https://tools.ietf.org/html/rfc6902#section-4.6) should allow atomic, conditional updates to
counters. Combining this with custom resources
(https://kubernetes.io/docs/concepts/api-extension/custom-resources/#custom-resources) should
give you a good way to work with etcd without connecting to it directly. This integration
would require modifying the Job Manager leader election code.
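To make the fencing idea concrete, here is a toy local simulation of RFC 6902 "test"-guarded updates. The `resource` dict stands in for a Kubernetes custom resource; in reality the API server applies the patch atomically on the server side:

```python
import copy

def apply_json_patch(doc, patch):
    """Apply a (subset of an) RFC 6902 patch: if any 'test' op fails,
    the function raises and the original document is left unchanged."""
    result = copy.deepcopy(doc)
    for op in patch:
        keys = op["path"].lstrip("/").split("/")
        target = result
        for key in keys[:-1]:
            target = target[key]
        if op["op"] == "test":
            if target[keys[-1]] != op["value"]:
                raise ValueError("test failed: patch rejected")
        elif op["op"] == "replace":
            target[keys[-1]] = op["value"]
        else:
            raise NotImplementedError(op["op"])
    return result

# A new leader fences its takeover on the session ID it believes is current;
# a stale leader's patch fails the 'test' and is rejected as a whole.
resource = {"metadata": {"annotations": {"leaderSessionId": "session-1"}}}
patch = [
    {"op": "test", "path": "/metadata/annotations/leaderSessionId",
     "value": "session-1"},
    {"op": "replace", "path": "/metadata/annotations/leaderSessionId",
     "value": "session-2"},
]
updated = apply_json_patch(resource, patch)
```

The key property is that the test and the replace succeed or fail together, which is exactly the compare-and-swap behavior a fencing token needs.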

A distributed atomic counter for the checkpoint ID. This is crucial to ensure correctness
of checkpoints in the presence of JobManager failures and re-elections or split-brain situations.

  *   This is very similar to the above: we should be able to accomplish it through the
PATCH API combined with a conditional ("test") update.
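A sketch of what that checkpoint-ID counter could look like. `CounterStore` is a stand-in for the custom resource; its `patch` method models the server-side test-and-replace, and the caller retries on conflict:

```python
class ConflictError(Exception):
    """Raised when the stored value no longer matches the expected one."""

class CounterStore:
    """Toy stand-in for a Kubernetes custom resource holding a counter."""
    def __init__(self):
        self.value = 0

    def patch(self, expected, new):
        # On a real API server this compare-and-swap is atomic.
        if self.value != expected:
            raise ConflictError
        self.value = new

def next_checkpoint_id(store, max_retries=10):
    """Atomically allocate the next checkpoint ID, retrying if another
    JobManager won the race to increment the counter."""
    for _ in range(max_retries):
        current = store.value                  # read
        try:
            store.patch(current, current + 1)  # test-and-replace
            return current + 1
        except ConflictError:
            continue                           # lost the race; re-read and retry
    raise RuntimeError("could not allocate a checkpoint ID")

store = CounterStore()
ids = [next_checkpoint_id(store) for _ in range(3)]   # 1, 2, 3
```

A stale JobManager that read an old counter value would fail the compare, retry, and observe that the world has moved on, which is what protects checkpoint numbering across re-elections.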

If you don’t want to rip into the Job Manager code, the etcd operator<https://github.com/coreos/etcd-operator>
would be a good way to bring up an etcd cluster that is separate from the core Kubernetes
etcd database. Combined with zetcd, you could probably have that up and running quickly.

Thanks,
James Bucher

From: Hao Sun <hasun@zendesk.com<mailto:hasun@zendesk.com>>
Date: Monday, August 21, 2017 at 9:45 AM
To: Stephan Ewen <sewen@apache.org<mailto:sewen@apache.org>>, Shannon Carey <scarey@expedia.com<mailto:scarey@expedia.com>>
Cc: "user@flink.apache.org<mailto:user@flink.apache.org>" <user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Re: Flink HA with Kubernetes, without Zookeeper

Thanks Shannon for the https://github.com/coreos/zetcd tips, I will check that out and share
my results if we proceed on that path.
Thanks Stephan for the details, this is very useful, I was about to ask what exactly is stored
into zookeeper, haha.

On Mon, Aug 21, 2017 at 9:31 AM Stephan Ewen <sewen@apache.org<mailto:sewen@apache.org>>
wrote:
Hi!

That is a very interesting proposition. In cases where you have a single master only, you
may get away with quite good guarantees without ZK. In fact, Flink does not store significant
data in ZK at all; it only uses locks and counters.

You can have a setup without ZK, provided you have the following:

  - All processes restart (a lost JobManager restarts eventually). Should be given in Kubernetes.

  - A way for TaskManagers to discover the restarted JobManager. Should work via Kubernetes
as well (restarted containers retain the external hostname)

  - A way to isolate different "leader sessions" against each other. Flink currently uses
ZooKeeper to attach a "leader session ID" to leader election; this acts as a fencing token
that prevents processes from talking to each other when they have different views on who the
leader is, or on whether the leader lost and re-gained leadership.

  - An atomic marker for what is the latest completed checkpoint.

  - A distributed atomic counter for the checkpoint ID. This is crucial to ensure correctness
of checkpoints in the presence of JobManager failures and re-elections or split-brain situations.
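The leader-session fencing in the list above boils down to receivers dropping messages tagged with a session ID they no longer trust. A toy illustration (this `TaskManager` class is a stand-in, not Flink's actual class):

```python
import uuid

class TaskManager:
    """Toy model: accept commands only from the leader session we currently trust."""
    def __init__(self, leader_session_id):
        self.leader_session_id = leader_session_id
        self.log = []

    def on_message(self, session_id, command):
        if session_id != self.leader_session_id:
            return False          # stale or split-brain leader: fence it off
        self.log.append(command)
        return True

old_session, new_session = uuid.uuid4(), uuid.uuid4()
tm = TaskManager(leader_session_id=new_session)
tm.on_message(new_session, "deploy-task")   # accepted
tm.on_message(old_session, "cancel-task")   # silently dropped
```

A JobManager that lost and re-gained leadership gets a fresh session ID, so even its own old in-flight messages are fenced off.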

I would assume that etcd can provide all of those services. The best way to integrate it would
probably be to add an implementation of Flink's "HighAvailabilityServices" based on etcd.

Have a look at this class: https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
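As a rough illustration of the election primitive such an etcd-backed implementation would rest on: the first contender to create the leader key wins, and the stored session ID doubles as the fencing token. (`KVStore` here is a toy in-memory stand-in, not etcd's real client API, which expresses this as a transaction on the key's create revision.)

```python
import uuid

class KVStore:
    """Toy stand-in for an etcd-like key-value store."""
    def __init__(self):
        self.data = {}

    def put_if_absent(self, key, value):
        # etcd would express this as: txn(if create_revision(key) == 0: put).
        if key in self.data:
            return False
        self.data[key] = value
        return True

def campaign(store, contender_id):
    """Try to become leader; return our new session ID on success, else None."""
    session_id = str(uuid.uuid4())
    if store.put_if_absent("/flink/leader", (contender_id, session_id)):
        return session_id     # we are the leader
    return None               # someone else already holds the key

store = KVStore()
first = campaign(store, "jobmanager-0")    # wins
second = campaign(store, "jobmanager-1")   # loses
```

A real implementation would additionally attach a lease/TTL to the key so the lock is released when the leader dies, which is where most of the testing effort mentioned below would go.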

If you want to contribute an extension of Flink using etcd, that would be awesome.
This should have a FLIP though, and a plan on how to set up rigorous unit testing of that
implementation (because its correctness is very crucial to Flink's HA resilience).

Best,
Stephan


On Mon, Aug 21, 2017 at 4:15 PM, Shannon Carey <scarey@expedia.com<mailto:scarey@expedia.com>>
wrote:
Zookeeper should still be necessary even in that case, because it is where the JobManager
stores information which needs to be recovered after the JobManager fails.

We're eyeing https://github.com/coreos/zetcd as a way to run Zookeeper on top of Kubernetes'
etcd cluster so that we don't have to rely on a separate Zookeeper cluster. However, we haven't
tried it yet.

-Shannon

From: Hao Sun <hasun@zendesk.com<mailto:hasun@zendesk.com>>
Date: Sunday, August 20, 2017 at 9:04 PM
To: "user@flink.apache.org<mailto:user@flink.apache.org>" <user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Flink HA with Kubernetes, without Zookeeper

Hi, I am new to Flink and trying to bring up a Flink cluster on top of Kubernetes.

For HA setup, with kubernetes, I think I just need one job manager and do not need Zookeeper?
I will store all states to S3 buckets. So in case of failure, kubernetes can just bring up
a new job manager without losing anything?

I want to confirm my assumptions above make sense. Thanks
