spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Junfeng Chen <darou...@gmail.com>
Subject Re: Nullpointerexception error when in repartition
Date Thu, 12 Apr 2018 10:02:36 GMT
Hi, Tathagata

I have tried structured streaming, but in line

> Dataset<Row> rowDataset = spark.read().json(jsondataset);


Always throw

> Queries with streaming sources must be executed with writeStream.start()


But what i need to do in this step is only transforming json string data to
Dataset . How to fix it?

Thanks!


Regard,
Junfeng Chen

On Thu, Apr 12, 2018 at 3:08 PM, Tathagata Das <tathagata.das1565@gmail.com>
wrote:

> It's not very surprising that doing this sort of RDD to DF conversion
> inside DStream.foreachRDD has weird corner cases like this. In fact, you
> are going to have additional problems with partial parquet files (when
> there are failures) in this approach. I strongly suggest that you use
> Structured Streaming, which is designed to do this sort of processing. It
> will take care of tracking the written parquet files correctly.
>
> TD
>
> On Wed, Apr 11, 2018 at 6:58 PM, Junfeng Chen <darouwan@gmail.com> wrote:
>
>> I write a program to read some json data from kafka and purpose to save
>> them to parquet file on hdfs.
>> Here is my code:
>>
>>> JavaInputDstream stream = ...
>>> JavaDstream rdd = stream.map...
>>> rdd.repartition(taksNum).foreachRDD(VoldFunction<JavaRDD<String>
>>> stringjavardd->{
>>>     Dataset<Row> df = spark.read().json( stringjavardd ); // convert
>>> json to df
>>>     JavaRDD<Row> rowJavaRDD = df.javaRDD().map...  //add some new fields
>>>     StructType type = df.schema()...; // constuct new type for new added
>>> fields
>>>     Dataset<Row) newdf = spark.createDataFrame(rowJavaRDD.type);
>>> //create new dataframe
>>>     newdf.repatition(taskNum).write().mode(SaveMode.Append).pati
>>> tionedBy("appname").parquet(savepath); // save to parquet
>>> })
>>
>>
>>
>> However, if I remove the repartition method of newdf in writing parquet
>> stage, the program always throw nullpointerexception error in json convert
>> line:
>>
>> Java.lang.NullPointerException
>>>  at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.
>>> scala:1783)
>>> ...
>>
>>
>> While it looks make no sense, writing parquet operation should be in
>> different stage with json transforming operation.
>> So how to solve it? Thanks!
>>
>> Regard,
>> Junfeng Chen
>>
>
>

Mime
View raw message