spark-issues mailing list archives

From "Lantao Jin (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-18227) Parquet file stream sink creates a hidden directory "_spark_metadata" causing the DataFrame read to fail
Date Wed, 02 Nov 2016 13:31:58 GMT

     [ https://issues.apache.org/jira/browse/SPARK-18227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lantao Jin updated SPARK-18227:
-------------------------------
    Description: 
When we set an output directory as a streaming sink with Parquet format in Structured Streaming, all output Parquet files are written to that directory as the streaming job runs. However, the sink also creates a hidden directory called "_spark_metadata" inside the output directory. If we then load the Parquet files from the output directory with "load", it throws a RuntimeException and the task fails.
{code:java}
// Write the stream as Parquet files into /path/out/
val stream = modifiedData.writeStream
  .format("parquet")
  .option("checkpointLocation", "/path/ck/")
  .start("/path/out/")
// Batch-read the output back; the glob also matches the hidden directory
val df1 = spark.read.format("parquet").load("/path/out/*")
{code}
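
(As an aside, a possible workaround, based on my assumption about how the batch reader resolves a streaming sink's output, not something I have verified: point the read at the directory itself instead of a glob, so the hidden directory is not expanded as an input path.)
{code:java}
// Possible workaround (assumption, not verified): load the sink
// directory directly instead of globbing, so the hidden
// _spark_metadata directory is not picked up as an input path.
val df2 = spark.read.format("parquet").load("/path/out")
{code}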

{panel}
16/11/02 03:49:40 WARN TaskSetManager: Lost task 1.0 in stage 110.0 (TID 3131, cupid044.stratus.phx.ebay.com): java.lang.RuntimeException: hdfs:///path/out/_spark_metadata/0 is not a Parquet file (too small)
        at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:412)
        at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:385)
        at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:107)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:367)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:341)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:116)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
{panel}

That's because ParquetFileReader tries to read the metadata log file as if it were in Parquet format.
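
(To illustrate why it fails, under my assumption about the Parquet file layout: a valid Parquet file starts and ends with the 4-byte magic "PAR1" and carries a footer, so a small JSON log like _spark_metadata/0 is rejected as "too small". A minimal sketch of such a check:)
{code:java}
import java.io.RandomAccessFile

// Illustrative sketch (my assumption of the check; not Spark or
// Parquet code): verify the "PAR1" magic at both ends of the file,
// roughly the kind of validation readFooter() performs.
def looksLikeParquet(path: String): Boolean = {
  val magic = "PAR1".getBytes("US-ASCII")
  val f = new RandomAccessFile(path, "r")
  try {
    if (f.length < 2L * magic.length) false // "too small"
    else {
      val head = new Array[Byte](magic.length)
      val tail = new Array[Byte](magic.length)
      f.readFully(head)
      f.seek(f.length - magic.length)
      f.readFully(tail)
      head.sameElements(magic) && tail.sameElements(magic)
    }
  } finally f.close()
}
{code}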

I thought the cleanest fix would be to move the metadata directory to another path, but in DataSource.scala there is no path information available other than the output directory to store it in. So skipping hidden files and directories might be a better way. However, the stack trace above shows the failure happening in initialize() in SpecificParquetRecordReaderBase, which means the metadata files in the hidden directory were already picked up by the upper-level invocation (FileScanRDD), and at that point there is no format information (nor the authority) to skip a hidden directory.

So, what is the best way to fix it?
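
One possible direction, sketched here under assumed names (this is not an actual patch): filter hidden paths during file listing, following the Hadoop convention that names starting with "_" or "." are not data.
{code:java}
import org.apache.hadoop.fs.Path

// Sketch of a hidden-path filter the listing layer could apply
// (illustrative only; names and the call site are assumptions).
def isHiddenPath(p: Path): Boolean = {
  val name = p.getName
  name.startsWith("_") || name.startsWith(".")
}

// e.g. in the listing code, before planning the scan:
// leafFiles.filterNot(status => isHiddenPath(status.getPath))
{code}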


> Parquet file stream sink creates a hidden directory "_spark_metadata" causing the DataFrame read to fail
> ----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-18227
>                 URL: https://issues.apache.org/jira/browse/SPARK-18227
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.0.1
>            Reporter: Lantao Jin
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

