From: Cassa L
Date: Wed, 2 Nov 2016 10:23:23 -0700
Subject: Custom receiver for WebSocket in Spark not working
To: user, user@cassandra.apache.org

Hi,

I am using Spark 1.6. I wrote a custom receiver to read from a WebSocket. When I start my Spark job, it connects to the WebSocket but doesn't receive any messages. If I run the same code as a standalone Scala class, it works and prints the messages from the WebSocket. Is anything missing in my Spark code? There are no errors in the Spark console.

Here is my receiver:

    import org.apache.spark.Logging
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver
    import org.jfarcand.wcs.{MessageListener, TextListener, WebSocket}

    /**
     * Custom receiver for WebSocket
     */
    class WebSocketReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) with Runnable with Logging {

      private var webSocket: WebSocket = _

      @transient
      private var thread: Thread = _

      override def onStart(): Unit = {
        thread = new Thread(this)
        thread.start()
      }

      override def onStop(): Unit = {
        setWebSocket(null)
        thread.interrupt()
      }

      override def run(): Unit = {
        println("Received ----")
        receive()
      }

      private def receive(): Unit = {
        val connection = WebSocket().open("ws://localhost:3001")
        println("WebSocket Connected ...")
        println("Connected ------- " + connection)
        setWebSocket(connection)

        connection.listener(new TextListener {
          override def onMessage(message: String) {
            System.out.println("Message in Spark client is --> " + message)
          }
        })
      }

      private def setWebSocket(newWebSocket: WebSocket) = synchronized {
        if (webSocket != null) {
          webSocket.shutDown
        }
        webSocket = newWebSocket
      }
    }

=====

Here is the code for the Spark job:

    object WebSocketTestApp {

      def main(args: Array[String]) {
        val conf = new SparkConf()
          .setAppName("Test Web Socket")
          .setMaster("local[20]")
          .set("test", "")
        val ssc = new StreamingContext(conf, Seconds(5))

        val stream: ReceiverInputDStream[String] = ssc.receiverStream(new WebSocketReceiver())
        stream.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

==============

Thanks,
LCassa
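A point that may be relevant to the question: in Spark Streaming, a custom Receiver hands data to Spark by calling store(). The receiver posted above only prints inside onMessage, so stream.print() would have nothing to show even if messages did arrive. The following is a minimal sketch of that pattern, assuming the same wCS calls used in the post (WebSocket().open, listener, shutDown). The class name StoringWebSocketReceiver and the url constructor parameter are hypothetical, introduced only for illustration. It is not a confirmed fix, and it does not by itself explain why the println in onMessage is never reached.

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import org.jfarcand.wcs.{TextListener, WebSocket}

// Sketch only: same overall structure as the receiver in the post,
// with a store() call added in the message callback.
// StoringWebSocketReceiver and `url` are hypothetical names.
class StoringWebSocketReceiver(url: String)
  extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  // Not serializable, so keep it out of the serialized receiver.
  @transient private var webSocket: WebSocket = _

  override def onStart(): Unit = {
    // onStart() should return quickly, so connect on a separate thread.
    new Thread("websocket-receiver") {
      override def run(): Unit = connect()
    }.start()
  }

  override def onStop(): Unit = synchronized {
    if (webSocket != null) {
      webSocket.shutDown
      webSocket = null
    }
  }

  private def connect(): Unit = {
    val connection = WebSocket().open(url)
    connection.listener(new TextListener {
      override def onMessage(message: String): Unit = {
        // Hand each message to Spark; without this the DStream stays empty.
        store(message)
      }
    })
    synchronized { webSocket = connection }
  }
}
```

With this sketch, the job would create the stream with ssc.receiverStream(new StoringWebSocketReceiver("ws://localhost:3001")) and leave the rest of WebSocketTestApp unchanged.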