I tried to replicate it, and the following change to ActorWordCount reproduces the problem:

 lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).foreach {
      x => x foreach { x1 => con ! x1 }
    }

While this works:
 lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).foreach {
      x => con ! x.collect()
    }

There is another important thing: for now it is not possible to pass an ActorRef while creating a Props instance for actorStream. The reason is that the ActorRef gets serialized and sent to runJob, which has no actor system in scope to deserialize it. So an alternative approach may be used, such as passing the path (as a String) or an ActorPath and then instantiating the ActorRef from it.
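To illustrate the path-based workaround, here is a rough, untested sketch. It assumes Akka 2.0.x, where `context.actorFor(path)` does the lookup. `Sink`, `PathForwarder` and `PathDemo` are hypothetical names made up for this example, and the Spark side (actorStream, the Receiver trait) is left out on purpose.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}

// Hypothetical sink standing in for `con` from the snippets above.
class Sink extends Actor {
  def receive = { case msg => println("got " + msg) }
}

// Hypothetical actor that is handed a path String instead of an ActorRef.
class PathForwarder(targetPath: String) extends Actor {
  // Looked up inside the actor, where an actor system is available.
  lazy val target: ActorRef = context.actorFor(targetPath)
  def receive = { case msg => target ! msg }
}

object PathDemo extends App {
  val system = ActorSystem("demo")
  val con = system.actorOf(Props[Sink], "sink")

  // A plain String can be serialized without any ActorSystem in scope.
  val path: String = con.path.toString

  val forwarder = system.actorOf(Props(new PathForwarder(path)), "forwarder")
  val msg = ("hello", 1)
  forwarder ! msg

  Thread.sleep(500) // crude wait so the message is delivered before shutdown
  system.shutdown()
}
```

The point is only that the closure captures a String rather than an ActorRef, so nothing Akka-specific has to be deserialized on the other side.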

This should be fixable in the future, I suppose, but for that I have to learn how to hack into our runJob mechanism. I will try to check it ASAP. For now, maybe we can create these issues in JIRA?

Please let me know if this fixes your problems. 



On Thu, Oct 3, 2013 at 12:27 PM, Prashant Sharma <scrapcodes@gmail.com> wrote:
Hi Paul, 

Forgive me, for I have not seen spray-io/akka-io at all and am still trying to understand what is done.
I feel the problem is in this statement:
    stream foreach { rdd => rdd foreach { item => connection ! Tcp.Write(item) } }

AFAIR this used to work earlier. So, in order to be sure about the above, I will try to write a minimal sample that sends data processed by a DStream in foreachRdd to a remote actor.

By any chance, have you tried it?



On Wed, Oct 2, 2013 at 11:07 PM, Paul Snively <psnively@icloud.com> wrote:
Hi everyone,

I think I have a knack for trying things a bit outside the box. :-)

I'm trying to develop a TLS-aware streaming TCP server that creates a Spark Stream for each incoming connection. Since Akka 2.2's new IO module has been backported to Akka 2.0.5 thanks to <http://spray.io/documentation/1.0-M8.1/spray-io/>, it seemed the easiest thing to do would be to use Spark's ActorStream support to create a DStream from the ConnectionHandler Actor with the Receiver trait mixed in, and the Tcp.Received(data) case simply calling pushBlock(data). My server class just takes a StreamingContext and a Function2[ActorRef, DStream[ByteString], Unit] and, when a connection is accepted, constructs the DStream, then passes the connection and DStream to the callback.

I was able to write this, and it compiles OK, but my test throws an IllegalStateException because deserializing a task is attempting to deserialize an ActorRef, presumably the TCP connection, without an ActorSystem in scope. The exception message is helpful:

java.lang.IllegalStateException: Trying to deserialize a serialized ActorRef without an ActorSystem in scope. Use 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'

However, my attempts to, e.g., wrap my callback invocation in this have been unsuccessful, since I don't have access to an ExtendedActorSystem to pass in.

A small project with code and my failing test is at <https://www.dropbox.com/s/k9n5z47vw96r8o3/spray-ssl-server.zip>. If someone has a bit of time to help me look into this, I'd be grateful. Note that this is a very generic task that I think more people than just me would be interested in. :-)

Thanks!
Paul


