From: Till Rohrmann
To: user@flink.apache.org
Subject: Re: YARN High Availability
Date: Mon, 23 Nov 2015 14:51:16 +0100

The problem is the execution graph handle, which is stored in ZooKeeper. You can manually remove it via the ZooKeeper shell by simply deleting everything below your `recovery.zookeeper.path.root` ZNode. But make sure that the cluster has been stopped before you do this.

Do you start the different clusters with different `recovery.zookeeper.path.root` values? If not, you will run into trouble when running multiple clusters at the same time, because all of the clusters will think that they belong together.
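For example, something along these lines should do it with the ZooKeeper CLI (just a sketch: the ZNode path assumes the default `recovery.zookeeper.path.root` of `/flink`, and host1:3181 is taken from the quorum in your config, so adjust both to your setup and only run it once the cluster is really down):

    # connect to one of the ZooKeeper servers of your quorum
    bin/zkCli.sh -server host1:3181

    # inside the ZooKeeper shell: recursively delete everything below the root ZNode
    rmr /flink

And to keep several HA clusters apart in the first place, each cluster can get its own root path in its flink-conf.yaml, for instance (the path names below are just placeholders):

    # cluster A
    recovery.zookeeper.path.root: /flink/cluster-a

    # cluster B
    recovery.zookeeper.path.root: /flink/cluster-b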
Cheers,
Till

On Mon, Nov 23, 2015 at 2:15 PM, Gwenhael Pasquiers <gwenhael.pasquiers@ericsson.com> wrote:

> OK, I understand.
>
> Maybe we are not really using Flink as you intended. The way we are using it, one cluster equals one job. That way we are sure to isolate the different jobs as much as possible and, in case of crashes / bugs / etc., can completely kill one cluster without interfering with the other jobs.
>
> That future behavior seems good :-)
>
> Instead of the manual flink commands, is there a way to manually delete those old jobs before launching my job? They probably are somewhere in HDFS, aren't they?
>
> B.R.
>
> -----Original Message-----
> From: Ufuk Celebi [mailto:uce@apache.org]
> Sent: Monday, 23 November 2015 12:12
> To: user@flink.apache.org
> Subject: Re: YARN High Availability
>
> Hey Gwenhaël,
>
> the restarting jobs are most likely old job submissions. They are not cleaned up when you shut down the cluster, but only when they finish (either a regular finish or after cancelling).
>
> The workaround is to use the command line frontend:
>
> bin/flink cancel JOBID
>
> for each RESTARTING job. Sorry about the inconvenience!
>
> We are in an active discussion about addressing this. The future behaviour will be that the startup or shutdown of a cluster cleans up everything, with an option to skip this step.
>
> The reasoning for the initial solution (not removing anything) was to make sure that no jobs are deleted by accident. But it looks like this is more confusing than helpful.
>
> – Ufuk
>
> > On 23 Nov 2015, at 11:45, Gwenhael Pasquiers <gwenhael.pasquiers@ericsson.com> wrote:
> >
> > Hi again!
> >
> > On the same topic, I'm still trying to start my streaming job with HA.
> > The HA part seems to be more or less OK (I killed the JobManager and it came back), however I have an issue with the TaskManagers.
> > I configured my job to have only one TaskManager and 1 slot that does [source=>map=>sink].
> > The issue I'm encountering is that other instances of my job appear and are in the RESTARTING status since there is only one task slot.
> >
> > Do you know of this, or have an idea of where to look in order to understand what's happening?
> >
> > B.R.
> >
> > Gwenhaël PASQUIERS
> >
> > -----Original Message-----
> > From: Maximilian Michels [mailto:mxm@apache.org]
> > Sent: Thursday, 19 November 2015 13:36
> > To: user@flink.apache.org
> > Subject: Re: YARN High Availability
> >
> > The docs have been updated.
> >
> > On Thu, Nov 19, 2015 at 12:36 PM, Ufuk Celebi <uce@apache.org> wrote:
> >> I've added a note about this to the docs and asked Max to trigger a new build of them.
> >>
> >> Regarding Aljoscha's idea: I like it. It is essentially a shortcut for configuring the root path.
> >>
> >> In any case, it is orthogonal to Till's proposals. That one we need to address as well (see FLINK-2929). The motivation for the current behaviour was to be rather defensive when removing state in order to not lose data accidentally. But it can be confusing, indeed.
> >>
> >> – Ufuk
> >>
> >>> On 19 Nov 2015, at 12:08, Till Rohrmann <trohrmann@apache.org> wrote:
> >>>
> >>> You mean an additional start-up parameter for the `start-cluster.sh` script for the HA case? That could work.
> >>>
> >>> On Thu, Nov 19, 2015 at 11:54 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> >>> Maybe we could add a user parameter to specify a cluster name that is used to make the paths unique.
> >>>
> >>> On Thu, Nov 19, 2015, 11:24 Till Rohrmann <trohrmann@apache.org> wrote:
> >>> I agree that this would make the configuration easier. However, it also entails that the user has to retrieve the randomized path from the logs if he wants to restart jobs after the cluster has crashed or was intentionally restarted. Furthermore, the system won't be able to clean up old checkpoint and job handles in case the cluster stop was intentional.
> >>>
> >>> Thus, the question is how do we define the behaviour in order to retrieve handles and to clean up old handles so that ZooKeeper won't be cluttered with old handles?
> >>>
> >>> There are basically two modes:
> >>>
> >>> 1. Keep state handles when shutting down the cluster. Provide a means to define a fixed path when starting the cluster and also a means to purge old state handles. Furthermore, add a shutdown mode where the handles under the current path are directly removed. This mode would guarantee to always have the state handles available if not explicitly told otherwise. However, the downside is that ZooKeeper will most certainly be cluttered.
> >>>
> >>> 2. Remove the state handles when shutting down the cluster. Provide a shutdown mode where we keep the state handles. This will keep ZooKeeper clean but will also give you the possibility to keep a checkpoint around if necessary. However, the user is more likely to lose his state when shutting down the cluster.
> >>>
> >>> On Thu, Nov 19, 2015 at 10:55 AM, Robert Metzger <rmetzger@apache.org> wrote:
> >>> I agree with Aljoscha. Many companies install Flink (and its config) in a central directory and users share that installation.
> >>>
> >>> On Thu, Nov 19, 2015 at 10:45 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> >>> I think we should find a way to randomize the paths where the HA stuff stores data. If users don't realize that they store data in the same paths, this could lead to problems.
> >>>
> >>>> On 19 Nov 2015, at 08:50, Till Rohrmann <trohrmann@apache.org> wrote:
> >>>>
> >>>> Hi Gwenhaël,
> >>>>
> >>>> good to hear that you could resolve the problem.
> >>>>
> >>>> When you run multiple HA Flink jobs in the same cluster, you don't have to adjust the configuration of Flink. It should work out of the box.
> >>>>
> >>>> However, if you run multiple HA Flink clusters, then you have to set a distinct ZooKeeper root path for each cluster via the option recovery.zookeeper.path.root in the Flink configuration. This is necessary because otherwise all JobManagers (the ones of the different clusters) will compete for a single leadership. Furthermore, all TaskManagers will only see the one and only leader and connect to it. The reason is that the TaskManagers look up their leader at a ZNode below the ZooKeeper root path.
> >>>>
> >>>> If you have other questions then don't hesitate to ask me.
> >>>>
> >>>> Cheers,
> >>>> Till
> >>>>
> >>>> On Wed, Nov 18, 2015 at 6:37 PM, Gwenhael Pasquiers <gwenhael.pasquiers@ericsson.com> wrote:
> >>>> Nevermind,
> >>>>
> >>>> Looking at the logs I saw that it was having issues trying to connect to ZK.
> >>>>
> >>>> To make it short: it had the wrong port.
> >>>>
> >>>> It is now starting.
> >>>>
> >>>> Tomorrow I'll try to kill some JobManagers *evil*.
> >>>>
> >>>> Another question: if I have multiple HA Flink jobs, are there some points to check in order to be sure that they won't collide on HDFS or ZK?
> >>>>
> >>>> B.R.
> >>>>
> >>>> Gwenhaël PASQUIERS
> >>>>
> >>>> From: Till Rohrmann [mailto:till.rohrmann@gmail.com]
> >>>> Sent: Wednesday, 18 November 2015 18:01
> >>>> To: user@flink.apache.org
> >>>> Subject: Re: YARN High Availability
> >>>>
> >>>> Hi Gwenhaël,
> >>>>
> >>>> do you have access to the YARN logs?
> >>>>
> >>>> Cheers,
> >>>> Till
> >>>>
> >>>> On Wed, Nov 18, 2015 at 5:55 PM, Gwenhael Pasquiers <gwenhael.pasquiers@ericsson.com> wrote:
> >>>>
> >>>> Hello,
> >>>>
> >>>> We're trying to set up high availability using an existing ZooKeeper quorum already running in our Cloudera cluster.
> >>>>
> >>>> So, as per the doc, we've changed the max attempts in YARN's config as well as in flink.yaml.
> >>>>
> >>>> recovery.mode: zookeeper
> >>>> recovery.zookeeper.quorum: host1:3181,host2:3181,host3:3181
> >>>> state.backend: filesystem
> >>>> state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
> >>>> recovery.zookeeper.storageDir: hdfs:///flink/recovery/
> >>>> yarn.application-attempts: 1000
> >>>>
> >>>> Everything is OK as long as recovery.mode is commented out.
> >>>> As soon as I uncomment recovery.mode, the deployment on YARN is stuck on:
> >>>>
> >>>> "Deploying cluster, current state ACCEPTED"
> >>>> "Deployment took more than 60 seconds...."
> >>>>
> >>>> Every second.
> >>>>
> >>>> And I have more than enough resources available on my YARN cluster.
> >>>>
> >>>> Do you have any idea of what could cause this, and/or what logs I should look for in order to understand?
> >>>>
> >>>> B.R.
> >>>>
> >>>> Gwenhaël PASQUIERS
>
> <unwanted_jobs.jpg>