Subject: Re: Spark 1.5 java.net.ConnectException: Connection refused
From: Spark Newbie
To: Tathagata Das
Cc: user@spark.apache.org
Date: Wed, 14 Oct 2015 11:05:57 -0700

Is it slowing things down or blocking progress?
>> I didn't see slowing of processing, but I do see jobs aborted consecutively for a period of 18 batches (5-minute batch intervals). So I am worried about what happened to the records that those jobs were processing.

Also, one more thing to mention: the StreamingListenerBatchCompleted.numRecords information shows all received records as processed even if the batch/job failed, and the processing time shows the same value as for a successful batch. It seems numRecords is simply the number of input records for the batch, regardless of whether they were successfully processed.
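In case it helps, here is a minimal listener sketch (assuming a StreamingContext named ssc) showing roughly how I am reading those numbers; it just logs what BatchInfo reports for each completed batch:

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Minimal sketch: log what BatchInfo reports for every completed batch.
// As far as I can tell, numRecords is the batch's *input* record count, so it is
// reported even when the jobs inside the batch were aborted.
class BatchStatsListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    println(s"batchTime=${info.batchTime} " +
      s"numRecords=${info.numRecords} " +
      s"processingDelayMs=${info.processingDelay.getOrElse(-1L)}")
  }
}

// Registered before ssc.start():
// ssc.addStreamingListener(new BatchStatsListener)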
On Wed, Oct 14, 2015 at 11:01 AM, Spark Newbie wrote:
> I ran 2 different Spark 1.5 clusters that have been running for more than
> a day now. I do see jobs getting aborted due to task retries maxing out
> (default 4) because of ConnectionException. It seems like the executors die
> and get restarted, and I was unable to find the root cause (with the same
> app code and conf on Spark 1.4.1 I don't see ConnectionException).
>
> Another question related to this: what happens to the Kinesis records
> received when a job gets aborted? In Spark 1.5 and kinesis-asl 1.5 (which I
> am using), does the job get resubmitted with the same received records? Or
> does the kinesis-asl library fetch those records again based on the sequence
> numbers it tracks? It would be good for me to understand the story around
> lossless processing of Kinesis records in Spark 1.5 + kinesis-asl 1.5 when
> jobs are aborted. Any pointers or a quick explanation would be very helpful.
>
> On Tue, Oct 13, 2015 at 4:04 PM, Tathagata Das wrote:
>
>> Is this happening too often? Is it slowing things down or blocking
>> progress? Failures once in a while are part of the norm, and the system
>> should take care of itself.
>>
>> On Tue, Oct 13, 2015 at 2:47 PM, Spark Newbie wrote:
>>
>>> Hi Spark users,
>>>
>>> I'm seeing the below exception in my Spark Streaming application. It
>>> happens in the first stage, where the Kinesis receivers receive records
>>> and a flatMap operation is performed on the unioned DStream. A coalesce
>>> step also happens as part of that stage to optimize performance.
>>>
>>> This is happening on my Spark 1.5 instance using kinesis-asl 1.5. When I
>>> look at the executor logs I do not see any exceptions indicating the root
>>> cause of why there is no connectivity on xxx.xx.xx.xxx:36684 or when that
>>> service went down.
>>>
>>> Any help debugging this problem would be appreciated.
>>>
>>> 15/10/13 16:36:07 ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks
>>> java.io.IOException: Failed to connect to ip-xxx-xx-xx-xxx.ec2.internal/xxx.xx.xx.xxx:36684
>>>     at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
>>>     at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>>>     at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
>>>     at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>>>     at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>>>     at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:97)
>>>     at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89)
>>>     at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:595)
>>>     at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:593)
>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>     at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:593)
>>>     at org.apache.spark.storage.BlockManager.getRemote(BlockManager.scala:579)
>>>     at org.apache.spark.storage.BlockManager.get(BlockManager.scala:623)
>>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>>>     at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:139)
>>>     at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:135)
>>>     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>>>     at scala.collection.immutable.List.foreach(List.scala:318)
>>>     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>>>     at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:135)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.net.ConnectException: Connection refused: ip-xxx-xx-xx-xxx.ec2.internal/xxx.xx.xx.xxx:36684
>>>
>>> Thanks,
>>> Bharath
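For completeness, this is the direction I am thinking of while I keep digging. It is only a sketch, not something I have verified fixes anything, and the app name and checkpoint path below are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only. Two knobs I am considering while debugging:
//  * spark.task.maxFailures (default 4): allows more task attempts before a stage,
//    and hence the batch job, is aborted. This only rides out transient executor
//    loss; it does not explain why the executors die.
//  * The receiver write-ahead log plus checkpointing, the general mechanism for
//    not losing receiver-ingested data. How exactly kinesis-asl 1.5 interacts
//    with it is part of what I am asking above.
val conf = new SparkConf()
  .setAppName("kinesis-streaming-app")                          // placeholder name
  .set("spark.task.maxFailures", "8")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(300))              // 5-minute batches
ssc.checkpoint("hdfs:///checkpoints/kinesis-app")               // placeholder path
// ... create the Kinesis DStreams, union, flatMap, coalesce as before, then ssc.start()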