spark-issues mailing list archives

From "Florencio (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-23739) Spark structured streaming long running problem
Date Wed, 21 Mar 2018 08:39:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-23739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407611#comment-16407611
] 

Florencio commented on SPARK-23739:
-----------------------------------

Thanks for your response. I am getting this error after some time running; it could be hours or days.
The spark-submit command is:

export SPARK_MAJOR_VERSION=2
spark-submit --master yarn --deploy-mode client \
  --executor-cores 6 --num-executors 4 --driver-memory 2G --executor-memory 2G \
  --jars /usr/hdp/current/hbase-client/lib/hbase-client-1.1.2.2.6.1.0-129.jar,/usr/hdp/current/hbase-client/lib/hbase-common-1.1.2.2.6.1.0-129.jar,/usr/hdp/current/hbase-client/lib/hbase-server-1.1.2.2.6.1.0-129.jar,/usr/hdp/current/hbase-client/lib/hbase-protocol-1.1.2.2.6.1.0-129.jar \
  --class classname jarname \
  -topic=topicname \
  -tablehbase=namehbasetable \
  -checkpointLocation=/tmp/PassingStructuredStreamingEntrate

 

The configuration of the structured stream may also be useful:

val ds1 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "serversnames")
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .option("fetchOffset.numRetries", 5)
  .option("subscribe", topic)
  .load()

val writer = new HbaseSink(tablehbase)
val query = results.writeStream
  .foreach(writer)
  .outputMode("complete")
  .option("checkpointLocation", checkpointLocation)
  .start()
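For context, HbaseSink is a custom class (its code is not shown here); with the foreach sink it would have to extend Spark's ForeachWriter. A rough sketch of its shape, with the actual HBase calls elided, assuming the Spark 2.1 ForeachWriter contract:

```scala
import org.apache.spark.sql.{ForeachWriter, Row}

// Hypothetical sketch only; the real HbaseSink implementation is not shown above.
// Spark calls open()/process()/close() once per partition for each batch.
class HbaseSink(tablehbase: String) extends ForeachWriter[Row] {
  override def open(partitionId: Long, version: Long): Boolean = {
    // e.g. create the HBase connection and obtain the Table for tablehbase
    true // returning false skips this partition
  }
  override def process(value: Row): Unit = {
    // e.g. convert the Row to a Put and write it to the table
  }
  override def close(errorCause: Throwable): Unit = {
    // e.g. flush and close the HBase connection; errorCause is non-null on failure
  }
}
```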

Thanks.

> Spark structured streaming long running problem
> -----------------------------------------------
>
>                 Key: SPARK-23739
>                 URL: https://issues.apache.org/jira/browse/SPARK-23739
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0
>            Reporter: Florencio
>            Priority: Critical
>              Labels: spark, streaming, structured
>
> I had a problem with a long-running Spark Structured Streaming job in Spark 2.1, caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.requests.LeaveGroupResponse.
> The detailed error is the following:
> 18/03/16 16:10:57 INFO StreamExecution: Committed offsets for batch 2110. Metadata OffsetSeqMetadata(0,1521216656590)
> 18/03/16 16:10:57 INFO KafkaSource: GetBatch called with start = Some({"TopicName":{"2":5520197,"1":5521045,"3":5522054,"0":5527915}}), end = {"TopicName":{"2":5522730,"1":5523577,"3":5524586,"0":5530441}}
> 18/03/16 16:10:57 INFO KafkaSource: Partitions added: Map()
> 18/03/16 16:10:57 ERROR StreamExecution: Query [id = a233b9ff-cc39-44d3-b953-a255986c04bf, runId = 8520e3c0-2455-4ac1-9021-8518fb58b3f8] terminated with error
> java.util.zip.ZipException: invalid code lengths set
>  at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:164)
>  at java.io.FilterInputStream.read(FilterInputStream.java:133)
>  at java.io.FilterInputStream.read(FilterInputStream.java:107)
>  at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:354)
>  at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:322)
>  at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:322)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303)
>  at org.apache.spark.util.Utils$.copyStream(Utils.scala:362)
>  at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:45)
>  at org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:83)
>  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:173)
>  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>  at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)
>  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
>  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
>  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>  at org.apache.spark.rdd.RDD.map(RDD.scala:369)
>  at org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:287)
>  at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$6.apply(StreamExecution.scala:503)
>  at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$6.apply(StreamExecution.scala:499)
>  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>  at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
>  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> 18/03/16 16:10:57 ERROR ClientUtils: Failed to close coordinator
> java.lang.NoClassDefFoundError: org/apache/kafka/common/requests/LeaveGroupResponse
>  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendLeaveGroupRequest(AbstractCoordinator.java:575)
>  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.maybeLeaveGroup(AbstractCoordinator.java:566)
>  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:555)
>  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:377)
>  at org.apache.kafka.clients.ClientUtils.closeQuietly(ClientUtils.java:66)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1383)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1364)
>  at org.apache.spark.sql.kafka010.KafkaSource.stop(KafkaSource.scala:311)
>  at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$stopSources$1.apply(StreamExecution.scala:574)
>  at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$stopSources$1.apply(StreamExecution.scala:572)
>  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at org.apache.spark.sql.execution.streaming.StreamExecution.stopSources(StreamExecution.scala:572)
>  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:325)
>  at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:191)
> Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.requests.LeaveGroupResponse
>  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>  ... 15 more
> 18/03/16 16:10:57 WARN StreamExecution: Failed to stop streaming source: KafkaSource[Subscribe[TPusciteStazMinuto]]. Resources may have leaked.
> org.apache.kafka.common.KafkaException: Failed to close kafka consumer



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

