spark-user mailing list archives

From Amol Talap <amol.ta...@gmail.com>
Subject Databricks Spark XML parsing exception while iterating
Date Mon, 10 Jul 2017 18:14:48 GMT
Hi All,
Does anyone know a fix for the exception below? The XML parsing function
works fine in a unit test, as you can see in the code below, but it fails
when the same function is called inside an RDD transformation.

new_xml: org.apache.spark.rdd.RDD[List[(String, String)]] =
MapPartitionsRDD[119] at map at <console>:57
17/07/10 08:29:54 ERROR Executor: Exception in task 0.0 in stage 31.0 (TID 50)
java.lang.NullPointerException
	at $line103.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.xmlParse(<console>:52)
	at $line109.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:57)
	at $line109.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:57)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:875)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:875)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:85)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
17/07/10 08:29:54 ERROR Executor: Exception in task 1.0 in stage 31.0 (TID 51)
java.lang.NullPointerException

cat /home/spark/XML_Project/XML_Prog.scala
println(">>>>>>>START UnitTest for xmlParse")
import com.databricks.spark.xml.XmlReader
def xmlParse (xml:String) = {	
	var xRDD = sc.parallelize(Seq(xml))
	var df = new XmlReader().xmlRdd(spark.sqlContext,xRDD)	
	var out_rdd = df.withColumn("comment",
explode(df("Comments.Comment"))).select($"comment.Description",$"comment.Title").rdd
	out_rdd.collect.map(x=>(x(0).toString,x(1).toString)).toList
}

val xml1="<books><Comments><Comment><Title>Title1.1</Title><Description>Descript1.1</Description></Comment><Comment><Title>Title1.2</Title><Description>Descript1.2</Description></Comment><Comment><Title>Title1.3</Title><Description>Descript1.3</Description></Comment></Comments></books>"
val xml_parse = xmlParse(xml1)

println("<<<<<<<<END UnitTest for xmlParse")

val rdd = sc.textFile("file:///home/spark/XML_Project/data.txt")
val xml_pRDDs = rdd.map { line =>
  val cols = line.split(',')
  (cols(0).toInt, cols(3))   // keep (id, xml); the name and city columns are unused here
}

val new_xml = xml_pRDDs.map { case (key, value) => xmlParse(value) }
new_xml.foreach(println)
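
My guess at the cause: the stack trace points at the line in xmlParse that
touches sc, and sc / spark.sqlContext live only on the driver, so they come
back null when xmlParse runs inside rdd.map on an executor. Below is a
minimal, untested sketch of a per-record parser that keeps Spark out of the
closure entirely. It assumes scala-xml is on the classpath (spark-shell
ships it) and keeps the same (Description, Title) output order;
xmlParseLocal is a name I made up for illustration.

import scala.xml.XML

// Sketch: parse each record with plain scala-xml, so nothing driver-only
// is referenced inside the map closure.
def xmlParseLocal(xml: String): List[(String, String)] = {
  val root = XML.loadString(xml)
  (root \ "Comments" \ "Comment").map { c =>
    ((c \ "Description").text, (c \ "Title").text)
  }.toList
}

val new_xml2 = xml_pRDDs.map { case (_, value) => xmlParseLocal(value) }
new_xml2.foreach(println)

This should give the same List[(String, String)] per input row as xmlParse,
without building a DataFrame per record.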


cat /home/spark/XML_Project/data.txt
1,Amol,Kolhapur,<books><Comments><Comment><Title>Title1.1</Title><Description>Descript1.1</Description></Comment><Comment><Title>Title1.2</Title><Description>Descript1.2</Description></Comment><Comment><Title>Title1.3</Title><Description>Descript1.3</Description></Comment></Comments></books>
2,Ameet,Bangalore,<books><Comments><Comment><Title>Title2.1</Title><Description>Descript2.1</Description></Comment><Comment><Title>Title2.2</Title><Description>Descript2.2</Description></Comment></Comments></books>
3,Rajesh,Jaipur,<books><Comments><Comment><Title>Title3.1</Title><Description>Descript3.1</Description></Comment><Comment><Title>Title3.2</Title><Description>Descript3.2</Description></Comment><Comment><Title>Title3.3</Title><Description>Descript3.3</Description></Comment><Comment><Title>Title3.4</Title><Description>Descript3.4</Description></Comment></Comments></books>
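
If the DataFrame route is required, I suppose one xmlRdd call over all the
XML strings on the driver (rather than per record inside map) would also
avoid the problem. A sketch (untested) using the same XmlReader API as
above; note it produces one flat RDD of (Description, Title) pairs rather
than one List per input row:

import com.databricks.spark.xml.XmlReader
import org.apache.spark.sql.functions.explode

// Sketch: a single xmlRdd call over the whole column of XML strings.
val xmlStrings = sc.textFile("file:///home/spark/XML_Project/data.txt")
  .map(_.split(',')(3))
val xdf = new XmlReader().xmlRdd(spark.sqlContext, xmlStrings)
val pairs = xdf.withColumn("comment", explode(xdf("Comments.Comment")))
  .select($"comment.Description", $"comment.Title")
  .rdd
  .map(r => (r(0).toString, r(1).toString))
pairs.collect.foreach(println)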

Regards,
Amol

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org

