spark-user mailing list archives

From "hsy541@gmail.com" <hsy...@gmail.com>
Subject Re: How to do an interactive Spark SQL
Date Wed, 23 Jul 2014 00:55:35 GMT
For example, here is what I tested; it works in local mode. It reads both the
data and the SQL query from kafka, runs the sql on each RDD, and writes the
result back to kafka.
I defined a var called sqlS. In the streaming part, as you can see, I change
the sql statement whenever a sql message is consumed from kafka, so the next
time sql(sqlS) is called it executes the updated query.

But this code doesn't work on a cluster, because, from what I understand,
sqlS is not updated on all the workers.

So my question is: how do I change the value of sqlS at runtime so that all
the workers pick up the latest value?
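One detail worth noting: broadcast variables are read-only once created, but the body of foreachRDD executes on the driver, so a driver-side mutable reference that a separate control-message consumer updates can be re-read at the start of every batch. A minimal sketch of that pattern in plain Scala, with no Spark dependency; the names `currentQuery`, `onControlMessage`, and `runBatch` are hypothetical, invented for illustration:

```scala
import java.util.concurrent.atomic.AtomicReference

// Driver-side holder for the current query. AtomicReference gives a safe
// publish/read between the control-topic consumer thread and the batch loop.
val currentQuery = new AtomicReference[String]("select count(*) from records")

// Called from whatever thread consumes the control topic (hypothetical hook).
def onControlMessage(msg: String): Unit = currentQuery.set(msg)

// At the start of each batch, read the latest value once and use it for the
// whole batch, so a mid-batch update cannot split one batch across two queries.
def runBatch(): String = {
  val sqlS = currentQuery.get()
  sqlS // in the real job this is where sql(sqlS) would run
}

onControlMessage("select * from records")
println(runBatch()) // -> select * from records
```

This only helps because both the update and the read happen on the driver; it does not propagate state to executors.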


    var sqlS = "select count(*) from records"
    val Array(zkQuorum, group, topic, sqltopic, outputTopic, numParts) = args
    val sparkConf = new SparkConf().setAppName("KafkaSpark")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))
    val sqlContext = new SQLContext(sc)

    // Importing the SQL context gives access to all the SQL functions
    // and implicit conversions.
    import sqlContext._
    import sqlContext.createSchemaRDD

    val tName = "records" // table name referenced by the default query
    val topicpMap =
      collection.immutable.HashMap(topic -> numParts.toInt, sqltopic -> 2)

    // Consume both topics; "sql" messages update sqlS and are filtered out,
    // everything else is parsed into records.
    val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
      .window(Seconds(4))
      .filter(t => if (t._1 == "sql") { sqlS = t._2; false } else true)
      .map(t => getRecord(t._2.split("#")))

    val zkClient = new ZkClient(zkQuorum, 30000, 30000, ZKStringSerializer)
    val brokerString =
      ZkUtils.getAllBrokersInCluster(zkClient).map(_.getConnectionString).mkString(",")

    KafkaSpark.props.put("metadata.broker.list", brokerString)
    val config = new ProducerConfig(KafkaSpark.props)
    val producer = new Producer[String, String](config)

    recordsStream.foreachRDD { recRDD =>
      val schemaRDD = sqlContext.createSchemaRDD(recRDD)
      schemaRDD.registerAsTable(tName)
      val result = sql(sqlS).collect
        .foldLeft("Result:\n")((s, r) => s + r.mkString(",") + "\n")
      producer.send(
        new KeyedMessage[String, String](outputTopic, s"SQL: $sqlS \n $result"))
    }
    ssc.start()
    ssc.awaitTermination()
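A likely explanation for the cluster behavior: the filter closure runs on the executors, so the assignment to sqlS there mutates a serialized copy of the variable, while sql(sqlS) inside foreachRDD runs on the driver and never sees that change. One possible restructuring, a sketch under that assumption and not tested on a cluster, is to give the sql topic its own branch of the stream and pull its messages back to the driver with collect, so both the update and the read happen driver-side:

    // Sketch: keep sqlS driver-side. Instead of mutating it inside filter
    // (which runs on executors), bring control messages back to the driver.
    var sqlS = "select count(*) from records"

    val allStream =
      KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).window(Seconds(4))

    // Control messages: collect() materializes them on the driver, where the
    // assignment updates the same variable the output closure reads.
    allStream.filter(_._1 == "sql").foreachRDD { rdd =>
      rdd.collect().lastOption.foreach { case (_, q) => sqlS = q }
    }

    // Data messages: same driver-side foreachRDD as before; it now sees the
    // latest sqlS because both closures execute on the driver.
    allStream.filter(_._1 != "sql")
      .map(t => getRecord(t._2.split("#")))
      .foreachRDD { recRDD =>
        val schemaRDD = sqlContext.createSchemaRDD(recRDD)
        schemaRDD.registerAsTable("records")
        val result = sql(sqlS).collect
          .foldLeft("Result:\n")((s, r) => s + r.mkString(",") + "\n")
        producer.send(
          new KeyedMessage[String, String](outputTopic, s"SQL: $sqlS \n $result"))
      }

Whether the two foreachRDD calls for the same batch run in a guaranteed order is something to verify against the Spark Streaming docs for your version.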


On Tue, Jul 22, 2014 at 5:10 PM, Zongheng Yang <zongheng.y@gmail.com> wrote:

> Can you paste a small code example to illustrate your questions?
>
> On Tue, Jul 22, 2014 at 5:05 PM, hsy541@gmail.com <hsy541@gmail.com>
> wrote:
> > Sorry, typo. What I mean is sharing. If the sql is changing at runtime,
> > how do I broadcast the sql to all the workers that are doing the sql
> > analysis?
> >
> > Best,
> > Siyuan
> >
> >
> > On Tue, Jul 22, 2014 at 4:15 PM, Zongheng Yang <zongheng.y@gmail.com>
> wrote:
> >>
> >> Do you mean that the texts of the SQL queries are hardcoded in the
> >> code? What do you mean by "cannot share the sql to all workers"?
> >>
> >> On Tue, Jul 22, 2014 at 4:03 PM, hsy541@gmail.com <hsy541@gmail.com>
> >> wrote:
> >> > Hi guys,
> >> >
> >> > I'm able to run some Spark SQL examples, but the sql is static in
> >> > the code. I would like to know whether there is a way to read the
> >> > sql from somewhere else (a shell, for example).
> >> >
> >> > I could read the sql statement from kafka/zookeeper, but I cannot
> >> > share the sql with all the workers. broadcast seems not to work for
> >> > updating values.
> >> >
> >> > Moreover, if I use some non-serializable class (DataInputStream,
> >> > etc.) to read the sql from another source, I always get "Task not
> >> > serializable: java.io.NotSerializableException".
> >> >
> >> >
> >> > Best,
> >> > Siyuan
> >
> >
>
