spark-user mailing list archives

From Davies Liu <dav...@databricks.com>
Subject Re: BigDecimal problem in parquet file
Date Sat, 13 Jun 2015 01:01:41 GMT
This may be related to a bug that was recently fixed by
https://github.com/apache/spark/pull/6558.

On Fri, Jun 12, 2015 at 5:38 AM, Bipin Nag <bipin.nag@gmail.com> wrote:
> Hi Cheng,
>
> Yes, some rows contain unit instead of decimal values. I believe some rows
> from the original source have no value, i.e. they are null, and those
> show up as unit. How do Spark SQL and Parquet handle null in place of
> decimal values, assuming the field is nullable? I will have to fix the data
> properly.
>
> Thanks for helping out.
> Bipin
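A minimal sketch of one way to clean the data, assuming the unit values really stand for missing values (the thread does not establish where they come from): map every boxed `()` (a `scala.runtime.BoxedUnit`) to `null` before each `Row` is built, so a nullable decimal field receives an ordinary null. The names `UnitToNull` and `unitToNull` are hypothetical, and the sketch is plain Scala so it runs without Spark.

```scala
// Hypothetical helper: replace boxed Unit values with null so that a
// nullable DecimalType column receives a plain null instead of ().
object UnitToNull {
  def unitToNull(values: Array[Object]): Array[Object] =
    values.map {
      case _: scala.runtime.BoxedUnit => null // () boxes to BoxedUnit.UNIT
      case other                      => other // includes existing nulls
    }

  def main(args: Array[String]): Unit = {
    val raw: Array[Object] =
      Array(java.math.BigDecimal.valueOf(42), scala.runtime.BoxedUnit.UNIT, "abc")
    println(unitToNull(raw).mkString(", ")) // prints "42, null, abc"
  }
}
```

In Bipin's schema-application step this would slot in as `.map(x => org.apache.spark.sql.Row(UnitToNull.unitToNull(x): _*))`; the decimal fields must then be declared nullable, which `StructField` does by default (`nullable = true`).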
>
> On 12 June 2015 at 14:57, Cheng Lian <lian.cs.zju@gmail.com> wrote:
>>
>> On 6/10/15 8:53 PM, Bipin Nag wrote:
>>
>> Hi Cheng,
>>
>> I am using the Spark 1.3.1 binary built for Hadoop 2.6. I am loading an
>> existing Parquet file, then repartitioning and saving it, and that is what
>> gives the error. The code for this step doesn't look like it could cause the
>> problem; I have a feeling the source, i.e. the existing Parquet file, is the
>> culprit.
>>
>> I created that Parquet file from a JdbcRDD (pulled from Microsoft SQL Server).
>> First I saved the JdbcRDD as an object file on disk. Then I loaded it again,
>> made a DataFrame from it using a schema, and saved that as Parquet.
>>
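>> Following is the code:
>> For saving the JdbcRDD:
>>  name - fully qualified table name
>>  pk - name of the primary key column
>>  lastpk - last id to pull
>>     import java.sql.DriverManager
>>     import org.apache.spark.rdd.JdbcRDD
>>
>>     // lowerBound = 1, upperBound = lastpk, numPartitions = 1; JdbcRDD binds
>>     // the two '?' placeholders to each partition's key range
>>     val myRDD = new JdbcRDD(sc,
>>         () => DriverManager.getConnection(url, username, password),
>>         "SELECT * FROM " + name + " WITH (NOLOCK) WHERE ? <= " + pk + " and " + pk + " <= ?",
>>         1, lastpk, 1, JdbcRDD.resultSetToObjectArray)
>>     myRDD.saveAsObjectFile("rawdata/" + name)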
>>
>> For applying the schema and saving the Parquet file:
>>     val myschema = schemamap(name)
>>     val myrdd = sc.objectFile[Array[Object]]("/home/bipin/rawdata/" + name)
>>       .map(x => org.apache.spark.sql.Row(x: _*))
>>
>> Have you tried printing out x here to check its contents? My guess is that
>> x actually contains unit values. For example, the following Spark shell code
>> can reproduce a similar exception:
>>
>> import org.apache.spark.sql.types._
>> import org.apache.spark.sql.Row
>>
>> val schema = StructType(StructField("dec", DecimalType(10, 0)) :: Nil)
>> val rdd = sc.parallelize(1 to 10).map(_ => Array(())).map(arr => Row(arr: _*))
>> val df = sqlContext.createDataFrame(rdd, schema)
>>
>> df.saveAsParquetFile("file:///tmp/foo")
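Cheng's suggestion to print `x` can be made more targeted by printing each element's runtime class, so a `scala.runtime.BoxedUnit` sitting where a decimal is expected stands out. A minimal plain-Scala sketch; `DescribeValues.describeValues` is a hypothetical helper, not a Spark API.

```scala
object DescribeValues {
  // Hypothetical helper: render each element's runtime class (or "null"),
  // e.g. to eyeball rows read back from the object file.
  def describeValues(values: Array[Object]): String =
    values.map {
      case null => "null"
      case v    => v.getClass.getName
    }.mkString("[", ", ", "]")

  def main(args: Array[String]): Unit = {
    val sample: Array[Object] =
      Array(java.math.BigDecimal.ONE, scala.runtime.BoxedUnit.UNIT, null)
    println(describeValues(sample))
    // prints "[java.math.BigDecimal, scala.runtime.BoxedUnit, null]"
  }
}
```

In the Spark shell, something like `sc.objectFile[Array[Object]](path).take(5).foreach(x => println(DescribeValues.describeValues(x)))` would print the first few rows, where `path` stands for the object-file location.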
>>
>>     val actualdata = sqlContext.createDataFrame(myrdd, myschema)
>>     actualdata.saveAsParquetFile("/home/bipin/stageddata/"+name)
>>
>> The schema StructType can be built manually, though I pull the table's
>> metadata and build one from it. It is a simple string translation (see the
>> SQL docs and/or the Spark SQL data types).
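The "simple string translation" from table metadata to a schema might look roughly like the sketch below. It rests on stated assumptions: `ColumnMeta` and the SQL Server-to-Spark mapping are hypothetical (not Bipin's `schemamap`), and the function returns Spark SQL type names as strings so it runs without Spark. In a real schema builder each branch would instead return the matching `org.apache.spark.sql.types` object (`IntegerType`, `DecimalType(p, s)`, `StringType`, ...) and the nullable flag would go into `StructField(name, dataType, nullable)`.

```scala
object SchemaTranslation {
  // Hypothetical column metadata, e.g. as read from INFORMATION_SCHEMA.COLUMNS.
  final case class ColumnMeta(name: String, sqlType: String,
                              precision: Int, scale: Int, nullable: Boolean)

  // Assumed mapping from SQL Server type names to Spark SQL type names.
  def toSparkTypeName(c: ColumnMeta): String = c.sqlType.toLowerCase match {
    case "int"                    => "int"
    case "bigint"                 => "bigint"
    case "bit"                    => "boolean"
    case "float"                  => "double"
    case "decimal" | "numeric"    => s"decimal(${c.precision},${c.scale})"
    case "datetime" | "datetime2" => "timestamp"
    case "varchar" | "nvarchar" | "char" | "nchar" => "string"
    case other => throw new IllegalArgumentException(s"unmapped type: $other")
  }

  // Render "name type [NOT NULL]" so nullability stays visible.
  def describe(c: ColumnMeta): String =
    s"${c.name} ${toSparkTypeName(c)}" + (if (c.nullable) "" else " NOT NULL")

  def main(args: Array[String]): Unit = {
    val cols = Seq(
      ColumnMeta("id", "int", 10, 0, nullable = false),
      ColumnMeta("amount", "decimal", 18, 2, nullable = true))
    cols.map(describe).foreach(println)
    // prints "id int NOT NULL", then "amount decimal(18,2)"
  }
}
```

Keeping decimal columns nullable here matters for this thread: a field declared non-nullable cannot legitimately carry the missing values discussed above.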
>>
>> That is how I created the Parquet file. Any help solving the issue is
>> appreciated.
>> Thanks
>> Bipin
>>
>>
>> On 9 June 2015 at 20:44, Cheng Lian <lian.cs.zju@gmail.com> wrote:
>>>
>>> Would you please provide a snippet that reproduces this issue? Which
>>> version of Spark are you using?
>>>
>>> Cheng
>>>
>>> On 6/9/15 8:18 PM, bipin wrote:
>>>>
>>>> Hi,
>>>> When I try to save my data frame as a parquet file I get the following
>>>> error:
>>>>
>>>> java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to org.apache.spark.sql.types.Decimal
>>>>         at org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:220)
>>>>         at org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192)
>>>>         at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171)
>>>>         at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:134)
>>>>         at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
>>>>         at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
>>>>         at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
>>>>         at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$writeShard$1(newParquet.scala:671)
>>>>         at org.apache.spark.sql.parquet.ParquetRelation2$anonfun$insert$2.apply(newParquet.scala:689)
>>>>         at org.apache.spark.sql.parquet.ParquetRelation2$anonfun$insert$2.apply(newParquet.scala:689)
>>>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> How can I fix this problem?
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/BigDecimal-problem-in-parquet-file-tp23221.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>> For additional commands, e-mail: user-help@spark.apache.org
>>>>
>>>>
>>>
>>
>


