spark-issues mailing list archives

From "Steve Loughran (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-22240) S3 CSV number of partitions incorrectly computed
Date Tue, 24 Oct 2017 17:33:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-22240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16217325#comment-16217325 ]

Steve Loughran commented on SPARK-22240:
----------------------------------------

I'm doing some testing with master, reading files off S3A with the s3a client logging file IO in its close() statements, and I'm seeing a full load of the file in the executor even when schema inference is turned off.
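
The DEBUG stream logging shown below isn't on by default; a minimal sketch for switching it on from the test, assuming the log4j 1.x API that Spark 2.x bundles:

{code}
import org.apache.log4j.{Level, Logger}

// Log S3A filesystem/input-stream activity (open, reopen, close, per-stream
// statistics) at DEBUG so it shows up in the executor log, as in the trace below.
Logger.getLogger("org.apache.hadoop.fs.s3a").setLevel(Level.DEBUG)
{code}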

{code}
2017-10-24 18:03:20,085 [ScalaTest-main-running-S3ACommitBulkDataSuite] INFO  codegen.CodeGenerator
(Logging.scala:logInfo(54)) - Code generated in 149.680004 ms
2017-10-24 18:03:20,423 [Executor task launch worker for task 0] INFO  codegen.CodeGenerator
(Logging.scala:logInfo(54)) - Code generated in 9.406645 ms
2017-10-24 18:03:20,434 [Executor task launch worker for task 0] DEBUG s3a.S3AFileSystem (S3AFileSystem.java:open(680))
- Opening 's3a://landsat-pds/scene_list.gz' for reading; input policy = sequential
2017-10-24 18:03:20,435 [Executor task launch worker for task 0] DEBUG s3a.S3AFileSystem (S3AFileSystem.java:innerGetFileStatus(2065))
- Getting path status for s3a://landsat-pds/scene_list.gz  (scene_list.gz)
2017-10-24 18:03:21,175 [Executor task launch worker for task 0] DEBUG s3a.S3AFileSystem (S3AFileSystem.java:s3GetFileStatus(2133))
- Found exact file: normal file
2017-10-24 18:03:21,184 [Executor task launch worker for task 0] INFO  compress.CodecPool
(CodecPool.java:getDecompressor(184)) - Got brand-new decompressor [.gz]
2017-10-24 18:03:21,188 [Executor task launch worker for task 0] DEBUG s3a.S3AInputStream
(S3AInputStream.java:reopen(174)) - reopen(s3a://landsat-pds/scene_list.gz) for read from
new offset range[0-45603307], length=65536, streamPosition=0, nextReadPosition=0, policy=sequential
2017-10-24 18:03:53,447 [Executor task launch worker for task 0] DEBUG s3a.S3AInputStream
(S3AInputStream.java:closeStream(490)) - Closing stream close() operation: soft
2017-10-24 18:03:53,447 [Executor task launch worker for task 0] DEBUG s3a.S3AInputStream
(S3AInputStream.java:closeStream(503)) - Drained stream of 0 bytes
2017-10-24 18:03:53,448 [Executor task launch worker for task 0] DEBUG s3a.S3AInputStream
(S3AInputStream.java:closeStream(524)) - Stream s3a://landsat-pds/scene_list.gz closed: close()
operation; remaining=0 streamPos=45603307, nextReadPos=45603307, request range 0-45603307
length=45603307
2017-10-24 18:03:53,448 [Executor task launch worker for task 0] DEBUG s3a.S3AInputStream
(S3AInputStream.java:close(463)) - Statistics of stream scene_list.gz
StreamStatistics{OpenOperations=1, CloseOperations=1, Closed=1, Aborted=0, SeekOperations=0,
ReadExceptions=0, ForwardSeekOperations=0, BackwardSeekOperations=0, BytesSkippedOnSeek=0,
BytesBackwardsOnSeek=0, BytesRead=45603307, BytesRead excluding skipped=45603307, ReadOperations=5240,
ReadFullyOperations=0, ReadsIncomplete=5240, BytesReadInClose=0, BytesDiscardedInAbort=0,
InputPolicy=1, InputPolicySetCount=1}
{code}

That is, handing in a csv.gz triggers a full read, which is "observably slow" when it means pulling 400MB of data from across the planet. Load options were:
{code}
  val csvOptions = Map(
    "header" -> "true",
    "ignoreLeadingWhiteSpace" -> "true",
    "ignoreTrailingWhiteSpace" -> "true",
    "timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ",
    "inferSchema" -> "false",
    "mode" -> "DROPMALFORMED",
  )
{code}
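
For context, a sketch of how an options map like this is applied on the read path (the landsat object is the one named in the log above; the rest is illustrative):

{code}
// With inferSchema=false and no explicit schema, Spark still has to touch the
// file at planning time to pick up the header line for column names.
val df = spark.read
  .options(csvOptions)
  .csv("s3a://landsat-pds/scene_list.gz")
{code}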

I'm going to set an explicit schema to make this go away (it'll become a DataFrame one transform later anyway), but I don't believe this used to occur. I'll try building against other Spark versions to see.
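
As a sketch of that workaround: pass an explicit schema so the driver does not need to read the file for column names or types. Column names here are purely illustrative, not the real scene_list layout:

{code}
import org.apache.spark.sql.types._

// Hypothetical subset of columns; the actual scene_list.gz schema may differ.
val sceneSchema = StructType(Seq(
  StructField("entityId", StringType),
  StructField("acquisitionDate", TimestampType),
  StructField("cloudCover", DoubleType),
  StructField("download_url", StringType)
))

val scenes = spark.read
  .options(csvOptions)
  .schema(sceneSchema)
  .csv("s3a://landsat-pds/scene_list.gz")
{code}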


> S3 CSV number of partitions incorrectly computed
> ------------------------------------------------
>
>                 Key: SPARK-22240
>                 URL: https://issues.apache.org/jira/browse/SPARK-22240
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.0
>         Environment: Running on EMR 5.8.0 with Hadoop 2.7.3 and Spark 2.2.0
>            Reporter: Arthur Baudry
>
> Reading CSV out of S3 using the S3A protocol does not compute the number of partitions correctly in Spark 2.2.0.
> With Spark 2.2.0 I get only one partition when loading a 14GB file:
> {code:java}
> scala> val input = spark.read.format("csv").option("header", "true").option("delimiter", "|").option("multiLine", "true").load("s3a://<s3_path>")
> input: org.apache.spark.sql.DataFrame = [PARTY_KEY: string, ROW_START_DATE: string ... 36 more fields]
> scala> input.rdd.getNumPartitions
> res2: Int = 1
> {code}
> While in Spark 2.0.2 I had:
> {code:java}
> scala> val input = spark.read.format("csv").option("header", "true").option("delimiter", "|").option("multiLine", "true").load("s3a://<s3_path>")
> input: org.apache.spark.sql.DataFrame = [PARTY_KEY: string, ROW_START_DATE: string ... 36 more fields]
> scala> input.rdd.getNumPartitions
> res2: Int = 115
> {code}
> This introduces obvious performance issues in Spark 2.2.0. Maybe there is a property that should be set to have the number of partitions computed correctly.
> I'm aware that .option("multiLine", "true") is not supported in Spark 2.0.2, but that's not relevant here.
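
If multiLine parsing is in fact the difference (worth ruling out, since parsing multi-line records requires treating a file as one unsplittable stream), a quick check is to toggle the option and compare partition counts. A minimal sketch, with the elided path left as a placeholder:

{code}
// Placeholder; the real bucket/key is elided in the report as <s3_path>.
val path = "s3a://<s3_path>"

val withMultiLine = spark.read.format("csv")
  .option("header", "true")
  .option("delimiter", "|")
  .option("multiLine", "true")
  .load(path)

val withoutMultiLine = spark.read.format("csv")
  .option("header", "true")
  .option("delimiter", "|")
  .load(path)  // multiLine left at its default (false)

println(s"multiLine=true  -> ${withMultiLine.rdd.getNumPartitions} partition(s)")
println(s"multiLine=false -> ${withoutMultiLine.rdd.getNumPartitions} partition(s)")
{code}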



