From: Ryan Murray
Date: Wed, 22 Jul 2020 10:35:10 +0100
Subject: Re: Python and Java interoperability
To: user@arrow.apache.org

Hey Jesse,

This looks like the buffer was not put onto Redis correctly: Java is not able to deserialize the message. I recommend putting the hex string from Python into Redis. You have to be careful about feeding the correct bytes from Redis to Arrow, so double-check that the hex string -> bytes decoding is OK too.

Best,
Ryan
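
One way to double-check the bytes on the Java side is to look at how they begin before handing them to `ArrowStreamReader`. In the current Arrow IPC stream format (written by default by pyarrow 0.15 and later), each message starts with the 4-byte continuation marker `0xFFFFFFFF` followed by a little-endian int32 metadata length; older writers emit only the length. `ByteBuffer.allocate`, at the top of the quoted stack trace, throws `IllegalArgumentException` when asked for a negative capacity, which is consistent with the reader decoding a nonsensical length from the first bytes it received. The sketch below applies that framing rule and fails with a clearer message. `ArrowStreamPrefixCheck` and `firstMessageLength` are hypothetical names, not Arrow APIs, and the check only covers the length prefix, not the flatbuffer metadata that follows it.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical helper (not an Arrow API): sanity-checks the prefix of the first
// message in an Arrow IPC stream before the bytes are handed to ArrowStreamReader.
final class ArrowStreamPrefixCheck {
    // Continuation marker that precedes each message in the 0.15+ IPC format.
    private static final int CONTINUATION_MARKER = 0xFFFFFFFF;

    private ArrowStreamPrefixCheck() {}

    /**
     * Returns the metadata length announced by the first message, or throws
     * IllegalStateException if the prefix cannot belong to an Arrow IPC stream.
     */
    static int firstMessageLength(byte[] data) {
        if (data.length < 8) {
            throw new IllegalStateException(
                "only " + data.length + " bytes; too short for an Arrow IPC stream");
        }
        // Integers in the IPC framing are little-endian.
        ByteBuffer bb = ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN);
        int first = bb.getInt();
        // Current format: marker, then length. Legacy (pre-0.15) format: length only.
        int length = (first == CONTINUATION_MARKER) ? bb.getInt() : first;
        int remaining = data.length - bb.position();
        if (length <= 0 || length > remaining) {
            // A negative length passed to ByteBuffer.allocate would throw
            // IllegalArgumentException, as in the quoted stack trace.
            throw new IllegalStateException(
                "implausible metadata length " + length + " (only " + remaining
                    + " bytes follow); these bytes are probably not an Arrow IPC stream");
        }
        return length;
    }
}
```

For example, calling `ArrowStreamPrefixCheck.firstMessageLength(bytes)` on the decoded array right before `new ByteArrayInputStream(bytes)` would catch the common slip of passing the hex text's own bytes (for example `buf.getBytes()`) instead of the decoded bytes: ASCII hex digits such as `"ffff"` read as the int32 `0x66666666`, which is far larger than the data that follows.
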


On Wed, Jul 22, 2020 at 6:49 AM Jesse Wang wrote:

> Hi Ryan,
> Thanks for your reply.
>
> On Tue, Jul 21, 2020 at 8:54 PM Ryan Murray wrote:
>
>> Hey Jiaxing,
>>
>> You want to use the IPC mechanism to pass Arrow buffers between languages[1].
>>
>> First get a buffer:
>> ```
>> import pyarrow as pa
>>
>> data = [
>>     pa.array([1, 2, 3, 4]),
>>     pa.array(['foo', 'bar', 'baz', None]),
>>     pa.array([True, None, False, True])
>> ]
>> batch = pa.record_batch(data, names=['f0', 'f1', 'f2'])
>> sink = pa.BufferOutputStream()
>> writer = pa.ipc.new_stream(sink, batch.schema)
>> writer.write_batch(batch)
>> writer.close()
>> buf = sink.getvalue()
>> ```
>>
>> The buffer could be written to Redis, to a file, etc. For Redis I think `r.set("key", buf.hex())` is easiest; you don't have to worry about encoding.
>>
>> On the Java side, something like:
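>> ```
>> import java.io.ByteArrayInputStream;
>> import org.apache.arrow.memory.RootAllocator;
>> import org.apache.arrow.vector.VectorSchemaRoot;
>> import org.apache.arrow.vector.ipc.ArrowStreamReader;
>> import redis.clients.jedis.Jedis;
>>
>>     Jedis jedis = new Jedis();
>>     String buf = jedis.get("key");
>>     RootAllocator rootAllocator = new RootAllocator(Long.MAX_VALUE);
>>     // Decode the hex text back into raw IPC bytes (hexStringToByteArray is not an Arrow API; the caller must define it)
>>     ByteArrayInputStream in = new ByteArrayInputStream(hexStringToByteArray(buf));
>>     ArrowStreamReader stream = new ArrowStreamReader(in, rootAllocator);
>>     VectorSchemaRoot vsr = stream.getVectorSchemaRoot();
>>     stream.loadNextBatch(); // returns false once the stream has no more batches
>> ```
>> And the VectorSchemaRoot then holds the Arrow data from the batch.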
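
The Java snippet above calls `hexStringToByteArray`, which is not part of Arrow or the JDK and is not defined anywhere in this thread. Below is a minimal sketch of such a helper, assuming the Redis value is the hex text produced by `buf.hex()` in the quoted Python snippet; the class name `HexBytes` is hypothetical. It rejects odd-length strings and non-hex characters, which is a cheap way to notice a value that was not stored as hex in the first place.

```java
// Hypothetical helper, not an Arrow or JDK API: decodes the hex text written
// from Python back into the raw Arrow IPC bytes.
final class HexBytes {
    private HexBytes() {}

    static byte[] hexStringToByteArray(String hex) {
        if (hex == null) {
            // Jedis.get returns null when the key does not exist
            throw new IllegalArgumentException("no value to decode (is the Redis key missing?)");
        }
        String s = hex.trim(); // tolerate stray surrounding whitespace
        if (s.length() % 2 != 0) {
            throw new IllegalArgumentException("hex string has odd length " + s.length());
        }
        byte[] out = new byte[s.length() / 2];
        for (int i = 0; i < out.length; i++) {
            // Character.digit returns -1 for characters that are not valid base-16 digits
            int hi = Character.digit(s.charAt(2 * i), 16);
            int lo = Character.digit(s.charAt(2 * i + 1), 16);
            if (hi < 0 || lo < 0) {
                throw new IllegalArgumentException("non-hex character near index " + (2 * i));
            }
            out[i] = (byte) ((hi << 4) | lo);
        }
        return out;
    }
}
```

In the snippet above this would be called as `HexBytes.hexStringToByteArray(buf)`. On Java 17 or later, `java.util.HexFormat.of().parseHex(buf)` performs the same decoding.
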

> I tested this and got the following exception:
> ```
> Exception in thread "main" java.lang.IllegalArgumentException
>   at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
>   at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:547)
>   at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
>   at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
>   at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:178)
>   at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:169)
>   at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:62)
>   at Main.main(Main.java:38)
> ```
>
>> While Redis will work for this, you might find a file or socket a bit more ergonomic with Arrow. The Plasma object store is also an option[2]; you can think of it as a primitive Redis specifically for Arrow buffers. Finally, if you are using Redis as a message bus, you might find Arrow's RPC mechanism, Arrow Flight, a good choice[3].
>>
> As for Plasma, it seems to be limited to a single host at the moment (judging from its source code, arrow/cpp/src/plasma/io.cc, it only uses AF_UNIX sockets).
>
>> [1] https://arrow.apache.org/docs/python/ipc.html#writing-and-reading-streams
>> [2] https://arrow.apache.org/docs/python/plasma.html
>> [3] https://arrow.apache.org/blog/2019/10/13/introducing-arrow-flight/
>>
>> On Tue, Jul 21, 2020 at 10:57 AM Jesse Wang wrote:
>>
>>> Hi,
>>> I want to have a Java process read the content of DataFrames produced by a Python process. The Java and Python processes run on different hosts.
>>>
>>> The solution I can think of is to have the Python process serialize the DataFrame and save it to Redis, and have the Java process parse the data.
>>>
>>> The solution I found serializes the DataFrame to 'pybytes' (from https://stackoverflow.com/questions/57949871/how-to-set-get-pandas-dataframes-into-redis-using-pyarrow):
>>> ```
>>> import pandas as pd
>>> import pyarrow as pa
>>> import redis
>>>
>>> df = pd.DataFrame({'A': [1, 2, 3]})
>>> r = redis.Redis(host='localhost', port=6379, db=0)
>>>
>>> context = pa.default_serialization_context()
>>> r.set("key", context.serialize(df).to_buffer().to_pybytes())
>>> print(context.deserialize(r.get("key")))
>>> # Output:
>>> #    A
>>> # 0  1
>>> # 1  2
>>> # 2  3
>>> ```
>>>
>>> I wonder whether this serialized 'pybytes' can be parsed on the Java end. If not, how can I achieve this properly?
>>>
>>> Thanks!
>>>
>>> --
>>>
>>> Best Regards,
>>> Jiaxing Wang
>>>
>>
>
> --
>
> Best Regards,
> Jiaxing Wang
>