flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Hao Sun <ha...@zendesk.com>
Subject Re: Flink HA with Kubernetes, without Zookeeper
Date Tue, 22 Aug 2017 19:50:52 GMT
Great suggestions, the etcd operator is very interesting, thanks James.

On Tue, Aug 22, 2017, 12:42 James Bucher <jbucher@expedia.com> wrote:

> Just wanted to throw in a couple more details here from what I have
> learned from working with Kubernetes.
>
> *All processes restart (a lost JobManager restarts eventually). Should be
> given in Kubernetes*:
>
>    - This works very well, we run multiple jobs with a single Jobmanager
>    and Flink/Kubernetes recovers quite well.
>
> *A way for TaskManagers to discover the restarted JobManager. Should work
> via Kubernetes as well (restarted containers retain the external hostname)*
> :
>
>    - We use StatefulSets which provide a DNS based discovery mechanism.
>    Provided DNS is set up correctly with TTLs this works well. You could also
>    leverage the built-in Kubernetes services if you are only running a single
>    Job Manager. Kubernetes will just route the traffic to the single pod. This
>    works fine with a single Job Manager (I have tested it). However multiple
>    Job Managers won’t work because Kubernetes will route this round-robin to
>    the Job Managers
>
> *A way to isolate different "leader sessions" against each other. Flink
> currently uses ZooKeeper to also attach a "leader session ID" to leader
> election, which is a fencing token to avoid that processes talk to each
> other despite having different views on who is the leader, or whether the
> leaser lost and re-gained leadership:*
>
>    - This is probably the most difficult thing. You could leverage the
>    built in ETCD cluster. Connecting directly to the Kubernetes ETCD database
>    directly is probably a bad idea however. You should be able to create a
>    counter using the PATCH API that Kubernetes supplies in the API which
>    follows: https://tools.ietf.org/html/rfc6902
>    <https://tools.ietf.org/html/rfc6902> you could probably
>    leverage https://tools.ietf.org/html/rfc6902#section-4.6
>    <https://tools.ietf.org/html/rfc6902#section-4.6> to allow for atomic
>    updates to counters. Combining this with:
>    https://kubernetes.io/docs/concepts/api-extension/custom-resources/#custom-resources
>    <https://kubernetes.io/docs/concepts/api-extension/custom-resources/#custom-resources>
should give a good
>    way to work with ETCD without actually connecting directly to the
>    Kubernetes ETCD directly. This integration would require modifying the Job
>    Manager leader election code.
>
> *A distributed atomic counter for the checkpoint ID. This is crucial to
> ensure correctness of checkpoints in the presence of JobManager failures
> and re-elections or split-brain situations*.
>
>    - This is very similar to the above, we should be able to accomplish
>    that through the PATCH API combined with update if condition.
>
> If you don’t want to actually rip way into the code for the Job Manager
> the ETCD Operator <https://github.com/coreos/etcd-operator> would
> be a good way to bring up an ETCD cluster that is separate from the core
> Kubernetes ETCD database. Combined with zetcd you could probably have that
> up and running quickly.
>
> Thanks,
> James Bucher
>
> From: Hao Sun <hasun@zendesk.com>
> Date: Monday, August 21, 2017 at 9:45 AM
> To: Stephan Ewen <sewen@apache.org>, Shannon Carey <scarey@expedia.com>
> Cc: "user@flink.apache.org" <user@flink.apache.org>
> Subject: Re: Flink HA with Kubernetes, without Zookeeper
>
> Thanks Shannon for the https://github.com/coreos/zetcd
> <https://github.com/coreos/zetcd> tips, I will check
> that out and share my results if we proceed on that path.
> Thanks Stephan for the details, this is very useful, I was about to ask
> what exactly is stored into zookeeper, haha.
>
> On Mon, Aug 21, 2017 at 9:31 AM Stephan Ewen <sewen@apache.org> wrote:
>
>> Hi!
>>
>> That is a very interesting proposition. In cases where you have a single
>> master only, you may bet away with quite good guarantees without ZK. In
>> fact, Flink does not store significant data in ZK at all, it only uses
>> locks and counters.
>>
>> You can have a setup without ZK, provided you have the following:
>>
>>   - All processes restart (a lost JobManager restarts eventually). Should
>> be given in Kubernetes.
>>
>>   - A way for TaskManagers to discover the restarted JobManager. Should
>> work via Kubernetes as well (restarted containers retain the external
>> hostname)
>>
>>   - A way to isolate different "leader sessions" against each other.
>> Flink currently uses ZooKeeper to also attach a "leader session ID" to
>> leader election, which is a fencing token to avoid that processes talk to
>> each other despite having different views on who is the leader, or whether
>> the leaser lost and re-gained leadership.
>>
>>   - An atomic marker for what is the latest completed checkpoint.
>>
>>   - A distributed atomic counter for the checkpoint ID. This is crucial
>> to ensure correctness of checkpoints in the presence of JobManager failures
>> and re-elections or split-brain situations.
>>
>> I would assume that etcd can provide all of those services. The best way
>> to integrate it would probably be to add an implementation of Flink's
>> "HighAvailabilityServices" based on etcd.
>>
>> Have a look at this class:
>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
>> <https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java>
>>
>> If you want to contribute an extension of Flink using etcd, that would be
>> awesome.
>> This should have a FLIP though, and a plan on how to set up rigorous unit
>> testing of that implementation (because its correctness is very crucial to
>> Flink's HA resilience).
>>
>> Best,
>> Stephan
>>
>>
>> On Mon, Aug 21, 2017 at 4:15 PM, Shannon Carey <scarey@expedia.com>
>> wrote:
>>
>>> Zookeeper should still be necessary even in that case, because it is
>>> where the JobManager stores information which needs to be recovered after
>>> the JobManager fails.
>>>
>>> We're eyeing https://github.com/coreos/zetcd
>>> <https://github.com/coreos/zetcd> as a way to run
>>> Zookeeper on top of Kubernetes' etcd cluster so that we don't have to rely
>>> on a separate Zookeeper cluster. However, we haven't tried it yet.
>>>
>>> -Shannon
>>>
>>> From: Hao Sun <hasun@zendesk.com>
>>> Date: Sunday, August 20, 2017 at 9:04 PM
>>> To: "user@flink.apache.org" <user@flink.apache.org>
>>> Subject: Flink HA with Kubernetes, without Zookeeper
>>>
>>> Hi, I am new to Flink and trying to bring up a Flink cluster on top of
>>> Kubernetes.
>>>
>>> For HA setup, with kubernetes, I think I just need one job manager and
>>> do not need Zookeeper? I will store all states to S3 buckets. So in case of
>>> failure, kubernetes can just bring up a new job manager without losing
>>> anything?
>>>
>>> I want to confirm my assumptions above make sense. Thanks
>>>
>>
>>

Mime
View raw message