cassandra-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Cassa L <lcas...@gmail.com>
Subject Custom receiver for WebSocket in Spark not working
Date Wed, 02 Nov 2016 17:23:23 GMT
Hi,
I am using spark 1.6. I wrote a custom receiver to read from WebSocket. But
when I start my spark job, it  connects to the WebSocket but  doesn't get
any message. Same code, if I write as separate scala class, it works and
prints messages from WebSocket. Is anything missing in my Spark Code? There
are no errors in spark console.

Here is my receiver -

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import org.jfarcand.wcs.{MessageListener, TextListener, WebSocket}

/**
  * Custom receiver for WebSocket
  */
class WebSocketReceiver extends
Receiver[String](StorageLevel.MEMORY_ONLY) with Runnable with Logging
{

  private var webSocket: WebSocket = _

  @transient
  private var thread: Thread = _

  override def onStart(): Unit = {
    thread = new Thread(this)
    thread.start()
  }

  override def onStop(): Unit = {
    setWebSocket(null)
    thread.interrupt()
  }

  override def run(): Unit = {
    println("Received ----")
    receive()
  }

  private def receive(): Unit = {


    val connection = WebSocket().open("ws://localhost:3001")
    println("WebSocket  Connected ..." )
    println("Connected ------- " + connection)
    setWebSocket(connection)

   connection.listener(new TextListener {

         override def onMessage(message: String) {
                 System.out.println("Message in Spark client is --> " + message)
           }
    })


}

private def setWebSocket(newWebSocket: WebSocket) = synchronized {
if (webSocket != null) {
webSocket.shutDown
}
webSocket = newWebSocket
}

}


=====

Here is code for Spark job


object WebSocketTestApp {

  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("Test Web Socket")
      .setMaster("local[20]")
      .set("test", "")
    val ssc = new StreamingContext(conf, Seconds(5))


    val stream: ReceiverInputDStream[String] = ssc.receiverStream(new
WebSocketReceiver())
    stream.print()

    ssc.start()
    ssc.awaitTermination()
  }


==============
}


Thanks,

LCassa

Mime
View raw message