spark-issues mailing list archives

From "Chandramouli Muthukumaran (JIRA)" <j...@apache.org>
Subject [jira] [Created] (SPARK-21844) Checkpointing issue in Spark Streaming involving Dataframes
Date Sat, 26 Aug 2017 19:53:02 GMT
Chandramouli Muthukumaran created SPARK-21844:
-------------------------------------------------

             Summary: Checkpointing issue in Spark Streaming involving Dataframes
                 Key: SPARK-21844
                 URL: https://issues.apache.org/jira/browse/SPARK-21844
             Project: Spark
          Issue Type: Bug
          Components: DStreams
    Affects Versions: 2.1.0
         Environment: Spark 2.1.0, Kafka 0.10
            Reporter: Chandramouli Muthukumaran


I recently started working with Spark Streaming and am implementing checkpointing, with the
checkpoint stored in HDFS. When the streaming job fails, it recovers from the last checkpoint,
but it then hits a NullPointerException and the job is killed. I can see the checkpoints in
HDFS, so I am not sure why I get the exception even though a checkpoint exists. Any inputs
would be helpful; I am not sure if this is a bug.
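
For reference, the restart path follows the checkpoint-recovery pattern from the Spark
Streaming programming guide (the full job is in the listing below):

{code:java}
// Rebuild the StreamingContext from the HDFS checkpoint if one exists,
// otherwise create a fresh context via creatingFunc().
val context = StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc _)
context.start()
context.awaitTermination()
{code}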


{code:java}
package ca.twitter2

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._

object NGINXLogProcessingWindowedwithcheckpointv2 {

  case class AccessLog(Datetime: String, requesterip: String, httpcode: String, method: String,
                       serverip2: String, responsetime: String, operation: String, application: String)

  val checkpointDir = "hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccesslogcheckpoint6"
  val WINDOW_LENGTH = Seconds(43200)
  val SLIDE_INTERVAL = Seconds(120)

  def creatingFunc(): StreamingContext = {
    println("Creating new context")
    val sparkConf = new SparkConf()
      .setAppName("NGINXLogAnalysiswindowedwithcheckpoint")
      .setMaster("local")
    // Set the WAL flag before the StreamingContext is created, otherwise it has no effect.
    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(sparkConf, Seconds(120))
    ssc.checkpoint(checkpointDir)

    // This session is captured by the foreachRDD closure below.
    val spark = SparkSession.builder().getOrCreate()

    val topics = Set("REST")
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "10.24.18.36:6667",
      "group.id" -> "2",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Create the direct stream with the Kafka parameters and topics.
    val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc, LocationStrategies.PreferConsistent, consumerStrategy)

    val lines = kafkaStream.map(_.value()).repartition(4)
    val lineswindowed = lines.window(WINDOW_LENGTH, SLIDE_INTERVAL)
    val lines2 = lineswindowed.map(_.split(","))
    val lines4slide = lines2.map(p =>
      AccessLog(p(0), p(2), p(4), p(3), p(8), p(7), p(10), p(12)))

    lines4slide.foreachRDD { rdd2 =>
      if (!rdd2.isEmpty) {
        val count = rdd2.count
        println("count received " + count)
        import spark.implicits._
        rdd2.checkpoint()

        // The NullPointerException below is thrown here after recovery from a checkpoint.
        val LogDF = rdd2.toDF()
        LogDF.createOrReplaceTempView("Log")
        val LogDFslide = LogDF.select(
          $"Datetime",
          $"requesterip".cast("string"),
          $"httpcode",
          expr("(split(method, ' '))[1]").cast("string").as("request"),
          expr("(split(method, ' '))[2]").cast("string").as("webserviceurl"),
          expr("(split(method, ' '))[3]").cast("string").as("protocol"),
          $"serverip2",
          $"responsetime",
          expr("(split(operation, '/'))[4]").cast("string").as("operationtype"),
          $"application".cast("string"))
        LogDFslide.createOrReplaceTempView("LogDFslide")

        val Log2DFslide = spark.sql(
          """SELECT Datetime, requesterip, httpcode,
            |  substring(request, 2, length(request)) as request2,
            |  webserviceurl, protocol, serverip2,
            |  split(webserviceurl, '/')[3] as webservice3,
            |  responsetime,
            |  substring(operationtype, 1, length(operationtype) - 4) as httpsoapaction,
            |  application
            |FROM LogDFslide""".stripMargin)
        Log2DFslide.createOrReplaceTempView("Log2DFslide")

        val Log2DFslideoutput = spark.sql(
          """SELECT Datetime, requesterip, httpcode, request2, webserviceurl, protocol, serverip2,
            |  split(webservice3, '[?]')[0] as webservice,
            |  responsetime, httpsoapaction, application
            |FROM Log2DFslide""".stripMargin)
        // Log2DFslideoutput.write.mode(SaveMode.Overwrite).orc("hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccessLogWindowedcheckpointed")

        val log2DFFilter = spark.sql(
          """SELECT Datetime, requesterip, httpcode, request2, webserviceurl, protocol, serverip2,
            |  split(webservice3, '[?]')[0] as webservice2,
            |  responsetime, httpsoapaction, application
            |FROM Log2DFslide
            |WHERE responsetime <> '-' AND responsetime <> ''""".stripMargin)
        log2DFFilter.createOrReplaceTempView("log2DFFilter")
        log2DFFilter.show

        val Log3DFslide = spark.sql(
          """SELECT initcap(webservice2) as webservice,
            |  round(avg(responsetime), 4) as Averageresponsetime
            |FROM log2DFFilter
            |WHERE webservice2 <> ''
            |GROUP BY initcap(webservice2)""".stripMargin)
        Log3DFslide.createOrReplaceTempView("Log3DFslide")
        Log3DFslide.show
        // Log3DFslide.write.mode(SaveMode.Overwrite).orc("hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccessLogstatistics")
      }
    }
    ssc
  }

  def main(args: Array[String]) {
    // Recover from the checkpoint if one exists, otherwise build a fresh context.
    val context = StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc _)
    context.start()
    context.awaitTermination()
  }
}
{code}

I get the following error:


{code:java}
17/08/26 13:41:00 ERROR JobScheduler: Error running job streaming job 1503776400000 ms.0
java.lang.NullPointerException
	at org.apache.spark.sql.SQLImplicits.rddToDatasetHolder(SQLImplicits.scala:165)
	at ca.twitter2.NGINXLogProcessingWindowedwithcheckpointv2$$anonfun$creatingFunc$1.apply(NGINXLogProcessingWindowedwithcheckpointv2.scala:69)
	at ca.twitter2.NGINXLogProcessingWindowedwithcheckpointv2$$anonfun$creatingFunc$1.apply(NGINXLogProcessingWindowedwithcheckpointv2.scala:60)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:254)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:253)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Exception in thread "main" java.lang.NullPointerException
	at org.apache.spark.sql.SQLImplicits.rddToDatasetHolder(SQLImplicits.scala:165)
	at ca.twitter2.NGINXLogProcessingWindowedwithcheckpointv2$$anonfun$creatingFunc$1.apply(NGINXLogProcessingWindowedwithcheckpointv2.scala:69)
	at ca.twitter2.NGINXLogProcessingWindowedwithcheckpointv2$$anonfun$creatingFunc$1.apply(NGINXLogProcessingWindowedwithcheckpointv2.scala:60)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:254)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:253)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
{code}
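
Looking at the top frame, the NPE comes from SQLImplicits.rddToDatasetHolder, which corresponds
to the rdd2.toDF() call inside foreachRDD. My suspicion (an assumption on my part, not a
confirmed diagnosis) is that the SparkSession created in creatingFunc() and captured by the
foreachRDD closure is not restored when the StreamingContext is rebuilt from the checkpoint,
so spark.implicits._ ends up referring to a dead session. The SQL example in the Spark Streaming
programming guide instead obtains the session lazily from the RDD's SparkContext on every batch;
a minimal sketch of that pattern applied here:

{code:java}
lines4slide.foreachRDD { rdd2 =>
  if (!rdd2.isEmpty) {
    // Re-obtain the session from the RDD's own SparkContext on every batch rather than
    // capturing a session built in creatingFunc(); getOrCreate() returns a live session
    // even after the context has been recovered from the checkpoint.
    val spark = SparkSession.builder.config(rdd2.sparkContext.getConf).getOrCreate()
    import spark.implicits._
    val LogDF = rdd2.toDF()
    // ... per-batch processing as in the listing above ...
  }
}
{code}

I have not yet verified that this avoids the failure, so inputs on whether capturing the
session in the closure is the actual cause would be appreciated.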


