spark-issues mailing list archives

From "SemanticBeeng (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-18649) sc.textFile(my_file).collect() raises socket.timeout on large files
Date Tue, 29 May 2018 12:52:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-18649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16493498#comment-16493498 ]

SemanticBeeng edited comment on SPARK-18649 at 5/29/18 12:51 PM:
-----------------------------------------------------------------

Should this hard-coded timeout value not be configurable?

[https://github.com/apache/spark/blob/628c7b517969c4a7ccb26ea67ab3dd61266073ca/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L400-L403]

If indeed "by using {{toLocalIterator()}} ... it rather returns partitions", then this can easily time out at 15 secs (for example, if using windowing and not setting partitions).

As per https://stackoverflow.com/questions/50108533/spark-rdd-error-while-sending-iterator.
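
To illustrate the request, here is a minimal, Spark-free sketch of a socket read whose timeout comes from configuration instead of a hard-coded constant. Everything in it is an assumption made for illustration: the key name {{example.python.socket.timeout}} is hypothetical and not an existing Spark setting, the helper names are invented, and the 15-second default only mirrors the value mentioned above. It is not the PySpark or PythonRDD implementation.

```python
import socket
import threading
import time

# Hypothetical key invented for this sketch; it is NOT a real Spark setting.
TIMEOUT_KEY = "example.python.socket.timeout"
# Default chosen only to mirror the 15 secs mentioned in the comment above.
DEFAULT_TIMEOUT_SECONDS = 15.0


def resolve_timeout(conf):
    """Read the timeout (in seconds) from a plain dict, falling back to the default."""
    value = float(conf.get(TIMEOUT_KEY, DEFAULT_TIMEOUT_SECONDS))
    if value <= 0:
        raise ValueError("timeout must be positive, got %r" % value)
    return value


def read_all(sock, timeout):
    """Read until the peer closes, waiting at most `timeout` seconds per recv call."""
    sock.settimeout(timeout)
    chunks = []
    while True:
        chunk = sock.recv(4096)  # raises socket.timeout if nothing arrives in time
        if not chunk:
            break
        chunks.append(chunk)
    return b"".join(chunks)


def slow_send(sock, payload, delay):
    """Simulate a producer that needs `delay` seconds before the first byte is ready."""
    try:
        time.sleep(delay)
        sock.sendall(payload)
    except OSError:
        # The reader gave up and closed its end (the "Broken pipe" case).
        pass
    finally:
        sock.close()


def demo(conf, delay):
    """Return the payload, or None if the configured timeout expired first."""
    reader, writer = socket.socketpair()
    worker = threading.Thread(target=slow_send, args=(writer, b"partition data", delay))
    worker.start()
    try:
        return read_all(reader, resolve_timeout(conf))
    except socket.timeout:
        return None
    finally:
        reader.close()
        worker.join()
```

With a producer that takes 0.5 seconds to deliver its first byte, {{demo({TIMEOUT_KEY: "0.05"}, delay=0.5)}} gives up and returns None, while raising the configured value lets the same read complete. The point of the sketch is only that the limit is a per-read wait, so a slow first partition can trip it regardless of total data size.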


was (Author: semanticbeeng):
Should this hard coded timeout value not be configurable?

[https://github.com/apache/spark/blob/628c7b517969c4a7ccb26ea67ab3dd61266073ca/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L400-L403]

 

> sc.textFile(my_file).collect() raises socket.timeout on large files
> -------------------------------------------------------------------
>
>                 Key: SPARK-18649
>                 URL: https://issues.apache.org/jira/browse/SPARK-18649
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>         Environment: PySpark version 1.6.2
>            Reporter: Erik Cederstrand
>            Priority: Major
>
> I'm trying to load a file into the driver with this code:
>     contents = sc.textFile('hdfs://path/to/big_file.csv').collect()
> Loading into the driver instead of creating a distributed RDD is intentional in this case. The file is ca. 6 GB, and I have adjusted driver memory accordingly to fit the local data. After some time, my spark-submitted job crashes with the stack trace below.
> I have traced this to pyspark/rdd.py, where the _load_from_socket() method creates a socket with a hard-coded timeout of 3 seconds (this code is also present in HEAD, although I'm on PySpark 1.6.2). Raising this hard-coded value to e.g. 600 lets me read the entire file.
> Is there any reason that this value does not use e.g. the 'spark.network.timeout' setting instead?
> Traceback (most recent call last):
>   File "my_textfile_test.py", line 119, in <module>
>     contents = sc.textFile('hdfs://path/to/file.csv').collect()
>   File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 772, in collect
>   File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 142, in _load_from_socket
>   File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 517, in load_stream
>   File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 511, in loads
>   File "/usr/lib/python2.7/socket.py", line 380, in read
>     data = self._sock.recv(left)
> socket.timeout: timed out
> 16/11/30 13:33:14 WARN Utils: Suppressing exception in finally: Broken pipe
> java.net.SocketException: Broken pipe
> 	at java.net.SocketOutputStream.socketWrite0(Native Method)
> 	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
> 	at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
> 	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> 	at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
> 	at java.io.DataOutputStream.flush(DataOutputStream.java:123)
> 	at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
> 	at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$2.apply$mcV$sp(PythonRDD.scala:650)
> 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1248)
> 	at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:649)
> 	Suppressed: java.net.SocketException: Broken pipe
> 		at java.net.SocketOutputStream.socketWrite0(Native Method)
> 		at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
> 		at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
> 		at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> 		at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
> 		at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
> 		at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
> 		... 3 more
> 16/11/30 13:33:14 ERROR PythonRDD: Error while sending iterator
> java.net.SocketException: Connection reset
> 	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
> 	at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
> 	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> 	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
> 	at java.io.DataOutputStream.write(DataOutputStream.java:107)
> 	at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
> 	at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622)
> 	at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442)
> 	at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
> 	at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> 	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
> 	at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:648)
> 	at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:648)
> 	at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:648)
> 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1239)
> 	at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:649)
> 	Suppressed: java.net.SocketException: Broken pipe
> 		at java.net.SocketOutputStream.socketWrite0(Native Method)
> 		at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
> 		at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
> 		at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> 		at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
> 		at java.io.DataOutputStream.flush(DataOutputStream.java:123)
> 		at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
> 		at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$2.apply$mcV$sp(PythonRDD.scala:650)
> 		at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1248)
> 		... 1 more
> 		Suppressed: java.net.SocketException: Broken pipe
> 			at java.net.SocketOutputStream.socketWrite0(Native Method)
> 			at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
> 			at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
> 			at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> 			at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
> 			at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
> 			at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
> 			... 3 more



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

