spark-user mailing list archives

From Bipin Nag <bipin....@gmail.com>
Subject Re: BigDecimal problem in parquet file
Date Thu, 18 Jun 2015 10:44:27 GMT
I increased the memory limits; now it works fine.

Thanks for the help.




On 18 June 2015 at 04:01, Cheng Lian <lian.cs.zju@gmail.com> wrote:

>  Does increasing executor memory fix the memory problem?
>
> How many columns does the schema contain? Parquet can be super memory
> consuming when writing wide tables.
>
> Cheng
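To make the point about wide tables concrete, here is a rough back-of-envelope sketch. It assumes, as in parquet-mr, that each open writer buffers up to about one row group (parquet.block.size, 128 MB by default) in memory before flushing, and that every concurrently running write task on an executor holds its own writer. The per-column overhead (perColumnBytes) is a hypothetical placeholder for fixed page-buffer cost, which is what grows with schema width; ParquetWriterMemory and its methods are illustrative names, not part of any library. Treat the result as an order-of-magnitude guide, not a precise model.

```scala
// Rough, hypothetical estimate of executor heap taken by concurrently open Parquet writers.
// Assumptions: each writer buffers about one row group (parquet.block.size) before
// flushing, plus a fixed per-column buffer overhead, so wide schemas cost more.
object ParquetWriterMemory {
  // parquet-mr's default row-group size: 128 MB
  val DefaultBlockSize: Long = 128L * 1024 * 1024

  /** Estimated bytes held by all writers running at the same time on one executor. */
  def estimateBytes(
      concurrentWriters: Int, // e.g. number of write tasks running in parallel
      numColumns: Int,        // width of the schema
      perColumnBytes: Long,   // hypothetical fixed buffer overhead per column
      blockSize: Long = DefaultBlockSize
  ): Long = {
    require(concurrentWriters >= 0 && numColumns >= 0 && perColumnBytes >= 0,
      "inputs must be non-negative")
    concurrentWriters.toLong * (blockSize + numColumns.toLong * perColumnBytes)
  }

  /** Convert bytes to whole mebibytes (rounded down). */
  def toMiB(bytes: Long): Long = bytes / (1024L * 1024)
}
```

For example, four concurrent writers on a 300-column table with a hypothetical 1 MiB per-column overhead come to 4 x (128 + 300) = 1712 MiB, which is why raising executor memory, or lowering parquet.block.size (it should be picked up from sc.hadoopConfiguration), can make this kind of OOM go away.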
>
>
> On 6/15/15 5:48 AM, Bipin Nag wrote:
>
>  Hi Davies,
>
>  I have tried the recent 1.4 release and the 1.5 snapshot to either 1) open
> the Parquet file and save it again, or 2) apply the schema to the RDD and save
> the DataFrame as Parquet, but now I get this error (right at the beginning):
>
> java.lang.OutOfMemoryError: Java heap space
>     at parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
>     at parquet.bytes.CapacityByteArrayOutputStream.<init>(CapacityByteArrayOutputStream.java:57)
>     at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:68)
>     at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:48)
>     at parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
>     at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
>     at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
>     at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178)
>     at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
>     at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
>     at parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94)
>     at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
>     at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
>     at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
>     at org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:111)
>     at org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:244)
>     at org.apache.spark.sql.sources.DefaultWriterContainer.initWriters(commands.scala:386)
>     at org.apache.spark.sql.sources.BaseWriterContainer.executorSideSetup(commands.scala:298)
>     at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:142)
>     at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
>     at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>     at org.apache.spark.scheduler.Task.run(Task.scala:70)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
>
>
> Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
>     at parquet.io.api.Binary.fromByteArray(Binary.java:159)
>     at parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:94)
>     at parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:67)
>     at parquet.column.Encoding$4.initDictionary(Encoding.java:131)
>     at parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:325)
>     at parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:63)
>     at parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:58)
>     at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:267)
>     at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
>     at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
>     at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
>     at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
>     at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
>     at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
>     at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
>     at org.apache.spark.sql.sources.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:163)
>     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>     at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:152)
>
>  I am not sure whether this is related to your patch or to some other bug.
> My original error no longer shows up in the newer versions, so this is the
> problem to fix now.
>
>  Thanks
>
> On 13 June 2015 at 06:31, Davies Liu <davies@databricks.com> wrote:
>
>> Maybe it's related to a bug that was recently fixed by
>> https://github.com/apache/spark/pull/6558.
>>
>> On Fri, Jun 12, 2015 at 5:38 AM, Bipin Nag <bipin.nag@gmail.com> wrote:
>> > Hi Cheng,
>> >
>> > Yes, some rows contain unit instead of decimal values. I believe some rows
>> > from the original source don't have any value, i.e. they are null, and
>> > that shows up as unit. How do Spark SQL and Parquet handle null in place
>> > of decimal values, assuming the field is nullable? I will have to fix it
>> > properly.
>> >
>> > Thanks for helping out.
>> > Bipin
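On the question about nulls: Spark SQL represents a missing value in a nullable column as a plain JVM null, and Parquet stores it as an absent value. A () is boxed as scala.runtime.BoxedUnit, which is not null, so the writer tries to cast it to Decimal and fails with the ClassCastException from the original report. Below is a minimal sketch of a cleanup step, assuming the raw values arrive as a Seq[Any] and that () is the only placeholder used for missing values; RowCleanup, sanitize and nonNullableViolations are hypothetical helper names.

```scala
import scala.runtime.BoxedUnit

// Hypothetical cleanup step for raw JDBC rows, applied before they become Rows.
// Assumption: a missing value shows up as () (boxed as BoxedUnit) instead of null.
object RowCleanup {
  /** Replace unit placeholders with null, which Spark SQL treats as a missing value. */
  def sanitize(values: Seq[Any]): Seq[Any] = values.map {
    case _: BoxedUnit => null // () cannot be cast to Decimal; null is fine if the field is nullable
    case v            => v
  }

  /** Indices holding null although the schema marks that column as non-nullable. */
  def nonNullableViolations(values: Seq[Any], nullable: Seq[Boolean]): Seq[Int] = {
    require(values.length == nullable.length, "row width must match schema width")
    values.indices.filter(i => values(i) == null && !nullable(i))
  }
}
```

In the Spark code further down this could be applied as .map(x => org.apache.spark.sql.Row(RowCleanup.sanitize(x.toSeq): _*)); the corresponding StructField must also be nullable, which is the StructField default.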
>> >
>> > On 12 June 2015 at 14:57, Cheng Lian <lian.cs.zju@gmail.com> wrote:
>> >>
>> >> On 6/10/15 8:53 PM, Bipin Nag wrote:
>> >>
>> >> Hi Cheng,
>> >>
>> >> I am using Spark 1.3.1 binary available for Hadoop 2.6. I am loading an
>> >> existing parquet file, then repartitioning and saving it. Doing this
>> gives
>> >> the error. The code for this doesn't look like causing  problem. I
>> have a
>> >> feeling the source - the existing parquet is the culprit.
>> >>
>> >> I created that Parquet file from a JdbcRDD (pulled from Microsoft SQL
>> >> Server). First I saved the JdbcRDD as an object file on disk. Then I
>> >> loaded it again, made a DataFrame from it using a schema, and saved that
>> >> as Parquet.
>> >>
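>> >> Following is the code. For saving the JdbcRDD:
>> >>  name   - fully qualified table name
>> >>  pk     - primary key column name
>> >>  lastpk - last id to pull
>> >>     import java.sql.DriverManager
>> >>     import org.apache.spark.rdd.JdbcRDD
>> >>
>> >>     // url, username, password: JDBC connection settings defined elsewhere.
>> >>     // JdbcRDD binds the two '?' placeholders to each partition's key bounds.
>> >>     val myRDD = new JdbcRDD(sc,
>> >>         () => DriverManager.getConnection(url, username, password),
>> >>         "SELECT * FROM " + name + " WITH (NOLOCK) WHERE ? <= " + pk + " and " + pk + " <= ?",
>> >>         1, lastpk, 1,                    // lowerBound, upperBound, numPartitions
>> >>         JdbcRDD.resultSetToObjectArray)  // each row becomes an Array[Object]
>> >>     myRDD.saveAsObjectFile("rawdata/" + name)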
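The lowerBound (1), upperBound (lastpk) and numPartitions (1) arguments are turned into one inclusive key range per partition, and each range is bound into the two ? placeholders. The plain-Scala sketch below reproduces that arithmetic so it can be checked without a database. It is modeled on the partitioning logic in Spark's JdbcRDD, so verify it against the Spark version in use; KeyRanges.split is a hypothetical name.

```scala
// Plain-Scala sketch: cut an inclusive key range [lowerBound, upperBound] into
// numPartitions contiguous, non-overlapping sub-ranges, modeled on Spark's JdbcRDD.
object KeyRanges {
  def split(lowerBound: Long, upperBound: Long, numPartitions: Int): Seq[(Long, Long)] = {
    require(numPartitions > 0 && upperBound >= lowerBound, "invalid bounds")
    val lo = BigInt(lowerBound)
    val n = BigInt(numPartitions)
    // Bounds are inclusive, hence the +1 here and the -1 on each end below.
    // BigInt avoids overflow when the key range spans most of the Long range.
    val length = BigInt(upperBound) - lo + BigInt(1)
    (0 until numPartitions).map { i =>
      val start = lo + BigInt(i) * length / n
      val end = lo + BigInt(i + 1) * length / n - BigInt(1)
      (start.toLong, end.toLong)
    }
  }
}
```

With numPartitions = 1, as in the code above, the whole table is pulled by a single task over one connection; a larger value spreads the pull across several tasks, each issuing the query with its own pair of bounds.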
>> >>
>> >> For applying schema and saving the parquet:
>> >>     val myschema = schemamap(name)
>> >>     val myrdd = sc.objectFile[Array[Object]]("/home/bipin/rawdata/" + name).map(x =>
>> >>       org.apache.spark.sql.Row(x: _*))
>> >>
>> >> Have you tried to print out x here to check its contents? My guess is that
>> >> x actually contains unit values. For example, the following Spark shell
>> >> code can reproduce a similar exception:
>> >>
>> >> import org.apache.spark.sql.types._
>> >> import org.apache.spark.sql.Row
>> >>
>> >> val schema = StructType(StructField("dec", DecimalType(10, 0)) :: Nil)
>> >> // Every row holds a single () where the schema expects a decimal
>> >> val rdd = sc.parallelize(1 to 10).map(_ => Array(())).map(arr => Row(arr: _*))
>> >> val df = sqlContext.createDataFrame(rdd, schema)
>> >>
>> >> df.saveAsParquetFile("file:///tmp/foo")
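The suggestion to print x becomes more informative if each value is printed with its runtime class, which exposes scala.runtime.BoxedUnit directly where a java.math.BigDecimal (or null) is expected. A small sketch; RowInspector, describe and unitColumns are hypothetical helper names.

```scala
import scala.runtime.BoxedUnit

// Hypothetical debugging helper: print each value with its runtime class so that
// placeholders such as () (scala.runtime.BoxedUnit) stand out next to real decimals.
object RowInspector {
  def describe(values: Seq[Any]): Seq[String] = values.map {
    case null => "null"
    case v    => s"${v.getClass.getName}: $v"
  }

  /** Indices of values that are boxed units rather than data. */
  def unitColumns(values: Seq[Any]): Seq[Int] =
    values.indices.filter(i => values(i).isInstanceOf[BoxedUnit])
}
```

In the Spark shell it could be run on a few raw rows, e.g. sc.objectFile[Array[Object]]("/home/bipin/rawdata/" + name).take(5).foreach(x => println(RowInspector.describe(x.toSeq).mkString(", "))).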
>> >>
>> >>     val actualdata = sqlContext.createDataFrame(myrdd, myschema)
>> >>     actualdata.saveAsParquetFile("/home/bipin/stageddata/"+name)
>> >>
>> >> The schema StructType can be built manually, though I pull the table's
>> >> metadata and build one from it. It is a simple string translation (see the
>> >> SQL docs and/or the Spark data types).
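A sketch of such a string translation from SQL Server column types to Spark SQL data types, under stated assumptions: the mapping is hypothetical and covers only common types, and it returns the Spark type's name as a string so it runs without Spark on the classpath. Real code would construct the corresponding org.apache.spark.sql.types object (e.g. DecimalType(precision, scale)) inside a StructField whose nullable flag comes from the table metadata. SqlServerTypes and toSparkTypeName are illustrative names.

```scala
// Hypothetical SQL Server -> Spark SQL type-name translation (common types only).
// Returns the name of the Spark data type; real code would build the
// org.apache.spark.sql.types object instead of a string.
object SqlServerTypes {
  def toSparkTypeName(sqlType: String, precision: Int = 0, scale: Int = 0): String =
    sqlType.trim.toLowerCase match {
      case "bit"                  => "BooleanType"
      case "tinyint" | "smallint" => "ShortType"   // tinyint is 0..255, too wide for ByteType
      case "int"                  => "IntegerType"
      case "bigint"               => "LongType"
      case "real"                 => "FloatType"   // 4-byte float
      case "float"                => "DoubleType"  // float(53), 8 bytes, is the default
      case "decimal" | "numeric"  => s"DecimalType($precision,$scale)"
      case "money"                => "DecimalType(19,4)"
      case "date"                 => "DateType"
      case "datetime" | "datetime2" | "smalldatetime" => "TimestampType"
      case "char" | "varchar" | "nchar" | "nvarchar" | "text" | "ntext" => "StringType"
      case "binary" | "varbinary" | "image" => "BinaryType"
      case other => throw new IllegalArgumentException(s"unmapped SQL Server type: $other")
    }
}
```

Failing loudly on unmapped types is a deliberate choice here: a silently wrong type, like a silently wrong value, only surfaces later as a cast error inside the Parquet writer.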
>> >>
>> >> That is how I created the parquet file. Any help to solve the issue is
>> >> appreciated.
>> >> Thanks
>> >> Bipin
>> >>
>> >>
>> >> On 9 June 2015 at 20:44, Cheng Lian <lian.cs.zju@gmail.com> wrote:
>> >>>
>> >>> Would you please provide a snippet that reproduces this issue? What
>> >>> version of Spark were you using?
>> >>>
>> >>> Cheng
>> >>>
>> >>> On 6/9/15 8:18 PM, bipin wrote:
>> >>>>
>> >>>> Hi,
>> >>>> When I try to save my data frame as a Parquet file, I get the following
>> >>>> error:
>> >>>>
>> >>>> java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to org.apache.spark.sql.types.Decimal
>> >>>>         at org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:220)
>> >>>>         at org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192)
>> >>>>         at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171)
>> >>>>         at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:134)
>> >>>>         at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
>> >>>>         at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
>> >>>>         at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
>> >>>>         at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$writeShard$1(newParquet.scala:671)
>> >>>>         at org.apache.spark.sql.parquet.ParquetRelation2$anonfun$insert$2.apply(newParquet.scala:689)
>> >>>>         at org.apache.spark.sql.parquet.ParquetRelation2$anonfun$insert$2.apply(newParquet.scala:689)
>> >>>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> >>>>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>> >>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>> >>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> >>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> >>>>         at java.lang.Thread.run(Thread.java:745)
>> >>>>
>> >>>> How do I fix this problem?
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>
>> >>
>> >
>>
>
>
>
