spark-issues mailing list archives

From "Patrick Wendell (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-3954) Optimization to FileInputDStream
Date Thu, 16 Oct 2014 03:19:34 GMT

     [ https://issues.apache.org/jira/browse/SPARK-3954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Patrick Wendell updated SPARK-3954:
-----------------------------------
    Summary: Optimization to FileInputDStream  (was:  promote the speed of convert files to RDDS)

> Optimization to FileInputDStream
> --------------------------------
>
>                 Key: SPARK-3954
>                 URL: https://issues.apache.org/jira/browse/SPARK-3954
>             Project: Spark
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.0.0, 1.1.0
>            Reporter: 宿荣全
>
> When converting files to RDDs, the Spark source loops over the file sequence three times:
> 1. files.map(...)
> 2. files.zip(fileRDDs)
> 3. .foreach over the zipped (file, rdd) pairs
> This becomes very time-consuming when there are many files, so I made the following change:
> three loops over the file sequence => only one loop
> Spark source code:
>   private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
>     val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
>     files.zip(fileRDDs).foreach { case (file, rdd) => {
>       if (rdd.partitions.size == 0) {
>         logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
>           "files that have been \"moved\" to the directory assigned to the file stream. " +
>           "Refer to the streaming programming guide for more details.")
>       }
>     }}
>     new UnionRDD(context.sparkContext, fileRDDs)
>   }
> // -----------------------------------------------------------------------------------
> modified code:
>   private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
>     val fileRDDs = for (file <- files; rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)) yield {
>       if (rdd.partitions.size == 0) {
>         logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
>           "files that have been \"moved\" to the directory assigned to the file stream. " +
>           "Refer to the streaming programming guide for more details.")
>       }
>       rdd
>     }
>     new UnionRDD(context.sparkContext, fileRDDs)
>   }
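
The change described in the issue can be illustrated without a Spark cluster. What follows is a minimal sketch, not the actual FileInputDStream code: FakeRdd, load, check, and the three method names are hypothetical stand-ins (load plays the role of newAPIHadoopFile), and warnings are collected in a buffer instead of going through logError. One point worth noting: under Scala's for-comprehension desugaring, a generator followed by a value definition (file <- files; rdd = ...) expands into two chained map calls, so on a strict Seq the modified code appears to traverse the sequence twice rather than once. A single map with a block body is one way to get exactly one traversal.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for an RDD; only the partition count matters here.
final case class FakeRdd(path: String, numPartitions: Int)

object FilesToRddSketch {
  // Hypothetical loader standing in for newAPIHadoopFile.
  // Assumption for the demo: names ending in ".empty" yield zero partitions.
  def load(file: String): FakeRdd =
    FakeRdd(file, if (file.endsWith(".empty")) 0 else 1)

  // Records a warning for a file that produced no partitions.
  private def check(file: String, rdd: FakeRdd, warnings: ArrayBuffer[String]): Unit =
    if (rdd.numPartitions == 0) warnings += s"File $file has no data in it."

  // Shape of the original code: map, then zip, then foreach.
  def threePass(files: Seq[String], warnings: ArrayBuffer[String]): Seq[FakeRdd] = {
    val rdds = files.map(load)
    files.zip(rdds).foreach { case (file, rdd) => check(file, rdd, warnings) }
    rdds
  }

  // Shape of the modified code: a for-comprehension with a value definition.
  def forComprehension(files: Seq[String], warnings: ArrayBuffer[String]): Seq[FakeRdd] =
    for (file <- files; rdd = load(file)) yield {
      check(file, rdd, warnings)
      rdd
    }

  // Alternative: one map whose body loads, checks, and returns the RDD.
  def singleMap(files: Seq[String], warnings: ArrayBuffer[String]): Seq[FakeRdd] =
    files.map { file =>
      val rdd = load(file)
      check(file, rdd, warnings)
      rdd
    }

  def main(args: Array[String]): Unit = {
    val files = Seq("a.txt", "b.empty", "c.txt")
    val warnings = ArrayBuffer.empty[String]
    val rdds = singleMap(files, warnings)
    println(rdds.map(_.path).mkString(", "))
    warnings.foreach(println)
  }
}
```

All three variants return the same sequence and record the same warnings; they differ only in how many times the file list is walked and how many intermediate collections are built. These traversals run on the driver over an in-memory Seq, so whether the difference is measurable for a given workload is something to benchmark rather than assume.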



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

