spark-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Thakrar, Jayesh" <jthak...@conversantmedia.com>
Subject SparkContext - parameter for RDD, but not serializable, why?
Date Wed, 28 Feb 2018 18:03:06 GMT
Hi All,

I was just toying with creating a very rudimentary RDD datasource to understand the inner
workings of RDDs.

It seems that one of the constructors for RDD has a parameter of type SparkContext, but it
(apparently) exists on the driver only and is not serializable.

Consequently, any attempt to use SparkContext parameter inside your custom RDD generates a
runtime error of it not being serializable.

Just wondering what is the rationale behind this?
I.e. if it is not serializable/usable, why make it a parameter?
And if it needs to be a parameter, why not make it serializable (is it even possible?)

Below is my working code where I test a custom RDD.

scala> val mydata = spark.read.format("MyDataSourceProvider").load()
mydata: org.apache.spark.sql.DataFrame = [mydataStr: string]

scala> mydata.show(10, false)
+------------------------+
|mydataStr               |
+------------------------+
|Partition: 0, row 1 of 3|
|Partition: 0, row 2 of 3|
|Partition: 0, row 3 of 3|
+------------------------+

scala>


///// custom RDD


import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider, TableScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD


class MyDataSourceProvider extends DataSourceRegister
  with RelationProvider with Logging {

  override def shortName():String = { "mydata" }

  private val myDataSchema: StructType = new StructType(Array[StructField](new StructField("mydataStr",
StringType, false)))

  def sourceSchema(sqlContext: SQLContext,
                            schema: Option[StructType],
                            providerName: String,
                            parameters: Map[String, String]): (String, StructType) = {
    (shortName(), schema.get)
  }

  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation = {
    new MyDataRelation(sqlContext, myDataSchema, parameters)
  }

}


class MyDataRelation(override val sqlContext: SQLContext,
                     override val schema: StructType,
                     params: Map[String, String]) extends BaseRelation with TableScan with
Logging {

  override def buildScan(): org.apache.spark.rdd.RDD[Row] = {
    val rdd = new MyDataSourceRDD(sqlContext.sparkContext, sqlContext.getAllConfs)
    rdd
  }

  override def needConversion = true
}


class MyDataSourceRDD(sc: SparkContext, conf: Map[String, String]) extends RDD[Row](sc, Nil)
{

  override def getPartitions: Array[org.apache.spark.Partition] = {
    // sc.getConf.getAll.foreach(println) - this fails with SparkContext not serialiable error.
So what use is this parameter ?!
    val numPartitions = conf.getOrElse("spark.mydata.numpartitions", "1").toInt
    val rowsPerPartition = conf.getOrElse("spark.mydata.rowsperpartition", "3").toInt
    val partitions = 0 until numPartitions map(partition => new MyDataSourcePartition(partition,rowsPerPartition))
    partitions.toArray
  }

  override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
    val myDataSourcePartition = split.asInstanceOf[MyDataSourcePartition]
    val partitionId = myDataSourcePartition.index
    val rows = myDataSourcePartition.rowCount
    val partitionData = 1 to rows map(r => Row(s"Partition: ${partitionId}, row ${r} of
${rows}"))
    partitionData.iterator
  }

}


class MyDataSourcePartition(partitionId: Int, rows: Int) extends Partition with Serializable
{

  override def index:Int = partitionId

  def rowCount: Int = rows

Mime
View raw message