Delivered-To: mailing list user@spark.apache.org
From: Tathagata Das
Date: Tue, 27 Oct 2015 16:44:38 -0700
Subject: Re: expected Kinesis checkpoint behavior when driver restarts
To: Hster Geguri
Cc: user
MIME-Version: 1.0
Content-Type: multipart/alternative; boundary=94eb2c05411efa0c5005231eaa64

--94eb2c05411efa0c5005231eaa64
Content-Type: text/plain; charset=UTF-8

Your observation is correct!
The current implementation of checkpointing to DynamoDB is tied to the
presence of new data from Kinesis (I think that emulates the KCL behavior).
If there is no data for a while, the checkpointing does not occur. That
explains your observation.

I have filed a JIRA to fix this:
https://issues.apache.org/jira/browse/SPARK-11359
The fix should be available in 1.6.

On Tue, Oct 27, 2015 at 4:09 PM, Hster Geguri wrote:

> We are using Kinesis with Spark Streaming 1.5 on a YARN cluster. When we
> enable checkpointing in Spark, where in the Kinesis stream should a
> restarted driver continue? I ran a simple experiment as follows:
>
> 1. In the first driver run, the Spark driver processes 1 million records
> starting from InitialPositionInStream.TRIM_HORIZON in 5-second batch
> intervals with 10 seconds set as the Kinesis receiver checkpoint interval.
> (This interval has been purposely set low to see the impact on where a
> restarted driver would pick up.)
>
> 2. We stop pushing events to the Kinesis stream until the driver keeps
> pulling zero events for a few minutes. Then the first driver is killed
> manually through "yarn application --kill".
>
> 3. The driver is relaunched a second time and the logs show it
> successfully restored from the DFS checkpoint directory. Because the first
> driver had completely processed all the entries in the stream, I would
> expect the second driver to pick up at the end of the stream or at minimum
> the last 10-second interval window. However, the second driver launch (and
> subsequent driver launches) re-processes about 30 seconds' worth of
> (100,000) events, and this appears not to be related to the Kinesis
> checkpoint interval.
>
> Also, with a Kinesis driver, does it make sense to use Write Ahead
> Logs and incur the cost of writing to DFS when you could remember the
> previous to last checkpoint and just reprocess/refetch directly from the
> stream?
>
> Any input is highly appreciated.
>
> Thanks,
> Heji

--94eb2c05411efa0c5005231eaa64--
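
The behavior described in the reply (the DynamoDB checkpoint only advances while new data is arriving) can be illustrated with a small toy model. This is a sketch under stated assumptions, not Spark or KCL code: the class ToyReceiver, the function run, the one-record-per-second timeline, and the checkpoint_when_idle flag are all hypothetical names and values chosen for illustration. The flag only shows one way a timer-driven checkpoint could behave; it is not taken from the SPARK-11359 patch, and the model does not try to reproduce the 30-second figure from the experiment.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ToyReceiver:
    """Toy stand-in for a receiver whose checkpoint is written from the
    record-handling path. All names here are hypothetical."""
    checkpoint_interval: int                 # seconds between checkpoint attempts
    checkpoint_when_idle: bool = False       # hypothetical "fixed" behavior
    last_checkpoint_time: int = 0
    last_received_seq: Optional[int] = None
    checkpointed_seq: Optional[int] = None   # stands in for the DynamoDB entry

    def on_poll(self, now: int, records: List[int]) -> None:
        if records:
            self.last_received_seq = records[-1]
        elif not self.checkpoint_when_idle:
            # No new data: the checkpoint logic is never reached.
            return
        interval_elapsed = now - self.last_checkpoint_time >= self.checkpoint_interval
        if self.last_received_seq is not None and interval_elapsed:
            self.checkpointed_seq = self.last_received_seq
            self.last_checkpoint_time = now


def run(checkpoint_when_idle: bool) -> ToyReceiver:
    """One record per second for 35 seconds, then 300 idle seconds."""
    receiver = ToyReceiver(checkpoint_interval=10,
                           checkpoint_when_idle=checkpoint_when_idle)
    for now in range(1, 336):
        records = [now] if now <= 35 else []
        receiver.on_poll(now, records)
    return receiver


if __name__ == "__main__":
    for idle_flag in (False, True):
        r = run(idle_flag)
        print(f"checkpoint_when_idle={idle_flag}: "
              f"received up to {r.last_received_seq}, "
              f"checkpointed up to {r.checkpointed_seq}")
```

In this model, with the flag off, the records that arrive after the last interval boundary are never checkpointed once the stream goes idle, so a restarted reader would start from the older position and see them again. With the flag on, the checkpoint catches up during the idle period.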