From: Jamie Grier <jamie@data-artisans.com>
To: user@flink.apache.org
Date: Tue, 5 Jul 2016 19:18:15 -0700
Subject: Re: Failed job restart - flink on yarn

The Kafka client can be configured to commit offsets to ZooKeeper periodically even when those offsets are not used in the normal fault-tolerance case. Normally, the Kafka offsets are part of Flink's normal state. However, in the absence of this state the FlinkKafkaConsumer will actually retrieve the last committed offsets, so you may not need to do anything special in your case, unless I've misunderstood you.

On Tue, Jul 5, 2016 at 4:18 PM, vprabhu@gmail.com wrote:

> Thanks for the reply. It would be great to have the feature to restart a
> failed job from the last checkpoint.
>
> Is there a way to pass the initial set of partition offsets to the
> kafka-client? In that case I can maintain a list of last processed offsets
> from within my window operation (possibly store the offsets in some
> database) and use that to bootstrap the Kafka client upon restart.
>
> I realize that I can probably reset the offsets for the consumer group
> from some external program to the last fully processed offsets and restart
> the job; I just want to confirm whether there is already a feature in the
> kafka-client.
>
> Thanks,
> Prabhu
>
> On Mon, Jul 4, 2016 at 2:17 AM, Ufuk Celebi [via Apache Flink User Mailing
> List archive.] <[hidden email]> wrote:
>
>> If you just re-submit the job without a savepoint, the Kafka consumer
>> will by default start processing from the latest offset and the
>> operators will be in an empty state. It should be possible to add a
>> feature to Flink which allows turning the latest checkpoint into a
>> savepoint, from which you could then resume the job after increasing
>> the container memory. But I'm afraid that this won't make it into the
>> next release.
I will open an issue for it, though.
>>
>> A workaround (more of a hack) would be to run in HA mode
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/jobmanager_high_availability.html)
>> and just shut down the YARN containers without cancelling the job. The
>> latest checkpoint metadata should be stored in ZooKeeper and resumed
>> when you restart the cluster. It's really more of a hack/abuse of HA,
>> though.
>>
>> – Ufuk
>>
>>
>> On Sat, Jul 2, 2016 at 7:09 AM, [hidden email] <[hidden email]> wrote:
>>
>> > Hi Jamie,
>> >
>> > Thanks for the reply.
>> >
>> > Yeah, I looked at savepoints. I want to start my job only from the last
>> > checkpoint; this means I have to keep track of when the checkpoint was
>> > taken and then trigger a savepoint. I am not sure this is the way to go.
>> > My state backend is HDFS and I can see that the checkpoint path has the
>> > data that has been buffered in the window.
>> >
>> > I want to start the job in a way such that it will read the checkpointed
>> > data from before the failure and continue processing.
>> >
>> > I realise that the checkpoints are used whenever there is a container
>> > failure and a new container is obtained. In my case the job failed because
>> > a container failed for the maximum allowed number of failures.
>> >
>> > Thanks,
>> > Prabhu
>> >
>> > On Fri, Jul 1, 2016 at 3:54 PM, Jamie Grier [via Apache Flink User Mailing
>> > List archive.] <[hidden email]> wrote:
>> >>
>> >> Hi Prabhu,
>> >>
>> >> Have you taken a look at Flink's savepoints feature? This allows you to
>> >> make snapshots of your job's state on demand and then at any time restart
>> >> your job from that point:
>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/savepoints.html
>> >>
>> >> Also know that you can use Flink's disk-backed state backend as well if
>> >> your job state is larger than fits in memory.
>> >> See
>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/state_backends.html#the-rocksdbstatebackend
>> >>
>> >>
>> >> -Jamie
>> >>
>> >>
>> >> On Fri, Jul 1, 2016 at 1:34 PM, [hidden email] <[hidden email]> wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> I have a Flink streaming job that reads from Kafka and performs an
>> >>> aggregation in a window. It ran fine for a while; however, when the
>> >>> number of events in a window crossed a certain limit, the YARN
>> >>> containers failed with Out Of Memory. The job was running with 10G
>> >>> containers.
>> >>>
>> >>> We have about 64G of memory on the machine and now I want to restart
>> >>> the job with a 20G container (we ran some tests and 20G should be good
>> >>> enough to accommodate all the elements from the window).
>> >>>
>> >>> Is there a way to restart the job from the last checkpoint?
>> >>>
>> >>> When I resubmit the job, it starts from the last committed offsets;
>> >>> however, the events that were held in the window at the time of
>> >>> checkpointing seem to get lost. Is there a way to recover the events
>> >>> that were buffered within the window and were checkpointed before the
>> >>> failure?
>> >>>
>> >>> Thanks,
>> >>> Prabhu
>> >>>
>> >>> --
>> >>> View this message in context:
>> >>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Failed-job-restart-flink-on-yarn-tp7764.html
>> >>> Sent from the Apache Flink User Mailing List archive mailing list
>> >>> archive at Nabble.com.
>> >>
>> >>
>> >> --
>> >> Jamie Grier
>> >> data Artisans, Director of Applications Engineering
>> >> @jamiegrier
>> >> [hidden email]

--
Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier
jamie@data-artisans.com
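The offset behavior discussed in this thread — the FlinkKafkaConsumer committing offsets to ZooKeeper and picking up the last committed offsets on restart — is driven by the standard Kafka 0.8-era consumer properties passed to the connector. A minimal sketch of such a property set; the broker and ZooKeeper addresses and the group name are placeholders, and the exact keys honored depend on the Kafka client version bundled with your Flink release:

```java
import java.util.Properties;

public class KafkaOffsetProps {
    // Consumer properties for a FlinkKafkaConsumer08-style source.
    // All addresses and the group name are hypothetical placeholders.
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092");  // Kafka broker (placeholder)
        props.setProperty("zookeeper.connect", "zk1:2181");      // ZK quorum (placeholder)
        props.setProperty("group.id", "flink-window-job");       // offsets are tracked per group
        props.setProperty("auto.commit.enable", "true");         // commit offsets to ZooKeeper...
        props.setProperty("auto.commit.interval.ms", "5000");    // ...periodically, every 5s
        return props;
    }

    public static void main(String[] args) {
        // In a Flink job these properties would be handed to the consumer,
        // e.g. new FlinkKafkaConsumer08<>("topic", schema, consumerProps()).
        System.out.println(consumerProps().getProperty("group.id"));
    }
}
```

With a fixed `group.id` like this, a resubmitted job can pick up the group's last committed offsets rather than starting from the latest offset, which matches the behavior Jamie describes (subject to the consumer's offset-reset configuration).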
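For reference, the savepoint workflow Jamie points to is driven from the Flink CLI. A hedged sketch of the commands as documented for the 1.0 release; the job ID, savepoint path, jar name, and YARN memory flags are placeholders:

```shell
# Trigger a savepoint for the running job (job ID is a placeholder)
bin/flink savepoint <jobID>

# Cancel the job, then resubmit it from the savepoint with larger containers;
# -yjm/-ytm set JobManager/TaskManager container memory in MB (example values)
bin/flink run -s hdfs:///flink/savepoints/savepoint-xyz \
    -m yarn-cluster -yjm 2048 -ytm 20480 my-job.jar
```

Note that this only helps going forward: as Ufuk says, a savepoint must exist before the failure for the job to be resumed from it.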
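The HA-mode workaround Ufuk describes relies on checkpoint metadata surviving in ZooKeeper across a cluster restart. A sketch of the relevant flink-conf.yaml entries using the 1.0-era configuration keys; the quorum addresses and HDFS paths are placeholders, and the exact key names should be checked against the jobmanager_high_availability page linked above for your release:

```yaml
recovery.mode: zookeeper
recovery.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
recovery.zookeeper.path.root: /flink
recovery.zookeeper.storageDir: hdfs:///flink/recovery
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
```

With this in place, shutting down the YARN containers without cancelling the job leaves the latest checkpoint metadata in ZooKeeper, so restarting the cluster resumes from it — the hack/abuse of HA that Ufuk cautions about.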