Subject: Re: spark streaming 1.3 kafka topic error
From: Shushant Arora
To: Cody Koeninger
Cc: user
Date: Mon, 31 Aug 2015 16:30:03 +0530

Say my cluster intermittently takes a long time to rebalance for some reason. To handle that, can I have infinite retries instead of killing the app? What should the value of retries be? Will -1 work, or something else?
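For concreteness, the knobs I mean are the ones the direct stream reads when it retries fetching leader offsets. A minimal sketch follows, assuming the Spark 1.3 direct API (KafkaUtils.createDirectStream); the topic name, broker list, batch interval, and the chosen values are placeholders:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object DirectStreamRetrySketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("direct-kafka-retry-sketch")
          // How many times the direct stream re-fetches leader offsets before
          // failing batch generation (default is 1); raise it so a slow leader
          // election has time to finish.
          .set("spark.streaming.kafka.maxRetries", "10")

        val ssc = new StreamingContext(conf, Seconds(10)) // placeholder batch interval

        val kafkaParams = Map(
          "metadata.broker.list" -> "broker1:9092,broker2:9092,broker3:9092", // placeholder brokers
          // Pause between leader-offset retries, so the batch tolerates roughly
          // maxRetries * this many milliseconds without a leader.
          "refresh.leader.backoff.ms" -> "2000"
        )

        val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, Set("testtopic")) // placeholder topic

        stream.foreachRDD { rdd => println(s"records in this batch: ${rdd.count()}") }

        ssc.start()
        ssc.awaitTermination()
      }
    }

Whether a negative value such as -1 actually gives unbounded retries in 1.3 isn't clear to me from the docs, which is why I'm asking; a generously large finite value may be the safer assumption.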
On Thu, Aug 27, 2015 at 6:46 PM, Cody Koeninger wrote:

> Your kafka broker died or you otherwise had a rebalance.
>
> Normally spark retries take care of that.
>
> Is there something going on with your kafka installation, that rebalance
> is taking especially long?
>
> Yes, increasing backoff / max number of retries will "help", but it's
> better to figure out what's going on with kafka.
>
> On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora wrote:
>
>> Hi
>>
>> My streaming application gets killed with the error below:
>>
>> 15/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>> kafka.common.NotLeaderForPartitionException,
>> kafka.common.NotLeaderForPartitionException,
>> kafka.common.NotLeaderForPartitionException,
>> kafka.common.NotLeaderForPartitionException,
>> org.apache.spark.SparkException: Couldn't find leader offsets for
>> Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100],
>> [testtopic,193]))
>> 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs for
>> time 1440626120000 ms
>> org.apache.spark.SparkException:
>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>> org.apache.spark.SparkException: Couldn't find leader offsets for
>> Set([testtopic,115]))
>>   at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>>   at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>   at ...
>>
>> Kafka params printed in the job logs are:
>>   value.serializer = class org.apache.kafka.common.serialization.StringSerializer
>>   key.serializer = class org.apache.kafka.common.serialization.StringSerializer
>>   block.on.buffer.full = true
>>   retry.backoff.ms = 100
>>   buffer.memory = 1048576
>>   batch.size = 16384
>>   metrics.sample.window.ms = 30000
>>   metadata.max.age.ms = 300000
>>   receive.buffer.bytes = 32768
>>   timeout.ms = 30000
>>   max.in.flight.requests.per.connection = 5
>>   bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
>>   metric.reporters = []
>>   client.id =
>>   compression.type = none
>>   retries = 0
>>   max.request.size = 1048576
>>   send.buffer.bytes = 131072
>>   acks = all
>>   reconnect.backoff.ms = 10
>>   linger.ms = 0
>>   metrics.num.samples = 2
>>   metadata.fetch.timeout.ms = 60000
>>
>> Is the kafka broker going down and killing the job? What's the best way
>> to handle it?
>> Will increasing retries and backoff time help, and what values should
>> they be set to so that the streaming application never fails, but instead
>> keeps retrying every few seconds and sends an event so that my custom code
>> can send a notification that the kafka broker is down, if that's the cause?
>>
>> Thanks
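On the last part of the quoted question (sending an event instead of dying silently): since ssc.awaitTermination() rethrows an exception that stopped the streaming context, the driver can catch it and fire an alert before exiting. A minimal sketch, continuing the code above and replacing its last two lines; notifyOps is a hypothetical helper standing in for whatever alerting integration is actually used:

    // Hypothetical alerting hook; replace with your own pager/email integration.
    def notifyOps(message: String): Unit = println(s"ALERT: $message")

    try {
      ssc.start()
      // awaitTermination() rethrows an exception that stopped the streaming
      // context, e.g. the SparkException wrapping NotLeaderForPartitionException
      // once the configured retries are exhausted.
      ssc.awaitTermination()
    } catch {
      case e: Exception =>
        notifyOps(s"Streaming job stopped: ${e.getMessage}")
        throw e // let the driver exit so an external supervisor can restart it
    }

This doesn't by itself make the app retry forever; an external supervisor or the cluster manager's restart policy is the usual way to approximate that once the in-process retries are exhausted.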