spark-reviews mailing list archives

From: tdas <...@git.apache.org>
Subject: [GitHub] spark pull request #14987: [SPARK-17372][SQL][STREAMING] Avoid serialization...
Date: Wed, 07 Sep 2016 00:27:17 GMT
GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/14987

    [SPARK-17372][SQL][STREAMING] Avoid serialization issues by using Arrays to save file names in FileStreamSource

    ## What changes were proposed in this pull request?
    
    When we create a file stream on a directory that has partitioned subdirs (i.e. dir/x=y/), ListingFileCatalog.allFiles returns the files in the dir as a Seq[String] that is internally a Stream[String]. This is because of this [line](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala#L93), where LinkedHashSet.values.toSeq returns a Stream. Then, when the [FileStreamSource](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala#L79) filters this Stream[String] to remove the already-seen files, it creates a new Stream[String] whose filter function holds a $outer reference to the FileStreamSource (in Scala 2.10). Trying to serialize this Stream[String] causes a NotSerializableException. This happens even if there is just one file in the dir.
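
    Below is a minimal, self-contained sketch of this failure mode (not FileStreamSource's actual code; the class and names are hypothetical). A filter predicate that references a field of a non-serializable enclosing instance forces the closure, and hence the lazy Stream that stores it, to drag that instance along during serialization:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for FileStreamSource: not serializable, and its
// filter predicate references a field, so the closure captures `this`.
class NonSerializableSource {
  private val seen = Set("a")
  def unseen(files: Stream[String]): Stream[String] =
    files.filter(f => !seen.contains(f)) // lazy tail keeps the closure alive
}

object CaptureDemo extends App {
  val filtered = new NonSerializableSource().unseen(Stream("a", "b", "c"))
  val out = new ObjectOutputStream(new ByteArrayOutputStream())
  try out.writeObject(filtered) // serializes the unevaluated tail's closure
  catch { case e: NotSerializableException => println(s"Caught: $e") }
}
```

    The exact capture behavior differs across Scala versions, as noted below, but the underlying hazard is the same: a lazy Stream is not a plain list of strings; it can carry closures.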
    
    It is important to note that this behavior is different in Scala 2.11: there is no $outer reference to FileStreamSource, so no NotSerializableException is thrown. However, with a large sequence of files (tested with 10000 files), it throws a StackOverflowError instead. This is because of how the Stream class is implemented: it is essentially a linked list, and attempting to serialize a long Stream requires *recursively* walking that list, which overflows the stack.
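
    A sketch of that second failure mode (again not the PR's code; the element count needed to overflow depends on the JVM's stack size): even a fully forced Stream is serialized one nested cons cell at a time, so a long enough chain exhausts the stack:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object LongStreamDemo extends App {
  // Force the Stream so no lazy thunks remain; the cons cells themselves
  // are still serialized via nested calls, several stack frames per cell.
  val files: Stream[String] = Stream.tabulate(100000)(i => s"file-$i")
  files.force

  val out = new ObjectOutputStream(new ByteArrayOutputStream())
  out.writeObject(files) // expected: StackOverflowError for a long Stream
}
```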
    
    The right solution is to convert the Seq to an Array before writing to the log. This PR implements the fix in two ways (see the sketch after this list):
    - Changing all uses of HDFSMetadataLog to ensure an Array is used instead of a Seq
    - Adding a `require` in HDFSMetadataLog so that it is never used with type Seq
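
    A minimal sketch of what such a guard and call site could look like (hypothetical names and signatures, not the PR's exact code):

```scala
import scala.reflect.ClassTag

// Hypothetical guard: reject Seq-typed payloads at construction time,
// forcing callers to materialize a flat Array before logging.
class MetadataLogSketch[T: ClassTag] {
  require(
    !classOf[Seq[_]].isAssignableFrom(implicitly[ClassTag[T]].runtimeClass),
    "Should not create a metadata log with type Seq; use Array instead")

  def add(batchId: Long, metadata: T): Unit = () // stub; the real log writes to HDFS
}

object MetadataLogDemo extends App {
  val newFiles: Seq[String] = Stream("f1", "f2") // possibly a lazy Stream
  val log = new MetadataLogSketch[Array[String]] // OK: Array payload
  log.add(0L, newFiles.toArray)                  // materialize before logging
  // new MetadataLogSketch[Seq[String]]          // would fail the require
}
```

    An Array is a flat, strict JVM object: serializing it involves no recursion and no captured closures, which sidesteps both failure modes above.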
    
    ## How was this patch tested?
    
    Added a unit test that ensures the file stream source can handle 10000 files. Without this fix, the test fails in both Scala 2.10 and 2.11, with the different failures described above.
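
    For illustration, a standalone check in the spirit of that test (not its actual code):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object ArrayRoundTrip extends App {
  // 10000 file names in a flat Array: no cons cells, no closures.
  val files: Array[String] = Array.tabulate(10000)(i => s"file-$i")
  val out = new ObjectOutputStream(new ByteArrayOutputStream())
  out.writeObject(files) // succeeds where a 10000-element Stream would not
  println(s"Serialized ${files.length} file names")
}
```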
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark SPARK-17372

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/14987.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #14987
    
----
commit 5e7c127d7a79570f4d210f8eaf239bef73a54e24
Author: Tathagata Das <tathagata.das1565@gmail.com>
Date:   2016-09-06T21:57:12Z

    SPARK-17372

commit 9bcbb087d2935657a30eb9bc6b52ea6fbed65edf
Author: Tathagata Das <tathagata.das1565@gmail.com>
Date:   2016-09-07T00:22:59Z

    Improve unit test to run faster

----


---

