From: Alexis Seigneurin
To: user@spark.apache.org
Date: Mon, 12 Sep 2016 16:47:40 -0400
Subject: Strings not converted when calling Scala code from a PySpark app
Hi,

*TL;DR - I have what looks like a DStream of Strings in a PySpark application. I want to send it as a DStream[String] to a Scala library. The strings are not converted by Py4j, though.*

I'm working on a PySpark application that pulls data from Kafka using Spark Streaming. My messages are strings, and I would like to call a method in Scala code, passing it a DStream[String] instance. However, I'm unable to receive proper JVM strings in the Scala code. It looks to me like the Python strings are not converted into Java strings but are serialized instead.

My question is: how do I get Java strings out of the DStream object?

Here is the simplest Python code I came up with:

    from pyspark.streaming import StreamingContext
    ssc = StreamingContext(sparkContext=sc, batchDuration=int(1))

    from pyspark.streaming.kafka import KafkaUtils
    stream = KafkaUtils.createDirectStream(ssc, ["IN"],
                                           {"metadata.broker.list": "localhost:9092"})
    values = stream.map(lambda tuple: tuple[1])

    ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream)

    ssc.start()

I'm running this code in PySpark, passing it the path to my JAR:

    pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar

On the Scala side, I have:

    package com.seigneurin

    import org.apache.spark.streaming.api.java.JavaDStream

    object MyPythonHelper {
      def doSomething(jdstream: JavaDStream[String]) = {
        val dstream = jdstream.dstream
        dstream.foreachRDD(rdd => {
          rdd.foreach(println)
        })
      }
    }

Now, let's say I send some data into Kafka:

    echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic IN

The println statement in the Scala code prints something that looks like:

    [B@758aa4d9

I expected to get "foo bar" instead.

Now, if I replace the simple println statement in the Scala code with the following:

    rdd.foreach(v => println(v.getClass.getCanonicalName))

I get:

    java.lang.ClassCastException: [B cannot be cast to java.lang.String

This suggests that the strings are actually passed as arrays of bytes. If I simply try to convert such an array of bytes into a string (I know I'm not even specifying the encoding):

    def doSomething(jdstream: JavaDStream[Array[Byte]]) = {
      val dstream = jdstream.dstream
      dstream.foreachRDD(rdd => {
        rdd.foreach(bytes => println(new String(bytes)))
      })
    }

I get something that looks like this (special characters might be stripped off):

    �]qXfoo barqa.

This suggests the Python string was serialized (pickled?). How can I retrieve a proper Java string instead?

Thanks,
Alexis
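P.S. For what it's worth, the garbled output looks exactly like a pickled Python list of strings. Here is a minimal, standalone check (my assumption: PySpark's batched serializer pickles a list of records with pickle protocol 2; I haven't verified that against Spark's source, so take this as a sketch of the hypothesis, not a statement about Spark internals):

```python
import pickle

# Pickle a one-element list of strings with protocol 2, which is what
# I assume PySpark's batched serializer does under the hood.
data = pickle.dumps(["foo bar"], protocol=2)
print(data)
# b'\x80\x02]q\x00X\x07\x00\x00\x00foo barq\x01a.'

# Decoding these bytes as text (replacing what can't be decoded) yields
# the same "]qXfoo barqa." fragments as the println in the Scala code:
print(data.decode("utf-8", errors="replace"))

# Unpickling the bytes recovers the original Python strings:
print(pickle.loads(data))
# ['foo bar']
```

If that is indeed what happens, the Scala side would presumably have to unpickle each Array[Byte] (Spark bundles the Pyrolite library for this kind of Python/JVM serialization), or the records would have to be converted to Java objects before crossing over; I haven't verified either route, hence the question.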