Subject: Re: correct and fast way to stop streaming application
From: Cody Koeninger
To: Krot Viacheslav
Cc: varun sharma, user@spark.apache.org
Date: Tue, 27 Oct 2015 10:30:49 -0500

If you want to make sure that your offsets are increasing without gaps, one way to do that is to enforce that invariant when you're saving to your database. That would probably mean using a real database instead of ZooKeeper, though.
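
For what it's worth, a minimal sketch of enforcing that invariant at save time with plain JDBC might look like the following (illustration only, not code from this thread; the kafka_offsets table and its columns are hypothetical):

import java.sql.DriverManager

// Commit the new offset only if the stored offset equals this batch's
// starting offset; anything else means a gap or a replay, so fail loudly.
def commitOffset(jdbcUrl: String, topic: String, partition: Int,
                 fromOffset: Long, untilOffset: Long): Unit = {
  val conn = DriverManager.getConnection(jdbcUrl)
  try {
    val stmt = conn.prepareStatement(
      "UPDATE kafka_offsets SET until_offset = ? " +
      "WHERE topic = ? AND kafka_partition = ? AND until_offset = ?")
    stmt.setLong(1, untilOffset)
    stmt.setString(2, topic)
    stmt.setInt(3, partition)
    stmt.setLong(4, fromOffset)
    if (stmt.executeUpdate() != 1) {
      throw new IllegalStateException(
        s"Rejected offsets $fromOffset-$untilOffset for $topic/$partition: " +
          "stored offset does not match the batch's starting offset")
    }
  } finally {
    conn.close()
  }
}

Ideally that UPDATE runs in the same transaction as the write of the batch's results, so results and offsets either land together or not at all.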

On Tue, Oct 27, 2015 at 4:13 AM, Krot Viacheslav <krot.vyacheslav@gmail.com> wrote:
Any ideas? This is so important because we use Kafka direct streaming and save processed offsets manually as the last step in the job, so we achieve at-least-once semantics.
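
(Roughly, that save-offsets-last pattern with the direct stream looks like the sketch below; stream, processAndStore and saveOffsetsToZookeeper are placeholder names, not the actual code.)

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

stream.foreachRDD { rdd =>
  // the direct stream exposes the exact Kafka offset range behind each RDD
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  processAndStore(rdd)                 // do the real work first...
  saveOffsetsToZookeeper(offsetRanges) // ...then record offsets, giving at-least-once
}
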
But see what happens when a new batch is scheduled after a job fails:
- suppose we start from offset 10, loaded from ZooKeeper
- the job starts with offsets 10-20
- the job fails N times; awaitTermination notices that and stops the context (or even the JVM with System.exit), but the scheduler has already started a new job, the one for offsets 20-30, and sent it to an executor
- the executor runs all the steps (if there is only one stage) and saves offset 30 to ZooKeeper

This way I lose the data in offsets 10-20.

How should this be handled correctly?

On Mon, 26 Oct 2015 at 18:37, varun sharma <varunsharmansit@gmail.com> wrote:
+1, wanted to do the same.

On Mon, Oct 26, 2015 at 8:58 PM, Krot Viacheslav <krot.vyacheslav@gmail.com> wrote:
Hi all,

I wonder what is the correct way to stop a streaming application if some job fails?
What I have now:

val ssc = new StreamingContext
....
ssc.start()
try {
  ssc.awaitTermination()
} catch {
  case e => ssc.stop(stopSparkContext = true, stopGracefully = false)
}

It works, but one problem still exists - after a job fails and before the streaming context is stopped, it manages to start the job for the next batch. That is not desirable for me.
It works like this because JobScheduler is an actor: after it reports the error it moves on to the next message, which starts the next batch job, while ssc.awaitTermination() runs in another thread and only reacts after the next batch has started.

Is there a way to stop before the next job is submitted?
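
(Just as an illustration of the kind of thing being asked about: one way to at least narrow the window is to trigger the stop from inside the output operation itself, from a separate thread, rather than waiting for awaitTermination. process(rdd) is a placeholder for the real per-batch work, and this sketch still does not fully close the race, since the scheduler may already have submitted the next job.)

stream.foreachRDD { rdd =>
  try {
    process(rdd)  // placeholder: per-batch work plus the offset save
  } catch {
    case e: Exception =>
      // stop from a separate thread: stopping synchronously from inside a
      // running job can deadlock while stop() waits for that job to finish
      new Thread(new Runnable {
        override def run(): Unit =
          ssc.stop(stopSparkContext = true, stopGracefully = false)
      }).start()
      throw e  // rethrow so the batch is still reported as failed
  }
}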



--
VARUN SHARMA
Flipkart
Bangalore
